flink维表查询redis之flink-connector-redis
mhr18 2024-11-01 12:19 15 浏览 0 评论
插件名称:flink-connector-redis
插件地址:https://github.com/jeff-zou/flink-connector-redis.git
无法翻墙:https://gitee.com/jeff-zou/flink-connector-redis.git
项目介绍
基于bahir-flink二次开发,相对bahir调整的内容有:删除过期Flink API、增加Table/SQL API、 增加维表查询支持、增加写入与查询缓存、统一使用过期策略、写入并发数等。
因bahir使用的flink接口版本较老,所以改动较大,开发过程中参考了腾讯云与阿里云两家产商的流计算产品,取两家之长,并增加了更丰富的功能,包括更多的redis操作命令和更多的redis服务类型,如:simple sentinel cluster。
支持功能对应redis的操作命令有:
插入 | 维表查询 |
set | get |
hset | hget |
rpush lpush | |
incrBy decrBy hincrBy zincrby | |
sadd zadd pfadd(hyperloglog) | |
publish | |
zrem decrby |
使用方法:
命令行执行 mvn package -DskipTests打包后,将生成的包flink-connector-redis-1.0.2.jar引入flink lib中即可,无需其它设置。
开发环境工程直接引用:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.3</version>
</dependency>
使用说明:
无需通过primary key来映射redis中的Key,直接由ddl中的字段顺序来决定Key,如:
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set')
其中username为key, passport为value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
其中name为map结构的key, subject为field, score为value.
with参数说明:
字段 | 默认值 | 类型 | 说明 |
connector | (none) | String | redis |
host | (none) | String | Redis IP |
port | 6379 | Integer | Redis 端口 |
password | null | String | 如果没有设置,则为 null |
database | 0 | Integer | 默认使用 db0 |
maxTotal | 2 | Integer | 最大连接数 |
maxIdle | 2 | Integer | 最大保持连接数 |
minIdle | 1 | Integer | 最小保持连接数 |
timeout | 2000 | Integer | 连接超时时间,单位 ms,默认 1s |
cluster-nodes | (none) | String | 集群ip与端口,当redis-mode为cluster时不为空,如:10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
command | (none) | String | 对应上文中的redis命令 |
redis-mode | (none) | Integer | mode类型: single cluster |
lookup.cache.max-rows | -1 | Integer | 查询缓存大小,减少对redis重复key的查询 |
lookup.cache.ttl | -1 | Integer | 查询缓存过期时间,单位为秒, 开启查询缓存条件是max-rows与ttl都不能为-1 |
lookup.max-retries | 1 | Integer | 查询失败重试次数 |
sink.cache.max-rows | -1 | Integer | 写入缓存大小,减少对redis重复写入相同的key与value |
sink.cache.ttl | -1 | Integer | 写入缓存过期时间,单位为秒, 开启缓存条件是max-rows与ttl都不能为-1 |
sink.max-retries | 1 | Integer | 写入失败重试次数 |
sink.parallelism | (none) | Integer | 写入并发数 |
集群类型为sentinel时额外连接参数:
字段 | 默认值 | 类型 | 说明 |
master.name | (none) | String | 主名 |
sentinels.info | (none) | String | |
sentinels.password | none) | String |
使用示例:
- 维表查询:
create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');
-- 先在redis中插入数据,相当于redis命令: hset 3 3 100 --
insert into sink_redis select * from (values ('3', '3', '100'));
create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');
-- 随机生成10以内的数据作为数据源 --
-- 其中有一条数据会是: username = 3 level = 3, 会跟上面插入的数据关联 --
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen', 'rows-per-second'='1', 'fields.username.kind'='sequence', 'fields.username.start'='1', 'fields.username.end'='10', 'fields.level.kind'='sequence', 'fields.level.start'='1', 'fields.level.end'='10');
create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');
insert into
sink_table
select
s.username,
s.level,
d.age
from
source_table s
left join dim_table for system_time as of s.proctime as d on
d.name = s.username
and d.level = s.level;
-- username为3那一行会关联到redis内的值,输出为: 3,3,100
- DataStream查询方式
- 示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java hset示例,相当于redis命令:hset tom math 150
Configuration configuration = new Configuration();
configuration.setString(REDIS_MODE, REDIS_CLUSTER);
configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());
RedisSinkMapper redisMapper = (RedisSinkMapper)RedisHandlerServices
.findRedisHandler(RedisMapperHandler.class, configuration.toMap())
.createRedisMapper(configuration);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, "tom");
genericRowData.setField(1, "math");
genericRowData.setField(2, "152");
DataStream<GenericRowData> dataStream = env.fromElements(genericRowData, genericRowData);
RedisCacheOptions redisCacheOptions = new RedisCacheOptions.Builder().setCacheMaxSize(100).setCacheTTL(10L).build();
FlinkJedisConfigBase conf = getLocalRedisClusterConfig();
RedisSinkFunction redisSinkFunction = new RedisSinkFunction<>(conf, redisMapper, redisCacheOptions);
dataStream.addSink(redisSinkFunction).setParallelism(1);
env.execute("RedisSinkTest");
- redis-cluster写入示例
- 示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLTest.java set示例,相当于redis命令: set test test11
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
"'cluster-nodes'='10.11.80.147:7000,10.11.80.147:7001','redis- mode'='cluster','password'='******','command'='set')" ;
tEnv.executeSql(ddl);
String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get()
.getJobExecutionResult()
.get();
开发与测试环境
ide: IntelliJ IDEA
code format: google-java-format + Save Actions
code check: CheckStyle
相关推荐
- Docker集群管理之Docker Compose
-
前言:在上一篇《Docker集群管理之DockerMachine》中,我们通过源码分析了解了DockerMachine的工作原理,使用者可以通过DockerMachine的一条命令在任意支持的平...
- 使用Dockerfile build镜像
-
Docker映像可以看作是Docker容器的压缩包,包含了应用程序以及运行应用程序所需的依赖,容器是映像的运行时实例。一般构建镜像都是使用dockerfile进行构建而不是dockercommit,...
- 自建私有云相册:Docker一键部署Immich,照片视频备份利器
-
自建私有云相册:Docker一键部署Immich,照片视频备份利器前言随着人们手机、PC、平板等电子产品多样,我们拍摄和保存的照片和视频数量也在不断增加。如何高效地管理和备份这些珍贵的记忆成为了一个重...
- docker容器的使用以及部署mysql
-
首先什么是docker官方:翻译:Docker是一个用于开发、发布和运行应用程序的开放平台。Docker使您能够将应用程序与基础架构分离,以便您可以快速交付软件。使用Docker,您可以像管理应...
-
- 自建Docker镜像加速服务,免费且简单,服务器VPS、NAS皆可用
-
写在前面:流程十分简单,有手就行,还请耐心看完。本文的实例仅做演示用,不久后将会删除,有需要的各位请自行搭建。免费实例如果15分钟内未收到入站流量,Render会关闭实例的网络服务。Render会在下次收到处理请求时重新启动该服务。Ren...
-
2025-05-24 15:40 mhr18
- 用了8年的方式-用 Docker 瞬间搭建本地开发环境
-
有些时候我们需要在本地搭开发环境,比如平时学习新技术的时候。或者有时候公司的项目需要在本地建一套类似的,方便调试修改。开发环境可能包括MySQL、Redis、Nginx、MQ、Elasticsea...
- 使用dockerfile构建docker镜像
-
准备工作购买vps使用ssh工具连接上1、更新系统aptupdate-y2、一键安装Dockercurl-fsSLhttps://get.docker.com-oget-docker.sh...
- 快速搭建 SpringCloud 微服务开发环境的脚手架
-
本文适合有SpringBoot和SpringCloud基础知识的人群,跟着本文可使用和快速搭建SpringCloud项目。本文作者:HelloGitHub-秦人HelloGitHub推出...
- Docker Hub最全详解(图文全面总结)
-
DockerHubDockerHub是一个由Docker公司负责维护的公共注册中心,它包含了超过15000多个可用来下载和构建容器的Docker镜像。DockerHub作用Docker好比一个代...
- Docker 命令详解
-
dockerimages—查看本地镜像命令dockerimages说明列出本地已下载的所有镜像及其标签、ID、大小等信息。适用场景查看本地镜像资源、准备删除或管理镜像时。常见用法docker...
- Kylin安装Dify
-
cd/mntgitclonehttps://github.com/langgenius/dify.gitcp/mnt/dify/docker/.env.example/mnt/dif...
- kali下对Docker的详细安装
-
Docker是渗透测试中必学不可的一个容器工具,在其中,我们能够快速创建、运行、测试以及部署应用程序。如,我们对一些漏洞进行本地复现时,可以使用Docker快速搭建漏洞环境,完成复现学习。注:本教程仅...
- 银河麒麟V10使用Docker方式部署应用
-
现在越来越多的企业级应用需要运行在国产化环境中,而银河麒麟V10是目前我碰到的最常用的服务器,在银河麒麟上部署应用有两种方式:使用二进制文件编译部署和使用Docker。关于使用二进制文件的方式...
- Docker入门到精通超详细教程,Docker全家桶实战攻略
-
大家好,我是各位双生的武魂、随身老爷爷。从看到这篇内容开始,你就是被选定的天命骚年,将承担起学完docker教程的使命,本使命为单向契约,你可选择YES或者选择YES。正式学习之前,我先给大家做一下d...
- 【Docker 新手入门指南】第一章:前言
-
一、基本介绍Docker介绍Docker是基于Go语言开发的开源容器化平台,旨在实现“一次镜像,处处运行”。它通过将应用程序及其依赖环境(代码、运行时、系统工具、系统库等)打包成一个轻量级、可移...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)