修复聊天系统消息群发机制
This commit is contained in:
parent
377026b7ab
commit
9ade1bb5a5
@ -85,7 +85,7 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
msg.ChannelId = userexpand.Chatchannel //指定频道
|
msg.ChannelId = userexpand.Chatchannel //指定频道
|
||||||
if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s:%d", crosschatkey, userexpand.Chatchannel), int64(max_chat), msg); err != nil {
|
if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%d", crosschatkey, userexpand.Chatchannel), int64(max_chat), msg); err != nil {
|
||||||
code = pb.ErrorCode_DBError
|
code = pb.ErrorCode_DBError
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -189,11 +189,17 @@ func (this *modelChatComp) ChanageChannel(session comm.IUserSession, channel int
|
|||||||
//读取跨服聊天频道下成员
|
//读取跨服聊天频道下成员
|
||||||
func (this *modelChatComp) GetCrossChannelMember(channel int32) (result []*pb.CacheUser, err error) {
|
func (this *modelChatComp) GetCrossChannelMember(channel int32) (result []*pb.CacheUser, err error) {
|
||||||
key := fmt.Sprintf("%s-%d-member", crosschatkey, channel)
|
key := fmt.Sprintf("%s-%d-member", crosschatkey, channel)
|
||||||
result = make([]*pb.CacheUser, 0)
|
temp := make(map[string]*pb.CacheUser, 0)
|
||||||
if err = this.Redis.HGetAll(key, &result); err != nil {
|
if err = this.Redis.HGetAll(key, &temp); err != nil {
|
||||||
this.module.Errorf("err:%v", err)
|
this.module.Errorf("err:%v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
n := 0
|
||||||
|
result = make([]*pb.CacheUser, len(temp))
|
||||||
|
for _, v := range temp {
|
||||||
|
result[n] = v
|
||||||
|
n++
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,30 +111,30 @@ func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Messag
|
|||||||
func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.CacheUser) (err error) {
|
func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.CacheUser) (err error) {
|
||||||
var (
|
var (
|
||||||
gateways map[string]map[string][]string = make(map[string]map[string][]string)
|
gateways map[string]map[string][]string = make(map[string]map[string][]string)
|
||||||
cluster map[string][]string
|
cluster map[string][]string = make(map[string][]string)
|
||||||
gateway []string
|
gateway []string
|
||||||
ok bool
|
ok bool
|
||||||
)
|
)
|
||||||
for _, v := range user {
|
for _, v := range user {
|
||||||
if cluster, ok = gateways[v.ServiceTag]; !ok {
|
if cluster, ok = gateways[v.ServiceTag]; !ok {
|
||||||
gateways[v.ServiceTag] = make(map[string][]string)
|
cluster = make(map[string][]string)
|
||||||
|
gateways[v.ServiceTag] = cluster
|
||||||
}
|
}
|
||||||
if gateway, ok = cluster[v.GatewayServiceId]; !ok {
|
if gateway, ok = cluster[v.GatewayServiceId]; !ok {
|
||||||
gateway = make([]string, 0)
|
gateway = make([]string, 0)
|
||||||
cluster[v.GatewayServiceId] = gateway
|
cluster[v.GatewayServiceId] = gateway
|
||||||
}
|
}
|
||||||
gateway = append(gateway, v.SessionId)
|
cluster[v.GatewayServiceId] = append(cluster[v.GatewayServiceId], v.SessionId)
|
||||||
}
|
}
|
||||||
reply := &pb.RPCMessageReply{}
|
|
||||||
data, _ := anypb.New(msg)
|
data, _ := anypb.New(msg)
|
||||||
for k, v := range gateways {
|
for k, v := range gateways {
|
||||||
for k1, v1 := range v {
|
for k1, v1 := range v {
|
||||||
if _, err = this.service.AcrossClusterRpcGo(context.Background(), k1, fmt.Sprintf("%s/%s", comm.Service_Gateway, k), string(comm.Rpc_GatewayAgentSendMsg), &pb.BatchMessageReq{
|
if _, err = this.service.AcrossClusterRpcGo(context.Background(), k, fmt.Sprintf("%s/%s", comm.Service_Gateway, k1), string(comm.Rpc_GatewaySendBatchMsg), &pb.BatchMessageReq{
|
||||||
UserSessionIds: v1,
|
UserSessionIds: v1,
|
||||||
MainType: mainType,
|
MainType: mainType,
|
||||||
SubType: subType,
|
SubType: subType,
|
||||||
Data: data,
|
Data: data,
|
||||||
}, reply); err != nil {
|
}, nil); err != nil {
|
||||||
log.Errorf("SendMsgToUsers:%s.%s->%s.%s err:%v", k1, k, mainType, subType, err)
|
log.Errorf("SendMsgToUsers:%s.%s->%s.%s err:%v", k1, k, mainType, subType, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ func (this *Options) LoadConfig(settings map[string]interface{}) (err error) {
|
|||||||
err = mapstructure.Decode(settings, this)
|
err = mapstructure.Decode(settings, this)
|
||||||
}
|
}
|
||||||
|
|
||||||
if this.Log = log.NewTurnlog(this.Debug, log.Clone("", 3)); this.Log == nil {
|
if this.Log = log.NewTurnlog(this.Debug, log.Clone("", 4)); this.Log == nil {
|
||||||
err = errors.New("log is nil")
|
err = errors.New("log is nil")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user