
Data Flow Engine


The Data Flow Engine (DFE) enables customers to create workflows based on platform events and ingested data. These workflows can be used, for example, to automate KPI calculations in third-party apps.


To access this service, you need the respective roles listed in Data Flow Engine roles and scopes.


Workflows defined in DFE can be used to filter and process ingested data, react to events, calculate KPIs, aggregate data, modify the data model, write data into the platform data lake, and send notifications or create platform events. These workflows are called streams. A stream consists of a chain of predefined apps: autonomous, deployed processing units that can be combined and configured to perform real-time computation by consuming and processing an unbounded amount of data. The set of available apps is continually growing and may be extended by custom apps in the future. Once a stream is deployed, it automatically starts to ingest the configured data flow of the tenant that deployed it.
The DFE is implemented based on the Spring Cloud Data Flow architecture. Refer to the Spring Cloud Data Flow documentation to understand the ideas behind the engine. Similar concepts appear under different terminologies, for example triggers as sources, rules as processors, and actions as sinks.

DFE streams

The DFE uses message based streams with configurable apps. It differentiates between three types of apps:

  • Source: Input data in order to trigger a flow.

  • Processor: Filter or manipulate the input data.

  • Sink: Execute an action based on the flow's result.

A working stream must contain one source app and one sink app. It can also contain one or more processors between them. MindSphere can store any number of stream definitions, but the number of concurrently deployed streams is restricted according to the purchased plan.
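Following the Spring Cloud Data Flow model, a stream definition chains apps with the pipe symbol. The sketch below is illustrative only; the app names are placeholders, not the exact DFE identifiers:

```
my-stream = <source> | <processor> | ... | <sink>
overheat  = timeseries-source | filter-processor | event-sink
```

The first line shows the general shape (one source, optional processors, one sink); the second shows a concrete chain of the three app types.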

Using these components, the model of a DFE stream can be outlined as follows:

Schematic diagram of a DFE stream

The apps listed below are already available, but DFE can be enhanced based on general usage or custom needs.


Source apps are used to listen to different source entities or events. This type of app can only stand at the beginning of a stream. A source may also provide filtering options to decrease the number of messages flowing through the stream.


The TimeSeriesSource monitors time series uploads and triggers a stream. This is the entry point for time series data into a DFE stream.


The TimerSource sends timestamp messages based on a fixed delay, a date or a cron expression.
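Spring-style cron expressions use six fields (second, minute, hour, day of month, month, day of week). The property name below is an assumption modeled on the Spring Cloud Stream time source, not a confirmed TimerSource parameter:

```
trigger.cron=0 0 8 * * MON-FRI
```

With this expression, the source would emit one timestamp message every weekday at 08:00:00.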


Processor apps can be placed inside a DFE stream. Processors can interrupt the stream, but they leave the input and output data (JSON) untouched.


This app filters out all messages whose value in the specified triggering variable field equals that of the previous message. The differentiating key can fine-tune the granularity of the filtering by arranging messages with the same differentiating key value into a filter group. That way, for example, a single flow can perform separate filtering for every aspect.
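The behaviour described above can be sketched in a few lines of Python. This is a simplified model, not the DFE implementation: a message is dropped when its triggering value equals the last value seen within its differentiating-key group.

```python
class ChangeFilter:
    """Simplified model: suppresses messages whose triggering value
    is unchanged within their differentiating-key group."""

    def __init__(self, trigger_field, diff_key_field):
        self.trigger_field = trigger_field
        self.diff_key_field = diff_key_field
        self.last_seen = {}  # differentiating key -> last triggering value

    def accept(self, message):
        key = message.get(self.diff_key_field)
        value = message.get(self.trigger_field)
        if key in self.last_seen and self.last_seen[key] == value:
            return False  # unchanged within this group: filter out
        self.last_seen[key] = value
        return True
```

Messages from different groups never suppress each other, so one stream can serve several aspects at once.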


This processor interrupts the stream if the configured expression is not fulfilled. It can be used to filter messages in a DFE stream with custom expressions defined in Spring Expression Language.
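For example, an expression comparing a time series variable against a threshold would let only out-of-range readings pass; everything else interrupts the stream. The property name and the variable name below are illustrative, not confirmed DFE identifiers:

```
expression: "ENGINE.temperature > 90"
```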


The MessageSampler provides the "limit requests in time" functionality. The purpose of this element is to prevent the overload of the downstream flow by suppressing messages from the same device for a given time period.


The MessageSampler works based on the timestamp sent with the time series data. If the timestamp is missing or cannot be parsed, the message is suppressed from the stream regardless of the payload.


A MessageSampler is set up to let messages through every 2 minutes. The following time series data arrives at the MessageSampler:

1. First message passes the MessageSampler:

       {
         "someKey": "someValue",
         "_timestamp": "2018-06-07T11:06:00.000Z"
       }

2. Second message is suppressed:

       {
         "someKey": "someValue",
         "_timestamp": "2018-06-07T11:07:30.000Z"
       }

In this example, only 1 minute and 30 seconds have passed according to the timestamps of the messages, so the second message is not forwarded downstream. Once the 2 minutes have passed, the next message is forwarded and the cycle restarts.
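The sampling rule can be modeled in Python. This is a simplified sketch, not the DFE implementation: it judges elapsed time by the message's own `_timestamp` field rather than wall-clock time, and suppresses messages with a missing or unparsable timestamp, as described above.

```python
from datetime import datetime, timedelta

class MessageSampler:
    """Simplified model: forwards at most one message per period,
    judged by the "_timestamp" carried in the message itself."""

    def __init__(self, period_seconds):
        self.period = timedelta(seconds=period_seconds)
        self.last_forwarded = None  # timestamp of the last forwarded message

    def accept(self, message):
        raw = message.get("_timestamp")
        try:
            ts = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%fZ")
        except (TypeError, ValueError):
            return False  # missing or unparsable timestamp: suppress
        if self.last_forwarded is None or ts - self.last_forwarded >= self.period:
            self.last_forwarded = ts
            return True
        return False
```

Replaying the example above through this sketch, the 11:06:00 message passes, the 11:07:30 message is suppressed, and the first message stamped 2 minutes or more after 11:06:00 passes again.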

Working principle of the MessageSampler


Hysteresis can be used to react to events with some slack. For example, someone wants to know when an engine's RPM exceeds a predefined threshold, but does not want a flood of messages caused by oscillations around that threshold. They can choose to suppress messages until some condition is met, such as the RPM dropping below another threshold value.

Here is an example hysteresis definition:

Message emitting expression: "MAINMOTOR.rpm > 100"
Suppression releasing expression: "MAINMOTOR.rpm < 80"

The Hysteresis processor emits a single output message when the message emitting expression is first fulfilled by an input message. It then becomes dormant and suppresses all input messages until the suppression releasing expression is fulfilled. Only then is an output message emitted again, as soon as an input message fulfills the message emitting expression. After emission, the application returns to the dormant state.

If the suppression releasing expression is not defined, the application forwards all incoming messages which fulfill the message emitting expression. The message-emitting-expression and suppression-releasing-expression properties can be defined as Spring Expression Language expressions.
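The state machine described above can be sketched as follows. This is a simplified model, not the DFE implementation; plain Python callables stand in for the Spring Expression Language expressions, and the releasing message itself is assumed not to be emitted:

```python
class Hysteresis:
    """Simplified model: emits when the emit condition first holds,
    then stays dormant until the release condition holds."""

    def __init__(self, emit_when, release_when=None):
        self.emit_when = emit_when
        self.release_when = release_when
        self.dormant = False

    def accept(self, message):
        if self.dormant:
            if self.release_when(message):
                self.dormant = False  # suppression released
            return False  # the releasing message itself is not emitted
        if self.emit_when(message):
            if self.release_when is not None:
                self.dormant = True  # suppress until release condition holds
            return True
        return False
```

With the example definition above (emit on `MAINMOTOR.rpm > 100`, release on `MAINMOTOR.rpm < 80`), a reading of 120 emits, readings of 130 and 90 are suppressed, a reading of 70 releases the suppression without being emitted, and the next reading above 100 emits again.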

Working principle of Hysteresis


Sinks are the ending apps of a DFE stream. They only have inputs and perform the last step of a stream. This can be a notification or a change inside the platform.


The EventSink receives a message, converts it to an event based on the configured parameters and forwards it to the Event Management Service.
EventSink supports field mapping: a JSON document can be defined that maps attributes from the incoming message onto the event to be created. The parameters description, source, code, severity and acknowledged can be defined as expressions, like custom fields in the field-mapping-json, or as static values. For more details, check the field-mapping-json parameter in the EventSink parameters list.
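The mapping idea can be illustrated with a small sketch. The mapping shape below is hypothetical, not the actual field-mapping-json schema, and the real EventSink evaluates expressions rather than plain field lookups; consult the EventSink parameters list for the real format.

```python
def map_to_event(message, mapping, static_fields=None):
    """Illustrative only: builds an event dict by copying mapped
    attributes from the incoming message and merging static values."""
    event = dict(static_fields or {})
    for event_field, source_field in mapping.items():
        if source_field in message:
            event[event_field] = message[source_field]
    return event
```
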

The following picture shows the decision process to determine whether the system should use an existing event type or create a new one: Decision process for event creation


The EmailSink sends an e-mail when it receives a time series message from the stream. EmailSink uses the Notification Service and requires that a categoryID has already been created; the Notification Service documentation provides instructions on how to do this. When time series data arrives at the EmailSink, the application creates the payload using predefined parameters and triggers the e-mail sending procedure. The EmailSink sends the same message every time it is triggered by the stream.

Data Flow Server

The Data Flow Server (DFS) acts as a management service for the Data Flow Engine. It provides functionality for listing and modifying streams and their components. To deploy a new DFE stream, a user calls the API endpoints of the Data Flow Server.


Use the Data Flow Engine API to perform the following tasks:

  • Create or delete streams
  • Get the list of existing streams or find an exact stream by name
  • Deploy or undeploy a stream
  • Get the list of runtime apps or find an exact app by name
  • Get the specific app instances
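The tasks above map naturally onto a small REST client. The endpoint paths below are assumptions modeled on the Spring Cloud Data Flow REST API, not verified DFS routes; substitute the paths from the official Data Flow Engine API specification.

```python
class DataFlowClient:
    """Sketch of a Data Flow Server client. Only URL composition is
    shown; paths are hypothetical, modeled on Spring Cloud Data Flow."""

    def __init__(self, base_url):
        self.base_url = base_url.rstrip("/")

    def streams_url(self, name=None):
        url = f"{self.base_url}/streams"           # list or create streams
        return f"{url}/{name}" if name else url    # one stream by name

    def deploy_url(self, name):
        return f"{self.base_url}/streams/{name}/deploy"  # deploy/undeploy

    def runtime_apps_url(self):
        return f"{self.base_url}/runtime/apps"     # list runtime apps
```
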

The Data Flow Engine is able to:

  • Subscribe to time series data flows from MindSphere
  • Send events when a specific stream has been triggered
  • Filter messages containing irrelevant information


The following restrictions apply:

  • AWS FIFO queues are not available in the Frankfurt region, so the Data Flow Server does not guarantee message ordering.
  • Owing to the asynchronous nature of messaging, the processing time of a message is non-deterministic and depends on system load.
  • A stream name must be no longer than 15 characters due to infrastructural restrictions.
  • A delay of up to 30 minutes is expected between deleting an asset and the deletion of streams subscribed to that asset.
  • EventSink can create custom events, but currently only supports custom fields of type String. If your stream requires custom fields of other data types, create the custom event type using the Event Management Service before setting up your stream.

Example Scenarios

A client wants to monitor an engine in a factory. The engine is a critical part of the manufacturing process and has a history of overheating (Simple Scenario). Conclusions about the engine's condition can be drawn from how much it resonates during operation (Extended Scenario).

Simple Scenario

The client creates a Data Flow Engine stream with a TimeSeriesSource, a FilterProcessor and an EventSink. The TimeSeriesSource listens to a heat sensor placed on the engine. The FilterProcessor is set to forward only messages with temperatures above an allowed threshold. In the EventSink, the client chooses a built-in event type instead of customizing one. The sink stays dormant for a while, because the filter does not forward messages until the temperature threshold is exceeded. Once it is, the client receives notifications about the engine overheating.
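In Spring Cloud Data Flow notation, this stream could be written as a chain of the three apps. App names, parameter names and the threshold are illustrative, not exact DFE identifiers; note the 15-character limit on the stream name:

```
overheat-alert = timeseries-source | filter-processor --expression="ENGINE.temperature > 90" | event-sink
```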

Extended Scenario

The same client wants to monitor the resonance of the engine. The client sets up the TimeSeriesSource and the EventSink as for monitoring the temperature, but inserts a FilterProcessor and a MessageSampler between them. The FilterProcessor is set to forward messages only when the resonance is higher than a threshold (but still lower than what is expected at the end of the engine's lifecycle). The MessageSampler is set to forward messages at most once per day, so the client gets a daily reminder but can wait for a good offer from the engine manufacturers without being bombarded with notifications.
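This extended stream could be sketched in the same notation. Again, app names, parameter names and values are illustrative only (the sampling period is shown in seconds, 86400 s = 1 day):

```
resonance = timeseries-source | filter-processor --expression="ENGINE.vibration > 0.5" | message-sampler --period=86400 | event-sink
```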


Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.