Skip to content

Data Flow Engine - 示例

以下示例预期会收到 TimeSeriesSource 的实体 f7012b9215b448228def9726deffcfc9 发出的 JSON 数组格式的传入消息。
如果预期收到单个时间序列数据对象,则使用以下消息:

[
 {
 "writeTime":"2018-05-21T11:27:09Z",
 "entityId": "f7012b9215b448228def9726deffcfc9",
 "propertySetName": "tire1",
 "newProperties": {
 "revolutionPerMinute":2000,
 "temperature":83,
 "tireType":"SUMMER"
 },
 "_time":"2018-05-21T11:27:03.780Z"
 }
]

如果预期收到多个时间序列数据对象,则使用以下消息:

[
 {
 "writeTime":"2018-05-21T11:27:09Z",
 "entityId": "f7012b9215b448228def9726deffcfc9",
 "propertySetName": "tire1",
 "newProperties": {
 "revolutionPerMinute":2000,
 "temperature":83,
 "tireType":"SUMMER"
 },
 "_time":"2018-05-21T11:27:03.780Z"
 },
 {
 "writeTime":"2018-05-21T11:27:19Z",
 "entityId": "f7012b9215b448228def9726deffcfc9",
 "propertySetName": "tire2",
 "newProperties": {
 "revolutionPerMinute":2000,
 "temperature":123,
 "tireType":"WINTER"
 },
 "_time":"2018-05-21T11:27:13.158Z"
 }
]

说明

当时间序列数据从 IoT 经 TimeSeriesSource 到达时,这些消息的格式完全相同。entityId 指 asset 的 ID,propertySetName 指 aspect 的名称。Aspect variables 将收集在 newProperties 下的数组中。

创建流以监视实体的时间序列数据

此示例显示如何创建流以监视 id 为 f7012b9215b448228def9726deffcfc9 的实体的时间序列数据。 每次上传时间序列数据后,流将创建一个事件。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --severity=1 --description="description"

上面定义的流在未经过滤的情况下将传入消息转发到创建事件的 EventSink。

创建可创建自定义事件的流

此示例显示如何使用传入消息中的值填充自动生成的事件的自定义字段。必须事先创建名为 CustomEventType 的所需事件类型。每次上传时间序列数据后,流将创建一个自定义事件。它将传入消息的 tireType 值写入已生成的事件的 tireSeasonType 字段。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | EventSink --entity-id-path=entityId --event-type-name="CustomEventType" --field-mapping-json='{"tireSeasonType":"newProperties.tireType"}' --severity=1 --description="description" --timestamp-path="_time"

CustomEventType 有一个字符串类型的自定义字段,名为 tireSeasonType。该字段填充传入消息的 newProperties.tireType 值。

创建可检测字段值变化的流

此示例显示如何使用 ChangeDetector 处理器创建一个流,以检测值变化。 仅当 revolutionPerMinute 字段值自上次上传时间序列后发生变化时,该流才会创建一个事件。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | ChangeDetector --differentiating-key = propertySetName,newProperties.tireType --triggering-variable = newProperties.revolutionPerMinute | EventSink --entity-id-path=entityId --severity=1 --description="description"

上面定义的流的 ChangeDetector 仅会转发 revolutionPerMinute 值有异于前条消息的消息。第一条消息不会推送,因为它将用于设定初值。
参数 differentiatingKey 有两个输入,分别为 propertySetNamenewProperties.tireType。第一个输入用于逐个处理从不同实体上传的数据。例如,假设 asset 为带有四个轮胎的汽车,并且每个轮胎都通过 variable revolutionPerMinute 注册为 tire1tire2 等 aspect。ChangeDetector 将逐个处理各个轮胎,因为它们具有不同的名称 (propertySetName)。否则,无论数据来自哪个轮胎,ChangeDetector 都会对传入的 revolutionPerMinute 数据进行比较。differentiatingKey 参数的第二个输入可帮助应用区分同一轮胎的不同类型。

创建可根据条件过滤消息的流

此示例显示如何使用 FilterProcessor 处理器创建一个流,以过滤特定值不满足给定条件的消息。仅当 revolutionPerMinute 值大于 1500 时,该流才会发送电子邮件。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.revolutionPerMinute > 1500" | EventSink --message-category-id=7070 --body'{"message":"The RPM value exceeded the 1500 value"}

上面定义的流的 FilterProcessor 将根据已定义参数 expression 的标准过滤传入消息。如果表达式为 false,则消息将被丢弃。在此示例中,FilterProcessor 继续执行消息,因为 revolutionPerMinute 的值大于 1500。

可基于多个表达式的组合进行过滤。以下流定义就是一个有效示例:

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature < 50 || newProperties.temperature > 80" | EventSink --message-category-id=7070 --body'{"message":"The temperature value left the desired interval"}

上面定义的流仅转发 temperature 值小于 50 或大于 80 的消息。

创建用于避免消息溢出的流

此示例显示了在传入时间序列数据频率过高的情况下,如何使用 MessageSampler 处理器创建一个流,以减少推送消息的数量。在预定义的时间段内只能推送一个事件。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | MessageSampler --key=propertySetName --duration="PT2M" | EventSink --entity-id-path=entityId --severity=1 --description="description"

上面定义的流的 MessageSampler 仅在参数 -duration 中定义的时间段内转发第一条消息。由 TimeSeriesSource 转发的消息将被过滤,直到该时间段结束。然后再次转发第一个传入消息并重新启动定时器。上述示例中并未提供 timestamp-pathtimestamp-format 参数,因此使用默认值。可以使用以下语法对其进行覆盖:

MessageSampler --timestamp-path="$.writeTime" --timestamp-format="yyyy-MM-dd'T'HH:mm:ss.SSSXXX" --key=entityId --duration="PT10S"

创建用于指示振荡的流

此示例显示如何使用 Hysteresis 处理器创建一个流,以指示给定值的可疑宽振荡。仅当字段 temperature 的值自上一事件发生起低于 60 度后又超过 100 度时,该流才会创建一个新事件。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | Hysteresis --key=propertySetName --message-emitting-expression="newProperties.temperature > 100" --suppression-releasing-expression="newProperties.temperature < 60" | EventSink --entity-id-path=entityId --severity=1 --description="description"

上面定义的流的 Hysteresis 处理器在 temperature 的值高于 100 度时转发第一条消息。它会抑制满足此条件的任何后续消息,直到 temperature 的值降至 60 度以下,即满足 suppression-releasing-expression。然后,如果 temperature 的值再次超过 100 度,则将转发另一条消息,如参数 message-emitting-expression 中所定义。之后循环再次开始。

例如,假设以下温度值以单独消息按给定顺序到达:108、101、51、75、120、102、57、107、103。Hysteresis 会转发第一条消息,因为它满足条件。由 Hysteresis 转发的下一条消息会包含 120 度的值,因为值为 51 度的消息此前已满足 suppression-releasing-expression。之后,循环再次开始。即使下一个值(102 度)满足了 message-emitting-expression,Hysteresis 处理器仍会处于抑制状态,因此该消息将被丢弃。按照相同的逻辑,包含值 107 的消息也将转发。

创建用于接收电子邮件消息的流

此示例显示如何在满足条件时获取电子邮件消息。仅当温度超过 60 度时,流才会发送带有自定义内容的预定义电子邮件消息。

TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.temperature > 60" | EmailSink --message-category-id=1253 --body='{"placeholderForMessageInEmailTemplate":"Attention!Engine temperature has exceeded the 60 degree limit"}'

EmailSink 使用 categoryId 中汇总的 HTML 模板、收件人列表、电子邮件详细信息等预设属性发送消息。无论传入消息的频率如何,EmailSink 都会在时间序列数据到达时发送消息。因此,建议在流中使用适当数量的处理器进行过滤。在这种情况下,所使用的 HTML 模板包含一个占位符字段 placeholderForMessageInEmailTemplate,可用 JSON 对象中 body 参数给出的值进行替换。如果模板包含多个占位符,则必须相应增加 JSON 对象的大小。

创建用于识别不同 Assets 数据模式的流

此示例显示如何使用 Sequence 处理器创建流。此场景与之前场景的主要区别在于 Sequence 处理器使用来自不同流的消息:多个并行流将其输出发布到同一 SNS 主题,再由该主题将数据转发到 Sequence 处理器。

在本示例中共有三个流,每个流使用不同的处理器处理不同 asset 的时间序列数据。每个不同的流都包含一个 Marker,用于向 header-key=color 添加一个带有唯一 header-valueorangebluegreen)的标签。SNS 主题称为 destinationTopic

  • 流 A:
    TimeSeriesSource --entities=[{"id":"f7012b9215b448228def9726deffcfc9"}] | FilterProcessor --expression="newProperties.revolutionPerMinute > 1500" | Marker --header-key=color --header-value=orange > :destinationTopic"}
  • 流 B:
    TimeSeriesSource --entities=[{"id":"7twr2b63248948336def9726demruzdw"}] | ChangeDetector --differentiating-key = propertySetName,newProperties.tireType --triggering-variable = newProperties.revolutionPerMinute | Marker --header-key=color --header-value=blue > :destinationTopic"}
  • 流 C:
    TimeSeriesSource --entities=[{"id":"g9012b9792b448336def9726deffcxf2"}] | FilterProcessor --expression="newProperties.temperature > 30" | Marker --header-key=color --header-value=green > :destinationTopic"}

destinationTopic 充当流 A、B 和 C 的 sink,并作为下面定义的流的来源。当消息发布到 destinationTopic 时会触发这些流。其中每个流都包含一个 Sequence 处理器和一个 EventSink。如果 Sequence 处理器识别出所定义的模式,则会向 EventSink 发送序列消息。这会导致 EventSink 创建一个事件。

在每个给定的示例中,第一个序列消息如下所示:

示例序列消息
[{
"filterId" : "{streamName}",
"_timestamp" : "2018.10.07T11:06:40.000Z"
"messages" : [
    {
        "id": "d654bd32-6a42-72f4-911b-dfdec811ee14",
        "headers": {
            "color": "orange",
            ...
        },
        "payload": {
            "entityId": "f7012b9215b448228def9726deffcfc9",
            ...
            "_time": "2018.10.07T11:06:10.000Z"
        }
    },
    {
        "id": "fa79e9de-105e-d014-0718-f7cc77a9d96c",
        "headers": {
            "color": "blue",
            ...
        },
        "payload": {
              "entityId": "7twr2b63248948336def9726demruzdw",
              ...
              "_time": "2018.10.07T11:06:20.000Z"
        }
    },
    {
        "id": "7466c8b0-3eef-34d2-2476-059b41bb4f9b",
        "headers": {
            "color": "green",
            ...
          },
        "payload": {
            "entityId": "g9012b9792b448336def9726deffcxf2",
            ...
            "_time": "2018.10.07T11:06:40.000Z"
        }
    }]
}]

模式 1

如果每个流至少有一个消息在 90 秒内到达,则以下流定义中的 Sequence 处理器将输出一条消息。带有 orangebluegreen 标签的消息以任意顺序到达:

:destinationTopic > Sequence --labels=[\"orange\",\"blue\",\"green\"] --conditions=[\"orange\",\"blue\",\"green\"] --within=\"PT90S\" --label-path=\"headers['color']\" | EventSink --entity-id-path=\"messages[0].payload.entityId\"

为 EventSink 定义的 entity-id-path 指向序列消息的有效载荷。下图显示了有关传入消息、Sequence 处理器跟踪的候选项(序列状态)和序列消息的示例。

Pattern1

模式 2

如果带有 orange 标签的消息首先到达,之后,其它两个流中每个流至少有一个消息在 90 秒内到达,则以下流定义中的 Sequence 处理器将输出一条消息。带有 bluegreen 标签的消息以任意顺序到达:

:destinationTopic > Sequence --labels=[\"orange\",\"blue\",\"green\"] --initial-condition=\"orange\" --conditions=[\"green\",\"blue\"] --within=\"PT90S\" --label-path=\"headers['color']\" | EventSink --entity-id-path=\"messages[0].payload.entityId\"

为 EventSink 定义的 entity-id-path 指向序列消息的有效载荷。下图显示了有关传入消息、Sequence 处理器跟踪的候选项(序列状态)和序列消息的示例。

Pattern2

模式 3

如果带有 orangebluegreen 标签的消息按此顺序到达,则以下流定义中的 Sequence 处理器将输出一条消息。第二条消息必须在第一条消息到达后 20 秒内到达,且第三条消息在第二条消息达到后 30 秒内到达:

:destinationTopic > Sequence --labels=[\"orange\",\"blue\",\"green\"] --initial-conditions={\"first\":\"orange\",\"second\":\"blue\",\"within\":\"PT20S\"},{\"first\":\"blue\",\"second\":\"green\",\"within\":\"PT30S\"} --label-path=\"headers['color']\" | EventSink --entity-id-path=\"messages[0].payload.entityId\"

为 EventSink 定义的 entity-id-path 指向序列消息的有效载荷。下图显示了有关传入消息、Sequence 处理器跟踪的候选项(序列状态)和序列消息的示例。

Pattern3

还有问题?

向社区提问


除非另行声明,该网站内容遵循MindSphere开发许可协议.


Last update: September 23, 2019