
Data Flow Engine – Samples

The following examples expect incoming messages from TimeSeriesSource in JSON array format, containing time series data of the entity f7012b9215b448228def9726deffcfc9.
If a single time series data object is expected, the following message is used:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire1",
    "newProperties": {
      "revolutionPerMinute": 2000,
      "temperature": 83,
      "tireType": "SUMMER"
    },
    "_time": "2018-05-21T11:27:03.780Z"
  }
]

If multiple time series data objects are expected, the following message is used:

[
  {
    "writeTime": "2018-05-21T11:27:09Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire1",
    "newProperties": {
      "revolutionPerMinute": 2000,
      "temperature": 83,
      "tireType": "SUMMER"
    },
    "_time": "2018-05-21T11:27:03.780Z"
  },
  {
    "writeTime": "2018-05-21T11:27:19Z",
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire2",
    "newProperties": {
      "revolutionPerMinute": 2000,
      "temperature": 123,
      "tireType": "WINTER"
    },
    "_time": "2018-05-21T11:27:13.158Z"
  }
]

Note

These messages have exactly the same format as time series data arriving from IoT through TimeSeriesSource. The entityId refers to the ID of the asset, and the propertySetName refers to the name of the aspect. The aspect variables are collected in an object under newProperties.
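As an illustration of the format, the following minimal Python sketch parses the multi-object message above with the standard json module and extracts the asset ID, the aspect name, and the aspect variables of each time series object. The function summarize is hypothetical and not part of the Data Flow Engine.

```python
import json

# The multi-object sample message from above, in TimeSeriesSource format.
RAW_MESSAGE = """
[
  {"writeTime": "2018-05-21T11:27:09Z",
   "entityId": "f7012b9215b448228def9726deffcfc9",
   "propertySetName": "tire1",
   "newProperties": {"revolutionPerMinute": 2000, "temperature": 83, "tireType": "SUMMER"},
   "_time": "2018-05-21T11:27:03.780Z"},
  {"writeTime": "2018-05-21T11:27:19Z",
   "entityId": "f7012b9215b448228def9726deffcfc9",
   "propertySetName": "tire2",
   "newProperties": {"revolutionPerMinute": 2000, "temperature": 123, "tireType": "WINTER"},
   "_time": "2018-05-21T11:27:13.158Z"}
]
"""


def summarize(raw: str) -> list[tuple[str, str, dict]]:
    """Return (entityId, propertySetName, newProperties) for each time series object."""
    records = json.loads(raw)  # the message is a JSON array of objects
    return [(r["entityId"], r["propertySetName"], r["newProperties"]) for r in records]


for entity_id, aspect, variables in summarize(RAW_MESSAGE):
    print(entity_id, aspect, variables["temperature"])
```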

Creating a Stream to Monitor Time Series Data of an Entity

This sample shows how to create a stream for monitoring time series data from an entity with id f7012b9215b448228def9726deffcfc9. The stream shall create an event after each upload of time series data.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --severity=1 --description="description"

The stream defined above forwards all incoming messages to EventSink without filtering, and EventSink creates the events.

Creating a Stream which Creates Custom Events

This sample shows how to fill the custom fields of automatically generated events with values from incoming messages. The desired event type called CustomEventType has to be created beforehand. The stream shall create a custom event after each upload of time series data. It shall write the tireType value of the incoming message into the tireSeasonType field of the generated event.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --event-type-name="CustomEventType" --field-mapping-json='{"tireSeasonType":"newProperties.tireType"}' --severity=1 --description="description" --timestamp-path="_time"

CustomEventType has a custom field of type String called tireSeasonType. This field is filled with the value of newProperties.tireType from the incoming messages.
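The following Python sketch illustrates how a field mapping like the one above can be read: each key of --field-mapping-json names an event field, and each value is a dot-separated path into the incoming message. The helpers resolve_path and apply_field_mapping are hypothetical, and the sketch assumes the path is resolved against a single time series object; the actual path handling of EventSink may differ.

```python
import json


def resolve_path(obj: dict, path: str):
    """Follow a dot-separated path such as 'newProperties.tireType' through nested dicts."""
    for key in path.split("."):
        obj = obj[key]
    return obj


def apply_field_mapping(message: dict, field_mapping_json: str) -> dict:
    """Build custom event fields: event field name -> value found at the mapped path."""
    mapping = json.loads(field_mapping_json)
    return {field: resolve_path(message, path) for field, path in mapping.items()}


message = {
    "entityId": "f7012b9215b448228def9726deffcfc9",
    "propertySetName": "tire1",
    "newProperties": {"revolutionPerMinute": 2000, "temperature": 83, "tireType": "SUMMER"},
    "_time": "2018-05-21T11:27:03.780Z",
}
print(apply_field_mapping(message, '{"tireSeasonType":"newProperties.tireType"}'))
# {'tireSeasonType': 'SUMMER'}
```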

Creating a Stream which Detects Value Changes of a Field

This sample shows how to create a stream that uses the ChangeDetector processor to detect changes of a value. The stream shall only create an event if the value of the field revolutionPerMinute has changed since the last time series upload.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | ChangeDetector --differentiating-key=propertySetName,newProperties.tireType --triggering-variable=newProperties.revolutionPerMinute | EventSink --entity-id-path=entityId --severity=1 --description="description"

The ChangeDetector of the stream defined above only forwards messages in which the value of revolutionPerMinute differs from the value in the previous message. The very first message is not forwarded, because it is used for initialization.
The parameter --differentiating-key is given two inputs: propertySetName and newProperties.tireType. The first input makes the ChangeDetector handle data from different aspects individually. For example, assume the asset is a car with four tires and each tire is registered as an aspect (tire1, tire2, etc.) with the variable revolutionPerMinute. The ChangeDetector handles the tires individually, because they have different names (propertySetName). Otherwise, the ChangeDetector would compare incoming revolutionPerMinute data regardless of which tire sent it. The second input makes the ChangeDetector distinguish between different tire types (tireType, for example SUMMER and WINTER) of the same tire.
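To illustrate the role of --differentiating-key, the following Python sketch keeps one last-seen revolutionPerMinute value per combination of propertySetName and tireType. It is a hypothetical model, not the ChangeDetector implementation; the class name, the dot-path helper, and the assumption that each time series object is processed as its own message are illustrative only.

```python
def resolve_path(obj: dict, path: str):
    """Follow a dot-separated path such as 'newProperties.tireType' through nested dicts."""
    for key in path.split("."):
        obj = obj[key]
    return obj


class ChangeDetectorSketch:
    """Hypothetical model: forward a message only if the triggering variable changed
    compared with the previous message that has the same differentiating key."""

    def __init__(self, differentiating_key: list[str], triggering_variable: str):
        self.differentiating_key = differentiating_key
        self.triggering_variable = triggering_variable
        self.last_values: dict[tuple, object] = {}

    def process(self, message: dict) -> bool:
        # One independent state per combination of key values, e.g. ("tire1", "SUMMER").
        key = tuple(resolve_path(message, path) for path in self.differentiating_key)
        value = resolve_path(message, self.triggering_variable)
        is_first = key not in self.last_values  # the first message only initializes
        changed = not is_first and self.last_values[key] != value
        self.last_values[key] = value
        return changed


def tire_message(aspect: str, tire_type: str, rpm: int) -> dict:
    return {"propertySetName": aspect,
            "newProperties": {"revolutionPerMinute": rpm, "tireType": tire_type}}


detector = ChangeDetectorSketch(
    ["propertySetName", "newProperties.tireType"], "newProperties.revolutionPerMinute")
incoming = [
    tire_message("tire1", "SUMMER", 2000),  # initializes (tire1, SUMMER)
    tire_message("tire2", "WINTER", 2000),  # initializes (tire2, WINTER)
    tire_message("tire1", "SUMMER", 2000),  # unchanged
    tire_message("tire1", "SUMMER", 2100),  # changed, forwarded
    tire_message("tire2", "WINTER", 2000),  # unchanged for tire2
]
forwarded = [detector.process(m) for m in incoming]
```

Because tire2 has its own state, its 2000 rpm is not compared with the 2100 rpm of tire1.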

Creating a Stream for Filtering Messages according to a Condition

This sample shows how to create a stream that uses the FilterProcessor to discard messages in which a specific value does not fulfill a given condition. The stream shall only send an e-mail if the value of revolutionPerMinute is greater than 1500.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.revolutionPerMinute > 1500" | EmailSink --message-category-id=7070 --body='{"message":"The RPM value exceeded the 1500 value"}'

The FilterProcessor of the stream defined above filters the incoming messages according to the condition defined in the parameter --expression. If the expression evaluates to false, the message is discarded. For the sample message above, the FilterProcessor forwards the message, because the value of revolutionPerMinute (2000) is greater than 1500.

Filtering based on a combination of multiple expressions is possible. The following stream definition is a valid example:

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature < 50 || newProperties.temperature > 80" | EmailSink --message-category-id=7070 --body='{"message":"The temperature value left the desired interval"}'

The stream defined above only forwards messages in which the value of temperature is less than 50 or greater than 80.
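The following Python sketch mirrors the two filter expressions above as hand-written predicates, to show which messages each stream would forward. The function names are hypothetical; FilterProcessor evaluates its own expression syntax, not Python, and the sketch assumes each time series object is checked as an individual message.

```python
from typing import Callable, Iterable


# Hand-written Python equivalents of the two --expression values above (hypothetical).
def rpm_above_1500(message: dict) -> bool:
    return message["newProperties"]["revolutionPerMinute"] > 1500


def temperature_outside_50_to_80(message: dict) -> bool:
    temperature = message["newProperties"]["temperature"]
    return temperature < 50 or temperature > 80  # same logic as the "||" expression


def filter_messages(messages: Iterable[dict], predicate: Callable[[dict], bool]) -> list[dict]:
    """Forward messages for which the predicate is true and discard the others."""
    return [m for m in messages if predicate(m)]


samples = [
    {"propertySetName": "tire1", "newProperties": {"revolutionPerMinute": 2000, "temperature": 83}},
    {"propertySetName": "tire2", "newProperties": {"revolutionPerMinute": 1200, "temperature": 45}},
    {"propertySetName": "tire3", "newProperties": {"revolutionPerMinute": 1600, "temperature": 70}},
]
```

With these samples, the first filter keeps tire1 and tire3, and the second keeps tire1 (83) and tire2 (45).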

Creating a Stream to Avoid Message Overflow

This sample shows how to create a stream that uses the MessageSampler processor to reduce the number of posted messages if time series data arrives at an undesirably high frequency. Only one event shall be posted within a predefined time period.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | MessageSampler --key=propertySetName --duration="PT2M" | EventSink --entity-id-path=entityId --severity=1 --description="description"

The MessageSampler of the stream defined above only forwards the first message within the time period defined in the parameter --duration (an ISO 8601 duration; PT2M means two minutes). Subsequent messages from TimeSeriesSource are discarded until the time period has passed. Then the next incoming message is forwarded and the timer is restarted. The parameters timestamp-path and timestamp-format are not provided in the sample above, so their default values are used. They can be overridden using the following syntax:

MessageSampler --timestamp-path="$.writeTime" --timestamp-format="yyyy-MM-dd'T'HH:mm:ss.SSSXXX" --key=entityId --duration="PT10S"
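The sampling behavior described above can be sketched in Python as follows. This is a hypothetical model under stated assumptions: one timer per value of --key, timestamps read from _time (the actual default of timestamp-path is not documented here), and a new message is forwarded once the elapsed time is at least the configured duration.

```python
from datetime import datetime, timedelta


def parse_time(value: str) -> datetime:
    # datetime.fromisoformat() in older Python versions rejects a trailing "Z".
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


class MessageSamplerSketch:
    """Hypothetical model: per key value, forward the first message and discard
    further messages until `duration` has elapsed since the forwarded one."""

    def __init__(self, key: str, duration: timedelta, timestamp_path: str = "_time"):
        self.key = key
        self.duration = duration
        self.timestamp_path = timestamp_path  # assumed default, for illustration only
        self.window_start: dict[str, datetime] = {}

    def process(self, message: dict) -> bool:
        key_value = message[self.key]
        timestamp = parse_time(message[self.timestamp_path])
        start = self.window_start.get(key_value)
        if start is None or timestamp - start >= self.duration:
            self.window_start[key_value] = timestamp  # forward and restart the timer
            return True
        return False


sampler = MessageSamplerSketch(key="propertySetName", duration=timedelta(minutes=2))  # PT2M
incoming = [
    {"propertySetName": "tire1", "_time": "2018-05-21T11:27:00.000Z"},  # forwarded
    {"propertySetName": "tire1", "_time": "2018-05-21T11:28:00.000Z"},  # discarded
    {"propertySetName": "tire2", "_time": "2018-05-21T11:28:30.000Z"},  # own timer
    {"propertySetName": "tire1", "_time": "2018-05-21T11:29:00.000Z"},  # 2 min passed
    {"propertySetName": "tire1", "_time": "2018-05-21T11:30:00.000Z"},  # discarded
]
forwarded = [sampler.process(m) for m in incoming]
```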

Creating a Stream for Indicating Oscillation

This sample shows how to create a stream that uses the Hysteresis processor to indicate suspiciously wide oscillations of a given value. The stream shall only create an event if the value of the field temperature exceeds 100 degrees after it has dropped below 60 degrees since the last event.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | Hysteresis --key=propertySetName --message-emitting-expression="newProperties.temperature > 100" --suppression-releasing-expression="newProperties.temperature < 60" | EventSink --entity-id-path=entityId --severity=1 --description="description"

The Hysteresis processor of the stream defined above forwards the first message in which the value of temperature is higher than 100 degrees. It suppresses all following messages fulfilling this condition until the value of temperature drops below 60 degrees, which fulfills the suppression-releasing-expression. Afterwards, the next message in which temperature exceeds 100 degrees, as defined in the parameter message-emitting-expression, is forwarded. Then the cycle starts again.

For example, assume that the following temperature values arrive in this order in separate messages: 108, 101, 51, 75, 120, 102, 57, 107, 103. Hysteresis forwards the first message (108 degrees), because it fulfills the message-emitting-expression, and suppresses the message with 101 degrees. The next message forwarded by Hysteresis contains the value 120 degrees, because the message with 51 degrees has fulfilled the suppression-releasing-expression earlier. After that, the cycle starts again: even though the next value (102 degrees) fulfills the message-emitting-expression, the Hysteresis processor is in a suppressed state, so this message is discarded. Applying the same logic, the message containing 107 degrees is forwarded after the message with 57 degrees has released the suppression, and the message with 103 degrees is discarded.
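The following Python sketch reproduces the walk-through above with a simple two-state model (ready to forward, or suppressed). It is a hypothetical illustration for a single key value, not the Hysteresis implementation; the two expressions are written as Python lambdas.

```python
from typing import Callable


class HysteresisSketch:
    """Hypothetical two-state model of Hysteresis for a single key value."""

    def __init__(self, emitting: Callable[[float], bool], releasing: Callable[[float], bool]):
        self.emitting = emitting    # message-emitting-expression
        self.releasing = releasing  # suppression-releasing-expression
        self.suppressed = False

    def process(self, temperature: float) -> bool:
        if self.suppressed:
            # While suppressed, never forward; only watch for the releasing condition.
            if self.releasing(temperature):
                self.suppressed = False
            return False
        if self.emitting(temperature):
            self.suppressed = True  # forward this message, then suppress until released
            return True
        return False


hysteresis = HysteresisSketch(lambda t: t > 100, lambda t: t < 60)
values = [108, 101, 51, 75, 120, 102, 57, 107, 103]
forwarded = [v for v in values if hysteresis.process(v)]
# forwarded == [108, 120, 107]
```

The forwarded values 108, 120, and 107 match the sequence described above.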

Creating a Stream for Receiving E-Mail Messages

This sample shows how to receive an e-mail message when a condition is fulfilled. The stream shall only send a predefined e-mail message with custom content when the temperature exceeds 60 degrees.

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature > 60" | EmailSink --message-category-id=1253 --body='{"placeholderForMessageInEmailTemplate":"Attention! Engine temperature has exceeded the 60 degree limit"}'

EmailSink sends an e-mail using the preset properties summarized in the message category (categoryId, set with --message-category-id), such as the HTML template, the list of recipients, and other e-mail details. EmailSink sends an e-mail for every message it receives, regardless of the frequency of incoming messages, so it is advisable to use suitable processors in the stream to filter messages. In this example, the HTML template contains a placeholder field placeholderForMessageInEmailTemplate, which is replaced by the value given for it in the JSON object of the body parameter. If the template contains more than one placeholder, the JSON object must contain a corresponding entry for each placeholder.
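The placeholder substitution described above can be illustrated with a small Python sketch. The placeholder syntax {{name}} is an assumption made only for this example, since the actual template syntax of the message category is not described here; render_template is a hypothetical helper.

```python
import json
import re

PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")  # assumed placeholder syntax: {{name}}


def render_template(template: str, body_json: str) -> str:
    """Replace each {{name}} placeholder with the value of `name` from the body JSON."""
    values = json.loads(body_json)

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in values:
            # Every placeholder in the template needs an entry in the body JSON.
            raise KeyError(f"body has no value for placeholder {name!r}")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


template = "<p>{{placeholderForMessageInEmailTemplate}}</p>"
body = ('{"placeholderForMessageInEmailTemplate":'
        '"Attention! Engine temperature has exceeded the 60 degree limit"}')
html = render_template(template, body)
```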

Creating a Stream for Recognizing Patterns among Data from Different Assets

This sample shows how to create streams using the Sequence processor. The main difference between this scenario and the previous ones is that the Sequence processor works with messages coming from different streams: Multiple parallel streams publish their outputs to the same SNS topic, which forwards the data to the Sequence processor.

In this example there are three streams, each processing time series data from a different asset with different processors. Each stream contains a Marker, which adds a header with the key color (header-key=color) and a unique value (orange, blue, or green). The SNS topic is called destinationTopic.

  • Stream A:
    TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.revolutionPerMinute > 1500" | Marker --header-key=color --header-value=orange > :destinationTopic
  • Stream B:
    TimeSeriesSource --entities=[{"id":"7twr2b63248948336def9726demruzdw"}] | ChangeDetector --differentiating-key=propertySetName,newProperties.tireType --triggering-variable=newProperties.revolutionPerMinute | Marker --header-key=color --header-value=blue > :destinationTopic
  • Stream C:
    TimeSeriesSource --entities=[{"id":"g9012b9792b448336def9726deffcxf2"}] | FilterProcessor --expression="newProperties.temperature > 30" | Marker --header-key=color --header-value=green > :destinationTopic

The destinationTopic acts as the sink for streams A, B and C and as the source for the streams defined below, which are triggered whenever a message is published to the destinationTopic. Each of these streams contains a Sequence processor and an EventSink. If the Sequence processor recognizes the defined pattern, it sends a sequence message to the EventSink, which then creates an event.

In each of the given examples, the first sequence message would look like this:

Sample sequence message
[{
    "filterId": "{streamName}",
    "_timestamp": "2018-10-07T11:06:40.000Z",
    "messages": [
        {
            "id": "d654bd32-6a42-72f4-911b-dfdec811ee14",
            "headers": {
                "color": "orange",
                ...
            },
            "payload": {
                "entityId": "f7012b9215b448228def9726deffcfc9",
                ...
                "_time": "2018-10-07T11:06:10.000Z"
            }
        },
        {
            "id": "fa79e9de-105e-d014-0718-f7cc77a9d96c",
            "headers": {
                "color": "blue",
                ...
            },
            "payload": {
                "entityId": "7twr2b63248948336def9726demruzdw",
                ...
                "_time": "2018-10-07T11:06:20.000Z"
            }
        },
        {
            "id": "7466c8b0-3eef-34d2-2476-059b41bb4f9b",
            "headers": {
                "color": "green",
                ...
            },
            "payload": {
                "entityId": "g9012b9792b448336def9726deffcxf2",
                ...
                "_time": "2018-10-07T11:06:40.000Z"
            }
        }
    ]
}]

Pattern#1

The Sequence processor in the following stream definition outputs a message if at least one message from each of the three streams arrives within 90 seconds. The messages labeled orange, blue, and green may arrive in any order:

:destinationTopic > Sequence --labels=[\"orange\",\"blue\",\"green\"] --conditions=[\"orange\",\"blue\",\"green\"] --within=\"PT90S\" --label-path=\"headers['color']\" | EventSink --entity-id-path=\"messages[0].payload.entityId\"

The entity-id-path defined for the EventSink points to the entityId in the payload of the first message contained in the sequence message. The following diagram shows an example of the incoming messages, the candidates (Sequence States) tracked by the Sequence processor, and the resulting sequence messages.

Diagram: Pattern#1
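The following Python sketch models Pattern#1 as a sliding time window over labeled messages. It is a hypothetical illustration, not the algorithm of the Sequence processor: arrival times are plain seconds, the label stands for the value extracted via --label-path, and the choice that a completed match consumes the buffered messages is an assumption.

```python
Event = tuple[int, str]  # (arrival time in seconds, label from headers['color'])


def match_all_within(events: list[Event], labels: list[str], within: int) -> list[list[Event]]:
    """Emit a match whenever every label has occurred within the last `within` seconds."""
    window: list[Event] = []
    matches: list[list[Event]] = []
    for t, label in events:
        # Keep only messages still inside the time window ending at t.
        window = [(wt, wl) for wt, wl in window if t - wt <= within]
        window.append((t, label))
        first_per_label: dict[str, Event] = {}
        for wt, wl in window:
            first_per_label.setdefault(wl, (wt, wl))
        if all(name in first_per_label for name in labels):
            matches.append(sorted(first_per_label[name] for name in labels))
            window = []  # assumption: a completed match consumes its messages
    return matches
```

For example, orange at 0 s, blue at 50 s, and green at 100 s do not match with within=90, because orange has left the window when green arrives; a further orange at 120 s completes a match together with blue and green.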

Pattern#2

The Sequence processor in the following stream definition outputs a message if a message labeled orange arrives first and afterwards at least one message from each of the other two streams arrives, all within 90 seconds. The messages labeled blue and green may arrive in any order:

:destinationTopic > Sequence --labels=[\"orange\",\"blue\",\"green\"] --initial-condition=\"orange\" --conditions=[\"green\",\"blue\"] --within=\"PT90S\" --label-path=\"headers['color']\" | EventSink --entity-id-path=\"messages[0].payload.entityId\"

The entity-id-path defined for the EventSink points to the entityId in the payload of the first message contained in the sequence message. The following diagram shows an example of the incoming messages, the candidates (Sequence States) tracked by the Sequence processor, and the resulting sequence messages.

Diagram: Pattern#2
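Pattern#2 can be sketched in a similar style: every orange message opens a candidate, and a candidate completes once blue and green have both arrived within 90 seconds of it. This Python sketch is hypothetical; how the real Sequence processor resolves several overlapping candidates is not described here, so the sketch simply takes the oldest completed candidate and resets all open candidates.

```python
Event = tuple[int, str]  # (arrival time in seconds, label)


def match_initial_then_any_order(events: list[Event], initial: str, others: list[str],
                                 within: int) -> list[list[Event]]:
    """Emit a match when `initial` is followed by all `others`, in any order,
    within `within` seconds of the initial message."""
    candidates: list[list[Event]] = []  # each candidate starts with an `initial` message
    matches: list[list[Event]] = []
    required = {initial, *others}
    for t, label in events:
        # Drop candidates whose initial message is too old to complete in time.
        candidates = [c for c in candidates if t - c[0][0] <= within]
        if label == initial:
            candidates.append([(t, label)])
            continue
        if label not in others:
            continue  # unrelated labels are ignored
        for candidate in candidates:
            if label not in {seen for _, seen in candidate}:
                candidate.append((t, label))
        complete = [c for c in candidates if {seen for _, seen in c} == required]
        if complete:
            matches.append(complete[0])  # assumption: the oldest candidate wins
            candidates = []              # assumption: all open candidates are reset
    return matches
```

A blue message that arrives before any orange message is ignored, because no candidate is open yet.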

Pattern#3

The Sequence processor in the following stream definition outputs a message if messages labeled orange, blue, and green arrive in this order. The second message must arrive within 20 seconds after the first, and the third message within 30 seconds after the second:

:destinationTopic > Sequence --labels=[\"orange\",\"blue\",\"green\"] --initial-conditions={\"first\":\"orange\",\"second\":\"blue\",\"within\":\"PT20S\"},{\"first\":\"blue\",\"second\":\"green\",\"within\":\"PT30S\"} --label-path=\"headers['color']\" | EventSink --entity-id-path=\"messages[0].payload.entityId\"

The entity-id-path defined for the EventSink points to the entityId in the payload of the first message contained in the sequence message. The following diagram shows an example of the incoming messages, the candidates (Sequence States) tracked by the Sequence processor, and the resulting sequence messages.

Diagram: Pattern#3
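Pattern#3 can be sketched as a chain of steps, each with its own time limit, mirroring the first/second/within entries of --initial-conditions. This Python sketch is a hypothetical model: it assumes the steps form a chain (the second label of one step is the first label of the next) and measures each time limit from the previously matched message.

```python
Event = tuple[int, str]      # (arrival time in seconds, label)
Step = tuple[str, str, int]  # (first, second, within in seconds)


def match_ordered_steps(events: list[Event], steps: list[Step]) -> list[list[Event]]:
    """Emit a match when the labels of a chain of steps arrive in order, each step's
    `second` arriving within that step's `within` seconds after its `first`."""
    partials: list[list[Event]] = []
    matches: list[list[Event]] = []
    for t, label in events:
        still_open: list[list[Event]] = []
        for partial in partials:
            _, second, within = steps[len(partial) - 1]  # the step this partial waits on
            if t - partial[-1][0] > within:
                continue  # too late: this partial can no longer complete
            if label == second:
                partial = partial + [(t, label)]
                if len(partial) == len(steps) + 1:
                    matches.append(partial)
                    continue
            still_open.append(partial)
        if label == steps[0][0]:
            still_open.append([(t, label)])  # each `first` of step 1 opens a new partial
        partials = still_open
    return matches


steps = [("orange", "blue", 20), ("blue", "green", 30)]
```

With these steps, orange at 0 s, blue at 15 s, and green at 40 s form a match, whereas a blue message at 25 s would be too late for an orange message at 0 s.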



Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.