
Data Flow Engine – Samples

The following examples expect an incoming message in JSON array format, delivered by TimeSeriesSource for the entity f7012b9215b448228def9726deffcfc9.
If a single time series data object is expected, the following message is used:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire1",
    "newProperties": {
      "revolutionPerMinute": 2000,
      "temperature": 83,
      "tireType": "SUMMER"
    },
    "_time": "2018-05-21T11:27:03.780Z"
  }
]

If multiple time series data objects are expected, the following message is used:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire1",
    "newProperties": {
      "revolutionPerMinute": 2000,
      "temperature": 83,
      "tireType": "SUMMER"
    },
    "_time": "2018-05-21T11:27:03.780Z"
  },
  {
    "writeTime": "2018-05-21T11:27:19Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire2",
    "newProperties": {
      "revolutionPerMinute": 2000,
      "temperature": 123,
      "tireType": "WINTER"
    },
    "_time": "2018-05-21T11:27:13.158Z"
  }
]

Note

These messages have exactly the same format in which time series data arrives from IoT through TimeSeriesSource. The entityId refers to the ID of the asset and the propertySetName refers to the name of the aspect. The aspect variables are collected in the newProperties object.

Creating a stream to monitor time series data of an entity

This sample shows how to create a stream for monitoring time series data from an entity with id f7012b9215b448228def9726deffcfc9. The stream shall create an event after each upload of time series data.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --severity=1 --description="description"

The stream defined above forwards the incoming messages without any filtering to the EventSink, where the events are created.

Creating a stream which creates custom events

This sample shows how to define a custom event type inside a stream definition and how to fill the custom fields of automatically generated events with values from incoming messages. The stream shall create a custom event after each upload of time series data. It shall write the tireType value of the incoming message into the tireSeasonType field of the generated event.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --event-type-name="CustomEventType" --auto-create-type=true --field-mapping-json='{"tireSeasonType":"newProperties.tireType"}' --severity=1 --description="description" --timestamp-path="_time"

The stream defined above creates a new event type with name CustomEventType. This event type has an additional field of type String, which is called tireSeasonType. The field is filled with the value of newProperties.tireType from incoming messages.
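The following Python snippet is a conceptual sketch (not the Data Flow Engine implementation) of how such a mapping can be resolved: the dot-separated path from the field-mapping-json value is walked through the nested message and the result is written into the custom field of the generated event.

message = {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire1",
    "newProperties": {"revolutionPerMinute": 2000, "temperature": 83, "tireType": "SUMMER"},
    "_time": "2018-05-21T11:27:03.780Z",
}

field_mapping = {"tireSeasonType": "newProperties.tireType"}

def resolve_path(obj, dotted_path):
    # Walk a dot-separated path such as "newProperties.tireType" through nested dicts.
    for key in dotted_path.split("."):
        obj = obj[key]
    return obj

custom_fields = {field: resolve_path(message, path) for field, path in field_mapping.items()}
print(custom_fields)  # {'tireSeasonType': 'SUMMER'}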

Creating a stream which detects value changes of a field

This sample shows how to create a stream using the ChangeDetector processor for detecting the change of a value. The stream shall only create an event if the value of the field revolutionPerMinute has changed since the last time series upload.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | ChangeDetector --differentiating-key=propertySetName,newProperties.tireType --triggering-variable=newProperties.revolutionPerMinute | EventSink --entity-id-path=entityId --severity=1 --description="description"

The ChangeDetector of the stream defined above only forwards messages in which the value of revolutionPerMinute differs from the value in the previous message. The very first message is not forwarded, as it is used for initialization.
The parameter --differentiating-key is given two inputs, propertySetName and newProperties.tireType. The first input is used to handle data uploaded for different aspects individually. For example, assume the asset is a car with four tires and each tire is registered as an aspect tire1, tire2, etc. with the variable revolutionPerMinute. The ChangeDetector handles the tires individually, because they have different names (propertySetName). Otherwise, the ChangeDetector would compare incoming revolutionPerMinute data regardless of which tire sent it. The second input tells the application to additionally distinguish between different types of the same tire (the value of newProperties.tireType).
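As an illustration, the following Python snippet is a conceptual sketch (not the engine's implementation) of such a keyed change detection: messages are grouped by (propertySetName, newProperties.tireType), and a message is only forwarded when revolutionPerMinute differs from the previous value stored for that key.

last_seen = {}  # key -> last revolutionPerMinute value

def change_detector(message):
    key = (message["propertySetName"], message["newProperties"]["tireType"])
    value = message["newProperties"]["revolutionPerMinute"]
    previous = last_seen.get(key)
    last_seen[key] = value
    if previous is None or previous == value:
        return None          # initialization or unchanged value: drop
    return message           # value changed: forward

for rpm in (2000, 2000, 2100, 2100, 1900):
    msg = {"propertySetName": "tire1",
           "newProperties": {"revolutionPerMinute": rpm, "tireType": "SUMMER"}}
    if change_detector(msg) is not None:
        print("forwarded:", rpm)   # prints 2100 and 1900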

Creating a stream for filtering messages according to a condition

This sample shows how to create a stream using the FilterProcessor for discarding messages in which a specific value does not fulfill a given condition. The stream shall only send an e-mail if the value of revolutionPerMinute is greater than 1500.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.revolutionPerMinute > 1500" | EmailSink --message-category-id=7070 --body='{"message":"The RPM value exceeded the 1500 value"}'

The FilterProcessor of the stream defined above filters the incoming messages according to the criterion defined in the parameter expression. If the expression evaluates to false, the message is discarded. In this example, the FilterProcessor forwards the message, because the value of revolutionPerMinute is greater than 1500.

Filtering based on a combination of multiple expressions is possible. The following stream definition is a valid example:

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature < 50 || newProperties.temperature > 80" | EmailSink --message-category-id=7070 --body='{"message":"The temperature value left the desired interval"}'

The stream defined above only forwards messages where the value of temperature is less than 50 or greater than 80.
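The following Python snippet is a conceptual sketch (not the engine's expression evaluator) of the two filter conditions above, written as plain predicates; a message is forwarded only when the predicate evaluates to true.

def rpm_filter(message):
    # corresponds to --expression="newProperties.revolutionPerMinute > 1500"
    return message["newProperties"]["revolutionPerMinute"] > 1500

def temperature_filter(message):
    # corresponds to --expression="newProperties.temperature < 50 || newProperties.temperature > 80"
    return (message["newProperties"]["temperature"] < 50
            or message["newProperties"]["temperature"] > 80)

sample = {"newProperties": {"revolutionPerMinute": 2000, "temperature": 83}}
print(rpm_filter(sample))          # True -> message is forwarded
print(temperature_filter(sample))  # True -> 83 is greater than 80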

Creating a stream to avoid message overflow

This sample shows how to create a stream using the MessageSampler processor for reducing the number of posted messages if the incoming time series data frequency is undesirably high. Only one event shall be posted within a predefined time period.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | MessageSampler --key=propertySetName --duration="PT2M" | EventSink --entity-id-path=entityId --severity=1 --description="description"

The MessageSampler of the stream defined above only forwards the first message within the time period defined in the parameter --duration. Messages forwarded by the TimeSeriesSource are filtered until the time period has passed. Then the first incoming message is forwarded again and the timer is restarted. The timestamp-path and timestamp-format parameters are not provided in the sample above and thus the default values are used. These can be overridden using the following syntax:

MessageSampler --timestamp-path="$.writeTime" --timestamp-format="yyyy-MM-dd'T'HH:mm:ss.SSSXXX" --key=entityId --duration="PT10S"
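Conceptually, the sampling behaviour can be sketched as follows (a Python illustration, not the engine's implementation): for each key, the first message starts a window of the configured duration, every further message for that key is dropped until the window has passed, and then the next message is forwarded and a new window starts.

from datetime import datetime, timedelta

window_start = {}                # key -> timestamp of the last forwarded message
DURATION = timedelta(minutes=2)  # corresponds to --duration="PT2M"

def message_sampler(message, now):
    # The engine reads the timestamp from the message via timestamp-path;
    # here it is passed in directly for simplicity.
    key = message["propertySetName"]
    started = window_start.get(key)
    if started is not None and now - started < DURATION:
        return None              # still inside the window: drop
    window_start[key] = now      # first message of a new window: forward
    return message

base = datetime(2018, 5, 21, 11, 27, 0)
for seconds in (0, 30, 90, 130):
    msg = {"propertySetName": "tire1"}
    result = message_sampler(msg, base + timedelta(seconds=seconds))
    print(seconds, "forwarded" if result else "dropped")
# 0 forwarded, 30 dropped, 90 dropped, 130 forwarded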

Creating a stream for indicating oscillation

This sample shows how to create a stream using the Hysteresis processor for indicating suspiciously wide oscillations of a given value. The stream shall only create an event if the value of the field temperature exceeds 100 degrees after it has been below 60 degrees since the last event.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | Hysteresis --key=propertySetName --message-emitting-expression="newProperties.temperature > 100" --suppression-releasing-expression="newProperties.temperature < 60" | EventSink --entity-id-path=entityId --severity=1 --description="description"

The Hysteresis processor of the stream defined above forwards the first message where the value of temperature is higher than 100 degrees. It suppresses any following messages fulfilling this condition until the value of temperature drops below 60 degrees, which fulfills the suppression-releasing-expression. Afterwards, another message is forwarded if the value of temperature exceeds 100 degrees again, as defined in the parameter message-emitting-expression. Then the cycle starts again.

Assume for example that the following temperature values arrive in the given order in separate messages: 108, 101, 51, 75, 120, 102, 57, 107, 103. Hysteresis would forward the first message, because it fulfills the condition. The next message forwarded by Hysteresis contains the value 120 degrees, because the message with the value 51 degrees has fulfilled the suppression-releasing-expression earlier. After that, the cycle starts again. Even though the next value (102 degrees) fulfills the message-emitting-expression, the Hysteresis processor is still in a suppressed state, therefore this message is discarded. The message with the value 57 degrees releases the suppression again, so the message containing the value 107 is forwarded as well.
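The following Python snippet is a conceptual sketch (not the engine's implementation) of this hysteresis logic, applied per key to the value sequence above; it forwards exactly the values 108, 120 and 107.

suppressed = {}  # key -> True while suppression is active

def hysteresis(key, temperature):
    if temperature < 60:                     # suppression-releasing-expression
        suppressed[key] = False
        return False
    if temperature > 100 and not suppressed.get(key, False):
        suppressed[key] = True               # message-emitting-expression
        return True                          # forward this message
    return False

values = [108, 101, 51, 75, 120, 102, 57, 107, 103]
forwarded = [t for t in values if hysteresis("tire1", t)]
print(forwarded)  # [108, 120, 107]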

Creating a stream for getting e-mail messages

This sample shows how to get an e-mail message if a condition is fulfilled. The stream shall only send a predefined e-mail message with custom content when the temperature exceeds 60 degrees.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature > 60" | EmailSink --message-category-id=1253 --body='{"substitutablePlaceholderForMessageInEmailTemplate":"Attention! Engine temperature has exceeded the 60 degree limit"}'

EmailSink sends a message using preset properties, e.g. the HTML template, the list of recipients and other e-mail details, which are identified by the message category ID. EmailSink sends a message each time time series data reaches the application, regardless of the frequency of incoming messages; therefore, using an appropriate number of filtering processors in the stream is advised. In the current case the used HTML template contains a substitutable field substitutablePlaceholderForMessageInEmailTemplate, which is replaced by the corresponding value of the body JSON. If the template contains more than one substitutable element, the body JSON should contain an entry for each of them.
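The placeholder substitution can be sketched as follows (a Python illustration with an assumed ${...} placeholder syntax and a hypothetical template, not the actual EmailSink template format): each key of the body JSON replaces the matching placeholder in the HTML template of the configured message category.

# Hypothetical template; the real template is configured with the message category.
html_template = "<p>${substitutablePlaceholderForMessageInEmailTemplate}</p>"
body = {"substitutablePlaceholderForMessageInEmailTemplate":
        "Attention! Engine temperature has exceeded the 60 degree limit"}

rendered = html_template
for placeholder, value in body.items():
    rendered = rendered.replace("${" + placeholder + "}", value)

print(rendered)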


