百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

周期性清除Spark Streaming流状态的方法

mhr18 2025-08-05 19:44 2 浏览 0 评论

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:

 val productPvStream = stream.mapPartitions(records => {
 var result = new ListBuffer[(String, Int)]
 for (record <- records) {
 result += Tuple2(record.key(), 1)
 }
 result.iterator
 }).reduceByKey(_ + _).mapWithState(
 StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
 val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
 state.update(sum)
 (productId, sum)
 })).stateSnapshots()

现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。

编写脚本重启Streaming程序

用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本:

stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
if [ ${cnt} -eq 1 ]; then
 pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
 kill -9 ${pid}
 sleep 20
 cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
 if [ ${cnt} -eq 0 ]; then
 nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
 fi
fi

这种方式最简单,也不需要对程序本身做任何改动。但随着同时运行的Streaming任务越来越多,就会显得越来越累赘了。

给StreamingContext设置超时

在程序启动之前,先计算出当前时间点距离第二天凌晨0点的毫秒数:

def msTillTomorrow = {
 val now = new Date()
 val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
 tomorrow.getTime - now.getTime
}

然后将Streaming程序的主要逻辑写在while(true)循环中,并且不像平常一样调用
StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

while (true) {
 val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
 ssc.checkpoint(CHECKPOINT_DIR)
 // ...处理逻辑...
 ssc.start()
 ssc.awaitTerminationOrTimeout(msTillTomorrow)
 ssc.stop(false, true)
 Thread.sleep(BATCH_INTERVAL * 1000)
 }

在经过msTillTomorrow毫秒之后,StreamingContext就会超时,再调用其stop()方法(注意两个参数,stopSparkContext表示是否停止关联的SparkContext,stopGracefully表示是否优雅停止),就可以停止并重启StreamingContext。

以上两种方法都是仍然采用Spark Streaming的机制进行状态计算的。如果其他条件允许的话,我们还可以抛弃mapWithState(),直接借助外部存储自己维护状态。比如将Redis的Key设计为product_pv:[product_id]:[date],然后在Spark Streaming的每个批次中使用incrby指令,就能方便地统计PV了,不必考虑定时的问题。

相关推荐

IM群聊消息如此复杂,如何保证不丢不重?

群聊是多人社交的基本诉求,不管是QQ群,还是微信群,一个群友在群内发了一条消息:(1)在线的群友能第一时间收到消息(2)离线的群友能在登陆后收到消息群消息的复杂度要远高于单对单消息。群消息的实时性,可...

Python 网络爬虫实战:从零到部署的完整流程

适用人群:初-中级Python开发者、数据分析师、运维/测试自动化工程师工具栈:Python3.11+requests+BeautifulSoup/lxml+pandas+(...

用上Kiro之后,完全没理由为Cursor续费了

替Cursor续费前最后一秒,免费IDEKiro把钱包按死在屏幕前五位数年费的AI编程助手,被一匹黑马零元秒杀。用过Kiro的人,开note第一件事就是删掉Cursor的自动续费,动作快到连...

分布式微服务中的搜索引擎:架构与实战盘点

01、为什么微服务需要分布式搜索?在单体应用时代,我们通常使用单一数据库的全文检索功能(如MySQL的LIKE语句)或简单的搜索引擎(如早期的Lucene)。但随着业务规模扩大,这种架构暴露出诸多问题...

产品列表获取API接口详解

在现代软件开发中,API(应用程序编程接口)是获取产品列表的核心工具,它允许开发者从远程服务器高效地检索数据。本文将逐步介绍如何设计和使用产品列表获取API接口,包括核心概念、实现步骤、代码示例以及最...

企业和个人基于业务知识和代码库增强的大模型生成代码实践

作者:京东零售杨亚龙1.源起李明是今年刚加入某互联网公司的研发新人,满怀期待地开始了他的职业生涯。然而,短短两周后,他的热情就被现实浇了一盆冷水。第一周:当他第一次接手需求时,mentor只是简单...

从零到一:独立运行若依框架系统并进行本地二次开发

####一、环境准备1.**基础环境**:-JDK1.8+(推荐JDK17)-Maven3.6+-MySQL5.7+(推荐8.0)-Redis5.0+-Node.js16...

一文教你高效优化在Spring Boot3中遇到深度分页查询性能难题?

你有没有这样的经历?在使用SpringBoot3开发项目时,深度分页查询操作让程序运行得越来越慢,页面加载时间变得难以忍受,不仅影响用户体验,还可能导致项目进度受阻。明明代码逻辑看起来没问题,可...

JAVA面试|如何优化limit分页

我们来详细通俗地聊聊如何优化LIMIToffset,size分页。核心问题在于OFFSET的值很大时,性能会急剧下降。想象一下数据库的工作方式,你就明白为什么了。一、为什么OFFSET大时慢?假...

MySQL(143)如何优化分页查询?

优化分页查询是提升数据库性能和用户体验的重要手段。特别是在处理大数据集时,分页查询的效率对系统性能有显著影响。以下是优化分页查询的详细步骤和代码示例。一、传统分页查询传统的分页查询使用OFFSET...

Seata概述

什么是SeataSeata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务也是SpringCloudAlibaba提供的组件Seata官方文档https...

Docmost:一款开源的Wiki和文档协作软件

是一款开源的团队协作Wiki与文档管理工具,定位为Confluence和Notion的开源替代品,专注于提供高效、安全且可定制的知识库解决方案。Docmost的核心优势在于开源免...

B端系统管理「字典管理」模块实战指南

字典管理听起来像“后端杂务”,其实是B端系统配置能力的关键支点。本指南将从真实业务场景出发,系统拆解该模块的设计逻辑、关键字段与典型坑位,让你一文读懂如何搭建一个能跑得久、配得稳的字典模块。一、字典管...

Spring Boot 整合 Redis BitMap 实现 签到与统计

要在SpringBoot中实现RedisBitMap来进行签到和统计,您需要按照以下步骤进行操作:添加Redis依赖:在pom.xml文件中添加Redis依赖:<dependen...

周期性清除Spark Streaming流状态的方法

在SparkStreaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:valproductPvSt...

取消回复欢迎 发表评论: