百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何使用 Kafka 和 Redis 构建实时推荐引擎

mhr18 2025-03-20 16:33 25 浏览 0 评论

在电商平台上,个性化推荐已经成为必不可少的功能:推荐内容既需要符合用户喜好,还需要能实时更新。然而传统的批处理推荐系统依赖定期更新,难以跟上节奏,让用户在浏览新品时常常看到过时的推荐。试想一下,用户刚搜索完跑鞋,系统却还在推荐冬季外套——这很可能降低转化率。

本文将介绍如何构建一个实时推荐引擎,利用 RisingWave[1]Kafka[2]Redis[3] 即时响应用户行为,提供精准推荐,优化购物体验并提升转化率。

实时推荐引擎架构|图源:RW

设计思路

数据摄取

  • 用户行为数据(如浏览页面、点击、搜索、加入购物车等)实时流入 Kafka 主题。
  • 产品目录数据(如产品 ID、名称、类别、价格)存储在 PostgreSQL 数据库,并通过 Change Data Capture (CDC) 同步到 RisingWave。当然,如果产品数据更新频率较低,也可以直接存储在 RisingWave 中。
  • 可以选择性地将购买记录数据流用于协同过滤。

流处理

RisingWave 通过定义与 Kafka 主题关联的 Source 来接收数据流,并利用基于 SQL 的物化视图处理推荐逻辑,确保数据随新事件到达时持续更新。

推荐结果存储

RisingWave 计算出的推荐结果会存入 Redis,这是一个针对快速检索优化的内存缓存数据库。

注:在低流量场景或原型阶段,可以直接查询 RisingWave,但在生产环境下,目前 Redis 由于其性能优势仍然是更优的选择。

推荐服务

电商应用可通过用户 ID 从 Redis 获取推荐结果,提供高速、流畅的用户体验。

示例数据

  • 用户行为流(Kafka)
    // 页面浏览事件
    {
      "event_type": "page_view",
      "user_id": 123,
      "product_id": "product_abc",
      "timestamp": "2024-07-27T10:00:00Z"
    }
    
    // 搜索事件
    {
      "event_type": "search",
      "user_id": 456,
      "query": "running shoes",
      "timestamp": "2024-07-27T10:03:00Z"
    }
    
  • 产品目录流(Kafka - 通过 CDC 同步)
    {
      "product_id": "product_abc",
      "name": "Awesome Running Shoes",
      "category": "shoes/running",
      "price": 99.99,
      "description": "...",
      "image_url": "..."
    }    


使用 RisingWave 构建 Pipeline

本部分将使用 RisingWave 的 SQL 接口搭建推荐引擎的核心组件。开始之前,请确保以下系统已经启动并运行:

  • RisingWave 集群:关于如何启动 RisingWave 集群,请参考快速入门指南[4]
  • Kafka 实例:用于存储实时用户行为数据流(Kafka 主题)。
  • PostgreSQL 实例:用于存储 product_catalog(产品目录)表,并启用变更数据捕获(Change Data Capture, CDC) 以实现实时同步。详细配置请参考从 PostgreSQL CDC 导入数据[5]
  • Redis 实例:用于缓存推荐结果,加快查询速度。

步骤 1:定义数据源

通过以下 SQL 语句连接 Kafka 主题,定义数据源:

-- 用户行为数据流
CREATE SOURCE user_activity_stream (
    event_type VARCHAR,
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'user_activity',
    brokers = 'kafka-broker1:9092,kafka-broker2:9092',
    scan.startup.mode = 'earliest',
    format = 'json'
);

-- 产品目录数据流
CREATE SOURCE product_catalog (
    product_id VARCHAR,
    name VARCHAR,
    category VARCHAR,
    price DOUBLE PRECISION,
    description VARCHAR,
    image_url VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'product_catalog',
    brokers = 'kafka-broker1:9092,kafka-broker2:9092',
    scan.startup.mode = 'earliest',
    format = 'json'
);

步骤 2:创建物化视图以跟踪热门商品

以下 SQL 语句用于计算最近 1 小时内浏览次数最多的前 10 个商品:

CREATE MATERIALIZED VIEW trending_products AS
WITH windowed_views AS (
    SELECT
        window_start,
        product_id,
        COUNT(*) AS view_count
    FROM
        TUMBLE(user_activity_stream, timestamp, INTERVAL '1 hour')
    WHERE event_type = 'page_view'
    GROUP BY window_start, product_id
),
ranked_products AS (
    SELECT
        window_start,
        product_id,
        view_count,
        RANK() OVER (PARTITION BY window_start ORDER BY view_count DESC) AS rank
    FROM windowed_views
)
SELECT
    window_start,
    product_id,
    view_count
FROM ranked_products
WHERE rank <= 10; -- 仅保留前 10 个热门商品

该视图的作用:

  • 以 1 小时为窗口,对页面浏览事件进行分组统计。
  • 按照浏览次数对商品进行排名。
  • 随着新数据到达,视图会持续增量更新,确保数据始终保持最新。

步骤 3:创建个性化推荐的物化视图

基于用户最近浏览的商品类别,生成个性化推荐:

CREATE MATERIALIZED VIEW user_recommendations AS
WITH recent_user_activity AS (
    SELECT user_id, product_id, timestamp
    FROM user_activity_stream
    WHERE event_type = 'page_view'
      AND timestamp > NOW() - INTERVAL '24 hours'
),
user_category_views AS (
    SELECT
        r.user_id,
        p.category,
        COUNT(*) AS category_views
    FROM recent_user_activity r
    JOIN product_catalog p ON r.product_id = p.product_id
    GROUP BY r.user_id, p.category
),
ranked_categories AS (
    SELECT
        user_id,
        category,
        category_views,
        RANK() OVER (PARTITION BY user_id ORDER BY category_views DESC) AS rank
    FROM user_category_views
),
recommendations AS (
    SELECT
        rc.user_id,
        p.product_id AS recommended_product_id
    FROM ranked_categories rc
    JOIN product_catalog p ON rc.category = p.category
    WHERE rc.rank <= 3
    AND p.product_id NOT IN (
        SELECT product_id FROM recent_user_activity WHERE user_id = rc.user_id
    )
)
SELECT
    user_id,
    array_agg(recommended_product_id) AS recommended_products
FROM recommendations
GROUP BY user_id;

该视图的作用:

  • 分析用户最近 24 小时的浏览记录。
  • 计算用户最常浏览的 3 个商品类别。
  • 在这些类别中挑选用户未浏览过的热门商品进行推荐。

步骤 4:将推荐结果写入 Redis

-- 个性化推荐结果存入 Redis
CREATE SINK user_recommendations_sink
FROM user_recommendations WITH (
    connector = 'redis',
    primary_key = 'user_id',
    redis.url = 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);

-- 热门商品数据存入 Redis
CREATE SINK trending_products_sink
FROM trending_products WITH (
    connector = 'redis',
    primary_key = 'window_start,product_id',
    redis.url = 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE TEMPLATE (
    force_append_only = 'true',
    key_format = 'trending:{window_start}',
    value_format = '{product_id}:{view_count}'
);

Redis 数据结构示例:

  • 个性化推荐:user_id → {"user_id": 123, "recommended_products": ["prod1", "prod2"]}
  • 热门商品:trending:2024-03-21T10:00:00 → product_abc:42

应用端查询推荐结果

使用简单的 Python 脚本从 Redis 获取推荐结果:

import redis
import json

# 连接 Redis
redis_client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

def get_user_recommendations(user_id):
    data = redis_client.get(str(user_id))
    return json.loads(data)['recommended_products'] if data else []

def get_trending_products(window_start):
    key = f"trending:{window_start}"
    return redis_client.hgetall(key)

# 示例
print(get_user_recommendations(123))
print(get_trending_products('2025-02-25T10:00:00'))

对于低流量场景或原型阶段,可以直接查询 RisingWave 物化视图,而无需 Redis:

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd

# 使用官方 SDK 连接到 RisingWave
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host='localhost',
        port=4566,  # 默认的 RisingWave 端口
        user='root',
        password='root',
        database='dev'
    )
)

def get_recommendations_direct(user_id: int) -> list:
    """Retrieves recommendations directly from RisingWave."""
    query = f"""
        SELECT recommended_products 
        FROM user_recommendations 
        WHERE user_id = {user_id}
    """
    
    # 使用 fetch 以 DataFrame 形式获取结果
    result: pd.DataFrame = rw.fetch(
        query,
        format=OutputFormat.DATAFRAME
    )
    
    if not result.empty:
        return result['recommended_products'].iloc[0]
    return []

# 示例
def example_usage():
    user_recs = get_recommendations_direct(123)
    print(f"Recommendations for user 123 (direct): {user_recs}")
    
    # 如有需要,也支持以原始元组的形式获取结果
    raw_results = rw.fetch(
        "SELECT * FROM trending_products LIMIT 5",
        format=OutputFormat.RAW
    )
    print(f"Trending products (raw): {raw_results}")

注意:虽然直接查询方式更简单,但对于高并发环境,仍建议使用 Redis 进行缓存,以提升响应速度。

架构优势与优化方向

架构优势

  • 实时更新:系统能够即时响应用户行为,确保推荐内容始终准确。
  • 低延迟:Redis 提供高速数据检索,RisingWave 负责高效计算,确保整体性能优越。
  • 良好的扩展性:RisingWave 和 Redis 都支持水平扩展,能够灵活应对流量增长。
  • 易于开发:基于 SQL 进行数据处理,使系统的开发和维护更加便捷。
  • 高可靠性:分布式架构增强了系统的容错能力,保障业务的连续性。

优化方向

  • A/B 测试:针对不同的推荐策略进行实验,以优化效果。
  • 集成机器学习模型:引入更高级的数据分析和预测能力,提高推荐的精准度。
  • 细分趋势分析:根据用户群体特征提供个性化趋势推荐。
  • 数据增强:利用特征存储(Feature Store)丰富用户画像,实现更精细化的个性化推荐。

相关推荐

B站收藏视频失效?mybili 收藏夹备份神器完整部署指南

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙很多B站用户都有过类似经历:自己精心收藏的视频突然“消失”,点开一看不是“已被删除”,就是“因UP主设置不可见”。而B站并不会主动通知...

中间件推荐初始化配置

Redis推荐初始化配置bind0.0.0.0protected-modeyesport6379tcp-backlog511timeout300tcp-keepalive300...

Redis中缓存穿透问题与解决方法

缓存穿透问题概述在Redis作为缓存使用时,缓存穿透是常见问题。正常查询流程是先从Redis缓存获取数据,若有则直接使用;若没有则去数据库查询,查到后存入缓存。但当请求的数据在缓存和数据库中都...

后端开发必看!Redis 哨兵机制如何保障系统高可用?

你是否曾在项目中遇到过Redis主服务器突然宕机,导致整个业务系统出现数据读取异常、响应延迟甚至服务中断的情况?面对这样的突发状况,作为互联网大厂的后端开发人员,如何快速恢复服务、保障系统的高可用...

Redis合集-大Key处理建议

以下是Redis大Key问题的全流程解决方案,涵盖检测、处理、优化及预防策略,结合代码示例和最佳实践:一、大Key的定义与风险1.大Key判定标准数据类型大Key阈值风险场景S...

深入解析跳跃表:Redis里的&quot;老六&quot;数据结构,专治各种不服

大家好,我是你们的码农段子手,今天要给大家讲一个Redis世界里最会"跳科目三"的数据结构——跳跃表(SkipList)。这货表面上是个青铜,实际上是个王者,连红黑树见了都要喊声大哥。...

Redis 中 AOF 持久化技术原理全解析,看完你就懂了!

你在使用Redis的过程中,有没有担心过数据丢失的问题?尤其是在服务器突然宕机、意外断电等情况发生时,那些还没来得及持久化的数据,是不是让你夜不能寐?别担心,Redis的AOF持久化技术就是...

Redis合集-必备的几款运维工具

Redis在应用Redis时,经常会面临的运维工作,包括Redis的运行状态监控,数据迁移,主从集群、切片集群的部署和运维。接下来,从这三个方面,介绍一些工具。先来学习下监控Redis实时...

别再纠结线程池大小 + 线程数量了,没有固定公式的!

我们在百度上能很轻易地搜索到以下线程池设置大小的理论:在一台服务器上我们按照以下设置CPU密集型的程序-核心数+1I/O密集型的程序-核心数*2你不会真的按照这个理论来设置线程池的...

网络编程—IO多路复用详解

假如你想了解IO多路复用,那本文或许可以帮助你本文的最大目的就是想要把select、epoll在执行过程中干了什么叙述出来,所以具体的代码不会涉及,毕竟不同语言的接口有所区别。基础知识IO多路复用涉及...

5分钟学会C/C++多线程编程进程和线程

前言对线程有基本的理解简单的C++面向过程编程能力创造单个简单的线程。创造单个带参数的线程。如何等待线程结束。创造多个线程,并使用互斥量来防止资源抢占。会使用之后,直接跳到“汇总”,复制模板来用就行...

尽情阅读,技术进阶,详解mmap的原理

1.一句话概括mmapmmap的作用,在应用这一层,是让你把文件的某一段,当作内存一样来访问。将文件映射到物理内存,将进程虚拟空间映射到那块内存。这样,进程不仅能像访问内存一样读写文件,多个进程...

C++11多线程知识点总结

一、多线程的基本概念1、进程与线程的区别和联系进程:进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程;线程:是运行中的实际的任务执行者。可以说,进程中包含了多...

微服务高可用的2个关键技巧,你一定用得上

概述上一篇文章讲了一个朋友公司使用SpringCloud架构遇到问题的一个真实案例,虽然不是什么大的技术问题,但如果对一些东西理解的不深刻,还真会犯一些错误。这篇文章我们来聊聊在微服务架构中,到底如...

Java线程间如何共享与传递数据

1、背景在日常SpringBoot应用或者Java应用开发中,使用多线程编程有很多好处,比如可以同时处理多个任务,提高程序的并发性;可以充分利用计算机的多核处理器,使得程序能够更好地利用计算机的资源,...

取消回复欢迎 发表评论: