  1. Using a WarpScript™ UDF
    1. Registering the UDF
    2. Using the UDF
  2. Using a WarpScript™ User Defined Aggregate Function
    1. Initialize
    2. Update
    3. Merge
    4. Evaluate
    5. Creating a User Defined Aggregate Function
    6. Using a User Defined Aggregate Function
  3. Running the job

What is PySpark?

PySpark is the name given to the Spark Python API. It defines how the Spark analytics engine can be leveraged from the Python programming language and from tools which support it, such as Jupyter.

WarpScript™ in PySpark

The integration of WarpScript™ in PySpark is provided by the warp10-spark-x.y.z.jar built from source (use the pack Gradle task).

This artifact defines both User Defined Functions (UDFs) and a User Defined Aggregate Function (UDAF) which can be used in PySpark jobs to execute WarpScript™ code.

Those functions are implemented as Java classes and thus do not suffer from the Py4J overhead experienced when executing Python functions in Spark jobs.

Using a WarpScript™ UDF

The warp10-spark2 package defines a range of Spark SQL User Defined Functions which accept from 1 to 22 parameters (as allowed by the Spark SQL API). The first of those parameters is expected to be the WarpScript™ code fragment to execute. The extra parameters will be pushed onto the stack, in reverse order, i.e. the parameter following the WarpScript™ fragment will be on top of the stack when the fragment is executed.

After the WarpScript™ code has been executed, the content of the stack will be returned to the Spark job. If the stack contains a single level after the execution, the object on top of the stack will be returned; otherwise an array with the levels of the stack will be returned, top first.

PySpark expects datasets to be strongly typed; therefore, when declaring the UDF in your job, you must also specify the type of its return value, with arrays and maps being strongly typed too.

The types supported by PySpark are defined in the Python package pyspark.sql.types; the Catalyst source code can be consulted to understand how types are converted. The following table gives the mapping between Java and PySpark types:

Java type     PySpark type
double        DoubleType
float         FloatType
byte          ByteType
short         ShortType
int           IntegerType
long          LongType
null          NullType
boolean       BooleanType
String        StringType
byte[]        BinaryType
List          ArrayType
Map           MapType
Row           StructType
BigDecimal    DecimalType
BigInteger    DecimalType

Registering the UDF

The registration of a User Defined Function is done with the following code in PySpark:

sqlContext.registerJavaFunction("name", "io.warp10.spark.WarpScriptUDFx", TYPE)

where name is the name under which the function will be available in PySpark, x is the number of arguments of the function (from 1 to 22), and TYPE is the type of its return value.

As a more detailed example, the following will define a foo function which returns an array of strings and expects 3 parameters:

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType

spark = SparkSession.builder.appName("WarpScript Spark Test").getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(sc)

sqlContext.registerJavaFunction("foo", "io.warp10.spark.WarpScriptUDF3", ArrayType(StringType()))
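
Compound return types must themselves be fully typed. As a minimal sketch (the pi_map name is purely illustrative, and the WarpScript™ code registered under it would have to leave such a map on top of the stack), registering a UDF which returns a map with STRING keys and DOUBLE values would look like:

from pyspark.sql.types import DoubleType
from pyspark.sql.types import MapType

# Hypothetical UDF taking only the WarpScript™ code fragment and returning a map of STRING to DOUBLE
sqlContext.registerJavaFunction("pi_map", "io.warp10.spark.WarpScriptUDF1", MapType(StringType(), DoubleType()))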

Using the UDF

Once registered, a UDF can be called in your PySpark job just like built-in functions.

Building upon the previous example, adding the following line demonstrates the function invocation:

print(sqlContext.sql("SELECT foo('SNAPSHOT \"Easy!\"', 3.14, 'pi')").collect())

It will simply convert the state of the stack to a STRING (via the SNAPSHOT function) and will add the Easy! STRING on top of the stack.

Instead of explicit WarpScript™ code, you could have invoked WarpScript™ code from an external file by using the @path/to/file syntax.
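
For instance, assuming the WarpScript™ code of the previous example has been saved in a hypothetical file named easy.mc2 shipped with the job (see the --files option in the last section), the call would become:

print(sqlContext.sql("SELECT foo('@easy.mc2', 3.14, 'pi')").collect())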

Using a WarpScript™ User Defined Aggregate Function

User Defined Aggregate Functions are used to aggregate a sequence of values and then emit a final value.

The lifecycle of those functions contains several steps: initialize, update, merge and evaluate.

The steps are linked through an aggregation buffer which is passed at each step along with the other parameters.

Initialize

The initialize step is meant to set an initial value into the aggregation buffer. The WarpScript™ code is called with the following items on the stack:

TOP: the STRING 'initialize'
2: the aggregation buffer (a list)

The WarpScript™ code is expected to leave a stack with as many values as the size of the aggregation buffer. The values on the stack will be put into the aggregation buffer.

Update

The update step is meant to update the aggregation buffer given a row of input. The code is called with the following items on the stack:

TOP: the STRING 'update'
2: a row of input (a list)
3: the aggregation buffer (a list)

Here again, the code is expected to leave a stack with as many values as the size of the aggregation buffer. The aggregation buffer will be updated accordingly.

Merge

The merge step is meant to merge two aggregation buffers. The code is called with the following input:

TOP: the STRING 'merge'
2: an aggregation buffer (a list)
3: the target aggregation buffer (a list)

As before, the code is expected to leave a stack with as many values as the size of the aggregation buffer. The aggregation buffer will be updated with those values.

Evaluate

Lastly, the evaluate step is meant to emit a final value. The code is called with the following stack:

TOP: the STRING 'evaluate'
2: the aggregation buffer (a list)

It is expected to leave on top of the stack the final value (possibly a compound one) and nothing else.

Creating a User Defined Aggregate Function

A User Defined Aggregate Function is created by instructing the JVM running the Py4J gateway to create an instance of io.warp10.spark.WarpScriptUDAF with the WarpScript™ code the UDAF will run.

Then the input, buffer and output types must be specified, and the apply method must be called on the created object.

The example below creates a WarpScript™ UDAF which executes the WarpScript™ code in the file macro.mc2. Its input schema is a row containing a single column input of type double, its aggregation buffer is a row containing a single column agg of type string, and its return value is of type string too. The UDAF can also be flagged as deterministic, meaning that it will always produce the same output for a given input; this helps the Spark executor make decisions when it has to reschedule a task.

from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import ArrayType

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

# JSON representations of the input schema, the aggregation buffer schema and the return type
insch = StructType([StructField("input", DoubleType(), True)]).json()
buf = StructType([StructField("agg", StringType(), True)]).json()
datatype = StringType().json()

# Instantiate the Java UDAF through the Py4J gateway with the WarpScript™ code to execute
udaf = sc._jvm.io.warp10.spark.WarpScriptUDAF('@macro.mc2')
udaf.setDeterministic(True)
udaf.setInputSchema(insch)
udaf.setBufferSchema(buf)
udaf.setDataType(datatype)
# Keep a reference to the apply method, used below to wrap the UDAF into a Column
udaf = udaf.apply

Using a User Defined Aggregate Function

Once a User Defined Aggregate Function is defined, it must be wrapped into a Column before it can be applied to data.

The wrapping process will also specify the input parameters to your WarpScript™ code, i.e. columns from the DataFrame which will be passed on the stack when your code is called.

from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq

# Create the wrapped UDAF, assuming the UDAF is the one defined above
cudaf = Column(udaf(_to_seq(sc, ['col1', 'col2', ...], _to_java_column)))

The wrapped UDAF can then be used in your actual Spark analytics pipeline:

df.select("col1","col2",...).agg(cudaf).show()
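
As a sketch, assuming the DataFrame also contains a hypothetical grouping column named key, the same wrapped UDAF can be applied per group:

df.groupBy("key").agg(cudaf).show()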

The UDAF can also be used with a window definition:

from pyspark.sql.window import Window

window_spec = Window.partitionBy("col1").orderBy("col2").rowsBetween(-1, 1)
result = df.withColumn("foo", cudaf.over(window_spec))

Note: starting with version 2.3, Spark makes it easier to register UDAFs in PySpark. We will update this page when this new way of doing things becomes available.

Running the job

A PySpark job is launched using the spark-submit command. Some specific options must be passed to it to enable the WarpScript™ integration.

The warp10-spark2 jar must be added to the job via the --jars option.

The warp.timeunits property must be set on both the driver and the executors via the following options of spark-submit:

--conf 'spark.driver.extraJavaOptions=-Dwarp.timeunits=us' --conf 'spark.executor.extraJavaOptions=-Dwarp.timeunits=us'

Alternatively, if additional Warp 10™ configuration properties must be set, for example to load specific WarpScript™ extensions, the following options should be used:

--conf spark.executor.extraJavaOptions=-Dwarp10.config=warp10-spark.conf
--conf spark.driver.extraJavaOptions=-Dwarp10.config=warp10-spark.conf
--files warp10-spark.conf

Any WarpScript™ code referenced (via @) in function invocations must be specified using the --files option.

Lastly, the actual Python (.py) file of the job should be the last parameter of the call to spark-submit.
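
Putting it all together, a complete invocation could look like the sketch below, where the jar version, the configuration file, the macro.mc2 file and job.py are placeholders for your own files:

spark-submit \
  --jars warp10-spark-x.y.z.jar \
  --conf 'spark.driver.extraJavaOptions=-Dwarp10.config=warp10-spark.conf' \
  --conf 'spark.executor.extraJavaOptions=-Dwarp10.config=warp10-spark.conf' \
  --files warp10-spark.conf,macro.mc2 \
  job.py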