
Data Flow Engine

Idea

The Data Flow Engine (DFE) enables customers to create workflows based on platform events and ingested data, which can then be used, for example, to automate KPI calculations in third-party applications.

Access

The Data Flow Engine is exposed to our application developers as a REST API. Monitoring, defining, deploying and modifying workflows can be done with simple API calls; a minimal example follows the permission list below.

The following permissions are required for creating streams or managing applications:

  • Managing streams: dfs.sm
  • Reading application management: dfs.amr
  • Reading runtime applications: dfs.ras
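
A minimal sketch of such an API call, assuming the Data Flow Server is exposed under a gateway path like /api/dataflow/v3 (the exact base URL and token handling depend on the environment and are assumptions here):

    # Hypothetical example: list the existing stream definitions.
    # Gateway URL, base path and token are placeholders, not authoritative.
    import requests

    GATEWAY = "https://gateway.eu1.mindsphere.io"  # assumed tenant gateway
    TOKEN = "<access token carrying the dfs.* scopes listed above>"

    response = requests.get(
        GATEWAY + "/api/dataflow/v3/streams",
        headers={"Authorization": "Bearer " + TOKEN},
    )
    response.raise_for_status()
    print(response.json())  # the response shape depends on the API version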

Figure: Data Flow Engine roles and scopes

Basics

Workflows defined in the DFE can be used to filter and process ingested data, react to events, calculate KPIs, aggregate data, modify the data model, write data into the platform data lake, and send notifications or create platform events. For this purpose, a chain of predefined applications can be used; these processing pipelines are called streams. The applications are autonomous, deployed processing units that can be combined and configured to perform real-time computation by consuming and processing an unbounded amount of data. The set of available applications is continually extended and may be complemented by custom applications in the future. Once a stream is deployed, it automatically starts to ingest the configured data flow of the tenant that deployed it.

The engine is implemented based on the Spring Cloud Data Flow architecture, so it can be useful to look into the Spring Cloud Data Flow documentation to understand the ideas behind the engine. Similar or comparable concepts appear there under different terminology: triggers correspond to sources, rules to processors and actions to sinks. In Spring Cloud Data Flow's DSL, for instance, such a pipeline is written as "time | filter | log".

DFE streams

The DFE uses message-based streams composed of configurable apps. We differentiate between three types of apps:

  • Source: Monitor events in order to trigger a flow.

  • Processor: Filter or manipulate the input data.

  • Sink: Execute an action based on the flow's result.

A working stream contains exactly one source application, exactly one sink application and optionally one or more processors between them. MindSphere can store any number of stream definitions, but the number of concurrently deployed streams is restricted according to the size of the purchased plan.

Using these components, the model of a DFE stream can be outlined as follows:
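
    Source  -->  [ Processor  -->  ...  -->  Processor ]  -->  Sink
    exactly one        zero or more processors                 exactly one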

The applications listed below are already available, but DFE can be enhanced based on general usage or custom needs.

Sources

Source apps are used to listen to different source entities or events. This type of application can stand only at the beginning of a stream. A source may also provide an opportunity for filtering to decrease the number of messages flowing through the stream.

TimeSeriesSource

The TimeSeriesSource monitors time series uploads and triggers a stream. This is the entry point of time series data into a DFE stream.
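
For illustration, a TimeSeriesSource could be configured with deployment properties along these lines (the property names below are assumptions, not the authoritative list):

    # Hypothetical TimeSeriesSource configuration, shown as a Python dict:
    timeseries_source = {
        "entity-id": "<engine entity id>",    # asset whose uploads trigger the stream
        "property-set-name": "EngineData",    # assumed name of the monitored property set
    }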

TimerSource

TimerSource sends timestamp messages based on a fixed delay, date or cron expression.
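
For example, a TimerSource might be driven by a cron expression or a fixed delay (the property names are assumptions for illustration):

    # Hypothetical TimerSource configurations:
    timer_source_cron = {"cron": "0 0 8 * * *"}    # one timestamp message every day at 08:00
    timer_source_delay = {"fixed-delay": "60"}     # one timestamp message every 60 seconds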

Processors

Processor apps are placed inside a DFE stream, between the source and the sink. Processors can interrupt the stream, but they leave the input and output data (JSON) untouched.

ChangeDetector

This app filters out every message that carries the same value in the specified triggering variable field as the previous message. The differentiating key can fine-tune the granularity of the filtering by arranging messages with the same differentiating key into one filter group. That way, for example, one can use a single flow and still have separate filtering for every asset.
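
The behavior can be sketched in a few lines of Python (a simplified model, not the actual implementation):

    # Drop a message when its triggering variable has not changed since the
    # previous message of the same filter group (differentiating key).
    last_seen = {}

    def change_detector(message, trigger_var, diff_key):
        group = message[diff_key]      # e.g. the asset id
        value = message[trigger_var]
        if last_seen.get(group) == value:
            return None                # unchanged: filtered out
        last_seen[group] = value
        return message                 # changed: passes through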

FilterProcessor

This processor interrupts the stream if the configured expression isn’t fulfilled. This app can be used to filter messages in a DFE stream with custom expressions defined in Spring Expression Language.
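
For example, a FilterProcessor configured with the expression below would drop every message whose temperature does not exceed 100 (the variable name ENGINE.temperature is a hypothetical example):

    # Hypothetical FilterProcessor configuration with a SpEL expression:
    filter_processor = {"expression": "ENGINE.temperature > 100"}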

MessageSampler

The MessageSampler provides "limit requests in time" functionality. Its purpose is to prevent overloading the downstream flow by suppressing messages from the same device for a given time period.

It is important to note that the MessageSampler works based on server time and disregards the timestamp sent with the time series data. Suppose a MessageSampler is set up to let messages through every 2 minutes and the following time series data is given:

1. Message arrives at 11:27:00

  {
      <some key> : <some value>,
      "_timestamp" : "2018-06-07T11:06:00.000Z"
  }

2. Another message arrives at 11:28:00 (one minute after the first one)

  {
      <some key> : <some value>,
      "_timestamp" : "2018-06-07T11:21:00.000Z"
  }

According to the timestamps, 15 minutes have passed, but the MessageSampler ignores this value: by server time the messages arrived only one minute apart, so the second message won't go through. Such a discrepancy between payload timestamp and arrival time is a valid scenario because of the asynchronous nature of the system.
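
The server-time based behavior can be modeled as follows (a simplified sketch, not the actual implementation):

    import time

    # Last emission time per device, measured in server time; the payload
    # "_timestamp" field is deliberately ignored, as described above.
    last_emitted = {}

    def message_sampler(message, device_id, period_seconds=120):
        now = time.monotonic()         # server time, not the message timestamp
        last = last_emitted.get(device_id)
        if last is not None and now - last < period_seconds:
            return None                # suppressed: arrived too soon
        last_emitted[device_id] = now
        return message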

Hysteresis

Hysteresis can be used to react to events with some slack. For example, if someone is interested in an engine's RPM exceeding a predefined threshold but does not want a flood of messages caused by oscillation around that threshold, he can choose to suppress the events until a release condition is met, such as the RPM dropping below a second, lower threshold.

Here is an example hysteresis definition:

  Message emitting expression:      "MAINMOTOR.rpm > 100"
  Suppression releasing expression: "MAINMOTOR.rpm < 80"
This way, Hysteresis won't emit a message each time the motor oscillates between 99 and 101, but it will emit a message once the value has dropped below 80 and subsequently rises above 100.

If the suppression releasing expression is not defined, the application emits a message for every event that fulfills the main expression.

Both the --message-emitting-expression and --suppression-releasing-expression properties are defined in Spring Expression Language.
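
The suppression logic can be sketched like this (a simplified model of the behavior described above, with the expressions represented as Python callables):

    def hysteresis(messages, emit_expr, release_expr):
        suppressed = False
        for msg in messages:
            if suppressed:
                if release_expr(msg):
                    suppressed = False     # e.g. rpm dropped below 80
            elif emit_expr(msg):
                suppressed = True
                yield msg                  # e.g. rpm rose above 100

    # Usage with the example expressions above:
    # hysteresis(stream, lambda m: m["rpm"] > 100, lambda m: m["rpm"] < 80)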

Sinks

Sinks are the terminating apps of a DFE stream. They only have inputs and perform the last step of the stream, which can be a notification or a change inside the platform.

EventSink

The EventSink receives a message, converts it into an event based on the configuration properties and forwards it to the Event Management Service.
It supports field mapping: a JSON document can be defined that maps attributes from the incoming message onto the event to be created. The properties description, source, code, severity and acknowledged can also be defined as expressions, like custom fields in the field-mapping-json, or as static values. For more details, check the field-mapping-json property in the EventSink properties list.
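
As an illustration, a field mapping could look like this (the exact schema is defined by the field-mapping-json property; the structure below is an assumption):

    # Hypothetical field mapping: "severity" and "acknowledged" are static
    # values, while "description" is a SpEL expression evaluated per message.
    field_mapping = {
        "description": "'Engine temperature: ' + ENGINE.temperature",
        "severity": 20,
        "acknowledged": False,
    }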

The following picture shows the decision process of whether the system should use an existing event type or create a new one:

Data Flow Server

The Data Flow Server (DFS) acts as the management service for the Data Flow Engine; it provides functionality for listing and modifying streams and their components. To deploy a new DFE stream, a user calls the API endpoints of the Data Flow Server.

Features

Use the Data Flow Engine API for realizing the following tasks:

  • Create or delete streams.
  • Get the list of existing streams or find an exact stream by name.
  • Deploy or undeploy a stream (see the sketch after this list).
  • Get the list of runtime apps or find an exact app by name.
  • Get the specific app instances.
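
For example, deploying an already defined stream could look like this (the endpoint shape is an assumption, carried over from the Access section above):

    import requests

    GATEWAY = "https://gateway.eu1.mindsphere.io"  # assumed tenant gateway
    TOKEN = "<access token with the dfs.sm scope>"

    # Hypothetical deployment call for a stream named "enginetemp":
    response = requests.post(
        GATEWAY + "/api/dataflow/v3/streams/enginetemp/deploy",
        headers={"Authorization": "Bearer " + TOKEN},
    )
    response.raise_for_status()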

The Data Flow Engine is able to:

  • Subscribe to time series data flows from MindSphere.
  • Send events when a specific stream has been triggered.
  • Filter out messages containing irrelevant information.

Restrictions

  • AWS FIFO queues are not available in the Frankfurt region, which means the Data Flow Server does not guarantee message ordering.
  • Owing to the asynchronous nature of messaging, the processing time of a message is non-deterministic and depends on system load.
  • A stream name must be no longer than 15 characters due to infrastructural restrictions.

Example Scenarios

A client wants to monitor an engine in a factory. The engine is a critical part of the manufacturing process and has a history of overheating. Conclusions about the engine's condition can be drawn from how much it resonates during operation.

Simple Scenario

The client registers the engine and creates a variable for a heat sensor placed on it. Then he creates a Data Flow Engine stream with a TimeSeriesSource, a FilterProcessor and an EventSink. He sets the source's entity id property to match the engine's and configures the filter so that it only lets through messages representing a temperature above the allowed threshold. At the end of the stream he installs an EventSink with either a built-in event or a custom one. The sink stays dormant for a while, because the filter does not let messages through until the temperature threshold is reached. Once it is, the client starts getting notifications about the engine overheating. A sketch of this setup follows below.
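
A sketch of this setup as a stream creation request (the endpoint, payload shape, app names and expression are assumptions carried over from the sections above):

    import requests

    GATEWAY = "https://gateway.eu1.mindsphere.io"  # assumed tenant gateway
    TOKEN = "<access token with the dfs.sm scope>"

    # Hypothetical stream: TimeSeriesSource -> FilterProcessor -> EventSink.
    stream = {
        "name": "enginetemp",  # stays within the 15 character limit
        "definition": (
            "timeseries-source --entity-id=<engine entity id> | "
            "filter-processor --expression='ENGINE.temperature > 100' | "
            "event-sink"
        ),
    }
    response = requests.post(
        GATEWAY + "/api/dataflow/v3/streams",
        headers={"Authorization": "Bearer " + TOKEN},
        json=stream,
    )
    response.raise_for_status()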

Extended Scenario

The same client wants to monitor the resonance of the engine. He sets up the TimeSeriesSource and the EventSink the same way as for the temperature. Between them he installs a FilterProcessor, so he only gets events when the resonance is higher than a threshold (but still lower than what is expected at the end of the engine's lifecycle). He doesn't want to order a new engine as soon as he gets the notification (he's waiting for a good offer from the engine manufacturers), so he also adds a MessageSampler, ensuring he only gets a reminder notification once a day instead of being bombarded every second or so.
