
Data Flow Engine

Idea

The Data Flow Engine (DFE) enables customers to create workflows based on platform events and ingested data. These workflows can be used, for example, to automate KPI calculations in third-party apps.

Access

To access this service, you need the respective roles listed in Data Flow Engine roles and scopes.

Basics

Workflows defined by DFE can be used to filter and process ingested data, react to events, calculate KPIs, aggregate data, modify the data model, write data into the platform data lake, send notifications or create platform events. These workflows are called streams. They consist of a chain of predefined apps. These apps are autonomous, deployed processing units that can be combined and configured to perform real-time computation by consuming and processing an unbounded amount of data. The set of available apps is continually extended and can be supplemented by custom apps in the future. Once a stream is deployed, it automatically starts to ingest the configured data flow of the tenant that deployed it.
The DFE is implemented based on the Spring Cloud Data Flow architecture. Refer to the Spring Cloud Data Flow documentation to understand the idea behind the engine. Similar concepts can be found under different terminologies, such as triggers (sources), rules (processors) and actions (sinks).

DFE streams

The DFE uses message based streams with configurable apps. It differentiates between three types of apps:

  • Source: Input data in order to trigger a flow.

  • Processor: Filter or manipulate the input data.

  • Sink: Execute an action based on the flow's result.

A working stream must contain one source app and one sink app. It can also contain one or more processors between them. MindSphere can store any number of stream definitions, but the number of concurrently deployed streams is restricted according to the purchased plan.
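
Since the DFE is built on Spring Cloud Data Flow, a stream definition can be sketched in its pipe-based DSL, where apps are chained with the | character. The line below is only an illustration using the app names from this document; the exact registered names and parameters are described in the sections that follow:

    TimeSeriesSource | FilterProcessor | EventSink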

Using these components, the model of a DFE stream can be outlined as follows:

Schematic diagram of a DFE stream

The apps listed below are already available; DFE can be enhanced based on general usage or custom needs.

Sources

Source apps are used to listen to different source entities or events. This type of application can only stand at the beginning of a stream. A source may also provide an opportunity for filtering to decrease the number of messages flowing through the stream.

TimeSeriesSource

The TimeSeriesSource monitors time series uploads and triggers a stream. This is the entry point for time series data into a DFE stream.

TimerSource

TimerSource sends timestamp messages based on a fixed delay, date or cron expression.
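
For example, a TimerSource that fires at the top of every hour could use a Spring-style cron expression with six fields (second, minute, hour, day of month, month, day of week); the exact parameter name is given in the TimerSource parameters list:

    Cron expression: "0 0 * * * *"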

Processors

Processor apps can be placed inside a DFE stream. Processors can interrupt the stream, but they leave the input and output data (JSON) untouched.

ChangeDetector

This app filters out any message that has the same value in the specified triggering variable field as the previous message. The differentiating key can fine-tune the granularity of the filtering by arranging messages with the same differentiating key into a filter group. That way, for example, a single stream can perform separate filtering for every asset.
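
The semantics can be modeled with a minimal sketch (Python, illustrative only; not the actual implementation): messages are grouped by the differentiating key, and a message passes only if its triggering variable differs from the last value seen in its group.

    # Minimal sketch of ChangeDetector semantics (illustrative, not the real app).
    last_seen = {}  # differentiating key -> last value of the triggering variable

    def passes(message, trigger_field, diff_key_field):
        key = message[diff_key_field]    # e.g. the asset ID
        value = message[trigger_field]   # the monitored triggering variable
        if last_seen.get(key) == value:
            return False                 # same value as last time: filtered out
        last_seen[key] = value
        return True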

FilterProcessor

This processor interrupts the stream if the configured expression is not fulfilled. This app can be used to filter messages in a DFE stream with custom expressions defined in Spring Expression Language.
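
For example, a filter that only lets through messages reporting an engine temperature above 80 could be sketched like this (the variable path depends on the structure of the incoming message):

    Filter expression: "ENGINE.temperature > 80"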

MessageSampler

The MessageSampler provides the "limit requests in time" functionality. The purpose of this element is to prevent the overload of the downstream flow by suppressing messages from the same device for a given time period.

It is important to note that the MessageSampler works based on server time and ignores the timestamp sent with the time series data. Suppose a MessageSampler is set up to allow one message through every 2 minutes and receives the following time series data:

1. Message arrives at 11:27:00

  {
      {someKey} : {someValue},
      "_timestamp" : "2018-06-07T11:06:00.000Z"
  }

2. Another message arrives at 11:28:00 (one minute after the first one)

  {
      {someKey} : {someValue},
      "_timestamp" : "2018-06-07T11:21:00.000Z"
  }

According to the timestamps, 15 minutes have passed, but the MessageSampler ignores these values: the second message is not sent through, because according to server time the messages arrived only 1 minute apart, not 15. This is a valid scenario because of the asynchronous nature of the system.
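
This server-time behavior can be modeled with a minimal Python sketch (illustrative only; not the actual implementation):

    import time

    # Minimal sketch of MessageSampler semantics (illustrative, not the real app).
    last_emit = {}  # device key -> server time of the last forwarded message

    def sample(message, device_key, period_seconds=120):
        now = time.monotonic()   # server time; the "_timestamp" field is ignored
        key = message[device_key]
        if key in last_emit and now - last_emit[key] < period_seconds:
            return False         # suppressed: the previous message was too recent
        last_emit[key] = now
        return True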

Working principle of the MessageSampler

Hysteresis

Hysteresis can be used to react to events with a little slack space. For example, someone wants to know if an engine's RPM goes above a predefined threshold, but they don't want a lot of messages because of oscillations around the threshold. They can choose to suppress messages until some condition is met, like the RPM drops below another threshold value.

Here is an example hysteresis definition:

Message emitting expression: "MAINMOTOR.rpm > 100"
Suppression releasing expression: "MAINMOTOR.rpm < 80"

This way, Hysteresis emits only one message when the value goes above 100 and then suppresses any further messages until the value drops below 80, instead of emitting a message each time the motor oscillates between 99 and 101.

If the suppression expression is not defined, the app emits a message for each event which fulfills the message emitting expression.
The --message-emitting-expression and --suppression-releasing-expression parameters can be defined as Spring Expression Language expressions.
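
The suppression logic can be modeled as a small state machine; the Python sketch below is illustrative only and uses the RPM thresholds from the example above:

    # Minimal sketch of Hysteresis semantics (illustrative, not the real app).
    suppressed = False

    def should_emit(rpm):
        global suppressed
        if suppressed:
            if rpm < 80:          # suppression releasing expression
                suppressed = False
            return False          # nothing is emitted while suppressed
        if rpm > 100:             # message emitting expression
            suppressed = True     # emit once, then suppress further messages
            return True
        return False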

Working principle of Hysteresis

Sinks

Sinks are the ending apps of a DFE stream. They only have inputs and perform the last step of a stream. This can be a notification or a change inside the platform.

EventSink

The EventSink receives a message, converts it to an event based on the configured parameters and forwards it to the Event Management Service.
EventSink supports field mapping: a JSON can be defined to map attributes from the incoming message to the event to be created. The parameters description, source, code, severity and acknowledged can also be defined as expressions, like custom fields in the field-mapping-json, or as static values. For more details, check the field-mapping-json parameter in the EventSink parameters list.
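
As an illustration only (the exact format and supported keys are defined in the EventSink parameters list), a field-mapping-json could map an incoming temperature value into the created event like this:

    {
        "description" : "ENGINE.temperature",
        "severity" : 20
    }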

The following picture shows the decision process to determine whether the system should use an existing event type or create a new one: Decision process for event creation

Data Flow Server

The Data Flow Server (DFS) acts as a management service for the Data Flow Engine. It provides functionality for listing and modifying streams and their components. To deploy a new DFE stream, a user has to call the API endpoints on the Data Flow Server.
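
A request sketch in Python: the endpoint paths below follow the Spring Cloud Data Flow REST conventions and are assumptions here, as are the base URL and token placeholders.

    import requests

    # Hypothetical base URL and token; consult the Data Flow Server API
    # reference for the actual endpoints.
    BASE_URL = "https://gateway.example.com/api/dataflowserver/v3"
    HEADERS = {"Authorization": "Bearer <token>"}

    # Create a stream definition (stream names are limited to 15 characters).
    requests.post(
        BASE_URL + "/streams/definitions",
        params={"name": "overheat",
                "definition": "TimeSeriesSource | FilterProcessor | EventSink"},
        headers=HEADERS,
    )

    # Deploy the stream so it starts ingesting the tenant's data.
    requests.post(BASE_URL + "/streams/deployments/overheat", headers=HEADERS)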

Features

Use the Data Flow Engine API to realize the following tasks:

  • Create or delete streams
  • Get the list of existing streams or find an exact stream by name
  • Deploy or undeploy a stream
  • Get the list of runtime apps or find an exact app by name
  • Get the specific app instances

The Data Flow Engine is able to:

  • Subscribe to time series data flows from MindSphere
  • Send events when a specific stream has been triggered
  • Filter messages containing irrelevant information

Limitations

  • AWS FIFO queues are not available in the Frankfurt region, which means that the Data Flow Server does not guarantee message ordering.
  • Owing to the asynchronous nature of messaging, the processing time of a message is non-deterministic and depends on system load.
  • The stream name must be no longer than 15 characters due to infrastructural restrictions.

Example Scenarios

A client wants to monitor an engine in a factory. The engine is a critical part of the manufacturing process and has a history of overheating (Simple Scenario). Conclusions about the engine's condition can be drawn from how much it resonates during operation (Extended Scenario).

Simple Scenario

The client creates a Data Flow Engine stream with a TimeSeriesSource, a FilterProcessor and an EventSink. The TimeSeriesSource listens to a heat sensor placed on the engine. The FilterProcessor is set to forward only messages with temperatures higher than an allowed threshold. In the EventSink, the client chooses a built-in event instead of customizing one. The sink stays dormant for a while, because the filter does not forward messages until the temperature threshold is reached. Once it is, the client receives notifications about the engine overheating.
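
In DSL terms, this stream might be sketched as follows (the expression and threshold are illustrative):

    TimeSeriesSource | FilterProcessor --expression="ENGINE.temperature > 90" | EventSink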

Extended Scenario

The same client wants to monitor the resonance of the engine. The client sets up the TimeSeriesSource and the EventSink as for monitoring the temperature, but inserts a FilterProcessor and a MessageSampler between them. The FilterProcessor is set to forward only messages where the resonance is higher than a threshold (but still lower than what is expected at the end of the engine's lifecycle). The MessageSampler is set to forward messages at most once per day, so the client gets a daily reminder but can wait for a good offer from the engine manufacturers without being bombarded with notifications.
