从零开始学Flink:TopN 榜单

张开发
2026/4/9 21:59:02 15 分钟阅读

分享文章

从零开始学Flink:TopN 榜单
0. 环境准备用 SQL Client 直接跑起来为了把注意力放在 SQL 本身本文用 Kafka 做数据源手动往 Topic 推送点击行为数据用print在 TaskManager Stdout 里观察结果。使用前请确认 Flink 已加载 Kafka SQL Connector把flink-sql-connector-kafka-*.jar放到$FLINK_HOME/lib并重启集群/SQL Client。先把下面几个参数设好后面跑窗口/TopN 时更容易看到输出-- 1) 避免 source 空闲导致 watermark 不推进从而窗口一直不触发 SET table.exec.source.idle-timeout 5s; -- 2) 让窗口/TopN 的结果更“及时”更快看到输出 SET execution.checkpointing.interval 10s; -- 3) 以流模式运行源是无界持续刷到 SQL Client SET execution.runtime-mode streaming; -- 4) 开启 changelog 模式使窗口/TopN 的结果更“及时”更快看到输出 SET sql-client.execution.result-mode changelog;接着创建一张点击行为表事件时间 WatermarkCREATE TABLE dwd_click_log ( user_id STRING, item_id STRING, category_id STRING, ts BIGINT, event_time AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR event_time AS event_time - INTERVAL 3 SECOND ) WITH ( connector kafka, topic dwd_click_log, properties.bootstrap.servers localhost:9092, properties.group.id flink-sql-dwd-click-log, scan.startup.mode earliest-offset, format json, json.ignore-parse-errors true );先准备 Kafka Topic$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic dwd_click_log --partitions 1 --replication-factor 1再创建几个printsink 用来观察输出CREATE TABLE ads_window_metrics_print ( window_start TIMESTAMP_LTZ(3), window_end TIMESTAMP_LTZ(3), pv BIGINT, uv BIGINT ) WITH (connector print, print-identifier ads_window_metrics_print ); CREATE TABLE ads_session_metrics_print ( window_start TIMESTAMP_LTZ(3), window_end TIMESTAMP_LTZ(3), user_id STRING, click_cnt BIGINT ) WITH (connector print, print-identifier ads_session_metrics_print ); CREATE TABLE ads_topn_print ( window_start TIMESTAMP_LTZ(3), window_end TIMESTAMP_LTZ(3), category_id STRING, item_id STRING, cnt BIGINT, rn BIGINT ) WITH (connector print, print-identifier ads_topn_print );1. 窗口聚合基础你到底在对“哪段时间”做统计在 Flink SQL 里窗口的本质是把无界流切成一个个“有限集合”再在集合上做 GROUP BY 聚合。窗口统计能否输出核心取决于两件事你选的是 Processing Time 还是 Event TimeEvent Time 场景下Watermark 是否在推进决定窗口是否“关窗”本文以 Event Time 为主因为绝大多数实时数仓指标都需要“按业务发生时间统计”而不是“按处理到达时间统计”。2. Window TVFFlink SQL 窗口的主流写法Flink 早期有GROUP BY TUMBLE(...)这类 Group Window 语法新版本更推荐Window TVFTable Valued Function它的输出会直接带上window_start/window_end/window_time字段更清晰也更容易与 TopN/Join 组合。2.1 滚动窗口TUMBLE每条数据只属于一个窗口典型场景按分钟/小时统计 PV、UV、GMV。INSERT INTO ads_window_metrics_print SELECT window_start, window_end, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM TABLE( TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL 1 SECOND) ) GROUP BY window_start, window_end;推送数据最简单控制台直接粘贴 JSON一行一条$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log粘贴下面数据ts用毫秒时间戳为了让事件时间窗口及时“关窗”建议ts单调递增或者最后补一条明显更大的ts用来推进 Watermark{user_id:u01,item_id:i01,category_id:c01,ts:1774454400000} {user_id:u02,item_id:i02,category_id:c01,ts:1774454402000} {user_id:u01,item_id:i03,category_id:c02,ts:1774454403000} {user_id:u03,item_id:i04,category_id:c02,ts:1774454404000} {user_id:u04,item_id:i01,category_id:c01,ts:1774454405000}到 Flink Web UI → TaskManagers → Stdout 查看输出要点TUMBLE适合“报表型”指标窗口不重叠状态相对可控COUNT(DISTINCT ...)会引入去重状态用户数大时要关注状态体积生产中可考虑用近似去重或分层聚合2.2 滑动窗口HOP一条数据会被“复制”到多个窗口典型场景最近 5 分钟滚动 UV、最近 1 小时成交额每 5 分钟刷新一次。示例窗口长度 30s每 10s 滑动一次。INSERT INTO ads_window_metrics_print SELECT window_start, window_end, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM TABLE( HOP(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL 10 SECOND, INTERVAL 30 SECOND) ) GROUP BY window_start, window_end;到 Flink Web UI → TaskManagers → Stdout 查看输出要点HOP 的状态压力通常显著高于 TUMBLE因为数据会进入多个窗口业务上能用 TUMBLE 不用 HOP必须用 HOP 时尽量降低窗口长度或放大 slide减少并行窗口数2.3 会话窗口SESSION按“事件间隔”自动切窗典型场景统计用户一次访问会话内的点击数/停留时长、按会话做转化漏斗。示例同一用户 10s 内没有新事件就认为会话结束。INSERT INTO ads_session_metrics_print SELECT window_start, window_end, user_id, COUNT(*) AS click_cnt FROM TABLE( SESSION(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL 10 SECOND) ) GROUP BY window_start, window_end, user_id;推送数据最简单控制台直接粘贴 JSON一行一条$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log往 topic dwd_click_log 推送数据控制台直接粘贴 JSON一行一条{user_id:u01,item_id:i01,category_id:c01,ts:1775143687285} {user_id:u02,item_id:i02,category_id:c01,ts:1775143688285} {user_id:u01,item_id:i03,category_id:c02,ts:1775143689285}到 Flink Web UI → TaskManagers → Stdout 查看输出要点SESSION 窗口边界不固定会因为迟到数据发生“合并”下游会看到更新/撤回更频繁如果你的下游只接受 Append只插入不更新SESSION 往往不合适除非你引入可更新的 sinkUpsert4. TopN把窗口聚合变成实时榜单TopN 的正确打开方式是“两段式”先做窗口聚合得到每个候选项的指标比如每个商品的点击数再在聚合结果上做排序取前 N4.1 窗口内 TopN每个窗口的热榜 Top3需求每 10 秒统计一次“各品类内点击 Top3 商品”。INSERT INTO ads_topn_print WITH item_cnt AS ( SELECT window_start, window_end, category_id, item_id, COUNT(*) AS cnt FROM TABLE( TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL 10 SECOND) ) GROUP BY window_start, window_end, category_id, item_id ), ranked AS ( SELECT window_start, window_end, category_id, item_id, cnt, ROW_NUMBER() OVER ( PARTITION BY window_start, window_end, category_id ORDER BY cnt DESC, item_id ) AS rn FROM item_cnt ) SELECT window_start, window_end, category_id, item_id, cnt, rn FROM ranked WHERE rn 3;到 Flink Web UI → TaskManagers → Stdout 查看输出几个关键点PARTITION BY window_start, window_end, category_id表示“每个窗口、每个品类各自一张榜单”ORDER BY cnt DESC决定榜单规则追加item_id是为了稳定排序避免并列时结果抖动TopN 本质是对一张动态表做排序截断窗口聚合尤其是 SESSION/HOP会产生更新因此 TopN 输出常常不是纯 Append5. 生产落地TopN 为什么“写不进”下游TopN 落地最常见的问题是下游只接受追加流Append-only但 TopN 的结果在运行过程中会不断更新。你会在print里看到类似I/-U/U的变更日志输出不同版本格式略有差异这意味着早期输出的第 3 名后续可能被挤掉需要撤回早期输出的第 1 名后续计数增长会以更新的形式重发落地时通常有两种策略写 Upsert SinkKafka Upsert、JDBC主键表、HBase、Redis 等要求结果表定义PRIMARY KEY (...) NOT ENFORCED把 TopN 变成“窗口结束一次性输出”只在窗口最终关闭后输出最终榜单减少更新更偏离线思路对于实时大屏、实时榜单通常选 Upsert Sink。6. 性能与稳定性窗口与 TopN 的几个关键参数6.1 状态 TTL给状态设“上限”即使是窗口聚合状态也不是“完全自动可控”的滑动窗口、去重、TopN 的中间表都会占用状态。生产建议给作业设置统一 TTL例如保留 7 天按业务调整SET table.exec.state.ttl 7 d;6.2 MiniBatch降低聚合与排序的更新频率当源数据更新非常频繁尤其是有去重、TopN时MiniBatch 能显著减少算子更新次数提升吞吐SET table.exec.mini-batch.enabled true; SET table.exec.mini-batch.allow-latency 2 s; SET table.exec.mini-batch.size 2000;6.3 Watermark 设计迟到与实时性的平衡WATERMARK FOR event_time AS event_time - INTERVAL 3 SECOND的 3 秒不是越大越好太小乱序稍大就会被判定为迟到数据而丢弃太大窗口输出延迟变大榜单刷新变慢通常做法是先用历史数据评估乱序分布P95/P99再给一个能接受的延迟阈值。7. 小结Window TVFTUMBLE/HOP/SESSION是 Flink SQL 窗口聚合的主流写法关键在于 Event Time Watermark 是否能把窗口按预期触发出来TopN 建议两段式先聚合再排名并根据业务选择ROW_NUMBER或RANKTopN 多数情况下会产生更新/撤回生产下游优先考虑 Upsert Sink或把输出改为“窗口结束一次性输出”性能与稳定性重点关注State TTL、MiniBatch、Watermark 延迟与业务时效的权衡

更多文章