diff --git a/modules/core.go b/modules/core.go index 2d0e32da0..3740cd5f0 100644 --- a/modules/core.go +++ b/modules/core.go @@ -1,6 +1,8 @@ package modules import ( + "go_dreamfactory/pb" + "github.com/liwei1dao/lego/core" "google.golang.org/protobuf/proto" ) @@ -8,6 +10,7 @@ import ( type ( IModule interface { core.IModule - SendMsgToAgent(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) + SendMsgToUser(ServiceMethod string, msg proto.Message, user *pb.Cache_UserData) (err error) + SendMsgToUsers(ServiceMethod string, msg proto.Message, user ...*pb.Cache_UserData) (err error) } ) diff --git a/modules/modulebase.go b/modules/modulebase.go index 231f9e1b5..4edccae72 100644 --- a/modules/modulebase.go +++ b/modules/modulebase.go @@ -23,28 +23,42 @@ func (this *ModuleBase) Init(service core.IService, module core.IModule, options return } -func (this *ModuleBase) SendMsgToAgent(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) { +func (this *ModuleBase) SendMsgToUser(ServiceMethod string, msg proto.Message, user *pb.Cache_UserData) (err error) { reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) - if err = this.service.RpcCallById(GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ - UserSessionId: SessionId, + if _, err = this.service.RpcGoById(user.GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ + UserSessionId: user.SessionId, ServiceMethod: ServiceMethod, Data: data, }, reply); err != nil { - log.Errorf("SendMsgToAgent%s:[%s] err:%v", SessionId, ServiceMethod, err) + log.Errorf("SendMsgToUser%d:%s [%s] err:%v", user.UserData.UserId, user.SessionId, ServiceMethod, err) } return } -func (this *ModuleBase) SendMsgToAgents(GatewayServiceId, SessionId, ServiceMethod string, msg proto.Message) (err error) { +func (this *ModuleBase) SendMsgToUsers(ServiceMethod string, msg proto.Message, user ...*pb.Cache_UserData) (err error) { + var ( + gateways map[string][]string = make(map[string][]string) + gateway []string + ok bool + ) + for _, v := range user { + if gateway, ok = gateways[v.GatewayServiceId]; !ok { + gateway = make([]string, 0) + gateways[v.GatewayServiceId] = gateway + } + gateway = append(gateway, v.SessionId) + } reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) - if err = this.service.RpcCallById(GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ - UserSessionId: SessionId, - ServiceMethod: ServiceMethod, - Data: data, - }, reply); err != nil { - log.Errorf("SendMsgToAgent%s:[%s] err:%v", SessionId, ServiceMethod, err) + for k, v := range gateways { + if _, err = this.service.RpcGoById(k, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.BatchMessageReq{ + UserSessionIds: v, + ServiceMethod: ServiceMethod, + Data: data, + }, reply); err != nil { + log.Errorf("SendMsgToUsers:%s->%s err:%v", k, ServiceMethod, err) + } } return } diff --git a/services/worker/main.go b/services/worker/main.go index 492dbf80f..ebf9316f1 100644 --- a/services/worker/main.go +++ b/services/worker/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "go_dreamfactory/modules/login" "go_dreamfactory/services" "go_dreamfactory/sys/cache"