Skip to content

Data Flow Engine - Processors

ChangeDetector

The ChangeDetector filters out messages, where a specified variable is the same as in the previous message. If multiple devices send messages with the specified variable, another field of the message can be defined, by which the devices are distinguished. The second field is defined in the differentiating-key property of the ChangeDetector.

FilterProcessor

The FilterProcessor filters out messages, if a specified expression is not fulfilled. It can be used to filter messages with custom expressions, defined in Spring Expression Language.

MessageSampler

The MessageSampler limits the maximum number of messages coming from one device in a given time period. This prevents overload of the downstream flow. After receiving the first message from one device, the MessageSampler suppresses any further messages from this device until a given time period has passed.

Note

The MessageSampler works based on the timestamp sent with the time series data. If the timestamp is missing or cannot be parsed, the message is suppressed from the stream regardless of the payload.

Example

A MessageSampler is set up to allow messages every 2 minutes. The following time series data arrives at the MessageSampler:

1. First message passes MessageSampler:

1
2
3
4
  {
      {someKey} : {someValue},
      "_timestamp" : "2018.06.07T11:06:00.000Z"
  }

2. Second message is suppressed:

1
2
3
4
  {
      {some key} : {some value},
      "_timestamp" : "2018.06.07T11:07:30.000Z"
  }

In this example, 1 minute and 30 seconds have passed according to the timestamps of the messages. Thus, the second message does not flow into the downstream. However, after the 2 minutes have passed, the next message will be forwarded and restart the cycle.

Working principle of the MessageSampler

Hysteresis

The Hysteresis processor introduces a little slack space after an event has occurred to prevent message spamming because of oscillations around the threshold.

For example, someone wants to know if an engine's RPM goes above a threshold, but they don't want to be spammed with messages in case of oscillations around the threshold. They use Hysteresis to suppress messages until a releasing condition is met, e.g. the RPM drops below another threshold value.

Here is an example hysteresis definition:

1
2
message emitting expression: "MAINMOTOR.rpm > 100"
suppression releasing expression: "MAINMOTOR.rpm < 80"

The Hysteresis processor emits an output message, when an input message fulfills the message emitting expression. After emitting this message, it suppresses any further messages and waits until a message fulfills the suppression releasing expression. This sets the Hysteresis processor back into its initial state. An output message is emitted, when an input message fulfills the message emitting expression and the application goes into suppression state again.

Working principle of Hysteresis

If the suppression releasing expression is not defined, the application forwards all incoming messages which fulfill the message emitting expression. The message-emitting-expression and suppression-releasing-expression properties can be defined as Spring Expression Language expressions.

Marker

The Marker adds a label to incoming messages before forwarding them. It labels the data by adding a header with defined key and value to the message.

Sequence

The Sequence processor filters out messages coming from parallel streams, if they don't match a specified pattern. The Sequence processor distinguishes messages only by their time stamps and the stream they arrive from.

Three types of patterns are available for the Sequence processor:

  • Pattern #1: A message from each stream arrives within a defined time window. The order of the messages is arbitrary.
  • Pattern #2: The first message arrives from a specific stream. The following messages arrive from the other streams within a defined time window in arbitrary order.
  • Pattern #3: The messages arrive in a defined sequence. A maximum time window is defined for each two consecutive messages.

Schematic diagram of message order patterns

The Sequence processor provides two options for distinguishing messages from different streams:

  • Labels: If each stream contains a Marker, the Sequence processor can filter by the unique header-value in their message headers.
  • Payload: If the messages contain a filter-id field, the Sequence processor can filter by its value.

The Sequence processor orders arriving messages based on their timestamps. The Sequence processor stores a list of sequence candidates. For each arriving message, which is a valid start of a pattern, it creates a new entry. If a later message is a valid extension of the sequence, the sequence candidate is updated. When a sequence candidate is updated and as a result fulfills the defined pattern, the matching messages are output in a sequence message and the sequence is removed from the list of candidates.

Parallel Streams and the Collector Stream

The Sequence processor requires input data from different streams, which run in parallel and ingest and process data of different assets. Multiple parallel streams end in the same destination, which makes them different from traditional streams. Instead of a sink component, these streams are collected by an SNS topic: Schematic diagram of parallel input streams The SNS topic acts as the source for the following stream, which contains a Sequence processor and ends with a sink: Schematic diagram of a collector stream Find examples for these stream definitions in the Samples.

Any questions left?

Ask the community


Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.