Distributed Warp 10 deployment
Planning your deployment
The distributed version of Warp 10 relies on several infrastructure components (ZooKeeper, Kafka, Hadoop and HBase) in addition to its own daemons; plan their deployment together, as described in the sections below.
Introducing the Warp 10 components
The distributed version of Warp 10 is a set of components (or daemons) working together to provide the service. All components can be deployed multiple times to address scalability and high availability.
Almost any combination of components can be launched in a single JVM. Some components can be omitted if you do not wish to provide the associated functionality. The core components you should run are ingress, directory, store and egress; all others can be considered optional.
Below is a brief description of each of the components.
Ingress
The ingress component is responsible for ingesting and deleting data and for modifying the Geo Time Series metadata. It pushes data and metadata to dedicated Kafka topics, namely data and metadata. It also consumes the Kafka metadata topic in order to keep track of known GTS and to enforce throttling.
Multiple instances of ingress can be launched to increase the ingestion capacity.
Directory
The directory component is responsible for tracking all known Geo Time Series: it maintains their list in memory and responds to requests initiated by egress and plasma frontend.
The metadata are consumed off of the metadata Kafka topic and stored in HBase. Upon startup, directory also reads all metadata stored in HBase.
Multiple instances of directory can be launched and each instance can be sharded (running as a set of multiple directory daemons, each responsible for a subset of the metadata).
Store
The store daemon is in charge of persisting datapoints in HBase. It consumes the Kafka data topic and writes the data to HBase.
Multiple store instances can be launched, each consuming some partitions of the Kafka topic.
Egress
The egress component is where the data analytics happen: it is the WarpScript Analytics Engine. It talks to directory and fetches the datapoints from HBase. The egress component also knows how to talk to ingress to support the UPDATE, META and DELETE functions. It also pushes messages onto the Kafka webcall topic via the WEBCALL function.
Plasma backend (plasmaBE)
Plasma is the publish/subscribe subsystem of Warp 10. The plasmaBE component consumes the Kafka data topic and pushes the needed data to the topics associated with the various plasmaFE instances.
Plasma frontend (plasmaFE)
The plasmaFE component is in charge of the WebSocket endpoint where clients can subscribe to Geo Time Series. It talks to directory, stores subscriptions in ZooKeeper and consumes datapoints off of a dedicated Kafka topic.
Webcall
The webcall component consumes WEBCALL requests from the webcall Kafka topic and issues outgoing HTTP requests.
Runner
The runner component is in charge of scheduling and/or running WarpScript code at various periods. Each instance can be configured in standalone mode, where it schedules and runs scripts from the same JVM, or in one or both of the scheduler and worker modes, where the scheduler role schedules scripts by pushing them onto a dedicated Kafka topic and the worker role consumes the scripts to run from this same topic and triggers their execution.
Fetcher
The fetcher component can be used by batch jobs (in Pig, Flink, Spark or MapReduce) to load data using the Warp10InputFormat. It is meant to be launched on each RegionServer to benefit from data locality.
Infrastructure components
ZooKeeper
ZooKeeper is needed by Kafka, Hadoop (when enabling HA) and HBase. Warp 10 also uses ZooKeeper to coordinate and discover services.
The following Warp 10 configuration parameters are related to ZooKeeper; please refer to the template configuration files for the details of each one, and see the short illustration after the list:
directory.hbase.metadata.zkconnect
directory.hbase.metadata.znode
directory.kafka.metadata.zkconnect
directory.zk.quorum
directory.zk.znode
egress.hbase.data.zkconnect
egress.hbase.data.znode
egress.zk.quorum
ingress.kafka.data.zkconnect
ingress.kafka.metadata.zkconnect
ingress.zk.quorum
plasma.backend.kafka.in.zkconnect
plasma.backend.kafka.out.zkconnect
plasma.backend.subscriptions.zkconnect
plasma.backend.subscriptions.znode
plasma.frontend.kafka.zkconnect
plasma.frontend.zkconnect
plasma.frontend.znode
runner.kafka.zkconnect
runner.zk.quorum
runner.zk.znode
store.hbase.data.zkconnect
store.hbase.data.znode
store.kafka.data.zkconnect
store.zk.quorum
webcall.kafka.zkconnect
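As an illustration, here is a minimal sketch of how a few of these parameters could be set, assuming a hypothetical ZooKeeper ensemble reachable at zk1:2181,zk2:2181,zk3:2181 and the znode layout recommended in the Setup section below (the template configuration files remain the authoritative reference):
# Hypothetical ZooKeeper quorum, adapt to your ensemble
directory.zk.quorum = zk1:2181,zk2:2181,zk3:2181
# Znode under which Warp 10 services register themselves (see the znode layout below)
directory.zk.znode = /zk/warp/prod/services
# ZooKeeper connect string of the Kafka cluster, chrooted under its dedicated znode
directory.kafka.metadata.zkconnect = zk1:2181,zk2:2181,zk3:2181/zk/kafka/prod
# ZooKeeper quorum and znode used by HBase
egress.hbase.data.zkconnect = zk1:2181,zk2:2181,zk3:2181
egress.hbase.data.znode = /zk/hbase/prod
The ingress, store, plasma, runner and webcall parameters listed above follow the same pattern.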
Kafka
Kafka is central to Warp 10: it acts as the data hub of the platform and as a great shock absorber which keeps all components loosely coupled and enables them to scale and be updated without interrupting the overall service.
The following Kafka topics are needed by Warp 10:
- metadata
- data
- throttling
- webcall
- plasmabe
- plasmafe (one per instance)
- warpscript
The following configuration parameters are related to Kafka; please review their descriptions in the configuration template:
directory.kafka.metadata.aes
directory.kafka.metadata.commitperiod
directory.kafka.metadata.consumer.clientid
directory.kafka.metadata.groupid
directory.kafka.metadata.mac
directory.kafka.metadata.topic
directory.kafka.metadata.zkconnect
directory.kafka.nthreads
ingress.kafka.data.aes
ingress.kafka.data.brokerlist
ingress.kafka.data.mac
ingress.kafka.data.maxsize
ingress.kafka.data.poolsize
ingress.kafka.data.producer.clientid
ingress.kafka.data.topic
ingress.kafka.data.zkconnect
ingress.kafka.metadata.aes
ingress.kafka.metadata.brokerlist
ingress.kafka.metadata.commitperiod
ingress.kafka.metadata.consumer.clientid
ingress.kafka.metadata.groupid
ingress.kafka.metadata.mac
ingress.kafka.metadata.maxsize
ingress.kafka.metadata.nthreads
ingress.kafka.metadata.poolsize
ingress.kafka.metadata.producer.clientid
ingress.kafka.metadata.topic
ingress.kafka.metadata.zkconnect
plasma.backend.kafka.in.aes
plasma.backend.kafka.in.commitperiod
plasma.backend.kafka.in.consumer.clientid
plasma.backend.kafka.in.groupid
plasma.backend.kafka.in.mac
plasma.backend.kafka.in.nthreads
plasma.backend.kafka.in.topic
plasma.backend.kafka.in.zkconnect
plasma.backend.kafka.out.aes
plasma.backend.kafka.out.brokerlist
plasma.backend.kafka.out.mac
plasma.backend.kafka.out.maxsize
plasma.backend.kafka.out.producer.clientid
plasma.backend.kafka.out.topic
plasma.backend.kafka.out.zkconnect
plasma.frontend.kafka.aes
plasma.frontend.kafka.commitperiod
plasma.frontend.kafka.consumer.clientid
plasma.frontend.kafka.groupid
plasma.frontend.kafka.mac
plasma.frontend.kafka.nthreads
plasma.frontend.kafka.topic
plasma.frontend.kafka.zkconnect
runner.kafka.aes
runner.kafka.brokerlist
runner.kafka.commitperiod
runner.kafka.consumer.clientid
runner.kafka.groupid
runner.kafka.mac
runner.kafka.nthreads
runner.kafka.poolsize
runner.kafka.producer.clientid
runner.kafka.topic
runner.kafka.zkconnect
store.kafka.data.aes
store.kafka.data.brokerlist
store.kafka.data.commitperiod
store.kafka.data.consumer.clientid
store.kafka.data.groupid
store.kafka.data.intercommits.maxtime
store.kafka.data.mac
store.kafka.data.producer.clientid
store.kafka.data.topic
store.kafka.data.zkconnect
webcall.kafka.aes
webcall.kafka.brokerlist
webcall.kafka.commitperiod
webcall.kafka.consumer.clientid
webcall.kafka.groupid
webcall.kafka.mac
webcall.kafka.producer.clientid
webcall.kafka.topic
webcall.kafka.zkconnect
ingress.kafka.data.mac, store.kafka.data.mac and plasma.backend.kafka.in.mac must have the same value. ingress.kafka.metadata.mac and directory.kafka.metadata.mac must also have the same value. The same goes for the optional configuration keys ending in .aes.
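For instance, this matching constraint translates to reusing the exact same key value across the relevant parameters; the key below is a made-up example (the expected format and length are documented in the configuration templates), generate your own and keep it secret:
# Hypothetical 128-bit key, identical on every ingress, store and plasma backend instance
ingress.kafka.data.mac = hex:00112233445566778899aabbccddeeff
store.kafka.data.mac = hex:00112233445566778899aabbccddeeff
plasma.backend.kafka.in.mac = hex:00112233445566778899aabbccddeeff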
Hadoop
Hadoop is not needed directly by Warp 10 but is used as the underlying storage of HBase. Beyond this storage role, Hadoop, and YARN specifically, can be leveraged to process massive amounts of Geo Time Series stored either in Warp 10 itself (in HBase) or in any other format, using WarpScript on Pig, Flink or Spark.
Hadoop (HDFS) can also be used to perform backups of data stored in HBase.
HBase
HBase is used as the main datastore of Warp 10. The data is organized in a single table containing two column families, one for the metadata and another one for the datapoints. Row keys for both types of data start with different prefixes. The creation command is available on GitHub.
For improved datapoint retrieval, a Warp 10 custom filter (org.apache.hadoop.hbase.filter.SlicedRowFilter) should be deployed on HBase. The jar file containing this filter can be found on Maven or be built using the hbaseFilters:jar Gradle task. Simply add this jar to the HBase classpath to make the filter usable.
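For example, assuming HBase is installed under /opt/hbase and the filter jar has been built or downloaded into the current directory (both are hypothetical paths, and the jar name depends on your build), copying it into the lib directory of the RegionServers is enough:
# Hypothetical jar name and HBase installation path, adapt to your setup
cp warp10-hbaseFilters-x.y.z.jar /opt/hbase/lib/
# Restart the RegionServers so the new classpath entry is picked up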
In order to support datapoint deletion, HBase must be configured with the BulkDeleteEndpoint coprocessor. To do so, download the jar from Maven, add it to your HBase classpath and include the following lines in your hbase-site.xml:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.example.BulkDeleteEndpoint</value>
</property>
Putting it all together
Once you've set up ZooKeeper, Kafka, Hadoop and HBase and with your Warp 10 configuration files on hand, launching Warp 10 is simply a matter of executing
java -cp warp10-x.y.z.jar io.warp10.WarpDist path/to/configuration.file
on each machine where you want to run Warp 10 daemons.
HAProxy
In order to provide high availability and load balancing for ingress, egress and plasmaFE, we recommend you run haproxy in front of those Warp 10 daemons.
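As a minimal sketch (host names and backend addresses are assumptions, and real deployments will need proper defaults and timeouts), an haproxy frontend balancing two ingress instances could look as follows; egress (port 8881) and plasmaFE (port 8884) can be declared the same way:
# Illustrative haproxy snippet only, not a production-ready configuration
frontend warp10_ingress
    bind *:8882
    mode http
    default_backend warp10_ingress_nodes

backend warp10_ingress_nodes
    mode http
    balance roundrobin
    server ingress1 ingress1.example.org:8882 check
    server ingress2 ingress2.example.org:8882 check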
Setup
A Warp 10 distributed platform is called a cell. Throughout this guide we will assume that your cell is named prod.
Configuring ZooKeeper
We highly recommend you use a single ZooKeeper ensemble for Kafka, Hadoop, HBase and Warp 10; this makes the ops side of things simpler.
Our best practice is also to isolate the various znodes hierarchies under dedicated root znodes.
Here are the znodes you should create in your ZooKeeper:
/zk
/zk/kafka
/zk/kafka/prod
/zk/warp
/zk/warp/prod
/zk/warp/prod/services
/zk/warp/prod/plasma
/zk/hbase
/zk/hbase/prod
We add a /zk prefix to all znodes so we recognize a znode path when we see one.
We assume no authentication is performed by Warp 10, HBase or Kafka when connecting to ZooKeeper, so everyone should be able to write to the above znodes.
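The znodes can be created with the zkCli.sh tool shipped with ZooKeeper, for instance (the server address is an example):
# Create the znode hierarchy, parents first (run against any member of your ensemble)
for znode in /zk /zk/kafka /zk/kafka/prod /zk/warp /zk/warp/prod \
             /zk/warp/prod/services /zk/warp/prod/plasma /zk/hbase /zk/hbase/prod ; do
  zkCli.sh -server zk1:2181 create ${znode} ''
done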
Configuring Kafka
Warp 10 uses the Kafka 0.8.2 API; your Kafka cluster must therefore use this version (0.8.2) for its log message format (log.message.format.version) and for inter-broker communications (inter.broker.protocol.version).
Several Kafka topics are used to exchange messages between Warp 10 components.
Remember that the unit of parallelism when consuming Kafka topics is the partition, so adjust the number of partitions of the various topics to achieve the expected parallelism in your consumers (based on the configured thread counts).
The Kafka producers ensure that messages for a given Geo Time Series are always sent to the same partition.
A Kafka topic is created using the kafka-topics.sh tool provided by Kafka.
Note that you MUST disable log compaction in Kafka (cleanup.policy MUST be set to delete).
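For example, the data topic could be created as follows (the partition count, replication factor and ZooKeeper chroot are assumptions to adapt to your cluster and to the parallelism you are targeting):
kafka-topics.sh --create --zookeeper zk1:2181/zk/kafka/prod \
  --topic data --partitions 16 --replication-factor 3 \
  --config cleanup.policy=delete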
metadata topic
This topic is populated by ingress whenever new Geo Time Series are encountered, when their attributes are modified or when a complete GTS is deleted. It is consumed by directory to update the GTS registry and by ingress instances to update their GTS cache.
This topic must be consumed using a dedicated groupid per ingress and per directory.
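As an illustration (the group names are arbitrary), the first ingress and the first directory instances could be configured as follows, every other instance using its own distinct value:
# On the first ingress instance
ingress.kafka.metadata.groupid = ingress.metadata-1
# On the first directory instance
directory.kafka.metadata.groupid = directory.metadata-1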
data topic
This topic is populated by ingress when new data is pushed for a Geo Time Series or when a deletion request is made. Serializing updates and deletes in the same topic (and in the same partition for a given GTS) ensures the ordering of those operations.
The data topic is consumed by store, which persists the updates in HBase and calls the BulkDeleteEndpoint for deletes. The data is also consumed by plasmaBE.
The groupid used to consume this topic is common to all store instances. The plasmaBE instances also share a common groupid.
throttling topic
This topic is populated and consumed by ingress; it conveys estimators of updated Geo Time Series so that the throttling mechanism on all ingress instances sees a converged view of the active data streams. This topic is consumed using a dedicated groupid per ingress.
webcall topic
This topic is used by egress whenever the WEBCALL function is called, to push a request which will be handled by an instance of webcall. It must be consumed using a single groupid shared among all webcall instances.
runner topic
This topic is used by runner instances to push script execution requests. It is consumed by worker instances which should all use an identical groupid.
plasmafeXX topics
There is one such topic defined per plasmaFE instance. These topics are used by plasmaBE instances to push the data to which clients subscribed. Each topic is consumed by a single plasmaFE instance.
Configuring HBase
A Warp 10 instance uses a single table for storing both metadata and datapoints. This table is created using the HBase shell commands in the file continuum.hbase.
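Assuming the continuum.hbase file is available on a machine with an HBase client, the table can be created by feeding the file to the HBase shell:
hbase shell continuum.hbase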
Deploying Warp 10 components
Warp 10 is configured using a single configuration file which defines properties. The template config files are extensively commented and we invite you to carefully read all those comments when configuring a given component.
Warp 10 makes heavy use of cryptographic hash functions and encryption and relies on a set of keys being defined in the configuration; these keys are mandatory and cannot be changed once they have been used. Treat those keys as you would your most precious secrets.
Once your keys are set you can generate write and read tokens using the Worf tool. Generate a pair of such tokens now as you will need them to test the Warp 10 components later.
The components should be deployed in the order specified below. Note that plasma is optional; only deploy it if you want to support the streaming (WebSocket) endpoints.
An instance of Warp 10 can run multiple components, refer to the resources each component requires when selecting which ones to run together.
As a best practice, egress, directory, store and ingress should be run in their own JVM. webcall and runner can be colocated with egress.
A Warp 10 instance is started using
java ${JAVA_OPTS} -cp warp10-x.y.z.jar io.warp10.WarpDist path/to/warp.config
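JAVA_OPTS typically carries JVM tuning such as heap sizing; the value below is merely an example to adapt to the components hosted by the JVM:
# Example only: size the heap according to the components run in this JVM
export JAVA_OPTS="-Xms4g -Xmx8g"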
Deploying ingress
ingress is CPU and network bound and, to a lesser extent, memory bound (depending on the size of the cache you have configured).
Once ingress is deployed and started, it will listen on port 8882. You can push test data to ingress using:
curl -H 'X-Warp10-Token: WRITE_TOKEN' --data-binary '// test{} T' http://host:8882/api/v0/update
By checking the metadata and data topics in Kafka, you should see that one message arrived in each of those topics. If this is the case, then your ingress deployment is a success.
Deploying directory
directory is memory bound.
When directory is started, it should read the metadata in HBase and populate its Geo Time Series registry, then it will consume the metadata topic and update its registry accordingly.
If you check Kafka, you should see that the message you identified above has been consumed using the groupid of your directory. You can also scan the table in HBase for the m column family; it should have an entry.
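For instance, from the HBase shell (the table name below is an assumption, use the name defined in continuum.hbase):
# In the HBase shell: look for metadata entries in the m column family
scan 'continuum', {COLUMNS => 'm', LIMIT => 10}
The same scan with COLUMNS => 'v' can be reused later when validating the store deployment.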
Deploying store
store is CPU and network bound.
Once started, store will consume the data topic and populate HBase or proceed with deletions. Check Kafka and confirm that the data topic is being consumed. You can also scan the v column family in the HBase table to confirm there is an entry.
Deploying egress
egress is memory and network bound.
Once started, egress will listen on port 8881 for incoming requests. You can check that your egress instance is working by running the following command:
curl --data-binary "[ 'READ_TOKEN' 'test' {} NOW -1 ] FETCH" http://host:8881/api/v0/exec
which should return a Geo Time Series object with a single datapoint.
Congratulations, you now have a working Distributed Warp 10 Cell!
Deploying webcall
webcall has no particular resource requirements, but it must have access to the sites allowed in WEBCALL calls.
Check that the webcall topic is indeed consumed after issuing a WEBCALL call to egress.
Deploying Plasma
plasmaBE is network bound as it consumes the whole data topic. plasmaFE may become network bound if many subscriptions are made.
plasmaFE listens on port 8884.
You can test plasma by using this Python script and pushing data to ingress.
Deploying runner
The runner component can be configured to assume one of several roles. In the standalone role, the runner instance will schedule the scripts and submit them to the configured egress endpoint. In the scheduler role, runner will schedule the scripts and push execution requests onto the runner Kafka topic. Lastly, in the worker role, runner consumes the runner Kafka topic and executes the requests on the configured egress.
To test runner, deploy a script under the configured script directory, in a subdirectory followed by a directory named after the expected execution period in milliseconds, e.g. testing/60000/foo.mc2 for a script which should be executed every 60 seconds.
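For example, assuming the configured script directory is /opt/warp10/warpscripts (a hypothetical path), the following creates a minimal script executed every 60 seconds:
# Hypothetical script root, use the directory configured for your runner
mkdir -p /opt/warp10/warpscripts/testing/60000
cat > /opt/warp10/warpscripts/testing/60000/foo.mc2 <<'EOF'
// Minimal WarpScript: simply push the current timestamp onto the stack
NOW
EOF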
Deploying fetcher
fetcher is network bound.
Testing fetcher is out of the scope of this guide; please refer to the warp10-pig tutorial for more information.
Getting more help
Should you need help in setting up the distributed version of Warp 10, please start a conversation in the Warp 10 Google Group.