百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

聊聊在集群环境中本地缓存如何进行同步

mhr18 2024-12-10 14:38 20 浏览 0 评论

前言

之前有发过一篇文章聊聊如何利用redis实现多级缓存同步。有个读者就给我留言说,因为他项目的redis版本不是6.0+版本,因此他使用我文章介绍通过MQ来实现本地缓存同步,他的同步流程大概如下图

他原来的业务流程是每天凌晨开启定时器去爬取第三方的数据,并持久化到redis,后边因为redis发生过宕机事故,他碰巧看了我文章,就觉得可以用使用多级缓存的策略,用来做个兜底。他的业务流程就如上图,即每天凌晨开启定时器去爬取第三方数据,持久化到redis和其中一台服务的本地缓存,然后将爬取到的业务数据发送到kafka,其他业务服务通过订阅kafka,将业务数据保存到本地缓存。

他改造完,某天突然发现在集群环境中,只要其中一台服务消费了kafka数据,其他就消费不到。今天就借这个话题,来聊聊集群环境中本地缓存如何进行同步

前置知识

kafka消费topic-partitions模式分为subscribe模式和assign模式。subscribe模式需要指定group.id,该模式会为consumer自动分配partition,且同一个group.id下的不同consumer不会消费同样的分区。assign模式需要为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group.id无效。通俗一点讲就是assign模式下,所有消费者都可以订阅指定分区

我们要通过消息队列实现本地缓存同步,本质上就是需要利用消息队列提供广播能力,而kafka默认不具备。不过我们可以根据kafka提供的消费模式进行定制,从而是kafka也具备广播能力

集群本地缓存同步方案

方案一:利用MQ广播能力

因为读者项目是使用kafka,且项目是使用spring-kafka,我们也就以此为例

1、subscribe模式

通过前置知识,我们了解到在subscribe模式下,同一个group.id下的不同consumer不会消费同样的分区,这就意味我们可以通过指定不同group.id来消费同样分区达到广播的效果

那如何在同个集群服务实现不同的group.id?

此时Spring EL 表达式就派上用场了,我们通过 Spring EL 表达式,在每个消费者分组的名字上配合 UUID 生成其后缀。这样,就能保证每个项目启动的消费者分组不同,从而达到广播消费的目的

示例

  @KafkaListener(topics = "${userCache.topic}",groupId =  "${userCache.topic}_group_" + "#{T(java.util.UUID).randomUUID()})")
    public void receive(Acknowledgment ack, String data){
        System.out.println(String.format("serverPort:【%s】,接收到数据:【%s】",serverPort,data));
        ack.acknowledge();
    }

如果我们决定UUID不直观,我们也可以使用IP作为标识,只要能保证同个集群服务的group.id是唯一即可

不过如果要改成ip,我们得做一定的改造。改造步骤如下

a、 获取ip地址信息,并放入environment

public class ServerAddrEnvironmentPostProcessor implements EnvironmentPostProcessor{



    private String SERVER_ADDRESS = "server.addr";

    @Override
    @SneakyThrows
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        MutablePropertySources propertySources = environment.getPropertySources();
        Map<String, Object> source = new HashMap<>();
        String serverAddr = InetAddress.getLocalHost().getHostAddress();
        source.put(SERVER_ADDRESS,serverAddr);
        MapPropertySource mapPropertySource = new MapPropertySource("serverAddrProperties",source);
        propertySources.addFirst(mapPropertySource);

    }



}

b、 配置spi

在src/main/resource目录下配置META-INF/spring.factories,配置内容如下

org.springframework.boot.env.EnvironmentPostProcessor=\
com.github.lybgeek.comsumer.ip.ServerAddrEnvironmentPostProcessor

c、 @KafkaListener配置如下内容

 @KafkaListener(topics = "${userCache.topic}",groupId =  "${userCache.topic}_group_" + "${server.addr}" + "_${server.port}")

小结

该方式的实现优点是比较简单,但如果需要对服务进行运维监控统计,那就不怎么友好了,虽然指定IP会比随机UUID好点,但如果是容器化部署,每次部署其IP也是会变化,这样跟随机指定UUID,差别也不大了。其次如果是使用云产品,比如阿里云对comsume group是有数量上限,且消费者组需要提前创建,这种情况使用该方案就不是很合适了

assign模式

通过assign模式手动消费对应的分区

示例

   @KafkaListener(topicPartitions =
            {@TopicPartition(topic = "${userCache.topic}", partitions = "0")})
    public void receive(Acknowledgment ack, ConsumerRecord record){
        System.out.println(String.format("serverPort:【%s】,接收到数据:【%s】",serverPort,record));
        ack.acknowledge();
    }

小结

该方式实现也是很简单,如果我们不需要动态创建新的分区,用该方案实现广播,会是一个不错的选择。不过该方式的缺点很明显,因为是手动指定分区,当该分区有问题,也挺麻烦的

方案二:通过定时器触发

该方案主要基于读者目前的同步进行改造,改造后如下图

核心就是根据读者业务的特性,因为他是定时每天晚上同步爬取,那就意味着他这个数据至少在当天基本不变,就可以让集群里的服务都定时执行,此时仅需将xxl-job的调度策略改成分片广播就行,这样就可以持久化到redis的同时,也持久化到本地缓存

小结

该方案改动量比较小,有个小缺点就是,因为集群内所有服务都执行调度,这样就会使redis重复持久化,不过问题也不大就是好。最后读者选择该方案

总结

本文主要阐述集群环境中本地缓存如何进行同步,之前还有读者问我说,使用了多级缓存,数据一致性要如何保证?以前我可能会从技术角度来回答,比如你可以延迟双删,或者如果你是mysql,你可以使用canal+mq,更甚者你可以使用分布式锁来保证。但现在我更多从业务角度来思考这件事情,你都考虑使用缓存,是不是意味着你在业务上是可以容忍一定不一致性,既然可以容忍,是不是最终可以通过一些补偿方案来解决这个不一致性

没有完美的方案,你此时感觉的完美方案,可能是当时在那个业务场景下,做了一个贴合业务的权衡

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-broadcast


作者:linyb极客之路
链接:https://www.imooc.com/article/336455
来源:慕课网
本文原创发布于慕课网 ,转载请注明出处,谢谢合作

相关推荐

Java面试题合集200道!

1.Java中操作字符串都有哪些类?它们之间有什么区别?String、StringBuffer、StringBuilder.String和StringBufer、StringBuilder的区别...

JAVA分布式锁的原理,及多种分布式实现优劣对比分析

引题比如在同一个节点上,两个线程并发的操作A的账户,都是取钱,如果不加锁,A的账户可能会出现负数,正确的方式是对账户acount进行加锁,即使用synchronized关键字,对其进行加锁后,当有线程...

百度Linux C++后台开发面试题(个人整理)

1、C/C++程序的内存分区其实C和C++的内存分区还是有一定区别的,但此处不作区分:1)、栈区(stack)—由编译器自动分配释放,存放函数的参数值,局部变量的值等。其操作方式类似于数据结构中...

什么是云计算?看这篇就够了(建议收藏)

一、什么是云?云,又称云端,指无数的大型机房或者大型数据中心。二、为什么需要云?1)从用户的角度来讲:传统应用的需求日益复杂,比如需要支持更多的用户,需要更强的计算能力等,为满足这些日益增长的需求,企...

写PHP框架需要具备那些知识?

如果没用过框架,讨论各个框架的内容都没有可讨论性,想自己写个框架涉及到的内容很多,个人觉得自己写一个框架对自己的逻辑思维,开发架构以及这门语言都有质的提升。可以参照其他框架的源代码,仅仅是看他们的思路...

不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理

我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储键值对数据会使用HashMap,在什么场景下需要用到Blocking...

Java性能优化指南—缓存那些事

由于笔者自身水平有限,如果有不对或者任何建议欢迎批评和指正本文预计阅读时间10分钟,分为前言、填坑两部分,主要包含缓存的基本使用到高级应用场景的介绍一、前言在处理高并发请求时,缓存几乎是无往不利的利器...

卓象科技:Nosql的介绍以及和关系型数据库的区别

Nosql介绍NoSQL(NotOnlySQL),泛指非关系型数据库。Nosql的全称是NotOnlySql,这个概念很早就有人提出,在09年的时候比较火。Nosql指的是非关系型数...

腾讯一面凉经(一面竟然就问了2小时,什么情况?)

这次一面感觉是在打心理战,哥们自己的心里防线基本是被击溃,面到怀疑人生的程度,所以过程感觉不是太好,很多题哥们自己也感觉没答好,要么答得“缺胳膊少腿”,要么就是“画蛇添足”。先是聊项目,从项目的架构设...

我凭借这份pdf,最终拿到了阿里,腾讯,京东等八家大厂offer

怎样才能拿到大厂的offer,没有掌握绝对的技术,那么就要不断的学习我是如何笑对金九银十,拿到阿里,腾讯等八家大厂的offer的呢,今天分享我的秘密武器,美团大神整理的Java核心知识点,面试时面试官...

高并发 异步解耦利器:RocketMQ究竟强在哪里?

本文带大家从以下几个方面详细了解RocketMQ:RocketMQ如何保证消息存储的可靠性?RocketMQ如何保证消息队列服务的高可用?如何构建一个高可用的RocketMQ双主双从最小集群?Rock...

阿里最新Java架构师成长笔记开源

下面先给大家上一个总的目录大纲,基础的东西就不进行过多的赘述,我们将会从JVM说起,同时由于每篇的内容过多,我们也只说重点,太过基础的内容谁都会,我就不多敲字浪费大家的时间了!JVM多线程与高并发Sp...

程序员失业2个月找不到工作,狂刷了5遍这份pdf终获字节跳动offer

写在前面1月初失业,找了近2个多月的工作了,还没找到心仪的工作,感觉心好慌,不知道该怎么办了?找不到工作的时候压力很大,有人说自信会很受打击,还有人说会很绝望,是人生的低谷……尽管很多时候我们自己知道...

Spring AI 模块架构与功能解析

SpringAI是Spring生态系统中的一个新兴模块,专注于简化人工智能和机器学习技术在Spring应用程序中的集成。本文将详细介绍SpringAI的核心组件、功能模块及其之间的关...

Nginx从入门到精通,超详细整理,含项目实战案例|运维必学

Nginx是免费的、开源的、高性能的HTTP和反向代理服务器、邮件代理服务器、以及TCP/UDP代理服务器。因为它的稳定性、丰富的模块库、灵活的配置和低系统资源的消耗而闻名。Nginx可以做静态HT...

取消回复欢迎 发表评论: