Flink中处理维表关联技术实现路径
mhr18 2025-05-27 16:04 7 浏览 0 评论
在 Flink 中处理维表关联大体氛围Table SQL Lookup Join 和 DataStream 算子函数,主要技术实现路径:
I. Flink SQL/Table API 中的 Lookup Join
Flink SQL/Table API 提供了 LOOKUP JOIN 语法,专门用于将流式数据与维表(通常是存储在外部系统中的批数据)进行关联。其核心在于通过异步或同步的方式查询维表,避免阻塞流处理。
1. 同步 Lookup Join
实现路径:
- JDBC Lookup Function (MySQL, PostgreSQL等关系型数据库): Flink 内部提供了 JDBC connector,可以通过配置来连接关系型数据库。当流数据到来时,Flink 会向数据库发送查询请求,同步等待查询结果。
- HBase Lookup Function: Flink 也提供了 HBase Connector,可以用于与 HBase 进行同步关联。
- Redis Lookup Function: 虽然没有内置的 Redis Connector 作为 Lookup Source,但可以通过自定义 TableFunction 或 AsyncTableFunction 来实现同步或异步与 Redis 的关联。
优点:
- 实现简单: 对于支持的数据库,配置相对简单,直接使用 SQL 语法即可。
- 数据一致性高: 每次查询都是从最新维表数据获取,保证了数据的新鲜度(在不考虑数据库同步延迟的情况下)。
缺点:
- 性能瓶颈: 同步查询会阻塞 Flink 算子,如果维表查询延迟较高,或者查询并发量大,会严重影响流处理的吞吐量和延迟。这在处理高吞吐量流数据时是一个显著的缺点。
- 外部依赖: 强依赖外部数据库的性能和可用性。
- 不适合高并发场景: 容易导致数据库连接池耗尽或数据库负载过高。
示例代码:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class SyncLookupJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 示例中使用单并行度
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 1. 定义一个订单数据源 (这里使用 VALUES 生成一个模拟的流)
// 实际应用中可以替换为 Kafka, Pulsar, Kinesis 等数据源
tEnv.executeSql(
"CREATE TEMPORARY TABLE orders (\n" +
" order_id INT,\n" +
" product_id INT,\n" +
" amount DOUBLE,\n" +
" proctime AS PROCTIME() -- 处理时间字段\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" + // 使用 datagen 连接器模拟数据
" 'rows-per-second' = '1',\n" + // 每秒生成一条数据
" 'fields.order_id.min' = '1',\n" +
" 'fields.order_id.max' = '1000',\n" +
" 'fields.product_id.min' = '101',\n" + // 模拟关联 Product ID
" 'fields.product_id.max' = '202',\n" +
" 'fields.amount.min' = '10.0',\n" +
" 'fields.amount.max' = '1000.0'\n" +
")"
);
// 2. 定义一个维表数据源 (MySQL JDBC Lookup Table)
tEnv.executeSql(
"CREATE TEMPORARY TABLE products_dim (\n" +
" product_id INT,\n" +
" product_name STRING,\n" +
" category STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/testdb',\n" + // 替换为你的 MySQL 地址和数据库名
" 'table-name' = 'products',\n" +
" 'username' = 'root',\n" + // 替换为你的 MySQL 用户名
" 'password' = 'your_password', \n" + // 替换为你的 MySQL 密码
" 'lookup.cache.max-rows' = '1000',\n" + // JDBC Lookup 支持缓存
" 'lookup.cache.ttl' = '10min',\n" + // 缓存过期时间
" 'lookup.max-retries' = '3' \n" + // 查询失败重试次数
")"
);
// 3. 执行 Lookup Join 查询
Table resultTable = tEnv.sqlQuery(
"SELECT\n" +
" o.order_id,\n" +
" o.product_id,\n" +
" p.product_name,\n" +
" p.category,\n" +
" o.amount\n" +
"FROM\n" +
" orders AS o\n" +
"JOIN products_dim AS p ON o.product_id = p.product_id" // 简单的等值 JOIN 即可
);
// 4. 将结果打印到控制台 (或输出到其他 Sink)
resultTable.execute().print();
// Flink Job 会持续运行,直到手动停止
// env.execute("Sync Lookup Join Example"); // 对于execute().print()不需要再execute()
}
}
如果你的 MySQL 响应慢,你会发现 Flink 的输出也会变慢,因为它是同步阻塞的
2. 异步 Lookup Join (推荐)
实现路径:
- 自定义 AsyncTableFunction: 这是实现异步 Lookup Join 的核心和推荐方式。
- 原理: AsyncTableFunction 允许用户编写异步查询逻辑。当流数据到来时,Flink 会并发地向外部维表发起查询请求,但不会阻塞当前处理线程。查询结果返回后,通过 ResultFuture 回调通知 Flink,将维表数据与流数据进行合并。
- 适用场景: 适用于任何支持异步客户端的外部存储,例如: Redis: 使用 Jedis 或 Lettuce 等异步客户端。 HBase (Async Client): 使用 HBase 提供的异步客户端。 Cassandra: 使用 DataStax Java Driver。 ClickHouse (异步JDBC或HTTP API): 虽然是关系型数据库,但如果其驱动支持异步,或通过 HTTP API 异步访问,也可以实现。 HTTP API (微服务接口): 调用外部微服务接口获取维表数据。
- 实现步骤: 实现 AsyncTableFunction 接口。 在 eval 方法中发起异步查询,并传入 ResultFuture。 当异步查询返回结果时,调用 ResultFuture.complete() 来通知 Flink。 注册自定义的 AsyncTableFunction 到 Table Environment。 在 SQL 中使用 LATERAL TABLE() 语法进行关联。
优点:
- 高吞吐量和低延迟: 非阻塞查询,允许多个并发查询同时进行,极大地提高了吞吐量,降低了端到端延迟。
- 资源利用率高: 充分利用 I/O 资源,避免了线程阻塞。
- 灵活性强: 可以集成任何支持异步查询的外部存储或服务。
- 背压能力: Flink 的异步 I/O 组件提供了内置的背压机制,可以根据下游处理能力自动调整并发度,防止外部系统过载。
缺点:
- 实现复杂度相对较高: 需要编写异步查询逻辑,处理回调和异常。
- 外部系统支持: 依赖外部系统提供的异步客户端或接口。
示例代码:
首先,我们需要创建一个自定义的 AsyncRedisLookupFunction。
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Map;
public class AsyncRedisLookupFunction extends AsyncTableFunction<RowData> {
private final String redisHost;
private final int redisPort;
private final String redisPassword; // 如果有密码
private transient JedisPool jedisPool; // transient 表示不参与序列化
private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步
public AsyncRedisLookupFunction(String redisHost, int redisPort, String redisPassword) {
this.redisHost = redisHost;
this.redisPort = redisPort;
this.redisPassword = redisPassword;
}
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100); // 最大连接数
poolConfig.setMaxIdle(20); // 最大空闲连接数
poolConfig.setMinIdle(5); // 最小空闲连接数
poolConfig.setTestOnBorrow(true); // 借用连接时测试
poolConfig.setTestOnReturn(true); // 归还连接时测试
poolConfig.setTestWhileIdle(true); // 空闲时测试
if (redisPassword != null && !redisPassword.isEmpty()) {
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
} else {
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
}
// 创建一个线程池来模拟异步查询。
// 实际生产中,更推荐使用 Lettuce 等真正的异步 Redis 客户端。
this.executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2 // 通常设置为 CPU 核数的两倍
);
}
@Override
public void close() throws Exception {
if (jedisPool != null) {
jedisPool.close();
}
if (executorService != null) {
executorService.shutdown();
}
super.close();
}
// eval 方法接收流中的 join key,并返回一个 CompletableFuture
// Flink 将在 CompletableFuture 完成时收集结果
public void eval(CompletableFuture<Collector<RowData>> resultFuture, Integer productId) {
executorService.submit(() -> {
try (Jedis jedis = jedisPool.getResource()) {
String key = "product:" + productId;
Map<String, String> productInfo = jedis.hgetAll(key);
if (productInfo != null && !productInfo.isEmpty()) {
String productName = productInfo.get("name");
String category = productInfo.get("category");
// 返回一个 RowData,包含维表查询到的字段
// 顺序与定义 Table 时声明的字段顺序一致
resultFuture.complete(new SimpleCollector<>(
Collections.singletonList(GenericRowData.of(
StringData.fromString(productName),
StringData.fromString(category)
))
));
} else {
// 如果未找到,返回空结果
resultFuture.complete(new SimpleCollector<>(Collections.emptyList()));
}
} catch (Exception e) {
// 处理异常,可以返回空结果或抛出异常让 Flink 处理
resultFuture.completeExceptionally(e);
}
});
}
// 辅助类,用于将 List<RowData> 收集到 Collector 中
private static class SimpleCollector<T> implements Collector<T> {
private final java.util.List<T> list;
public SimpleCollector(java.util.List<T> list) {
this.list = list;
}
@Override
public void collect(T record) {
list.add(record); // 实际上这里我们只收集一个结果
}
@Override
public void close() {
// No-op
}
}
}
现在,我们将在 Flink SQL 中注册并使用这个 AsyncRedisLookupFunction。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeComparators;
import org.apache.flink.table.types.utils.TypeConversions;
import java.util.Collections;
import static org.apache.flink.table.api.DataTypes.*;
public class AsyncLookupJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 示例中使用单并行度,但异步 I/O 可以并行
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 1. 定义一个订单数据源 (Datagen 生成模拟流)
tEnv.executeSql(
"CREATE TEMPORARY TABLE orders (\n" +
" order_id INT,\n" +
" product_id INT,\n" +
" amount DOUBLE,\n" +
" proctime AS PROCTIME() -- 处理时间字段\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '10',\n" + // 提高生成速度,更明显地展示异步优势
" 'fields.order_id.min' = '1',\n" +
" 'fields.order_id.max' = '1000',\n" +
" 'fields.product_id.min' = '101',\n" +
" 'fields.product_id.max' = '202',\n" + // 确保 product_id 在 Redis 示例范围内
" 'fields.amount.min' = '10.0',\n" +
" 'fields.amount.max' = '1000.0'\n" +
")"
);
// 2. 注册自定义的 AsyncRedisLookupFunction
// 构造函数参数: redisHost, redisPort, redisPassword
tEnv.createTemporarySystemFunction(
"AsyncRedisProducts",
new AsyncRedisLookupFunction("localhost", 6379, "") // 替换为你的 Redis 地址、端口和密码
);
// 3. 执行 Lookup Join 查询
// 使用 LATERAL TABLE(FunctionName(lookup_keys)) 语法
// 注意:AsyncRedisProducts 函数的返回类型决定了 JOIN 后的字段名
// AsyncRedisLookupFunction 返回的是 RowData,包含 (product_name, category)
Table resultTable = tEnv.sqlQuery(
"SELECT\n" +
" o.order_id,\n" +
" o.product_id,\n" +
" T.product_name,\n" + // T 是 LATERAL TABLE 的别名
" T.category,\n" +
" o.amount\n" +
"FROM\n" +
" orders AS o,\n" + // 注意这里是逗号连接,而不是 JOIN
" LATERAL TABLE(AsyncRedisProducts(o.product_id)) AS T(product_name, category)" // 指定返回的字段名
);
// 4. 将结果打印到控制台
resultTable.execute().print();
}
}
查询 Redis 是异步进行的,不会阻塞主线程。
II. DataStream API 中的维表关联
在 DataStream API 中,没有直接的 LOOKUP JOIN 概念,但可以通过自定义函数和状态管理来实现维表关联。
1. 使用 RichFlatMapFunction/RichMapFunction + 缓存
实现路径:
- 原理: 在 open() 方法中初始化外部维表连接或加载部分维表数据到内存(如果维表数据量不大)。在 map() 或 flatMap() 方法中,当流数据到来时,首先尝试从本地缓存中查询维表数据。如果缓存未命中,则向外部维表发起同步查询,并将结果更新到缓存中。
- 缓存策略: LRU Cache: 维护一个固定大小的最近最少使用缓存。 TTL Cache: 给缓存中的数据设置过期时间,定期清理过期数据。
- 适用场景: 维表数据量不大,可以完全或部分加载到内存中。 对数据新鲜度要求不是极高,允许一定的缓存延迟。 对实时性要求较高,但外部维表查询延迟较低。
优点:
- 性能提升: 命中缓存可以避免与外部存储的交互,显著提升性能。
- 降低外部系统压力: 减少对外部维表的查询次数。
- 实现相对简单: 比异步 I/O 相对容易理解和实现。
缺点:
- 数据新鲜度问题: 缓存可能导致数据不新鲜,特别是维表数据更新频繁时。需要实现合适的缓存失效和更新机制。
- 内存消耗: 维表数据如果过大,可能导致内存溢出。
- 并发瓶颈: 如果缓存未命中,仍需进行同步查询,在高并发下仍可能成为瓶颈。
- 缺乏内置背压: 需要手动管理与外部系统的交互,没有像异步 I/O 那样内置的背压机制。
示例代码:
RichFlatMapFunction 实现
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RedisProductCachedFlatMapFunction extends RichFlatMapFunction<Order, EnrichedOrder> {
private final String redisHost;
private final int redisPort;
private final String redisPassword;
private transient JedisPool jedisPool;
private transient Map<Integer, ProductInfo> cache; // LRU 缓存
private final int cacheMaxSize;
private final long cacheTTL; // 缓存过期时间,单位毫秒
// 内部类,存储产品信息
private static class ProductInfo {
String productName;
String category;
long timestamp; // 记录缓存时间,用于TTL
public ProductInfo(String productName, String category) {
this.productName = productName;
this.category = category;
this.timestamp = System.currentTimeMillis();
}
}
public RedisProductCachedFlatMapFunction(String redisHost, int redisPort, String redisPassword, int cacheMaxSize, long cacheTTL) {
this.redisHost = redisHost;
this.redisPort = redisPort;
this.redisPassword = redisPassword;
this.cacheMaxSize = cacheMaxSize;
this.cacheTTL = cacheTTL;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(50); // 连接数可以适当减少,因为有缓存
poolConfig.setMaxIdle(10);
if (redisPassword != null && !redisPassword.isEmpty()) {
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
} else {
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
}
// 实现一个简单的 LRU 缓存,带 TTL
this.cache = new LinkedHashMap<Integer, ProductInfo>(cacheMaxSize, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<Integer, ProductInfo> eldest) {
// 移除最老或过期的条目
return size() > cacheMaxSize || (System.currentTimeMillis() - eldest.getValue().timestamp > cacheTTL);
}
};
}
@Override
public void close() throws Exception {
if (jedisPool != null) {
jedisPool.close();
}
super.close();
}
@Override
public void flatMap(Order order, Collector<EnrichedOrder> out) throws Exception {
ProductInfo productInfo = cache.get(order.productId);
long currentTime = System.currentTimeMillis();
// 检查缓存是否存在且未过期
if (productInfo != null && (currentTime - productInfo.timestamp) <= cacheTTL) {
// 缓存命中
out.collect(new EnrichedOrder(order, productInfo.productName, productInfo.category));
} else {
// 缓存未命中或已过期,从 Redis 查询
try (Jedis jedis = jedisPool.getResource()) {
String key = "product:" + order.productId;
Map<String, String> redisResult = jedis.hgetAll(key);
if (redisResult != null && !redisResult.isEmpty()) {
String productName = redisResult.get("name");
String category = redisResult.get("category");
productInfo = new ProductInfo(productName, category);
cache.put(order.productId, productInfo); // 更新缓存
out.collect(new EnrichedOrder(order, productName, category));
} else {
// 未找到维表数据,选择丢弃或使用默认值
// System.err.println("Product ID " + order.productId + " not found in Redis.");
// out.collect(new EnrichedOrder(order, "UNKNOWN", "UNKNOWN")); // 示例:使用默认值
}
} catch (Exception e) {
System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
// out.collect(new EnrichedOrder(order, "ERROR_PRODUCT", "ERROR_CATEGORY")); // 错误时使用默认值
}
}
}
}
DataStream Job 代码
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class DataStreamCachedLookupExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // 可以设置多个并行度,每个并行度会维护自己的缓存
// 1. 模拟订单数据流
DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
private volatile boolean isRunning = true;
private final Random random = new Random();
private long orderCounter = 0;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (isRunning) {
orderCounter++;
int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
double amount = 10.0 + random.nextDouble() * 990.0;
ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
Thread.sleep(50); // 每50毫秒生成一个订单
}
}
@Override
public void cancel() {
isRunning = false;
}
});
// 2. 使用 RichFlatMapFunction 进行维表关联,带缓存
DataStream<EnrichedOrder> enrichedStream = orderStream.flatMap(
new RedisProductCachedFlatMapFunction(
"localhost", 6379, "", // 替换为你的Redis地址和密码
1000, // 缓存最大条目数
5 * 60 * 1000L // 缓存TTL 5分钟 (5 * 60 * 1000 毫秒)
)
);
// 3. 打印结果
enrichedStream.print();
env.execute("DataStream Cached Lookup Join Example");
}
}
观察控制台输出。你会注意到,当缓存生效时,处理速度会非常快。如果产品 ID 命中缓存,则不会访问 Redis。当新的产品 ID 出现或缓存过期时,才会触发对 Redis 的查询。
2. 使用 AsyncDataStream (异步 I/O)
实现路径:
- 原理: 这是 DataStream API 中实现异步维表关联的推荐方式,与 Table API 的 AsyncTableFunction 原理类似。用户实现 AsyncFunction 接口,在 asyncInvoke() 方法中发起异步查询,并通过 ResultFuture 返回结果。
- 适用场景: 同步 Lookup Join 的异步版本在 DataStream API 中的实现,适用于对性能、吞吐量和延迟有高要求的场景,并且外部系统支持异步客户端。
优点:
- 与 Table API 的异步 Lookup Join 类似,具有相同的优点: 高吞吐量、低延迟、高资源利用率、灵活性强、内置背压机制。
缺点:
- 与 Table API 的异步 Lookup Join 类似,具有相同的缺点: 实现复杂度相对较高,依赖外部系统提供的异步客户端或接口。
示例代码:
AsyncFunction 实现
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class RedisProductAsyncFunction extends RichAsyncFunction<Order, EnrichedOrder> {
private final String redisHost;
private final int redisPort;
private final String redisPassword;
private transient JedisPool jedisPool;
private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步
public RedisProductAsyncFunction(String redisHost, int redisPort, String redisPassword) {
this.redisHost = redisHost;
this.redisPort = redisPort;
this.redisPassword = redisPassword;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100);
poolConfig.setMaxIdle(20);
poolConfig.setMinIdle(5);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
if (redisPassword != null && !redisPassword.isEmpty()) {
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
} else {
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
}
this.executorService = Executors.newFixedThreadPool(
getRuntimeContext().getNumberOfParallelSubtasks() * 2 // 每个并行度至少2个线程
);
}
@Override
public void close() throws Exception {
if (jedisPool != null) {
jedisPool.close();
}
if (executorService != null) {
executorService.shutdown();
}
super.close();
}
@Override
public void asyncInvoke(Order order, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
executorService.submit(() -> {
try (Jedis jedis = jedisPool.getResource()) {
String key = "product:" + order.productId;
Map<String, String> productInfo = jedis.hgetAll(key);
if (productInfo != null && !productInfo.isEmpty()) {
String productName = productInfo.get("name");
String category = productInfo.get("category");
EnrichedOrder enrichedOrder = new EnrichedOrder(order, productName, category);
resultFuture.complete(Collections.singletonList(enrichedOrder));
} else {
// 如果未找到维表数据,可以选择丢弃,或者用默认值填充
// 这里选择丢弃,也可以返回一个只包含订单信息的 EnrichedOrder
resultFuture.complete(Collections.emptyList());
}
} catch (Exception e) {
// 处理异常,例如打印日志,并返回空列表或原始数据
System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
resultFuture.complete(Collections.emptyList()); // 发生异常时,丢弃该订单
}
});
}
// 可选:处理超时,当异步请求在指定时间内未完成时调用
@Override
public void timeout(Order input, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
System.err.println("Async lookup timeout for order: " + input.orderId);
// 超时时可以选择返回原始数据,或丢弃
resultFuture.complete(Collections.singletonList(new EnrichedOrder(input, "UNKNOWN", "UNKNOWN")));
}
}
DataStream Job 代码
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class DataStreamAsyncLookupExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // 可以设置多个并行度,体验异步I/O的并发性
// 1. 模拟订单数据流
DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
private volatile boolean isRunning = true;
private final Random random = new Random();
private long orderCounter = 0;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (isRunning) {
orderCounter++;
int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
double amount = 10.0 + random.nextDouble() * 990.0;
ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
Thread.sleep(50); // 每50毫秒生成一个订单,模拟高吞吐
}
}
@Override
public void cancel() {
isRunning = false;
}
});
// 2. 使用 AsyncDataStream 进行异步维表关联
// orderStream: 输入流
// new RedisProductAsyncFunction(...): 异步函数实例
// 100: 最大并发异步请求数 (maxConcurrentRequests)
// 1000: 超时时间 (timeout)
// TimeUnit.MILLISECONDS: 超时时间单位
DataStream<EnrichedOrder> enrichedStream = AsyncDataStream.unorderedWait(
orderStream,
new RedisProductAsyncFunction("localhost", 6379, ""), // 替换为你的Redis地址和密码
1000, // 超时时间,例如1000毫秒
TimeUnit.MILLISECONDS,
100 // 最大并发请求数,可以调整
);
// 也可以使用 orderedWait 保持事件顺序,但通常 unorderedWait 性能更高
// 3. 打印结果
enrichedStream.print();
env.execute("DataStream Async Lookup Join Example");
}
}
3. 使用 Managed State (Stateful Functions) + 定期更新
实现路径:
- 原理: 将维表数据作为 Flink Managed State(如 ValueState, MapState)存储在 Flink 内部,通常与 Keyed State 结合使用。通过一个独立的流(或定时任务)定期从外部维表读取最新数据,并更新到 Flink 的 Managed State 中。当主数据流到来时,直接从本地状态中查询维表数据。
- 更新机制: 全量同步: 定期拉取全量维表数据。 增量同步: 如果维表支持,可以通过 CDC (Change Data Capture) 或消息队列(如 Kafka)订阅维表的变化,将变化数据作为单独的流推送到 Flink,然后更新到 Managed State。
- 适用场景: 维表数据量大,无法全部加载到每个 TaskManager 的内存中,但仍希望在 Flink 内部实现高性能查询。 对数据新鲜度有一定容忍度,允许维表数据与外部源存在短暂延迟。 需要利用 Flink 的状态管理能力(如状态快照、故障恢复)。
优点:
- 极高查询性能: 维表数据在 Flink 内部,查询速度极快,避免了网络 I/O 延迟。
- 故障恢复: 维表数据作为 Managed State,可以随着 Flink 的状态快照和故障恢复机制得到保护。
- 独立更新: 维表更新与主数据流处理分离,不会阻塞主数据流。
- 降低外部系统压力: 维表查询不再直接访问外部系统,而是访问 Flink 内部状态。
缺点:
- 数据新鲜度: 维表数据与外部源存在更新延迟,取决于更新频率和机制。
- 实现复杂度高: 需要管理状态的更新逻辑,特别是增量更新。
- 状态管理开销: 维表数据如果非常大,会增加 Flink State 的存储和快照开销。
- 扩容和再平衡: 如果维表数据是 Keyed State,当 Flink 集群扩容或任务再平衡时,状态迁移可能会带来开销。
总结与选择建议:
实现路径 | 适用场景 | 优点 | 缺点 |
SQL/Table API - 同步 Lookup Join | 简单验证、低吞吐量、对性能要求不高。 | 实现简单,易于上手。 | 性能瓶颈,不适合高并发,强依赖外部数据库性能。 |
SQL/Table API - 异步 Lookup Join | 推荐,对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。 | 高吞吐量,低延迟,高资源利用率,内置背压。 | 实现复杂度相对高,依赖外部系统异步客户端。 |
DataStream - 缓存 (RichFunction) | 维表数据量小,对数据新鲜度有一定容忍,且外部维表查询延迟较低。 | 命中缓存性能高,降低外部系统压力,实现相对简单。 | 数据新鲜度问题,内存消耗,无内置背压。 |
DataStream - AsyncDataStream | 对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。 | 同 Table API 异步 Lookup Join 的优点。 | 同 Table API 异步 Lookup Join 的缺点。 |
DataStream - Managed State (定期更新) | 维表数据量大,但希望高性能查询,对数据新鲜度有一定容忍,需要 Flink 状态管理。 | 极高查询性能,故障恢复,独立更新,降低外部系统压力。 | 数据新鲜度有延迟,实现复杂度高,状态管理开销,扩容开销。 |
选择建议:
- 首选异步 I/O: 对于大多数生产环境下的维表关联场景,无论是使用 Flink SQL/Table API 的 LOOKUP JOIN 还是 DataStream API 的 AsyncDataStream,都强烈推荐使用异步 I/O。它能最大程度地提高吞吐量,降低延迟,并有效利用资源。
- 考虑缓存: 如果维表数据量不大,且对数据新鲜度有一定容忍,可以考虑在 RichFunction 中结合缓存策略。
- 大型维表或高一致性要求: 对于极其庞大或需要极高查询性能和故障恢复能力的维表,可以考虑将维表数据作为 Flink Managed State 来管理,通过增量或全量同步机制定期更新。
- SQL/Table API vs. DataStream API: SQL/Table API: 如果业务逻辑可以通过 SQL 或 Table API 表达,且需要快速开发和部署,优先考虑。Lookup Join 语法简洁。 DataStream API: 如果业务逻辑复杂,需要更精细的控制,或者需要与 Flink 的其他高级特性(如事件时间处理、状态编程)深度结合,DataStream API 会提供更大的灵活性。
在实际应用中,通常需要根据维表的数据量、更新频率、数据新鲜度要求、以及对性能、延迟和实现复杂度的权衡来选择最合适的实现路径。
相关推荐
- Java面试题合集200道!
-
1.Java中操作字符串都有哪些类?它们之间有什么区别?String、StringBuffer、StringBuilder.String和StringBufer、StringBuilder的区别...
- JAVA分布式锁的原理,及多种分布式实现优劣对比分析
-
引题比如在同一个节点上,两个线程并发的操作A的账户,都是取钱,如果不加锁,A的账户可能会出现负数,正确的方式是对账户acount进行加锁,即使用synchronized关键字,对其进行加锁后,当有线程...
- 百度Linux C++后台开发面试题(个人整理)
-
1、C/C++程序的内存分区其实C和C++的内存分区还是有一定区别的,但此处不作区分:1)、栈区(stack)—由编译器自动分配释放,存放函数的参数值,局部变量的值等。其操作方式类似于数据结构中...
- 什么是云计算?看这篇就够了(建议收藏)
-
一、什么是云?云,又称云端,指无数的大型机房或者大型数据中心。二、为什么需要云?1)从用户的角度来讲:传统应用的需求日益复杂,比如需要支持更多的用户,需要更强的计算能力等,为满足这些日益增长的需求,企...
- 写PHP框架需要具备那些知识?
-
如果没用过框架,讨论各个框架的内容都没有可讨论性,想自己写个框架涉及到的内容很多,个人觉得自己写一个框架对自己的逻辑思维,开发架构以及这门语言都有质的提升。可以参照其他框架的源代码,仅仅是看他们的思路...
- 不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理
-
我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储键值对数据会使用HashMap,在什么场景下需要用到Blocking...
- Java性能优化指南—缓存那些事
-
由于笔者自身水平有限,如果有不对或者任何建议欢迎批评和指正本文预计阅读时间10分钟,分为前言、填坑两部分,主要包含缓存的基本使用到高级应用场景的介绍一、前言在处理高并发请求时,缓存几乎是无往不利的利器...
- 卓象科技:Nosql的介绍以及和关系型数据库的区别
-
Nosql介绍NoSQL(NotOnlySQL),泛指非关系型数据库。Nosql的全称是NotOnlySql,这个概念很早就有人提出,在09年的时候比较火。Nosql指的是非关系型数...
- 腾讯一面凉经(一面竟然就问了2小时,什么情况?)
-
这次一面感觉是在打心理战,哥们自己的心里防线基本是被击溃,面到怀疑人生的程度,所以过程感觉不是太好,很多题哥们自己也感觉没答好,要么答得“缺胳膊少腿”,要么就是“画蛇添足”。先是聊项目,从项目的架构设...
- 我凭借这份pdf,最终拿到了阿里,腾讯,京东等八家大厂offer
-
怎样才能拿到大厂的offer,没有掌握绝对的技术,那么就要不断的学习我是如何笑对金九银十,拿到阿里,腾讯等八家大厂的offer的呢,今天分享我的秘密武器,美团大神整理的Java核心知识点,面试时面试官...
- 高并发 异步解耦利器:RocketMQ究竟强在哪里?
-
本文带大家从以下几个方面详细了解RocketMQ:RocketMQ如何保证消息存储的可靠性?RocketMQ如何保证消息队列服务的高可用?如何构建一个高可用的RocketMQ双主双从最小集群?Rock...
- 阿里最新Java架构师成长笔记开源
-
下面先给大家上一个总的目录大纲,基础的东西就不进行过多的赘述,我们将会从JVM说起,同时由于每篇的内容过多,我们也只说重点,太过基础的内容谁都会,我就不多敲字浪费大家的时间了!JVM多线程与高并发Sp...
- 程序员失业2个月找不到工作,狂刷了5遍这份pdf终获字节跳动offer
-
写在前面1月初失业,找了近2个多月的工作了,还没找到心仪的工作,感觉心好慌,不知道该怎么办了?找不到工作的时候压力很大,有人说自信会很受打击,还有人说会很绝望,是人生的低谷……尽管很多时候我们自己知道...
- Spring AI 模块架构与功能解析
-
SpringAI是Spring生态系统中的一个新兴模块,专注于简化人工智能和机器学习技术在Spring应用程序中的集成。本文将详细介绍SpringAI的核心组件、功能模块及其之间的关...
- Nginx从入门到精通,超详细整理,含项目实战案例|运维必学
-
Nginx是免费的、开源的、高性能的HTTP和反向代理服务器、邮件代理服务器、以及TCP/UDP代理服务器。因为它的稳定性、丰富的模块库、灵活的配置和低系统资源的消耗而闻名。Nginx可以做静态HT...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)