数据结构
type Master struct {
ewt chan *EMessage
mutex sync.Mutex
clients map[*SyncClient]struct{}
}
Master表示当前服务的角色为主服务器,Master维护了其对应的从服务器与当前Master的连接列表clients。
mutex互斥锁的目的就是同步访问这些clients。
主要工作
ewt通道
Master会监听这个通道,每当有消息存入文件后,同时会将该消息发往Mater的ewt通道。
数据结构
type Slaver struct {
addr string
}
Slaver表示当前服务的角色为从服务器,里面addr字段表示其对应的主服务器的网络地址。
只有配置了master_address
参数,才会启动Slaver从服务器,去与主服务器Master进行数据同步操作。
主要工作
注:从代码看,Master只会批量的与Slaver进行数据同步,也即只会发送MSG_STORAGE_SYNC_MESSAGE_BATCH命令。
代码:
msgid := storage.NextMessageID()
cursor := &SyncCursor{msgid}
log.Info("cursor msgid:", msgid)
msg := &Message{cmd:MSG_STORAGE_SYNC_BEGIN, body:cursor}
seq += 1
msg.seq = seq
SendMessage(conn, msg)
for {
msg := ReceiveStorageSyncMessage(conn)
if msg == nil {
return
}
if msg.cmd == MSG_STORAGE_SYNC_MESSAGE {
emsg := msg.body.(*EMessage)
storage.SaveSyncMessage(emsg)
} else if msg.cmd == MSG_STORAGE_SYNC_MESSAGE_BATCH {
mb := msg.body.(*MessageBatch)
storage.SaveSyncMessageBatch(mb)
} else {
log.Error("unknown message cmd:", Command(msg.cmd))
}
}
数据结构
type SyncClient struct {
conn *net.TCPConn
ewt chan *Message
}
conn为对应从服务器的连接
ewt通道用于主服务器与SyncClient的通信,SyncClient从ewt通道接收到消息时,会直接发送到对应的从服务器达到数据同步的目的。
Master主服务器上,与每个从服务器Slaver建立连接之后,都会生成一个与之对应的SyncClient实例,这个实例其实就是维护了主服务器与从服务器的连接关系。Master主服务器上维护了一个从服务器连接集合clients。
主要工作
问题:在从服务器连接上主服务器进行数据同步的过程中,监听ewt通道之前,如果有消息进来,这些消息感觉会丢失哦?!
我们可以配置多层级主从复制架构!
Slaver与Master间的通讯协议: