2.2.1 StreamOperator接口实现
接下来我们深入了解StreamOperator接口的定义。如图2-4所示,StreamOperator接口实现的方法主要供Task调用和执行。
图2-4 StreamOperator接口
如图2-4所示,StreamOperator接口主要包括如下核心方法。
·open():定义当前Operator的初始化方法,在数据元素正式接入Operator运算之前,Task会调用StreamOperator.open()方法对该算子进行初始化,具体open()方法的定义由子类实现,常见的用法如调用RichFunction中的open()方法创建相应的状态变量。
·close():当所有的数据元素都添加到当前Operator时,就会调用该方法刷新所有剩余的缓冲数据,保证算子中所有数据被正确处理。
·dispose():算子生命周期结束时会调用此方法,包括算子操作执行成功、失败或者取消时。
·prepareSnapshotPreBarrier():在StreamOperator正式执行checkpoint操作之前会调用该方法,目前仅在MapBundleOperator算子中使用该方法。
·snapshotState():当SubTask执行checkpoint操作时会调用该方法,用于触发该Operator中状态数据的快照操作。
·initializeState():当算子启动或重启时,调用该方法初始化状态数据,当恢复作业任务时,算子会从检查点(checkpoint)持久化的数据中恢复状态数据。
1.AbstractStreamOperator的基本实现
AbstractStreamOperator作为StreamOperator的基本实现类,所有的Operator都会继承和实现该抽象实现类。在AbstractStreamOperator中定义了Operator用到的基础方法和成员信息。如图2-5所示,我们重点梳理AbstractStreamOperator的主要成员变量和方法。
AbstractStreamOperator包含的主要成员变量如下。
·ChainingStrategy chainingStrategy:用于指定Operator的上下游算子链接策略,其中ChainStrategy可以是ALWAYS、NEVER或HEAD类型,该参数实际上就是转换过程中配置的链接策略。
·StreamTask<?, ?> container:表示当前Operator所属的StreamTask,最终会通过StreamTask中的invoke()方法执行当前StreamTask中的所有Operator。
图2-5 AbstractStreamOperator UML关系图
·StreamConfig config:存储了该StreamOperator的配置信息,实际上是对Configuration参数进行了封装。
·Output<StreamRecord<OUT>> output:定义了当前StreamOperator的输出操作,执行完该算子的所有转换操作后,会通过Output组件将数据推送到下游算子继续执行。
·StreamingRuntimeContext runtimeContext:主要定义了UDF执行过程中的上下文信息,例如获取累加器、状态数据。
·KeySelector<?, ?> stateKeySelector1:只有DataStream经过keyBy()转换操作生成KeyedStream后,才会设定该算子的stateKeySelector1变量信息。
·KeySelector<?, ?> stateKeySelector2:只在执行两个KeyedStream关联操作时使用,例如Join操作,在AbstractStreamOperator中会保存stateKeySelector2的信息。
·AbstractKeyedStateBackend<?> keyedStateBackend:用于存储KeyedState的状态管理后端,默认为HeapKeyedStateBackend。如果配置RocksDB作为状态存储后端,则此处为RocksDBKeyedStateBackend。
·DefaultKeyedStateStore keyedStateStore:主要提供KeyedState的状态存储服务,实际上是对KeyedStateBackend进行封装并提供了不同类型的KeyedState获取方法,例如通过getReducingState(ReducingStateDescriptor stateProperties)方法获取ReducingState。
·OperatorStateBackend operatorStateBackend:和keyedStateBackend相似,主要提供OperatorState对应的状态后端存储,默认OperatorStateBackend只有DefaultOperatorStateBackend实现。
·OperatorMetricGroup metrics:用于记录当前算子层面的监控指标,包括numRecordsIn、numRecordsOut、numRecordsInRate、numRecordsOutRate等。
·LatencyStats latencyStats:用于采集和汇报当前Operator的延时状况。
·ProcessingTimeService processingTimeService:基于ProcessingTime的时间服务,实现ProcessingTime时间域操作,例如获取当前ProcessingTime,然后创建定时器回调等。
·InternalTimeServiceManager<?>timeServiceManager:Flink内部时间服务,和processingTimeService相似,但支持基于事件时间的时间域处理数据,还可以同时注册基于事件时间和处理时间的定时器,例如在窗口、CEP等高级类型的算子中,会在ProcessFunction中通过timeServiceManager注册Timer定时器,当事件时间或处理时间到达指定时间后执行Timer定时器,以实现复杂的函数计算。
·long combinedWatermark:在双输入类型的算子中,如果基于事件时间处理乱序事件,会在AbstractStreamOperator中合并输入的Watermark,选择最小的Watermark作为合并后的指标,并存储在combinedWatermark变量中。
·long input1Watermark:二元输入算子中input1对应的Watermark大小。
·long input2Watermark:二元输入算子中input2对应的Watermark大小。
AbstractStreamOperator除了定义主要的成员变量之外,还定义了子类实现的基本抽象方法。
·processLatencyMarker():用于处理在SourceOperator中产生的LatencyMarker信息。在当前Operator中会计算事件和LatencyMarker之间的差值,用于评估当前算子的延时程度。
·processWatermark():用于处理接入的Watermark时间戳信息,并用最新的Watermark更新当前算子内部的时钟。
·getInternalTimerService():提供子类获取InternalTimerService的方法,以实现不同类型的Timer注册操作。
2.AbstractUdfStreamOperator基本实现
当StreamOperator涉及自定义用户函数数据转换处理时,对应的Operator会继承AbstractUdfStreamOperator抽象实现类,常见的有StreamMap、CoProcessOperator等算子。当然,并不是所有的Operator都继承自AbstractUdfStreamOperator。在Flink Table API模块实现的算子中,都会直接继承和实现AbstractStreamOperator抽象实现类。另外,有状态查询的AbstractQueryableStateOperator也不需要使用用户自定义函数处理数据。
AbstractUdfStreamOperator继承自AbstractStreamOperator抽象类,对于AbstractUdfStreamOperator抽象类来讲,最重要的拓展就是增加了成员变量userFunction,且提供了userFunction初始化以及状态持久化的抽象方法。下面我们简单介绍AbstractUdfStreamOperator提供的主要方法。
如代码清单2-4所示,在AbstractUdfStreamOperator.setup()方法中会调用FunctionUtils为userFunction设定RuntimeContext变量。此时userFunction能够获取RuntimeContext变量,然后实现获取状态数据等操作。
代码清单2-4 AbstractUdfStreamOperator.setup()方法定义
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) { super.setup(containingTask, config, output); FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext()); }
如代码清单2-5所示,在AbstractUdfStreamOperator.snapshotState()方法中调用了StreamingFunctionUtils.snapshotFunctionState()方法,以实现对userFunction中的状态进行快照操作。
代码清单2-5 AbstractUdfStreamOperator.snapshotState()方法
public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction); }
如代码清单2-6所示,在initializeState()方法中调用StreamingFunctionUtils.restoreFunctionState()方法初始化userFunction的状态值。
代码清单2-6 AbstractUdfStreamOperator.initializeState()方法定义
public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); StreamingFunctionUtils.restoreFunctionState(context, userFunction); }
如代码清单2-7所示,在AbstractUdfStreamOperator.open()方法中调用了FunctionUtils.openFunction()方法。当用户自定义并实现RichFunction时,FunctionUtils.openFunction()方法会调用RichFunction.open()方法,完成用户自定义状态的创建和初始化。
代码清单2-7 AbstractUdfStreamOperator.open()方法定义
public void open() throws Exception { super.open(); FunctionUtils.openFunction(userFunction, new Configuration()); }
可以看出,当用户自定义实现Function时,在AbstractUdfStreamOperator抽象类中提供了对这些Function的初始化操作,也就实现了Operator和Function之间的关联。Operator也是Function的载体,具体数据处理操作借助Operator中的Function进行。StreamOperator提供了执行Function的环境,包括状态数据管理和处理Watermark、LatencyMarker等信息。