2.5.2 WindowedStream的设计与实现
和其他DataStream操作相比,WindowedStream转换操作相对复杂一些,在本节我们结合前面学习的内容,继续了解WindowedStream转换操作的实现。
我们知道,如果将DataStream根据Key进行分组,生成KeyedStream数据集,然后在KeyedStream上执行window()转换操作,就会生成WindowedStream数据集。如果直接调用DataStream.windowAll()方法进行转换,就会生成AllWindowedStream数据集。WindowedStream和AllWindowedStream的主要区别在于是否按照Key进行分区处理,这里我们以WindowedStream为例讲解窗口转换操作的具体实现。
1.WindowAssigner设计与实现
如代码清单2-39所示,当用户调用KeyedStream.window()方法时,会创建WindowedStream转换操作。通过window()方法可以看出,此时需要传递WindowAssigner作为窗口数据元素的分配器,通过WindowAssigner组件,可以根据指定的窗口类型将数据元素分配到指定的窗口中。
代码清单2-39 KeyedStream.window()方法定义
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner); }
接下来我们看WindowAssigner的具体实现。如图2-17所示,WindowAssigner作为抽象类,其子类实现是非常多的,例如基于事件时间实现的SlidingEventTimeWindows、基于处理时间实现的TumblingProcessingTimeWindows等。这些WindowAssigner根据窗口类型进行区分,且属于DataStream API中内置的窗口分配器,用户可以直接调用它们创建不同类型的窗口转换。
从图2-17中可以看出,SessionWindow类型的窗口比较特殊,在WindowAssigner的基础上又实现了MergingWindowAssigner抽象类,在MergingWindowAssigner抽象类中定义了MergeCallback接口。这样做的原因是SessionWindow的窗口长度不固定,SessionWindow窗口的长度取决于指定时间范围内是否有数据元素接入,然后动态地将接入数据切分成独立的窗口,最后完成窗口计算。此时涉及对窗口中的元素进行动态Merge操作,这里主要借助MergingWindowAssigner提供的mergeWindows()方法来实现。
图2-17 WindowAssigner UML关系图
在WindowAssigner中通过提供WindowAssignerContext上下文获取CurrentProcessingTime等时间信息。在WindowAssigner抽象类中提供了以下方法供子类选择。
·assignWindows():定义将数据元素分配到对应窗口的逻辑。
·getDefaultTrigger():获取默认的Trigger,也就是默认窗口触发器,例如EventTimeTrigger。
·getWindowSerializer():获取WindowSerializer实现,默认为TimeWindow.Serializer()。
·isEventTime():判断是否为基于EventTime时间类型实现的窗口。
如代码清单2-40所示,我们以SlidingEventTimeWindows为例进行说明。
代码清单2-40 SlidingEventTimeWindows.assignWindows()方法定义
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
在SlidingEventTimeWindows.assignWindows()方法中可以看出,assignWindows()方法的参数包含了当前数据元素element、timestamp和WindowAssignerContext的上下文信息,且方法主要包含如下逻辑。
·判断timestamp是否有效,然后根据窗口长度和滑动时间计算数据元素所属窗口的数量,再根据窗口数量创建窗口列表。
·调用TimeWindow.getWindowStartWithOffset()方法,确定窗口列表中最晚的窗口对应的WindowStart时间,并赋值给lastStart变量;然后从lastStart开始遍历,每次向前移动固定的slide长度;最后向windows窗口列表中添加创建的TimeWindow,在TimeWindow中需要指定窗口的起始时间和结束时间。
·返回创建的窗口列表windows,也就是当前数据元素所属的窗口列表。
创建的WindowAssigner实例会在WindowOperator中使用,输入一条数据元素时会调用WindowAssigner.assignWindows()方法为接入的数据元素分配窗口,WindowOperator会根据元素所属的窗口分别对数据元素进行处理。
当然还有其他类型的WindowAssigner实现,基本功能都是一样的,主要是根据输入的元素确定和分配窗口。对于SlidingWindow类型的窗口来讲,同一个数据元素可能属于多个窗口,主要取决于窗口大小和滑动时间长度;而对于TumpleWindow类型来讲,每个数据元素仅属于一个窗口。
2.Window Trigger的核心实现
Window Trigger决定了窗口触发WindowFunction计算的时机,当接入的数据元素通过WindowAssigner分配到不同的窗口后,数据元素会被不断地累积在窗口状态中。当满足窗口触发条件时,会取出当前窗口中的所有数据元素,基于指定的WindowFunction对窗口中的数据元素进行运算,最后产生窗口计算结果并发送到下游的算子中。
如图2-18所示,在DataStream API中,所有定义的Window Trigger继承自Trigger基本实现类。每种窗口的触发策略不同,相应的Trigger触发器也有所不同。例如TumblingProcessingTimeWindows对应的默认Trigger为ProcessingTimeTrigger,而SlidingEventTimeWindows默认对应的是EventTimeTrigger。
数据元素接入WindowOperator后,调用窗口触发器的onElement()方法,判断窗口是否满足触发条件。如果满足,则触发窗口计算操作。我们以EventTimeTrigger为例介绍Trigger的核心实现,如代码清单2-41所示。
图2-18 Trigger UML关系图
·当数据元素接入后,根据窗口中maxTimestamp是否大于当前算子中的Watermark决定是否触发窗口计算。如果符合触发条件,则返回TriggerResult.FIRE事件,这里的maxTimestamp实际上是窗口的结束时间减1,属于该窗口的最大时间戳。
·如果不满足以上条件,就会继续向TriggerContext中注册Timer定时器,等待指定的时间再通过定时器触发窗口计算,此时方法会返回TriggerResult.CONTINUE消息给WindowOperator,表示此时窗口不会触发计算,继续等待新的数据接入。
·当数据元素不断接入WindowOperator,不断更新Watermark时,只要Watermark大于窗口的右边界就会触发相应的窗口计算。
代码清单2-41 EventTimeTrigger.onElement()方法定义
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // 如果Watermark超过窗口最大时间戳,则立即执行 return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } }
在EventTimeTrigger.onElement()方法定义中我们可以看到,当窗口不满足触发条件时,会向TriggerContext中注册EventTimeTimer定时器,指定的触发时间为窗口中的最大时间戳。算子中的Watermark到达该时间戳时,会自动触发窗口计算,不需要等待新的数据元素接入。这里TriggerContext使用到的TimerService实际上就是我们在2.4.2节介绍过的InternalTimerService,EventTimeTimer会基于InternalTimerService的实现类进行存储和管理。
当Timer定时器到达maxTimestamp时就会调用EventTimeTrigger.onEventTime()方法。如代码清单2-42所示,在EventTimeTrigger.onEventTime()方法中,实际上会判断传入的事件时间和窗口的maxTimestamp是否相等,如果相等则返回TriggerResult.FIRE并触发窗口的统计计算。
代码清单2-42 EventTimeTrigger.onEventTime()方法定义
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; }
对于其他类型的窗口触发器,在原理上和EventTimeTrigger基本相同,感兴趣的读者可以阅读相关代码实现。
3.WindowFunction的设计与实现
经过以上几个步骤,基本上就能够确认窗口的类型及相应的触发时机了。窗口符合触发条件之后,就会对窗口中已经积蓄的数据元素进行统计计算,以得到最终的统计结果。对窗口元素的计算逻辑定义则主要通过窗口函数来实现。
在WindowStream的计算中,将窗口函数分为两种类型:用户指定的聚合函数AggregateFunction和专门用于窗口计算的WindowFunction。对于大部分用户来讲,基本都是基于窗口做聚合类型的统计运算,因此只需要在WindowStream中指定相应的聚合函数,如ReduceFunction和AggregateFunction。而在WindowStream的计算过程中,实际上会通过WindowFunction完成更加复杂的窗口计算。
如图2-19所示,WindowFunction继承了Function接口,同时又被不同类型的聚合函数实现,例如实现窗口关联计算的CoGroupWindowFunction、在窗口中对元素进行Reduce操作的ReduceApplyWindowFunction。这些函数同时继承自WrappingFunction,WrappingFunction对WindowFunction进行了一层封装,主要通过继承AbstractRichFunction抽象类,拓展和实现了RichFunction提供的能力。
图2-19 WindowFunction UML类图
总而言之,窗口中的函数会将用户定义的聚合函数和WindowFunction进行整合,形成统一的RichWindowFunction,然后基于RichWindowFunction进行后续的操作。
如代码清单2-43所示,用户创建WindowStream后,将ReduceFunction传递给WindowStream.reduce()方法。在WindowStream.reduce()方法中可以看出,还需要将WindowFunction作为参数,但这里的WindowFunction会在WindowStream中创建PassThroughWindowFunction默认实现类。
代码清单2-43 WindowStream.reduce()方法定义
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction."); } //清理函数闭包 function = input.getExecutionEnvironment().clean(function); reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function); KeySelector<T, K> keySel = input.getKeySelector(); OneInputStreamOperator<T, R> operator; if (evictor != null) { @SuppressWarnings({"unchecked", "rawtypes"}) TypeSerializer<StreamRecord<T>> streamRecordSerializer = (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input. getType().createSerializer(getExecutionEnvironment().getConfig())); ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); operator = new EvictingWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment(). getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment(). getConfig()), stateDesc, new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction <>(reduceFunction, function)), trigger, evictor, allowedLateness, lateDataOutputTag); } else { ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor <>("window-contents", reduceFunction, input.getType().createSerializer(getExecutionEnvironment(). getConfig())); operator = new WindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment(). getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment(). getConfig()), stateDesc, new InternalSingleValueWindowFunction<>(function), trigger, allowedLateness, lateDataOutputTag); } return input.transform(opName, resultType, operator); }
最后实际上就是创建OneInputStreamOperator实例,StreamOperator会根据evictor数据剔除器是否为空,选择创建EvictingWindowOperator还是WindowOperator。在创建EvictingWindowOperator时,通过调用new ReduceApplyWindowFunction <?> (reduceFunction, function)合并ReduceFunction和WindowFunction,然后转换为InternalIterableWindowFunction函数供WindowOperator使用。接下来调用input.transform()方法将创建好的EvictingWindowOperator或WindowOperator实例添加到OneInputTransformation转换操作中。其他的窗口计算函数和Reduce聚合函数基本一致,这里不再赘述。