flink维表查询redis之flink-connector-redis
mhr18 2024-11-01 12:19 20 浏览 0 评论
插件名称:flink-connector-redis
插件地址:https://github.com/jeff-zou/flink-connector-redis.git
无法翻墙:https://gitee.com/jeff-zou/flink-connector-redis.git
项目介绍
基于bahir-flink二次开发,相对bahir调整的内容有:删除过期Flink API、增加Table/SQL API、 增加维表查询支持、增加写入与查询缓存、统一使用过期策略、写入并发数等。
因bahir使用的flink接口版本较老,所以改动较大,开发过程中参考了腾讯云与阿里云两家产商的流计算产品,取两家之长,并增加了更丰富的功能,包括更多的redis操作命令和更多的redis服务类型,如:simple sentinel cluster。
支持功能对应redis的操作命令有:
插入 | 维表查询 |
set | get |
hset | hget |
rpush lpush | |
incrBy decrBy hincrBy zincrby | |
sadd zadd pfadd(hyperloglog) | |
publish | |
zrem decrby |
使用方法:
命令行执行 mvn package -DskipTests打包后,将生成的包flink-connector-redis-1.0.2.jar引入flink lib中即可,无需其它设置。
开发环境工程直接引用:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.3</version>
</dependency>
使用说明:
无需通过primary key来映射redis中的Key,直接由ddl中的字段顺序来决定Key,如:
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set')
其中username为key, passport为value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
其中name为map结构的key, subject为field, score为value.
with参数说明:
字段 | 默认值 | 类型 | 说明 |
connector | (none) | String | redis |
host | (none) | String | Redis IP |
port | 6379 | Integer | Redis 端口 |
password | null | String | 如果没有设置,则为 null |
database | 0 | Integer | 默认使用 db0 |
maxTotal | 2 | Integer | 最大连接数 |
maxIdle | 2 | Integer | 最大保持连接数 |
minIdle | 1 | Integer | 最小保持连接数 |
timeout | 2000 | Integer | 连接超时时间,单位 ms,默认 1s |
cluster-nodes | (none) | String | 集群ip与端口,当redis-mode为cluster时不为空,如:10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
command | (none) | String | 对应上文中的redis命令 |
redis-mode | (none) | Integer | mode类型: single cluster |
lookup.cache.max-rows | -1 | Integer | 查询缓存大小,减少对redis重复key的查询 |
lookup.cache.ttl | -1 | Integer | 查询缓存过期时间,单位为秒, 开启查询缓存条件是max-rows与ttl都不能为-1 |
lookup.max-retries | 1 | Integer | 查询失败重试次数 |
sink.cache.max-rows | -1 | Integer | 写入缓存大小,减少对redis重复写入相同的key与value |
sink.cache.ttl | -1 | Integer | 写入缓存过期时间,单位为秒, 开启缓存条件是max-rows与ttl都不能为-1 |
sink.max-retries | 1 | Integer | 写入失败重试次数 |
sink.parallelism | (none) | Integer | 写入并发数 |
集群类型为sentinel时额外连接参数:
字段 | 默认值 | 类型 | 说明 |
master.name | (none) | String | 主名 |
sentinels.info | (none) | String | |
sentinels.password | none) | String |
使用示例:
- 维表查询:
create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');
-- 先在redis中插入数据,相当于redis命令: hset 3 3 100 --
insert into sink_redis select * from (values ('3', '3', '100'));
create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');
-- 随机生成10以内的数据作为数据源 --
-- 其中有一条数据会是: username = 3 level = 3, 会跟上面插入的数据关联 --
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen', 'rows-per-second'='1', 'fields.username.kind'='sequence', 'fields.username.start'='1', 'fields.username.end'='10', 'fields.level.kind'='sequence', 'fields.level.start'='1', 'fields.level.end'='10');
create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');
insert into
sink_table
select
s.username,
s.level,
d.age
from
source_table s
left join dim_table for system_time as of s.proctime as d on
d.name = s.username
and d.level = s.level;
-- username为3那一行会关联到redis内的值,输出为: 3,3,100
- DataStream查询方式
- 示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java hset示例,相当于redis命令:hset tom math 150
Configuration configuration = new Configuration();
configuration.setString(REDIS_MODE, REDIS_CLUSTER);
configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());
RedisSinkMapper redisMapper = (RedisSinkMapper)RedisHandlerServices
.findRedisHandler(RedisMapperHandler.class, configuration.toMap())
.createRedisMapper(configuration);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, "tom");
genericRowData.setField(1, "math");
genericRowData.setField(2, "152");
DataStream<GenericRowData> dataStream = env.fromElements(genericRowData, genericRowData);
RedisCacheOptions redisCacheOptions = new RedisCacheOptions.Builder().setCacheMaxSize(100).setCacheTTL(10L).build();
FlinkJedisConfigBase conf = getLocalRedisClusterConfig();
RedisSinkFunction redisSinkFunction = new RedisSinkFunction<>(conf, redisMapper, redisCacheOptions);
dataStream.addSink(redisSinkFunction).setParallelism(1);
env.execute("RedisSinkTest");
- redis-cluster写入示例
- 示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLTest.java set示例,相当于redis命令: set test test11
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
"'cluster-nodes'='10.11.80.147:7000,10.11.80.147:7001','redis- mode'='cluster','password'='******','command'='set')" ;
tEnv.executeSql(ddl);
String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get()
.getJobExecutionResult()
.get();
开发与测试环境
ide: IntelliJ IDEA
code format: google-java-format + Save Actions
code check: CheckStyle
相关推荐
- Java培训机构,你选对了吗?(java培训机构官网)
-
如今IT行业发展迅速,不仅是大学生,甚至有些在职的员工都想学习java开发,需求量的扩大,薪资必定增长,这也是更多人选择java开发的主要原因。不过对于没有基础的学员来说,java技术不是一两天就能...
- 产品经理MacBook软件清单-20个实用软件
-
三年前开始使用MacBookPro,从此再也不想用Windows电脑了,作为生产工具,MacBook可以说是非常胜任。作为产品经理,值得拥有一台MacBook。MacBook是工作平台,要发挥更大作...
- RAD Studio(Delphi) 本月隆重推出新的版本12.3
-
#在头条记录我的2025#自2024年9月,推出Delphi12.2版本后,本月隆重推出新的版本12.3,RADStudio12.3,包含了Delphi12.3和C++builder12.3最...
- 图解Java垃圾回收机制,写得非常好
-
什么是自动垃圾回收?自动垃圾回收是一种在堆内存中找出哪些对象在被使用,还有哪些对象没被使用,并且将后者删掉的机制。所谓使用中的对象(已引用对象),指的是程序中有指针指向的对象;而未使用中的对象(未引用...
- Centos7 初始化硬盘分区、挂载(针对2T以上)添加磁盘到卷
-
1、通过命令fdisk-l查看硬盘信息:#fdisk-l,发现硬盘为/dev/sdb大小4T。2、如果此硬盘以前有过分区,则先对磁盘格式化。命令:mkfs.文件系统格式-f/dev/sdb...
- 半虚拟化如何提高服务器性能(虚拟化 半虚拟化)
-
半虚拟化是一种重新编译客户机操作系统(OS)将其安装在虚拟机(VM)上的一种虚拟化类型,并在主机操作系统(OS)运行的管理程序上运行。与传统的完全虚拟化相比,半虚拟化可以减少开销,并提高系统性能。虚...
- HashMap底层实现原理以及线程安全实现
-
HashMap底层实现原理数据结构:HashMap的底层实现原理主要依赖于数组+链表+红黑树的结构。1、数组:HashMap最底层是一个数组,称为table,它存放着键值对。2、链...
- long和double类型操作的非原子性探究
-
前言“深入java虚拟机”中提到,int等不大于32位的基本类型的操作都是原子操作,但是某些jvm对long和double类型的操作并不是原子操作,这样就会造成错误数据的出现。其实这里的某些jvm是指...
- 数据库DELETE 语句,还保存原有的磁盘空间
-
MySQL和Oracle的DELETE语句与数据存储MySQL的DELETE操作当你在MySQL中执行DELETE语句时:逻辑删除:数据从表中标记为删除,不再可见于查询结果物理...
- 线程池—ThreadPoolExecutor详解(线程池实战)
-
一、ThreadPoolExecutor简介在juc-executors框架概述的章节中,我们已经简要介绍过ThreadPoolExecutor了,通过Executors工厂,用户可以创建自己需要的执...
- navicat如何使用orcale(详细步骤)
-
前言:看过我昨天文章的同鞋都知道最近接手另一个国企项目,数据库用的是orcale。实话实说,也有快三年没用过orcale数据库了。这期间问题不断,因为orcale日渐消沉,网上资料也是真真假假,难辨虚...
- 你的程序是不是慢吞吞?GraalVM来帮你飞起来性能提升秘籍大公开
-
各位IT圈内外的朋友们,大家好!我是你们的老朋友,头条上的IT技术博主。不知道你们有没有这样的经历:打开一个软件,半天没反应;点开一个网站,图片刷不出来;或者玩个游戏,卡顿得想砸电脑?是不是特别上火?...
- 大数据正当时,理解这几个术语很重要
-
目前,大数据的流行程度远超于我们的想象,无论是在云计算、物联网还是在人工智能领域都离不开大数据的支撑。那么大数据领域里有哪些基本概念或技术术语呢?今天我们就来聊聊那些避不开的大数据技术术语,梳理并...
- 秒懂列式数据库和行式数据库(列式数据库的特点)
-
行式数据库(Row-Based)数据按行存储,常见的行式数据库有Mysql,DB2,Oracle,Sql-server等;列数据库(Column-Based)数据存储方式按列存储,常见的列数据库有Hb...
- AMD发布ROCm 6.4更新:带来了多项底层改进,但仍不支持RDNA 4
-
AMD宣布,对ROCm软件栈进行了更新,推出了新的迭代版本ROCm6.4。这一新版本里,AMD带来了多项底层改进,包括更新改进了ROCm的用户空间库和AMDKFD内核驱动程序之间的兼容性,使其更容易...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)