Loading

Getting your Warp 10 instance ready for the Warp10InputFormat

The Warp10InputFormat expects your Warp 10 instance to provide endpoints for retrieving input splits and for retrieving the data for each split.

In the distributed version of Warp 10, the /api/vX/splits endpoint is enabled automatically on any egress component you deploy.

In the standalone version, you need to explicitly configure your instance to enable this endpoint via the standalone.splits.enable property set to true.

Dedicated components called fetchers can be deployed in the distributed version of Warp 10, ideally on each machine acting as an HBase RegionServer to improve data locality.

The egress components of a distributed Warp 10 instance or a standalone instance will serve the /api/vX/sfetch endpoint which the InputFormat uses to retrieve the data for each split. Make sure any reverse proxy or load balancer you have in place forwards this endpoint to the correct server.

Using the Warp10InputFormat

The complete class for the Warp 10 InputFormat is io.warp10.hadoop.Warp10InputFormat.

The Warp10InputFormat produces records with a Text key and a BytesWritable value.

The InputFormat behavior is governed by properties defined in the Hadoop Job configuration, either directly if you are using Map Reduce, or via the configuration mechanisms of any other tool you use (Pig, Spark, Flink, ...).

The supported configuration keys are:

KeyDescription
warp10.splits.endpointURL of the endpoint to access for retrieving splits. Typically http://HOST:PORT/api/v0/splits
warp10.fetcher.fallbacksComma separated list of hosts which can server as fallback fetchers in case one of the fetchers defined for a split is unavailable. When you reading data from a standalone Warp 10 instance, add the IP of this instance here. Note that the instance should listen on the port defined in warp10.fetcher.port
warp10.fetcher.fallbacksonlyBoolean indicating whether to use the fetchers ('false') or only the fallbacks ('true'). Set to 'true' when retrieving data from a standalone Warp 10
warp10.fetcher.protocolProtocol to use when talking to the fetchers, defaults to http
warp10.fetcher.portPort to use when talking to the fetchers, defaults to 8881
warp10.fetcher.pathURL patch of the fetcher endpoint, defaults to /api/v0/sfetch
warp10.splits.selectorGeo Time Series selector to use to retrieve the list of GTS, for example class{label1~regexp1}
warp10.splits.tokenToken to use for retrieving the list of Geo Time Series and later their datapoints
warp10.http.connect.timeoutConnection timeout to the splits and sfetch endpoints. Defaults to 10000 ms
warp10.http.read.timeoutRead timeout for the splits and sfetch endpoints, also defaults to 10000 ms
warp10.fetch.nowTimestamp to use as the now parameter for the datapoints retrieval
warp10.fetch.timespanTimespan to use for the datapoints retrieval
warp10.max.combined.splitsMaximum number of splits to combine in a single split. Each original split corresponds to a single Geo Time Series, do not set to let the InputFormat infer the right number for you
warp10.max.splitsMaximum number of splits to produce. The InputFormat will combine the individual splits to producer that many splits
warp10.inputformat.suffixSuffix to use for the configuration keys, see the note below

Note that the Warp10InputFormat can be instantiated with a suffix. The configuration keys will be first looked up with that suffix appended. If no suffixed version is found, the unsuffixed version will be looked up and if not found the default value (when it exists) will be used.

The tuples returned by the Warp10InputFormat have a key set to a wrapper ID, which can be safely ignored, and the values set to wrapped Geo Time Series encoders which can be unwrapped using UNWRAP or UNWRAPENCODER.