百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Spring Boot 分布式事务实现简单得超乎想象

mhr18 2025-07-14 20:06 2 浏览 0 评论

环境:SpringBoot2.7.18 + Atomikos4.x + MySQL5.7



1. 简介

关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。

本篇文章将基于SpringBoot整合Atomikos实现分布式事务。

1.1 什么是Atomikos

Atomikos 是一个第三方的事务管理器(Transaction Manager),它是对Java Transaction API (JTA) 的实现,旨在提供可靠、高效的分布式事务处理能力。Atomikos 与 JTA 之间的关系可以描述为:实现的关系:

  • JTA:Java Transaction API 是一套标准的Java接口规范,定义了应用程序如何与事务管理器交互,以进行分布式事务的管理和控制。JTA规范由Java Community Process (JCP)制定,为Java开发者提供了一种与具体数据库或事务资源无关的、统一的事务处理编程模型。
  • Atomikos:作为一款第三方事务管理软件,Atomikos TransactionsEssentials 是 JTA 规范的一个具体实现。它实现了JTA所定义的接口,如下接口:
javax.transaction.UserTransaction;
javax.transaction.TransactionManager;

Atomikos 作为 JTA 实现,负责实际的事务管理任务,包括:

  • 事务协调:按照两阶段提交(2PC)协议或者其他分布式事务协议来协调参与事务的多个资源(如数据库、消息中间件)。
  • 事务生命周期管理:提供API供应用程序启动、提交、回滚事务,以及查询事务状态。
  • 故障恢复:在出现系统故障时,具备一定的事务恢复能力,保证事务的最终一致性。
  • 性能优化:可能包含针对特定环境的性能优化策略,如事务超时管理、并发控制、缓存机制等。


1.2 什么是JTA

Java Transaction API(JTA)是Java的应用程序接口(API),用于处理分布式事务,尤其是在企业级Java应用程序(Java EE)的环境中。JTA使得开发人员能够在一个跨多个数据库或事务资源(例如JMS消息队列)的操作中管理事务的完整生命周期,确保数据的一致性和完整性。

JTA的核心概念包括:

  1. 分布式事务:JTA支持跨越多个网络计算机资源的事务管理,允许在不同的数据库或事务资源之间同步提交或回滚事务。
  2. XA协议:JTA依赖于XA协议来协调不同事务资源之间的事务。XA是分布式事务处理的标准接口,定义了两阶段提交协议来确保所有参与事务的资源要么一起成功提交,要么一起回滚。
  3. 事务管理器(Transaction Manager):JTA事务管理器负责协调全局事务,监控事务边界,并决定何时提交或回滚事务。
  4. 资源管理器(Resource Manager):如JDBC XA兼容的数据库驱动程序(像MySQL,Oracle等都实现了XA协议),它们参与到全局事务中并能响应事务管理器的指令。

通过JTA,开发者无需直接编写复杂的事务管理代码,而是可以通过容器提供的事务服务或者编程方式来声明事务边界、设置事务属性以及控制事务的传播行为等。在Java EE应用服务器中,JTA通常与EJB(Enterprise JavaBeans)容器集成,使得EJB方法能够在无感知的情况下自动参与到事务管理中去。此外,Spring框架也提供了对JTA事务的支持,使得在非Java EE环境下也能方便地使用分布式事务处理功能。

以上是关于Atomikos与JTA的简单介绍,接下来将通过实际的案例来演示。

2. 实战案例

2.1 环境准备

数据库

创建2个数据库,如下:

不用创建表,因为我们将使用JPA,由JPA自动生成相关的表。

项目依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

SpringBoot已经为我们集成了atomikos,所以这里只需要引入对应的starter即可。

2.2 核心配置类

定义domain对象

分别放到不同的package中,如下:

// pkg: com.pack.domain.customer
@Entity
@Table(name = "customer")
public class Customer {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id;
  @Column(name = "name", nullable = false)
  private String name;
  @Column(name = "age", nullable = false)
  private Integer age;
}
// pkg: com.pack.domain.order
@Entity
@Table(name = "orders")
public class Order {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id;
  @Column(name = "code", nullable = false)
  private Integer code;
  @Column(name = "quantity", nullable = false)
  private Integer quantity;
}

定义Repository接口

// pkg: com.pack.repository.customer
public interface CustomerRepository extends JpaRepository<Customer, Integer> {
}
// pkg: com.pack.repository.order
public interface OrderRepository extends JpaRepository<Order, Integer> {
}

定义Service

// pkg: com.pack.service.customer
@Service
public class CustomerService{


  @Resource
  private CustomerRepository customerRepository ;


  @Transactional
  public void save(Customer customer){
    this.customerRepository.save(customer) ;
  }
}
// pkg: com.pack.service.order
@Service
public class OrderService{


  @Resource
  private OrderRepository orderRepository ;


  @Transactional
  public void save(Order order){
    this.orderRepository.save(order) ;
    throw new RuntimeException("订单发生异常") ;
  }
}

接下来是重点了,需要分别对每一个数据源进行相应JPA的配置(如果你使用MyBatis也是同样的原理,对相应的如:SqlSessionFactory等进行配置)。

Customer相应配置

@ConfigurationProperties(prefix = "pack.datasource.customer")
public class CustomerDataSourceProperties {


  private String jdbcUrl ;
  private String username ;
  private String password ;
}
// 下面是针对JPA及XA数据源的配置
@Configuration
@DependsOn("transactionManager")      
// 设置jpa repository扫描的包 及相应的事务管理器
@EnableJpaRepositories(basePackages = "com.pack.repository.customer", entityManagerFactoryRef = "customerEntityManager", transactionManagerRef = "transactionManager")
@EnableConfigurationProperties(CustomerDataSourceProperties.class)
public class CustomerConfig {


  @Resource
  private JpaVendorAdapter jpaVendorAdapter;
  @Resource
  private CustomerDataSourceProperties properties ;


  // 数据源配置该数据源是由MySQL驱动程序提供的 
  @Bean(name = "customerDataSource", initMethod = "init", destroyMethod = "close")
  DataSource customerDataSource() throws Exception {
    MysqlXADataSource dataSource = new MysqlXADataSource();
    dataSource.setUrl(properties.getJdbcUrl()) ;
    dataSource.setUser(properties.getUsername()) ;
    dataSource.setPassword(properties.getPassword()) ;
    dataSource.setPinGlobalTxToPhysicalConnection(true) ;
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(dataSource);
    xaDataSource.setUniqueResourceName("xa-customer");
    return xaDataSource;
  }


  @Bean(name = "customerEntityManager")
  @DependsOn("transactionManager")
  LocalContainerEntityManagerFactoryBean customerEntityManager() throws Throwable {
    HashMap<String, Object> properties = new HashMap<String, Object>();
    properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
    // 设置事务类型为JTA
    properties.put("javax.persistence.transactionType", "JTA") ;
    // 指定数据库方言
    properties.put("hibernate.dialect", "org.hibernate.dialect.MySQL5InnoDBDialect") ;  


    LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
    entityManager.setJtaDataSource(customerDataSource());
    entityManager.setJpaVendorAdapter(jpaVendorAdapter);
    // 配置JPA扫描的包
    entityManager.setPackagesToScan("com.pack.domain.customer");
    entityManager.setPersistenceUnitName("customerPersistenceUnit");
    entityManager.setJpaPropertyMap(properties);
    return entityManager;
  }


}

Order相应配置

@ConfigurationProperties(prefix = "pack.datasource.order")
public class OrderDataSourceProperties {
  private String jdbcUrl ;
  private String username ;
  private String password ;
}
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.pack.repository.order", entityManagerFactoryRef = "orderEntityManager", transactionManagerRef = "transactionManager")
@EnableConfigurationProperties(OrderDataSourceProperties.class)
public class OrderConfig {


  @Resource
  private JpaVendorAdapter jpaVendorAdapter;
  @Resource
  private OrderDataSourceProperties properties ;


  @Bean(name = "orderDataSource", initMethod = "init", destroyMethod = "close")
  DataSource customerDataSource() throws Exception {
    MysqlXADataSource dataSource = new MysqlXADataSource();
    dataSource.setUrl(properties.getJdbcUrl()) ;
    dataSource.setUser(properties.getUsername()) ;
    dataSource.setPassword(properties.getPassword());
    dataSource.setPinGlobalTxToPhysicalConnection(true) ;
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(dataSource);
    xaDataSource.setUniqueResourceName("xa-order");
    return xaDataSource;
  }


  @Bean(name = "orderEntityManager")
  @DependsOn("transactionManager")
  LocalContainerEntityManagerFactoryBean customerEntityManager() throws Throwable {
    HashMap<String, Object> properties = new HashMap<String, Object>();
    properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName()) ;
    properties.put("javax.persistence.transactionType", "JTA") ;
    properties.put("hibernate.dialect", "org.hibernate.dialect.MySQL5InnoDBDialect") ;


    LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
    entityManager.setJtaDataSource(customerDataSource());
    entityManager.setJpaVendorAdapter(jpaVendorAdapter);
    entityManager.setPackagesToScan("com.pack.domain.order");
    entityManager.setPersistenceUnitName("orderPersistenceUnit");
    entityManager.setJpaPropertyMap(properties);
    return entityManager;
  }


}

主配置

@Configuration
@ComponentScan
@EnableTransactionManagement
public class MainConfig{


  @Bean
  JpaVendorAdapter jpaVendorAdapter(){
    HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
    // 显示SQL
    hibernateJpaVendorAdapter.setShowSql(true);
    // 生成ddl语句
    hibernateJpaVendorAdapter.setGenerateDdl(true);
    // 数据库类型
    hibernateJpaVendorAdapter.setDatabase(Database.MYSQL);
    return hibernateJpaVendorAdapter;
  }


  @Bean(name = "userTransaction")
  UserTransaction userTransaction() throws Throwable {
    UserTransactionImp userTransactionImp = new UserTransactionImp();
    userTransactionImp.setTransactionTimeout(10000) ;
    return userTransactionImp;
  }


  @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
  TransactionManager atomikosTransactionManager() throws Throwable {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(false);
    AtomikosJtaPlatform.transactionManager = userTransactionManager;
    return userTransactionManager;
  }


  @Bean(name = "transactionManager")
  @DependsOn({"userTransaction", "atomikosTransactionManager"})
  PlatformTransactionManager transactionManager() throws Throwable {
    UserTransaction userTransaction = userTransaction();
    AtomikosJtaPlatform.transaction = userTransaction;
    TransactionManager atomikosTransactionManager = atomikosTransactionManager();
    return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
  }
}

以上就是所有的核心配置类,接下来做测试

准备测试Service

@Service
public class BaseService{


  @Resource
  private CustomerService customerService ;
  @Resource
  private OrderService orderService ;


  // 正常执行
  @Transactional
  public void save(Customer customer, Order order){
    this.customerService.save(customer) ;
    this.orderService.save(order) ;
  }


  // 入口抛出异常(测试是否两个都进行了回滚)
  @Transactional
  public void saveLocalExecption(Customer customer, Order order){
    this.customerService.save(customer) ;
    this.orderService.save(order) ;
    throw new RuntimeException("LocalException 发生异常") ;
  }


  // 测试当Customer接口抛出了异常
  @Transactional
  public void saveCustomerException(Customer customer, Order order){
    this.customerService.save(customer) ;
    this.orderService.save(order) ;
  }


}

正常测试

@Test
public void testSaveNormal(){
  Customer customer = new Customer() ;
  customer.setAge(10) ;
  customer.setName("张三") ;


  Order order = new Order() ;
  order.setCode(10001) ;
  order.setQuantity(10) ;


  baseService.save(customer, order) ;
}

输出结果

成功提交事务,查看数据库都有数据。

异常测试1

@Test
public void testSaveLocalExecption(){
  Customer customer = new Customer() ;
  customer.setAge(10) ;
  customer.setName("张三") ;


  Order order = new Order() ;
  order.setCode(10001) ;
  order.setQuantity(10) ;


  baseService.saveLocalExecption(customer, order) ;
}

输出结果

两个事务都回滚,数据库没有insert数据。

异常测试2

与上面的结果相同。

到此通过Atomikos实现的分布式事务就成功完成了。

相关推荐

Spring Boot 分布式事务实现简单得超乎想象

环境:SpringBoot2.7.18+Atomikos4.x+MySQL5.71.简介关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。本篇文章将基于SpringBoot...

Qt编写可视化大屏电子看板系统15-曲线面积图

##一、前言曲线面积图其实就是在曲线图上增加了颜色填充,单纯的曲线可能就只有线条以及数据点,面积图则需要从坐标轴的左下角和右下角联合曲线形成完整的封闭区域路径,然后对这个路径进行颜色填充,为了更美观...

Doris大数据AI可视化管理工具SelectDB Studio重磅发布!

一、初识SelectDBStudioSelectDBStudio是专为ApacheDoris湖仓一体典型场景实战及其兼容数据库量身打造的GUI工具,简化数据开发与管理。二、Select...

RAD Studio 、Delphi或C++Builder设计代码编译上线缩短开发时间

#春日生活打卡季#本月,Embarcadero宣布RADStudio12.3Athens以及Delphi12.3和C++Builder12.3,提供下载。RADStudio12.3A...

Mybatis Plus框架学习指南-第三节内容

自动填充字段基本概念MyBatis-Plus提供了一个便捷的自动填充功能,用于在插入或更新数据时自动填充某些字段,如创建时间、更新时间等。原理自动填充功能通过实现com.baomidou.myba...

「数据库」Sysbench 数据库压力测试工具

sysbench是一个开源的、模块化的、跨平台的多线程性能测试工具,可以用来进行CPU、内存、磁盘I/O、线程、数据库的性能测试。目前支持的数据库有MySQL、Oracle和PostgreSQL。以...

如何选择适合公司的ERP(选erp系统的经验之谈)

很多中小公司想搞ERP,但不得要领。上ERP的目的都是歪的,如提高效率,减少人员,堵住财务漏洞等等。真正用ERP的目的是借机提升企业管理能力,找出管理上的问题并解决,使企业管理更规范以及标准化。上ER...

Manus放开注册,但Flowith才是Agent领域真正的yyds

大家好,我是运营黑客。前天,AIAgent领域的当红炸子鸡—Manus宣布全面放开注册,终于,不需要邀请码就能体验了。于是,赶紧找了个小号去确认一下。然后,额……就被墙在了外面。官方解释:中文版...

歌浓酒庄总酿酒师:我们有最好的葡萄园和最棒的酿酒师

中新网1月23日电1月18日,张裕董事长周洪江及总经理孙健一行在澳大利亚阿德莱德,完成了歌浓酒庄股权交割签约仪式,这也意味着张裕全球布局基本成型。歌浓:澳大利亚年度最佳酒庄据悉,此次张裕收购的...

软件测试进阶之自动化测试——python+appium实例

扼要:1、了解python+appium进行APP的自动化测试实例;2、能根据实例进行实训操作;本课程主要讲述用python+appium对APP进行UI自动化测试的例子。appium支持Androi...

为什么说Python是最伟大的语言?看图就知道了

来源:麦叔编程作者:麦叔测试一下你的分析能力,直接上图,自己判断一下为什么Python是最好的语言?1.有图有真相Java之父-JamesGoshlingC++之父-BjarneStrou...

如何在Eclipse中配置Python开发环境?

Eclipse是著名的跨平台集成开发环境(IDE),最初主要用来Java语言开发。但是我们通过安装不同的插件Eclipse可以支持不同的计算机语言。比如说,我们可以通过安装PyDev插件,使Eclip...

联合国岗位上新啦(联合国的岗位)

联合国人权事务高级专员办事处PostingTitleIntern-HumanRightsDutyStationBANGKOKDeadlineOct7,2025CategoryandL...

一周安全漫谈丨工信部:拟定超1亿条一般数据泄露属后果严重情节

工信部:拟定超1亿条一般数据泄露属后果严重情节11月23日,工信部官网公布《工业和信息化领域数据安全行政处罚裁量指引(试行)(征求意见稿)》。《裁量指引》征求意见稿明确了行政处罚由违法行为发生地管辖、...

oracle列转行以及C#执行语句时报错问题

oracle列转行的关键字:UNPIVOT,经常查到的怎么样转一列,多列怎么转呢,直接上代码(sshwomeyourcode):SELECTsee_no,diag_no,diag_code,...

取消回复欢迎 发表评论: