Flink SQL 知其所以然(六)| 维表 join 的性能优化之路(上)
myzbx 2025-06-30 18:49 45 浏览
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
- 背景及应用场景介绍:博主期望你能了解到,flink sql 提供了轻松访问外部存储的 lookup join(与上节不同,上节说的是流与流的 join)。lookup join 可以简单理解为使用 flatmap 访问外部存储数据然后将维度字段拼接到当前这条数据上面
- 来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 lookup join 应该达到的关联的预期效果。
- flink sql lookup join 的解决方案以及原理的介绍:主要介绍 lookup join 的在上述实战案例的 sql 写法,博主期望你能了解到,lookup join 是基于处理时间的,并且 lookup join 经常会由于访问外部存储的 qps 过高而导致背压,产出延迟等性能问题。我们可以借鉴在 DataStream api 中的维表 join 优化思路在 flink sql 使用 local cache,异步访问维表,批量访问维表三种方式去解决性能问题。
- 总结及展望:官方并没有提供 批量访问维表 的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。
2.背景及应用场景介绍
维表作为 sql 任务中一种常见表的类型,其本质就是关联表数据的额外数据属性,通常在 join 语句中进行使用。比如源数据有人的 id,你现在想要得到人的性别、年龄,那么可以通过用户 id 去关联人的性别、年龄,就可以得到更全的数据。
维表 join 在离线数仓中是最常见的一种数据处理方式了,在实时数仓的场景中,flink sql 目前也支持了维表的 join,即 lookup join,生产环境可以用 mysql,redis,hbase 来作为高速维表存储引擎。
Notes:在实时数仓中,常用实时维表有两种更新频率
实时的更新:维度信息是实时新建的,实时写入到高速存储引擎中。然后其他实时任务在做处理时实时的关联这些维度信息。
周期性的更新:对于一些缓慢变化维度,比如年龄、性别的用户画像等,几万年都不变化一次的东西,实时维表的更新可以是小时级别,天级别的。
3.来一个实战案例
来看看在具体场景下,对应输入值的输出值应该长啥样。
需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。此处我们只关心关联维表这一部分的输入输出数据。
来一波输入数据:
注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:
预期输出数据如下:
flink sql lookup join 登场。下面是官网的链接。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join
4.flink sql lookup join
4.1.lookup join 定义
以上述案例来说,lookup join 其实简单理解来,就是每来一条数据去 redis 里面搂一次数据。然后把关联到的维度数据给拼接到当前数据中。
熟悉 DataStream api 的小伙伴萌,简单来理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中处理每一条来的数据,针对每一条数据去访问用户画像的 redis。(实际上,flink sql api 中也确实是这样实现的!sql 生成的 lookup join 代码就是继承了 flatmap)
4.2.上述案例解决方案
来看看上述案例的 flink sql lookup join sql 怎么写:
CREATE TABLE show_log (
log_id BIGINT,
`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
user_id STRING,
proctime AS PROCTIME()
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE user_profile (
user_id STRING,
age STRING,
sex STRING
) WITH (
'connector' = 'redis',
'hostname' = '127.0.0.1',
'port' = '6379',
'format' = 'json',
'lookup.cache.max-rows' = '500',
'lookup.cache.ttl' = '3600',
'lookup.max-retries' = '1'
);
CREATE TABLE sink_table (
log_id BIGINT,
`timestamp` TIMESTAMP(3),
user_id STRING,
proctime TIMESTAMP(3),
age STRING,
sex STRING
) WITH (
'connector' = 'print'
);
-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT
s.log_id as log_id
, s.`timestamp` as `timestamp`
, s.user_id as user_id
, s.proctime as proctime
, u.sex as sex
, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
这里使用了 for SYSTEM_TIME as of 时态表的语法来作为维表关联的标识语法。
Notes:实时的 lookup 维表关联能使用处理时间去做关联。
运行结果如下:
flink web ui 算子图如下:
但是!!!但是!!!但是!!!
flink 官方并没有提供 redis 的维表 connector 实现。
没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!
4.3.关于维表使用的一些注意事项
- 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 的数据在 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08:01 failover 之后从 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
- 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据
4.4.再说说维表常见的性能问题及优化思路
所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。
举个例子:
- 在没有使用维表的情况下:一条数据从输入 flink 任务到输出 flink 任务的时延假如为 0.1 ms,那么并行度为 1 的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps。
- 在使用维表之后:每条数据访问维表的外部存储的时长为 2 ms,那么一条数据从输入 flink 任务到输出 flink 任务的时延就会变成 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 query / 2.1 ms = 476 qps。两者的吞吐量相差 21 倍。
这就是为什么维表 join 的算子会产生背压,任务产出会延迟。
那么当然,解决方案也是有很多的。抛开 flink sql 想一下,如果我们使用 DataStream api,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:
- 按照 redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。
- 异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps。
- 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms + 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。
博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。
既然 DataStream 可以这样做,flink sql 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作
4.5.lookup join 的具体性能优化方案
- 按照 redis 维表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做访问 redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 redis 压力。在博主实现的 redis connector 中,内置了 local cache 的实现,小伙伴萌可以参考下面这部篇文章进行配置。
- 异步访问外存:目前博主实现的 redis connector 不支持异步访问,但是官方实现的 hbase connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
- 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 redis 的批量访问外存优化的功能。
4.6.基于 redis connector 的批量访问机制优化
先描述一下大概是个什么东西,具体怎么用。
你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.mode 为 true,sql 不用做任何改动的情况下,flink lookup join 算子会自动优化,优化效果如下:
lookup join 算子的每个 task 上,每攒够 30 条数据 or 每隔五秒(处理时间) 去触发一次批量访问 redis 的请求,使用的是 jedis client 的 pipeline 功能访问 redis server。实测性能有很大提升。关于这个批量访问机制的优化介绍和使用方式介绍,小伙伴们先别急,下篇文章会详细介绍到。
相关推荐
- 如何设计一个优秀的电子商务产品详情页
-
加入人人都是产品经理【起点学院】产品经理实战训练营,BAT产品总监手把手带你学产品电子商务网站的产品详情页面无疑是设计师和开发人员关注的最重要的网页之一。产品详情页面是客户作出“加入购物车”决定的页面...
- 怎么在JS中使用Ajax进行异步请求?
-
大家好,今天我来分享一项JavaScript的实战技巧,即如何在JS中使用Ajax进行异步请求,让你的网页速度瞬间提升。Ajax是一种在不刷新整个网页的情况下与服务器进行数据交互的技术,可以实现异步加...
- 中小企业如何组建,管理团队_中小企业应当如何开展组织结构设计变革
-
前言写了太多关于产品的东西觉得应该换换口味.从码农到架构师,从前端到平面再到UI、UE,最后走向了产品这条不归路,其实以前一直再给你们讲.产品经理跟项目经理区别没有特别大,两个岗位之间有很...
- 前端监控 SDK 开发分享_前端监控系统 开源
-
一、前言随着前端的发展和被重视,慢慢的行业内对于前端监控系统的重视程度也在增加。这里不对为什么需要监控再做解释。那我们先直接说说需求。对于中小型公司来说,可以直接使用三方的监控,比如自己搭建一套免费的...
- Ajax 会被 fetch 取代吗?Axios 怎么办?
-
大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发!今天给大家带来的主题是ajax、fetch...
- 前端面试题《AJAX》_前端面试ajax考点汇总
-
1.什么是ajax?ajax作用是什么?AJAX=异步JavaScript和XML。AJAX是一种用于创建快速动态网页的技术。通过在后台与服务器进行少量数据交换,AJAX可以使网页实...
- Ajax 详细介绍_ajax
-
1、ajax是什么?asynchronousjavascriptandxml:异步的javascript和xml。ajax是用来改善用户体验的一种技术,其本质是利用浏览器内置的一个特殊的...
- 6款可替代dreamweaver的工具_替代powerdesigner的工具
-
dreamweaver对一个web前端工作者来说,再熟悉不过了,像我07年接触web前端开发就是用的dreamweaver,一直用到现在,身边的朋友有跟我推荐过各种更好用的可替代dreamweaver...
- 我敢保证,全网没有再比这更详细的Java知识点总结了,送你啊
-
接下来你看到的将是全网最详细的Java知识点总结,全文分为三大部分:Java基础、Java框架、Java+云数据小编将为大家仔细讲解每大部分里面的详细知识点,别眨眼,从小白到大佬、零基础到精通,你绝...
- 福斯《死侍》发布新剧照 "小贱贱"韦德被改造前造型曝光
-
时光网讯福斯出品的科幻片《死侍》今天发布新剧照,其中一张是较为罕见的死侍在被改造之前的剧照,其余两张剧照都是死侍在执行任务中的状态。据外媒推测,片方此时发布剧照,预计是为了给不久之后影片发布首款正式预...
- 2021年超详细的java学习路线总结—纯干货分享
-
本文整理了java开发的学习路线和相关的学习资源,非常适合零基础入门java的同学,希望大家在学习的时候,能够节省时间。纯干货,良心推荐!第一阶段:Java基础重点知识点:数据类型、核心语法、面向对象...
- 不用海淘,真黑五来到你身边:亚马逊15件热卖爆款推荐!
-
Fujifilm富士instaxMini8小黄人拍立得相机(黄色/蓝色)扫二维码进入购物页面黑五是入手一个轻巧可爱的拍立得相机的好时机,此款是mini8的小黄人特别版,除了颜色涂装成小黄人...
- 2025 年 Python 爬虫四大前沿技术:从异步到 AI
-
作为互联网大厂的后端Python爬虫开发,你是否也曾遇到过这些痛点:面对海量目标URL,单线程爬虫爬取一周还没完成任务;动态渲染的SPA页面,requests库返回的全是空白代码;好不容易...
- 最贱超级英雄《死侍》来了!_死侍超燃
-
死侍Deadpool(2016)导演:蒂姆·米勒编剧:略特·里斯/保罗·沃尼克主演:瑞恩·雷诺兹/莫蕾娜·巴卡林/吉娜·卡拉诺/艾德·斯克林/T·J·米勒类型:动作/...
- 停止javascript的ajax请求,取消axios请求,取消reactfetch请求
-
一、Ajax原生里可以通过XMLHttpRequest对象上的abort方法来中断ajax。注意abort方法不能阻止向服务器发送请求,只能停止当前ajax请求。停止javascript的ajax请求...
- 一周热门
- 最近发表
- 标签列表
-
- HTML 简介 (30)
- HTML 响应式设计 (31)
- HTML URL 编码 (32)
- HTML Web 服务器 (31)
- HTML 表单属性 (32)
- HTML 音频 (31)
- HTML5 支持 (33)
- HTML API (36)
- HTML 总结 (32)
- HTML 全局属性 (32)
- HTML 事件 (31)
- HTML 画布 (32)
- HTTP 方法 (30)
- 键盘快捷键 (30)
- CSS 语法 (35)
- CSS 轮廓宽度 (31)
- CSS 谷歌字体 (33)
- CSS 链接 (31)
- CSS 定位 (31)
- CSS 图片库 (32)
- CSS 图像精灵 (31)
- SVG 文本 (32)
- 时钟启动 (33)
- HTML 游戏 (34)
- JS Loop For (32)
