程序员的资源宝库

网站首页 > gitee 正文

分布式事务之 Seata

sanyeah 2024-04-08 12:54:25 gitee 10 ℃ 0 评论

Seata 是什么?

  官网 :https://seata.io/zh-cn/docs/overview/what-is-seata.html

  Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 对应的内部版本在阿里经济体内部一直扮演着分布式一致性中间件的角色,帮助经济体平稳的度过历年的双11,对各BU业务进行了有力的支撑。经过多年沉淀与积累,商业化产品先后在阿里云、金融云进行售卖。2019.1 为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,未来 Seata 将以社区共建的形式帮助其技术更加可靠与完备。

Seata术语:

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围:开始全局事务、提交或回滚全局事务。
  • RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

  其实 Seata 之所以能保证分布式事务的一致性,我的理解是Seata锁扮演的角色跟Zookeeper是类似的。在服务进行拆分后进行多服务节点的部署,这使得多个节点的事务操作失去了联系,而Seata 作为事务协调者,扮演着一个上帝视角。对于托管了事务操作的服务来说,Seata是完全可见的。事务是否提交/回滚。均要受到Seata的控制。也正是如此。Seata 可以控制分布式事务。Seata 提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata 各事务模式

Seata AT 模式

  前提

  • 基于支持本地 ACID 事务的关系型数据库。

  • Java 应用,通过 JDBC 访问数据库。

  整体机制

  两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

  • 二阶段:

    • 提交异步化,非常快速地完成。

    • 回滚通过一阶段的回滚日志进行反向补偿。

Seata TCC 模式

  根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.

  AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:

    • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
    • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
    • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

  相应的,TCC 模式,不依赖于底层数据资源的事务支持:

    • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
    • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
    • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

  所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

SEATA Saga 模式

  适用场景:

  • 业务流程长、业务流程多

  • 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

  优势:

  • 一阶段提交本地事务,无锁,高性能

  • 事件驱动架构,参与者可异步执行,高吞吐

  • 补偿服务易于实现

  缺点:

  • 不保证隔离性

  Saga的实现:

  基于状态机引擎的 Saga 实现:

  目前SEATA提供的Saga模式是基于状态机引擎来实现的,机制是:

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件

  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点

  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚

  4. 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能

Seata XA 模式

  从编程模型上,XA 模式与 AT 模式保持完全一致。

  可以参考 Seata 官网的样例:seata-xa

  样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。

  在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。

Seata 服务启动:

   下载服务器软件包,将其解压缩。

  Seata-Server包含两个核心配置文件,其中registry.conf表示配置Seata服务注册的地址,它目前支持所有主流的注册中心(nacos 、eureka、redis、zk、consul、etcd3、sofa)。默认是file,表示不依赖于注册中心以及配置中心。

  file.conf存储的是Seata服务端的配置信息,完整的配置包含transport、Server、Metrics,分别表示通信配置,服务端配置,监控等。

  配置信息修改参考 :https://seata.io/zh-cn/docs/user/configurations.html

   然后保持默认配置启动服务即可 sh seata-server.sh  ,后台启动 nohup ./seata-server.sh >log.out 2>1 &

springboot-dubbo-seata 服务测试 :

  官方提供了常见的各种集成方式,github地址 :https://github.com/seata/seata-samples

  将工程 clone 下来,其中有个 springboot-dubbo-seata 工程,基于 AT 模式。涉及组件 SpringBoot + Dubbo + Mybatis + Nacos + Seata 。按照演示步骤操作

  • 预先安装并启动  Nacos 、Mysql
  • 导入sql 目录下的sql脚本
  • 启动 Seata 服务
  • 修改应用中application.propertie的相关IP、file.conf 文件的default.grouplist 属性,指向我们Seata的服务IP端口。然后分别启动 演示应用服务。连接成功会打印以下日志:

  整合 Seata 中比较重要的操作:

1.依赖 

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.3.0</version>
</dependency>

2.配置事务扫描器

@Bean
public GlobalTransactionScanner globalTransactionScanner(){
    return new GlobalTransactionScanner("account-gts-seata-example", "my_test_tx_group");
}

  关于事务分组的概念参考:https://seata.io/zh-cn/docs/user/transaction-group.html

3.配置文件file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "192.168.1.101:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

  registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"

  nacos {
    application = "seata-server"
    serverAddr = "localhost"
    namespace = ""
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    timeout = "0"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

  然后在 samples-business 模块的 io.seata.samples.integration.call.service 包下有个 BusinessServiceImpl 测试服务:

@Service
public class BusinessServiceImpl implements BusinessService{

    @Reference(version = "1.0.0")
    private StorageDubboService storageDubboService;

    @Reference(version = "1.0.0")
    private OrderDubboService orderDubboService;

    private boolean flag;

    /**
     * 处理业务逻辑
     * @Param:
     * @Return:
     */
    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")
    public ObjectResponse handleBusiness(BusinessDTO businessDTO) {
        System.out.println("开始全局事务,XID = " + RootContext.getXID());
        ObjectResponse<Object> objectResponse = new ObjectResponse<>();
        //1、扣减库存
        CommodityDTO commodityDTO = new CommodityDTO();
        commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
        commodityDTO.setCount(businessDTO.getCount());
        ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO);
        //2、创建订单
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setUserId(businessDTO.getUserId());
        orderDTO.setCommodityCode(businessDTO.getCommodityCode());
        orderDTO.setOrderCount(businessDTO.getCount());
        orderDTO.setOrderAmount(businessDTO.getAmount());
        ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);

        //打开注释测试事务发生异常后,全局回滚功能
        if (!flag) {
            throw new RuntimeException("测试抛异常后,分布式事务回滚!");
        }

        if (storageResponse.getStatus() != 200 || response.getStatus() != 200) {
            throw new DefaultException(RspStatusEnum.FAIL);
        }

        objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());
        objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());
        objectResponse.setData(response.getData());
        return objectResponse;
    }
}

  可以看到最核心的还是注解  @GlobalTransactional。 可以通过其中的异常测试事务的回滚,当我们在这行代码打上断点的时候,再去看数据库的 undo_log 表,会插入三条回滚日志,用于回滚操作

 

  其中 rollback_info 内容大致如下:

{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "192.168.1.101:8091:58238671456194560",
    "branchId": 58238672634793984,
    "sqlUndoLogs": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "UPDATE",
        "tableName": "t_storage",
        "beforeImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "t_storage",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "count",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 1000
                }]]
            }]]
        },
        "afterImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "t_storage",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "count",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 998
                }]]
            }]]
        }
    }]]
}

  大概就是数据的操作前后镜像,然后根据这个做回滚操作。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表