Overview
The WarpScriptInputFormat is a little-known treasure of the Warp 10 ecosystem. It allows you to wrap any other Hadoop InputFormat and process each tuple it reads with WarpScript code, returning new tuples at a potentially different rate.
This InputFormat can, for example, be used to create Geo Time Series on the fly from individual values read from a CSV file, far more efficiently than relying on GROUP BY and aggregation operations on an observation dataset.
Using the WarpScriptInputFormat
The behavior of the WarpScriptInputFormat is governed by the following configuration keys:
Key | Description |
---|---|
warpscript.inputformat.class | Fully qualified Java class name of the Hadoop InputFormat to wrap |
warpscript.inputformat.script | WarpScript code to apply to input tuples. This is either explicit WarpScript code or a construct of the form @file.mc2 or %file.mc2 to read an existing .mc2 file containing either raw WarpScript (@...) or WarpScript code defining a macro (%...). |
warpscript.inputformat.suffix | Suffix to append to the configuration keys; see the note below |
warpscript.inputformat.conf.suffix | If this configuration parameter is set, the Hadoop job configuration will be altered by copying every parameter whose key ends with this suffix into a parameter with the suffix removed. This allows multiple configurations of a given InputFormat within a single Hadoop job configuration |
The WarpScriptInputFormat can be instantiated with a suffix. Configuration keys will first be looked up with that suffix appended; if no suffixed version is found, the unsuffixed version will be looked up, and if that is not found either, the default value (when one exists) will be used. Both mechanisms are illustrated in the sketch below.
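For illustration, the following Java sketch shows both mechanisms side by side. The 'gts' and '.fileA' suffixes, including the leading dot, are assumptions made for the example:

```java
import org.apache.hadoop.conf.Configuration;

public class SuffixExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Suffixed key lookup: an instance of WarpScriptInputFormat created
    // with the suffix '.gts' (an assumed spelling, including the dot)
    // resolves 'warpscript.inputformat.script.gts' before falling back
    // to the unsuffixed 'warpscript.inputformat.script'.
    conf.set("warpscript.inputformat.script", "@default.mc2");
    conf.set("warpscript.inputformat.script.gts", "@gts.mc2");

    // conf.suffix mechanism: every key ending with '.fileA' is copied to
    // the same key with the suffix removed, so the wrapped FileInputFormat
    // ends up seeing 'mapreduce.input.fileinputformat.inputdir'.
    conf.set("warpscript.inputformat.conf.suffix", ".fileA");
    conf.set("mapreduce.input.fileinputformat.inputdir.fileA", "/data/input/a");
  }
}
```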
The job configuration should also contain the configuration keys expected by the Hadoop InputFormat specified in warpscript.inputformat.class. For example, in the case of an InputFormat inheriting from FileInputFormat, the configuration key mapreduce.input.fileinputformat.inputdir should be used to specify the paths to read.
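Putting this together, a minimal job setup could look like the sketch below. The io.warp10.hadoop package is an assumption; check your Warp 10 distribution for the actual fully qualified class name:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// The package of WarpScriptInputFormat is an assumption; adjust the
// import to match the Warp 10 jar you are using.
import io.warp10.hadoop.WarpScriptInputFormat;

public class WarpScriptJobSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Wrap the standard Hadoop TextInputFormat.
    conf.set("warpscript.inputformat.class",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");

    // Run the raw WarpScript contained in an existing .mc2 file.
    conf.set("warpscript.inputformat.script", "@script.mc2");

    // Configuration consumed by the wrapped FileInputFormat.
    conf.set("mapreduce.input.fileinputformat.inputdir", "/data/input");

    Job job = Job.getInstance(conf, "warpscript-inputformat-example");
    job.setJarByClass(WarpScriptJobSetup.class);
    job.setInputFormatClass(WarpScriptInputFormat.class);
    // ... mapper, reducer and output configuration as usual ...
  }
}
```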
Calling convention for the WarpScript code
The defined WarpScript code will be called for each input tuple returned by the wrapped InputFormat. A single stack is used per calling thread, so the WarpScript code can keep state in stack variables between calls.
The code is called once for each input tuple with a stack composed of, from top to bottom: the boolean false, indicating that this is not the end of the split currently being processed; the value of the input tuple; and the key of the input tuple.
The WarpScript code is expected to return either nothing (i.e. an empty stack), or one or more (key, value) lists, one on each stack level. Those lists will be the tuples returned by the WarpScriptInputFormat.
When the end of the current split is reached, the WarpScript code is called with only the boolean true on top of the stack. This indicates to the WarpScript code that it should return any remaining (key, value) lists it may be holding, as it will not be called again. A minimal pass-through script is sketched just below.
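To make the convention concrete, here is a minimal pass-through script, shown as it would be set from Java. The WarpScript body is a sketch: on regular calls it re-emits the input tuple as a single [ key, value ] list, and it has nothing to flush at end of split because it keeps no state:

```java
import org.apache.hadoop.conf.Configuration;

public class PassThroughScript {
  public static void main(String[] args) {
    // On a regular call the stack is, top to bottom: false, value, key.
    // At end of split the stack only holds true.
    // IFTE pops the boolean, runs the first macro when it is true and
    // the second one when it is false.
    String script = String.join("\n",
        "<% %>",           // end of split: stateless, nothing left to emit
        "<% 2 ->LIST %>",  // regular call: turn 'key value' into [ key, value ]
        "IFTE");

    Configuration conf = new Configuration();
    conf.set("warpscript.inputformat.script", script);
  }
}
```

A stateful script would instead buffer tuples in a variable on the false branch and flush everything it accumulated on the true branch.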
The following type conversions occur when calling the WarpScript code:
Hadoop Writable Type | WarpScript Type |
---|---|
LongWritable | LONG |
IntWritable | LONG |
ShortWritable | LONG |
ByteWritable | LONG |
VIntWritable | LONG |
VLongWritable | LONG |
DoubleWritable | DOUBLE |
FloatWritable | DOUBLE |
BooleanWritable | BOOLEAN |
Text | STRING |
BytesWritable | byte array |
ArrayWritable | LIST |
MapWritable | MAP |
SortedMapWritable | MAP |
GenericWritable | depends on actual type |
NullWritable | null |
The tuples returned by the WarpScript code are converted according to the following conventions:
WarpScript Type | Hadoop Writable Type |
---|---|
Long | LongWritable |
Integer | IntWritable |
Short | ShortWritable |
Byte | ByteWritable |
Double | DoubleWritable |
Float | FloatWritable |
Boolean | BooleanWritable |
String | Text |
byte array | BytesWritable |
List | ArrayWritable of ObjectWritable |
Map | MapWritable |
null | NullWritable |
The warp10-spark2 package contains SparkWarpScriptInputFormat, a modified version of this InputFormat which correctly locates .mc2 files during the execution of a Spark job.
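As a sketch of how this variant might be wired into a Spark job (the io.warp10.spark package and the <Writable, Writable> parameterization are assumptions; check the warp10-spark2 artifact for the actual names):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// The package of SparkWarpScriptInputFormat is an assumption.
import io.warp10.spark.SparkWarpScriptInputFormat;

public class SparkWarpScriptExample {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setAppName("warpscript-input"));

    Configuration conf = new Configuration();
    conf.set("warpscript.inputformat.class",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
    conf.set("warpscript.inputformat.script", "%macro.mc2");
    conf.set("mapreduce.input.fileinputformat.inputdir", "/data/input");

    // Assuming the InputFormat is parameterized as <Writable, Writable>.
    JavaPairRDD<Writable, Writable> tuples = sc.newAPIHadoopRDD(
        conf, SparkWarpScriptInputFormat.class, Writable.class, Writable.class);

    System.out.println(tuples.count());
    sc.stop();
  }
}
```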