From 7550002f7a89dc63d2f6a771276e2811255bf312 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Wed, 1 Jun 2022 16:38:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A9=E5=B1=95=E5=9F=BA=E7=A1=80=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E5=86=85=E7=BD=AE=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/core.go | 5 ++++- modules/modulebase.go | 36 +++++++++++++++++++++++++----------- services/worker/main.go | 1 + 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/modules/core.go b/modules/core.go index 2d0e32da0..3740cd5f0 100644 --- a/modules/core.go +++ b/modules/core.go @@ -1,6 +1,8 @@ package modules import ( + "go_dreamfactory/pb" + "github.com/liwei1dao/lego/core" "google.golang.org/protobuf/proto" ) @@ -8,6 +10,7 @@ import ( type ( IModule interface { core.IModule - SendMsgToAgent(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) + SendMsgToUser(ServiceMethod string, msg proto.Message, user *pb.Cache_UserData) (err error) + SendMsgToUsers(ServiceMethod string, msg proto.Message, user ...*pb.Cache_UserData) (err error) } ) diff --git a/modules/modulebase.go b/modules/modulebase.go index 231f9e1b5..4edccae72 100644 --- a/modules/modulebase.go +++ b/modules/modulebase.go @@ -23,28 +23,42 @@ func (this *ModuleBase) Init(service core.IService, module core.IModule, options return } -func (this *ModuleBase) SendMsgToAgent(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) { +func (this *ModuleBase) SendMsgToUser(ServiceMethod string, msg proto.Message, user *pb.Cache_UserData) (err error) { reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) - if err = this.service.RpcCallById(GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ - UserSessionId: SessionId, + if _, err = this.service.RpcGoById(user.GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ + UserSessionId: user.SessionId, ServiceMethod: ServiceMethod, Data: data, }, reply); err != nil { - log.Errorf("SendMsgToAgent%s:[%s] err:%v", SessionId, ServiceMethod, err) + log.Errorf("SendMsgToUser%d:%s [%s] err:%v", user.UserData.UserId, user.SessionId, ServiceMethod, err) } return } -func (this *ModuleBase) SendMsgToAgents(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) { +func (this *ModuleBase) SendMsgToUsers(ServiceMethod string, msg proto.Message, user ...*pb.Cache_UserData) (err error) { + var ( + gateways map[string][]string = make(map[string][]string) + gateway []string + ok bool + ) + for _, v := range user { + if gateway, ok = gateways[v.GatewayServiceId]; !ok { + gateway = make([]string, 0) + gateways[v.GatewayServiceId] = gateway + } + gateway = append(gateway, v.SessionId) + } reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) - if err = this.service.RpcCallById(GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ - UserSessionId: SessionId, - ServiceMethod: ServiceMethod, - Data: data, - }, reply); err != nil { - log.Errorf("SendMsgToAgent%s:[%s] err:%v", SessionId, ServiceMethod, err) + for k, v := range gateways { + if _, err = this.service.RpcGoById(k, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.BatchMessageReq{ + UserSessionIds: v, + ServiceMethod: ServiceMethod, + Data: data, + }, reply); err != nil { + log.Errorf("SendMsgToUsers:%s->%s err:%v", k, ServiceMethod, err) + } } return } diff --git a/services/worker/main.go b/services/worker/main.go index 492dbf80f..ebf9316f1 100644 --- a/services/worker/main.go +++ b/services/worker/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "go_dreamfactory/modules/login" "go_dreamfactory/services" "go_dreamfactory/sys/cache"