3.4 算子
算子在Flink中叫作StreamOperator。StreamOperator是流计算的算子。Flink作业运行时由Task组成一个Dataflow,每个Task中包含一个或者多个算子,1个算子就是1个计算步骤,具体的计算由算子中包装的Function来执行。除了业务逻辑的执行算子外,还提供了生命周期的管理。
Flink的DataStream和DataSet有两套不同的算子体系,未来发展重点是以算子来取代DataSet的算子,实现流批算子的统一,所以在本书中如果不特指,算子指的就是流算子,批处理的算子未来会被流算子取代。
阿里巴巴自收购了Flink的商业公司以来,逐渐将Blink的改进优化合并到Flink中。Blink合并进来之后对Table模块做了比较大的重构,引入了flink-table-runtime-blink模块,该模块中实现了一批新的算子。以流计算为基础,Blink SQL中实现了流批算子的统一。
本节介绍如下所示的两套算子。
(1)Flink DataStream和Flink SQL的算子体系
在flink-streaming-java中的算子实现的算子体系,用来支撑原有的DataStream API编程和旧的Flink SQL。
(2)Blink SQL算子体系
在flink-table-runtime-blink中的流批通用的算子体系,用来支撑Blink SQL,Blink SQL中同时也使用了Flink-streaming-java和CEP模块的一些算子。
3.4.1 算子行为
所有的算子都包含了生命周期管理、状态与容错管理、数据处理3个方面的关键行为。
1.生命周期管理
所有的算子都有共同的生命周期管理,其核心生命周期阶段如下。
1)setup:初始化环境、时间服务、注册监控等。
2)open:该行为由各个具体的算子负责实现,包含了算子的初始化逻辑,如状态初始化等。算子执行该方法之后,才会执行Function进行数据的处理。
3)close:所有的数据处理完毕之后关闭算子,此时需要确保将所有的缓存数据向下游发送。
4)dispose:该方法在算子生命周期的最后阶段执行,此时算子已经关闭,停止处理数据,进行资源的释放。
StreamTask作为算子的容器,负责管理算子的生命周期。
2.状态与容错管理
算子负责状态管理,提供状态存储,触发检查点的时候,保存状态快照,并且将快照异步保存到外部的分布式存储。当作业失败的时候算子负责从保存的快照中恢复状态。
3.数据处理
算子对数据的处理,不仅会进行数据记录的处理,同时也会提供对Watermark和LatencyMarker的处理。算子按照单流输入和双流输入,定义了不同的行为接口。
(1)OneInputStreamOperator
OneInputStreamOperator是单输入算子,对输入流提供了3个关键处理接口,分别对数据、Watermark、LatencyMarker进行处理,如代码清单3-2所示。
代码清单3-2 OneInputStreamOperator接口
(2)TowInputStreamOperator
TwoInputStreamOperator是双流输入算子,对于两个输入流提供了6个关键处理接口:
1)数据处理processElement1、processElement2接口,分别对应上游两个输入流的数据记录。
2)Watermark处理processWatermark1、processWatermark2接口分别对应上游两个输入流的Watermark。
3)LatencyMark处理processLatencyMarker1、processLatencyMarker2接口分别对应上游两个输入流的LatencyMarker,如代码清单3-3所示。
代码清单3-3 TwoInputStreamOperator接口
(3)算子融合优化策略
算子中还定义了OperatorChain的策略,具体规则在JobGraph优化相关章节中介绍。
3.4.2 Flink算子
Flink流计算算子体系最开始是围绕DataStream API设计的,同时也是Flink SQL流计算的底层执行框架,其整个体系如图3-16和图3-17所示。
在设计的时候分为两条线:
1)数据处理。主要是OneInputStreamOperator、TwoInputStreamOperator接口。
2)生命周期、状态与容错。主要是AbstractStreamOperator抽象类及其子实现类。AbstractUdfStreamOperator是主要的实现类,目前所有的算子都继承自此类。SourceReaderOperator是抽象类,目前还没有实现类,其在Flip-27中引入,未来要做Source接口的重构,实现流和批Source在实现层面上的统一。
DataStream的算子跟DataStrea API几乎是对应的,从行为上来说分为4类:
(1)单流输入算子
该类型算子只接收上游1个数据流作为输入,一般的算子都属此类型。
此类算子包含StreamProject、StreamFilter、StreamMap、StreamFlatMap、StreamSink、StreamGroupedReduce、StreamGroupedFold、KeyedProcessOperator和ProcessOperator。
从算子的名称就能看其与DataStream API中接口的对应关系。
(2)双流输入算子
该算子与TwoInputTransformation对应,接收上游2个不同的DataStream作为输入,如CoGroup、Join类操作。
此类算子包含CoStreamMap、CoStreamFlatMap、CoProcessOperator、KeyedCoProcessOperator、IntervalJoinOperator、CoBroadcastWithKeyedOperator和CoBroadcastWithNonKeyedOperator。
从算子的名称就能看其与DataStream API中接口的对应关系。
注意:并不存在2个以上输入流的算子。
(3)数据源算子
无输入流的算子只有一个,就是StreamSource,Source是DataStream的起点,其从数据源读取数据,向下游发送数据,只有输出流。
此类算子之所以只有StreamSource 1个实现,是因为该类算子直接从外部存储读取数据,是Dataflow的起点,并没有上游算子。
(4)异步算子AsyncWaitOperator
将在后边章节中详细介绍。
图3-16 Flink算子体系1
图3-17 Flink算子体系2
3.4.3 Blink算子
Blink Runtime中的算子是阿里巴巴Blink引入的流批统一的算子,在当前阶段用来支撑Blink的Table & SQL的运行。正因为要实现流批统一、动态代码生成等高级能力特性,所以在目前的Blink Runtime中重新实现了很多与SQL相关的算子,但是仍然使用了Flink-Streaming-java中定义的Transformation来包装这些算子。
Blink算子体系如图3-18和图3-19所示。
Blink Runtime中用到的算子大概可以分为3类:
1)Blink Runtime内置算子。
2)其他模块内置的算子,如CEP算子(CepOperator)、ProcessOperator等。
3)通过动态代码生成的算子。
Blink Runtime中的算子并不是完全的流批通用,流批算子最大的差异就是数据是否有界,所有支持批的算子都实现了BoundedOneInput接口或者BoundedMultiInput接口。这两个接口中有一个关键方法endInput用来标识数据是否是有界,如代码清单3-4所示。
代码清单3-4 endInput方法
图3-18 Blink算子体系1
1. Blink内置算子
(1)Join算子
1)HashJoin算子。Blink中普通的Join目前实现了Hash Join,有7个实现类,分别为InnerHashJoinOperator、BuildOuterHashJoinOperator、BuildLeftSemiOrAntiHashJoinOperator、ProbeOuterHashJoinOperator、FullOuterHashJoinOperator、AntiHashJoinOperator、SemiHashJoinOperator。
2)维表Join(Lookup Join)。是通过代码生成的方式实现的。
(2)Temporal Join
在ANSI-SQL 2011中提出了Temporal的概念,Oracle、SQLServer、DB2等大型数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点的所有数据改动,Temporal Join就是按照时间找到该事件点对应的数据进行维表JOIN的。
Flink实现了两种算子:TemporalProcessTimeJoinOperator基于处理时间的Temporal Join、TemporalRowTimeJoinOperator基于事件时间的Temporal Join。
(3)Sort算子
排序算子,分为流批两种,排序是一种全局操作,所以对于流批而言,无法共用相同的算子实现。
流上算子包括RowTimeSortOperator和ProcTimeSortOperator,分别是在事件时间和处理时间上进行排序的算子,支持时间+其他排序字段。
图3-19 Blink算子体系2
(4)OverWindow算子
OverWindow运算的算子,包含两种实现:
1)BufferDataOverWindowOperator:Over开窗运算经常需要用当前数据跟之前N条数据一起计算,所以需要采用将之前的数据缓存起来的方式,在内存不足的情况下会自动溢出到磁盘。
2)NonBufferOverWindowOperator:该算子应用于rank等不需要跟之前N条数据一起计算的开窗运算,无须缓存数据,可以提高计算效率。
(5)Window算子
流上窗口算子有两个:
1)AggregateWindowOperator:使用普通聚合函数(UDAF)的窗口算子。
2)TableAggregateWindowOperator:使用表聚合函数(UDTAF)的窗口算子。
SQL上的UDF介绍参见后边的Flink SQL章节。
(6)Watermark算子
Watermark算子用在流上负责生成Watermark,有3个实现:
1)WatermarkAssignerOperator:从数据元素中提取时间戳,周期性地生成Watermark。
2)RowTimeMiniBatchAssignerOperator:用在mini-batch模式下,依赖上游的Watermark,基于事件时间周期性地生成Watermark。
3)ProcTimeMiniBatchAssignerOperator:用在mini-batch模式下,基于处理时间周期性地生成Watermark,不依赖上游。
(7)Mini-batch算子
Mini-batch算子用微批来提升计算效率,提高吞吐量。使用Java的Map来缓存数据,Map的Key与State的Key保持一致,在进行聚合运算的时候可以批量操作,避免每一条数据都访问State。有2个实现:
1)MapBundleOperator:应用于未按照Key分组的数据流。
2)KeyedMapBundleOperator:应用于按照Key分组后的数据流,即KeyedStream。
2.批上算子
1)SortOperator:实现批上的全局数据排序。
2)SortLimitOperator:实现批上的带有Limit的排序。
3)LimitOperator:实现批上的limit语义。
4)RankOperator:实现批上的Top N语义。
3.其他模块的算子
(1)通用算子
对于StreamMap、StreamFlatMap等比较通用的算子,Blink直接使用了Flink DataStream体系中定义的算子,并没有重复实现。
(2)ProcessOperator & KeyedProcessOperator
ProcessOperator和KeyedProcessOperator执行比较底层的ProcessFunction,Join、LookupJoin、Window Join、去重、聚合、Limit、Rank、Sort等运算中都使用到了该算子。
(3)CEPOperator
SQL2016标准中加入了MatchRecognize语义,即SQL CEP复杂事件处理,其底层对应的就是CEPOperator。在Blink Runtime中,受限于Calcite,对MatchRecognize语义提供了基本的支持,没有支持完整的MatchRecognize语义。
4.代码生成算子
对于像SQL中的Filter、Project、Correlate等这一类比较简单的运算,可以通过代码动态生成算子。关于代码生成的内容,请参考Table & SQL中Blink Planner的代码生成章节。
3.4.4 异步算子
异步算子的目的是解决与外部系统交互时网络延迟所导致的系统瓶颈问题。
流计算系统中经常需要与外部系统进行交互,如需要查询外部数据库以关联上用户的额外信息。通常的实现方式是向数据库发送用户a的查询请求(如在MapFunction中),然后等待结果返回,在这之前无法发送用户b的查询请求。这是一种同步访问的模式,如图3-20左边所示。
图3-20中没有字母的空白长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回,就处理哪个回复,从而使连续的请求之间不需要阻塞等待,如图3-20右边所示。这也正是Async I/O的实现原理。
图3-20 同步IO与异步IO
既然是异步请求,那么就存在后调用的请求先返回的情况,所以为了更好地适应实际场景,Flink在异步算子中提供了两种输出模式。
(1)顺序输出模式
先收到的数据元素先输出,后续数据元素的异步函数调用无论是否先完成,都需要等待。顺序输出模式可以保证消息不乱序,但是可能增加延迟、降低算子的吞吐量,其原理如图3-21所示。
图3-21 异步算子顺序输出
数据进入算子,对每一条数据元素调用异步函数(AsyncFunction,后边会介绍),并封装为ResultFuture放入到队列中,如R1表示第1条数据元素异步调用的ResultFuture,如果是Watermark,也会放入到队列中(图中是W1)。输出时则严格按照R1、R2、R3、W1、R4的顺序输出给下游,如R3比R1先完成了,也需要等待R1返回结果,R1先输出。注意,W1不允许提前输出,即必须等待R1、R2、R3首先输出。
(2)无序输出模式
简单来讲,就是先处理完的数据元素先输出,不保证消息顺序,相比顺序模式,无序输出模式算子延迟低、吞吐量更高,其原理如图3-22所示。
图3-22 异步算子无序输出
数据进入算子,对每一条数据元素调用异步函数(AsyncFunction),并封装为ResultFuture放入到队列中,如R1表示第1条数据元素异步调用的ResultFuture,如果是Watermark也会放入到队列中(图中是W1),当异步函数返回结果时,放入已完成队列,按照顺序输出给下游。
无序输出模式并不是完全的无序,仍然要保持Watermark不能超越其前面数据元素的原则。等待完成队列中将按照Watermark切分成组,组内可以无序输出,组之间必须严格保证顺序。
如图3-22所示,R1、R2、R3属于一个组,R4、R5属于一组。在异步顺序输出时严格按照[R1、R2、R3]、W1、[R4、R5]的顺序输出给下游,其中R1、R2、R3可以无序输出。图中R2先于R1完成,被放入已完成队列,W1则需要等待[R1、R2、R3]都进入完成队列之后,才能进入已完成队列,同样[R4、R5]必须等待W1进入已完成队列,才能进入已完成队列。
在实际场景中,可以根据业务需要选择输出模式。
异步算子同时也支持对异步函数调用的超时处理,支持完整的容错特性。