Creating and Using User-Defined Functions (UDFs) in Spark using Scala
User-Defined Functions (UDFs) in Spark let us define custom functions to process data in Spark DataFrames. UDFs can be written in Scala, Java, Python, or R.
Steps to Create a UDF
In this example, we will be using Scala to create and apply a UDF on a DataFrame.
Suppose we have a DataFrame that contains a column of integers, and we want to create a UDF to double these integers.
Step 1: Import the necessary libraries
We will start by importing the necessary library:

```scala
import org.apache.spark.sql.functions.udf
```

The `udf` function is provided by the `org.apache.spark.sql.functions` package, and we will use it to wrap our Scala function as a Spark UDF. Some examples also import `org.apache.spark.sql.types.IntegerType`, the Spark type representing integer values; with Scala UDFs, however, the input and return types are inferred from the function signature, so that import is not needed here.
Step 2: Define the UDF logic
Next, we will define our UDF logic: a function that takes an integer as input and returns twice its value. Note that idiomatic Scala does not need an explicit `return`:

```scala
def doubleInt(i: Int): Int = i * 2
```
Step 3: Convert the UDF to a Spark UDF
Now that we have defined our UDF logic, we can convert it to a Spark UDF using the `udf` function:

```scala
val doubleIntUDF = udf(doubleInt _)
```
The `udf` function takes a function as an argument and returns a UDF that can be applied to DataFrame columns. Here we pass our `doubleInt` function (the trailing `_` converts the method into a function value), and `udf` returns a UDF we can use to double integers in our DataFrame.
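Equivalently, steps 2 and 3 can be combined by defining the UDF inline with an anonymous function, a style commonly seen in Spark code:

```scala
import org.apache.spark.sql.functions.udf

// Same UDF defined inline; input and return types are inferred from the lambda
val doubleIntUDF = udf((i: Int) => i * 2)
```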
Step 4: Apply the UDF to our data
Now that we have defined our UDF, we can apply it to our data. We will create a DataFrame with integers and apply our UDF to double each one:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("UDFExample")
  .getOrCreate()

// Needed for the toDF method and the $"..." column syntax
import spark.implicits._

// Create a DataFrame with integers
val numbersDF = Seq(1, 2, 3, 4, 5).toDF("number")

// Apply the UDF to double the numbers
val doubledNumbersDF = numbersDF.withColumn("doubled_number", doubleIntUDF($"number"))

// Show the resulting DataFrame
doubledNumbersDF.show()
```
Explanation of steps 1 to 4
We start by creating a SparkSession object. The `appName` parameter sets the name of our Spark application, and the `getOrCreate` method returns the existing SparkSession or creates a new one if none exists.

Next, we create a DataFrame of integers by calling the `toDF` method on a `Seq`, naming the column `"number"`.

Finally, we apply our UDF with the `withColumn` method. We name the new column `"doubled_number"` and pass the input column to our UDF using the `$` syntax. The `$` interpolator, made available by `import spark.implicits._`, converts a column name into a `Column` object, which can then be passed as an argument to functions like our UDF.

We then display the resulting DataFrame with the `show` method.
The output of this code should be:
```
+------+--------------+
|number|doubled_number|
+------+--------------+
|     1|             2|
|     2|             4|
|     3|             6|
|     4|             8|
|     5|            10|
+------+--------------+
```
As expected, each number in the `"number"` column has been doubled and added to a new column `"doubled_number"`.
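As a side note, the `$"number"` column reference can equivalently be written with the `col` function from `org.apache.spark.sql.functions`, which does not require `import spark.implicits._`. A minimal sketch, reusing the `numbersDF` and `doubleIntUDF` values defined in the steps above:

```scala
import org.apache.spark.sql.functions.col

// col("number") builds the same Column object as $"number",
// without needing the implicits import
val doubledViaCol = numbersDF.withColumn("doubled_number", doubleIntUDF(col("number")))
doubledViaCol.show()
```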
Step 5: Register the UDF with Spark
If we want to reuse our UDF in other parts of our Spark application, we can register it with Spark using the `spark.udf.register` method:

```scala
spark.udf.register("doubleIntUDF", doubleIntUDF)
```

The first argument to `spark.udf.register` is the name under which the UDF will be available, and the second argument is the UDF itself.
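As a side note, `spark.udf.register` also accepts a plain Scala function directly, so a UDF can be registered for SQL use without creating a UDF object first. A minimal sketch (the name `doubleIntDirect` is just for illustration):

```scala
// Register a lambda directly; input and return types are inferred
spark.udf.register("doubleIntDirect", (i: Int) => i * 2)
```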
Now, we can use the registered UDF in other parts of our Spark application by referencing it by its name:
```scala
val df = Seq(1, 2, 3, 4, 5).toDF("number")
df.createOrReplaceTempView("numbers")

val query = "SELECT number, doubleIntUDF(number) AS doubled_number FROM numbers"
spark.sql(query).show()
```
Here, we create a DataFrame with integers and make it queryable from Spark SQL by creating a temporary view with `createOrReplaceTempView`. We then write a Spark SQL query that calls the UDF we registered under the name `"doubleIntUDF"` to double the values in the `number` column, using the `AS` keyword to name the result column `doubled_number`. Finally, we display the resulting DataFrame with the `show` method.
The output of this code should be the same as the previous example:
```
+------+--------------+
|number|doubled_number|
+------+--------------+
|     1|             2|
|     2|             4|
|     3|             6|
|     4|             8|
|     5|            10|
+------+--------------+
```
In summary, creating a UDF in Spark with Scala is straightforward: we define the UDF logic, convert it to a Spark UDF with the `udf` function, and apply it to our data with the `withColumn` method. We can also register the UDF with Spark and use it in Spark SQL queries.
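The whole workflow can be recapped in one short program. A sketch, assuming a local Spark installation (the `master("local[*]")` setting is an assumption for running locally, not part of the steps above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UDFExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UDFExample")
      .master("local[*]") // assumption: run on a local master for this sketch
      .getOrCreate()
    import spark.implicits._

    // Define the UDF logic and wrap it as a Spark UDF
    val doubleIntUDF = udf((i: Int) => i * 2)

    // DataFrame API usage
    val numbersDF = Seq(1, 2, 3, 4, 5).toDF("number")
    numbersDF.withColumn("doubled_number", doubleIntUDF($"number")).show()

    // Spark SQL usage via registration
    spark.udf.register("doubleIntUDF", doubleIntUDF)
    numbersDF.createOrReplaceTempView("numbers")
    spark.sql("SELECT number, doubleIntUDF(number) AS doubled_number FROM numbers").show()

    spark.stop()
  }
}
```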