
Distributed Warp 10 deployment

Planning your deployment

The distributed version of Warp 10 relies on a set of Warp 10 components plus several infrastructure services (ZooKeeper, Kafka, Hadoop and HBase), all described below, which you should deploy in versions compatible with your Warp 10 release.

Introducing the Warp 10 components

The distributed version of Warp 10 is a set of components (or daemons) working together to provide the service. All components can be deployed multiple times to address scalability and high availability.

Almost any combination of components can be launched in a single JVM, and some components can be omitted if you do not wish to provide the associated functionality. The core components you should run are ingress, directory, store and egress; all others can be considered optional.

Below is a brief description of each of the components.

Ingress

The ingress component is responsible for ingesting and deleting data and for modifying Geo Time Series metadata. It pushes data and metadata into dedicated Kafka topics, namely data and metadata, and also consumes the metadata topic in order to keep track of known GTS and to enforce throttling.

Multiple instances of ingress can be launched to increase the ingestion capacity.

Directory

The directory component is responsible for tracking all known Geo Time Series. It maintains their list in memory and responds to requests initiated by egress and the plasma frontend.

The metadata is consumed from the metadata Kafka topic and stored in HBase. Upon startup, directory also reads all metadata previously stored in HBase.

Multiple instances of directory can be launched and each instance can be sharded (running as a set of multiple directory daemons each responsible for a subset of metadata).

Store

The store daemon is in charge of persisting datapoints in HBase. It consumes the Kafka data topic and writes the data in HBase.

Multiple store instances can be launched, each consuming some of the partitions of the Kafka topic.

Egress

The egress component is where data analytics happen: it is the WarpScript Analytics Engine. It talks to directory and fetches datapoints from HBase. The egress component also knows how to talk to ingress to support the UPDATE, META and DELETE functions, and it pushes messages onto the Kafka webcall topic via the WEBCALL function.

Plasma backend (plasmaBE)

Plasma is the publish/subscribe subsystem of Warp 10. The plasmaBE component consumes the Kafka data topic and pushes the needed data to the topics associated with the various plasmaFE instances.

Plasma frontend (plasmaFE)

The plasmaFE component is in charge of the WebSocket endpoint where clients can subscribe to Geo Time Series. It talks to directory, stores subscriptions in ZooKeeper and consumes datapoints off of a dedicated Kafka topic.

Webcall

The webcall component consumes WEBCALL requests from the webcall Kafka topic and issues outgoing HTTP requests.

Runner

The runner component is in charge of scheduling and/or running WarpScript code at various periods. Each instance can be configured in standalone mode, where it schedules and runs scripts from the same JVM, or in one or both of the scheduler and worker modes: the scheduler role schedules scripts by pushing them onto a dedicated Kafka topic, and the worker role consumes scripts from that same topic and triggers their execution.

Fetcher

The fetcher component can be used by batch jobs (in Pig, Flink, Spark or MapReduce) to load data using the Warp10InputFormat. It is meant to be launched on each RegionServer to benefit from data locality.

Infrastructure components

ZooKeeper

ZooKeeper is needed by Kafka, Hadoop (when enabling HA) and HBase. Warp 10 also uses ZooKeeper to coordinate and discover services.

The following Warp 10 configuration parameters are related to ZooKeeper; please refer to the template configuration files for the details of each one:

directory.hbase.metadata.zkconnect
directory.hbase.metadata.znode
directory.kafka.metadata.zkconnect
directory.zk.quorum
directory.zk.znode

egress.hbase.data.zkconnect
egress.hbase.data.znode
egress.zk.quorum

ingress.kafka.data.zkconnect
ingress.kafka.metadata.zkconnect
ingress.zk.quorum

plasma.backend.kafka.in.zkconnect
plasma.backend.kafka.out.zkconnect
plasma.backend.subscriptions.zkconnect
plasma.backend.subscriptions.znode
plasma.frontend.kafka.zkconnect
plasma.frontend.zkconnect
plasma.frontend.znode

runner.kafka.zkconnect
runner.zk.quorum
runner.zk.znode

store.hbase.data.zkconnect
store.hbase.data.znode
store.kafka.data.zkconnect
store.zk.quorum

webcall.kafka.zkconnect

Kafka

Kafka is central to Warp 10: it acts as the data hub of the platform and as a shock absorber which keeps all components loosely coupled and enables them to scale and be updated without interrupting the overall service.

The following Kafka topics are needed by Warp 10:

  • metadata
  • data
  • throttling
  • webcall
  • plasmabe
  • plasmafe (one per instance)
  • warpscript

The following configuration parameters are related to Kafka; please review their descriptions in the configuration template:

directory.kafka.metadata.aes
directory.kafka.metadata.commitperiod
directory.kafka.metadata.consumer.clientid
directory.kafka.metadata.groupid
directory.kafka.metadata.mac
directory.kafka.metadata.topic
directory.kafka.metadata.zkconnect
directory.kafka.nthreads

ingress.kafka.data.aes
ingress.kafka.data.brokerlist
ingress.kafka.data.mac
ingress.kafka.data.maxsize
ingress.kafka.data.poolsize
ingress.kafka.data.producer.clientid
ingress.kafka.data.topic
ingress.kafka.data.zkconnect
ingress.kafka.metadata.aes
ingress.kafka.metadata.brokerlist
ingress.kafka.metadata.commitperiod
ingress.kafka.metadata.consumer.clientid
ingress.kafka.metadata.groupid
ingress.kafka.metadata.mac
ingress.kafka.metadata.maxsize
ingress.kafka.metadata.nthreads
ingress.kafka.metadata.poolsize
ingress.kafka.metadata.producer.clientid
ingress.kafka.metadata.topic
ingress.kafka.metadata.zkconnect

plasma.backend.kafka.in.aes
plasma.backend.kafka.in.commitperiod
plasma.backend.kafka.in.consumer.clientid
plasma.backend.kafka.in.groupid
plasma.backend.kafka.in.mac
plasma.backend.kafka.in.nthreads
plasma.backend.kafka.in.topic
plasma.backend.kafka.in.zkconnect
plasma.backend.kafka.out.aes
plasma.backend.kafka.out.brokerlist
plasma.backend.kafka.out.mac
plasma.backend.kafka.out.maxsize
plasma.backend.kafka.out.producer.clientid
plasma.backend.kafka.out.topic
plasma.backend.kafka.out.zkconnect

plasma.frontend.kafka.aes
plasma.frontend.kafka.commitperiod
plasma.frontend.kafka.consumer.clientid
plasma.frontend.kafka.groupid
plasma.frontend.kafka.mac
plasma.frontend.kafka.nthreads
plasma.frontend.kafka.topic
plasma.frontend.kafka.zkconnect

runner.kafka.aes
runner.kafka.brokerlist
runner.kafka.commitperiod
runner.kafka.consumer.clientid
runner.kafka.groupid
runner.kafka.mac
runner.kafka.nthreads
runner.kafka.poolsize
runner.kafka.producer.clientid
runner.kafka.topic
runner.kafka.zkconnect

store.kafka.data.aes
store.kafka.data.brokerlist
store.kafka.data.commitperiod
store.kafka.data.consumer.clientid
store.kafka.data.groupid
store.kafka.data.intercommits.maxtime
store.kafka.data.mac
store.kafka.data.producer.clientid
store.kafka.data.topic
store.kafka.data.zkconnect

webcall.kafka.aes
webcall.kafka.brokerlist
webcall.kafka.commitperiod
webcall.kafka.consumer.clientid
webcall.kafka.groupid
webcall.kafka.mac
webcall.kafka.producer.clientid
webcall.kafka.topic
webcall.kafka.zkconnect

ingress.kafka.data.mac, store.kafka.data.mac and plasma.backend.kafka.in.mac must have the same value. ingress.kafka.metadata.mac and directory.kafka.metadata.mac must also have the same value. The same goes for the optional configuration keys ending in .aes.
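As an illustration, a minimal sketch of how these keys could be aligned across the configuration files of the components involved, assuming the hex: key syntax from the configuration templates (the values below are placeholders, not real keys):

# Shared by ingress, store and plasmaBE (placeholder value)
ingress.kafka.data.mac = hex:00112233445566778899aabbccddeeff
store.kafka.data.mac = hex:00112233445566778899aabbccddeeff
plasma.backend.kafka.in.mac = hex:00112233445566778899aabbccddeeff

# Shared by ingress and directory (placeholder value)
ingress.kafka.metadata.mac = hex:0123456789abcdef0123456789abcdef
directory.kafka.metadata.mac = hex:0123456789abcdef0123456789abcdef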

Hadoop

Hadoop is not needed directly by Warp 10 but is used as the underlying storage for HBase. Beyond this storage role, Hadoop, and YARN specifically, can be leveraged to process massive amounts of Geo Time Series stored either in Warp 10 itself (in HBase) or in any other format, using WarpScript on Pig, Flink or Spark.

Hadoop (HDFS) can also be used to perform backups of data stored in HBase.

HBase

HBase is used as the main datastore of Warp 10. The data is organized in a single table containing two column families, one for the metadata and another one for the datapoints. Row keys for both types of data start with different prefixes. The creation command is available on GitHub.

For improved datapoint retrieval, a Warp 10 custom filter (org.apache.hadoop.hbase.filter.SlicedRowFilter) should be deployed on HBase. The jar file containing this filter can be found on Maven or built using the hbaseFilters:jar Gradle task. Simply add this jar to the HBase classpath to make the filter available.

In order to support datapoint deletion, HBase must be configured with the BulkDeleteEndpoint coprocessor. To do so, download the jar from Maven, add it to your HBase classpath and include the following lines in your hbase-site.xml:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.example.BulkDeleteEndpoint</value>
</property>
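How you add these jars (the filter jar and the coprocessor jar) to the HBase classpath depends on your installation. A common approach, shown here as a sketch with hypothetical jar names and paths, is to reference them from hbase-env.sh on each RegionServer and restart HBase:

# hbase-env.sh (hypothetical jar names and locations)
export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/warp10/lib/warp10-hbase-filters-x.y.z.jar:/opt/hbase/lib/hbase-examples-x.y.z.jar"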

Putting it all together

Once ZooKeeper, Kafka, Hadoop and HBase are set up and your Warp 10 configuration files are at hand, launching Warp 10 is simply a matter of executing

java -cp warp10-x.y.z.jar io.warp10.WarpDist path/to/configuration.file

on each machine where you want to run Warp 10 daemons.

HAProxy

In order to provide high availability and load balancing for ingress, egress and plasmaFE, we recommend running HAProxy in front of those Warp 10 daemons.
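As an illustration, here is a minimal HAProxy sketch balancing the ingress (8882) and egress (8881) endpoints across hypothetical hosts; adapt the addresses, timeouts and health checks to your environment, and add a similar pair of sections for plasmaFE (8884) if you deploy it:

defaults
  mode http
  timeout connect 5s
  timeout client  60s
  timeout server  60s

frontend warp10_ingress
  bind *:8882
  default_backend warp10_ingress_nodes

backend warp10_ingress_nodes
  balance roundrobin
  server ingress1 10.0.0.1:8882 check
  server ingress2 10.0.0.2:8882 check

frontend warp10_egress
  bind *:8881
  default_backend warp10_egress_nodes

backend warp10_egress_nodes
  balance roundrobin
  server egress1 10.0.0.3:8881 check
  server egress2 10.0.0.4:8881 check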

Setup

A Warp 10 distributed platform is called a cell. Throughout this guide we will assume that your cell is named prod.

Configuring ZooKeeper

We highly recommend using a single ZooKeeper ensemble for Kafka, Hadoop, HBase and Warp 10, as this makes the ops side of things simpler.

Our best practice is also to isolate the various znode hierarchies under dedicated root znodes.

Here are the znodes you should create in your ZooKeeper:

/zk

/zk/kafka
/zk/kafka/prod

/zk/warp
/zk/warp/prod
/zk/warp/prod/services
/zk/warp/prod/plasma

/zk/hbase
/zk/hbase/prod

We add a /zk prefix to all znodes so we recognize a znode path when we see one.

We assume no authentication is performed by Warp 10, HBase or Kafka when connecting to ZooKeeper, so everyone should be able to write to the above znodes.
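As an example, these znodes can be created with the zkCli.sh tool shipped with ZooKeeper (the host and port are placeholders for one of your ZooKeeper servers):

zkCli.sh -server zk1:2181 <<'EOF'
create /zk ""
create /zk/kafka ""
create /zk/kafka/prod ""
create /zk/warp ""
create /zk/warp/prod ""
create /zk/warp/prod/services ""
create /zk/warp/prod/plasma ""
create /zk/hbase ""
create /zk/hbase/prod ""
EOF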

Configuring Kafka

Warp 10 uses the Kafka 0.8.2 API; your Kafka cluster must therefore use this version (0.8.2) for its log message format (log.message.format.version) and inter-broker protocol (inter.broker.protocol.version).

Several Kafka topics are used to exchange messages between Warp 10 components.

Remember that the unit of parallelism when consuming Kafka topics is the partition, so adjust the number of partitions of the various topics to achieve the expected parallelism in your consumers (based on the thread counts configured).

The Kafka producers ensure that messages for a given Geo Time Series are always sent to the same partition.

A Kafka topic is created using the kafka-topics.sh tool provided by Kafka.

Note that you MUST disable log compaction in Kafka (cleanup.policy MUST be set to delete).
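For example, the data topic could be created as follows (the partition count, replication factor and ZooKeeper connect string are placeholders to adapt to your cluster); repeat the command for the other topics:

kafka-topics.sh --create \
  --zookeeper zk1:2181,zk2:2181,zk3:2181/zk/kafka/prod \
  --topic data \
  --partitions 16 \
  --replication-factor 3 \
  --config cleanup.policy=delete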

metadata topic

This topic is populated by ingress whenever new Geo Time Series are encountered, when their attributes are modified or when a complete GTS is deleted. It is consumed by directory to update the GTS registry and by ingress to update their GTS cache.

This topic must be consumed using a dedicated groupid per ingress and per directory.
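As a sketch, the group ids could be laid out as follows in the configuration of two ingress instances and one directory instance (the group names are placeholders):

# ingress instance 1
ingress.kafka.metadata.groupid = ingress-prod-1
# ingress instance 2
ingress.kafka.metadata.groupid = ingress-prod-2
# directory instance 1
directory.kafka.metadata.groupid = directory-prod-1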

data topic

This topic is populated by ingress when new data is pushed for a Geo Time Series or when a deletion request is made. This serialization of updates and deletes in the same topic (and same partition for a given GTS) is there to ensure the order of those operations.

The data topic is consumed by store which persists the updates in HBase and performs calls to the BulkDeleteEndpoint for deletes. The data is also consumed by plasmaBE.

The groupid used to consume this topic is common to all store instances. The plasmaBE instances also share a common groupid.

throttling topic

This topic is populated and consumed by ingress. It conveys estimators of the updated Geo Time Series so the throttling mechanism on every ingress sees a converged view of the active data streams. This topic is consumed using a dedicated groupid per ingress.

webcall topic

This topic is used by egress whenever the WEBCALL function is called to push a request which will be handled by an instance of webcall. It must be consumed using a single groupid among all webcall instances.

runner topic

This topic is used by runner instances to push script execution requests. It is consumed by worker instances which should all use an identical groupid.

plasmafeXX topics

There is one such topic defined per plasmaFE instance. These topics are used by plasmaBE instances to push the data to which clients subscribed. Each topic is consumed by a single plasmaFE instance.

Configuring HBase

A Warp 10 instance uses a single table for storing both metadata and datapoints. This table is created using the HBase shell command in file continuum.hbase.

Deploying Warp 10 components

Warp 10 is configured using a single configuration file which defines properties. The template config files are extensively commented and we invite you to carefully read all those comments when configuring a given component.

Warp 10 makes heavy use of cryptographic hash functions and encryption and relies on a set of keys defined in the configuration. These keys are mandatory and cannot be changed once they have been used. Treat them as you would your most precious secrets.
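One possible way to generate such keys is with openssl; this sketch assumes 128-bit keys and the hex: syntax used in the configuration templates (use a longer key where the templates call for one):

# generate a random 128-bit key, to be pasted after the hex: prefix in the configuration
openssl rand -hex 16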

Once your keys are set you can generate write and read tokens using the Worf tool. Generate a pair of such tokens now as you will need them to test the Warp 10 components later.

The components should be deployed in the order specified below. Note that plasma is optional, only deploy it if you want to support the streaming (WebSocket) endpoints.

An instance of Warp 10 can run multiple components; refer to the resources each component requires when selecting which ones to run together.

As a best practice, egress, directory, store and ingress should be run in their own JVM. webcall and runner can be colocated with egress.

A Warp 10 instance is started using

java ${JAVA_OPTS} -cp warp10-x.y.z.jar io.warp10.WarpDist path/to/warp.config
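For instance, heap sizing and GC options can be passed through JAVA_OPTS; the values below are placeholders to tune for your workload:

export JAVA_OPTS="-Xms8g -Xmx8g -XX:+UseG1GC"
java ${JAVA_OPTS} -cp warp10-x.y.z.jar io.warp10.WarpDist path/to/warp.config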

Deploying ingress

ingress is CPU and network bound and, to a lesser extent, memory bound (depending on the size of the cache you have configured).

Once ingress is deployed and started it will listen on port 8882. You can push test data to ingress using:

curl -H 'X-Warp10-Token: WRITE_TOKEN' --data-binary '// test{} T' http://host:8882/api/v0/update

By checking the metadata and data topics in Kafka, you should see that one message arrived in each of those topics. If this is the case, then your ingress deployment is a success.
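One way to perform this check is with the console consumer shipped with Kafka 0.8.2 (the messages are wrapped and possibly MACed or encrypted, so expect opaque binary payloads; the ZooKeeper connect string is a placeholder):

kafka-console-consumer.sh \
  --zookeeper zk1:2181,zk2:2181,zk3:2181/zk/kafka/prod \
  --topic data \
  --from-beginning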

Deploying directory

directory is memory bound.

When directory is started, it first reads the metadata stored in HBase and populates its Geo Time Series registry, then consumes the metadata topic and updates its registry accordingly.

If you check Kafka, you should see that the message you identified above has been consumed using the groupid of your directory. You can also scan the m column family of the HBase table; it should contain an entry.
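Assuming the table created by continuum.hbase is named continuum and your directory groupid is the one configured earlier, a hedged way to perform both checks is, for example:

# check the consumer offsets of your directory groupid (group name and connect string are placeholders)
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zookeeper zk1:2181,zk2:2181,zk3:2181/zk/kafka/prod \
  --group directory-prod-1

# scan the metadata column family in HBase
echo "scan 'continuum', { COLUMNS => 'm', LIMIT => 10 }" | hbase shell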

Deploying store

store is CPU and network bound.

Once started, store will consume the data topic and populate HBase or proceed with deletions. Check Kafka and confirm that the data topic is being consumed. You can also scan the v column family in the HBase table to confirm there is an entry.

Deploying egress

egress is memory and network bound.

Once started, egress will listen on port 8881 for incoming requests. You can check that your egress instance is working by running the following command:

curl --data-binary "[ 'READ_TOKEN' 'test' {} NOW -1 ] FETCH" http://host:8881/api/v0/exec

which should return a Geo Time Series object with a single datapoint.

Congratulations, you now have a working Distributed Warp 10 Cell!

Deploying webcall

webcall does not consume any particular resources, but it must have access to the sites allowed in WEBCALL calls.

Check that the webcall topic is indeed consumed after issuing a WEBCALL call to egress.

Deploying Plasma

plasmaBE is network bound as it consumes the whole data topic. plasmaFE may also become network bound if many subscriptions are active.

PlasmaFE listens on port 8884.

You can test plasma by using this Python script and pushing data to ingress.

Deploying runner

The runner component can be configured to assume one of several roles. In the standalone role, the runner instance schedules the scripts and submits them to the configured egress endpoint. In the scheduler role, runner schedules the scripts and pushes execution requests onto the runner Kafka topic. Finally, in the worker role, runner consumes the runner Kafka topic and executes the requests on the configured egress.

To test runner, deploy a script under the configured script directory, in a subdirectory containing a sub-directory named after the expected execution period in milliseconds, e.g.

testing/60000/foo.mc2

for a script which should be executed every 60s.
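A minimal WarpScript script for this test could look like the sketch below (the read token is a placeholder); it simply fetches the last datapoint of the test series pushed earlier:

// testing/60000/foo.mc2 -- minimal sketch with a placeholder token
'READ_TOKEN' 'token' STORE
[ $token 'test' {} NOW -1 ] FETCH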

Deploying fetcher

fetcher is network bound.

Testing fetcher is out of scope of this guide, please refer to the warp10-pig tutorial for more info.

Getting more help

Should you need help in setting up the distributed version of Warp 10, please start a conversation in the Warp 10 Google Group.