Telegraf

Created by InfluxData, Telegraf is an open-source agent for collecting metrics from stacks, sensors, and systems.

This document is based on Telegraf 1.31.1.

Requirements

  1. EdgeLake
  2. Telegraf

Configure Telegraf

EdgeLake accepts data from Telegraf as a serialized list of JSON readings, delivered either via a message broker or via REST (POST).

The examples below use machine-monitoring data inputs (cpu, mem, net and swap), but the same logic can be applied to any type of data input.

Note, data arriving as JSON sits inside a key called metrics (as shown below). The configurations below, for both REST and MQTT, extract the list of JSON readings from within the metrics key, thus sending only the associated list.
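The extraction step can be sketched in Python. This is only an illustration of pulling the list out of the metrics key before forwarding; the trimmed payload below is hypothetical sample data:

```python
import json

# A trimmed Telegraf-style payload: the readings sit under the "metrics" key
payload = ('{"metrics": [{"name": "mem", "fields": {"used_percent": 58.28}, '
           '"tags": {"host": "my-host"}, "timestamp": 1715018940}]}')

# Extract the inner list -- this is the part that gets forwarded to EdgeLake
readings = json.loads(payload)["metrics"]

for reading in readings:
    print(reading["name"], reading["fields"]["used_percent"])  # mem 58.28
```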

Sample Data Generated

{"metrics":[
  {
    "fields":{"active":7080853504,"available":7166590976,"available_percent":41.715049743652344,"free":415137792,"inactive":6751453184,"total":17179869184,"used":10013278208,"used_percent":58.284950256347656,"wired":1292861440},
    "name":"mem",
    "tags":{"host":"Oris-Mac-mini.local"},
    "timestamp":1715018940
  },
  {
    "fields":{"usage_guest":0,"usage_guest_nice":0,"usage_idle":89.91935483869311,"usage_iowait":0,"usage_irq":0,"usage_nice":0,"usage_softirq":0,"usage_steal":0,"usage_system":2.7217741935480912,"usage_user":7.358870967749625},
    "name":"cpu",
    "tags":{"cpu":"cpu0","host":"Oris-Mac-mini.local"},
    "timestamp":1715018940
  },
  {
    "fields":{"free":0,"total":0,"used":0,"used_percent":0},
    "name":"swap",
    "tags":{"host":"Oris-Mac-mini.local"},
    "timestamp":1715018940
  },
  {
    "fields":{"bytes_recv":0,"bytes_sent":80,"drop_in":0,"drop_out":0,"err_in":0,"err_out":0,"packets_recv":0,"packets_sent":1,"speed":-1},
    "name":"net",
    "tags":{"host":"Oris-Mac-mini.local","interface":"utun3"},
    "timestamp":1715018940
  }
]}

REST

  1. Create a configuration file for REST
    telegraf --input-filter cpu:mem:net:swap --output-filter http config > telegraf.conf
  2. Update [[outputs.http]] section in configuration file
    • URL
    • method - POST
    • data_format - JSON
    • use_batch_format - false
    • header information
      • command - data
      • topic - topic name
      • User-Agent - AnyLog/1.23
      • Content-Type - text/plain
    [[outputs.http]]
      url = "http://127.0.0.1:32149"
      method = "POST"
      data_format = "json"
      use_batch_format=false
      [outputs.http.headers]
        command = "data"
        topic = "telegraf-data"
        User-Agent = "AnyLog/1.23"
        Content-Type = "text/plain"
    
  3. Start Telegraf
    telegraf --config telegraf.conf > /dev/null 2>&1 & 
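The POST request Telegraf produces with this configuration can be approximated as follows. This is a minimal sketch, assuming a single reading and the endpoint/headers from the sample telegraf.conf above (HTTP header names are case-insensitive, so urllib's capitalization is harmless):

```python
import json
import urllib.request

# Endpoint matching the url in [[outputs.http]] (assumed local EdgeLake node)
EDGELAKE_URL = "http://127.0.0.1:32149"

# The metrics list Telegraf would send (non-batch, JSON format)
readings = [{"name": "mem", "fields": {"used_percent": 58.28},
             "tags": {"host": "my-host"}, "timestamp": 1715018940}]

# Headers mirror the [outputs.http.headers] section of telegraf.conf
request = urllib.request.Request(
    EDGELAKE_URL,
    data=json.dumps(readings).encode(),
    headers={"command": "data", "topic": "telegraf-data",
             "User-Agent": "AnyLog/1.23", "Content-Type": "text/plain"},
    method="POST",
)

# urllib.request.urlopen(request)  # uncomment once an EdgeLake node is listening
```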

Message Broker

The following example uses MQTT output configuration; however, EdgeLake’s message broker client also accepts data from Kafka.

  1. Create a configuration file for MQTT
    telegraf --input-filter cpu:mem:net:swap --output-filter mqtt config > telegraf.conf
  2. Update [[outputs.mqtt]] section in configuration file
    • servers - MQTT message broker connection information
    • topic - topic name
    • layout - non-batch
    • data_format - json
    [[outputs.mqtt]]
        servers = ["127.0.0.1:32150"]
        topic = "telegraf-data"
        layout = "non-batch"
        data_format = "json"
    
  3. Start Telegraf
    telegraf --config telegraf.conf > /dev/null 2>&1 & 
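With the non-batch layout, Telegraf publishes each reading as its own JSON message on the configured topic. A minimal Python sketch of equivalent publishing logic, using the broker address and topic from the sample configuration (the paho-mqtt calls are shown commented out since they need a running broker):

```python
import json

# Broker address and topic mirror servers/topic in [[outputs.mqtt]]
BROKER_HOST, BROKER_PORT = "127.0.0.1", 32150
TOPIC = "telegraf-data"

# Sample readings; with layout = "non-batch" each one becomes its own message
readings = [{"name": "swap", "fields": {"used_percent": 0},
             "tags": {"host": "my-host"}, "timestamp": 1715018940}]

messages = [json.dumps(r) for r in readings]

# Publishing with paho-mqtt (pip install paho-mqtt) would look like:
# import paho.mqtt.client as mqtt
# client = mqtt.Client()
# client.connect(BROKER_HOST, BROKER_PORT)
# for msg in messages:
#     client.publish(TOPIC, msg)
```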

Configure EdgeLake

EdgeLake requires a mapping policy for data coming in via REST POST or data published on the EdgeLake node (as a message broker). The mapping policy transforms the JSON readings to the EdgeLake target structure. For Telegraf, EdgeLake provides a generic mapping option without the need to consider the individual attributes generated.

The following steps can be executed using the telegraf.al script, which is included in the deployment scripts by default.

  1. If using message client to accept data, make sure EdgeLake's Message Broker is running
    <run message broker where
        external_ip=!external_ip and external_port=!anylog_broker_port and
        internal_ip=!ip and internal_port=!anylog_broker_port and
        bind=!broker_bind and threads=!broker_threads>
  2. Create a mapping policy to accept data - notice that except for timestamp, all other source attributes remain unchanged.
    policy_id = telegraf-mapping
    topic_name=telegraf-data
    
    <new_policy = {"mapping" : {
            "id" : !policy_id,
            "dbms" : !default_dbms,
            "table" : "bring [name] _ [tags][name]:[tags][host]",
            "schema" : {
                    "timestamp" : {
                        "type" : "timestamp",
                        "default": "now()",
                        "bring" : "[timestamp]",
                        "apply" :  "epoch_to_datetime"
                    },
                    "*" : {
                        "type": "*",
                        "bring": ["fields", "tags"]
                    }
             }
       }
    }>
    
    blockchain insert where policy=!new_policy and local=true and master=!ledger_conn
  3. Enable a message client to accept the data from Telegraf.
    # REST message client
    <run msg client where broker=rest and user-agent=anylog and log=false and topic=(
        name=!topic_name and
        policy=!policy_id
    )>
    
    # MQTT broker message client
    <run msg client where broker=local and log=false and topic=(
        name=!topic_name and
        policy=!policy_id
    )>
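Conceptually, the mapping policy applies a transform like the following to each incoming reading. This is a Python sketch of the logic, not EdgeLake's actual implementation; it assumes the colon in [tags][name]:[tags][host] denotes a fallback (use tags.name if present, otherwise tags.host):

```python
from datetime import datetime, timezone

def map_reading(reading, default_dbms="new_company"):
    """Approximate the mapping policy: derive the table name, convert the
    epoch timestamp, and flatten fields/tags into prefixed columns."""
    tags = reading.get("tags", {})
    # "bring [name] _ [tags][name]:[tags][host]" -- tags.name, else tags.host
    suffix = tags.get("name", tags.get("host", ""))
    table = f"{reading['name']}_{suffix}"

    # epoch_to_datetime: seconds since the epoch -> timestamp column
    row = {"timestamp": datetime.fromtimestamp(reading["timestamp"],
                                               tz=timezone.utc)}
    # The "*" schema entry keeps every source attribute, prefixed by its key
    for prefix in ("fields", "tags"):
        for key, value in reading.get(prefix, {}).items():
            row[f"{prefix}_{key}"] = value
    return default_dbms, table, row

dbms, table, row = map_reading(
    {"name": "swap", "fields": {"used_percent": 0},
     "tags": {"host": "raspberrypi"}, "timestamp": 1715018940})
print(table)  # swap_raspberrypi
```

Note how the flattened column names (fields_used_percent, tags_host) match the schema shown in the "Generated Data" section below.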

Generated Data

Using the mapping policy shown above, EdgeLake generates a unique table per input (cpu, mem, net and swap). The mapping policy derives the table name from the input ([name]) and the hostname reporting the data ([tags][name]:[tags][host]), so each input gets its own table per hostname.
To create a single table per input (regardless of the hostname), use only [name] in the mapping policy's table directive.

Directions for updating configurations within a Docker container can be found here.

  1. List of generated tables
    AL anylog-query +> get data nodes 
    
    Company     DBMS        Table            Cluster ID                       Cluster Status Node Name       Member ID External IP/Port    Local IP/Port       Node Status 
    -----------|-----------|----------------|--------------------------------|--------------|---------------|---------|-------------------|-------------------|-----------|
    New Company|new_company|mem_raspberrypi |7b5b46361816b6d290b3dc430b221b79|active        |anylog-operator|      115|45.24.145.123:32148|192.168.1.215:32148|active     |
    New Company|new_company|net_raspberrypi |7b5b46361816b6d290b3dc430b221b79|active        |anylog-operator|      115|45.24.145.123:32148|192.168.1.215:32148|active     |
    New Company|new_company|swap_raspberrypi|7b5b46361816b6d290b3dc430b221b79|active        |anylog-operator|      115|45.24.145.123:32148|192.168.1.215:32148|active     |
    New Company|new_company|cpu_raspberrypi |7b5b46361816b6d290b3dc430b221b79|active        |anylog-operator|      115|45.24.145.123:32148|192.168.1.215:32148|active     |
  2. Describe table
    AL anylog-query +> get columns where dbms=new_company and table=swap_raspberrypi
    
    Schema for DBMS: 'new_company' and Table: 'swap_raspberrypi'
    Column Name         Column Type                 
    -------------------|---------------------------|
    row_id             |integer                    |
    insert_timestamp   |timestamp without time zone|
    tsd_name           |char(3)                    |
    tsd_id             |int                        |
    timestamp          |timestamp without time zone|
    fields_free        |bigint                     |
    fields_total       |bigint                     |
    fields_used        |int                        |
    fields_used_percent|int                        |
    tags_host          |character varying          |
    fields_in          |int                        |
    fields_out         |int                        |
  3. Aggregate CPU usage_system per minute (min/avg/max)
    AL anylog-query +> run client () sql new_company format=table  "select increments(minute, 1, timestamp), min(timestamp), max(timestamp), min(fields_usage_system)::float(2), avg(fields_usage_system)::float(2), max(fields_usage_system)::float(2) from cpu_raspberrypi;"
    [9]
    AL anylog-query +>
    min(timestamp)        max(timestamp)        min(fields_usage_system) avg(fields_usage_system) max(fields_usage_system)
    --------------------- --------------------- ------------------------ ------------------------ ------------------------ 
    2024-07-10 22:26:40.0 2024-07-10 22:26:50.0                      0.7                     1.03                     1.81 
    2024-07-10 22:27:00.0 2024-07-10 22:27:50.0                      0.3                     0.97                     1.81 
    2024-07-10 22:47:40.0 2024-07-10 22:47:50.0                      0.4                     1.07                      1.8 
    2024-07-10 22:48:00.0 2024-07-10 22:48:50.0                      0.2                     0.53                      1.0 
    
    {"Statistics":[{"Count": 4,
                    "Time":"00:00:00",
                    "Nodes": 1}]}
  4. Get the last minute of data coming into cpu_raspberrypi for cpu-total
    AL anylog-query +> run client () sql new_company format=table "select timestamp, fields_usage_guest, fields_usage_guest_nice, fields_usage_idle::float(2), fields_usage_iowait::float(2), fields_usage_irq, fields_usage_nice, fields_usage_softirq::float(2), fields_usage_steal, fields_usage_system::float(2), fields_usage_user::float(2),  tags_cpu,  tags_host from cpu_raspberrypi where period(minute, 1, now(), timestamp) and tags_cpu='cpu-total' order by timestamp desc;"  
    [21]
    AL anylog-query +> 
    timestamp             fields_usage_guest fields_usage_guest_nice fields_usage_idle fields_usage_iowait fields_usage_irq fields_usage_nice fields_usage_softirq fields_usage_steal fields_usage_system fields_usage_user tags_cpu  tags_host
    --------------------- ------------------ ----------------------- ----------------- ------------------- ---------------- ----------------- -------------------- ------------------ ------------------- ----------------- --------- ----------- 
    2024-07-10 22:48:50.0                  0                       0             97.97                0.75                0                 0                 0.03                  0                0.58              0.68 cpu-total raspberrypi
    2024-07-10 22:48:40.0                  0                       0             98.04                0.65                0                 0                  0.0                  0                0.48              0.83 cpu-total raspberrypi
    2024-07-10 22:48:30.0                  0                       0             98.37                0.25                0                 0                 0.03                  0                0.48              0.88 cpu-total raspberrypi
    2024-07-10 22:48:20.0                  0                       0             98.25                 0.4                0                 0                  0.0                  0                 0.5              0.85 cpu-total raspberrypi
    2024-07-10 22:48:10.0                  0                       0             98.02                0.68                0                 0                  0.0                  0                0.55              0.75 cpu-total raspberrypi
    2024-07-10 22:48:00.0                  0                       0              98.1                0.63                0                 0                 0.03                  0                 0.6              0.65 cpu-total raspberrypi
    2024-07-10 22:47:50.0                  0                       0             97.24                0.63                0                 0                  0.0                  0                0.75              1.38 cpu-total raspberrypi
    
    {"Statistics":[{"Count": 7,
                    "Time":"00:00:00",
                    "Nodes": 1}]}
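The same queries can also be issued over REST rather than from the AnyLog CLI. A minimal sketch, assuming a query node listening on a hypothetical REST port (127.0.0.1:32349) and the command/destination header convention used by AnyLog/EdgeLake REST clients:

```python
import urllib.request

# Hypothetical query-node REST endpoint
QUERY_NODE = "http://127.0.0.1:32349"

# The per-minute aggregation from step 3, expressed as a REST request;
# "destination": "network" asks the query node to fan out to operators
headers = {
    "User-Agent": "AnyLog/1.23",
    "command": ('sql new_company format=json '
                '"select increments(minute, 1, timestamp), '
                'min(fields_usage_system), avg(fields_usage_system), '
                'max(fields_usage_system) from cpu_raspberrypi;"'),
    "destination": "network",
}

request = urllib.request.Request(QUERY_NODE, headers=headers, method="GET")
# with urllib.request.urlopen(request) as resp:  # needs a running query node
#     print(resp.read().decode())
```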