一次TOPN需求带来的历险-与Flink SQL的爱恨情仇

一次TOPN需求带来的历险-与Flink SQL的爱恨情仇,第1张

一次TOPN需求带来的历险-与Flink SQL的爱恨情仇 背景

最近接到一个需求,算是一个比较常见的需求,我们公司的业务会涉及到直播,因此直播自然避免不了高并发的房间内发消息,因此业务端给出的需求就是:实时得为业务端提供近10s内的每个房间的消息总数TOP20, 业务端会根据TOP前几进行相应的限流的一系列 *** 作。

思考

其实接到这个需求,我的思考实现方式是这样的:
1、先求出每个房间的近10s的消息总数
2、将第一步得到的每个房间的近10s的消息总数进行汇总,排序,从而得出最终结果: 近10s内,消息数TOP10的房间是哪些

可以看到上面的每一步,都对应一个时间范围:10s内, 也就是说每一步都要基于一个window进行计算,每一步我都知道,但两步如何联合起来呢,即如何两个window之间进行联合呢? 之前没有做过类似的 *** 作,于是去官方翻了翻,发现了一些奥妙:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/#consecutive-windowed-operations

这里给出了两个相同大小的窗口进行联合的方式:

DataStream input = ...;

DataStream resultsPerKey = input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
   

简单来说,就是直接在后面接一个相同大小的窗口即可,那么后面的窗口即获取到了前一个窗口的所有数据,可是为什么这样就可以链接呢?
其实笔者最开始对这里也是不解,为什么后面接一个相同的窗口就能够获取到前一个窗口的输出呢?直到看了一下这里的源码来慢慢理解,事件时间窗口的触发是依靠watermark来推动的:

//AbstractStreamOperator中	
public void processWatermark(Watermark mark) throws Exception {	
        if (timeServiceManager != null) {	
            timeServiceManager.advanceWatermark(mark);	
        }	
        output.emitWatermark(mark);	
    }

advanceWatermark会触发满足要求的窗口,并且将窗口的结果输出,之后在才输出watermark, 在这里有一个很重要的关系:watermark是在窗口数据输出之后输出,那么下一个窗口是如何判断上一个窗口的输出应该划分在同一个窗口呢,当然是按照时间,但是窗口输出数据时间是什么呢?

//WindowOperator	
private void emitWindowContents(W window, ACC contents) throws Exception {	
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());	
        processContext.window = window;	
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);	
    }

可以看到TimestampedCollector类型的collector,设置的时间正是窗口的endTime, 也就是窗口输出数据的数据时间就是窗口的endTime, 那么同一个窗口的输出数据具有相同的数据时间endTime, 这些数据正好可以在下游窗口被分配到同一个窗口中。在上一个窗口触发之后输出watermark正好可以触发下游窗口的窗口 *** 作。

到这里为止,终于知道咋整了,可还有一个细节需要思考,第二步的排序,怎么排序比较好? 要保留全量数据吗? 可以不保留全量吗? 很显然是不需要的,可以利用一些比较合适的数据结构来完成:
很容易想到的就是Sorted的数据结构TreeSet或者是优先级队列PriorityQueue , TreeSet 实现原理是红黑树,优先队列实现原理就是最大/最小堆,这两个都可以满足需求,但是需要选择哪一个呢?红黑树的时间复杂度是logN,而堆的构造复杂度是N, 读取复杂度是1, 但是我们这里需要不断的做数据插入那么就涉及不断的构造过程,相对而言选择红黑树比较好(其实flink sql内部做topN也是选择红黑树类型的TreeMap)。

将TreeSet设置成为升序排序,那么第一个节点数据就是最小值,当TreeSet里面的数据到达N, 就获取第一个节点数据(最小值)与当前需要插入的数据进行比较,如果比其大,则直接舍弃,如果比其小,那么就将TreeSet中第一个节点数据删除,插入新的数据,最终得到的TreeSet 数据就是我们需要的topN。

实现

扯了这么多,有没有觉得我有点标题党,但是不要急朋友们,其实我虽然一顿思路想的好像很明白,但是我懒啊,而且觉得之前的计算任务都是玩的flink的代码实现,最近在挖掘flink的sql部分,顺便在思考怎么搭建数据平台,而且这个需求显然也是sql实现起来更容易简单一些,是的,没错,所以最终我其实实现是用的Flink SQL,但是SQL解析之后的逻辑代码肯定还是大差不差的就是我上面的逻辑,因此最后,放出我的SQL实现吧:

SELECT *
FROM (
         SELECt DATE_FORMAT(utc2local(window_start), 'yyyy-MM-dd HH:mm:ss') startTime,
                DATE_FORMAT(utc2local(window_end), 'yyyy-MM-dd HH:mm:ss')   endTime,
                roomId,
                totalSize,
                cnt,
                ROW_NUMBER()  OVER (PARTITION BY window_start, window_end ORDER BY totalSize DESC) as rankNo
         FROM (
                  SELECt window_start, window_end, roomId, SUM(size) as totalSize, COUNT(*) as cnt
                  FROM TABLE(
                          TUMBLE(TABLE pusher_broadcast_message, DEscriptOR(msgTime), INTERVAL '10' SECONDS))
                  GROUP BY window_start, window_end, roomId
              )
     )
WHERe rankNo <= 20
。。。

关于上面的SQL,我有几点要说。。。
因为之前没有深入接触过Flink SQL,所以其实最开始我理解的SQL写法应该简单粗暴是这样滴:

SELECt roomId,sum(size) as totalSize FROM pusher_broadcast_message
GROUP BY roomId,TUMBLE(msgTime, INTERVAL '10' SECONDS)
ORDER BY totalSize DESC
LIMIT 20

但,效果并不是我想要的效果,后来想了想,因为flink sql对表的查询,是一种动态持续查询的机制,这样的写法得到的结果将是每来一条窗口数据,都将与历史的所有窗口数据排个序。
于是我就去查阅官方关于使用SQL依据窗口进行聚合的相关内容:
Window Aggregation

在这一节,它对窗口提出了有2种类型的窗口 *** 作方式:
1、Window TVF Aggregation(TVF:Table-Valued Functions)
2、Group Window Aggregation
我的理解是第一种类似于是把窗口聚合的数据作为一个表函数进行查询
第二种类似于单纯是一个函数, 而且只能作用于group by这种聚合函数的后面。
其中在这之前,我在网上的帖子上,对于SQL window聚合的语法,只见过第二种的写法,第一种第一次见到。
而官方其实更推荐使用第一种,因为:

Warning: Group Window Aggregation is deprecated. It’s encouraged to
use Window TVF Aggregation which is more powerful and effective.

Compared to Group Window Aggregation, Window TVF Aggregation have many
advantages, including:

Have all performance optimizations mentioned in Performance Tuning.
Support standard GROUPING SETS syntax. Can apply Window TopN after
window aggregation result. and so on.

翻译:
警告:不推荐使用组窗口聚合。鼓励使用功能更强大、更有效的 Window TVF Aggregation。

与Group Window Aggregation相比,Window TVF Aggregation有很多优点,包括:

在提到的所有性能优化性能优化。 支持标准GROUPING SETS语法。 可以在窗口聚合结果后应用Window TopN。 等等。

这里看到了关键字,使用Window TVF Aggregation可以支持TOP N的实现
官方文档-Window Top-N

比着葫芦画瓢,按照官方的写法,我顺利的使用SQL完成了TOPN的本次需求,结尾,在放一次最终版本SQL吧:

SELECt *
FROM (
         SELECt DATE_FORMAT(utc2local(window_start), 'yyyy-MM-dd HH:mm:ss') startTime,
                DATE_FORMAT(utc2local(window_end), 'yyyy-MM-dd HH:mm:ss')   endTime,
                roomId,
                totalSize,
                cnt,
                ROW_NUMBER()  OVER (PARTITION BY window_start, window_end ORDER BY totalSize DESC) as rankNo
         FROM (
                  SELECt window_start, window_end, roomId, SUM(size) as totalSize, COUNT(*) as cnt
                  FROM TABLE(
                          TUMBLE(TABLE pusher_broadcast_message, DEscriptOR(msgTime), INTERVAL '10' SECONDS))
                  GROUP BY window_start, window_end, roomId
              )
     )
WHERe rankNo <= 20

可以看到最里层的SQL,先对10s内窗口根据roomId进行聚合(使用的窗口聚合语法就是Group Window Aggregation), 接着外层再使用OVER函数对同一时间窗口不同房间的数据进行排序, 利用ROW_NUMBER()就可以得到对应的排行数。

至此,完成了对本次需求的实现,也让我对Flink SQL的实践和理解更上一步

参考

Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5665783.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存