Skip to content

Data Flow Engine - 应用规范

来源

TimeSeriesSource

应用名称:TimeSeriesSource
**应用类型:**来源

属性 类型 描述 示例

entities

String(JSON)

要订阅的实体的 ID 集。

--entities=[{"id":"f7012b9215b448228def9726deffcfc9"}]

输入: -

**输出:**包含 JSON 数组格式的已定义实体时间序列数据的消息。

有关更多详细信息,请参见创建流以监视实体的时间序列数据

TimerSource

应用名称:TimerSource
**应用类型:**来源

属性 类型 描述 示例

fixed-delay

Integer

两次发送之间的固定延迟。(可选,默认值:1)

--fixed-delay=10

timeUnit

TimeUnit

定义固定和初始延迟的时间单位。可选项包括 NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTESHOURSDAYS。(可选,默认值:SECONDS

--timeUnit = SECONDS

initial-delay

Integer

首次发送前的初始延迟。(可选)

--initial-delay=3

date

String

日期字段的日期值。(可选)

--date="01/01/18 11:11:11"

dateFormat

String

日期值的格式。(可选,默认值:"MM/dd/yy HH:mm:ss")

--dateFormat="MM/dd/yy HH:mm:ss"

cron

String

Cron 触发器的 Cron 表达式值。(可选)

--cron="*/10 * * * * *"

输入: -

**输出:**时间戳。例如:

{ "timestamp":“01/01/18 11:11:11”}

处理器

ChangeDetector

应用名称:ChangeDetector **应用类型:**处理器

属性 类型 描述 示例

differentiating-key

String

定义消息中的哪个字段用于区分相关设备,以便对其进行单独处理。例如,该字段可能仅是消息中的 entityId 或用于区分 asset 各个 aspects 的 propertySetName。可以通过使用逗号分隔的字符串列表来完成多个值的串联。(可选,默认值:entityId、propertySetName)

--differentiating-key=propertySetName--differentiating-key=entityId,propertySetName

triggering-variable

String

如果提供此字段,则仅当此 variable 的值在后续事件之间更改时才会触发流。若将它保留为空,则所有事件都会触发流(例如:温度)。(可选)

--triggering-variable=temperature

split-list-to-items

Boolean

如果传入消息是一个 JSON 数据数组,请将此参数设置为 true,以便逐个处理数据。否则,传入消息将作为整体进行处理。(可选,默认值:true)

--split-list-to-items=false

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

**输出:**与输入相同或不存在。

有关更多详细信息,请参见创建可检测字段值变化的流

FilterProcessor

应用名称:FilterProcessor **应用类型:**处理器

属性 类型 描述 示例

expression

String

用于过滤消息的表达式。字符串值必须写在单引号 (' ') 中,但数字值不得使用引号。另请参见已启用的表达式运算符。如果未定义,则将转发所有消息。(可选)

--expression="propertySetName == 'myAspect1'"--expression="MAINMOTOR.rpm > 156 && SIDECAR.wheelRpm >= 3500"

split-list-to-items

Boolean

如果传入消息是一个 JSON 数据数组,请将此参数设置为 true,以便逐个处理数据。否则,传入消息将作为整体进行处理。(可选,默认值:true)

--split-list-to-items=false

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据:

{"assetId":"ABC127'", "MAINMOTOR": { "rpm":100, "temp":76 }, "SIDECAR": { "wheelRpm":5000 }}

**输出:**与输入相同或不存在。

已启用的表达式运算符

为表达式启用以下运算符。操作数为常量值或输入 JSON variables。

运算符 描述 示例
==、ep 等于 entityId =='ABC123'
!=、ne 不等于 temperature ne 75
<、lt 小于 rpm < 1500
<=、le 小于或等于 wheelNumber le 4
>、gt 大于 frequency > 220
>=、ge 大于或等于 minimalValue >= 101
&& 逻辑 AND 运算 motor.rpm > 10000 &amp;&amp; motor.temperature > 180
+ line1.quantity + line2.quantity > 200
- temperature1 - temperature2 < temperature3
* errorCounter * assetNumber < 20
/ availableAssetNumber / allAssetNumber >= 1
模数 productNumber % 2 != 1
^ 幂指数 results ^ 2
property.contains(variable) 如果属性数组包含variable,则为 true changedProperties.contains(location)

有关更多详细信息,请参见创建可根据条件过滤消息的流

MessageSampler

应用名称: MessageSampler **应用类型:**处理器

属性 类型 描述 示例

key

String

定义消息中的哪个字段用于区分相关设备,以便对其进行单独处理。例如,该字段可能仅是 assetId,或者用于区分 asset 的各个 aspects 的 propertySetName

--key=propertySetName

duration

String

两个事件之间的最短持续时间。最后一个事件与持续时间结束之间的所有事件都将被驳回。使用 ISO-8601 持续时间格式来定义持续时间。格式:P[n]Y[n]M[n]DT[n]H[n]M[n]S(例如:P1DT12H 表示两个事件之间的最短时间为 1 天 12 小时)。(可选)

--duration="P1DT12H"

timestamp-path

String

使用 JSONPath 模式描述的 JSON 结构化消息中的时间戳路径。默认值为:$._time。参见:JSONPath 模式(可选)

--timestamp-path='$._time'

timestamp-format

String

JSON 结构化消息中时间戳的格式,由 JSONPath 模式查询。默认值为:yyyy-MM-dd'T'HH:mm:ss.SSSX,其中“X”表示时区。参见:ISO_ 8601 时间格式(可选)

--timestamp-format="yyyy-MM-dd'T'HH:mm:ss.SSSX"

split-list-to-items

Boolean

如果传入消息是一个 JSON 数据数组,请将此参数设置为 true,以便逐个处理数据。否则,传入消息将作为整体进行处理。(可选,默认值:true)

--split-list-to-items=false

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

**输出:**与输入相同或不存在。

有关更多详细信息,请参见创建用于避免消息溢出的流

Hysteresis

应用名称:Hysteresis **应用类型:**处理器

属性 类型 描述 示例

key

String

定义消息中的哪个字段用于区分相关设备,以便对其进行单独处理。例如,该字段可能仅是 propertySetName(指 aspect 名称字段),用于区分 asset 的各个 aspects。

--key=propertySetName

message-emitting-expression

String

必须为 true 才能使系统发出消息的表达式。如果定义了 suppression-releasing-expression,则必须先释放抑制才可发出消息。另请参见已启用的表达式运算符

--message-emitting-expression="MAINMOTOR.rpm > 100"

suppression-releasing-expression

String

必须为 true 才能释放消息抑制的表达式。转发一条消息后,即使消息满足 message-emitting-expression,过滤器也会抑制所有后续消息,直到该表达式为 true。另请参见已启用的表达式运算符。(可选)

--suppression-releasing-expression="MAINMOTOR.rpm < 50 || MAINMOTOR.rpm > 250"

split-list-to-items

Boolean

如果传入消息是一个 JSON 数据数组,请将此参数设置为 true,以便逐个处理数据。否则,传入消息将作为整体进行处理。(可选,默认值:true)

--split-list-to-items=false

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

{"assetId": "afhasf", "MAINMOTOR": { "rpm":100, "temp":76 }, "SIDECAR": { "wheelRpm":5000 }}

**输出:**与输入相同或不存在。

有关更多详细信息,请参见创建用于指示振荡的流

Marker

应用名称:Marker **应用类型:**处理器

属性 类型 描述 示例

header-key

String

定义要添加的消息头键。

--header-key=color

header-value

String

定义要添加的消息头值。

--header-value=green

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

**输出:**与带有一个附加消息头元素的输入相同。

Sequence

应用名称:Sequence **应用类型:**处理器

属性 类型 描述 示例

labels

String 列表

来自不同流的消息标签。必须指定至少两个标签。Sequence 应用将忽略消息头和有效载荷中不包含任何指定标签的消息

--labels=["A","B","C"]

label-path

String

指向消息中标签的 SpEL。以下示例可搜索消息头中的 color 值。(可选,默认值:headers["sequence-label"])

--label-path=headers["color"]

conditions

String

包含消息标签的 JSON 数组,接收顺序是 任意的。仅在模式 1 和模式 2 的情况下才需要该字段。(可选)

--conditions=["A", "B"]

within

String

定义达成 conditions 所需的消息之间的最长时间段。仅在模式 1 和模式 2 的情况下才需要该字段。(可选)

--within="PT90S"

initial-condition

String

定义应首先到达的消息标签。仅在模式 2 的情况下才需要该字段(可选)

--initial-condition="A"

filter-id

String

定义传出消息中包含的过滤器。(可选,默认值:流的名称)

--filter-id=streamName

timestamp-path

String

指向传入消息时间戳的 SpEL。引用字段的值必须是以毫秒为单位的 Unix 时间戳或 UTC 时间戳。

--timestamp-path="payload._time"

timestamp-format

String

用于指定 timestamp-path 中提供的时间戳格式的 SpEL。该值必须遵循 java.text.SimpleDateFormat 模式。

(可选,默认值为 ISO8601 UTC 时间戳,如 2018-07-24T19:33:44.567Z。)

--timestamp-format="yyyy-MM-dd'T'HH:mm:ss[.SSS]X"

initial-conditions

JSON 数组

描述模式 3 情况下的消息 序列 的 JSON 数组。

 序列描述符 包含 以下字段:firstsecondwithinfirstsecond 是指到达顺序必须排在第一和第二位的消息的标签。within 以 ISO8601 格式定义了两条消息之间的最大时间段。

使用 AND 关系处理多个序列描述符。使用 EXCLUSIVE OR 关系处理关于 initial-conditionconditionswithininitial-conditions

--initial-conditions=[{\"first\":\"A\",\"second\":\"B\",\"within\":\"PT30S\"},{\"first\":\"B\",\"second\":\"C\",\"within\":\"PT30S\"}]

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

**输出:**包含已满足所需条件的消息的序列消息。

Sinks

EventSink

应用名称:EventSink **应用类型:**sink

属性 类型 描述 示例

entity-id-path

String

识别资源 asset ID 的字段的路径(例如:包含 asset ID 的 entityId)。如果传入消息不包含 ID,则可以在此设置静态值(例如,确切的实体 ID)。在这种情况下,值必须采用以下格式:[引号][撇号]id[撇号][引号]。

--entity-id-path=entityId or --entity-id-path="'f7012b9215b448228def9726deffcfc9'"

event-type-name

String

用于创建新事件的事件类型名称。如果租户系统中不存在具有此名称的事件类型,则将搜索由 Siemens 创建的具有给定名称的事件类型。如果找不到该事件类型,则不会部署流。要创建新的事件类型,请参见事件类型创建示例。(可选,默认值:"MindSphereStandardEvent")

--event-type-name="MyCustomEvent1"

field-mapping-json

String(JSON)

JSON 字符串,用于定义将传入消息中的哪些值映射到事件类型中的哪些自定义字段。例如,如果 JSON 字符串为:'{“wheelRpm”:“car.wheel.rpm”}',则已创建事件的 wheelRpm 字段将包含传入消息中 car.wheel.rpm 字段的值。如果无法将 car.wheel.rpm 字段解析为传入消息中的值,则将为 wheelRpm 字段设置静态字符串值 car.wheel.rpm。(可选)

--field-mapping-json='{"wheelRpm": "car.wheel.rpm", "motorTemp": "car.motor.temp"}'

description

String

事件描述。最大长度为 255 个字符。如果未指定,则该值为流定义的名称。仅在使用的事件类型具有此类字段时才适用(例如MindSphereStandardEvent 及其扩展)。(可选)

--description="This is an event"

source

String

事件的来源。最大长度为 255 个字符。如果未指定,则该值为流定义的名称。仅在使用的事件类型具有此类字段时才适用(例如MindSphereStandardEvent 及其扩展)。(可选)

--source="My TimeSeries"

severity

Integer

事件的严重度。值必须介于 1 和 100 之间。默认值为 1。仅在使用的事件类型具有此类字段时才适用(例如MindSphereStandardEvent 及其扩展)。(可选)

--severity=3

code

String

事件代码。最大长度为 16 个字符。仅在使用的事件类型具有此类字段时才适用(例如MindSphereStandardEvent 及其扩展)。(可选)

--code="CylinderError3"

acknowledged

Boolean

指示是否已确认给定事件。默认值为 false。仅在使用的事件类型具有此类字段时才适用(例如MindSphereStandardEvent 及其扩展)。(可选)

--acknowledged=true

timestamp-path

String

传入消息的时间戳字段名称。引用字段的值必须是以毫秒为单位的 Unix 时间戳或 UTC 时间戳。如果已设置,则在实体创建期间使用传入消息的时间戳值。否则,将生成时间戳。(可选)

--timestamp-path="_time"

split-list-to-items

Boolean

如果传入消息是一个 JSON 数据数组,请将此参数设置为 true,以便逐个处理数据。否则,传入消息将作为整体进行处理。(可选,默认值:true)

--split-list-to-items=false

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

**输出:**根据传入的时间序列数据在事件管理中创建事件。这由应用负责,用户仅需配置给定的属性。

有关更多详细信息,请参见创建可创建自定义事件的流

EmailSink

应用名称: EmailSink **应用类型:**sink

属性 类型 描述 示例

message-category-id

String

指通知服务对象,其中定义了收件人、主题、内容模板和电子邮件的其它详细信息。

--message-category-id=1254

body

String(JSON)

定义用于填充电子邮件模板占位符的内容。通知基于键值对替换占位符。此属性是必需的,且必须是 JSON 格式。

--body='{"testKey":"testValue"}'

**输入:**流的前一个元素的输出。例如,JSON 数组格式的时间序列数据。

**输出:**发送给预设收件人的电子邮件。

有关更多详细信息,请参见创建可根据条件过滤消息的流

还有问题?

向社区提问


除非另行声明,该网站内容遵循MindSphere开发许可协议.


Last update: September 23, 2019