消息路由主要涉及三个文件:route.go
、app_route.go
、channel.go
。
type Route struct {
appid int64
mutex sync.Mutex
clients map[int64]ClientSet
room_clients map[int64]ClientSet
}
主要字段:
主要方法:
type AppRoute struct {
mutex sync.Mutex
apps map[int64]*Route
}
Route维护了某个应用内所有上线客户端的一个连接情况;
AppRoute则是维护了所有应用的所有上线客户端的连接。
主要方法
type Subscriber struct {
uids map[int64]int
room_ids map[int64]int
}
type Channel struct {
addr string
wt chan *Message
mutex sync.Mutex
subscribers map[int64]*Subscriber
dispatch func(*AppMessage)
dispatch_group func(*AppMessage)
dispatch_room func(*AppMessage)
}
Subscriber
当前IM服务器上已经连接的用户和聊天室的数量情况。
Channel
Channel表示当前IM服务器与IMR路由服务器的连接,每个Channel里面都维护了一个subscribers映射,其键位appid。
Publish
func (channel *Channel) Publish(amsg *AppMessage) {
msg := &Message{cmd: MSG_PUBLISH, body: amsg}
channel.wt <- msg
}
向IMR发送一个MSG_PUBLISH类型消息,通过IMR路由发送AppMessage单聊消息。
PublishGroup
func (channel *Channel) PublishGroup(amsg *AppMessage) {
msg := &Message{cmd: MSG_PUBLISH_GROUP, body: amsg}
channel.wt <- msg
}
向IMR发送一个MSG_PUBLISH_GROUP类型消息,通过IMR路由发送AppMessage群组消息。
PublishRoom
func (channel *Channel) PublishRoom(amsg *AppMessage) {
msg := &Message{cmd: MSG_PUBLISH_ROOM, body: amsg}
channel.wt <- msg
}
向IMR发送一个MSG_PUBLISH_ROOM类型消息,通过IMR路由发送AppMessage聊天室消息。
路由订阅与取消
以下几个方法只是修改或读取subscribers映射数据,不会与IMR进行通信:
func (channel *Channel) Subscribe(appid int64, uid int64, online bool) {
count, online_count := channel.AddSubscribe(appid, uid, online)
if count == 0 {
//新用户上线
on := 0
if online {
on = 1
}
id := &SubscribeMessage{appid: appid, uid: uid, online:int8(on)}
msg := &Message{cmd: MSG_SUBSCRIBE, body: id}
channel.wt <- msg
} else if online_count == 0 && online {
//手机端上线
id := &SubscribeMessage{appid: appid, uid: uid, online:1}
msg := &Message{cmd: MSG_SUBSCRIBE, body: id}
channel.wt <- msg
}
}
func (channel *Channel) Unsubscribe(appid int64, uid int64, online bool) {
count, online_count := channel.RemoveSubscribe(appid, uid, online)
if count == 1 {
//用户断开全部连接
id := &AppUserID{appid: appid, uid: uid}
msg := &Message{cmd: MSG_UNSUBSCRIBE, body: id}
channel.wt <- msg
} else if count > 1 && online_count == 1 && online {
//手机端断开连接,pc/web端还未断开连接
id := &SubscribeMessage{appid: appid, uid: uid, online:0}
msg := &Message{cmd: MSG_SUBSCRIBE, body: id}
channel.wt <- msg
}
}
func (channel *Channel) ReSubscribe(conn *net.TCPConn, seq int) int {
subs := channel.GetAllSubscriber()
for appid, sub := range(subs) {
for uid, count := range(sub.uids) {
//低16位表示总数量 高16位表示online的数量
c2 := count>>16&0xffff
on := 0
if c2 > 0 {
on = 1
}
id := &SubscribeMessage{appid: appid, uid: uid, online:int8(on)}
msg := &Message{cmd: MSG_SUBSCRIBE, body: id}
seq = seq + 1
msg.seq = seq
SendMessage(conn, msg)
}
}
return seq
}
func (channel *Channel) SubscribeRoom(appid int64, room_id int64) {
count := channel.AddSubscribeRoom(appid, room_id)
if count == 0 {
id := &AppRoomID{appid: appid, room_id: room_id}
msg := &Message{cmd: MSG_SUBSCRIBE_ROOM, body: id}
channel.wt <- msg
}
}
func (channel *Channel) UnsubscribeRoom(appid int64, room_id int64) {
count := channel.RemoveSubscribeRoom(appid, room_id)
if count == 1 {
id := &AppRoomID{appid: appid, room_id: room_id}
msg := &Message{cmd: MSG_UNSUBSCRIBE_ROOM, body: id}
channel.wt <- msg
}
}
func (channel *Channel) ReSubscribeRoom(conn *net.TCPConn, seq int) int {
subs := channel.GetAllRoomSubscriber()
for _, id := range(subs) {
msg := &Message{cmd: MSG_SUBSCRIBE_ROOM, body: id}
seq = seq + 1
msg.seq = seq
SendMessage(conn, msg)
}
return seq
}