flink维表查询redis之flink-connector-redis
mhr18 2024-10-22 12:39 26 浏览 0 评论
插件名称:flink-connector-redis
插件地址:https://github.com/jeff-zou/flink-connector-redis.git
无法翻墙:https://gitee.com/jeff-zou/flink-connector-redis.git
项目介绍
基于bahir-flink二次开发,相对bahir调整的内容有:增加支持Flink高版本(包括1.12,1.13,1.14等)、增加Table/SQL API、 增加维表查询支持、增加查询缓存(支持增量与全量)、统一过期策略、写入并发数等。
因bahir使用的flink接口版本较老,所以改动较大,开发过程中参考了腾讯云与阿里云两家产商的流计算产品,取两家之长,并增加了更丰富的功能,包括更多的redis操作命令和更多的redis服务类型,如:simple sentinel cluster。
支持功能对应redis的操作命令有:
插入 | 维表查询 |
set | get |
hset | hget |
rpush lpush | |
incrBy decrBy hincrBy zincrby | |
sadd zadd pfadd(hyperloglog) | |
publish | |
zrem srem | |
del hdel |
使用方法:
在命令行执行 mvn package -DskipTests打包后,将生成的包flink-connector-redis-1.1.0.jar引入flink lib中即可,无需其它设置。
项目依赖jedis 3.7.1,如flink环境无jedis,则使用flink-connector-redis-1.1.0-jar-with-dependencies.jar
开发环境工程直接引用:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.1.0</version>
</dependency>
使用说明:
value.data.structure = column(默认)
无需通过primary key来映射redis中的Key,直接由ddl中的字段顺序来决定Key,如:
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set')
其中username为key, passport为value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
其中name为map结构的key, subject为field, score为value.
value.data.structure = row
整行内容保存至value并以'\01'分割
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set')
其中username为key, username\01passport为value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
其中name为map结构的key, subject为field, name\01subject\01score为value.
with参数说明:
字段 | 默认值 | 类型 | 说明 |
connector | (none) | String | redis |
host | (none) | String | Redis IP |
port | 6379 | Integer | Redis 端口 |
password | null | String | 如果没有设置,则为 null |
database | 0 | Integer | 默认使用 db0 |
maxTotal | 2 | Integer | 最大连接数 |
maxIdle | 2 | Integer | 最大保持连接数 |
minIdle | 1 | Integer | 最小保持连接数 |
timeout | 2000 | Integer | 连接超时时间,单位 ms,默认 1s |
cluster-nodes | (none) | String | 集群ip与端口,当redis-mode为cluster时不为空,如:10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
command | (none) | String | 对应上文中的redis命令 |
redis-mode | (none) | Integer | mode类型: single cluster |
lookup.cache.max-rows | -1 | Integer | 查询缓存大小,减少对redis重复key的查询 |
lookup.cache.ttl | -1 | Integer | 查询缓存过期时间,单位为秒, 开启查询缓存条件是max-rows与ttl都不能为-1 |
lookup.max-retries | 1 | Integer | 查询失败重试次数 |
lookup.cache.load-all | false | Boolean | 开启全量缓存,当命令为hget时,将从redis map查询出所有元素并保存到cache中,用于解决缓存穿透问题 |
sink.max-retries | 1 | Integer | 写入失败重试次数 |
sink.parallelism | (none) | Integer | 写入并发数 |
value.data.structure | column | String | column: value值来自某一字段 (如, set: key值取自DDL定义的第一个字段, value值取自第二个字段) |
在线调试SQL时,用于限制sink资源使用的参数:
Field | Default | Type | Description |
sink.limit | false | Boolean | 限制开头 |
sink.limit.max-num | 10000 | Integer | taskmanager内每个slot可以写的最大数据量 |
sink.limit.interval | 100 | String | taskmanager内每个slot写入数据间隔 milliseconds |
sink.limit.max-online | 30 * 60 * 1000L | Long | taskmanager内每个slot最大在线时间, milliseconds |
集群类型为sentinel时额外连接参数:
字段 | 默认值 | 类型 | 说明 |
master.name | (none) | String | 主名 |
sentinels.info | (none) | String | |
sentinels.password | none) | String |
数据类型转换
flink type | redis row converter |
CHAR | String |
VARCHAR | String |
String | String |
BOOLEAN | String String.valueOf(boolean val) |
BINARY | String Base64.getEncoder().encodeToString |
VARBINARY | String Base64.getEncoder().encodeToString |
DECIMAL | String BigDecimal.toString |
TINYINT | String String.valueOf(byte val) |
SMALLINT | String String.valueOf(short val) |
INTEGER | String String.valueOf(int val) |
DATE | String the day from epoch as int |
TIME | String the millisecond from 0'clock as int |
BIGINT | String String.valueOf(long val) |
FLOAT | String String.valueOf(float val) |
DOUBLE | String String.valueOf(double val) |
TIMESTAMP | String the millisecond from epoch as long |
使用示例:
- 维表查询:
create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');
-- 先在redis中插入数据,相当于redis命令: hset 3 3 100 --
insert into sink_redis select * from (values ('3', '3', '100'));
create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');
-- 随机生成10以内的数据作为数据源 --
-- 其中有一条数据会是: username = 3 level = 3, 会跟上面插入的数据关联 --
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen', 'rows-per-second'='1', 'fields.username.kind'='sequence', 'fields.username.start'='1', 'fields.username.end'='10', 'fields.level.kind'='sequence', 'fields.level.start'='1', 'fields.level.end'='10');
create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');
insert into
sink_table
select
s.username,
s.level,
d.age
from
source_table s
left join dim_table for system_time as of s.proctime as d on
d.name = s.username
and d.level = s.level;
-- username为3那一行会关联到redis内的值,输出为: 3,3,100
- 多字段的维表关联查询
很多情况维表有多个字段,本实例展示如何利用'value.data.structure'='row'写多字段并关联查询。
-- 创建表
create table sink_redis(uid VARCHAR,score double,score2 double )
with ( 'connector' = 'redis',
'host' = '10.11.69.176',
'port' = '6379',
'redis-mode' = 'single',
'password' = '****',
'command' = 'SET',
'value.data.structure' = 'row'); -- 'value.data.structure'='row':整行内容保存至value并以'\01'分割
-- 写入测试数据,score、score2为需要被关联查询出的两个维度
insert into sink_redis select * from (values ('1', 10.3, 10.1));
-- 在redis中,value的值为: "1\x0110.3\x0110.1" --
-- 写入结束 --
-- create join table --
create table join_table with ('command'='get', 'value.data.structure'='row') like sink_redis
-- create result table --
create table result_table(uid VARCHAR, username VARCHAR, score double, score2 double) with ('connector'='print')
-- create source table --
create table source_table(uid VARCHAR, username VARCHAR, proc_time as procTime()) with ('connector'='datagen', 'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='2')
-- 关联查询维表,获得维表的多个字段值 --
insert
into
result_table
select
s.uid,
s.username,
j.score, -- 来自维表
j.score2 -- 来自维表
from
source_table as s
join join_table for system_time as of s.proc_time as j on
j.uid = s.uid
result:
2> +I[2, 1e0fe885a2990edd7f13dd0b81f923713182d5c559b21eff6bda3960cba8df27c69a3c0f26466efaface8976a2e16d9f68b3, null, null]
1> +I[1, 30182e00eca2bff6e00a2d5331e8857a087792918c4379155b635a3cf42a53a1b8f3be7feb00b0c63c556641423be5537476, 10.3, 10.1]
- DataStream查询方式
- 示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java
hset示例,相当于redis命令:hset tom math 150
Configuration configuration = new Configuration();
configuration.setString(REDIS_MODE, REDIS_CLUSTER);
configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());
RedisSinkMapper redisMapper = (RedisSinkMapper)RedisHandlerServices
.findRedisHandler(RedisMapperHandler.class, configuration.toMap())
.createRedisMapper(configuration);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, "tom");
genericRowData.setField(1, "math");
genericRowData.setField(2, "152");
DataStream<GenericRowData> dataStream = env.fromElements(genericRowData, genericRowData);
RedisCacheOptions redisCacheOptions = new RedisCacheOptions.Builder().setCacheMaxSize(100).setCacheTTL(10L).build();
FlinkJedisConfigBase conf = getLocalRedisClusterConfig();
RedisSinkFunction redisSinkFunction = new RedisSinkFunction<>(conf, redisMapper, redisCacheOptions);
dataStream.addSink(redisSinkFunction).setParallelism(1);
env.execute("RedisSinkTest");
- redis-cluster写入示例
- 示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLTest.java
set示例,相当于redis命令: set test test11
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
"'cluster-nodes'='10.11.80.147:7000,10.11.80.147:7001','redis- mode'='cluster','password'='******','command'='set')" ;
tEnv.executeSql(ddl);
String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get()
.getJobExecutionResult()
.get();
开发与测试环境
ide: IntelliJ IDEA
code format: google-java-format + Save Actions
code check: CheckStyle
flink 1.12/1.13/1.14+
jdk1.8 jedis3.7.1
如果需要flink 1.12版本支持,请切换到分支flink-1.12
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.1.1-1.12</version>
</dependency>
相关推荐
- Docker安装详细步骤及相关环境安装配置
-
最近自己在虚拟机上搭建一个docker,将项目运行在虚拟机中。需要提前准备的工具,FinallShell(远程链接工具),VM(虚拟机-配置网络)、CentOS7(Linux操作系统-在虚拟机上安装)...
- Linux下安装常用软件都有哪些?做了一个汇总列表,你看还缺啥?
-
1.安装列表MySQL5.7.11Java1.8ApacheMaven3.6+tomcat8.5gitRedisNginxpythondocker2.安装mysql1.拷贝mysql安装文件到...
- Nginx安装和使用指南详细讲解(nginx1.20安装)
-
Nginx安装和使用指南安装1.检查并安装所需的依赖软件1).gcc:nginx编译依赖gcc环境安装命令:yuminstallgcc-c++2).pcre:(PerlCompatibleRe...
- docker之安装部署Harbor(docker安装hacs)
-
在现代软件开发和部署环境中,Harbor作为一个企业级的容器镜像仓库,提供了高效、安全的镜像管理解决方案。通过Docker部署Harbor,可以轻松构建私有镜像仓库,满足企业对镜像存储、管理和安全性...
- 成功安装 Magento2.4.3最新版教程「技术干货」
-
外贸独立站设计公司xingbell.com经过多次的反复实验,最新版的magento2.4.3在oneinstack的环境下的详细安装教程如下:一.vps系统:LinuxCentOS7.7.19...
- 【Linux】——从0到1的学习,让你熟练掌握,带你玩转Linu
-
学习Linux并掌握Java环境配置及SpringBoot项目部署是一个系统化的过程,以下是从零开始的详细指南,帮助你逐步掌握这些技能。一、Linux基础入门1.安装Linux系统选择发行版:推荐...
- cent6.5安装gitlab-ce最新版本-11.8.2并配置邮件服务
-
cent6.5安装gitlab-ce最新版本-11.8.2并配置邮件服务(yum选择的,时间不同,版本不同)如果对运维课程感兴趣,可以在b站上搜索我的账号:运维实战课程,可以关注我,学习更多免费的运...
- 时隔三月,参加2020秋招散招,终拿字节跳动后端开发意向书.
-
3个月前头条正式批笔试4道编程题只AC了2道,然后被刷了做了200多道还是太菜了,本来对字节不抱太大希望,毕竟后台竞争太大,而且字节招客户端开发比较多。后来看到有散招免笔试,抱着试一试的心态投了,然而...
- Redisson:Java程序员手中的“魔法锁”
-
Redisson:Java程序员手中的“魔法锁”在这个万物互联的时代,分布式系统已经成为主流。然而,随着系统的扩展,共享资源的争夺成为了一个棘手的问题。就比如你想在淘宝“秒杀”一款商品,却发现抢的人太...
- 【线上故障复盘】RPC 线程池被打满,1024个线程居然不够用?
-
1.故障背景昨天晚上,我刚到家里打开公司群,就看见群里有人讨论:线上环境出现大量RPC请求报错,异常原因:被线程池拒绝。虽然异常量很大,但是异常服务非核心服务,属于系统旁路,服务于数据核对任务,即使...
- 小红书取消大小周,有人不高兴了!
-
小红书宣布五一节假日之后,取消大小周,恢复为正常的双休,乍一看工作时长变少,按道理来说大家应该都会很开心,毕竟上班时间缩短了,但是还是有一些小红书的朋友高兴不起来,心情很复杂。因为没有了大小周,以前...
- 延迟任务的多种实现方案(延迟机制)
-
场景订单超时自动取消:延迟任务典型的使用场景是订单超时自动取消。功能精确的时间控制:延时任务的时间控制要尽量准确。可靠性:延时任务的处理要是可靠的,确保所有任务最终都能被执行。这通常要求延时任务的方案...
- 百度java面试真题(java面试题下载)
-
1、SpingBoot也有定时任务?是什么注解?在SpringBoot中使用定时任务主要有两种不同的方式,一个就是使用Spring中的@Scheduled注解,另一个则是使用第三方框架Q...
- 回归基础:访问 Kubernetes Pod(concurrent.futures访问数据库)
-
Kubernetes是一头巨大的野兽。在它开始有用之前,您需要了解许多概念。在这里,学习几种访问集群外pod的方法。Kubernetes是一头巨大的野兽。在它开始有用之前,您需要了解许多不同的...
- Spring 缓存神器 @Cacheable:3 分钟学会优化高频数据访问
-
在互联网应用中,高频数据查询(如商品详情、用户信息)往往成为性能瓶颈。每次请求都触发数据库查询,不仅增加服务器压力,还会导致响应延迟。Spring框架提供的@Cacheable注解,就像给方法加了一...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Docker安装详细步骤及相关环境安装配置
- Linux下安装常用软件都有哪些?做了一个汇总列表,你看还缺啥?
- Nginx安装和使用指南详细讲解(nginx1.20安装)
- docker之安装部署Harbor(docker安装hacs)
- 成功安装 Magento2.4.3最新版教程「技术干货」
- 【Linux】——从0到1的学习,让你熟练掌握,带你玩转Linu
- cent6.5安装gitlab-ce最新版本-11.8.2并配置邮件服务
- 时隔三月,参加2020秋招散招,终拿字节跳动后端开发意向书.
- Redisson:Java程序员手中的“魔法锁”
- 【线上故障复盘】RPC 线程池被打满,1024个线程居然不够用?
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)