百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Flink SQL 知其所以然(六)| 维表 join 的性能优化之路(上)

myzbx 2025-06-30 18:49 1 浏览

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 背景及应用场景介绍:博主期望你能了解到,flink sql 提供了轻松访问外部存储的 lookup join(与上节不同,上节说的是流与流的 join)。lookup join 可以简单理解为使用 flatmap 访问外部存储数据然后将维度字段拼接到当前这条数据上面
  2. 来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 lookup join 应该达到的关联的预期效果。
  3. flink sql lookup join 的解决方案以及原理的介绍:主要介绍 lookup join 的在上述实战案例的 sql 写法,博主期望你能了解到,lookup join 是基于处理时间的,并且 lookup join 经常会由于访问外部存储的 qps 过高而导致背压,产出延迟等性能问题。我们可以借鉴在 DataStream api 中的维表 join 优化思路在 flink sql 使用 local cache异步访问维表批量访问维表三种方式去解决性能问题。
  4. 总结及展望:官方并没有提供 批量访问维表 的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。

2.背景及应用场景介绍

维表作为 sql 任务中一种常见表的类型,其本质就是关联表数据的额外数据属性,通常在 join 语句中进行使用。比如源数据有人的 id,你现在想要得到人的性别、年龄,那么可以通过用户 id 去关联人的性别、年龄,就可以得到更全的数据。

维表 join 在离线数仓中是最常见的一种数据处理方式了,在实时数仓的场景中,flink sql 目前也支持了维表的 join,即 lookup join,生产环境可以用 mysql,redis,hbase 来作为高速维表存储引擎。

Notes:在实时数仓中,常用实时维表有两种更新频率

实时的更新:维度信息是实时新建的,实时写入到高速存储引擎中。然后其他实时任务在做处理时实时的关联这些维度信息。

周期性的更新:对于一些缓慢变化维度,比如年龄、性别的用户画像等,几万年都不变化一次的东西,实时维表的更新可以是小时级别,天级别的。

3.来一个实战案例

来看看在具体场景下,对应输入值的输出值应该长啥样。

需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。此处我们只关心关联维表这一部分的输入输出数据。

来一波输入数据:



注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:

预期输出数据如下:


flink sql lookup join 登场。下面是官网的链接。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join

4.flink sql lookup join

4.1.lookup join 定义

以上述案例来说,lookup join 其实简单理解来,就是每来一条数据去 redis 里面搂一次数据。然后把关联到的维度数据给拼接到当前数据中。

熟悉 DataStream api 的小伙伴萌,简单来理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中处理每一条来的数据,针对每一条数据去访问用户画像的 redis。(实际上,flink sql api 中也确实是这样实现的!sql 生成的 lookup join 代码就是继承了 flatmap)

4.2.上述案例解决方案

来看看上述案例的 flink sql lookup join sql 怎么写:

CREATE TABLE show_log (
    log_id BIGINT,
    `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
    user_id STRING,
    proctime AS PROCTIME()
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.length' = '1',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '10'
);

CREATE TABLE user_profile (
    user_id STRING,
    age STRING,
    sex STRING
    ) WITH (
  'connector' = 'redis',
  'hostname' = '127.0.0.1',
  'port' = '6379',
  'format' = 'json',
  'lookup.cache.max-rows' = '500',
  'lookup.cache.ttl' = '3600',
  'lookup.max-retries' = '1'
);

CREATE TABLE sink_table (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    user_id STRING,
    proctime TIMESTAMP(3),
    age STRING,
    sex STRING
) WITH (
  'connector' = 'print'
);

-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT 
    s.log_id as log_id
    , s.`timestamp` as `timestamp`
    , s.user_id as user_id
    , s.proctime as proctime
    , u.sex as sex
    , u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id

这里使用了 for SYSTEM_TIME as of 时态表的语法来作为维表关联的标识语法。

Notes:实时的 lookup 维表关联能使用处理时间去做关联。

运行结果如下:


flink web ui 算子图如下:

但是!!!但是!!!但是!!!

flink 官方并没有提供 redis 的维表 connector 实现。

没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!

4.3.关于维表使用的一些注意事项

  1. 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 的数据在 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08:01 failover 之后从 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
  2. 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据

4.4.再说说维表常见的性能问题及优化思路

所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。

举个例子:

  • 在没有使用维表的情况下:一条数据从输入 flink 任务到输出 flink 任务的时延假如为 0.1 ms,那么并行度为 1 的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps
  • 在使用维表之后:每条数据访问维表的外部存储的时长为 2 ms,那么一条数据从输入 flink 任务到输出 flink 任务的时延就会变成 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 query / 2.1 ms = 476 qps。两者的吞吐量相差 21 倍

这就是为什么维表 join 的算子会产生背压,任务产出会延迟。

那么当然,解决方案也是有很多的。抛开 flink sql 想一下,如果我们使用 DataStream api,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

  1. 按照 redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。
  2. 异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps
  3. 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms + 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。

博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。

既然 DataStream 可以这样做,flink sql 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

4.5.lookup join 的具体性能优化方案

  1. 按照 redis 维表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做访问 redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 redis 压力。在博主实现的 redis connector 中,内置了 local cache 的实现,小伙伴萌可以参考下面这部篇文章进行配置。
  2. 异步访问外存:目前博主实现的 redis connector 不支持异步访问,但是官方实现的 hbase connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
  3. 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 redis 的批量访问外存优化的功能。

4.6.基于 redis connector 的批量访问机制优化

先描述一下大概是个什么东西,具体怎么用。

你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.modetrue,sql 不用做任何改动的情况下,flink lookup join 算子会自动优化,优化效果如下:

lookup join 算子的每个 task 上,每攒够 30 条数据 or 每隔五秒(处理时间) 去触发一次批量访问 redis 的请求,使用的是 jedis client 的 pipeline 功能访问 redis server。实测性能有很大提升。关于这个批量访问机制的优化介绍和使用方式介绍,小伙伴们先别急,下篇文章会详细介绍到。

相关推荐

使用 Siemens Teamcenter Digital Reality Viewer 打造逼真的数字孪生

现代产品通常由数百万个部件组成,需要复杂的设计和协作。工业世界在管理复杂性方面面临重大挑战,传统的可视化工具无法渲染这些大型的多CAD组件,从而无法充分利用数字孪生的优势。为了解决这些难题,西门子...

如何在JavaScript中实现数字输入框的范围限制?

在JavaScript语言中,实现数字输入框的范围限制可以通过多种方式实现,最常见的方式是利用JavaScript的事件监听机制,和通过JavaScript的条件判断语句来实现范围限制。以...

2.3.8.J速算24点终极挑战(速算24点题目及答案)

各位数学高手、脑力达人,今天给大家带来一道超烧脑的**24点挑战**!**数字:2、3、8、J(J=11规则很简单:**用加减乘除和括号,把四个数字组合成24!每个数字只能用一次!****看起...

分布式系统进阶二十一之短链接生成原理

前言短链接(ShortURL)是一种通过缩短网页链接长度来方便分享的技术。相比于传统的长链接,短链接更简洁明了,更易于在社交媒体等平台上分享和传播。在本文中,我们将会详细解释短链接的定义、作用及其构...

一、SpringBoo中Mybatis多数据源动态切换

我们以一个实例来详细说明一下如何在SpringBoot中动态切换MyBatis的数据源。一、需求1、用户可以界面新增数据源相关信息,提交后,保存到数据库2、保存后的数据源需要动态生效,并且可以由用户动...

「JS 逆向百例」层层嵌套!某加速商城 RSA 加密

声明本文章中所有内容仅供学习交流,敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除!逆向目标目标:某加速商城登录接口主页:a...

Spring Data Jpa 介绍和详细入门案例搭建

1.SpringDataJPA的概念在介绍SpringDataJPA的时候,我们首先认识下Hibernate。Hibernate是数据访问解决技术的绝对霸主,使用O/R映射(Object-Re...

SpringBoot 开发 Web 系统,快速入门指南!

01、背景介绍在之前的文章中,我们简单的介绍了SpringBoot项目的创建过程,了解了SpringBoot开箱即用的特性,本篇文章接着上篇的内容继续介绍SpringBoot用于we...

Nacos3.0重磅来袭!全面拥抱AI,单机及集群模式安装详细教程!

之前和大家分享过JDK17的多版本管理及详细安装过程,然后在项目升级完jdk17后又发现之前的注册和配置中心nacos又用不了,原因是之前的nacos1.3版本的,版本太老了,已经无法适配当前新的JD...

golang语言的魅力精华之玩转通道channel 值得你反复阅读100遍

通道用例大全在阅读本文之前,请先阅读通道一文。那篇文章详细地解释了通道类型和通道值,以及各种通道操作的规则细节。一个Go新手程序员可能需要反复多次阅读那篇文章和当前这篇文章来精通Go通道编程。本文...

2小时快速搭建一个高可用的IM系统

“笔者2019年参加了一次Gopher大会,有幸听探探的架构师分享了他们2019年微服务化的过程。图片来自Pexels本文快速搭建的IM系统也是使用Go语言来快速实现的,这里先和...

分库分表以后,如何管理几万张分片表?

大家好,我是小富~ShardingSphere实现分库分表,如何管理分布在不同数据库实例中的成千上万张分片表?上边的问题是之前有个小伙伴看了我的分库分表的文章,私下咨询我的,看到他的提问我第一感觉就是...

Spring Boot JDBC 与 JdbcTemplate 全面指南(万字保姆级教程)

一、SpringBootJDBC基础1.1JDBC简介与演进JDBC(JavaDatabaseConnectivity)是Java语言中用来规范客户端程序如何访问数据库的应用程序...

Flink SQL 知其所以然(六)| 维表 join 的性能优化之路(上)

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:背景及应用场景介绍:博主期望你能了解到,flinksql提供了轻松访问外部存储的loo...

大数据Hadoop之——Flink Table API 和 SQL(单机Kafka)

一、TableAPI和FlinkSQL是什么TableAPI和SQL集成在同一套API中。这套API的核心概念是Table,用作查询的输入和输出,这套API都是批处理和...