im-service使用的RPC为:valyala/gorpc
与存储服务进行RPC交互的是各个IM服务器:
func ListenRPCClient() {
dispatcher := gorpc.NewDispatcher()
dispatcher.AddFunc("SavePeerMessage", SavePeerMessage)
dispatcher.AddFunc("SaveGroupMessage", SaveGroupMessage)
dispatcher.AddFunc("GetNewCount", GetNewCount)
dispatcher.AddFunc("GetLatestMessage", GetLatestMessage)
dispatcher.AddFunc("SyncMessage", SyncMessage)
dispatcher.AddFunc("SyncGroupMessage", SyncGroupMessage)
s := &gorpc.Server{
Addr: config.rpc_listen,
Handler: dispatcher.NewHandlerFunc(),
}
if err := s.Serve(); err != nil {
log.Fatalf("Cannot start rpc server: %s", err)
}
}
先来看几个参数类型:SyncHistory、SyncGroupHistory、HistoryRequest
type SyncHistory struct {
AppID int64
Uid int64
DeviceID int64
LastMsgID int64
}
type SyncGroupHistory struct {
AppID int64
Uid int64
DeviceID int64
GroupID int64
LastMsgID int64
Timestamp int32
}
type HistoryRequest struct {
AppID int64
Uid int64
Limit int32
}
HistoryRequest:没有DeviceID字段,无法记录用户最近消息ID,只能用于网页版拉取最新的消息。
存储单聊消息,返回此条消息对应的消息ID(这里是指消息本身位置,而非元数据位置)。
func SavePeerMessage(addr string, m *PeerMessage) (int64, error) {
atomic.AddInt64(&server_summary.nrequests, 1)
atomic.AddInt64(&server_summary.peer_message_count, 1)
msg := &Message{cmd:int(m.Cmd), version:DEFAULT_VERSION}
msg.FromData(m.Raw)
msgid := storage.SavePeerMessage(m.AppID, m.Uid, m.DeviceID, msg)
return msgid, nil
}
PeerMessage结构参见:消息类型
存储群组消息,返回此条消息对应的消息ID(这里是指消息本身位置,而非元数据位置)。
func SaveGroupMessage(addr string, m *GroupMessage) (int64, error) {
atomic.AddInt64(&server_summary.nrequests, 1)
atomic.AddInt64(&server_summary.group_message_count, 1)
msg := &Message{cmd:int(m.Cmd), version:DEFAULT_VERSION}
msg.FromData(m.Raw)
msgid := storage.SaveGroupMessage(m.AppID, m.GroupID, m.DeviceID, msg)
return msgid, nil
}
GroupMessage结构参见:消息类型
func GetNewCount(addr string, s *SyncHistory) (int64, error)
获取用户最新未读消息数量,从后往前遍历用户的消息链表,直到参数SyncHistory.LastMsgID。
func GetLatestMessage(addr string, r *HistoryRequest) []*HistoryMessage
获取用户最近的HistoryRequest.limit条消息,返回数据类型为[]*HistoryMessage
。
func SyncMessage(addr string, sync_key *SyncHistory) *PeerHistoryMessage
客户端同步拖取消息,客户端会提供用户的SyncHistory.lastMsgID,存储服务器根据这个ID去同步最新达到的消息。
不仅返回一个列表**[]*HistoryMessage**,同时返回用户消息链表中的LastMsgID。
PeerHistoryMessage结构参见:消息类型
func SyncGroupMessage(addr string , sync_key *SyncGroupHistory) *GroupHistoryMessage
客户端同步拖取群组消息,客户端会提供用户的SyncGroupHistory.LastMsgID,存储服务器根据这个ID去同步最新达到的消息。
同时,SyncGroupHistory.Timestamp表示用户的入群时间,不会拖取比入群时间更早的历史消息。
不仅返回一个列表**[]*HistoryMessage**,同时返回用户消息链表中的LastMsgID。
GroupHistoryMessage结构参见:消息类型