From 1b0b37dc2e6962bbfc3199d04e495778ed63cce1 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Wed, 1 Jun 2022 15:57:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=9F=BA=E7=A1=80=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E6=8E=A5=E5=8F=A3=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comm/core.go | 13 ++-- comm/message.go | 1 - modules/core.go | 13 ++++ modules/gateway/agentmgr_comp.go | 14 ++++ modules/gateway/module.go | 2 + modules/modulebase.go | 41 ++++++++++ pb/comm.pb.go | 130 +++++++++++++++++++++++++------ pb/proto/comm.proto | 12 ++- 8 files changed, 195 insertions(+), 31 deletions(-) delete mode 100644 comm/message.go create mode 100644 modules/core.go diff --git a/comm/core.go b/comm/core.go index 4de97fdd4..53fac7ad7 100644 --- a/comm/core.go +++ b/comm/core.go @@ -17,12 +17,13 @@ const ( ) const ( //Rpc - Rpc_GatewayRoute core.Rpc_Key = "Rpc_GatewayRoute" //网关路由 - Rpc_GatewayAgentBuild core.Rpc_Key = "Rpc_GatewayAgentBuild" //代理绑定 绑定用户Id - Rpc_GatewayAgentUnBuild core.Rpc_Key = "Rpc_GatewayAgentUnBuild" //代理解绑 解绑用户Id - Rpc_GatewayAgentSendMsg core.Rpc_Key = "Rpc_GatewayAgentSendMsg" //代理发送消息 向用户发送消息 - Rpc_GatewayAgentRadioMsg core.Rpc_Key = "Rpc_GatewayAgentRadioMsg" //代理广播消息 向所有在线用户发送消息 - Rpc_GatewayAgentClose core.Rpc_Key = "Rpc_GatewayAgentClose" //代理关闭 关闭用户连接 + Rpc_GatewayRoute core.Rpc_Key = "Rpc_GatewayRoute" //网关路由 + Rpc_GatewayAgentBuild core.Rpc_Key = "Rpc_GatewayAgentBuild" //代理绑定 绑定用户Id + Rpc_GatewayAgentUnBuild core.Rpc_Key = "Rpc_GatewayAgentUnBuild" //代理解绑 解绑用户Id + Rpc_GatewayAgentSendMsg core.Rpc_Key = "Rpc_GatewayAgentSendMsg" //代理发送消息 向用户发送消息 + Rpc_GatewaySendBatchMsg core.Rpc_Key = "Rpc_GatewaySendBatchMsg" //向多个用户发送消息 + Rpc_GatewaySendRadioMsg core.Rpc_Key = "Rpc_GatewaySendRadioMsg" //广播消息 + Rpc_GatewayAgentClose core.Rpc_Key = "Rpc_GatewayAgentClose" //代理关闭 关闭用户连接 ) type ISC_GateRouteComp interface { diff --git a/comm/message.go b/comm/message.go deleted file mode 100644 index 9d28c497b..000000000 --- a/comm/message.go +++ /dev/null @@ -1 +0,0 @@ -package comm diff --git a/modules/core.go b/modules/core.go new file mode 100644 index 000000000..2d0e32da0 --- /dev/null +++ b/modules/core.go @@ -0,0 +1,13 @@ +package modules + +import ( + "github.com/liwei1dao/lego/core" + "google.golang.org/protobuf/proto" +) + +type ( + IModule interface { + core.IModule + SendMsgToAgent(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) + } +) diff --git a/modules/gateway/agentmgr_comp.go b/modules/gateway/agentmgr_comp.go index 6d11b2569..2cf811d0e 100644 --- a/modules/gateway/agentmgr_comp.go +++ b/modules/gateway/agentmgr_comp.go @@ -63,6 +63,20 @@ func (this *AgentMgr_Comp) SendMsgToAgent(ctx context.Context, args *pb.AgentSen return nil } +//向多个户发送消息 +func (this *AgentMgr_Comp) SendMsgToAgents(ctx context.Context, args *pb.BatchMessageReq, reply *pb.RPCMessageReply) error { + msg := &pb.UserMessage{ + ServiceMethod: args.ServiceMethod, + Data: args.Data, + } + for _, v := range args.UserSessionIds { + if a, ok := this.agents.Load(v); ok { + a.(IAgent).WriteMsg(msg) + } + } + return nil +} + //向所有户发送消息 func (this *AgentMgr_Comp) SendMsgToAllAgent(ctx context.Context, args *pb.BroadCastMessageReq, reply *pb.RPCMessageReply) error { msg := &pb.UserMessage{ diff --git a/modules/gateway/module.go b/modules/gateway/module.go index 5fe6e74ba..718758e6c 100644 --- a/modules/gateway/module.go +++ b/modules/gateway/module.go @@ -43,6 +43,8 @@ func (this *Gateway) Start() (err error) { this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentBuild), this.agentmgr_comp.Build) this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentUnBuild), this.agentmgr_comp.UnBuild) this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentSendMsg), this.agentmgr_comp.SendMsgToAgent) + this.service.RegisterFunctionName(string(comm.Rpc_GatewaySendBatchMsg), this.agentmgr_comp.SendMsgToAgents) + this.service.RegisterFunctionName(string(comm.Rpc_GatewaySendRadioMsg), this.agentmgr_comp.SendMsgToAllAgent) this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentClose), this.agentmgr_comp.CloseAgent) err = this.ModuleBase.Start() return diff --git a/modules/modulebase.go b/modules/modulebase.go index c8fb7cbab..231f9e1b5 100644 --- a/modules/modulebase.go +++ b/modules/modulebase.go @@ -1,9 +1,50 @@ package modules import ( + "context" + "go_dreamfactory/comm" + "go_dreamfactory/pb" + + "github.com/liwei1dao/lego/base" + "github.com/liwei1dao/lego/core" "github.com/liwei1dao/lego/core/cbase" + "github.com/liwei1dao/lego/sys/log" + "google.golang.org/protobuf/proto" ) type ModuleBase struct { cbase.ModuleBase + service base.IRPCXService +} + +func (this *ModuleBase) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { + err = this.ModuleBase.Init(service, module, options) + this.service = service.(base.IRPCXService) + return +} + +func (this *ModuleBase) SendMsgToAgent(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) { + reply := &pb.RPCMessageReply{} + data, _ := proto.Marshal(msg) + if err = this.service.RpcCallById(GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ + UserSessionId: SessionId, + ServiceMethod: ServiceMethod, + Data: data, + }, reply); err != nil { + log.Errorf("SendMsgToAgent%s:[%s] err:%v", SessionId, ServiceMethod, err) + } + return +} + +func (this *ModuleBase) SendMsgToAgents(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) { + reply := &pb.RPCMessageReply{} + data, _ := proto.Marshal(msg) + if err = this.service.RpcCallById(GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ + UserSessionId: SessionId, + ServiceMethod: ServiceMethod, + Data: data, + }, reply); err != nil { + log.Errorf("SendMsgToAgent%s:[%s] err:%v", SessionId, ServiceMethod, err) + } + return } diff --git a/pb/comm.pb.go b/pb/comm.pb.go index 7b1d02fad..74f718a8a 100644 --- a/pb/comm.pb.go +++ b/pb/comm.pb.go @@ -388,7 +388,71 @@ func (x *AgentSendMessageReq) GetData() []byte { return nil } -//广播消息到所有用户代理 +//发送批量消息 +type BatchMessageReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UserSessionIds []string `protobuf:"bytes,1,rep,name=UserSessionIds,proto3" json:"UserSessionIds,omitempty"` + ServiceMethod string `protobuf:"bytes,2,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=Data,proto3" json:"Data,omitempty"` +} + +func (x *BatchMessageReq) Reset() { + *x = BatchMessageReq{} + if protoimpl.UnsafeEnabled { + mi := &file_comm_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMessageReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMessageReq) ProtoMessage() {} + +func (x *BatchMessageReq) ProtoReflect() protoreflect.Message { + mi := &file_comm_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMessageReq.ProtoReflect.Descriptor instead. +func (*BatchMessageReq) Descriptor() ([]byte, []int) { + return file_comm_proto_rawDescGZIP(), []int{6} +} + +func (x *BatchMessageReq) GetUserSessionIds() []string { + if x != nil { + return x.UserSessionIds + } + return nil +} + +func (x *BatchMessageReq) GetServiceMethod() string { + if x != nil { + return x.ServiceMethod + } + return "" +} + +func (x *BatchMessageReq) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +//发送广播消息 type BroadCastMessageReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -401,7 +465,7 @@ type BroadCastMessageReq struct { func (x *BroadCastMessageReq) Reset() { *x = BroadCastMessageReq{} if protoimpl.UnsafeEnabled { - mi := &file_comm_proto_msgTypes[6] + mi := &file_comm_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -414,7 +478,7 @@ func (x *BroadCastMessageReq) String() string { func (*BroadCastMessageReq) ProtoMessage() {} func (x *BroadCastMessageReq) ProtoReflect() protoreflect.Message { - mi := &file_comm_proto_msgTypes[6] + mi := &file_comm_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -427,7 +491,7 @@ func (x *BroadCastMessageReq) ProtoReflect() protoreflect.Message { // Deprecated: Use BroadCastMessageReq.ProtoReflect.Descriptor instead. func (*BroadCastMessageReq) Descriptor() ([]byte, []int) { - return file_comm_proto_rawDescGZIP(), []int{6} + return file_comm_proto_rawDescGZIP(), []int{7} } func (x *BroadCastMessageReq) GetServiceMethod() string { @@ -456,7 +520,7 @@ type AgentCloseeReq struct { func (x *AgentCloseeReq) Reset() { *x = AgentCloseeReq{} if protoimpl.UnsafeEnabled { - mi := &file_comm_proto_msgTypes[7] + mi := &file_comm_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -469,7 +533,7 @@ func (x *AgentCloseeReq) String() string { func (*AgentCloseeReq) ProtoMessage() {} func (x *AgentCloseeReq) ProtoReflect() protoreflect.Message { - mi := &file_comm_proto_msgTypes[7] + mi := &file_comm_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -482,7 +546,7 @@ func (x *AgentCloseeReq) ProtoReflect() protoreflect.Message { // Deprecated: Use AgentCloseeReq.ProtoReflect.Descriptor instead. func (*AgentCloseeReq) Descriptor() ([]byte, []int) { - return file_comm_proto_rawDescGZIP(), []int{7} + return file_comm_proto_rawDescGZIP(), []int{8} } func (x *AgentCloseeReq) GetUserSessionId() string { @@ -531,17 +595,24 @@ var file_comm_proto_rawDesc = []byte{ 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x4f, 0x0a, 0x13, 0x42, 0x72, - 0x6f, 0x61, 0x64, 0x43, 0x61, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, - 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x36, 0x0a, 0x0e, 0x41, - 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x65, 0x52, 0x65, 0x71, 0x12, 0x24, 0x0a, - 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x49, 0x64, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x73, 0x0a, 0x0f, 0x42, 0x61, + 0x74, 0x63, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x12, 0x26, 0x0a, + 0x0e, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x49, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x44, + 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, + 0x4f, 0x0a, 0x13, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x43, 0x61, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x12, 0x0a, 0x04, + 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, + 0x22, 0x36, 0x0a, 0x0e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x65, 0x52, + 0x65, 0x71, 0x12, 0x24, 0x0a, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -556,7 +627,7 @@ func file_comm_proto_rawDescGZIP() []byte { return file_comm_proto_rawDescData } -var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_comm_proto_goTypes = []interface{}{ (*UserMessage)(nil), // 0: UserMessage (*AgentMessage)(nil), // 1: AgentMessage @@ -564,8 +635,9 @@ var file_comm_proto_goTypes = []interface{}{ (*AgentBuildReq)(nil), // 3: AgentBuildReq (*AgentUnBuildReq)(nil), // 4: AgentUnBuildReq (*AgentSendMessageReq)(nil), // 5: AgentSendMessageReq - (*BroadCastMessageReq)(nil), // 6: BroadCastMessageReq - (*AgentCloseeReq)(nil), // 7: AgentCloseeReq + (*BatchMessageReq)(nil), // 6: BatchMessageReq + (*BroadCastMessageReq)(nil), // 7: BroadCastMessageReq + (*AgentCloseeReq)(nil), // 8: AgentCloseeReq } var file_comm_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -654,7 +726,7 @@ func file_comm_proto_init() { } } file_comm_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*BroadCastMessageReq); i { + switch v := v.(*BatchMessageReq); i { case 0: return &v.state case 1: @@ -666,6 +738,18 @@ func file_comm_proto_init() { } } file_comm_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BroadCastMessageReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_comm_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*AgentCloseeReq); i { case 0: return &v.state @@ -684,7 +768,7 @@ func file_comm_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_comm_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/pb/proto/comm.proto b/pb/proto/comm.proto index ed1b80c62..5c9efaa36 100644 --- a/pb/proto/comm.proto +++ b/pb/proto/comm.proto @@ -32,17 +32,27 @@ message AgentBuildReq { message AgentUnBuildReq { string UserSessionId = 1; } + //向用户代理发送消息请求 message AgentSendMessageReq { string UserSessionId = 1; string ServiceMethod = 2; //服务名 bytes Data = 3; } -//广播消息到所有用户代理 + +//发送批量消息 +message BatchMessageReq { + repeated string UserSessionIds = 1; + string ServiceMethod = 2; + bytes Data = 3; +} + +//发送广播消息 message BroadCastMessageReq { string ServiceMethod = 1; //服务名 bytes Data = 2; } + //关闭用户代理 message AgentCloseeReq { string UserSessionId = 1;