Skip to content

Data Flow Engine - APIs and References

Sources

TimeSeriesSource

App name: TimeSeriesSource
App type: source

Property Type Description Example

entities

String(JSON)

Id set of entity or entities to subscribe for.

--entities=[{"id":"f7012b9215b448228def9726deffcfc9"}]

Input: -

Output: A message containing time series data of defined entity or entities in JSON array format.

For more details see Creating stream to monitor time series data of an entity.

TimerSource

App name: TimerSource
App type: source

Property Type Description Example

fixed-delay

Integer

Fixed delay between two emission. (Optional, default: 1)

--fixed-delay=10

timeUnit

TimeUnit

Setting up what time unit to use for fixed and initial delay. Possible options are NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS and DAYS. (Optional, default: SECONDS)

--timeUnit=SECONDS

initial-delay

Integer

Initial delay before first emission. (Optional)

--initial-delay=3

date

String

The date value for the date. (Optional)

--date="01/01/18 11:11:11"

dateFormat

String

Format for the date value. (Optional, default: "MM/dd/yy HH:mm:ss")

--dateFormat="MM/dd/yy HH:mm:ss"

cron

String

Cron expression value for the Cron Trigger. (Optional)

--cron="*/10 * * * * *"

Input: -

Output: A timestamp. For example:

{ "timestamp": "01/01/18 11:11:11" }

Processors

ChangeDetector

App name: ChangeDetector
App type: processor

Property Type Description Example

differentiating-key

String

The field in the message that will be used to differentiate between devices of interest (e.g.: the id of the assets).

--differentiating-key=id

triggering-variable

String

If this field is provided, the flow will be triggered only if the value of this variable changes between subsequent events. When it is left empty then all events trigger the flow (e.g.: temperature). (Optional)

--triggering-variable=temperature

splitListToItems

Boolean

Provided the incoming message is an array of JSON data, this parameter should be true, therefore the data is going to be processed one-by-one. Otherwise the incoming message is going to be processed as a whole. (Optional, default: false)

--splitListToItems=true

Input: Output of the preceding element of the stream. For example a time series data in JSON array format.

Output: Identical to the input or absent.

For more details see Creating stream which detects changing value of a field.

FilterProcessor

App name: FilterProcessor
App type: processor

Property Type Description Example

expression

String

Expression for filtering messages. String values must be put between apostrophes, but no apostrophes should be used for number values. See also Enabled expression operators. If not defined, all messages will be forwarded. (Optional)

--expression="assetId == 'ABC127'" or --expression="MAINMOTOR.rpm > 156 && SIDECAR.wheelRpm >= 3500"

splitListToItems

Boolean

Provided the incoming message is an array of JSON data, this parameter should be true, therefore the data is going to be processed one-by-one. Otherwise the incoming message is going to be processed as a whole. (Optional, default: false)

--splitListToItems=true

Input: Output of the preceding element of the stream. For example a time series data in JSON array format:

{"assetId": "ABC127'", "MAINMOTOR": { "rpm": 100, "temp": 76 }, "SIDECAR": { "wheelRpm": 5000 }}

Output: Identical to the input or absent.

Enabled expression operators

The following operators are enabled for expressions. The operands are constant values or input JSON variables.

Operator Description Example
==, ep equal entityId == 'ABC123'
!=, ne not equal temperature ne 75
<, lt less than rpm < 1500
<= , le less than or equal wheelNumber le 4
>, gt greater than frequency > 220
>=, ge greater than or equal minimalValue >= 101
&& logical AND operation motor.rpm > 10000 && motor.temperature > 180
|| logical OR operation status == 'error' || status == 'pending'
+ addition line1.quantity + line2.quantity > 200
- subtraction temperature1 - temperature2 < temperature3
* multiplication errorCounter * assetNumber < 20
/ division availableAssetNumber / allAssetNumber >= 1
% modulus productNumber % 2 != 1
^ exponential power results ^ 2
property.contains(variable) true, if property array is containing variable changedProperties.contains(location)

For more details see Creating stream for filtering messages according a condition.

MessageSampler

App name: MessageSampler
App type: processor

Property Type Description Example

key

String

The field in the message that will be used to differentiate between devices of interest (e.g.: the id of the assets).

--key=aspectId

duration

String

The minimal duration between two events. All events from the last event till the end of the duration will be dismissed. Use the ISO-8601 duration format to define the duration. Format: P[n]Y[n]M[n]DT[n]H[n]M[n]S (e.g.: P1DT12H means that the minimal time between two events is 1 day and 12 hours). (Optional)

--duration="P1DT12H"

splitListToItems

Boolean

Provided the incoming message is an array of JSON data, this parameter should be true, therefore the data is going to be processed one-by-one. Otherwise the incoming message is going to be processed as a whole. (Optional, default: false)

--splitListToItems=true

Input: Output of the preceding element of the stream. For example a time series data in JSON array format.

Output: Identical to the input or absent.

For more details see Creating stream for avoiding message overflow.

Hysteresis

App name: Hysteresis
App type: processor

Property Type Description Example

key

String

The field in the message that will be used to differentiate between device groups of interest, in the simple use case it could be an assetId. That way it handles all devices individually.

--key=aspectId

message-emitting-expression

String

The system emits messages based on this filter. A message gets through the filter if the previous message has been evaluated to false and the current message evaluates to true. See also Enabled expression operators.

--message-emitting-expression="MAINMOTOR.rpm > 100"

suppression-releasing-expression

String

This expression can be used to suppress messages based on the criteria in the expression. It filters out all subsequent messages after an unfiltered message as long as this criteria is not met, even if the message should not be filtered. See also Enabled expression operators. (Optional)

--suppression-releasing-expression="MAINMOTOR.rpm < 50 || MAINMOTOR.rpm > 250"

splitListToItems

Boolean

Provided the incoming message is an array of JSON data, this parameter should be true, therefore the data is going to be processed one-by-one. Otherwise the incoming message is going to be processed as a whole. (Optional, default: false)

--splitListToItems=true

Input: Output of the preceding element of the stream. For example a time series data in JSON array format.

{"assetId": "afhasf", "MAINMOTOR": { "rpm": 100, "temp": 76 }, "SIDECAR": { "wheelRpm": 5000 }}

Output: Identical to the input or absent.

For more details see Creating stream for indicating oscillation.

Sinks

EventSink

App name: EventSink
App type: sink

Property Type Description Example

entity-id-path

String

Path to the field that identifies the ID of the resource entity (e.g.: assetId or userId or similar). If the incoming message does not contain any ID, a static value can also be set here (e.g.:an exact entity id). If so, the value should be in format: [quotation mark][apostrophe]id[apostrophe][quotation mark].

--entity-id-path=assetId or --entity-id-path="'f7012b9215b448228def9726deffcfc9'"

event-type-name

String

Name of the event type that want to be used for creating new event. If the --event-type-name is set, but no event type is existing with this name in the system created by your tenant, then it will be checked, whether an event type is created with the given name by Siemens. If the event type has been created by your tenant or Siemens, it will be used. If event type cannot be found by name and --auto-create-type is not set to true, the stream won't deploy. (Opitonal, default: "MindSphereStandardEvent")

--event-type-name="MyCustomEvent1"

auto-create-type

Boolean

This property should be true, if new event type is want to be created. For creating new event type with LOCAL scope, the --event-type-name property must contain unique name for new enity-type, the --auto-create-type must be true and --field-mapping-json might be used for adding additional fields. Newly created event-types are extending the MindSphereStandardEvent. (Optional, default: false)

--auto-create-type=true

field-mapping-json

String(JSON)

JSON String for defining additional fields during new event type creation. The new event type will contain the defined fields. For example if the JSON looks like this: '{"wheelRpm": "car.wheel.rpm", "motorTemp": "car.motor.temp"}', then in the created event the wheelRpm; field is going to contain the value of the car.wheel.rpm field in the incoming message. If the car.wheel.rpm field is cannot be resolved as a value in the incoming message, then the string value car.wheel.rpm will be set for the wheelRpm field. The same is true for every other fields defined in the --field-mapping-json. (Optional)

--field-mapping-json='{"wheelRpm": "car.wheel.rpm", "motorTemp": "car.motor.temp"}'

description

String

Description of the event. The maximum length is 255 characters. If not specified, the value is the name of the stream definition. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and it’s extensions). (Optional)

--description="This is an event"

source

String

Source of the event. The maximum length is 255 characters. If not specified, the value is the name of the stream definition. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and it’s extensions). (Optional)

--source="My TimeSeries"

severity

Integer

Severity of the event. Value must be between 1 and 100. If not specified, default value is 1. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and it’s extensions). (Optional)

--severity=3

code

String

Code of the event. The maximum length is 16 characters. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and it’s extensions). (Optional)

--code="CylinderError3"

acknowledged

Boolean

Indicates whether the given event is acknowledged or not. Default value is false. Only applies when the used event type has such field (e.g. MindSphereStandardEvent and it’s extensions). (Optional)

--acknowledged=true

timestamp-path

String

Name of timestamp field from the incoming message. The value of referenced field must be Unix timestamp in milliseconds or UTC timestamp. If set, the timestamp value of property will be used during entity creation, otherwise the timestamp will be generated. (Optional)

--timestamp-path="_time"

splitListToItems

Boolean

Provided the incoming message is an array of JSON data, this parameter should be true, therefore the data is going to be processed one-by-one. Otherwise the incoming message is going to be processed as a whole. (Optional, default: true)

--splitListToItems=true

Input: Output of the preceding element of the stream. For example a time series data in JSON array format.

Output: An event will be created in Event Management according the incoming time series data. This is the responsibility of the application, the user should only configure the given properties.

For more details see Creating stream which creates custom events.

API Specification

Download OpenAPI Specification

Any questions left?

Ask the community


Except where otherwise noted, content on this site is licensed under the MindSphere Development License Agreement.