In this issue, the editor shows you how to implement TCC distributed transactions with Redis in Golang. The article analyzes the topic from a practical, professional perspective; I hope you gain something from reading it.
MSET command problems in cluster mode
The keys involved in commands such as DEL and MSET may be distributed across different nodes. The simplest way to implement such multi-key commands in cluster mode is to iterate over the keys and send the corresponding command to the node that owns each one. Take the implementation of the MGET command as an example:
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
    }
    // extract the keys to read from the argument list
    keys := make([]string, len(args)-1)
    for i := 1; i < len(args); i++ {
        keys[i-1] = string(args[i])
    }
    resultMap := make(map[string][]byte)
    // compute the node of each key and group the keys by node
    groupMap := cluster.groupBy(keys)
    // groupMap has type map[string][]string; the key is a node address, the value is the list of keys owned by that node
    for peer, group := range groupMap {
        // send an MGET command to each node to read the keys stored on it
        resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
        if reply.IsErrorReply(resp) {
            errReply := resp.(reply.ErrorReply)
            return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
        }
        arrReply, _ := resp.(*reply.MultiBulkReply)
        // merge the results from each node into the map
        for i, v := range arrReply.Args {
            key := group[i]
            resultMap[key] = v
        }
    }
    result := make([][]byte, len(keys))
    for i, k := range keys {
        result[i] = resultMap[k]
    }
    return reply.MakeMultiBulkReply(result)
}

// compute the node each key belongs to and group the keys by node
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
    result := make(map[string][]string)
    for _, key := range keys {
        // use consistent hashing to find the owning node
        peer := cluster.peerPicker.Get(key)
        // append the key to that node's group
        group, ok := result[peer]
        if !ok {
            group = make([]string, 0)
        }
        group = append(group, key)
        result[peer] = group
    }
    return result
}

Can MSET be implemented the same way? No. Notice that in the code above, if any node fails while the command is being relayed, the whole MGET call returns immediately. If some nodes fail or time out while an MSET is executing, some keys end up set while others do not. For cache users this partial-success, partial-failure outcome is very hard to handle, so we must guarantee that an MSET operation either succeeds everywhere or fails everywhere.

Two-phase commit

Two-phase commit (2PC) is the simplest algorithm that solves the consistency problem we face. In 2PC, a write operation is executed in two phases:

Prepare phase:
- The coordinator sends the transaction content to all participants and asks whether the transaction can be executed. In Godis, the node that receives the client's MSET command is the coordinator, and every node that owns one of the involved keys participates in the transaction.
- Each participant locks the keys involved in the transaction to prevent them from being modified by other operations, and writes an undo log so it can roll back if the transaction fails.
- Each participant replies to the coordinator that it is ready to commit. If the coordinator receives YES from all participants, it proceeds to commit; if any participant replies NO or times out, it prepares to roll back the transaction.

Commit phase:
- The coordinator sends a commit request to all participants.
- Each participant commits its local transaction and releases the locks on the related keys when it finishes.
- Each participant replies ACK to the coordinator; once the coordinator has received ACKs from all participants, it considers the transaction committed.

Rollback phase:
- If any participant replies NO or times out during the prepare phase, the coordinator sends a rollback request to all participants.
- Each participant rolls back its local transaction and releases the related resources when it finishes.
- Each participant replies ACK to the coordinator; once the coordinator has received ACKs from all participants, it considers the transaction rolled back.

2PC is a simple consistency protocol, but it has some problems:
- Single point of failure: if the coordinator crashes, the transaction cannot make progress or may end up in an inconsistent state.
- No guaranteed consistency: if the coordinator crashes while sending commit requests in the second phase, some participants may have received the COMMIT request and committed while others never received it and aborted, leaving the data inconsistent.
- Blocking: to guarantee the commit can complete, each participant must keep the related resources locked from the end of the first phase until the final commit, which hurts throughput.

First we define the structure that describes a transaction:

type Transaction struct {
    id      string            // transaction ID, generated by the snowflake algorithm
    args    [][]byte          // command arguments
    cluster *Cluster
    conn    redis.Connection
    keys    []string          // keys involved in the transaction
    undoLog map[string][]byte // value of each key before the transaction, used to roll back
}

Prepare phase

First look at what a participant does during the prepare phase:

// the prepare command has the form: PrepareMSet TxID key1 value1 key2 value2 ...
// TxID is the transaction ID, chosen by the coordinator
func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 3 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")
    }
    txId := string(args[1])
    size := (len(args) - 2) / 2
    keys := make([]string, size)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i+2])
    }
    txArgs := [][]byte{
        []byte("MSet"),
    } // actual args for cluster.db
    txArgs = append(txArgs, args[2:]...)
    tx := NewTransaction(cluster, c, txId, txArgs, keys) // create a new transaction
    cluster.transactions.Put(txId, tx)                   // store it in the node's transaction table
    err := tx.prepare()                                  // prepare the transaction
    if err != nil {
        return reply.MakeErrReply(err.Error())
    }
    return &reply.OkReply{}
}

The actual preparation happens in tx.prepare():

func (tx *Transaction) prepare() error {
    // lock the keys involved in the transaction
    tx.cluster.db.Locks(tx.keys...)
    // prepare the undo log
    tx.undoLog = make(map[string][]byte)
    for _, key := range tx.keys {
        entity, ok := tx.cluster.db.Get(key)
        if ok {
            blob, err := gob.Marshal(entity) // serialize the pre-transaction state as the undo log
            if err != nil {
                return err
            }
            tx.undoLog[key] = blob
        } else {
            // if the key did not exist before the transaction, it should be deleted on rollback
            tx.undoLog[key] = []byte{}
        }
    }
    tx.status = PreparedStatus
    return nil
}
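PrepareMSet and tx.prepare() rely on a few pieces the article does not show: the transaction status constants (PreparedStatus, CommitedStatus, RollbackedStatus), a status field on Transaction, and the NewTransaction constructor. Below is a minimal sketch of what they might look like; only the names come from the calls above, everything else is an assumption:

// a minimal sketch, not the article's actual code; assumes Transaction has an int status field
const (
    CreatedStatus    = iota // assumed initial status
    PreparedStatus          // set by tx.prepare() after locking keys and writing the undo log
    CommitedStatus          // set by Commit() once the local write has been applied
    RollbackedStatus        // set by tx.rollback() after restoring the undo log
)

// NewTransaction packs the arguments PrepareMSet passes in into a Transaction value
func NewTransaction(cluster *Cluster, c redis.Connection, id string, args [][]byte, keys []string) *Transaction {
    return &Transaction{
        id:      id,
        args:    args,
        cluster: cluster,
        conn:    c,
        keys:    keys,
        status:  CreatedStatus, // assumed field, see note above
    }
}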
Now look at what the coordinator does:

func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    // parse the arguments
    argCount := len(args) - 1
    if argCount%2 != 0 || argCount < 1 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
    }
    size := argCount / 2
    keys := make([]string, size)
    valueMap := make(map[string]string)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i+1])
        valueMap[keys[i]] = string(args[2*i+2])
    }
    // find the nodes that own the keys
    groupMap := cluster.groupBy(keys)
    if len(groupMap) == 1 { // do fast
        // if all keys live on the same node, execute directly without the slower 2PC algorithm
        for peer := range groupMap {
            return cluster.Relay(peer, c, args)
        }
    }
    // start the prepare phase
    var errReply redis.Reply
    txId := cluster.idGenerator.NextId() // use the snowflake algorithm to generate the transaction ID
    txIdStr := strconv.FormatInt(txId, 10)
    rollback := false
    // send a prepare request to every participant
    for peer, group := range groupMap {
        peerArgs := []string{txIdStr}
        for _, k := range group {
            peerArgs = append(peerArgs, k, valueMap[k])
        }
        var resp redis.Reply
        if peer == cluster.self {
            resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
        } else {
            resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp
            rollback = true
            break
        }
    }
    if rollback {
        // if the prepare phase failed, roll back
        RequestRollback(cluster, c, txId, groupMap)
    } else {
        _, errReply = RequestCommit(cluster, c, txId, groupMap)
        rollback = errReply != nil
    }
    if !rollback {
        return &reply.OkReply{}
    }
    return errReply
}

Commit phase

A participant commits its local transaction:

func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) != 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")
    }
    // look up the transaction
    txId := string(args[1])
    raw, ok := cluster.transactions.Get(txId)
    if !ok {
        return reply.MakeIntReply(0)
    }
    tx, _ := raw.(*Transaction)
    // unlock the keys once the commit has finished
    defer func() {
        cluster.db.UnLocks(tx.keys...)
        tx.status = CommitedStatus
        //cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit
    }()
    cmd := strings.ToLower(string(tx.args[0]))
    var result redis.Reply
    if cmd == "del" {
        result = CommitDel(cluster, c, tx)
    } else if cmd == "mset" {
        result = CommitMSet(cluster, c, tx)
    }
    // the commit failed, roll back locally
    if reply.IsErrorReply(result) {
        err2 := tx.rollback()
        return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
    }
    return result
}

// apply the writes
func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
    size := len(tx.args) / 2
    keys := make([]string, size)
    values := make([][]byte, size)
    for i := 0; i < size; i++ {
        keys[i] = string(tx.args[2*i+1])
        values[i] = tx.args[2*i+2]
    }
    for i, key := range keys {
        value := values[i]
        cluster.db.Put(key, &db.DataEntity{Data: value})
    }
    cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))
    return &reply.OkReply{}
}
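Commit() also dispatches DEL transactions to a CommitDel function that the article does not show. Here is a minimal sketch of what it could look like, following the same pattern as CommitMSet; only the name and signature are taken from the call above, the body is an assumption:

// a minimal sketch, not the article's actual implementation
func CommitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
    // tx.args has the form ["DEL", key1, key2, ...]
    keys := make([]string, 0, len(tx.args)-1)
    for _, raw := range tx.args[1:] {
        keys = append(keys, string(raw))
    }
    for _, key := range keys {
        cluster.db.Remove(key) // the undo log written in prepare() allows this to be rolled back
    }
    cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))
    return &reply.OkReply{}
}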
The coordinator's logic is also simple:

func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
    var errReply reply.ErrorReply
    txIdStr := strconv.FormatInt(txId, 10)
    respList := make([]redis.Reply, 0, len(peers))
    for peer := range peers {
        var resp redis.Reply
        if peer == cluster.self {
            resp = Commit(cluster, c, makeArgs("commit", txIdStr))
        } else {
            resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp.(reply.ErrorReply)
            break
        }
        respList = append(respList, resp)
    }
    if errReply != nil {
        RequestRollback(cluster, c, txId, peers)
        return nil, errReply
    }
    return respList, nil
}

Rollback

Roll back the local transaction:

func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) != 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")
    }
    txId := string(args[1])
    raw, ok := cluster.transactions.Get(txId)
    if !ok {
        return reply.MakeIntReply(0)
    }
    tx, _ := raw.(*Transaction)
    err := tx.rollback()
    if err != nil {
        return reply.MakeErrReply(err.Error())
    }
    return reply.MakeIntReply(1)
}

func (tx *Transaction) rollback() error {
    for key, blob := range tx.undoLog {
        if len(blob) > 0 {
            entity := &db.DataEntity{}
            err := gob.UnMarshal(blob, entity) // deserialize the pre-transaction snapshot
            if err != nil {
                return err
            }
            tx.cluster.db.Put(key, entity) // restore the value the key had before the transaction
        } else {
            tx.cluster.db.Remove(key) // the key did not exist before the transaction, so remove it
        }
    }
    if tx.status != CommitedStatus {
        tx.cluster.db.UnLocks(tx.keys...)
    }
    tx.status = RollbackedStatus
    return nil
}
The coordinator logic is similar to commit:
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {
    txIdStr := strconv.FormatInt(txId, 10)
    for peer := range peers {
        if peer == cluster.self {
            Rollback(cluster, c, makeArgs("rollback", txIdStr))
        } else {
            cluster.Relay(peer, c, makeArgs("rollback", txIdStr))
        }
    }
}
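All of the handlers above build relay commands with a makeArgs helper that the article never shows. A minimal sketch of what it could look like, assuming it simply converts a command name and string arguments into the [][]byte form the handlers expect (the real helper in the project may differ):

// a minimal sketch of the assumed makeArgs helper
func makeArgs(cmd string, args ...string) [][]byte {
    result := make([][]byte, 0, len(args)+1)
    result = append(result, []byte(cmd)) // command name first
    for _, arg := range args {
        result = append(result, []byte(arg))
    }
    return result
}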
The above is how to implement TCC distributed transactions with Redis in Golang. If you run into similar questions, the analysis above should help you work through them; to learn more, please follow the industry information channel.