
Data Flow Engine – Samples

For the examples below the following time series data of entity f7012b9215b448228def9726deffcfc9 in JSON array format is used as an incoming message from TimeSeriesSource.
If a single time series data package is expected:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {
      "roundPerMinute": 2000,
      "temperature": 83
    },
    "_time": "2018-05-21T11:27:03.780Z"
  }
]

If multiple time series data packages are expected:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {
      "roundPerMinute": 2000,
      "temperature": 83
    },
    "_time": "2018-05-21T11:27:03.780Z"
  },
  {
    "writeTime": "2018-05-21T11:27:19Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {
      "roundPerMinute": 2000,
      "temperature": 123
    },
    "_time": "2018-05-21T11:27:13.158Z"
  }
]

Note

These messages have exactly the same format as time series data arriving from IoT through TimeSeriesSource.

Creating a stream to monitor time series data of an entity

This sample shows how to create a stream for monitoring time series data from an entity with id f7012b9215b448228def9726deffcfc9. The stream shall create an event after each upload of time series data.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --severity=1 --description="description"

The stream defined above forwards the incoming messages without filtering to the EventSink, where events are created.

Creating a stream which creates custom events

This sample shows how to create a stream which copies values from the incoming message into custom fields of newly created events. The stream shall create an event after each time series data upload. The temperature value of the incoming message shall be stored in the event as internalTemperature.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --event-type-name="CustomEventType" --auto-create-type=true --field-mapping-json='{"temperature":"internalTemperature"}' --severity=1 --description="description" --timestamp-path="_time"

The stream defined above creates a new event type named CustomEventType. This event type has an additional field called internalTemperature, which is set to the value of temperature from the incoming message.
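
The field mapping above can be sketched in a few lines of Python. This is an illustrative sketch of the mapping behavior, not the actual EventSink implementation; the helper name map_fields is hypothetical.

```python
import json

# Illustrative sketch (not the actual EventSink implementation): apply a
# mapping such as --field-mapping-json='{"temperature":"internalTemperature"}'
# to a single incoming time series message.
def map_fields(message, field_mapping):
    """Copy mapped values from newProperties into custom event fields."""
    event_fields = {}
    for source_field, target_field in field_mapping.items():
        if source_field in message["newProperties"]:
            event_fields[target_field] = message["newProperties"][source_field]
    return event_fields

message = {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {"roundPerMinute": 2000, "temperature": 83},
    "_time": "2018-05-21T11:27:03.780Z",
}
mapping = json.loads('{"temperature":"internalTemperature"}')
print(map_fields(message, mapping))  # {'internalTemperature': 83}
```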

Creating a stream which detects value changes of a field

This sample shows how to create a stream using the ChangeDetector processor to detect the change of a value. The stream shall only create an event if the value of the field roundPerMinute has changed since the last time series upload.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | ChangeDetector --differentiating-key=entityId --triggering-variable=newProperties.roundPerMinute --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

The ChangeDetector of the stream defined above exclusively forwards messages in which the value of roundPerMinute differs from the value in the previous message. The very first message is not forwarded, as it is used for initialization. When multiple entities upload their data, each entity's data is handled separately; the parameter --differentiating-key is used to distinguish between entities.

Since the incoming time series data arrives from IoT in JSON array format, the parameter --splitListToItems must be set to true. Otherwise, this parameter can be omitted.
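
The keyed change detection described above can be sketched as follows. This is an illustrative Python sketch of the behavior, not the actual processor code; the class name and its internals are hypothetical.

```python
# Illustrative sketch of the ChangeDetector behavior (not the actual
# processor): forward a message only when the triggering value has changed
# since the previous message with the same differentiating key. The very
# first message per key only initializes the state and is not forwarded.
class ChangeDetectorSketch:
    def __init__(self):
        self.last_values = {}  # differentiating key -> last seen value

    def process(self, message):
        key = message["entityId"]
        value = message["newProperties"]["roundPerMinute"]
        if key not in self.last_values:
            self.last_values[key] = value
            return None  # first message only initializes the state
        if value != self.last_values[key]:
            self.last_values[key] = value
            return message  # value changed: forward the message
        return None  # value unchanged: discard

detector = ChangeDetectorSketch()
messages = [{"entityId": "e1", "newProperties": {"roundPerMinute": v}}
            for v in (2000, 2000, 2500)]
forwarded = [m for m in messages if detector.process(m) is not None]
print(len(forwarded))  # 1 -- only the change from 2000 to 2500 is forwarded
```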

Creating a stream for filtering messages according to a condition

This sample shows how to create a stream using the FilterProcessor processor to filter out messages in which a specific value does not fulfill a given condition. The stream shall only create an event if the value of roundPerMinute is greater than 1500.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.roundPerMinute > 1500" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

The FilterProcessor of the stream defined above filters the incoming messages according to the criterion defined in the parameter --expression. If the expression evaluates to false, the message is discarded. In this example, the FilterProcessor forwards the message, because the value of roundPerMinute is greater than 1500.

Since the incoming time series data arrives from IoT in JSON array format, the parameter --splitListToItems must be set to true. Otherwise, this parameter can be omitted.

Filtering based on a combination of multiple expressions is possible. The following stream definition is a valid example:

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature < 50 || newProperties.temperature > 80" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

The stream defined above only forwards messages in which the value of temperature is less than 50 or greater than 80.
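
The combined filter expression can be sketched as a plain Python predicate. This is an illustrative sketch, not the actual expression engine; the function name passes_filter is hypothetical.

```python
# Illustrative sketch (not the actual expression engine) of the combined
# filter expression "newProperties.temperature < 50 || newProperties.temperature > 80":
# a message passes only if its temperature is below 50 or above 80.
def passes_filter(message):
    temperature = message["newProperties"]["temperature"]
    return temperature < 50 or temperature > 80

temperatures = [83, 123, 65, 42]
kept = [t for t in temperatures
        if passes_filter({"newProperties": {"temperature": t}})]
print(kept)  # [83, 123, 42] -- the value 65 is discarded
```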

Creating a stream to avoid message overflow

This sample shows how to create a stream using the MessageSampler processor to reduce the number of posted messages if the incoming time series data frequency is undesirably high. Only one event shall be posted within a predefined time period.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | MessageSampler --key=entityId --duration="PT2M" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

The MessageSampler of the stream defined above only forwards the first message within the time period set in the parameter --duration. Any further messages forwarded by the TimeSeriesSource are discarded until the time period has passed. Then the next incoming message is forwarded and the timer is restarted.

Since the incoming time series data arrives from IoT in JSON array format, the parameter --splitListToItems must be set to true. Otherwise, this parameter can be omitted.
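
The sampling behavior can be sketched as a per-key rate limiter. This is an illustrative Python sketch, not the actual processor implementation; the class name and its internals are hypothetical, and the ISO 8601 duration PT2M corresponds to two minutes.

```python
from datetime import datetime, timedelta

# Illustrative sketch of the MessageSampler behavior (not the actual
# processor): forward the first message per key, then discard messages
# until --duration (here PT2M = 2 minutes) has passed; the next message
# after that is forwarded and restarts the timer.
class MessageSamplerSketch:
    def __init__(self, duration):
        self.duration = duration
        self.last_forwarded = {}  # key -> timestamp of last forwarded message

    def process(self, key, timestamp):
        last = self.last_forwarded.get(key)
        if last is None or timestamp - last >= self.duration:
            self.last_forwarded[key] = timestamp
            return True   # forward the message
        return False      # discard: still within the sampling period

sampler = MessageSamplerSketch(timedelta(minutes=2))
t0 = datetime(2018, 5, 21, 11, 27, 0)
arrivals = [t0, t0 + timedelta(seconds=30), t0 + timedelta(minutes=3)]
print([sampler.process("e1", t) for t in arrivals])  # [True, False, True]
```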

Creating a stream for indicating oscillation

This sample shows how to create a stream using the Hysteresis processor to indicate suspiciously wide oscillations of a given value. The stream shall only create an event if the value of the field temperature exceeds 100 degrees after having dropped below 60 degrees since the last event.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | Hysteresis --key=entityId --message-emitting-expression="newProperties.temperature > 100" --suppression-releasing-expression="newProperties.temperature < 60" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

The Hysteresis processor of the stream defined above forwards the first message in which the value of temperature is higher than 100 degrees. It suppresses any following messages fulfilling this condition until the suppression-releasing-expression is fulfilled and the value of temperature drops below 60 degrees. Afterwards, another message is forwarded if the value of temperature exceeds 100 degrees again, as defined in the parameter --message-emitting-expression.

Since the incoming time series data arrives from IoT in JSON array format, the parameter --splitListToItems must be set to true. Otherwise, this parameter can be omitted.

Assume for example that the following temperature values arrive in the given order in separate messages: 108, 101, 51, 75, 120, 100, 57, 107, 103. Hysteresis forwards the first message, because it fulfills the condition. The next forwarded message is the one containing the value 120 degrees, because the value has gone below 60 degrees (to 51) since the last forwarded message and the current value is above 100 degrees. For the same reason, the message containing 107 degrees is forwarded after the value has dropped to 57 degrees. All other messages are filtered.
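
The behavior on this sequence can be replayed with a short sketch. This is an illustrative Python sketch of standard hysteresis logic as described above, not the actual processor implementation; the function name and thresholds are taken from the sample stream.

```python
# Illustrative sketch of the Hysteresis behavior (not the actual processor):
# emit a value when it exceeds the emitting threshold (100), then suppress
# further emissions until a value drops below the releasing threshold (60).
def hysteresis(values, emit_above=100, release_below=60):
    forwarded = []
    suppressed = False
    for value in values:
        if suppressed and value < release_below:
            suppressed = False  # suppression released
        if not suppressed and value > emit_above:
            forwarded.append(value)
            suppressed = True   # suppress until the value drops again
    return forwarded

# Replays the example sequence from the text: 108 is emitted first, 51
# releases the suppression, 120 is emitted, 57 releases it again, and 107
# is emitted; all other values are filtered.
print(hysteresis([108, 101, 51, 75, 120, 100, 57, 107, 103]))  # [108, 120, 107]
```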



Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.