What is PySpark?
PySpark is the name given to the Spark Python API. It defines how the Spark analytics engine can be leveraged from the Python programming language and from tools which support it, such as Jupyter.
WarpScript in PySpark
The integration of WarpScript in PySpark is provided by the warp10-spark-x.y.z.jar built from source (use the pack Gradle task).
This artifact defines both User Defined Functions (UDFs) and a User Defined Aggregate Function (UDAF) which can be used in PySpark jobs to execute WarpScript code.
Those functions are implemented as Java classes and thus do not suffer from the Py4J overhead experienced when executing Python functions in Spark jobs.
Using a WarpScript UDF
The warp10-spark2 package defines a range of Spark SQL User Defined Functions which accept from 1 to 22 parameters (as allowed by the Spark SQL API). The first of those parameters is expected to be the WarpScript code fragment to execute. The extra parameters will be pushed onto the stack, in reverse order, i.e. the parameter following the WarpScript fragment will be on top of the stack when the fragment is executed.
After the WarpScript has been executed, the content of the stack will be returned to the Spark job. If the stack contained a single level after the execution, the object on top of the stack will be returned, otherwise an array with the levels of the stack will be returned, top first.
PySpark expects the datasets to be strongly typed, therefore when declaring the UDF in your job, you must also specify the types of its return values, with arrays and maps being strongly typed too.
The types supported by PySpark are defined in the Python package pyspark.sql.types; the Catalyst code can be consulted to understand how types are converted. The following table gives the correspondence between common Java and PySpark types:
| Java type | PySpark type |
|---|---|
| double | DoubleType |
| float | FloatType |
| byte | ByteType |
| short | ShortType |
| int | IntegerType |
| long | LongType |
| null | NullType |
| boolean | BooleanType |
| String | StringType |
| byte[] | BinaryType |
| List | ArrayType |
| Map | MapType |
| Row | StructType |
| BigDecimal | DecimalType |
| BigInteger | DecimalType |
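For compound return values, the element, key and value types must be declared as well. As a minimal sketch (the variable names below are only illustrative), a WarpScript fragment leaving a Java List of longs on the stack would be declared with an ArrayType, and one leaving a Java Map with STRING keys and DOUBLE values with a MapType:
from pyspark.sql.types import ArrayType, MapType, StringType, DoubleType, LongType

# Hypothetical return type declarations for compound values:
# a Java List of longs maps to an ArrayType of LongType,
# a Java Map with STRING keys and DOUBLE values maps to a MapType
list_of_longs = ArrayType(LongType())
string_to_double = MapType(StringType(), DoubleType())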
Registering the UDF
The registration of a User Defined Function is done with the following code in PySpark:
sqlContext.registerJavaFunction("name", "io.warp10.spark.WarpScriptUDFx", TYPE)
where name is the name under which the function will be available in PySpark, x is the number of arguments of the function (from 1 to 22), and TYPE is the type of its return value.
As a more detailed example, the following will define a foo function which returns an array of strings and expects 3 parameters:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType

# Create the Spark session and the SQL context
spark = SparkSession.builder.appName("WarpScript Spark Test").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Register the 3-argument WarpScript UDF under the name 'foo',
# declaring that it returns an array of strings
sqlContext.registerJavaFunction("foo", "io.warp10.spark.WarpScriptUDF3", ArrayType(StringType()))
Using the UDF
Once registered, a UDF can be called in your PySpark job just like any built-in function.
Building upon the previous example, adding the following line demonstrates the function invocation:
print(sqlContext.sql("SELECT foo('SNAPSHOT \"Easy!\"', 3.14, 'pi')").collect())
It will simply convert the state of the stack to a STRING (via the SNAPSHOT function) and will add the Easy! STRING on top of the stack.
Instead of explicit WarpScript code, you could invoke WarpScript code stored in an external file by using the @path/to/file syntax, as shown below.
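For instance, assuming a file named macro.mc2 containing the WarpScript code has been shipped with the job (see the --files option described later on this page), the inline fragment could be replaced with a reference to that file:
# Hypothetical invocation of the same UDF, reading the WarpScript code
# from the file macro.mc2 instead of passing it inline
print(sqlContext.sql("SELECT foo('@macro.mc2', 3.14, 'pi')").collect())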
Using a WarpScript User Defined Aggregate Function
User Defined Aggregate Functions are used to aggregate a sequence of values and then emit a final value.
Their lifecycle contains several steps: initialize, update, merge and evaluate.
The steps are linked by an aggregation buffer which is passed to each step along with other parameters.
Initialize
The initialize step is meant to set an initial value into the aggregation buffer. The WarpScript code is called with the following items on the stack:
TOP: the STRING 'initialize'
2: the aggregation buffer (a list)
The WarpScript code is expected to leave a stack with as many values as the size of the aggregation buffer. The values on the stack will be put into the aggregation buffer.
Update
The update step is meant to update the aggregation buffer given a row of input. The code is called with the following items on the stack:
TOP: the STRING 'update'
2: a row of input (a list)
3: the aggregation buffer (a list)
Here again, the code is expected to leave a stack with as many values as the size of the aggregation buffer. The aggregation buffer will be updated accordingly.
Merge
The merge step is meant to merge two aggregation buffers. The code is called with the following input:
TOP: the STRING 'merge'
2: an aggregation buffer (a list)
3: the target aggregation buffer (a list)
As before, the code is expected to leave a stack with as many values as the size of the aggregation buffer. The aggregation buffer will be updated with those values.
Evaluate
Lastly, the evaluate step is meant to emit a final value. The code is called with the following stack:
TOP: the STRING 'evaluate'
2: the aggregation buffer (a list)
It is expected to leave on top of the stack the final value (possibly a compound one) and nothing else.
Creating a User Defined Aggregate Function
A User Defined Aggregate Function is created by instructing the JVM running the Py4J gateway to create an instance of io.warp10.spark.WarpScriptUDAF with the WarpScript code the UDAF will run. Then the input, buffer and output types must be specified and the apply method must be called on the created object.
The example below creates a WarpScript UDAF which executes the WarpScript code in the file macro.mc2. Its input schema is a row containing a single column input of type double, its aggregation buffer is a row containing a single column agg of type string, and its return value is of type string too. The UDAF can also be flagged as deterministic, meaning that it will always produce the same output for a given input; this helps the Spark executor make decisions when it has to reschedule a task.
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

# Input schema, aggregation buffer schema and return type, serialized as JSON
insch = StructType([StructField("input", DoubleType(), True)]).json()
buf = StructType([StructField("agg", StringType(), True)]).json()
datatype = StringType().json()

# Create the UDAF instance on the JVM side through the Py4J gateway
udaf = sc._jvm.io.warp10.spark.WarpScriptUDAF('@macro.mc2')
udaf.setDeterministic(True)
udaf.setInputSchema(insch)
udaf.setBufferSchema(buf)
udaf.setDataType(datatype)

# Keep a reference to its apply method so it can later be wrapped in a Column
udaf = udaf.apply
Using a User Defined Aggregate Function
Once a User Defined Aggregate Function is defined, it must be wrapped into a Column before it can be applied to data.
The wrapping process will also specify the input parameters to your WarpScript code, i.e. columns from the DataFrame which will be passed on the stack when your code is called.
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
# Create the wrapped UDAF, assuming the UDAF is the one defined above
cudaf = Column(udaf(_to_seq(sc, ['col1', 'col2', ...], _to_java_column)))
The wrapped UDAF can then be used in your actual Spark analytics pipeline:
df.select("col1","col2",...).agg(cudaf).show()
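As a more complete sketch, assuming the UDAF created above (whose input schema is a single input column of type double), the hypothetical DataFrame and column names below show an end-to-end aggregation:
# Hypothetical end-to-end sketch: build a small DataFrame with the 'input'
# column expected by the UDAF defined above, wrap the UDAF over that column
# and aggregate the whole DataFrame
df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["input"])
cudaf = Column(udaf(_to_seq(sc, ["input"], _to_java_column)))
df.agg(cudaf.alias("result")).show()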
The UDAF can also be used with a window definition:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("col1").orderBy("col2").rowsBetween(-1, 1)
result = df.withColumn("foo", cudaf.over(window_spec))
Note: starting with version 2.3, Spark makes it easier to register UDAFs in PySpark; we will update this page when this new way of doing things becomes available.
Running the job
A PySpark job is launched using the spark-submit command. Some specific options must be passed to it to enable the WarpScript integration.
The warp10-spark2 jar must be added to the job via the --jars option.
The warp.timeunits property must be set on both the driver and the worker via the following options of spark-submit:
--conf 'spark.driver.extraJavaOptions=-Dwarp.timeunits=us' --conf 'spark.executor.extraJavaOptions=-Dwarp.timeunits=us'
Alternatively, if additional Warp 10 configuration properties must be set, for example to load specific WarpScript extensions, the following options should be used:
--conf spark.executor.extraJavaOptions=-Dwarp10.config=warp10-spark.conf
--conf spark.driver.extraJavaOptions=-Dwarp10.config=warp10-spark.conf
--files warp10-spark.conf
Any WarpScript code referenced (via @) in function invocations must be specified using the --files option.
Lastly, the actual Python (.py) file of the job should be the last parameter of the call to spark-submit.
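Putting it all together, a complete invocation might look like the sketch below; the jar version, the configuration and macro file names, and the job.py script are placeholders for your own files:
# Hypothetical complete invocation; jar version, file names and job.py are placeholders
spark-submit \
  --jars warp10-spark-x.y.z.jar \
  --conf 'spark.driver.extraJavaOptions=-Dwarp10.config=warp10-spark.conf' \
  --conf 'spark.executor.extraJavaOptions=-Dwarp10.config=warp10-spark.conf' \
  --files warp10-spark.conf,macro.mc2 \
  job.py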