Background Services
The background services enable data processing on each participating node.
Services can be enabled using one of the following methods:
- As command line arguments passed to the EdgeLake process when initiated.
- On the EdgeLake CLI.
- As an EdgeLake configuration file that is processed using the command: process [path and file name]
- As a policy in the metadata that is associated to a node using the command: config from policy where id = [policy id]
The following command lists the background services and their status:
Usage:
get processes [where format = json]
Explanation: List the background processes and their status.
Details: The “get processes” command
The main services are listed below:
- Connect to the EdgeLake Network
- The local data storage service
- REST Services
- The message broker services
- Subscribe to a 3rd party broker
- Subscribe to Kafka
- gRPC Client Service
- SMTP Client
- The Scheduler Services
- The Blobs Archiver Services
Connect to the EdgeLake Network
Connecting to an EdgeLake Network requires 2 services:
- Enable the TCP service - A listener service for incoming messages.
- Enable the blockchain synchronization service - A service that continuously synchronizes the metadata.
Monitoring the services’ status:
- get connections - Return the node’s connection information.
- get synchronizer - Return the metadata synchronization information.
Enable the TCP service
The TCP service provides the functionality to send and recieve messages from peer nodes using the EdgeLake Network Protocol.
Usage:
<run tcp server where
external_ip = [ip] and external_port = [port] and
internal_ip = [local_ip] and internal_port = [local_port] and
bind = [true/false] and threads = [threads count]>
Explanation: Set a TCP server in a listening mode on the specified IP and port.
- The first pair of IP and Port that are used by a listener process to receive messages from members of the network.
- The second pair of IP and Port are optional, to indicate the IP and Port that are accessible from a local network.
- threads - an optional parameter for the number of workers threads that process requests which are sent to the provided IP and Port. The default value is 6.
Examples:
run tcp server where external_ip = !ip and external_port = !port and threads = 3
<run tcp server where
external_ip = !external_ip and external_port = 7850 and
internal_ip = !ip and internal_port = 7850 and
bind=false and threads = 6>
Details: run tcp server
Get the network configuration info
Usage:
get connections
Explanation:
Return the node’s connection information including the TCP service information.
Enable the blockchain synchronization service
Usage:
run blockchain sync [options]
Explanation:
Repeatedly update the local copy of the blockchain
Options:
source
- The source of the metadata (blockchain or a Master Node).dest
- The destination of the blockchain data such as a file (a local file) or a DBMS (a local DBMS).connection
- The connection information that is needed to retrieve the data. For a Master, the IP and Port of the master node.time
- The frequency of updates.
Examples:
run blockchain sync where source = master and time = 3 seconds and dest = file and dest = dbms and connection = !ip_port
run blockchain sync where source = blockchain and time = !sync_time and dest = file and platform = ethereum
Get the metadata synchronization info
Usage:
get synchronizer
Explanation:
Return the synchronizer status.
Details: Synchronier Status.
The local data storage service
Configure the Operator service
The Operator service provides the local data storage functionalities.
The service captures the data streams, identifies or creates schemas and ingest the data into a local database.
Usage:
run operator where [option] = [value] and [option] = [value] ...
Explanation:
Monitors new data added to the watch directory and load the new data to a local database
Options:
policy
- The ID of the operator policy.compress_json
- True/False to enable/disable compression of the JSON file.compress_sql
- True/False to enable/disable compression of the SQL file.archive_json
- True moves the JSON file to the ‘archive’ dir if processing is successful. The file deleted if archive_sql is false.archive_sql
- True moves the SQL file to the ‘archive’ dir if processing is successful. The file deleted if archive_sql is false.limit_tables
- A list of comma separated names within brackets listing the table names to process.craete_table
- A True value creates a table if the table doesn’t exist.master_node
- The IP and Port of a Master Node (if a master node is used).update_tsd_info
- True/False to update a summary table (tsd_info table in almgm dbms) with status of files ingeste
Examples:
run operator where create_table = true and update_tsd_info = true and archive_json = true and distributor = true and master_node = !master_node and policy = !operator_policy and threads = 3
run operator where create_table = true and update_tsd_info = true and archive_json = true and distributor = true and blockchain = ethereum and policy = !operator_policy and threads = 3
Details: Operator Process.
Get Operator info
The get operator command returns info on the configuration and data processed by the Operator service.
Usage:
get operator [options] [where format = json]
Explanation:
Return information on the Operator processes and configuration.
Examples:
get operator
get operator inserts
get operator summary
get operator config
get operator summary where format = json
details: get operator
REST Services
Enable and monitor a service that receives commands and data via REST from 3rd parties applications and data sources.
- Enable the service: run rest server
- Monitor the REST service:
Enable the REST service
Usage:
<run rest server where
external_ip = [external_ip ip] and external_port = [external port] and
internal_ip = [internal ip] and internal_port = [internal port] and
timeout = [timeout] and ssl = [true/false] and bind = [true/false]>
Explanation:
Enable a REST server in a listening mode on the specified ip and port.
- The IP and Ports associate the service with external and internal IPs and Ports.
- [timeout] - Max wait time in seconds. A 0 value means no wait limit, and the default value is 20 seconds. A timeout returns an error message to the calling application.
- If ssl is set to True, connection is using HTTPS.
- If bind is true, only the specified IP is allowed (with 2 IPs, the external is ignored).
Examples:
<run rest server where
internal_ip = !ip and internal_port = 7849 and
timeout = 0 and threads = 6 and ssl = true>
Details: Rest Requests
REST service info commands
The following commands return info on the REST service configuration and processes:
Configuration information of the REST service:
get rest server info
Statistics on the REST calls:
get rest calls
Status of REST threads:
get rest pool
Message broker services
Enable and monitor a service that operates as a message broker, allowing to publish data on the EdgeLake Node.
Enable the message broker service
Usage:
<run message broker where
external_ip = [ip] and external_port = [port] and
internal_ip = [local_ip] and internal_port = [local_port] and
bind = [true/false] and threads = [threads count]>
Explanation: Set a message broker in a listening mode on the specified IP and port. Threads count represents the number of threads supporting the service.
Examples:
run message broker where external_ip = !ip and external_port = !port and threads = 3
run message broker where external_ip = !external_ip and external_port = 7850 and internal_ip = !ip and internal_port = 7850 and threads = 6
Details: Message Broker Services
Get info on the Message Broker service
The following command returns statistics on the local broker:
get local broker
Subscribe to a 3rd party broker
Retrieve data from a 3rd party broker and monitor the streaming process.
Subscribe to a broker
The run msg client command subscribes to a 3rd party broker. It includes options to map the source data (the data on the broker) to a destination format.
The mapping can be done using command variables, or by associating a mapping policy (from the metadata). See details below.
Usage:
<run msg client where
broker = [url] and port = [port] and
user = [user] and password = [password] and log=[true/false]
topic = (
name = [topic name] and
dbms = [dbms name] and
table = [table name] and
[participating columns info]
)>
Explanation: Subscribe to a broker according to the URL provided to receive data on the provided topic.
Examples:
<run msg client where
broker = "driver.cloudmqtt.com" and port = 18975 and
user = mqwdtklv and password = uRimssLO4dIo and
topic = (
name = test and
dbms = "bring [metadata][company]" and
table = "bring [metadata][machine_name] _ [metadata][serial_number]" and
column.timestamp.timestamp = "bring [ts]" and
column.value.int = "bring [value]"
)>
Details: Message Client
Get subscription info
Get configuration and statistics information from the subscription process.
Usage:
get msg clients where [options]
Explanation: Information on messages received by clients subscribed to message brokers.
Examples:
get msg clients
get msg client where id = 3
get msg client where topic = anylogedgex
get msg client where broker = driver.cloudmqtt.com:18785 and topic = anylogedgex
Details: get msg client
Subscribe to Kafka
The command is similar to the run msg client command. Monitoring is with the get msg client command.
Usage:
<run kafka consumer where
ip = [ip] and port = [port] and
reset = [latest/earliest] and
topic = [topic and mapping instructions]>
Explanation: Initialize a Kafka consumer that subscribes to one or more topics of a kafka instance and continuously polls data assigned to the subscribed topics using the provided IP and Port. The reset value determines the offset whereas the default is latest.
Examples:
<run kafka consumer where
ip = 198.74.50.131 and port = 9092 and
reset = earliest and topic = (
name = sensor and
dbms = lsl_demo and
table = ping_sensor and
column.timestamp.timestamp = "bring [timestamp]" and
column.value.int = "bring [value]"
)>
Details: Using Kafka.
gRPC Client Service
Subscribe to a gRPC broker and monitor the data flow.
Enable gRPC client service
Usage:
run grpc client where name = [unique name] and ip = [IP] and port = [port] and policy = [policy id]
Explanation: Subscribe to a gRPC broker on the provided IP and Port and map data using the designated mapping policy.
Examples:
run grpc client where name = kubearmor and ip = 127.0.0.1 and port = 32767 and policy = deff520f1096bcd054b22b50458a5d1c
Details: Using gRPC.
Get gRPC connection info
List the active gRPC clients and the data exchange info.
Usage:
get grpc client
Enable smtp client service
Usage:
<run smtp client where
host = [host name] and port = [port] and
email = [email address] and password = [email password] and
ssl = [true/false]>
Explanation: Initiates an SMTP instance encapsulates an SMTP connection to a server.
Examples:
run smtp client where email = anylog.iot@gmail.com and password = google4anylog
Details: SMTP Client.
The Scheduler Services
Users can enable multiple schedulers. A scheduler contains a group of tasks that are executed periodically.
- run scheduler - Enable the service.
- get scheduler - Get the scheduler service info.
A detailed description is available in the section.
Run Scheduler
Usage:
run scheduler [id]
[id]
- Optional value, representing the scheduler ID. The default value is 1, representing a user scheduler.
Explanation: Repeatedly execute scheduled jobs.
Examples:
run scheduler 1
Details: The Scehdualer.
Get Scheduler
Monitor the scheduler
Usage:
get scheduler [n]
Explanation: Information on the scheduled tasks. [n] - an optional ID for the scheduler. Scheduler 1 manage user scheduled tasks, 0 is the system scheduler.
Examples:
get scheduler
get scheduler 1
Details: View Scheduled Commands.
Blob Archiver Services
The Blob Archiver is a service that manage blob data on the node.
Enable the blobs archiver service
Usage:
<run blobs archiver where
blobs_dir = [data directory location] and archive_dir = [archive directory location] and
dbms = [true/false] and file = [true/false] and compress = [true/false]>
Explanation: Archive large objects.
Examples:
run blobs archiver where dbms = true and file = true and compress = false
Details: The Blobs Archiver.
Get blobs archiver info
Return information on the Blobs Archiver processes.
Usage:
get blobs archiver