百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Flink中处理维表关联技术实现路径

mhr18 2025-05-27 16:04 7 浏览 0 评论

在 Flink 中处理维表关联大体氛围Table SQL Lookup Join 和 DataStream 算子函数,主要技术实现路径:

I. Flink SQL/Table API 中的 Lookup Join

Flink SQL/Table API 提供了 LOOKUP JOIN 语法,专门用于将流式数据与维表(通常是存储在外部系统中的批数据)进行关联。其核心在于通过异步或同步的方式查询维表,避免阻塞流处理。

1. 同步 Lookup Join

实现路径:

  • JDBC Lookup Function (MySQL, PostgreSQL等关系型数据库): Flink 内部提供了 JDBC connector,可以通过配置来连接关系型数据库。当流数据到来时,Flink 会向数据库发送查询请求,同步等待查询结果。
  • HBase Lookup Function: Flink 也提供了 HBase Connector,可以用于与 HBase 进行同步关联。
  • Redis Lookup Function: 虽然没有内置的 Redis Connector 作为 Lookup Source,但可以通过自定义 TableFunctionAsyncTableFunction 来实现同步或异步与 Redis 的关联。

优点:

  • 实现简单: 对于支持的数据库,配置相对简单,直接使用 SQL 语法即可。
  • 数据一致性高: 每次查询都是从最新维表数据获取,保证了数据的新鲜度(在不考虑数据库同步延迟的情况下)。

缺点:

  • 性能瓶颈: 同步查询会阻塞 Flink 算子,如果维表查询延迟较高,或者查询并发量大,会严重影响流处理的吞吐量和延迟。这在处理高吞吐量流数据时是一个显著的缺点。
  • 外部依赖: 强依赖外部数据库的性能和可用性。
  • 不适合高并发场景: 容易导致数据库连接池耗尽或数据库负载过高。

示例代码:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SyncLookupJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 示例中使用单并行度

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 定义一个订单数据源 (这里使用 VALUES 生成一个模拟的流)
        // 实际应用中可以替换为 Kafka, Pulsar, Kinesis 等数据源
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (\n" +
            "    order_id INT,\n" +
            "    product_id INT,\n" +
            "    amount DOUBLE,\n" +
            "    proctime AS PROCTIME() -- 处理时间字段\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" + // 使用 datagen 连接器模拟数据
            "    'rows-per-second' = '1',\n" + // 每秒生成一条数据
            "    'fields.order_id.min' = '1',\n" +
            "    'fields.order_id.max' = '1000',\n" +
            "    'fields.product_id.min' = '101',\n" + // 模拟关联 Product ID
            "    'fields.product_id.max' = '202',\n" +
            "    'fields.amount.min' = '10.0',\n" +
            "    'fields.amount.max' = '1000.0'\n" +
            ")"
        );

        // 2. 定义一个维表数据源 (MySQL JDBC Lookup Table)
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE products_dim (\n" +
            "    product_id INT,\n" +
            "    product_name STRING,\n" +
            "    category STRING\n" +
            ") WITH (\n" +
            "    'connector' = 'jdbc',\n" +
            "    'url' = 'jdbc:mysql://localhost:3306/testdb',\n" + // 替换为你的 MySQL 地址和数据库名
            "    'table-name' = 'products',\n" +
            "    'username' = 'root',\n" +     // 替换为你的 MySQL 用户名
            "    'password' = 'your_password', \n" + // 替换为你的 MySQL 密码
            "    'lookup.cache.max-rows' = '1000',\n" + // JDBC Lookup 支持缓存
            "    'lookup.cache.ttl' = '10min',\n" +    // 缓存过期时间
            "    'lookup.max-retries' = '3' \n" +       // 查询失败重试次数
            ")"
        );

        // 3. 执行 Lookup Join 查询
        Table resultTable = tEnv.sqlQuery(
            "SELECT\n" +
            "    o.order_id,\n" +
            "    o.product_id,\n" +
            "    p.product_name,\n" +
            "    p.category,\n" +
            "    o.amount\n" +
            "FROM\n" +
            "    orders AS o\n" +
            "JOIN products_dim AS p ON o.product_id = p.product_id" // 简单的等值 JOIN 即可
        );

        // 4. 将结果打印到控制台 (或输出到其他 Sink)
        resultTable.execute().print();

        // Flink Job 会持续运行,直到手动停止
        // env.execute("Sync Lookup Join Example"); // 对于execute().print()不需要再execute()
    }
}

如果你的 MySQL 响应慢,你会发现 Flink 的输出也会变慢,因为它是同步阻塞的

2. 异步 Lookup Join (推荐)

实现路径:

  • 自定义 AsyncTableFunction: 这是实现异步 Lookup Join 的核心和推荐方式。
  • 原理: AsyncTableFunction 允许用户编写异步查询逻辑。当流数据到来时,Flink 会并发地向外部维表发起查询请求,但不会阻塞当前处理线程。查询结果返回后,通过 ResultFuture 回调通知 Flink,将维表数据与流数据进行合并。
  • 适用场景: 适用于任何支持异步客户端的外部存储,例如: Redis: 使用 Jedis 或 Lettuce 等异步客户端。 HBase (Async Client): 使用 HBase 提供的异步客户端。 Cassandra: 使用 DataStax Java Driver。 ClickHouse (异步JDBC或HTTP API): 虽然是关系型数据库,但如果其驱动支持异步,或通过 HTTP API 异步访问,也可以实现。 HTTP API (微服务接口): 调用外部微服务接口获取维表数据。
  • 实现步骤: 实现 AsyncTableFunction 接口。eval 方法中发起异步查询,并传入 ResultFuture 当异步查询返回结果时,调用 ResultFuture.complete() 来通知 Flink。 注册自定义的 AsyncTableFunction 到 Table Environment。 在 SQL 中使用 LATERAL TABLE() 语法进行关联。

优点:

  • 高吞吐量和低延迟: 非阻塞查询,允许多个并发查询同时进行,极大地提高了吞吐量,降低了端到端延迟。
  • 资源利用率高: 充分利用 I/O 资源,避免了线程阻塞。
  • 灵活性强: 可以集成任何支持异步查询的外部存储或服务。
  • 背压能力: Flink 的异步 I/O 组件提供了内置的背压机制,可以根据下游处理能力自动调整并发度,防止外部系统过载。

缺点:

  • 实现复杂度相对较高: 需要编写异步查询逻辑,处理回调和异常。
  • 外部系统支持: 依赖外部系统提供的异步客户端或接口。

示例代码:

首先,我们需要创建一个自定义的 AsyncRedisLookupFunction。

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Map;

public class AsyncRedisLookupFunction extends AsyncTableFunction<RowData> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword; // 如果有密码
    private transient JedisPool jedisPool; // transient 表示不参与序列化
    private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步

    public AsyncRedisLookupFunction(String redisHost, int redisPort, String redisPassword) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100); // 最大连接数
        poolConfig.setMaxIdle(20);  // 最大空闲连接数
        poolConfig.setMinIdle(5);   // 最小空闲连接数
        poolConfig.setTestOnBorrow(true); // 借用连接时测试
        poolConfig.setTestOnReturn(true); // 归还连接时测试
        poolConfig.setTestWhileIdle(true); // 空闲时测试

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        // 创建一个线程池来模拟异步查询。
        // 实际生产中,更推荐使用 Lettuce 等真正的异步 Redis 客户端。
        this.executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 2 // 通常设置为 CPU 核数的两倍
        );
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    // eval 方法接收流中的 join key,并返回一个 CompletableFuture
    // Flink 将在 CompletableFuture 完成时收集结果
    public void eval(CompletableFuture<Collector<RowData>> resultFuture, Integer productId) {
        executorService.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + productId;
                Map<String, String> productInfo = jedis.hgetAll(key);

                if (productInfo != null && !productInfo.isEmpty()) {
                    String productName = productInfo.get("name");
                    String category = productInfo.get("category");
                    // 返回一个 RowData,包含维表查询到的字段
                    // 顺序与定义 Table 时声明的字段顺序一致
                    resultFuture.complete(new SimpleCollector<>(
                            Collections.singletonList(GenericRowData.of(
                                    StringData.fromString(productName),
                                    StringData.fromString(category)
                            ))
                    ));
                } else {
                    // 如果未找到,返回空结果
                    resultFuture.complete(new SimpleCollector<>(Collections.emptyList()));
                }
            } catch (Exception e) {
                // 处理异常,可以返回空结果或抛出异常让 Flink 处理
                resultFuture.completeExceptionally(e);
            }
        });
    }

    // 辅助类,用于将 List<RowData> 收集到 Collector 中
    private static class SimpleCollector<T> implements Collector<T> {
        private final java.util.List<T> list;

        public SimpleCollector(java.util.List<T> list) {
            this.list = list;
        }

        @Override
        public void collect(T record) {
            list.add(record); // 实际上这里我们只收集一个结果
        }

        @Override
        public void close() {
            // No-op
        }
    }
}

现在,我们将在 Flink SQL 中注册并使用这个 AsyncRedisLookupFunction。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeComparators;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.Collections;

import static org.apache.flink.table.api.DataTypes.*;

public class AsyncLookupJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 示例中使用单并行度,但异步 I/O 可以并行

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 定义一个订单数据源 (Datagen 生成模拟流)
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (\n" +
            "    order_id INT,\n" +
            "    product_id INT,\n" +
            "    amount DOUBLE,\n" +
            "    proctime AS PROCTIME() -- 处理时间字段\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" +
            "    'rows-per-second' = '10',\n" + // 提高生成速度,更明显地展示异步优势
            "    'fields.order_id.min' = '1',\n" +
            "    'fields.order_id.max' = '1000',\n" +
            "    'fields.product_id.min' = '101',\n" +
            "    'fields.product_id.max' = '202',\n" + // 确保 product_id 在 Redis 示例范围内
            "    'fields.amount.min' = '10.0',\n" +
            "    'fields.amount.max' = '1000.0'\n" +
            ")"
        );

        // 2. 注册自定义的 AsyncRedisLookupFunction
        // 构造函数参数: redisHost, redisPort, redisPassword
        tEnv.createTemporarySystemFunction(
            "AsyncRedisProducts",
            new AsyncRedisLookupFunction("localhost", 6379, "") // 替换为你的 Redis 地址、端口和密码
        );

        // 3. 执行 Lookup Join 查询
        // 使用 LATERAL TABLE(FunctionName(lookup_keys)) 语法
        // 注意:AsyncRedisProducts 函数的返回类型决定了 JOIN 后的字段名
        // AsyncRedisLookupFunction 返回的是 RowData,包含 (product_name, category)
        Table resultTable = tEnv.sqlQuery(
            "SELECT\n" +
            "    o.order_id,\n" +
            "    o.product_id,\n" +
            "    T.product_name,\n" + // T 是 LATERAL TABLE 的别名
            "    T.category,\n" +
            "    o.amount\n" +
            "FROM\n" +
            "    orders AS o,\n" + // 注意这里是逗号连接,而不是 JOIN
            "    LATERAL TABLE(AsyncRedisProducts(o.product_id)) AS T(product_name, category)" // 指定返回的字段名
        );

        // 4. 将结果打印到控制台
        resultTable.execute().print();
    }
}

查询 Redis 是异步进行的,不会阻塞主线程。

II. DataStream API 中的维表关联

在 DataStream API 中,没有直接的 LOOKUP JOIN 概念,但可以通过自定义函数和状态管理来实现维表关联。

1. 使用 RichFlatMapFunction/RichMapFunction + 缓存

实现路径:

  • 原理:open() 方法中初始化外部维表连接或加载部分维表数据到内存(如果维表数据量不大)。在 map()flatMap() 方法中,当流数据到来时,首先尝试从本地缓存中查询维表数据。如果缓存未命中,则向外部维表发起同步查询,并将结果更新到缓存中。
  • 缓存策略: LRU Cache: 维护一个固定大小的最近最少使用缓存。 TTL Cache: 给缓存中的数据设置过期时间,定期清理过期数据。
  • 适用场景: 维表数据量不大,可以完全或部分加载到内存中。 对数据新鲜度要求不是极高,允许一定的缓存延迟。 对实时性要求较高,但外部维表查询延迟较低。

优点:

  • 性能提升: 命中缓存可以避免与外部存储的交互,显著提升性能。
  • 降低外部系统压力: 减少对外部维表的查询次数。
  • 实现相对简单: 比异步 I/O 相对容易理解和实现。

缺点:

  • 数据新鲜度问题: 缓存可能导致数据不新鲜,特别是维表数据更新频繁时。需要实现合适的缓存失效和更新机制。
  • 内存消耗: 维表数据如果过大,可能导致内存溢出。
  • 并发瓶颈: 如果缓存未命中,仍需进行同步查询,在高并发下仍可能成为瓶颈。
  • 缺乏内置背压: 需要手动管理与外部系统的交互,没有像异步 I/O 那样内置的背压机制。

示例代码:

RichFlatMapFunction 实现

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RedisProductCachedFlatMapFunction extends RichFlatMapFunction<Order, EnrichedOrder> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword;
    private transient JedisPool jedisPool;
    private transient Map<Integer, ProductInfo> cache; // LRU 缓存
    private final int cacheMaxSize;
    private final long cacheTTL; // 缓存过期时间,单位毫秒

    // 内部类,存储产品信息
    private static class ProductInfo {
        String productName;
        String category;
        long timestamp; // 记录缓存时间,用于TTL

        public ProductInfo(String productName, String category) {
            this.productName = productName;
            this.category = category;
            this.timestamp = System.currentTimeMillis();
        }
    }

    public RedisProductCachedFlatMapFunction(String redisHost, int redisPort, String redisPassword, int cacheMaxSize, long cacheTTL) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
        this.cacheMaxSize = cacheMaxSize;
        this.cacheTTL = cacheTTL;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(50); // 连接数可以适当减少,因为有缓存
        poolConfig.setMaxIdle(10);

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        // 实现一个简单的 LRU 缓存,带 TTL
        this.cache = new LinkedHashMap<Integer, ProductInfo>(cacheMaxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, ProductInfo> eldest) {
                // 移除最老或过期的条目
                return size() > cacheMaxSize || (System.currentTimeMillis() - eldest.getValue().timestamp > cacheTTL);
            }
        };
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        super.close();
    }

    @Override
    public void flatMap(Order order, Collector<EnrichedOrder> out) throws Exception {
        ProductInfo productInfo = cache.get(order.productId);
        long currentTime = System.currentTimeMillis();

        // 检查缓存是否存在且未过期
        if (productInfo != null && (currentTime - productInfo.timestamp) <= cacheTTL) {
            // 缓存命中
            out.collect(new EnrichedOrder(order, productInfo.productName, productInfo.category));
        } else {
            // 缓存未命中或已过期,从 Redis 查询
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + order.productId;
                Map<String, String> redisResult = jedis.hgetAll(key);

                if (redisResult != null && !redisResult.isEmpty()) {
                    String productName = redisResult.get("name");
                    String category = redisResult.get("category");
                    productInfo = new ProductInfo(productName, category);
                    cache.put(order.productId, productInfo); // 更新缓存
                    out.collect(new EnrichedOrder(order, productName, category));
                } else {
                    // 未找到维表数据,选择丢弃或使用默认值
                    // System.err.println("Product ID " + order.productId + " not found in Redis.");
                    // out.collect(new EnrichedOrder(order, "UNKNOWN", "UNKNOWN")); // 示例:使用默认值
                }
            } catch (Exception e) {
                System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
                // out.collect(new EnrichedOrder(order, "ERROR_PRODUCT", "ERROR_CATEGORY")); // 错误时使用默认值
            }
        }
    }
}

DataStream Job 代码

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class DataStreamCachedLookupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // 可以设置多个并行度,每个并行度会维护自己的缓存

        // 1. 模拟订单数据流
        DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
            private long orderCounter = 0;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    orderCounter++;
                    int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
                    double amount = 10.0 + random.nextDouble() * 990.0;
                    ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
                    Thread.sleep(50); // 每50毫秒生成一个订单
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 2. 使用 RichFlatMapFunction 进行维表关联,带缓存
        DataStream<EnrichedOrder> enrichedStream = orderStream.flatMap(
                new RedisProductCachedFlatMapFunction(
                        "localhost", 6379, "", // 替换为你的Redis地址和密码
                        1000, // 缓存最大条目数
                        5 * 60 * 1000L // 缓存TTL 5分钟 (5 * 60 * 1000 毫秒)
                )
        );

        // 3. 打印结果
        enrichedStream.print();

        env.execute("DataStream Cached Lookup Join Example");
    }
}

观察控制台输出。你会注意到,当缓存生效时,处理速度会非常快。如果产品 ID 命中缓存,则不会访问 Redis。当新的产品 ID 出现或缓存过期时,才会触发对 Redis 的查询。

2. 使用 AsyncDataStream (异步 I/O)

实现路径:

  • 原理: 这是 DataStream API 中实现异步维表关联的推荐方式,与 Table API 的 AsyncTableFunction 原理类似。用户实现 AsyncFunction 接口,在 asyncInvoke() 方法中发起异步查询,并通过 ResultFuture 返回结果。
  • 适用场景: 同步 Lookup Join 的异步版本在 DataStream API 中的实现,适用于对性能、吞吐量和延迟有高要求的场景,并且外部系统支持异步客户端。

优点:

  • 与 Table API 的异步 Lookup Join 类似,具有相同的优点: 高吞吐量、低延迟、高资源利用率、灵活性强、内置背压机制。

缺点:

  • 与 Table API 的异步 Lookup Join 类似,具有相同的缺点: 实现复杂度相对较高,依赖外部系统提供的异步客户端或接口。

示例代码:

AsyncFunction 实现

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.configuration.Configuration;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RedisProductAsyncFunction extends RichAsyncFunction<Order, EnrichedOrder> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword;
    private transient JedisPool jedisPool;
    private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步

    public RedisProductAsyncFunction(String redisHost, int redisPort, String redisPassword) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        this.executorService = Executors.newFixedThreadPool(
                getRuntimeContext().getNumberOfParallelSubtasks() * 2 // 每个并行度至少2个线程
        );
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    @Override
    public void asyncInvoke(Order order, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
        executorService.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + order.productId;
                Map<String, String> productInfo = jedis.hgetAll(key);

                if (productInfo != null && !productInfo.isEmpty()) {
                    String productName = productInfo.get("name");
                    String category = productInfo.get("category");
                    EnrichedOrder enrichedOrder = new EnrichedOrder(order, productName, category);
                    resultFuture.complete(Collections.singletonList(enrichedOrder));
                } else {
                    // 如果未找到维表数据,可以选择丢弃,或者用默认值填充
                    // 这里选择丢弃,也可以返回一个只包含订单信息的 EnrichedOrder
                    resultFuture.complete(Collections.emptyList());
                }
            } catch (Exception e) {
                // 处理异常,例如打印日志,并返回空列表或原始数据
                System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
                resultFuture.complete(Collections.emptyList()); // 发生异常时,丢弃该订单
            }
        });
    }

    // 可选:处理超时,当异步请求在指定时间内未完成时调用
    @Override
    public void timeout(Order input, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
        System.err.println("Async lookup timeout for order: " + input.orderId);
        // 超时时可以选择返回原始数据,或丢弃
        resultFuture.complete(Collections.singletonList(new EnrichedOrder(input, "UNKNOWN", "UNKNOWN")));
    }
}

DataStream Job 代码

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DataStreamAsyncLookupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // 可以设置多个并行度,体验异步I/O的并发性

        // 1. 模拟订单数据流
        DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
            private long orderCounter = 0;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    orderCounter++;
                    int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
                    double amount = 10.0 + random.nextDouble() * 990.0;
                    ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
                    Thread.sleep(50); // 每50毫秒生成一个订单,模拟高吞吐
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 2. 使用 AsyncDataStream 进行异步维表关联
        // orderStream: 输入流
        // new RedisProductAsyncFunction(...): 异步函数实例
        // 100: 最大并发异步请求数 (maxConcurrentRequests)
        // 1000: 超时时间 (timeout)
        // TimeUnit.MILLISECONDS: 超时时间单位
        DataStream<EnrichedOrder> enrichedStream = AsyncDataStream.unorderedWait(
                orderStream,
                new RedisProductAsyncFunction("localhost", 6379, ""), // 替换为你的Redis地址和密码
                1000, // 超时时间,例如1000毫秒
                TimeUnit.MILLISECONDS,
                100 // 最大并发请求数,可以调整
        );
        // 也可以使用 orderedWait 保持事件顺序,但通常 unorderedWait 性能更高

        // 3. 打印结果
        enrichedStream.print();

        env.execute("DataStream Async Lookup Join Example");
    }
}

3. 使用 Managed State (Stateful Functions) + 定期更新

实现路径:

  • 原理: 将维表数据作为 Flink Managed State(如 ValueState, MapState)存储在 Flink 内部,通常与 Keyed State 结合使用。通过一个独立的流(或定时任务)定期从外部维表读取最新数据,并更新到 Flink 的 Managed State 中。当主数据流到来时,直接从本地状态中查询维表数据。
  • 更新机制: 全量同步: 定期拉取全量维表数据。 增量同步: 如果维表支持,可以通过 CDC (Change Data Capture) 或消息队列(如 Kafka)订阅维表的变化,将变化数据作为单独的流推送到 Flink,然后更新到 Managed State。
  • 适用场景: 维表数据量大,无法全部加载到每个 TaskManager 的内存中,但仍希望在 Flink 内部实现高性能查询。 对数据新鲜度有一定容忍度,允许维表数据与外部源存在短暂延迟。 需要利用 Flink 的状态管理能力(如状态快照、故障恢复)。

优点:

  • 极高查询性能: 维表数据在 Flink 内部,查询速度极快,避免了网络 I/O 延迟。
  • 故障恢复: 维表数据作为 Managed State,可以随着 Flink 的状态快照和故障恢复机制得到保护。
  • 独立更新: 维表更新与主数据流处理分离,不会阻塞主数据流。
  • 降低外部系统压力: 维表查询不再直接访问外部系统,而是访问 Flink 内部状态。

缺点:

  • 数据新鲜度: 维表数据与外部源存在更新延迟,取决于更新频率和机制。
  • 实现复杂度高: 需要管理状态的更新逻辑,特别是增量更新。
  • 状态管理开销: 维表数据如果非常大,会增加 Flink State 的存储和快照开销。
  • 扩容和再平衡: 如果维表数据是 Keyed State,当 Flink 集群扩容或任务再平衡时,状态迁移可能会带来开销。

总结与选择建议:

实现路径

适用场景

优点

缺点

SQL/Table API - 同步 Lookup Join

简单验证、低吞吐量、对性能要求不高。

实现简单,易于上手。

性能瓶颈,不适合高并发,强依赖外部数据库性能。

SQL/Table API - 异步 Lookup Join

推荐,对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。

高吞吐量,低延迟,高资源利用率,内置背压。

实现复杂度相对高,依赖外部系统异步客户端。

DataStream - 缓存 (RichFunction)

维表数据量小,对数据新鲜度有一定容忍,且外部维表查询延迟较低。

命中缓存性能高,降低外部系统压力,实现相对简单。

数据新鲜度问题,内存消耗,无内置背压。

DataStream - AsyncDataStream

对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。

同 Table API 异步 Lookup Join 的优点。

同 Table API 异步 Lookup Join 的缺点。

DataStream - Managed State (定期更新)

维表数据量大,但希望高性能查询,对数据新鲜度有一定容忍,需要 Flink 状态管理。

极高查询性能,故障恢复,独立更新,降低外部系统压力。

数据新鲜度有延迟,实现复杂度高,状态管理开销,扩容开销。

选择建议:

  • 首选异步 I/O: 对于大多数生产环境下的维表关联场景,无论是使用 Flink SQL/Table API 的 LOOKUP JOIN 还是 DataStream API 的 AsyncDataStream,都强烈推荐使用异步 I/O。它能最大程度地提高吞吐量,降低延迟,并有效利用资源。
  • 考虑缓存: 如果维表数据量不大,且对数据新鲜度有一定容忍,可以考虑在 RichFunction 中结合缓存策略。
  • 大型维表或高一致性要求: 对于极其庞大或需要极高查询性能和故障恢复能力的维表,可以考虑将维表数据作为 Flink Managed State 来管理,通过增量或全量同步机制定期更新。
  • SQL/Table API vs. DataStream API: SQL/Table API: 如果业务逻辑可以通过 SQL 或 Table API 表达,且需要快速开发和部署,优先考虑。Lookup Join 语法简洁。 DataStream API: 如果业务逻辑复杂,需要更精细的控制,或者需要与 Flink 的其他高级特性(如事件时间处理、状态编程)深度结合,DataStream API 会提供更大的灵活性。

在实际应用中,通常需要根据维表的数据量、更新频率、数据新鲜度要求、以及对性能、延迟和实现复杂度的权衡来选择最合适的实现路径。

相关推荐

Java面试题合集200道!

1.Java中操作字符串都有哪些类?它们之间有什么区别?String、StringBuffer、StringBuilder.String和StringBufer、StringBuilder的区别...

JAVA分布式锁的原理,及多种分布式实现优劣对比分析

引题比如在同一个节点上,两个线程并发的操作A的账户,都是取钱,如果不加锁,A的账户可能会出现负数,正确的方式是对账户acount进行加锁,即使用synchronized关键字,对其进行加锁后,当有线程...

百度Linux C++后台开发面试题(个人整理)

1、C/C++程序的内存分区其实C和C++的内存分区还是有一定区别的,但此处不作区分:1)、栈区(stack)—由编译器自动分配释放,存放函数的参数值,局部变量的值等。其操作方式类似于数据结构中...

什么是云计算?看这篇就够了(建议收藏)

一、什么是云?云,又称云端,指无数的大型机房或者大型数据中心。二、为什么需要云?1)从用户的角度来讲:传统应用的需求日益复杂,比如需要支持更多的用户,需要更强的计算能力等,为满足这些日益增长的需求,企...

写PHP框架需要具备那些知识?

如果没用过框架,讨论各个框架的内容都没有可讨论性,想自己写个框架涉及到的内容很多,个人觉得自己写一个框架对自己的逻辑思维,开发架构以及这门语言都有质的提升。可以参照其他框架的源代码,仅仅是看他们的思路...

不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理

我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储键值对数据会使用HashMap,在什么场景下需要用到Blocking...

Java性能优化指南—缓存那些事

由于笔者自身水平有限,如果有不对或者任何建议欢迎批评和指正本文预计阅读时间10分钟,分为前言、填坑两部分,主要包含缓存的基本使用到高级应用场景的介绍一、前言在处理高并发请求时,缓存几乎是无往不利的利器...

卓象科技:Nosql的介绍以及和关系型数据库的区别

Nosql介绍NoSQL(NotOnlySQL),泛指非关系型数据库。Nosql的全称是NotOnlySql,这个概念很早就有人提出,在09年的时候比较火。Nosql指的是非关系型数...

腾讯一面凉经(一面竟然就问了2小时,什么情况?)

这次一面感觉是在打心理战,哥们自己的心里防线基本是被击溃,面到怀疑人生的程度,所以过程感觉不是太好,很多题哥们自己也感觉没答好,要么答得“缺胳膊少腿”,要么就是“画蛇添足”。先是聊项目,从项目的架构设...

我凭借这份pdf,最终拿到了阿里,腾讯,京东等八家大厂offer

怎样才能拿到大厂的offer,没有掌握绝对的技术,那么就要不断的学习我是如何笑对金九银十,拿到阿里,腾讯等八家大厂的offer的呢,今天分享我的秘密武器,美团大神整理的Java核心知识点,面试时面试官...

高并发 异步解耦利器:RocketMQ究竟强在哪里?

本文带大家从以下几个方面详细了解RocketMQ:RocketMQ如何保证消息存储的可靠性?RocketMQ如何保证消息队列服务的高可用?如何构建一个高可用的RocketMQ双主双从最小集群?Rock...

阿里最新Java架构师成长笔记开源

下面先给大家上一个总的目录大纲,基础的东西就不进行过多的赘述,我们将会从JVM说起,同时由于每篇的内容过多,我们也只说重点,太过基础的内容谁都会,我就不多敲字浪费大家的时间了!JVM多线程与高并发Sp...

程序员失业2个月找不到工作,狂刷了5遍这份pdf终获字节跳动offer

写在前面1月初失业,找了近2个多月的工作了,还没找到心仪的工作,感觉心好慌,不知道该怎么办了?找不到工作的时候压力很大,有人说自信会很受打击,还有人说会很绝望,是人生的低谷……尽管很多时候我们自己知道...

Spring AI 模块架构与功能解析

SpringAI是Spring生态系统中的一个新兴模块,专注于简化人工智能和机器学习技术在Spring应用程序中的集成。本文将详细介绍SpringAI的核心组件、功能模块及其之间的关...

Nginx从入门到精通,超详细整理,含项目实战案例|运维必学

Nginx是免费的、开源的、高性能的HTTP和反向代理服务器、邮件代理服务器、以及TCP/UDP代理服务器。因为它的稳定性、丰富的模块库、灵活的配置和低系统资源的消耗而闻名。Nginx可以做静态HT...

取消回复欢迎 发表评论: