Skip to content

Data Flow Engine

构想

Data Flow Engine (DFE) 使客户能够基于平台事件和提取的数据创建工作流。例如,这些工作流可用于在第三方应用中自动执行 KPI 计算。

访问

要访问此服务,您需要具有 Data Flow Engine 角色和范围中列出的相应角色。

基础知识

使用 Data Flow Engine 创建的工作流称为流。它们可用于过滤和处理提取的数据、对事件做出响应、计算 KPI、聚合数据、修改数据模型、将数据写入平台数据湖以及发送通知或创建平台事件。DFE 流由 DFE 提供的一系列不同应用组成。可以对这些应用进行组合和配置,以通过使用和处理海量数据来进行实时计算。应用的数量仍在不断增加,未来还可扩展自定义应用。

DFE 基于 Spring Cloud Data Flow 架构实现。请参见 Spring Cloud Data Flow 文档来了解引擎背后的构想。不同术语可能会具有类似的概念,例如:来源、处理器和 sinks 也被称为触发器、规则和动作。

DFE 流

工作流在 Data Flow Engine 中定义为可配置应用之间基于消息的流。在此情况下,应区分三种类型的应用:

工作流包含一个来源应用和一个 sink 应用。两者之间还可包含一个或多个处理器。使用多个处理器时,必须按照合理顺序进行排列。使用这些组件的 DFE 流模型可概括为以下形式:

Schematic diagram of a DFE stream

下面列出的应用均已提供,但可以根据一般用途或自定义需求增强 DFE。

来源

来源应用用于侦听不同的来源实体或事件。这种类型的应用只能位于流的起始端。来源可以提供过滤选项以减少馈送到流中的消息数量。Data Flow Engine 有两种类型的可用来源:

  • TimeSeriesSource
    TimeSeriesSource 监视时间序列上传并触发流。这是时间序列数据进入 DFE 流的入口点。
  • TimerSource
    TimerSource 根据固定延迟、日期或 cron 表达式发送时间戳消息。

处理器

处理器应用可以置于 DFE 流内。它们可以中断流并抑制数据流,但不会修改数据 (JSON)。Data Flow Engine 提供以下处理器 - 有关更多详细信息,请参见处理器

  • ChangeDetector
    ChangeDetector 可过滤掉与上一条消息具有相同指定 variable 的消息。
  • FilterProcessor
    FilterProcessor 可过滤掉未满足指定表达式的消息。
  • MessageSampler
    MessageSampler 限制在给定时间段内来自同一设备的最大消息数,以防止下游流量过载。
  • Hysteresis
    Hysteresis 处理器会在事件发生后引入一小段闲置空间,以防止由于在阈值周围振荡而导致消息泛滥。
  • Marker
    Marker 可在传入消息转发之前为其添加标签。
  • Sequence
    Sequence 处理器可过滤掉与指定模式不匹配的并行流消息。

Sinks

Sinks 是 DFE 流的末端应用。它们只有输入并执行流的最后一步。最后一步可以是平台内的通知或事件。Data Flow Engine 有两种类型的 sinks - 有关更多详细信息,请参见 Sinks

  • EventSink
    当 EventSink 收到消息时,它会根据配置的参数创建一个事件,并将其转发到 Event Management Service
  • EmailSink
    EmailSink 在收到来自流的消息时会发送电子邮件。

数据流服务器

数据流服务器 (DFS) 充当 Data Flow Engine 的管理服务,可提供列示和修改流及其组件的功能。要部署新的 DFE 流,用户必须调用数据流服务器上的 API 端点。

功能

使用 Data Flow Engine API 实现以下任务:

  • 创建或删除流
  • 获取现有流的列表或按名称查找具体流
  • 部署或取消部署流
  • 获取运行系统应用列表或按名称查找具体应用
  • 获取特定的应用实例

Data Flow Engine 能够:

  • 订阅 MindSphere 的时间序列数据流
  • 在触发特定流时发送事件
  • 过滤包含不相关信息的消息

限制

  • MindSphere 可以存储任意数量的流定义,但将根据购买的计划限制同时部署的流数量。
  • 在Europe 1,FIFO 队列不可用,数据流服务器无法保证消息的有序性。
  • 由于消息具有异步性,消息的处理时间是不确定的,具体取决于系统负载。
  • 流名称不得超过 15 个字符。
  • 删除 asset 与删除相关流之间的延迟最多为 30 分钟。

示例场景

客户想要监控工厂中的发动机。该发动机是制造过程的关键部分,并且曾发生过热情况(简单场景)。根据该发动机在运行期间的共振程度可以得出有关该发动机状况的结论(扩展场景)。

简单场景

客户使用 TimeSeriesSource、FilterProcessor 和 EventSink 创建 Data Flow Engine 流。TimeSeriesSource 对放置在发动机上的热传感器进行侦听。FilterProcessor 设置为仅转发温度高于允许阈值的消息。EventSink 选择一个内置事件,而不是自定义事件。 sink 暂时处于休眠状态,因为过滤器在温度达到阈值之前不会转发消息。一旦温度达到阈值,客户就会立即收到有关发动机过热的通知。

扩展场景

同一客户希望监控发动机的共振情况。客户设置了 TimeSeriesSource 和 EventSink 以监控温度,但在二者之间插入了 FilterProcessor 和 MessageSamplerFilter。FilterProcessor 设置为仅在共振高于阈值(但仍低于发动机生命周期结束时的预期值)时转发消息。MessageSamplerFilter 设置为每天仅转发一次消息,以便客户每天都能得到提醒,但客户可以等待来自发动机制造商的有利报价,同时避免大量通知的频繁干扰。

相关链接

还有问题?

向社区提问


除非另行声明,该网站内容遵循MindSphere开发许可协议.


Last update: July 11, 2019