
Data Flow Engine – Samples

For the examples below, the following time series data of entity f7012b9215b448228def9726deffcfc9 in JSON array format is used as the incoming message from TimeSeriesSource if a single time series record is expected:

[
  {
      "writeTime": "2018-05-21T11:27:09Z",
      "entityId": "f7012b9215b448228def9726deffcfc9",
      "newProperties": {
          "roundPerMinute": 2000,
          "temperature": 83
      },
      "_time": "2018-05-21T11:27:03.780Z"
  }
]

Or, if multiple time series records are expected:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {
      "roundPerMinute": 2000,
      "temperature": 83
    },
    "_time": "2018-05-21T11:27:03.780Z"
  },
  {
    "writeTime": "2018-05-21T11:27:19Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {
      "roundPerMinute": 2000,
      "temperature": 123
    },
    "_time": "2018-05-21T11:27:13.158Z"
  }
]

Note

These messages have exactly the same format as the time series data arriving from IoT through TimeSeriesSource.

Creating stream to monitor time series data of an entity

Creating a stream for monitoring time series uploads from the entity with id f7012b9215b448228def9726deffcfc9. The stream should post an alert to the Event Management Service each time time series data is uploaded.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --severity=1 --description="description"

The above stream definition passes the incoming messages without filtering to EventSink, where the events are created.


Creating stream which creates custom events

Creating a stream which sets values from the incoming message in custom fields of the newly created events. The stream should post an alert to the Event Management Service each time time series data is uploaded, and the temperature value from the incoming message should be stored in the event as internalTemperature.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --event-type-name="CustomEventType" --auto-create-type=true --field-mapping-json='{"temperature":"internalTemperature"}' --severity=1 --description="description" --timestamp-path="_time"

According to this definition, a new event type named CustomEventType will be created. This event type has one additional field, internalTemperature, which contains the value of temperature from the incoming message.
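The effect of the field mapping can be illustrated with a small sketch. This is not the EventSink implementation, only an assumed reading of what --field-mapping-json does: each listed source property of the message is copied into the named custom field of the event.

```python
import json

# Illustrative sketch only (not the EventSink implementation): apply a
# --field-mapping-json value to an incoming message to build the custom
# fields of the created event.
def build_custom_fields(message, mapping_json):
    """Copy each mapped property of the message into an event field."""
    mapping = json.loads(mapping_json)
    return {event_field: message["newProperties"][source]
            for source, event_field in mapping.items()}

message = {
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "newProperties": {"roundPerMinute": 2000, "temperature": 83},
}
custom_fields = build_custom_fields(message, '{"temperature": "internalTemperature"}')
# custom_fields -> {"internalTemperature": 83}
```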


Creating stream which detects changing value of a field

Creating a stream using the ChangeDetector processor to detect a change of a value. An event should only be posted if the value of the roundPerMinute field has changed since the last time series upload.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | ChangeDetector --differentiating-key=entityId --triggering-variable=newProperties.roundPerMinute --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

ChangeDetector lets through only those messages where the value of the roundPerMinute field differs from the value in the previous message. The very first message is not posted, as it serves as the initial reference. Because the same variables can appear at different entities, the --differentiating-key property must specify how to distinguish them.

Since the incoming time series data arrives from IoT in JSON array format, the --splitListToItems property must be set to true. Otherwise this property can be omitted.
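A minimal sketch of the change-detection behaviour described above (illustrative only, not the processor's actual code): per differentiating key, a message is forwarded only when the triggering variable differs from the previously seen value, and the first message merely establishes the reference.

```python
# Sketch of the assumed ChangeDetector semantics: track the last seen
# triggering value per differentiating key and forward only on change.
last_value = {}  # differentiating key -> last seen triggering value

def change_detector(message):
    key = message["entityId"]                           # --differentiating-key
    value = message["newProperties"]["roundPerMinute"]  # --triggering-variable
    previous = last_value.get(key)
    last_value[key] = value
    # The first message per key only sets the reference and is not forwarded.
    return previous is not None and value != previous

readings = [2000, 2000, 1800]
forwarded = [change_detector(
    {"entityId": "f7012b9215b448228def9726deffcfc9",
     "newProperties": {"roundPerMinute": rpm}}) for rpm in readings]
# forwarded -> [False, False, True]
```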


Creating stream for filtering messages according to a condition

Creating a stream using the FilterProcessor processor to filter out messages whose values do not fulfill a given condition. An event should only be posted if the value of roundPerMinute is greater than 1500.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.roundPerMinute > 1500" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

FilterProcessor filters the incoming messages according to the criterion defined in the --expression property. If the expression is not true, the message is discarded. In this example, FilterProcessor forwards the message when the value of roundPerMinute is greater than 1500, therefore the example time series data will be posted.

Since the incoming time series data arrives from IoT in JSON array format, the --splitListToItems property must be set to true. Otherwise this property can be omitted.

Multiple expressions can also be combined. The following stream definition is valid as well:

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature < 50 || newProperties.temperature > 80" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

This time, only those messages are forwarded where the value of temperature is less than 50 or greater than 80.
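The filtering semantics can be sketched as a simple predicate over the message, here mirroring the combined expression above (an illustration of the assumed behaviour, not the processor's implementation):

```python
# Sketch of the assumed FilterProcessor semantics: forward a message only
# when the configured expression evaluates to true.
def filter_processor(message, expression):
    return expression(message)

# Mirrors --expression="newProperties.temperature < 50 || newProperties.temperature > 80"
def oscillating(m):
    temperature = m["newProperties"]["temperature"]
    return temperature < 50 or temperature > 80

temps = [83, 65, 40]
forwarded = [filter_processor({"newProperties": {"temperature": t}}, oscillating)
             for t in temps]
# forwarded -> [True, False, True]
```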


Creating stream for avoiding message overflow

Creating a stream using the MessageSampler processor to reduce the number of posted messages when the frequency of the incoming time series data is undesirably high. Only one event should be posted per predefined time period.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | MessageSampler --key=assetId --duration="PT2M" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

Although TimeSeriesSource may forward multiple messages, with MessageSampler only the first message is posted within the time period specified by the --duration property. The additional messages are rejected. When this period expires, the next incoming message is posted and the time counter starts again.

Since the incoming time series data arrives from IoT in JSON array format, the --splitListToItems property must be set to true. Otherwise this property can be omitted.
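The sampling behaviour can be sketched as follows, assuming the semantics described above (first message per window passes, PT2M is an ISO 8601 duration of two minutes); this is an illustration, not the processor's source:

```python
from datetime import datetime, timedelta

# Sketch of the assumed MessageSampler semantics: per key, forward only the
# first message within each duration window; the window restarts with the
# next forwarded message.
window_start = {}  # key -> timestamp of the last forwarded message

def message_sampler(key, now, duration=timedelta(minutes=2)):  # PT2M
    started = window_start.get(key)
    if started is None or now - started >= duration:
        window_start[key] = now
        return True
    return False

t0 = datetime(2018, 5, 21, 11, 27, 9)
arrivals = [t0, t0 + timedelta(seconds=30), t0 + timedelta(minutes=3)]
forwarded = [message_sampler("f7012b9215b448228def9726deffcfc9", t)
             for t in arrivals]
# forwarded -> [True, False, True]
```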


Creating stream for indicating oscillation

Creating a stream using the Hysteresis processor to indicate suspiciously wide oscillation of a value. An alert should only be posted if the value of the temperature field exceeds 100 degrees, on condition that this value has been below 60 since the last posted alert.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | Hysteresis --key=entityId --message-emitting-expression="newProperties.temperature > 100" --suppression-releasing-expression="newProperties.temperature < 60" --splitListToItems=true | EventSink --entity-id-path=entityId --severity=1 --description="description"

In this case, the Hysteresis processor filters out all messages, regardless of the temperature value, as long as the value has not dropped below 60 degrees. The next message is then posted only if it contains a temperature value higher than 100 degrees. As an exception, the very first message is posted if it passes the --message-emitting-expression.

Since the incoming time series data arrives from IoT in JSON array format, the --splitListToItems property must be set to true. Otherwise this property can be omitted.

Let's assume the following temperature values arrive in this order in separate messages: 108, 101, 51, 75, 120, 100, 57, 107, 103. Due to the system behavior, the first message is emitted because it fulfills the emitting condition. The next message to be posted is the one containing 107, because the preceding value was below 60 degrees and the current value is above 100 degrees. All other messages are discarded.
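One way to reproduce exactly the example sequence above is the following sketch, under the assumption that emission is re-armed only by a message satisfying the releasing expression and the first message counts as armed (this is an illustration of the documented example, not the processor's actual source):

```python
# Sketch reproducing the hysteresis example above: a message is emitted
# when it satisfies the emitting expression while emission is "armed";
# only a message satisfying the releasing expression re-arms emission.
armed = {}  # key -> whether the next qualifying message may be emitted

def hysteresis(key, temperature):
    emit = armed.get(key, True) and temperature > 100  # emitting expression
    armed[key] = temperature < 60                      # releasing expression
    return emit

values = [108, 101, 51, 75, 120, 100, 57, 107, 103]
emitted = [t for t in values
           if hysteresis("f7012b9215b448228def9726deffcfc9", t)]
# emitted -> [108, 107], matching the example above
```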



Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.