flink-iterations-iterate-operator
Step函数在每一轮迭代中都会被执行,它可以是由map、reduce、join等Operator组成的数据流。下面通过官网给出的一个例子来说明Iterate Operator,非常简单直观,如下图所示:
flink-iterations-iterate-operator-example
上面迭代过程中,输入数据为1到5的数字,Step函数就是一个简单的map函数,会对每个输入的数字进行加1处理,而Next Partial Solution对应于经过map函数处理后的结果,比如第一轮迭代,对输入的数字1加1后结果为2,对输入的数字2加1后结果为3,直到对输入数字5加1后结果为变为6,这些新生成结果数字2~6会作为第二轮迭代的输入。迭代终止条件为进行10轮迭代,则最终的结果为11~15。
Delta Iterate
Delta Iterate Operator实现了增量迭代,它的实现原理如下图所示:
flink-iterations-delta-iterate-operator
基于Delta Iterate Operator实现增量迭代,它有2个输入,其中一个是初始Workset,表示输入待处理的增量Stream数据,另一个是初始Solution Set,它是经过Stream方向上Operator处理过的结果。第一轮迭代会将Step函数作用在初始Workset上,得到的计算结果Workset作为下一轮迭代的输入,同时还要增量更新初始Solution Set。如果反复迭代知道满足迭代终止条件,最后会根据Solution Set的结果,输出最终迭代结果。
比如,我们现在已知一个Solution集合中保存的是,已有的商品分类大类中购买量最多的商品,而Workset输入的是来自线上实时交易中最新达成购买的商品的人数,经过计算会生成新的商品分类大类中商品购买量最多的结果,如果某些大类中商品购买量突然增长,它需要更新Solution Set中的结果(原来购买量最多的商品,经过增量迭代计算,可能已经不是最多),最后会输出最终商品分类大类中购买量最多的商品结果集合。更详细的例子,可以参考官网给出的“Propagate Minimum in Graph”,这里不再累述。
Backpressure监控
Backpressure在流式计算系统中会比较受到关注,因为在一个Stream上进行处理的多个Operator之间,它们处理速度和方式可能非常不同,所以就存在上游Operator如果处理速度过快,下游Operator处可能机会堆积Stream记录,严重会造成处理延迟或下游Operator负载过重而崩溃(有些系统可能会丢失数据)。因此,对下游Operator处理速度跟不上的情况,如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。
Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现,具体实现方式如下图所示:
flink-back-pressure-sampling
JobManager会反复调用一个Job的Task运行所在线程的Thread.getStackTrace(),默认情况下,JobManager会每间隔50ms触发对一个Job的每个Task依次进行100次堆栈跟踪调用,根据调用调用结果来确定Backpressure,Flink是通过计算得到一个比值(Radio)来确定当前运行Job的Backpressure状态。在Web界面上可以看到这个Radio值,它表示在一个内部方法调用中阻塞(Stuck)的堆栈跟踪次数,例如,radio=0.01,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
另外,Flink还提供了3个参数来配置Backpressure监控行为:
参数名称 默认值 说明
jobmanager.web.backpressure.refresh-interval 60000 默认1分钟,表示采样统计结果刷新时间间隔
jobmanager.web.backpressure.num-samples 100 评估Backpressure状态,所使用的堆栈跟踪调用次数