Background Services

The background services enable data processing on each participating node.
Services can be enabled using one of the following methods:

  • As command line arguments passed to the EdgeLake process when initiated.
  • On the EdgeLake CLI.
  • As an EdgeLake configuration file that is processed using the command: process [path and file name]
  • As a policy in the metadata that is associated to a node using the command: config from policy where id = [policy id]
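
As a minimal sketch of the configuration-file method, the Python snippet below writes a few service commands to a script file, one command per line, and prints the process command that would load it. The file name, the choice of commands and their values, and the one-command-per-line layout are illustrative assumptions; the command syntax follows the usage sections of this document.

```python
from pathlib import Path
import tempfile

# Commands adapted from the examples in this document; values are placeholders.
SERVICE_COMMANDS = [
    "run tcp server where external_ip = !ip and external_port = !port and threads = 3",
    "run rest server where internal_ip = !ip and internal_port = 7849 and timeout = 0",
    "run scheduler 1",
]


def write_config_script(path: Path, commands: list[str]) -> Path:
    """Write one command per line to a script file and return its path."""
    path.write_text("\n".join(commands) + "\n", encoding="utf-8")
    return path


# Hypothetical file name; any path readable by the node would do.
script = write_config_script(
    Path(tempfile.gettempdir()) / "start_services.al", SERVICE_COMMANDS
)

# The command a user would issue on the node to run the script.
print(f"process {script}")
```

Keeping the commands in a file makes the same service setup repeatable across nodes, with node-specific values supplied through variables such as !ip.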

The following command lists the background services and their status:

Usage:

get processes [where format = json]

Explanation: List the background processes and their status.

Details: The “get processes” command

The main services are listed below:

Connect to the EdgeLake Network

Connecting to an EdgeLake Network requires two services:

  1. Enable the TCP service - A listener service for incoming messages.
  2. Enable the blockchain synchronization service - A service that continuously synchronizes the metadata.

Monitoring the services’ status:

  1. get connections - Return the node’s connection information.
  2. get synchronizer - Return the metadata synchronization information.

Enable the TCP service

The TCP service provides the functionality to send and receive messages from peer nodes using the EdgeLake Network Protocol.

Usage:

<run tcp server where 
  external_ip = [ip] and external_port = [port] and 
  internal_ip = [local_ip] and internal_port = [local_port] and 
  bind = [true/false] and threads = [threads count]>

Explanation: Set a TCP server in a listening mode on the specified IP and port.

  • The first IP and Port pair is used by a listener process to receive messages from members of the network.
  • The second IP and Port pair is optional and indicates the IP and Port that are accessible from a local network.
  • threads - an optional parameter for the number of worker threads that process requests which are sent to the provided IP and Port. The default value is 6.
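
The parameter rules above can be sketched as a small Python helper that assembles the command string. The function names build_command and tcp_server_command are hypothetical, and the rule that the internal IP and Port must be supplied together is an assumption drawn from the description of the second pair; this is not part of EdgeLake itself.

```python
def build_command(prefix: str, options: dict) -> str:
    """Join key/value pairs into '<prefix> where k = v and k = v ...'."""
    parts = []
    for key, value in options.items():
        if value is None:  # skip options the caller left unset
            continue
        if isinstance(value, bool):  # render booleans as true/false
            value = "true" if value else "false"
        parts.append(f"{key} = {value}")
    return f"{prefix} where " + " and ".join(parts)


def tcp_server_command(external_ip, external_port, internal_ip=None,
                       internal_port=None, bind=None, threads=None) -> str:
    """Build a 'run tcp server' command; the internal pair is optional."""
    if (internal_ip is None) != (internal_port is None):
        raise ValueError("internal_ip and internal_port must be given together")
    return build_command("run tcp server", {
        "external_ip": external_ip,
        "external_port": external_port,
        "internal_ip": internal_ip,
        "internal_port": internal_port,
        "bind": bind,
        "threads": threads,
    })


print(tcp_server_command("!ip", "!port", threads=3))
# prints: run tcp server where external_ip = !ip and external_port = !port and threads = 3
```

Omitting threads leaves the option out of the command, so the documented default of 6 applies.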

Examples:

run tcp server where external_ip = !ip and external_port = !port and threads = 3

<run tcp server where 
  external_ip = !external_ip and external_port = 7850 and 
  internal_ip = !ip and internal_port = 7850 and 
  bind=false and threads = 6>

Details: run tcp server

Get the network configuration info

Usage:

get connections

Explanation:

Return the node’s connection information including the TCP service information.

Enable the blockchain synchronization service

Usage:

run blockchain sync [options]

Explanation:
Repeatedly update the local copy of the blockchain.

Options:

  • source - The source of the metadata (blockchain or a Master Node).
  • dest - The destination of the blockchain data such as a file (a local file) or a DBMS (a local DBMS).
  • connection - The connection information that is needed to retrieve the data. For a Master, the IP and Port of the master node.
  • time - The frequency of updates.

Examples:

run blockchain sync where source = master and time = 3 seconds and dest = file and dest = dbms and connection = !ip_port
run blockchain sync where source = blockchain and time = !sync_time and dest = file and platform = ethereum

Get the metadata synchronization info

Usage:

get synchronizer

Explanation:

Return the synchronizer status.

Details: Synchronizer Status.

The local data storage service

Configure the Operator service

The Operator service provides the local data storage functionalities.
The service captures the data streams, identifies or creates schemas, and ingests the data into a local database.

Usage:

run operator where [option] = [value] and [option] = [value] ...

Explanation:

Monitors new data added to the watch directory and loads the new data to a local database.

Options:

  • policy - The ID of the operator policy.
  • compress_json - True/False to enable/disable compression of the JSON file.
  • compress_sql - True/False to enable/disable compression of the SQL file.
  • archive_json - True moves the JSON file to the ‘archive’ dir if processing is successful. The file is deleted if archive_json is false.
  • archive_sql - True moves the SQL file to the ‘archive’ dir if processing is successful. The file is deleted if archive_sql is false.
  • limit_tables - A list of comma separated names within brackets listing the table names to process.
  • create_table - A True value creates a table if the table doesn’t exist.
  • master_node - The IP and Port of a Master Node (if a master node is used).
  • update_tsd_info - True/False to update a summary table (tsd_info table in almgm dbms) with status of files ingested.
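
Because a misspelled option name is easy to miss in a long run operator command, a small pre-check can help. The following Python sketch compares the option names in a command against the names used in this document; check_operator_options is a hypothetical helper, and the set of accepted names is an assumption (it includes distributor, threads and blockchain, which appear only in the examples and not in the Options list).

```python
import difflib
import re

# Names from the Options list, plus distributor, threads and blockchain,
# which appear only in this document's examples. The full set is an assumption.
OPERATOR_OPTIONS = sorted({
    "policy", "compress_json", "compress_sql", "archive_json", "archive_sql",
    "limit_tables", "create_table", "master_node", "update_tsd_info",
    "distributor", "threads", "blockchain",
})


def check_operator_options(command: str) -> list[str]:
    """Return one message per option name that is not in OPERATOR_OPTIONS."""
    _, _, clause = command.partition(" where ")
    problems = []
    for pair in re.split(r"\s+and\s+", clause.strip()):
        name = pair.split("=", 1)[0].strip()
        if not name or name in OPERATOR_OPTIONS:
            continue
        # Suggest the closest known name, if any is similar enough.
        close = difflib.get_close_matches(name, OPERATOR_OPTIONS, n=1)
        hint = f" (did you mean '{close[0]}'?)" if close else ""
        problems.append(f"unknown option '{name}'{hint}")
    return problems


print(check_operator_options(
    "run operator where craete_table = true and policy = !operator_policy"
))
```

The check only looks at option names; it does not validate values or contact a node.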

Examples:

run operator where create_table = true and update_tsd_info = true and archive_json = true and distributor = true and master_node = !master_node and policy = !operator_policy and threads = 3
run operator where create_table = true and update_tsd_info = true and archive_json = true and distributor = true and blockchain = ethereum and policy = !operator_policy and threads = 3

Details: Operator Process.

Get Operator info

The get operator command returns info on the configuration and data processed by the Operator service.

Usage:

get operator [options] [where format = json]

Explanation:

Return information on the Operator processes and configuration.

Examples:

get operator
get operator inserts
get operator summary
get operator config
get operator summary where format = json

Details: get operator

REST Services

Enable and monitor a service that receives commands and data via REST from 3rd party applications and data sources.

Enable the REST service

Usage:

<run rest server where 
  external_ip = [external ip] and external_port = [external port] and 
  internal_ip = [internal ip] and internal_port = [internal port] and 
  timeout = [timeout] and ssl = [true/false] and bind = [true/false]>

Explanation:

Enable a REST server in a listening mode on the specified IP and Port.

  • The IP and Ports associate the service with external and internal IPs and Ports.
  • [timeout] - Max wait time in seconds. A 0 value means no wait limit, and the default value is 20 seconds. A timeout returns an error message to the calling application.
  • If ssl is set to True, the connection uses HTTPS.
  • If bind is true, only the specified IP is allowed (with 2 IPs, the external is ignored).
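
From the calling application's side, a request to the REST service could look roughly like the Python sketch below. It assumes the node accepts the command text in a command HTTP header together with a User-Agent value of AnyLog/1.23; that convention is not stated in this section and should be verified against the Rest Requests documentation. The host, port, and function names are hypothetical.

```python
import urllib.request


def build_rest_request(host: str, port: int, command: str,
                       use_ssl: bool = False) -> urllib.request.Request:
    """Prepare a GET request carrying an EdgeLake command in the headers."""
    scheme = "https" if use_ssl else "http"  # https when the server runs with ssl = true
    return urllib.request.Request(
        f"{scheme}://{host}:{port}",
        # Assumed header convention; check the Rest Requests documentation.
        headers={"command": command, "User-Agent": "AnyLog/1.23"},
        method="GET",
    )


def send(request: urllib.request.Request, timeout_seconds: float = 20.0) -> str:
    """Send the request; the client-side timeout is independent of the server's."""
    with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
        return response.read().decode("utf-8")


# Build (but do not send) a request to a hypothetical local node.
req = build_rest_request("127.0.0.1", 7849, "get processes where format = json")
print(req.full_url, req.get_method(), req.get_header("Command"))
```

Calling send(req) would issue the request against a running node; it is left out of the example so the snippet runs without network access.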

Examples:

<run rest server where 
  internal_ip = !ip and internal_port = 7849 and 
  timeout = 0 and threads = 6 and ssl = true>

Details: Rest Requests

REST service info commands

The following commands return info on the REST service configuration and processes:

Configuration information of the REST service:

get rest server info

Statistics on the REST calls:

get rest calls

Status of REST threads:

get rest pool

Message broker services

Enable and monitor a service that operates as a message broker, allowing data to be published on the EdgeLake node.

Enable the message broker service

Usage:

<run message broker where 
  external_ip = [ip] and external_port = [port] and 
  internal_ip = [local_ip] and internal_port = [local_port] and 
  bind = [true/false] and threads = [threads count]>

Explanation: Set a message broker in a listening mode on the specified IP and port. Threads count represents the number of threads supporting the service.

Examples:

run message broker where external_ip = !ip and external_port = !port and threads = 3
run message broker where external_ip = !external_ip and external_port = 7850 and internal_ip = !ip and internal_port = 7850 and threads = 6

Details: Message Broker Services

Get info on the Message Broker service

The following command returns statistics on the local broker:

get local broker

Subscribe to a 3rd party broker

Retrieve data from a 3rd party broker and monitor the streaming process.

Subscribe to a broker

The run msg client command subscribes to a 3rd party broker. It includes options to map the source data (the data on the broker) to a destination format.
The mapping can be done using command variables, or by associating a mapping policy (from the metadata). See details below.

Usage:

<run msg client where 
  broker = [url] and port = [port] and 
  user = [user] and password = [password] and log = [true/false] and 
  topic = (
    name = [topic name] and 
    dbms = [dbms name] and 
    table = [table name] and 
    [participating columns info]
)>

Explanation: Subscribe to a broker according to the URL provided to receive data on the provided topic.

Examples:

<run msg client where 
  broker = "driver.cloudmqtt.com" and port = 18975 and 
  user = mqwdtklv and password = uRimssLO4dIo and 
  topic = (
    name = test and 
    dbms = "bring [metadata][company]" and 
    table = "bring [metadata][machine_name] _ [metadata][serial_number]" and 
    column.timestamp.timestamp = "bring [ts]" and 
    column.value.int = "bring [value]"
)>
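
To make the topic mapping in the example above more concrete, the Python sketch below approximates how the bring expressions could resolve against a single incoming JSON message. It is a simplified illustration, not EdgeLake's implementation: it assumes that adjacent bracketed keys form a path into the message, that other tokens (such as _) are literal text, and that the resulting pieces are concatenated without spaces. The sample message and the bring function are hypothetical.

```python
import re

PATH = re.compile(r"(?:\[[^\[\]]+\])+")  # one or more adjacent [key] groups
KEY = re.compile(r"\[([^\[\]]+)\]")      # a single [key]


def bring(expression: str, message: dict) -> str:
    """Approximate a 'bring ...' expression against one JSON message."""
    body = expression.removeprefix("bring").strip()
    pieces = []
    for token in body.split():
        if PATH.fullmatch(token):
            value = message
            for key in KEY.findall(token):  # walk the nested keys
                value = value[key]
            pieces.append(str(value))
        else:
            pieces.append(token)            # literal text such as "_"
    return "".join(pieces)


# Hypothetical message published on the subscribed topic.
message = {
    "metadata": {"company": "acme", "machine_name": "cutter", "serial_number": "1234"},
    "ts": "2024-01-01T00:00:00Z",
    "value": 7,
}

mapping = {
    "dbms": "bring [metadata][company]",
    "table": "bring [metadata][machine_name] _ [metadata][serial_number]",
    "timestamp": "bring [ts]",
    "value": "bring [value]",
}

row = {name: bring(expr, message) for name, expr in mapping.items()}
print(row["dbms"], row["table"], row["timestamp"], row["value"])
```

Under these assumptions the message would be stored in the table cutter_1234 of the dbms acme, with the ts and value attributes mapped to the timestamp and value columns.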

Details: Message Client

Get subscription info

Get configuration and statistics information from the subscription process.

Usage:

get msg clients where [options]

Explanation: Information on messages received by clients subscribed to message brokers.

Examples:

get msg clients
get msg client where id = 3
get msg client where topic = anylogedgex
get msg client where broker = driver.cloudmqtt.com:18785 and topic = anylogedgex

Details: get msg client

Subscribe to Kafka

The command is similar to the run msg client command. Monitoring is with the get msg client command.

Usage:

<run kafka consumer where 
  ip = [ip] and port = [port] and 
  reset = [latest/earliest] and 
  topic = [topic and mapping instructions]>

Explanation: Initialize a Kafka consumer that subscribes to one or more topics of a Kafka instance and continuously polls data assigned to the subscribed topics using the provided IP and Port. The reset value determines the starting offset; the default is latest.

Examples:

<run kafka consumer where 
  ip = 198.74.50.131 and port = 9092 and 
  reset = earliest and topic = (
    name = sensor and 
    dbms = lsl_demo and 
    table = ping_sensor and 
    column.timestamp.timestamp = "bring [timestamp]" and 
    column.value.int = "bring [value]"
)>

Details: Using Kafka.

gRPC Client Service

Subscribe to a gRPC broker and monitor the data flow.

Enable gRPC client service

Usage:

run grpc client where name = [unique name] and ip = [IP] and port = [port] and policy = [policy id]

Explanation: Subscribe to a gRPC broker on the provided IP and Port and map data using the designated mapping policy.

Examples:

run grpc client where name = kubearmor and ip = 127.0.0.1 and port = 32767 and policy = deff520f1096bcd054b22b50458a5d1c

Details: Using gRPC.

Get gRPC connection info

List the active gRPC clients and the data exchange info.

Usage:

get grpc client

Enable SMTP client service

Usage:

<run smtp client where 
  host = [host name] and port = [port] and 
  email = [email address] and password = [email password] and 
  ssl = [true/false]>

Explanation: Initiates an SMTP client instance that encapsulates an SMTP connection to a server.

Examples:

run smtp client where email = anylog.iot@gmail.com and password = google4anylog

Details: SMTP Client.

The Scheduler Services

Users can enable multiple schedulers. A scheduler contains a group of tasks that are executed periodically.

A detailed description is available in the section.

Run Scheduler

Usage:

run scheduler [id]

[id] - Optional value, representing the scheduler ID. The default value is 1, representing a user scheduler.

Explanation: Repeatedly execute scheduled jobs.

Examples:

run scheduler 1

Details: The Scheduler.

Get Scheduler

Monitor the scheduler

Usage:

get scheduler [n]

Explanation: Information on the scheduled tasks. [n] - an optional ID for the scheduler. Scheduler 1 manages user scheduled tasks; 0 is the system scheduler.

Examples:

get scheduler
get scheduler 1

Details: View Scheduled Commands.

Blob Archiver Services

The Blob Archiver is a service that manages blob data on the node.

Enable the blobs archiver service

Usage:

<run blobs archiver where 
  blobs_dir = [data directory location] and archive_dir = [archive directory location] and 
  dbms = [true/false] and file = [true/false] and compress = [true/false]>

Explanation: Archive large objects.

Examples:

run blobs archiver where dbms = true and file = true and compress = false

Details: The Blobs Archiver.

Get blobs archiver info

Return information on the Blobs Archiver processes.

Usage:

get blobs archiver