Getting your Warp 10 instance ready for the Warp10InputFormat
The Warp10InputFormat expects your Warp 10 instance to provide endpoints for retrieving input splits and for retrieving the data of each split.

In the distributed version of Warp 10, the /api/vX/splits endpoint is enabled automatically on every egress component you deploy. In the standalone version, you need to enable this endpoint explicitly by setting the standalone.splits.enable property to true in your instance configuration.
Dedicated components called fetchers can be deployed in the distributed version of Warp 10, ideally on each machine acting as an HBase RegionServer, to improve data locality.

The egress components of a distributed Warp 10 instance, as well as a standalone instance, serve the /api/vX/sfetch endpoint, which the InputFormat uses to retrieve the data for each split. Make sure any reverse proxy or load balancer you have in place forwards this endpoint to the correct server.
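The order in which hosts are contacted for a split's data can be sketched as follows. This is an illustration under stated assumptions, not the InputFormat's actual code: the split's own fetcher list is a hypothetical input, fallbacks from warp10.fetcher.fallbacks are tried after the split's fetchers unless warp10.fetcher.fallbacksonly is true, and each URL is assembled from warp10.fetcher.protocol, warp10.fetcher.port and warp10.fetcher.path (see the configuration table further down). The host 10.0.0.5 and port 8080 used for the standalone case are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates the documented fetcher/fallback ordering,
// not the actual Warp10InputFormat implementation.
final class FetcherUrls {

    // Defaults listed in the configuration table.
    static final String DEFAULT_PROTOCOL = "http";          // warp10.fetcher.protocol
    static final int DEFAULT_PORT = 8881;                   // warp10.fetcher.port
    static final String DEFAULT_PATH = "/api/v0/sfetch";    // warp10.fetcher.path

    /**
     * Returns the sfetch URLs to try for one split, in order.
     *
     * @param splitFetchers hosts attached to the split (hypothetical input)
     * @param fallbacks     value of warp10.fetcher.fallbacks, comma separated, may be null
     * @param fallbacksOnly value of warp10.fetcher.fallbacksonly
     */
    static List<String> candidates(List<String> splitFetchers, String fallbacks,
                                   boolean fallbacksOnly, String protocol, int port, String path) {
        List<String> hosts = new ArrayList<>();
        if (!fallbacksOnly) {
            hosts.addAll(splitFetchers);            // the split's own fetchers come first
        }
        if (fallbacks != null) {
            for (String h : fallbacks.split(",")) {
                String host = h.trim();
                if (!host.isEmpty() && !hosts.contains(host)) {
                    hosts.add(host);                // then the fallbacks, skipping duplicates
                }
            }
        }
        List<String> urls = new ArrayList<>();
        for (String host : hosts) {
            urls.add(protocol + "://" + host + ":" + port + path);
        }
        return urls;
    }

    public static void main(String[] args) {
        // Standalone case: 10.0.0.5 and port 8080 are placeholders for your instance.
        System.out.println(candidates(List.of(), "10.0.0.5", true,
                DEFAULT_PROTOCOL, 8080, DEFAULT_PATH));
        // prints [http://10.0.0.5:8080/api/v0/sfetch]
    }
}
```

If a reverse proxy or load balancer sits in front of your instance, URLs of this shape are the ones it must route to a server that actually serves sfetch.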
Using the Warp10InputFormat
The fully qualified class name of the Warp 10 InputFormat is io.warp10.hadoop.Warp10InputFormat.

The Warp10InputFormat produces records with a Text key and a BytesWritable value.
The behavior of the InputFormat is governed by properties defined in the Hadoop job configuration, either directly if you use MapReduce, or through the configuration mechanism of whatever other tool you use (Pig, Spark, Flink, ...).
The supported configuration keys are:
Key | Description |
---|---|
warp10.splits.endpoint | URL of the endpoint to access for retrieving splits. Typically http://HOST:PORT/api/v0/splits |
warp10.fetcher.fallbacks | Comma-separated list of hosts that can serve as fallback fetchers if one of the fetchers defined for a split is unavailable. When reading data from a standalone Warp 10 instance, add the IP address of that instance here. Note that the instance must listen on the port defined in warp10.fetcher.port |
warp10.fetcher.fallbacksonly | Boolean indicating whether to use the fetchers ('false') or only the fallbacks ('true'). Set to 'true' when retrieving data from a standalone Warp 10 instance |
warp10.fetcher.protocol | Protocol to use when talking to the fetchers, defaults to http |
warp10.fetcher.port | Port to use when talking to the fetchers, defaults to 8881 |
warp10.fetcher.path | URL path of the fetcher endpoint, defaults to /api/v0/sfetch |
warp10.splits.selector | Geo Time Series selector to use to retrieve the list of GTS, for example class{label1~regexp1} |
warp10.splits.token | Token to use for retrieving the list of Geo Time Series and later their datapoints |
warp10.http.connect.timeout | Connection timeout for the splits and sfetch endpoints. Defaults to 10000 ms |
warp10.http.read.timeout | Read timeout for the splits and sfetch endpoints, also defaults to 10000 ms |
warp10.fetch.now | Timestamp to use as the now parameter for the datapoints retrieval |
warp10.fetch.timespan | Timespan to use for the datapoints retrieval |
warp10.max.combined.splits | Maximum number of splits to combine into a single split. Each original split corresponds to a single Geo Time Series. Leave unset to let the InputFormat infer the right number for you |
warp10.max.splits | Maximum number of splits to produce. The InputFormat combines the individual splits to produce that many splits |
warp10.inputformat.suffix | Suffix to use for the configuration keys, see the note below |
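As a sketch of how the keys fit together when reading from a standalone instance, the helper below only builds the key/value pairs; with Hadoop you would copy each pair into the job with Configuration.set(key, value). The host 10.0.0.5, the port 8080, the selector, the token and the time values are placeholders, and the example assumes the platform uses microsecond time units (the Warp 10 default) for warp10.fetch.now and warp10.fetch.timespan.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: key/value pairs for reading from a standalone Warp 10 instance.
// With Hadoop you would copy them into the job with Configuration.set(key, value).
// All argument values used below are placeholders.
final class StandaloneInputConfig {

    static Map<String, String> build(String host, int port, String selector, String token,
                                     long now, long timespan) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("warp10.splits.endpoint", "http://" + host + ":" + port + "/api/v0/splits");
        // No dedicated fetchers in standalone mode: the instance itself is the only fallback.
        conf.put("warp10.fetcher.fallbacks", host);
        conf.put("warp10.fetcher.fallbacksonly", "true");
        // The fallback must listen on warp10.fetcher.port, so use the instance's port.
        conf.put("warp10.fetcher.port", Integer.toString(port));
        conf.put("warp10.splits.selector", selector);
        conf.put("warp10.splits.token", token);
        conf.put("warp10.fetch.now", Long.toString(now));
        conf.put("warp10.fetch.timespan", Long.toString(timespan));
        return conf;
    }

    public static void main(String[] args) {
        // Assumes microsecond platform time units: current time and a one-day timespan.
        long nowUs = System.currentTimeMillis() * 1000L;
        long oneDayUs = 86_400_000_000L;
        Map<String, String> conf = build("10.0.0.5", 8080,
                "sensor.temperature{site~paris.*}", "READ_TOKEN", nowUs, oneDayUs);
        conf.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Keys not set here, such as the timeouts or warp10.fetcher.path, keep their defaults.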
Note that the Warp10InputFormat can be instantiated with a suffix. Each configuration key is first looked up with that suffix appended. If no suffixed version is found, the unsuffixed key is looked up, and if that is not found either, the default value (when one exists) is used.
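The lookup order described in the note can be sketched with a plain map standing in for the job configuration. One assumption to flag: the exact way the suffix is joined to the key is not stated here, so this sketch assumes a "." separator (check the Warp10InputFormat source for your version). The suffixes "metrics" and "logs" and the token values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the documented lookup order: suffixed key, then unsuffixed key, then default.
// ASSUMPTION: the suffix is appended as "." + suffix; the real separator may differ.
final class SuffixedLookup {

    static String lookup(Map<String, String> conf, String key, String suffix, String defaultValue) {
        if (suffix != null && !suffix.isEmpty()) {
            String suffixed = conf.get(key + "." + suffix);
            if (suffixed != null) {
                return suffixed;                    // 1. suffixed key wins
            }
        }
        String plain = conf.get(key);
        return plain != null ? plain : defaultValue; // 2. unsuffixed key, 3. default (may be null)
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("warp10.splits.token", "SHARED_TOKEN");
        conf.put("warp10.splits.token.metrics", "METRICS_TOKEN");
        System.out.println(lookup(conf, "warp10.splits.token", "metrics", null));   // METRICS_TOKEN
        System.out.println(lookup(conf, "warp10.splits.token", "logs", null));      // SHARED_TOKEN
        System.out.println(lookup(conf, "warp10.fetcher.port", "metrics", "8881")); // 8881
    }
}
```

This is what makes suffixes useful when a single job reads two inputs: settings shared by both stay unsuffixed, and only the keys that differ (here the token) need a suffixed variant.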
The records returned by the Warp10InputFormat have their key set to a wrapper ID, which can safely be ignored, and their value set to a wrapped Geo Time Series encoder, which can be unwrapped using the WarpScript functions UNWRAP or UNWRAPENCODER.
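When consuming these records in Java, one detail of the BytesWritable value matters: its getBytes() method returns the backing buffer, which can be longer than the valid data reported by getLength(), while copyBytes() returns an array of exactly that length. The sketch below shows the resulting pattern of ignoring the key and keeping only the valid bytes of each value. The Record class is a hypothetical stand-in for Hadoop's (Text, BytesWritable) pair so the example runs without Hadoop; how the collected bytes are then passed to UNWRAP or UNWRAPENCODER depends on your tool and is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of consuming Warp10InputFormat records.
// HYPOTHETICAL: Record stands in for Hadoop's (Text, BytesWritable) pair.
final class WrappedRecords {

    /** Mimics BytesWritable: the backing array may be longer than the valid length. */
    static final class Record {
        final String key;      // wrapper ID, safe to ignore
        final byte[] buffer;   // like BytesWritable.getBytes()
        final int length;      // like BytesWritable.getLength()

        Record(String key, byte[] buffer, int length) {
            this.key = key;
            this.buffer = buffer;
            this.length = length;
        }
    }

    /** Collects the wrapped encoders, keeping only the valid bytes of each value. */
    static List<byte[]> wrappedEncoders(List<Record> records) {
        List<byte[]> out = new ArrayList<>();
        for (Record r : records) {
            // The key is ignored; copy only the first `length` bytes,
            // which is what BytesWritable.copyBytes() returns.
            out.add(Arrays.copyOf(r.buffer, r.length));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] padded = {1, 2, 3, 0, 0};  // 3 valid bytes followed by padding
        List<byte[]> encoders = wrappedEncoders(List.of(new Record("id-1", padded, 3)));
        System.out.println(encoders.get(0).length); // 3
    }
}
```

Passing the whole backing buffer instead would append stray padding bytes to the wrapped encoder.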