golang 使用 mysql XA 事务
2023-1-19 08:53:36 Author: Go语言中文网(查看原文) 阅读量:12 收藏

在MySQL 5.7.7版本中,才将xa的bug修复,符合Open Group 的<<Distributed Transaction Processing:The XA Specification>> 标准。Mysql中存在两种XA事务,一种是内部XA事务主要用来协调存储引擎和二进制日志,一种是外部事务可以参与到外部分布式事务中(比如多个数据库实现的分布式事务)。xa的语法如下:

XA {START|BEGIN} xid [JOIN|RESUME] //开启本地事务XA END xid //结束本地事务XA PREPARE xid  //全局事务进入预备状态XA COMMIT xid[ONE PHASE] //提交XA ROLLBACK xid  //回滚XA RECOVER[CONVERT XID ] //恢复没有提交的事务,继续执行

XA是牺牲可用性保证强一致性的事务,因为需要mysql的事务隔离级别为串行化。下面我们来实践一把。首先启动两个mysql实例,端口分别是3306和3307:

version: "3.1"services:  mysql:    image: mysql:5.7    container_name: mysql    environment:      - MYSQL_ROOT_PASSWORD=12345678    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'    volumes:      - /learn/mysql:/docker-entrypoint-initdb.d      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf    ports:      - "3306:3306"    extra_hosts:     - host.docker.internal:host-gateway  mysql2:    image: mysql:5.7    container_name: mysql2    environment:      - MYSQL_ROOT_PASSWORD=12345678    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'    volumes:      - /learn/mysql:/docker-entrypoint-initdb.d      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf    ports:      - "3307:3306"    extra_hosts:     - host.docker.internal:host-gateway

分别链接mysql,确认是否支持xa协议,修改事务隔离级别

 % mysql -uroot -p12345678 -h127.0.0.1 % mysql -uroot -p12345678 -h127.0.0.1 -P3307 mysql> show variables like 'innodb_support_xa';+-------------------+-------+| Variable_name     | Value |+-------------------+-------+| innodb_support_xa | ON    |+-------------------+-------+1 row in set (0.01 sec)mysql> set global tx_isolation=serializable;Query OK, 0 rows affected, 1 warning (0.01 sec)mysql> show variables like '%tx_isolation%';+---------------+--------------+| Variable_name | Value        |+---------------+--------------+| tx_isolation  | SERIALIZABLE |+---------------+--------------+1 row in set (0.00 sec)

然后创建数据库和表

mysql3306中 我们有一个user表

 create database orders; use orders;create table user (    id int,    name varchar(10),    score int);insert into user values(1, "foo", 10);

在mysql3307中,我们有一个wallet表。

use orders;create table wallet (    id int,    money float );insert into wallet values(1, 10.1);

然后就可以通过xa协议实现分布式事务

package main
import ( "database/sql" "fmt" "log" "strconv" "time"
_ "github.com/go-sql-driver/mysql" "github.com/pkg/errors")
func main() { var err error // db1的连接 db1, err := sql.Open("mysql", "root:[email protected](127.0.0.1:3306)/orders") if err != nil { panic(err.Error()) } defer db1.Close() // db2的连接 db2, err := sql.Open("mysql", "root:[email protected](127.0.0.1:3307)/orders") if err != nil { panic(err.Error()) } defer db2.Close() // 开始前显示 var score int db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) var money float64 db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) // 生成xid xid := strconv.FormatInt(time.Now().Unix(), 10) fmt.Println("=== xid:" + xid + " ====") defer func() { if err := recover(); err != nil { fmt.Printf("%+v\n", err) fmt.Println("=== call rollback ====") // db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) // db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) }() // XA 启动 fmt.Println("=== call start ====") if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // DML操作 if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil { panic(errors.WithStack(err)) } // panic(errors.WithStack(err))   /*   xa recover;   Empty set (0.01 sec)  */ // XA end fmt.Println("=== call end ====") if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // prepare fmt.Println("=== call prepare ====") if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("db1 prepare error")) /* db1 xa recover; +----------+--------------+--------------+------------+ | formatID | gtrid_length | bqual_length | data | +----------+--------------+--------------+------------+ | 1 | 10 | 0 | 1671287200 | +----------+--------------+--------------+------------+ 1 row in set (0.00 sec) db2 xa recover; Empty set (0.00 sec) //新事务会死锁 === xid:1671287283 ==== === call start ==== Error 1205: Lock wait timeout exceeded; try restarting transaction main.main
mysql> xa commit '1671287200'; Query OK, 0 rows affected (0.00 sec) */ if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("db2 prepare error")) /* db1 mysql> xa recover; +----------+--------------+--------------+------------+ | formatID | gtrid_length | bqual_length | data | +----------+--------------+--------------+------------+ | 1 | 10 | 0 | 1671287434 | +----------+--------------+--------------+------------+ 1 row in set (0.00 sec) db2 xa recover; +----------+--------------+--------------+------------+ | formatID | gtrid_length | bqual_length | data | +----------+--------------+--------------+------------+ | 1 | 10 | 0 | 1671287434 | +----------+--------------+--------------+------------+ 1 row in set (0.00 sec) */ // commit fmt.Println("=== call commit ====") if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { // TODO: 尝试重新提交COMMIT // TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交 log.Println("xid:" + xid) } // panic(errors.New("db2 commit error")) if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { log.Println("xid:" + xid) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money)}

为了实验充分理解分布式事务,我们分别在几个特殊阶段点来panic,中断事务。

1,在两个db实例prepare之前

2,在db1  prepare之后,db2 prepare之前

3,在两个db prepare之后,db1 commit之前

4,在db1 commit之后, db2 commit之前

情形1下:两个事务都没有prepare,全局不可见,异常中断后,本地事务回滚掉了,在两个实例上恢复的时候,都是一致的

 xa recover;Empty set (0.00 sec)

情形2下:db1,已经prepare了,全局事务会记录下来,本地事务不会自动回滚掉,db2 没有prepare,本地事务会自动回滚掉

db1xa recover;+----------+--------------+--------------+------------+| formatID | gtrid_length | bqual_length | data       |+----------+--------------+--------------+------------+|        1 |           10 |            0 | 1671287200 |+----------+--------------+--------------+------------+1 row in set (0.00 sec)db2xa recover;Empty set (0.00 sec)

这个时候如果新开一个事务,会等待上一个没有结束的事务释放锁,而超时

    === xid:1671287283 ====    === call start ====    Error 1205: Lock wait timeout exceeded; try restarting transaction    main.main

情形3下:两个事务都处于prepare状态,等待处理

  db1      mysql> xa recover;    +----------+--------------+--------------+------------+    | formatID | gtrid_length | bqual_length | data       |    +----------+--------------+--------------+------------+    |        1 |           10 |            0 | 1671287434 |    +----------+--------------+--------------+------------+    1 row in set (0.00 sec)  db2      xa recover;    +----------+--------------+--------------+------------+    | formatID | gtrid_length | bqual_length | data       |    +----------+--------------+--------------+------------+    |        1 |           10 |            0 | 1671287434 |    +----------+--------------+--------------+------------+    1 row in set (0.00 sec)

情形4下:db1已经提交,recover后是空,db2是待提交状态。

db1      mysql> xa recover;      Empty set (0.00 sec)db2      xa recover;      +----------+--------------+--------------+------------+      | formatID | gtrid_length | bqual_length | data       |      +----------+--------------+--------------+------------+      |        1 |           10 |            0 | 1671289143 |      +----------+--------------+--------------+------------+      1 row in set (0.00 sec)

这个时候可以操作db2的回滚,或者提交,db1已经提交,没法回滚了

db1xa rollback '1671289143';ERROR 1397 (XAE04): XAER_NOTA: Unknown XIDdb2 xa rollback '1671289143';Query OK, 0 rows affected (0.01 sec)

所以在xa下也会出现不一致,需要人工介入进行回滚或者提交,保证最终的一致性。

总结下:没有银弹,xa虽然尽最大努力保证了一致性,但是如果出现部分提交,还是需要人工介入处理,保证最终的一致性。


推荐阅读

福利
我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


文章来源: http://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&mid=2651453994&idx=1&sn=d4aae160b805a998167514b0a08a0d1b&chksm=80bb24d8b7ccadce2f038154a9a00e9723159ff0fc73288a8fd3c315690f51ad02b27401b3f8#rd
如有侵权请联系:admin#unsafe.sh