学习RadonDB源码(七)

作者 : 开心源码 本文共3027个字,预计阅读时间需要8分钟 发布时间: 2022-05-12 共194人阅读

1. 准备收官

RadonDB作为一款优秀的分布式数据库产品,离不开这几大组件:

  • 存储
  • 计算

RadonDB的存储由MySQL存储节点提供,这很正常,这些存储节点还负责了计算工作。从这个角度上来说,RadonDB还不是很典型的分布式数据库系统。这种架构上,计算实际上并不是由RadonDB来提供的,而是由MySQL提供的。

RadonDB的设计上还有一个精巧的地方,就是利用了TokuDB存储了全量数据,比照较复杂的计算,需要join的地方,都由这个全量存储来提供。因而RadonDB看着不太像一个典型的分布式存储系统,不过青云实现了MySQL集群的Raft协议,这一点还是很值得称赞的。

昨天夜里和今天白天我读了很多有关Raft协议的资料,那么今天还是来看看RadonDB是如何实现MySQL的Raft的。

2. Xenon源码解读

RadonDB的Raft组件叫做Xenon,在github上也有源代码。先来看看整体的架构图:

架构图

一个Raft集群应该有三种状态:Leader,Candidate和Follower。Xenon的代码是这样实现的:

// Leader tuple.type Leader struct {    *Raft    // the smallest binlog which slaves executed by SQL-Thread    relayMasterLogFile string    // leader degrade to follower    isDegradeToFollower bool    // Used to wait for the async job done.    wg sync.WaitGroup    // the binlog which we should purge to    nextPuregeBinlog string    purgeBinlogTick   *time.Ticker    checkSemiSyncTick *time.Ticker    // leader process heartbeat request handler    processHeartbeatRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse    // leader process voterequest request handler    processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse    // leader send heartbeat request to other followers    sendHeartbeatHandler func(*bool, chan *model.RaftRPCResponse)    // leader process send heartbeat response    processHeartbeatResponseHandler func(*int, *model.RaftRPCResponse)}

Leader要做的事情有两件:

  • 向集群内部发送心跳
  • 解决来自Client的请求

这些都是通过RPC来实现的。代码中首先去新建了一个Leader:

// NewLeader creates new Leader.func NewLeader(r *Raft) *Leader {    L := &Leader{        Raft: r,    }    L.initHandlers()    return L}

现在看看这些都是干什么的吧。

入选举成功以后,一个节点成为Leader,这个时候就会去调度这个方法来新建一个Leader角色。其实新建Leader也无非就做了这么几件事:

// leader handlersfunc (r *Leader) initHandlers() {    // heartbeat request    r.setProcessHeartbeatRequestHandler(r.processHeartbeatRequest)    // vote request    r.setProcessRequestVoteRequestHandler(r.processRequestVoteRequest)    // send heartbeat    r.setSendHeartbeatHandler(r.sendHeartbeat)    r.setProcessHeartbeatResponseHandler(r.processHeartbeatResponse)}

这段代码让我看着有点恍惚,很像Java的setter方法,这些方法将各种request发送了出去,但是我有点不知道为什么要解决投票的请求,按理说这个时候投票都已经结束了,开始了自己的任期,以后假如有机会调试代码再看看。

今天先挑一个发送心跳的方法来看看:

// leaderSendHeartbeatHandler// broadcast hearbeat requests to other peers of the clusterfunc (r *Leader) sendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) {    // check MySQL down    if r.mysql.GetState() == mysql.MysqlDead {        *mysqlDown = true        return    }    // broadcast heartbeat    r.mutex.RLock()    defer r.mutex.RUnlock()    for _, peer := range r.peers {        r.wg.Add(1)        go func(peer *Peer) {            defer r.wg.Done()            peer.sendHeartbeat(c)        }(peer)    }}

由于是基于MySQL节点做的Raft,因而首先要检查mysqld能否存活,假如死掉了也就没有发送心跳的意义了。这里要说明的是,不是mysql向外发送心跳,是Xenon,因而肯定要检查mysqld的状态先,mysqld挂掉了,就不发心跳了,即可以触发新一轮选举了。

发送心跳的时候要遍历所有节点,由于要给所有节点发心跳,每遍历一个节点,就要给WaitGroup中添加1,waitGroup可以保证goroutine安全的结束。

这之后自然就是调用发送的方法,向集群内广播心跳了。

3. Go语言相关

Go语言有个很有意思的地方叫做函数类型。也就是说可以把变量指定成函数类型,比方这样:

package mainimport "fmt"func test(param string) {    fmt.Println(param)}func main()  {    t := test    t("quan")}

我暂时还没有想明白函数变量的好处,都说是让代码变得更灵活。

这段代码中还有一个waitGroup,参考一下下面的代码:

package mainimport (    "fmt"    "sync")var wg sync.WaitGroupfunc test(param int) {    defer wg.Done()    fmt.Println(param)}func main()  {    for i := 1; i < 10; i++ {        wg.Add(1)        go test(i)    }    wg.Wait()}

注意,Add和Done肯定要配对,不然肯定报deadlock错误。

现在画张图看看原理:

并发等待

所有的现成完成之后都会去等待,并在此时将wg的计数器减1,计数器被减到0时,等待结束。

4. 小结

其实要是学习Raft的实现的话,我这个系列可能还得再写一段时间。

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 学习RadonDB源码(七)

发表回复