Skip to content

Data Flow Engine - Apps

Sources

TimeSeriesSource

App name: TimeSeriesSource
App type: source

Property Type Description Example

entities

String(JSON)

ID set of entity or entities to subscribe for.

--entities=[{"id":"f7012b9215b448228def9726deffcfc9"}]

Input: -

Output: A message containing time series data of defined entity or entities in JSON array format.

For more details see Creating stream to monitor time series data of an entity.

TimerSource

App name: TimerSource
App type: source

Property Type Description Example

fixed-delay

Integer

Fixed delay between two emissions. (Optional, default: 1)

--fixed-delay=10

timeUnit

TimeUnit

Defines the time unit for fixed and initial delay. Possible options are NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS and DAYS. (Optional, default: SECONDS)

--timeUnit=SECONDS

initial-delay

Integer

Initial delay before first emission. (Optional)

--initial-delay=3

date

String

The date value for the date field. (Optional)

--date="01/01/18 11:11:11"

dateFormat

String

Format for the date value. (Optional, default: "MM/dd/yy HH:mm:ss")

--dateFormat="MM/dd/yy HH:mm:ss"

cron

String

Cron expression value for the Cron Trigger. (Optional)

--cron="*/10 * * * * *"

Input: -

Output: A timestamp. For example:

{ "timestamp": "01/01/18 11:11:11" }

Processors

ChangeDetector

App name: ChangeDetector
App type: processor

Property Type Description Example

differentiating-key

String

Defines which field in the message is used to differentiate between devices of interest, so they are handled individually. For example, this could simply be the entityId from the message or the propertySetName for differentiating between aspects of an asset. Concatenation of multiple values can be done using a comma separated list of strings. (Optional, default: entityId,propertySetName)

--differentiating-key=propertySetName or --differentiating-key=entityId,propertySetName

triggering-variable

String

If this field is provided, the flow is triggered only if the value of this variable changes between subsequent events. If it is left empty, all events trigger the flow (e.g.: temperature). (Optional)

--triggering-variable=temperature

split-list-to-items

Boolean

If the incoming message is an array of JSON data, set this parameter to true, so the data is processed one-by-one. Otherwise, the incoming message is processed as a whole. (Optional, default: true)

--split-list-to-items=false

Input: Output of the preceding element of the stream. For example, time series data in JSON array format.

Output: Identical to the input or absent.

For more details see Creating stream which detects changing value of a field.

FilterProcessor

App name: FilterProcessor
App type: processor

Property Type Description Example

expression

String

Expression for filtering messages. String values must be written inside single quotes (' '), but number values must be written without quotes. See also Enabled expression operators. If not defined, all messages are forwarded. (Optional)

--expression="propertySetName == 'myAspect1'" or --expression="MAINMOTOR.rpm > 156 && SIDECAR.wheelRpm >= 3500"

split-list-to-items

Boolean

If the incoming message is an array of JSON data, set this parameter to true, so the data is processed one-by-one. Otherwise, the incoming message is processed as a whole. (Optional, default: true)

--split-list-to-items=false

Input: Output of the preceding element of the stream. For example, time series data in JSON array format:

{"assetId": "ABC127'", "MAINMOTOR": { "rpm": 100, "temp": 76 }, "SIDECAR": { "wheelRpm": 5000 }}

Output: Identical to the input or absent.

Enabled expression operators

The following operators are enabled for expressions. The operands are constant values or input JSON variables.

Operator Description Example
==, ep equal entityId == 'ABC123'
!=, ne not equal temperature ne 75
<, lt less than rpm < 1500
<= , le less than or equal wheelNumber le 4
>, gt greater than frequency > 220
>=, ge greater than or equal minimalValue >= 101
&& logical AND operation motor.rpm > 10000 && motor.temperature > 180
|| logical OR operation status == 'error' || status == 'pending'
+ addition line1.quantity + line2.quantity > 200
- subtraction temperature1 - temperature2 < temperature3
* multiplication errorCounter * assetNumber < 20
/ division availableAssetNumber / allAssetNumber >= 1
% modulus productNumber % 2 != 1
^ exponential power results ^ 2
property.contains(variable) true, if property array contains variable changedProperties.contains(location)

For more details see Creating stream for filtering messages according a condition.

MessageSampler

App name: MessageSampler
App type: processor

Property Type Description Example

key

String

Defines which field in the message is used to differentiate between devices of interest, so they are handled individually. For example, this could simply be the assetId or, for differentiating between aspects of an asset, the propertySetName.

--key=propertySetName

duration

String

Minimum duration between two events. All events between the last event and the end of the duration are dismissed. Use the ISO-8601 duration format to define the duration. Format: P[n]Y[n]M[n]DT[n]H[n]M[n]S (e.g.: P1DT12H means that the minimum time between two events is 1 day and 12 hours). (Optional)

--duration="P1DT12H"

timestamp-path

String

Path of the timestamp in the JSON structured message described with JSONPath pattern. The default is: $._time. See: JSONPath pattern (Optional)

--timestamp-path='$._time'

timestamp-format

String

Format of the timestamp in the JSON structured message, which is queried by JSONPath pattern. The default is: yyyy-MM-dd'T'HH:mm:ss.SSSX, where 'X' refers to the time zone. See: ISO_8601 time format (Optional)

--timestamp-format="yyyy-MM-dd'T'HH:mm:ss.SSSX"

split-list-to-items

Boolean

If the incoming message is an array of JSON data, set this parameter to true, so the data is processed one-by-one. Otherwise, the incoming message is processed as a whole. (Optional, default: true)

--split-list-to-items=false

Input: Output of the preceding element of the stream. For example, time series data in JSON array format.

Output: Identical to the input or absent.

For more details see Creating stream for avoiding message overflow.

Hysteresis

App name: Hysteresis
App type: processor

Property Type Description Example

key

String

Defines which field in the message is used to differentiate between devices of interest, so they are handled individually. For example, this could simply be the propertySetName (the field, which refers to the aspect name) for differentiating between aspects of an asset.

--key=propertySetName

message-emitting-expression

String

Expression that must be true for the system to emit messages. If a suppression-releasing-expression is defined, the suppression must previously have been released to emit a message. See also Enabled expression operators.

--message-emitting-expression="MAINMOTOR.rpm > 100"

suppression-releasing-expression

String

Expression that must be true to release message suppression. After forwarding one message, the filter suppresses all subsequent messages until this expression is true, even if the message fulfills the message-emitting-expression. See also Enabled expression operators. (Optional)

--suppression-releasing-expression="MAINMOTOR.rpm < 50 || MAINMOTOR.rpm > 250"

split-list-to-items

Boolean

If the incoming message is an array of JSON data, set this parameter to true, so the data is processed one-by-one. Otherwise, the incoming message is processed as a whole. (Optional, default: true)

--split-list-to-items=false

Input: Output of the preceding element of the stream. For example, time series data in JSON array format.

{"assetId": "afhasf", "MAINMOTOR": { "rpm": 100, "temp": 76 }, "SIDECAR": { "wheelRpm": 5000 }}

Output: Identical to the input or absent.

For more details see Creating stream for indicating oscillation.

Sinks

EventSink

App name: EventSink
App type: sink

Property Type Description Example

entity-id-path

String

Path to the field that identifies the ID of the resource asset (e.g.: entityId which contains the ID of asset). If the incoming message does not contain an ID, a static value can be set here (e.g., an exact entity ID). If so, the value must be in format: [quotation mark][apostrophe]id[apostrophe][quotation mark].

--entity-id-path=entityId or --entity-id-path="'f7012b9215b448228def9726deffcfc9'"

event-type-name

String

Name of the event type to be used for creating the new event. If no event type with this name exists in the tenant system, an event type with the given name created by Siemens is searched. If the event type cannot be found and auto-create-type is not set to true, the stream won't deploy. (Optional, default: "MindSphereStandardEvent")

--event-type-name="MyCustomEvent1"

auto-create-type

Boolean

Must be true, if a new event type shall be created. For creating a new event type with LOCAL scope, the event-type-name parameter must contain a unique name for new entity type, the auto-create-type must be true and field-mapping-json might be used for adding additional fields. Newly created event types extend the MindSphereStandardEvent. (Optional, default: false)

--auto-create-type=true

field-mapping-json

String(JSON)

JSON string, which defines custom fields, when creating a custom event type. For already existing event types, it defines, which values are mapped to which custom fileds. For example, if the JSON string looks like this: '{"wheelRpm": "car.wheel.rpm"}', the wheelRpm field of the created event will contain the value of the car.wheel.rpm field in the incoming message. If the car.wheel.rpm field cannot be resolved as a value in the incoming message, the static string value car.wheel.rpm will be set for the wheelRpm field. (Optional)

--field-mapping-json='{"wheelRpm": "car.wheel.rpm", "motorTemp": "car.motor.temp"}'

description

String

Description of the event. The maximum length is 255 characters. If not specified, the value is the name of the stream definition. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and its extensions). (Optional)

--description="This is an event"

source

String

Source of the event. The maximum length is 255 characters. If not specified, the value is the name of the stream definition. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and its extensions). (Optional)

--source="My TimeSeries"

severity

Integer

Severity of the event. Value must be between 1 and 100. The default value is 1. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and its extensions). (Optional)

--severity=3

code

String

Code of the event. The maximum length is 16 characters. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and its extensions). (Optional)

--code="CylinderError3"

acknowledged

Boolean

Indicates whether the given event is acknowledged or not. The default value is false. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and its extensions). (Optional)

--acknowledged=true

timestamp-path

String

Name of timestamp field from the incoming message. The value of the referenced field must be Unix timestamp in milliseconds or UTC timestamp. If set, the timestamp value of the incoming message is used during entity creation. Otherwise, the timestamp is generated. (Optional)

--timestamp-path="_time"

split-list-to-items

Boolean

If the incoming message is an array of JSON data, set this parameter to true, so the data is processed one-by-one. Otherwise, the incoming message is processed as a whole. (Optional, default: true)

--split-list-to-items=false

Input: Output of the preceding element of the stream. For example, time series data in JSON array format.

Output: An event is created in Event Management according to the incoming time series data. This is the responsibility of the application, the user should only configure the given properties.

For more details see Creating stream which creates custom events.

EmailSink

App name: EmailSink
App type: sink

Property Type Description Example

message-category-id

String

Refers to a Notification Service object, in which the recepients, subject, template for the content and further details of the e-mail are defined.

--message-category-id=1254

body

String(JSON)

Defines what to fill into the placeholders of the e-mail template. The Notification replaces placeholders based on key-value pairs. This property is mandatory, and it must be JSON formatted.

--body='{"testKey":"testValue"}'

Input: Output of the preceding element of the stream. For example a time series data in JSON array format.

Output: An e-mail sent to the preset recipients.

For more details see Creating stream for filtering messages according a condition.

Any questions left?

Ask the community


Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.