Using Kafka

Overview

Nodes in the AnyLog Network interact with Kafka in 2 ways:

  • EdgeLake serves as a Data Producer to Kafka - any result set of a query can be directed to a Kafka instance.
  • EdgeLake serves as a Data Consumer with Kafka serving as a message broker that transfers data to the network nodes.

Prerequisites

  • An AnyLog Network with nodes hosting data.
  • A configured Kafka instance.

EdgeLake serves as a Data Producer

A query issued to the network can direct the result set to a Kafka instance.
The Kafka instance is identified by an IP and port, and the query result set is associated with a topic.

The following command, issued on an EdgeLake instance, sends 10 rows from a table managed by nodes in the network to a Kafka instance:

run client () sql litsanleandro format = json:output and stat = false and dest = kafka@198.74.50.131:9092 and topic = ping_data "select device_name, timestamp, value from ping_sensor where timestamp > now() - 1 day limit 10"

Note:

  • The format directive json:output organizes each returned row (device_name, timestamp, and value) as a JSON object.
  • The destination is identified by the keyword kafka followed by the configured Kafka IP and port (dest = kafka@198.74.50.131:9092).
  • The Kafka topic associated with the data in the example above is ping_data.
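On the Kafka side, each message on the ping_data topic can then be decoded as a standalone JSON object. The sketch below shows the decoding step only; the exact payload shape is an assumption inferred from the query's select list, and the sample values are hypothetical:

```python
import json

def decode_row(raw: bytes) -> dict:
    """Parse one Kafka message value into a row dict.

    Assumes each message holds one JSON object with the columns
    named in the query: device_name, timestamp, value.
    """
    row = json.loads(raw.decode("utf-8"))
    # Basic sanity check against the expected columns
    for key in ("device_name", "timestamp", "value"):
        if key not in row:
            raise KeyError(f"missing column: {key}")
    return row

# Hypothetical sample message value
sample = b'{"device_name": "device_1", "timestamp": "2024-01-01T00:00:00Z", "value": 3.1}'
row = decode_row(sample)
print(row["device_name"], row["value"])
```

Any Kafka client (for example, the kafka-python package) can supply the raw message bytes to a function like this.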

EdgeLake serves as a Data Consumer

Each node in the AnyLog Network can be configured as a data consumer.
The flow of data from a Kafka instance to the network is detailed in The Southbound Connectors Diagram.

The command run kafka consumer initiates a process that serves as a client which subscribes to one or more topics and consumes published messages by pulling data from the Kafka instance.

Usage:

<run kafka consumer where 
    ip = [ip] and port = [port] and 
    reset = [latest/earliest] and
    topic = [topic and mapping instructions]
>

Command options:

Key       Value                                                                    Default
ip        The Kafka broker IP.
port      The Kafka broker port.
reset     Determines the offset policy. Optional values are latest or earliest.   latest
topic     One or more topics with mapping instructions.

Details on the topic declaration and mapping instructions are available here.

Example:

<run kafka consumer where ip = 198.74.50.131 and port = 9092 and reset = latest and topic = (
    name = ping_data and 
    dbms = lsl_demo and 
    table = ping_sensor and 
    column.timestamp.timestamp = "bring [timestamp]" and 
    column.value.int = "bring [value]"
)>
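In the example above, each "bring [attr]" directive pulls an attribute from the source JSON message, and the column.[name].[type] key names the target column and its data type. The following sketch simulates that mapping for a single message; it is a simplified illustration of the mapping semantics, not EdgeLake's implementation, and the sample message is hypothetical:

```python
import json

# Mapping derived from the consumer declaration:
#   column.timestamp.timestamp = "bring [timestamp]"
#   column.value.int = "bring [value]"
MAPPING = {
    "timestamp": ("timestamp", "bring [timestamp]"),
    "value": ("int", "bring [value]"),
}

def map_message(raw: bytes) -> dict:
    """Map one Kafka message to a row for lsl_demo.ping_sensor."""
    source = json.loads(raw.decode("utf-8"))
    row = {}
    for column, (col_type, directive) in MAPPING.items():
        attr = directive[len("bring ["):-1]   # extract the source attribute name
        value = source[attr]
        # Apply the declared column type (only int handled in this sketch)
        row[column] = int(value) if col_type == "int" else value
    return row

message = b'{"timestamp": "2024-01-01T00:00:00Z", "value": "7"}'
print(map_message(message))  # {'timestamp': '2024-01-01T00:00:00Z', 'value': 7}
```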
The following commands monitor the consumer process and the ingestion of the consumed data:

Command          Info provided
get processes    Background processes, to determine if Kafka is enabled
get msg client   Subscriptions to brokers, to determine related configurations and data consumed from Kafka instances
get streaming    Data consumed from brokers, associated to dbms tables
get operator     Statistics on ingestion of data to database tables
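To feed the consumer declared above, an external producer publishes JSON messages to the ping_data topic with the attributes the mapping expects. A minimal sketch, assuming the kafka-python package (any Kafka client works), with the message encoding separated from the network call:

```python
import json
from datetime import datetime, timezone

def encode_message(value: float) -> bytes:
    """Build one JSON message with the attributes the topic mapping expects."""
    payload = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "value": value,
    }
    return json.dumps(payload).encode("utf-8")

def publish(broker: str, topic: str, values: list) -> None:
    """Send one message per value to the given topic."""
    # Requires the kafka-python package (an assumption; any client works).
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=broker)
    for v in values:
        producer.send(topic, encode_message(v))
    producer.flush()

# Example call against the broker used in this document:
# publish("198.74.50.131:9092", "ping_data", [1.0, 2.5, 3.7])
```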