diff --git a/comm/const.go b/comm/const.go index faf2c4b00..7685a0ab6 100644 --- a/comm/const.go +++ b/comm/const.go @@ -163,6 +163,8 @@ const ( //Rpc Rpc_GatewayNoticeUserClose core.Rpc_Key = "Rpc_NoticeUserClose" //通知用户离线 //GM 命令 Rpc_ModuleGMCreateCmd core.Rpc_Key = "Rpc_ModuleGMCreateCmd" //执行GM命令 + //Chat 命令 + Rpc_ModuleChatPushChat core.Rpc_Key = "Rpc_ModuleChatPushChat" //推送聊天消息 //Moonfantasy 月之秘境 Rpc_ModuleMoonfantasyTrigger core.Rpc_Key = "Rpc_ModuleMoonfantasyTrigger" //月之秘境触发消息 //rtask 上传随机任务代码 diff --git a/modules/chat/api_chanagechannel.go b/modules/chat/api_chanagechannel.go index c755eaca9..3b7f2620e 100644 --- a/modules/chat/api_chanagechannel.go +++ b/modules/chat/api_chanagechannel.go @@ -22,12 +22,12 @@ func (this *apiComp) ChanageChannel(session comm.IUserSession, req *pb.ChatChana ok bool ) - if err, ok = this.module.modelChat.ChanageChannel(session, req.ChannelId); err != nil { + if err, ok = this.module.modelChat.chanageChannel(session, req.ChannelId); err != nil { code = pb.ErrorCode_DBError return } if ok { - if err = this.module.modelChat.RemoveCrossChannelMember(session); err != nil { + if err = this.module.modelChat.removeCrossChannelMember(session); err != nil { code = pb.ErrorCode_DBError return } diff --git a/modules/chat/api_crosschannel.go b/modules/chat/api_crosschannel.go index 261921a15..747776afc 100644 --- a/modules/chat/api_crosschannel.go +++ b/modules/chat/api_crosschannel.go @@ -22,7 +22,7 @@ func (this *apiComp) CrossChannel(session comm.IUserSession, req *pb.ChatCrossCh if code = this.CrossChannelCheck(session, req); code != pb.ErrorCode_Success { return } - if channel, err = this.module.modelChat.AddCrossChannelMember(session); err != nil { + if channel, err = this.module.modelChat.addCrossChannelMember(session); err != nil { code = pb.ErrorCode_DBError return } diff --git a/modules/chat/api_getlist.go b/modules/chat/api_getlist.go index 2016442c5..ad625ec66 100644 --- a/modules/chat/api_getlist.go +++ b/modules/chat/api_getlist.go @@ -41,7 +41,7 @@ func (this *apiComp) GetList(session comm.IUserSession, req *pb.ChatGetListReq) } break case pb.ChatChannel_Private: - if list, err = this.module.modelChat.QueryUserMsg(session.GetUserId()); err != nil { + if list, err = this.module.modelChat.queryUserMsg(session.GetUserId()); err != nil { code = pb.ErrorCode_DBError return } @@ -72,6 +72,6 @@ func (this *apiComp) GetList(session comm.IUserSession, req *pb.ChatGetListReq) this.module.Errorf("getlist no support channel:%d ", req.Channel) return } - session.SendMsg(string(this.module.GetType()), "getcrosslist", &pb.ChatGetListResp{Chats: list}) + session.SendMsg(string(this.module.GetType()), "getlist", &pb.ChatGetListResp{Chats: list}) return } diff --git a/modules/chat/api_send.go b/modules/chat/api_send.go index 522d7a5b2..0e7753b72 100644 --- a/modules/chat/api_send.go +++ b/modules/chat/api_send.go @@ -2,7 +2,6 @@ package chat import ( "context" - "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" "time" @@ -23,7 +22,6 @@ func (this *apiComp) SendCheck(session comm.IUserSession, req *pb.ChatSendReq) ( func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code pb.ErrorCode, data proto.Message) { var ( err error - max int32 msg *pb.DBChat userexpand *pb.DBUserExpand max_chat int32 @@ -48,10 +46,7 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code AppendBool: req.AppendBool, AppendBytes: req.AppendBytes, } - if max, err = this.module.configure.GetChannelRecordMax(); err != nil { - code = pb.ErrorCode_ConfigNoFound - return - } + if max_chat, err = this.module.configure.GetChannelRecordMax(); err != nil { code = pb.ErrorCode_ConfigNoFound return @@ -74,12 +69,7 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code } code = pb.ErrorCode_Success } - - if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%s", worldchatkey, session.GetServiecTag()), int64(max), msg); err != nil { - code = pb.ErrorCode_DBError - return - } - if err = this.module.PushWorld(msg); err != nil { + if err = this.module.modelChat.sendChatToWorld(msg, max_chat); err != nil { code = pb.ErrorCode_DBError return } @@ -96,18 +86,14 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code break case pb.ChatChannel_Union: msg.UnionId = req.TargetId - if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s:%s", unionchatkey, req.TargetId), int64(max_chat), msg); err != nil { - code = pb.ErrorCode_DBError - return - } - if err = this.module.PushUnion(req.TargetId, msg); err != nil { + if err = this.module.modelChat.sendChatToUnion(msg, max_chat); err != nil { code = pb.ErrorCode_DBError return } break case pb.ChatChannel_Private: msg.Ruid = req.TargetId - if err = this.module.PushUser(msg); err != nil { + if err = this.module.modelChat.sendChatToPrivate(msg); err != nil { code = pb.ErrorCode_DBError return } @@ -119,17 +105,17 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code return } msg.ChannelId = userexpand.Chatchannel //指定频道 - if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%d", crosschatkey, userexpand.Chatchannel), int64(max_chat), msg); err != nil { + + if err = this.module.modelChat.sendChatToCrossServer(msg, max_chat); err != nil { code = pb.ErrorCode_DBError return } - this.module.PushToUsers(userexpand.Chatchannel, msg) break default: code = pb.ErrorCode_ReqParameterError this.module.Errorf("getlist no support channel:%d ", req.Channel) return } - session.SendMsg(string(this.module.GetType()), "sendcross", &pb.ChatSendResp{Issucc: true}) + session.SendMsg(string(this.module.GetType()), "send", &pb.ChatSendResp{Issucc: true}) return } diff --git a/modules/chat/modelChat.go b/modules/chat/modelChat.go index 2a87cedd5..b28faef8a 100644 --- a/modules/chat/modelChat.go +++ b/modules/chat/modelChat.go @@ -13,7 +13,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/x/bsonx" - "google.golang.org/protobuf/types/known/anypb" ) var worldchatkey = "chat:world" @@ -40,7 +39,7 @@ func (this *modelChatComp) Init(service core.IService, module core.IModule, comp } //查询用户未读消息 -func (this *modelChatComp) QueryUserMsg(uid string) (result []*pb.DBChat, err error) { +func (this *modelChatComp) queryUserMsg(uid string) (result []*pb.DBChat, err error) { var ( c *mongo.Cursor ) @@ -123,7 +122,7 @@ func (this *modelChatComp) getChatQueue(channel pb.ChatChannel, stag, union stri } //添加跨服频道成员 -func (this *modelChatComp) AddCrossChannelMember(session comm.IUserSession) (channel int32, err error) { +func (this *modelChatComp) addCrossChannelMember(session comm.IUserSession) (channel int32, err error) { udata := &pb.CacheUser{ Uid: session.GetUserId(), SessionId: session.GetSessionId(), @@ -158,7 +157,8 @@ func (this *modelChatComp) AddCrossChannelMember(session comm.IUserSession) (cha return } -func (this *modelChatComp) ChanageChannel(session comm.IUserSession, channel int32) (err error, ok bool) { +//切换跨服频道 +func (this *modelChatComp) chanageChannel(session comm.IUserSession, channel int32) (err error, ok bool) { udata := &pb.CacheUser{ Uid: session.GetUserId(), SessionId: session.GetSessionId(), @@ -189,7 +189,7 @@ func (this *modelChatComp) ChanageChannel(session comm.IUserSession, channel int } //读取跨服聊天频道下成员 -func (this *modelChatComp) GetCrossChannelMember(channel int32) (result []*pb.CacheUser, err error) { +func (this *modelChatComp) getCrossChannelMember(channel int32) (result []*pb.CacheUser, err error) { key := fmt.Sprintf("%s-%d-member", crosschatkey, channel) temp := make(map[string]*pb.CacheUser, 0) if err = this.Redis.HGetAll(key, &temp); err != nil { @@ -206,7 +206,7 @@ func (this *modelChatComp) GetCrossChannelMember(channel int32) (result []*pb.Ca } //移除频道成员 -func (this *modelChatComp) RemoveCrossChannelMember(session comm.IUserSession) (err error) { +func (this *modelChatComp) removeCrossChannelMember(session comm.IUserSession) (err error) { var ( result *pb.DBUserExpand ) @@ -223,7 +223,8 @@ func (this *modelChatComp) RemoveCrossChannelMember(session comm.IUserSession) ( return } -func (this *modelChatComp) SaveUserMsg(msg *pb.DBChat) (err error) { +//保存用户聊天消息 +func (this *modelChatComp) saveUserMsg(msg *pb.DBChat) (err error) { if _, err = this.DB.InsertOne(core.SqlTable(this.TableName), msg); err != nil { this.module.Errorf("err:%v", err) return @@ -231,48 +232,48 @@ func (this *modelChatComp) SaveUserMsg(msg *pb.DBChat) (err error) { return } -//发送世界频道聊天 -func (this *modelChatComp) sendWorldChat(msg *pb.DBChat) (code pb.ErrorCode) { - var ( - err error - max int32 - ) - if max, err = this.module.configure.GetChannelRecordMax(); err != nil { - code = pb.ErrorCode_ConfigNoFound - return - } - if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%s", worldchatkey, msg.Stag), int64(max), msg); err != nil { - code = pb.ErrorCode_DBError - return - } - data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) - if err = this.module.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ - MainType: string(this.module.GetType()), - SubType: "message", - Data: data, - }, nil); err != nil { - this.module.Errorf("err:%v", err) - code = pb.ErrorCode_SystemError - } - return -} +// //发送世界频道聊天 +// func (this *modelChatComp) sendWorldChat(msg *pb.DBChat) (code pb.ErrorCode) { +// var ( +// err error +// max int32 +// ) +// if max, err = this.module.configure.GetChannelRecordMax(); err != nil { +// code = pb.ErrorCode_ConfigNoFound +// return +// } +// if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%s", worldchatkey, msg.Stag), int64(max), msg); err != nil { +// code = pb.ErrorCode_DBError +// return +// } +// data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) +// if err = this.module.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ +// MainType: string(this.module.GetType()), +// SubType: "message", +// Data: data, +// }, nil); err != nil { +// this.module.Errorf("err:%v", err) +// code = pb.ErrorCode_SystemError +// } +// return +// } -//发送系统频道聊天 -func (this *modelChatComp) sendSystemChat(msg *pb.DBChat) (code pb.ErrorCode) { - var ( - err error - ) - data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) - if err = this.module.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ - MainType: string(this.module.GetType()), - SubType: "message", - Data: data, - }, nil); err != nil { - this.module.Errorf("err:%v", err) - code = pb.ErrorCode_SystemError - } - return -} +// //发送系统频道聊天 +// func (this *modelChatComp) sendSystemChat(msg *pb.DBChat) (code pb.ErrorCode) { +// var ( +// err error +// ) +// data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) +// if err = this.module.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ +// MainType: string(this.module.GetType()), +// SubType: "message", +// Data: data, +// }, nil); err != nil { +// this.module.Errorf("err:%v", err) +// code = pb.ErrorCode_SystemError +// } +// return +// } func (this *modelChatComp) addChatMsg(key string, count int64, msgs ...*pb.DBChat) (err error) { var ( @@ -308,3 +309,59 @@ func (this *modelChatComp) addChatMsg(key string, count int64, msgs ...*pb.DBCha } return } + +//发送聊天消息到世界频道 +func (this *modelChatComp) sendChatToWorld(chat *pb.DBChat, queuecount int32) (err error) { + if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%s", worldchatkey, chat.Stag), int64(queuecount), chat); err != nil { + this.module.Errorln(err) + return + } + if err = this.module.pushChatToWorld(chat); err != nil { + this.module.Errorln(err) + return + } + return +} + +//发送聊天消息到工会频道 +func (this *modelChatComp) sendChatToUnion(chat *pb.DBChat, queuecount int32) (err error) { + if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%s", unionchatkey, chat.UnionId), int64(queuecount), chat); err != nil { + this.module.Errorln(err) + return + } + if err = this.module.pushChatToUnion(chat); err != nil { + this.module.Errorln(err) + return + } + return +} + +//发送聊天消息到工会频道 +func (this *modelChatComp) sendChatToCrossServer(chat *pb.DBChat, queuecount int32) (err error) { + if err = this.module.modelChat.addChatMsg(fmt.Sprintf("%s-%d", crosschatkey, chat.ChannelId), int64(queuecount), chat); err != nil { + this.module.Errorln(err) + return + } + if err = this.module.pushChatToCross(chat); err != nil { + return + } + return +} + +//发送聊天消息到私聊频道 +func (this *modelChatComp) sendChatToSystem(chat *pb.DBChat) (err error) { + if err = this.module.pushChatToSystem(chat); err != nil { + this.module.Errorln(err) + return + } + return +} + +//发送聊天消息到私聊频道 +func (this *modelChatComp) sendChatToPrivate(chat *pb.DBChat) (err error) { + if err = this.module.pushChatToPrivate(chat); err != nil { + this.module.Errorln(err) + return + } + return +} diff --git a/modules/chat/module.go b/modules/chat/module.go index e7cde76eb..2d45e609f 100644 --- a/modules/chat/module.go +++ b/modules/chat/module.go @@ -62,6 +62,7 @@ func (this *Chat) Start() (err error) { } this.gm = module.(comm.IGm) event.RegisterGO(comm.EventUserOffline, this.EventUserOffline) + this.service.RegisterFunctionName(string(comm.Rpc_ModuleChatPushChat), this.Rpc_ModuleChatPushChat) return } @@ -75,15 +76,85 @@ func (this *Chat) OnInstallComp() { //Event------------------------------------------------------------------------------------------------------------ func (this *Chat) EventUserOffline(session comm.IUserSession) { - if err := this.modelChat.RemoveCrossChannelMember(session); err != nil { + if err := this.modelChat.removeCrossChannelMember(session); err != nil { this.Debug("EventUserOffline:", log.Field{"uid", session.GetUserId()}, log.Field{"err", err}) } } +//RPC-------------------------------------------------------------------------------------------------------------- +//推送聊天消息 +func (this *Chat) Rpc_ModuleChatPushChat(ctx context.Context, args *pb.DBChat, reply *pb.EmptyResp) (err error) { + var ( + max_chat int32 + ) + if max_chat, err = this.configure.GetChannelRecordMax(); err != nil { + this.Errorln(err) + return + } + switch args.Channel { + case pb.ChatChannel_World: + if err = this.modelChat.sendChatToWorld(args, max_chat); err != nil { + this.Errorln(err) + return + } + break + case pb.ChatChannel_Union: + if err = this.modelChat.sendChatToUnion(args, max_chat); err != nil { + this.Errorln(err) + return + } + break + case pb.ChatChannel_Private: + if err = this.modelChat.sendChatToPrivate(args); err != nil { + this.Errorln(err) + return + } + break + case pb.ChatChannel_CrossServer: + if err = this.modelChat.sendChatToCrossServer(args, max_chat); err != nil { + this.Errorln(err) + return + } + break + case pb.ChatChannel_System: + if err = this.modelChat.sendChatToSystem(args); err != nil { + this.Errorln(err) + return + } + break + } + return +} + //对外接口---------------------------------------------------------------------------------------------------------- //向世界频道发送聊天消息 func (this *Chat) SendWorldChat(msg *pb.DBChat) (code pb.ErrorCode) { - code = this.modelChat.sendWorldChat(msg) + var ( + max_chat int32 + err error + ) + if max_chat, err = this.configure.GetChannelRecordMax(); err != nil { + code = pb.ErrorCode_ConfigNoFound + this.Errorln(err) + return + } + if this.IsCross() { + if err = this.modelChat.sendChatToWorld(msg, max_chat); err != nil { + this.Errorln(err) + return + } + } else { + if _, err = this.service.AcrossClusterRpcGo( + context.Background(), + msg.Stag, + comm.Service_Worker, + string(comm.Rpc_ModuleChatPushChat), + msg, + nil); err != nil { + this.Errorln(err) + code = pb.ErrorCode_RpcFuncExecutionError + } + } return } @@ -100,7 +171,7 @@ func (this *Chat) SendUserChat(msg *pb.DBChat) (code pb.ErrorCode) { } return } else { - if err = this.modelChat.SaveUserMsg(msg); err != nil { + if err = this.modelChat.saveUserMsg(msg); err != nil { code = pb.ErrorCode_DBError return } @@ -140,7 +211,24 @@ func (this *Chat) SendSysChatToWorld(ctype comm.ChatSystemType, appenddata inter } } } - code = this.modelChat.sendSystemChat(msg) + + if this.IsCross() { + if err = this.modelChat.sendChatToSystem(msg); err != nil { + this.Errorln(err) + return + } + } else { + if _, err = this.service.AcrossClusterRpcGo( + context.Background(), + msg.Stag, + comm.Service_Worker, + string(comm.Rpc_ModuleChatPushChat), + msg, + nil); err != nil { + this.Errorln(err) + code = pb.ErrorCode_RpcFuncExecutionError + } + } } return } @@ -160,7 +248,7 @@ func (this *Chat) SendSysChatToUser(session comm.IUserSession, ctype comm.ChatSy //Push-------------------------------------------------------------------------------------------------------------- //推送消息到世界 -func (this *Chat) PushWorld(msg *pb.DBChat) (err error) { +func (this *Chat) pushChatToWorld(msg *pb.DBChat) (err error) { data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) if err = this.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ MainType: string(this.GetType()), @@ -173,12 +261,12 @@ func (this *Chat) PushWorld(msg *pb.DBChat) (err error) { } //推送消息到工会 -func (this *Chat) PushUnion(unionId string, msg *pb.DBChat) (err error) { +func (this *Chat) pushChatToUnion(msg *pb.DBChat) (err error) { return } -//推送消息到用户 -func (this *Chat) PushUser(msg *pb.DBChat) (err error) { +//推送私聊消息 +func (this *Chat) pushChatToPrivate(msg *pb.DBChat) (err error) { if session, ok := this.GetUserSession(msg.Ruid); ok { session.SendMsg(string(this.GetType()), "message", &pb.ChatMessagePush{Chat: msg}) if err = session.Push(); err != nil { @@ -186,17 +274,17 @@ func (this *Chat) PushUser(msg *pb.DBChat) (err error) { } return } else { - err = this.modelChat.SaveUserMsg(msg) + err = this.modelChat.saveUserMsg(msg) } return } -//推送消息到指定用户群 -func (this *Chat) PushToUsers(channel int32, msg *pb.DBChat) (err error) { +//推送跨服频道消息 +func (this *Chat) pushChatToCross(msg *pb.DBChat) (err error) { var ( users []*pb.CacheUser ) - if users, err = this.modelChat.GetCrossChannelMember(channel); err == nil { + if users, err = this.modelChat.getCrossChannelMember(msg.ChannelId); err == nil { if err = this.SendMsgToCUsers(string(this.GetType()), "message", &pb.ChatMessagePush{Chat: msg}, users...); err != nil { this.Errorf("err:%v", err) return @@ -205,8 +293,8 @@ func (this *Chat) PushToUsers(channel int32, msg *pb.DBChat) (err error) { return } -//全集群推送 -func (this *Chat) PushAllWorld(msg *pb.DBChat) (err error) { +//推送系统消息 +func (this *Chat) pushChatToSystem(msg *pb.DBChat) (err error) { data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) if err = this.service.ClusterBroadcast(context.Background(), comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ MainType: string(this.GetType()),