diff --git a/modules/chat/module.go b/modules/chat/module.go index 03ced0fa3..e7cde76eb 100644 --- a/modules/chat/module.go +++ b/modules/chat/module.go @@ -197,7 +197,7 @@ func (this *Chat) PushToUsers(channel int32, msg *pb.DBChat) (err error) { users []*pb.CacheUser ) if users, err = this.modelChat.GetCrossChannelMember(channel); err == nil { - if err = this.SendMsgToUsers(string(this.GetType()), "message", &pb.ChatMessagePush{Chat: msg}, users...); err != nil { + if err = this.SendMsgToCUsers(string(this.GetType()), "message", &pb.ChatMessagePush{Chat: msg}, users...); err != nil { this.Errorf("err:%v", err) return } diff --git a/modules/core.go b/modules/core.go index ed64b3ba5..d3e8ca283 100644 --- a/modules/core.go +++ b/modules/core.go @@ -21,6 +21,7 @@ type ( SendMsgToUser(mainType, subType string, msg proto.Message, uid string) (err error) //向多个用户发送消息 SendMsgToUsers(mainType, subType string, msg proto.Message, uids ...string) (err error) + SendMsgToCUsers(mainType, subType string, msg proto.Message, users ...*pb.CacheUser) (err error) //校验消耗资源 CheckConsumeRes(uid string, res []*cfg.Gameatn) (code pb.ErrorCode) } diff --git a/modules/modulebase.go b/modules/modulebase.go index 49750dd3b..e78508bb2 100644 --- a/modules/modulebase.go +++ b/modules/modulebase.go @@ -180,6 +180,41 @@ func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Messa return } +//向多个用户发送消息 +func (this *ModuleBase) SendMsgToCUsers(mainType, subType string, msg proto.Message, users ...*pb.CacheUser) (err error) { + var ( + gateways map[string]map[string][]string = make(map[string]map[string][]string) + cluster map[string][]string = make(map[string][]string) + gateway []string + ok bool + ) + for _, v := range users { + if cluster, ok = gateways[v.ServiceTag]; !ok { + cluster = make(map[string][]string) + gateways[v.ServiceTag] = cluster + } + if gateway, ok = cluster[v.GatewayServiceId]; !ok { + gateway = make([]string, 0) + cluster[v.GatewayServiceId] = gateway + } + cluster[v.GatewayServiceId] = append(cluster[v.GatewayServiceId], v.SessionId) + } + data, _ := anypb.New(msg) + for k, v := range gateways { + for k1, v1 := range v { + if _, err = this.service.AcrossClusterRpcGo(context.Background(), k, fmt.Sprintf("%s/%s", comm.Service_Gateway, k1), string(comm.Rpc_GatewaySendBatchMsg), &pb.BatchMessageReq{ + UserSessionIds: v1, + MainType: mainType, + SubType: subType, + Data: data, + }, nil); err != nil { + log.Errorf("SendMsgToUsers:%s.%s->%s.%s err:%v", k1, k, mainType, subType, err) + } + } + } + return +} + // 只校验资源 参数 atn格式 func (this *ModuleBase) CheckRes(session comm.IUserSession, res []*cfg.Gameatn) (code pb.ErrorCode) {