
Capturing Edge Streaming Analytics results in MindSphere IoT

This section describes how to capture Edge Streaming Analytics results as time series in MindSphere IoT storage with the help of the MindSphere Open Edge Device Kit module. The Open Edge Device Kit (OEDK) is aimed at third-party device producers who want to make their devices compatible with MindSphere. It is a stand-alone module that is installed on the device and acts as an interface to MindSphere.

It also describes how to set up the Open Edge Device Kit and how to create an Edge Streaming Analytics project that reads, processes, and sends time series data to MindSphere.

Note

Edge Streaming Analytics (ESA) runs on Red Hat 7. ESA components such as the ESA Edge Proxy and the Event Streaming Server (ESS) are installed on the Linux machine.

Prerequisites

Hardware requirements:

  • 6 GB RAM available
  • dual- or quad-core CPU, at least 1 GHz
  • 8 GB free disk space

Software requirements:

  • Red Hat 7
  • Root privileges (sudo)
  • Mosquitto broker
  • Java 8

Installing OEDK on a Red Hat machine

To install the Mosquitto broker, proceed as follows:

  1. Enable the repository that provides the Mosquitto package.

     sudo yum-config-manager --add-repo=http://download-ib01.fedoraproject.org/pub/epel/7/x86_64/

  2. Import the repository's key.

     sudo wget https://download-ib01.fedoraproject.org/pub/epel/RPM-GPG-KEY-EPEL-7
     sudo rpm --import RPM-GPG-KEY-EPEL-7

  3. Install Mosquitto.

     sudo yum install mosquitto

Note

Mosquitto version used is 1.6.7.

  4. Start Mosquitto.

     sudo service mosquitto start

  5. To check that Mosquitto has started, run:

     sudo netstat -tulpn

     The output should contain two lines with the port 1883.

Mosquitto status
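As an alternative to reading the netstat output by eye, the check can be scripted. The following is a minimal sketch, assuming the broker listens on localhost and the default port 1883 mentioned above; the function name `is_port_open` is hypothetical and not part of Mosquitto or OEDK.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and resolution failures.
        return False


# Example call (assumes the broker runs on this machine):
# print("Mosquitto reachable:", is_port_open("localhost", 1883))
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the listener is Mosquitto.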

Installing Open Edge Device Kit (OEDK)

The OEDK is available in two versions: executable and Docker image.

Download the executable version, unzip the archive, copy the tarball to the Linux machine, then install and run the OEDK agent.

For information about how to install and configure the OEDK agent, refer to the OEDK documentation.

Creating an OEDK Agent

Creating an agent requires rights for accessing the Asset Manager in the tenant account. From the MindSphere Launchpad, click the "Asset Manager" icon as shown below.

Asset Manager

For more information on installing and configuring the OEDK agent, refer to Open Edge Device Kit.

Creating an agent in Asset Manager

  1. Access Asset Manager from Launchpad.
  2. Select Assets (left side) and click the Create asset button.
  3. Create an agent of type core.mcnano and enter the required details.

Onboarding an OEDK Agent

To onboard the agent, download its credentials from Asset Manager. Navigate to the agent, open the agent's Settings, and click the download credentials button. The credentials are downloaded as JSON into the ConBox_{ID}.cfg file. Upload the file to the machine and run the following command:

mosquitto_pub -t agentruntime/controlling/command/init -f $parent_path/ConBox_{ID}.cfg -q 0

This command publishes the content of the .cfg file to the topic agentruntime/controlling/command/init. The agent receives the message and uses these credentials to register with MindSphere.
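If onboarding is scripted, it can help to confirm that the downloaded file really is JSON before publishing it. The sketch below is one possible way to do that; the helper name `onboarding_command` is hypothetical, and the flags are the same ones used in the mosquitto_pub command above.

```python
import json
from pathlib import Path

# Topic taken from the onboarding command in this section.
INIT_TOPIC = "agentruntime/controlling/command/init"


def onboarding_command(cfg_path: str) -> list[str]:
    """Check that the credentials file parses as JSON, then build the
    mosquitto_pub argument list that publishes it to the init topic."""
    path = Path(cfg_path)
    # json.loads raises ValueError (JSONDecodeError) on malformed content.
    json.loads(path.read_text(encoding="utf-8"))
    return ["mosquitto_pub", "-t", INIT_TOPIC, "-f", str(path), "-q", "0"]


# Example call (requires mosquitto_pub on the PATH; the path is a placeholder):
# import subprocess
# subprocess.run(onboarding_command("/path/to/ConBox_{ID}.cfg"), check=True)
```

Passing the arguments as a list avoids shell quoting issues with the file path.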

Refresh the page to view the latest status of the agent.
The status of the agent

Sending data to OEDK Agent

Data is sent to a specific data source defined on the agent's configuration page. An agent can have one or more datasources, and each datasource can have one or more datapoints. This example creates a datasource with two datapoints: temperature and humidity.

Creating a datasource

  1. Click "Add new datasource" and enter the required details for the datasource.

     Creating a datasource

  2. Add two datapoints, for example temperature and humidity, and enter the required details for each.

     Creating datapoint for temperature
     Creating datapoint for humidity

  3. Save the changes and click the "Apply changes" button. The data.cfg file is deployed on the machine at the path /persistent_massdata/appData/configurations/data.cfg.

     Deploying datasource on agent (2)

The OEDK agent listens on a specific topic. The generic form is runtime/inject/data/timeseries/{protocol}/{dataSourceId}, where the values for protocol and dataSourceId are taken from the data.cfg file.

With the values from this example, the topic is: runtime/inject/data/timeseries/S7/Demo_Data_Source_6ff6152a-a29
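The substitution can be expressed as a small helper. This is a sketch only: the function name `timeseries_topic` is hypothetical, and the two values are passed in by hand because the layout of data.cfg is not reproduced in this section.

```python
TOPIC_PREFIX = "runtime/inject/data/timeseries"


def timeseries_topic(protocol: str, data_source_id: str) -> str:
    """Fill the generic topic pattern with the protocol and data source id."""
    for part in (protocol, data_source_id):
        # '/', '+' and '#' have special meaning in MQTT topics, so a value
        # containing them would silently change the topic structure.
        if not part or any(ch in part for ch in "/+#"):
            raise ValueError(f"invalid topic segment: {part!r}")
    return f"{TOPIC_PREFIX}/{protocol}/{data_source_id}"


# Values from the example in this section:
# timeseries_topic("S7", "Demo_Data_Source_6ff6152a-a29")
```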

The content of the data.cfg file is shown below:
data.cfg file

The payload template is shown in the image below. The timestamp format is yyyy-MM-dd'T'HH:mm:ss.SSS'Z' (24-hour clock). Each datapoint has an id that can be found in the data.cfg file. If multiple datapoints are sent, the JSON must be adapted accordingly.
Payload template
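Since the template itself is only available as an image, the following sketch builds a payload with the same structure as the JSON used in the CLI example of this section: a list of records, each with a timestamp and a list of values. The function names are hypothetical, timestamps are rendered with a 24-hour clock as in that example (15:15), and naive datetimes are assumed to be in UTC.

```python
import json
from datetime import datetime, timezone


def format_timestamp(ts: datetime) -> str:
    """Render ts in UTC with millisecond precision, e.g. 2019-10-18T15:15:00.000Z."""
    if ts.tzinfo is None:
        # Assumption: naive datetimes are already expressed in UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    ts = ts.astimezone(timezone.utc)
    return ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z"


def build_payload(ts: datetime, values: dict, quality_code: int = 0) -> str:
    """Build a one-record payload; values maps datapoint id -> measured value."""
    record = {
        "timestamp": format_timestamp(ts),
        "values": [
            {"dataPointId": dp_id, "value": value, "qualityCode": quality_code}
            for dp_id, value in values.items()
        ],
    }
    return json.dumps([record])


# Datapoint ids as they appear in the CLI example of this section:
# build_payload(datetime(2019, 10, 18, 15, 15, tzinfo=timezone.utc),
#               {"536c5086a57b4": 40, "beae98be2a9a4": 40})
```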

Sending data using CLI

The Linux command for sending a message to the agent is shown below. The JSON payload is wrapped in single quotes so that the shell passes it to mosquitto_pub as one argument:

mosquitto_pub -t runtime/inject/data/timeseries/S7/Demo_Data_Source_6ff6152a-a29 -m '[ { "timestamp": "2019-10-18T15:15:00.000Z", "values":[ { "dataPointId": "536c5086a57b4", "value": 40, "qualityCode":0 }, { "dataPointId": "beae98be2a9a4", "value": 40, "qualityCode":0 } ] } ]'
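The JSON payload contains spaces, brackets, and double quotes, so it has to reach mosquitto_pub as a single argument. When the command is generated by a script, one way to avoid quoting mistakes is to build an argument list and, if a shell line is needed, let `shlex.join` add the quoting. The helper names below are hypothetical.

```python
import shlex


def mosquitto_pub_command(topic: str, payload: str) -> list[str]:
    """Argument list for mosquitto_pub; the payload stays one argument."""
    return ["mosquitto_pub", "-t", topic, "-m", payload]


def as_shell_line(argv: list[str]) -> str:
    """Quote each argument so the line can be pasted into a POSIX shell."""
    return shlex.join(argv)


# Example:
# argv = mosquitto_pub_command(
#     "runtime/inject/data/timeseries/S7/Demo_Data_Source_6ff6152a-a29",
#     '[{"timestamp": "2019-10-18T15:15:00.000Z", "values": []}]',
# )
# print(as_shell_line(argv))           # shell-safe command line
# subprocess.run(argv, check=True)     # or run directly, without a shell
```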

Note

MQTTBox or any other MQTT client can also be used for sending messages.

On the agent's page, click on a datapoint's circle to view the information about the last message received.
Details about the last received message

Starting ESA OEDK Adapter

The Edge Streaming Adapter is a Java application that publishes to and subscribes from an MQTT broker as part of the Open Edge Device Kit integration. Its purpose is to convert the messages it receives into the OEDK format. As input it takes the data source name and datapoint names instead of the data source id and datapoint ids that OEDK uses directly.
This way, the user only needs to know the names as they are displayed in the Asset Manager application and does not need to know the ids of the data source and datapoints.
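The idea of translating names to ids can be illustrated as follows. This is not the adapter's code: its input message format and its way of reading data.cfg are not documented in this section, so the function name, the shape of `readings`, and the pairing of names to ids in the example are all assumptions made for illustration.

```python
def to_oedk_record(timestamp: str, readings: dict, name_to_id: dict) -> dict:
    """Translate name-keyed readings into a record keyed by datapoint id.

    readings:   datapoint name -> value (assumed input shape)
    name_to_id: datapoint name -> datapoint id (as would be found in data.cfg)
    """
    unknown = sorted(set(readings) - set(name_to_id))
    if unknown:
        raise KeyError(f"no datapoint id known for: {unknown}")
    return {
        "timestamp": timestamp,
        "values": [
            {"dataPointId": name_to_id[name], "value": value, "qualityCode": 0}
            for name, value in readings.items()
        ],
    }


# Hypothetical pairing of names to the ids used in the CLI example:
# ids = {"Temperature": "536c5086a57b4", "Humidity": "beae98be2a9a4"}
# to_oedk_record("2019-10-24T16:15:00.000Z", {"Temperature": 40, "Humidity": 80}, ids)
```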

The following operations are available for communicating with the adapter:

  • publish messages to the ESA/adapter/<data_source_name> topic
  • subscribe to the ESA/adapter/logs topic to see all the logs of the application

In order to run the adapter, use the following script: /opt/mdsp/esa/startup_esa_java_adapter.sh

This script starts or restarts the adapter. It takes one optional parameter, the path to the OEDK data.cfg file.

The following options are available:

  • without parameter: the adapter starts with default settings
  • with parameter: the adapter starts with the specified path for data.cfg

If the adapter needs to be restarted, re-run the startup_esa_java_adapter.sh script with the same parameters.

Sending data via Edge Streaming Creator project

  1. Measurements data prerequisites

A .csv file containing a timestamp column and measurement columns must exist on the streaming server. The number of measurement columns must match the number of datapoints in the datasource defined in the data.cfg file.

Example .csv file:

Timestamp,Temperature,Humidity
2019-10-24T16:15:00.000Z,40,80
2019-10-24T16:16:00.000Z,25,79
2019-10-24T16:17:00.000Z,27,75
2019-10-24T16:18:00.000Z,30,72
2019-10-24T16:19:00.000Z,45,70
2019-10-24T16:20:00.000Z,44,72
2019-10-24T16:21:00.000Z,43,74
2019-10-24T16:22:00.000Z,40,79
2019-10-24T16:23:00.000Z,36,80
2019-10-24T16:24:00.000Z,30,81
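Before handing the file to the project, the column count can be checked against the datasource. The sketch below assumes that the first column is the timestamp and that the datapoint names are supplied by the caller; the function name `load_measurements` is hypothetical. It checks only the number of columns, as required above, not their names.

```python
import csv
import io


def load_measurements(csv_text: str, datapoint_names: list[str]) -> list[dict]:
    """Parse the CSV and verify that the number of measurement columns
    equals the number of datapoints in the datasource."""
    reader = csv.DictReader(io.StringIO(csv_text))
    header = reader.fieldnames or []
    measurement_columns = header[1:]  # assumption: column 0 is the timestamp
    if len(measurement_columns) != len(datapoint_names):
        raise ValueError(
            f"{len(measurement_columns)} measurement columns, "
            f"but {len(datapoint_names)} datapoints"
        )
    return list(reader)


# Example:
# with open("/home/my_user/input_data.csv", encoding="utf-8") as fh:
#     rows = load_measurements(fh.read(), ["Temperature", "Humidity"])
```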

  2. Import the following Edge Streaming Creator project into the workspace.

Note

Project .xml file is attached to this document.

Sample
  <project name="OEDK_adapter_project" threads="1" pubsub="auto" heartbeat-interval="1">
  <metadata>
  <meta id="layout">{"p":{"mqtt_Logging":{"x":50,"y":50},"oedk_format":{"x":260,"y":175},"read_sensor_data":{"x":260,"y":50},"retain_logs":{"x":50,"y":175}}}</meta>
  </metadata>
    <contqueries>
  <contquery name="p">
    <windows>
      <window-source pubsub="true" name="read_sensor_data">
        <schema>
          <fields>
            <field name="Timestamp" type="stamp" key="true"/>
            <field name="Temperature" type="double"/>
            <field name="Humidity" type="double"/>
          </fields>
        </schema>
        <connectors>
          <connector class="fs" name="import_data_conn">
            <properties>
              <property name="type"><![CDATA[pub]]></property>
              <property name="dateformat"><![CDATA[%Y-%m-%dT%H:%M:%S]]></property>
              <property name="addcsvopcode"><![CDATA[true]]></property>
              <property name="addcsvflags"><![CDATA[normal]]></property>
              <property name="fsname"><![CDATA[/home/my_user/datasources.csv]]></property>
              <property name="fstype"><![CDATA[csv]]></property>
            </properties>
          </connector>
        </connectors>
      </window-source>
      <window-compute pubsub="true" name="oedk_format">
        <schema>
          <fields>
            <field name="Timestamp" type="stamp" key="true"/>
            <field name="Humidity" type="double"/>
            <field name="Temperature" type="double"/>
          </fields>
        </schema>
        <output>
          <field-expr><![CDATA[Humidity]]></field-expr>
          <field-expr><![CDATA[Temperature]]></field-expr>
        </output>
        <connectors>
          <connector class="mqtt" name="Oedk_topic_conn">
            <properties>
              <property name="type"><![CDATA[sub]]></property>
              <property name="snapshot"><![CDATA[false]]></property>
              <property name="mqtthost"><![CDATA[localhost]]></property>
              <property name="mqttclientid"><![CDATA[mqtt_demo]]></property>
              <property name="mqtttopic"><![CDATA[ESA/adapter/Demo_Data_source]]></property>
              <property name="mqttqos"><![CDATA[0]]></property>
              <property name="mqttmsgtype"><![CDATA[json]]></property>
            </properties>
          </connector>
        </connectors>
      </window-compute>
      <window-source pubsub="true" name="mqtt_Logging">
        <schema>
          <fields>
            <field name="timestamp" type="stamp" key="true"/>
            <field name="logLevel" type="string"/>
            <field name="message" type="string"/>
          </fields>
        </schema>
        <connectors>
          <connector class="mqtt" name="mqtt_log_topic">
            <properties>
              <property name="type"><![CDATA[pub]]></property>
              <property name="publishwithupsert"><![CDATA[true]]></property>
              <property name="dateformat"><![CDATA[%Y-%m-%d %H:%M:%S]]></property>
              <property name="addcsvopcode"><![CDATA[true]]></property>
              <property name="addcsvflags"><![CDATA[normal]]></property>
              <property name="mqtthost"><![CDATA[172.30.5.7]]></property>
              <property name="mqttclientid"><![CDATA[log_client]]></property>
              <property name="mqtttopic"><![CDATA[ESA/adapter/logs]]></property>
              <property name="mqttqos"><![CDATA[2]]></property>
              <property name="mqttmsgtype"><![CDATA[json]]></property>
            </properties>
          </connector>
        </connectors>
      </window-source>
      <window-copy index="pi_EMPTY" pubsub="true" name="retain_logs"/>
    </windows>
    <edges>
      <edge source="read_sensor_data" target="oedk_format"/>
      <edge source="mqtt_Logging" target="retain_logs"/>
    </edges>
    </contquery>
  </contqueries>
  </project>

After the project is imported, the workspace is as shown below:

Edge Streaming Creator workspace

This project contains the following windows:

  • read_sensor_data: reads the .csv file from source
  • oedk_format: sends the data to OEDK adapter MQTT topic
  • mqtt_Logging: receives logging information from OEDK adapter on the "ESA/adapter/logs" topic
  • retain_logs: retains the logs from OEDK adapter for further processing
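To review which connector settings a project file actually contains (for example the MQTT host and topic of each window), the XML can be inspected with a standard XML parser. This is a sketch using Python's ElementTree; the function name `connector_properties` is hypothetical, and it relies only on the element and attribute names visible in the sample project.

```python
import xml.etree.ElementTree as ET


def connector_properties(project_xml: str) -> dict:
    """Map (window name, connector name) -> {property name: value}."""
    root = ET.fromstring(project_xml)
    result = {}
    for window in root.iter():
        # Only window elements have a direct <connectors> child.
        connectors = window.find("connectors")
        if connectors is None:
            continue
        for conn in connectors.findall("connector"):
            props = {
                prop.get("name"): (prop.text or "").strip()
                for prop in conn.iter("property")
            }
            result[(window.get("name"), conn.get("name"))] = props
    return result


# Example (the file name is a placeholder):
# with open("OEDK_adapter_project.xml", encoding="utf-8") as fh:
#     for key, props in connector_properties(fh.read()).items():
#         print(key, props.get("mqtthost"), props.get("mqtttopic"))
```

CDATA sections are returned as ordinary text by the parser, so the values come back without the `<![CDATA[...]]>` wrapper.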

  3. Configure the read_sensor_data window. This window needs an input schema that reads data from the .csv file and imports it into the project. The schema must have the same number of fields and the same data types as the .csv file.

<schema>
    <fields>
        <field name="Timestamp" type="stamp" key="true"/>
        <field name="Temperature" type="double"/>
        <field name="Humidity" type="double"/>
    </fields>
</schema>

Connector settings:

  • addcsvopcode and addcsvflags must be set to true and normal, respectively, to insert the measurements into the project.
  • fsname must contain the path of the .csv file. (Example: /home/my_user/input_data.csv)
<connector class="fs" name="ts_connector">
    <properties>
        <property name="type"><![CDATA[pub]]></property>
        <property name="addcsvopcode"><![CDATA[true]]></property>
        <property name="addcsvflags"><![CDATA[normal]]></property>
        <property name="fsname"><![CDATA[/home/my_user/input_data.csv]]></property>
        <property name="fstype"><![CDATA[csv]]></property>
    </properties>
</connector>

  4. Configure the oedk_format window.

To send data to the OEDK adapter, an MQTT subscriber connector must be defined and configured to connect to the installed MQTT broker. The connector sends data on the topic ESA/adapter/<OEDK Datasource name>, where <OEDK Datasource name> is the name of the data source created in the Asset Manager application for the OEDK agent.

The oedk_format schema:

<schema>
    <fields>
        <field name="Timestamp" type="stamp" key="true"/>
        <field name="Humidity" type="double"/>
        <field name="Temperature" type="double"/>
    </fields>
</schema>

Connector settings:

mqtttopic - ESA/adapter/Demo_Data_source where Demo_Data_source is the name of the datasource where data is sent.

<connector class="mqtt" name="Oedk_topic_conn">
    <properties>
        <property name="type"><![CDATA[sub]]></property>
        <property name="snapshot"><![CDATA[false]]></property>
        <property name="mqtthost"><![CDATA[localhost]]></property>
        <property name="mqttclientid"><![CDATA[mqtt_demo]]></property>
        <property name="mqtttopic"><![CDATA[ESA/adapter/Demo_Data_source]]></property>
        <property name="mqttqos"><![CDATA[0]]></property>
        <property name="mqttmsgtype"><![CDATA[json]]></property>
    </properties>
</connector>

  5. Configure the mqtt_Logging window. This step is optional.

The logging messages from the OEDK adapter can be retrieved via the MQTT topic ESA/adapter/logs. The mqtt_Logging schema is shown below:

<schema>
    <fields>
        <field name="timestamp" type="stamp" key="true"/>
        <field name="logLevel" type="string"/>
        <field name="message" type="string"/>
    </fields>
</schema>

Connector settings:

mqtttopic - ESA/adapter/logs

<connector class="mqtt" name="mqtt_log_topic">
    <properties>
        <property name="type"><![CDATA[pub]]></property>
        <property name="publishwithupsert"><![CDATA[true]]></property>
        <property name="dateformat"><![CDATA[%Y-%m-%d %H:%M:%S]]></property>
        <property name="addcsvopcode"><![CDATA[true]]></property>
        <property name="addcsvflags"><![CDATA[normal]]></property>
        <property name="mqtthost"><![CDATA[localhost]]></property>
        <property name="mqttclientid"><![CDATA[log_client]]></property>
        <property name="mqtttopic"><![CDATA[ESA/adapter/logs]]></property>
        <property name="mqttqos"><![CDATA[2]]></property>
        <property name="mqttmsgtype"><![CDATA[json]]></property>
    </properties>
</connector>

The following output will be displayed when running the project in Test Mode:

How to visualize data in test workspace.

Visualizing the datapoints in Fleet Manager

To visualize the data in the MindSphere Fleet Manager application, another asset needs to be created, and the agent's datapoints must be linked to the aspects of this new asset. The process is depicted in the image below:

How to visualize data in Fleet Manager

Steps for linking:

  1. Navigate to Asset Manager, select Aspects, and create a new aspect. For the category, choose the "Dynamic" option, which specifies that the aspect will be used with IoT Time-Series. When adding variables, it is recommended to use the same data type and unit as in the OEDK definition.

     Creating an aspect

  2. Create a Type in Asset Manager. Link the previously created aspect to this type in the Aspect section.

     Creating a type

  3. Create an Asset and link the previously created Type to it. Select "Assets" in Asset Manager and click Create asset. Enter the name of the previously created Type (e.g. Type_FM), then enter the details of the asset.

     Creating an asset (1)
     Creating an asset (2)

  4. Link the OEDK agent's datapoints to the aspect's variables. In Asset Manager, navigate to the OEDK agent and click the View Datamappings button. For each datapoint, create a link to the matching variable in the aspect.

     The mapping between variables (1)
     The mapping between variables (2)
     The mapping between variables (3)
     The mapping between variables (4)

Now, some datapoints need to be pushed to the OEDK agent in order to visualize the data in Fleet Manager.

Visualization of datapoints in Fleet Manager


Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.