diff --git a/comm/core.go b/comm/core.go index 56dfb2462..3c61ee71e 100644 --- a/comm/core.go +++ b/comm/core.go @@ -48,10 +48,11 @@ type ErrorCode struct { //用户会话 type IUserSession interface { - SetSession(ip, sessionId, gatewayServiceId, uid string) + SetSession(ip, sessionId, stag, sid, uid string) GetSessionId() string GetUserId() string GetIP() string + GetServiecTag() string GetGatewayServiceId() string IsLogin() bool Bind(uid string, wokerId string) (err error) diff --git a/comm/imodule.go b/comm/imodule.go index fd437d1e9..12367f55f 100644 --- a/comm/imodule.go +++ b/comm/imodule.go @@ -37,7 +37,6 @@ type ( QueryHeroAmount(uId string, heroCfgId int32) (amount uint32) //创建新英雄 CreateHero(uid string, bPush bool, heroCfgId ...int32) error - // 获取英雄 // heroId 英雄ID GetHero(uid, heroId string) (*pb.DBHero, pb.ErrorCode) diff --git a/comm/usersession.go b/comm/usersession.go index d325bb00c..1bf031692 100644 --- a/comm/usersession.go +++ b/comm/usersession.go @@ -16,17 +16,6 @@ import ( 用户会话对象 跨服操作用户的对象 */ -func NewUserSession(service base.IRPCXService, ip, sessionId, gatewayServiceId string, uid string) IUserSession { - return &UserSession{ - IP: ip, - SessionId: sessionId, - GatewayServiceId: gatewayServiceId, - UserId: uid, - msgqueue: make([]*pb.UserMessage, 0), - service: service, - } -} - func NewUserSessionByPools(service base.IRPCXService) IUserSession { return &UserSession{ msgqueue: make([]*pb.UserMessage, 0), @@ -37,6 +26,7 @@ func NewUserSessionByPools(service base.IRPCXService) IUserSession { type UserSession struct { IP string SessionId string + ServiceTag string GatewayServiceId string //用户所在网关服务 UserId string service base.IRPCXService @@ -44,10 +34,11 @@ type UserSession struct { } //重置 -func (this *UserSession) SetSession(ip, sessionId, gatewayServiceId, uid string) { +func (this *UserSession) SetSession(ip, sessionId, stag, sid, uid string) { this.IP = ip this.SessionId = sessionId - this.GatewayServiceId = gatewayServiceId + this.ServiceTag = stag + this.GatewayServiceId = sid this.UserId = uid this.msgqueue = this.msgqueue[:0] } @@ -76,6 +67,11 @@ func (this *UserSession) GetIP() string { return this.IP } +//会话所在集群 +func (this *UserSession) GetServiecTag() string { + return this.ServiceTag +} + //用户当先所在网关服务 func (this *UserSession) GetGatewayServiceId() string { return this.GatewayServiceId diff --git a/lego/base/core.go b/lego/base/core.go index 888f2db96..08f906a53 100644 --- a/lego/base/core.go +++ b/lego/base/core.go @@ -81,4 +81,5 @@ type IRPCXService interface { RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) AcrossClusterRpcCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) AcrossClusterRpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) + ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) } diff --git a/lego/base/rpcx/service.go b/lego/base/rpcx/service.go index 7f0c5926c..34c7ab015 100644 --- a/lego/base/rpcx/service.go +++ b/lego/base/rpcx/service.go @@ -185,3 +185,9 @@ func (this *RPCXService) AcrossClusterRpcCall(ctx context.Context, clusterTag st func (this *RPCXService) AcrossClusterRpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) { return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil) } + +///全集群广播 执行目标远程服务方法 +///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法 +func (this *RPCXService) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return rpcx.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) +} diff --git a/lego/sys/redis/sys_test.go b/lego/sys/redis/sys_test.go index 6fd3b299b..16aa59ec2 100644 --- a/lego/sys/redis/sys_test.go +++ b/lego/sys/redis/sys_test.go @@ -188,3 +188,66 @@ func Test_Redis_Lua_HSETALL(t *testing.T) { fmt.Printf("data: %v", result) } } + +func Test_Redis_Lua_Queue(t *testing.T) { + //Redis 自定义脚本 批量读取队列数据 + script := redis.NewScript(` + local count = tonumber(ARGV[1]) + local k = tostring(ARGV[3]) + local keys = {} + local out = {} + local n = 1 + for i, v in ipairs(KEYS) do + if (i == 1) then + for i=n,#ARGV,1 do + n = n+1 + if ARGV[i] == "#end" then + break + end + end + elseif (i == 2) then + for i=n,#ARGV,1 do + n = n+1 + if ARGV[i] == "#end" then + break + end + end + else + local key = v + local argv = {} + table.insert(keys, key) + for i=n,#ARGV,1 do + n = n+1 + if ARGV[i] == "#end" then + redis.call("HMSet", key,unpack(argv)) + break + else + table.insert(argv, ARGV[i]) + end + end + end + end + redis.call("RPush", k,unpack(keys)) + local c = tonumber(redis.call("LLEN", k)) + if (c > count) then + local off = c-count + out = redis.call("LRANGE", k,0,off-1) + redis.call("LTRIM", k,off,-1) + for i, v in ipairs(out) do + redis.call("DEL", v) + end + end + return out + `) + sha, err := script.Result() + if err != nil { + fmt.Println(err) + } + ret := redis.EvalSha(redis.Context(), sha, []string{"count", "key", "Test_Redis_Lua_Queue_1", "Test_Redis_Lua_Queue_2", "Test_Redis_Lua_Queue_3"}, "10", "#end", "Test_Redis_Lua_Queue", "#end", "a", "1", "b", "2", "#end", "a1", "11", "b1", "21", "#end", "a3", "13", "b3", "23", "#end") + if result, err := ret.Result(); err != nil { + fmt.Printf("Execute Redis err: %v", err.Error()) + } else { + fmt.Printf("data: %v", result) + } + return +} diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index 3e8c38841..7d6024bb7 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -20,32 +20,32 @@ import ( func newClient(options *Options) (sys *Client, err error) { sys = &Client{ - options: options, - metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), - clients: make(map[string]client.XClient), - otherClusterClients: make(map[string]map[string]client.XClient), - conns: make(map[string]net.Conn), - connecting: make(map[string]struct{}), - serviceMap: make(map[string]*service), - msgChan: make(chan *protocol.Message, 1000), + options: options, + metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), + // clients: make(map[string]client.XClient), + clusterClients: make(map[string]map[string]client.XClient), + conns: make(map[string]net.Conn), + connecting: make(map[string]struct{}), + serviceMap: make(map[string]*service), + msgChan: make(chan *protocol.Message, 1000), } return } type Client struct { - options *Options - metadata string - writeTimeout time.Duration - AsyncWrite bool - clients map[string]client.XClient - otherClusterClients map[string]map[string]client.XClient //其他集群客户端 - connsMapMu sync.RWMutex - conns map[string]net.Conn - connectMapMu sync.RWMutex - connecting map[string]struct{} - serviceMapMu sync.RWMutex - serviceMap map[string]*service - msgChan chan *protocol.Message // 接收rpcXServer推送消息 + options *Options + metadata string + writeTimeout time.Duration + AsyncWrite bool + // clients map[string]client.XClient + clusterClients map[string]map[string]client.XClient //其他集群客户端 + connsMapMu sync.RWMutex + conns map[string]net.Conn + connectMapMu sync.RWMutex + connecting map[string]struct{} + serviceMapMu sync.RWMutex + serviceMap map[string]*service + msgChan chan *protocol.Message // 接收rpcXServer推送消息 } // DoMessage 服务端消息处理 @@ -77,8 +77,10 @@ func (this *Client) Start() (err error) { //停止RPC 服务 func (this *Client) Stop() (err error) { - for _, v := range this.clients { - v.Close() + for _, v := range this.clusterClients { + for _, v1 := range v { + v1.Close() + } } close(this.msgChan) //关闭消息处理 return @@ -114,24 +116,30 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod return } var ( - spath []string - d *client.ConsulDiscovery - c client.XClient - ok bool + spath []string + clients map[string]client.XClient + d *client.ConsulDiscovery + c client.XClient + ok bool ) spath = strings.Split(servicePath, "/") - if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { - return + if clients, ok = this.clusterClients[this.options.ServiceTag]; !ok { + this.clusterClients[this.options.ServiceTag] = make(map[string]client.XClient) + clients = this.clusterClients[this.options.ServiceTag] + } else { + if c, ok = clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { + return + } + c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) + c.GetPlugins().Add(this) + if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { + c.SetSelector(newSelector(this.UpdateServer)) + } else { + c.SetSelector(newSelector(nil)) + } + clients[spath[0]] = c } - c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) - c.GetPlugins().Add(this) - if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { - c.SetSelector(newSelector(this.UpdateServer)) - } else { - c.SetSelector(newSelector(nil)) - } - this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ ServiceClusterTag: this.options.ServiceTag, @@ -151,20 +159,26 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st return } var ( - spath []string - d *client.ConsulDiscovery - c client.XClient - ok bool + spath []string + clients map[string]client.XClient + d *client.ConsulDiscovery + c client.XClient + ok bool ) spath = strings.Split(servicePath, "/") - if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { - return + if clients, ok = this.clusterClients[this.options.ServiceTag]; !ok { + this.clusterClients[this.options.ServiceTag] = make(map[string]client.XClient) + clients = this.clusterClients[this.options.ServiceTag] + } else { + if c, ok = clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { + return + } + c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) + c.GetPlugins().Add(this) + c.SetSelector(newSelector(this.UpdateServer)) + clients[spath[0]] = c } - c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) - c.GetPlugins().Add(this) - c.SetSelector(newSelector(this.UpdateServer)) - this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ ServiceClusterTag: this.options.ServiceTag, @@ -182,24 +196,30 @@ func (this *Client) Broadcast(ctx context.Context, servicePath string, serviceMe return } var ( - spath []string - d *client.ConsulDiscovery - c client.XClient - ok bool + spath []string + clients map[string]client.XClient + d *client.ConsulDiscovery + c client.XClient + ok bool ) spath = strings.Split(servicePath, "/") - if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { - return + if clients, ok = this.clusterClients[this.options.ServiceTag]; !ok { + this.clusterClients[this.options.ServiceTag] = make(map[string]client.XClient) + clients = this.clusterClients[this.options.ServiceTag] + } else { + if c, ok = clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { + return + } + c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) + c.GetPlugins().Add(this) + if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { + c.SetSelector(newSelector(this.UpdateServer)) + } else { + c.SetSelector(newSelector(nil)) + } + clients[spath[0]] = c } - c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) - c.GetPlugins().Add(this) - if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { - c.SetSelector(newSelector(this.UpdateServer)) - } else { - c.SetSelector(newSelector(nil)) - } - this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ ServiceClusterTag: this.options.ServiceTag, @@ -225,9 +245,9 @@ func (this *Client) AcrossClusterCall(ctx context.Context, clusterTag string, se ok bool ) spath = strings.Split(servicePath, "/") - if clients, ok = this.otherClusterClients[clusterTag]; !ok { - this.otherClusterClients[clusterTag] = make(map[string]client.XClient) - clients = this.otherClusterClients[clusterTag] + if clients, ok = this.clusterClients[clusterTag]; !ok { + this.clusterClients[clusterTag] = make(map[string]client.XClient) + clients = this.clusterClients[clusterTag] } else { if c, ok = clients[spath[0]]; !ok { if d, err = client.NewConsulDiscovery(clusterTag, spath[0], this.options.ConsulServers, nil); err != nil { @@ -267,9 +287,9 @@ func (this *Client) AcrossClusterGo(ctx context.Context, clusterTag string, serv ok bool ) spath = strings.Split(servicePath, "/") - if clients, ok = this.otherClusterClients[clusterTag]; !ok { - this.otherClusterClients[clusterTag] = make(map[string]client.XClient) - clients = this.otherClusterClients[clusterTag] + if clients, ok = this.clusterClients[clusterTag]; !ok { + this.clusterClients[clusterTag] = make(map[string]client.XClient) + clients = this.clusterClients[clusterTag] } else { if c, ok = clients[spath[0]]; !ok { if d, err = client.NewConsulDiscovery(clusterTag, spath[0], this.options.ConsulServers, nil); err != nil { @@ -294,6 +314,96 @@ func (this *Client) AcrossClusterGo(ctx context.Context, clusterTag string, serv return c.Go(ctx, string(serviceMethod), args, reply, done) } +//跨集群 广播 +func (this *Client) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + var ( + spath []string + clients map[string]client.XClient + d *client.ConsulDiscovery + c client.XClient + ok bool + ) + spath = strings.Split(servicePath, "/") + if clients, ok = this.clusterClients[clusterTag]; !ok { + this.clusterClients[clusterTag] = make(map[string]client.XClient) + clients = this.clusterClients[clusterTag] + } else { + if c, ok = clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { + return + } + c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) + c.GetPlugins().Add(this) + if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { + c.SetSelector(newSelector(this.UpdateServer)) + } else { + c.SetSelector(newSelector(nil)) + } + clients[spath[0]] = c + } + } + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + ServiceClusterTag: this.options.ServiceTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + }) + err = c.Broadcast(ctx, serviceMethod, args, reply) + return +} + +func (this *Client) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + var ( + spath []string + clients []client.XClient + ) + spath = strings.Split(servicePath, "/") + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + ServiceClusterTag: this.options.ServiceTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + }) + clients = make([]client.XClient, 0) + for _, v := range this.clusterClients { + if _client, ok := v[spath[0]]; ok { + clients = append(clients, _client) + } + } + + l := len(clients) + done := make(chan error, l) + for _, v := range clients { + go func(c client.XClient) { + done <- c.Broadcast(ctx, serviceMethod, args, reply) + }(v) + } + timeout := time.NewTimer(time.Minute) +check: + for { + select { + case err = <-done: + l-- + if l == 0 || err != nil { // all returns or some one returns an error + break check + } + case <-timeout.C: + err = errors.New(("timeout")) + break check + } + } + timeout.Stop() + return +} + //监控服务发现,发现没有连接上的额服务端 就连接上去 func (this *Client) UpdateServer(servers map[string]*ServiceNode) { for _, v := range servers { diff --git a/lego/sys/rpcx/core.go b/lego/sys/rpcx/core.go index 1833894dc..337a8046e 100644 --- a/lego/sys/rpcx/core.go +++ b/lego/sys/rpcx/core.go @@ -29,7 +29,9 @@ type ( Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) + AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) + ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) } ISelector interface { client.Selector @@ -90,6 +92,10 @@ func Go(ctx context.Context, servicePath string, serviceMethod string, args inte return defsys.Go(ctx, servicePath, serviceMethod, args, reply, done) } +func ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return defsys.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) +} + //服务元数据转服务节点信息 func smetaToServiceNode(meta string) (node *ServiceNode, err error) { if meta == "" { diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 986103bb8..edd14091b 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -80,7 +80,18 @@ func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, serv return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) } +//跨集群 广播 +func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + err = this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + return +} + //跨服异步调用 func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) } + +//全集群广播 +func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) +} diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index f4e446079..f58217054 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -176,12 +176,24 @@ func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, s return } +//跨集群 广播 +func (this *Service) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + err = this.broadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + return +} + //跨服 异步调用 远程服务 func (this *Service) AcrossClusterGo(ctx context.Context, clusterTag, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) { _, _call, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) return } +//全集群广播 +func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + err = this.clusterbroadcast(ctx, servicePath, serviceMethod, args, reply) + return +} + //监听客户端链接到服务上 保存客户端的连接对象 func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { var ( @@ -319,7 +331,7 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st return } metadata = map[string]string{ - ServiceClusterTag: clusterTag, + ServiceClusterTag: this.options.ServiceTag, CallRoutRulesKey: servicePath, ServiceAddrKey: "tcp@" + this.options.ServiceAddr, ServiceMetaKey: this.metadata, @@ -376,7 +388,7 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa return } metadata = map[string]string{ - ServiceClusterTag: clusterTag, + ServiceClusterTag: this.options.ServiceTag, CallRoutRulesKey: servicePath, ServiceAddrKey: "tcp@" + this.options.ServiceAddr, ServiceMetaKey: this.metadata, @@ -398,11 +410,10 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa done := make(chan error, l) for _, v := range clientaddrs { go func(addr string) { - this.clientmutex.RLock() conn, ok := this.clients[addr] if !ok { - err = fmt.Errorf("on found clientaddr:%s", addr) + done <- fmt.Errorf("on found clientaddr:%s", addr) this.clientmutex.RUnlock() return } @@ -440,7 +451,6 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa } }(v) } - timeout := time.NewTimer(time.Minute) check: for { @@ -455,7 +465,105 @@ check: break check } } + timeout.Stop() + return err +} +//全集群广播 +func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + spath []string + clientaddrs []string + metadata map[string]string + ) + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + metadata = map[string]string{ + ServiceClusterTag: this.options.ServiceTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + } + spath = strings.Split(servicePath, "/") + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + }) + addrs := make(map[string]struct{}) + for _, v := range this.selectors { + if clientaddrs = v.Find(ctx, spath[0], serviceMethod, args); clientaddrs != nil && len(clientaddrs) >= 0 { + for _, v1 := range clientaddrs { + addrs[v1] = struct{}{} + } + } + } + if len(addrs) == 0 { + err = fmt.Errorf("on found service:%s", spath[0]) + return + } + + l := len(addrs) + done := make(chan error, l) + for v, _ := range addrs { + go func(addr string) { + this.clientmutex.RLock() + conn, ok := this.clients[addr] + if !ok { + done <- fmt.Errorf("on found clientaddr:%s", addr) + this.clientmutex.RUnlock() + return + } + this.clientmutex.RUnlock() + _call := new(client.Call) + _call.ServicePath = servicePath + _call.ServiceMethod = serviceMethod + _call.Args = args + _call.Reply = reply + _call.Done = make(chan *client.Call, 10) + this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) + seq, _ := ctx.Value(seqKey{}).(*uint64) + select { + case <-ctx.Done(): // cancel by context + this.mutex.Lock() + call := this.pending[*seq] + delete(this.pending, *seq) + this.mutex.Unlock() + if call != nil { + call.Error = ctx.Err() + call.Done <- call + } + done <- ctx.Err() + case call := <-_call.Done: + err = call.Error + meta := ctx.Value(share.ResMetaDataKey) + if meta != nil && len(call.ResMetadata) > 0 { + resMeta := meta.(map[string]string) + for k, v := range call.ResMetadata { + resMeta[k] = v + } + resMeta[share.ServerAddress] = conn.RemoteAddr().String() + } + done <- nil + } + }(v) + } + timeout := time.NewTimer(time.Minute) +check: + for { + select { + case err = <-done: + l-- + if l == 0 || err != nil { // all returns or some one returns an error + break check + } + case <-timeout.C: + err = errors.New(("timeout")) + break check + } + } timeout.Stop() return err } diff --git a/modules/chat/api_crosschannel.go b/modules/chat/api_crosschannel.go new file mode 100644 index 000000000..818d29adc --- /dev/null +++ b/modules/chat/api_crosschannel.go @@ -0,0 +1,20 @@ +package chat + +import ( + "go_dreamfactory/comm" + "go_dreamfactory/pb" + + "google.golang.org/protobuf/proto" +) + +//参数校验 +func (this *apiComp) CrossChannelCheck(session comm.IUserSession, req *pb.ChatCrossChannelReq) (code pb.ErrorCode) { + + return +} + +///获取未读消息 +func (this *apiComp) CrossChannel(session comm.IUserSession, req *pb.ChatCrossChannelReq) (code pb.ErrorCode, data proto.Message) { + this.module.modelChat.AddCrossChannelMember(session) + return +} diff --git a/modules/chat/api_send.go b/modules/chat/api_send.go index 01e022e86..5a29498db 100644 --- a/modules/chat/api_send.go +++ b/modules/chat/api_send.go @@ -27,7 +27,6 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code Channel: req.Channel, Suid: session.GetUserId(), Ruid: req.TargetId, - AreaId: this.service.GetTag(), Headid: user.Avatar, Content: req.Content, Ctime: time.Now().Unix(), @@ -47,12 +46,6 @@ func (this *apiComp) Send(session comm.IUserSession, req *pb.ChatSendReq) (code case pb.ChatChannel_Private: this.module.PushUser(msg) break - case pb.ChatChannel_CrossServer: - this.module.PushAllWorld(msg) - break - case pb.ChatChannel_System: - this.module.PushAllWorld(msg) - break } return } diff --git a/modules/chat/api_spansend.go b/modules/chat/api_spansend.go index 957e9e307..faa73ec07 100644 --- a/modules/chat/api_spansend.go +++ b/modules/chat/api_spansend.go @@ -3,7 +3,9 @@ package chat import ( "go_dreamfactory/comm" "go_dreamfactory/pb" + "time" + "go.mongodb.org/mongo-driver/bson/primitive" "google.golang.org/protobuf/proto" ) @@ -15,12 +17,31 @@ func (this *apiComp) SpanSendCheck(session comm.IUserSession, req *pb.ChatSpanSe ///跨越服务消息请求 func (this *apiComp) SpanSend(session comm.IUserSession, req *pb.ChatSpanSendReq) (code pb.ErrorCode, data proto.Message) { - - defer func() { - if code == pb.ErrorCode_Success { - session.SendMsg(string(this.module.GetType()), "send", &pb.ChatSendResp{}) - } - }() - + var ( + err error + msg *pb.DBChat + user *pb.DBUser + ) + msg = &pb.DBChat{ + Id: primitive.NewObjectID().Hex(), + Channel: req.Channel, + Suid: session.GetUserId(), + Headid: user.Avatar, + Content: req.Content, + Ctime: time.Now().Unix(), + } + if err = this.module.modelChat.AddChatMsg(msg); err != nil { + code = pb.ErrorCode_DBError + return + } + session.SendMsg(string(this.module.GetType()), "send", &pb.ChatSendResp{}) + switch msg.Channel { + case pb.ChatChannel_CrossServer: + this.module.PushAllWorld(msg) + break + case pb.ChatChannel_System: + this.module.PushAllWorld(msg) + break + } return } diff --git a/modules/chat/modelChat.go b/modules/chat/modelChat.go index dd19bc156..c6e05625f 100644 --- a/modules/chat/modelChat.go +++ b/modules/chat/modelChat.go @@ -2,7 +2,11 @@ package chat import ( "context" + "fmt" + "go_dreamfactory/comm" "go_dreamfactory/lego/core" + "go_dreamfactory/lego/sys/codec" + "go_dreamfactory/lego/sys/log" "go_dreamfactory/modules" "go_dreamfactory/pb" @@ -11,6 +15,11 @@ import ( "go.mongodb.org/mongo-driver/x/bsonx" ) +var worldchatkey = "chat:world" +var unionchatkey = "chat:union" +var crosschatkey = "chat:cross" +var systemchatkey = "chat:system" + ///论坛 数据组件 type modelChatComp struct { modules.MCompModel @@ -53,8 +62,79 @@ func (this *modelChatComp) QueryUserMsg(uid string) (result []*pb.DBChat, err er return } -//添加聊天消息到数据库中 -func (this *modelChatComp) AddChatMsg(msg *pb.DBChat) (err error) { - +//添加跨服频道成员 +func (this *modelChatComp) AddCrossChannelMember(session comm.IUserSession) (channel int, err error) { + udata := &pb.CacheUser{ + Uid: session.GetUserId(), + SessionId: session.GetSessionId(), + ServiceTag: session.GetServiecTag(), + GatewayServiceId: session.GetGatewayServiceId(), + Ip: session.GetIP(), + } + channel = 0 + count := 0 + for { + key := fmt.Sprintf("%s:%d-member", crosschatkey, channel) + if count, err = this.Redis.Hlen(key); err != nil { + this.module.Errorf("err:%v", err) + return + } + if count < 3000 { + if err = this.Redis.HMSet(key, map[string]interface{}{session.GetUserId(): udata}); err != nil { + log.Errorf("err%v", err) + return + } + break + } else { + channel++ + } + } + return +} + +//添加聊天消息到数据库中 +func (this *modelChatComp) AddChatMsg(msg *pb.DBChat) (err error) { + switch msg.Channel { + case pb.ChatChannel_World: + this.addChatMsg(worldchatkey, 99, msg) + break + case pb.ChatChannel_Union: + this.addChatMsg(unionchatkey, 99, msg) + break + case pb.ChatChannel_CrossServer: + this.addChatMsg(fmt.Sprintf("%s:%d", crosschatkey, msg.AreaId), 99, msg) + break + case pb.ChatChannel_System: + // this.addChatMsg(systemchatkey, 99, msg) + break + } + return +} + +func (this *modelChatComp) SaveUserMsg(msg *pb.DBChat) (err error) { + if _, err = this.DB.InsertOne(core.SqlTable(this.TableName), msg); err != nil { + this.module.Errorf("err:%v", err) + return + } + return +} + +func (this *modelChatComp) addChatMsg(key string, count int, msg *pb.DBChat) (err error) { + var ( + tempdata map[string]string + outkey []string + ) + if tempdata, err = codec.MarshalMapJson(msg); err != nil { + this.module.Errorf("err:%v", err) + return + } + if outkey, err = this.Batchsetqueues(key, count, []string{fmt.Sprintf("%s-%s", key, msg.Id)}, []map[string]string{tempdata}); err != nil { + this.module.Errorf("err:%v", err) + return + } + if err = this.DeleteModelLogs(this.TableName, msg.Ruid, bson.M{"_id": bson.M{"$in": outkey}}); err != nil { + this.module.Errorf("err:%v", err) + return + } return } diff --git a/modules/chat/module.go b/modules/chat/module.go index fae9dc3ba..6f93e3186 100644 --- a/modules/chat/module.go +++ b/modules/chat/module.go @@ -80,10 +80,24 @@ func (this *Chat) PushUser(msg *pb.DBChat) { this.Errorf("err:%v", err) } return + } else { + this.modelChat.SaveUserMsg(msg) } } -//推送消息到所有区服 +//全集群推送 func (this *Chat) PushAllWorld(msg *pb.DBChat) { - + var ( + err error + reply *pb.RPCMessageReply + ) + reply = &pb.RPCMessageReply{} + data, _ := anypb.New(msg) + if err = this.service.ClusterBroadcast(context.Background(), comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ + MainType: string(this.GetType()), + SubType: "push", + Data: data, + }, reply); err != nil { + this.Errorf("err:%v", err) + } } diff --git a/modules/comp_model.go b/modules/comp_model.go index 2a11b67b1..c8e93bf26 100644 --- a/modules/comp_model.go +++ b/modules/comp_model.go @@ -58,17 +58,80 @@ end return "OK" ` +//Redis 自定义脚本 批量读取队列数据 +var LusScriptgetQueue = ` +local key = tostring(KEYS[1]) +local keys = redis.call("LRANGE", key,0,-1) +local data = {} +for i, v in ipairs(keys) do + data[i] = redis.call("HGETALL", v) +end +return data +` + +//Redis 自定义脚本 批量写入队列数据 +var LusScriptsetQueue = ` +local count = tonumber(ARGV[1]) +local k = tostring(ARGV[3]) +local keys = {} +local out = {} +local n = 1 +for i, v in ipairs(KEYS) do + if (i == 1) then + for i=n,#ARGV,1 do + n = n+1 + if ARGV[i] == "#end" then + break + end + end + elseif (i == 2) then + for i=n,#ARGV,1 do + n = n+1 + if ARGV[i] == "#end" then + break + end + end + else + local key = v + local argv = {} + table.insert(keys, key) + for i=n,#ARGV,1 do + n = n+1 + if ARGV[i] == "#end" then + redis.call("HMSet", key,unpack(argv)) + break + else + table.insert(argv, ARGV[i]) + end + end + end +end +redis.call("RPush", k,unpack(keys)) +local c = tonumber(redis.call("LLEN", k)) +if (c > count) then + local off = c-count + out = redis.call("LRANGE", k,0,off-1) + redis.call("LTRIM", k,off,-1) + for i, v in ipairs(out) do + redis.call("DEL", v) + end +end +return out +` + /* 基础组件 缓存组件 读写缓存数据 DB组件也封装进来 */ type MCompModel struct { cbase.ModuleCompBase - Redis redis.ISys - DB mgo.ISys - TableName string //redis key前缀 - getListSha1 string //getList LusScript 的shal值 - setListSha1 string //getList LusScript 的shal值 + Redis redis.ISys + DB mgo.ISys + TableName string //redis key前缀 + getListSha1 string //getList LusScript 的shal值 + setListSha1 string //getList LusScript 的shal值 + getQueueSha1 string //getList LusScript 的shal值 + setQueueSha1 string //getList LusScript 的shal值 } const ( @@ -90,6 +153,12 @@ func (this *MCompModel) Start() (err error) { if this.setListSha1, err = this.Redis.NewScript(LusScriptsetList).Result(); err != nil { return } + if this.getQueueSha1, err = this.Redis.NewScript(LusScriptgetQueue).Result(); err != nil { + return + } + if this.setQueueSha1, err = this.Redis.NewScript(LusScriptsetQueue).Result(); err != nil { + return + } return } @@ -314,7 +383,7 @@ func (this *MCompModel) GetList(uid string, data interface{}) (err error) { return } sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem() - if cdata, err = this.batchgetlists(this.ukey(uid)); err == nil { + if cdata, err = this.Batchgetlists(this.ukey(uid)); err == nil { for _, v := range cdata { sliceType.UnsafeGrow(dptr, n+1) elemPtr = sliceType.UnsafeGetIndex(dptr, n) @@ -364,7 +433,7 @@ func (this *MCompModel) GetList(uid string, data interface{}) (err error) { } if len(wdata) > 0 { wdata[this.ukey(uid)] = keys - err = this.batchsetlists(wdata) + err = this.Batchsetlists(wdata) } } } @@ -418,20 +487,30 @@ func (this *MCompModel) DelListlds(uid string, ids ...string) (err error) { //获取用户通过扩展表 func (this *MCompModel) GetUserExpand(uid string) (result *pb.DBUserExpand, err error) { result = &pb.DBUserExpand{} - if err = this.Redis.HGetAll(this.ukey(uid), result); err != nil && err != redis.RedisNil { + key := fmt.Sprintf("userexpand:%s", uid) + if err = this.Redis.HGetAll(key, result); err != nil && err != redis.RedisNil { return } if err == redis.RedisNil { if err = this.DB.FindOne(core.SqlTable("userexpand"), bson.M{"uid": uid}).Decode(result); err != nil { return } - err = this.Redis.HMSet(this.ukey(uid), result) + err = this.Redis.HMSet(key, result) } return } +//修改用户扩展数据 +func (this *MCompModel) ChanageUserExpand(uid string, value map[string]interface{}) (err error) { + if err = this.Redis.HSet("userexpand", uid, value); err != nil && err != redis.RedisNil { + return + } + err = this.UpdateModelLogs("userexpand", uid, bson.M{"uid": uid}, value) + return +} + //批量读取列表数据 -func (this *MCompModel) batchgetlists(key string) (result []map[string]string, err error) { +func (this *MCompModel) Batchgetlists(key string) (result []map[string]string, err error) { var data interface{} ret := this.Redis.EvalSha(this.Redis.Context(), this.getListSha1, []string{key}) if data, err = ret.Result(); err != nil { @@ -455,7 +534,7 @@ func (this *MCompModel) batchgetlists(key string) (result []map[string]string, e } //批量写入数据 -func (this *MCompModel) batchsetlists(data map[string]map[string]string) (err error) { +func (this *MCompModel) Batchsetlists(data map[string]map[string]string) (err error) { var ( n int keys []string @@ -479,6 +558,67 @@ func (this *MCompModel) batchsetlists(data map[string]map[string]string) (err er return } +//批量读取队列数据 +func (this *MCompModel) Batchgetqueues(key string) (result []map[string]string, err error) { + var data interface{} + ret := this.Redis.EvalSha(this.Redis.Context(), this.getQueueSha1, []string{key}) + if data, err = ret.Result(); err != nil { + fmt.Printf("Execute batchgetqueues err: %v", err.Error()) + } else { + temp1 := data.([]interface{}) + result = make([]map[string]string, len(temp1)) + for i, v := range temp1 { + temp2 := v.([]interface{}) + result[i] = make(map[string]string) + for n := 0; n < len(temp2); n += 2 { + result[i][temp2[n].(string)] = temp2[n+1].(string) + } + } + if len(result) == 0 { + err = redis.RedisNil + return + } + } + return +} + +//批量写入队列 并返回移除队列 +func (this *MCompModel) Batchsetqueues(key string, count int, ks []string, vs []map[string]string) (outkey []string, err error) { + var ( + n int + keys []string + values []interface{} + result interface{} + ) + keys = make([]string, len(ks)+2) + values = make([]interface{}, 0) + keys[0] = "count" + values = append(values, count) + values = append(values, "#end") + keys[1] = "key" + values = append(values, key) + values = append(values, "#end") + n = 2 + for i, v := range ks { + keys[n] = v + for k1, v1 := range vs[i] { + values = append(values, k1, v1) + } + values = append(values, "#end") + n++ + } + ret := this.Redis.EvalSha(this.Redis.Context(), this.setQueueSha1, keys, values...) + if result, err = ret.Result(); err != nil { + fmt.Printf("Execute batchsetqueues err: %v", err.Error()) + } else { + outkey = make([]string, len(result.([]interface{}))) + for i, v := range result.([]interface{}) { + outkey[i] = v.(string) + } + } + return +} + //日志操作可选项 func (this *MCompModel) logOpt(uid string, data interface{}, attrs ...*cache.OperationAttr) error { ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil) diff --git a/modules/gateway/agentmgr_comp.go b/modules/gateway/agentmgr_comp.go index 3c85854b9..af1cbfc91 100644 --- a/modules/gateway/agentmgr_comp.go +++ b/modules/gateway/agentmgr_comp.go @@ -44,7 +44,7 @@ func (this *AgentMgrComp) DisConnect(a IAgent) { UserSessionId: a.SessionId(), UserId: a.UserId(), }, reply); err != nil { - log.Errorf(" uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err) + log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err) } } } diff --git a/modules/mgolog/db_comp.go b/modules/mgolog/db_comp.go index 0dd434bb1..66f7718cf 100644 --- a/modules/mgolog/db_comp.go +++ b/modules/mgolog/db_comp.go @@ -148,7 +148,7 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { for _, v := range data.D[2].(bson.D) { Query[v.Key] = v.Value } - _, err = this.DB.UpdateMany(core.SqlTable(_tableName), Where, bson.M{"$set": Query}) //, new(options.UpdateOptions).SetUpsert(true) + _, err = this.DB.UpdateMany(core.SqlTable(_tableName), Where, bson.M{"$set": Query}, options.MergeUpdateOptions().SetUpsert(true)) //, new(options.UpdateOptions).SetUpsert(true) if err != nil { log.Errorf("Update %s db err:%v", core.SqlTable(_tableName), err) ErrorLogCount[data.ID]++ diff --git a/modules/notify/api_getlist.go b/modules/notify/api_getlist.go index b493c3f0b..13e5dcf96 100644 --- a/modules/notify/api_getlist.go +++ b/modules/notify/api_getlist.go @@ -3,6 +3,7 @@ package notify import ( "go_dreamfactory/comm" "go_dreamfactory/pb" + "time" "google.golang.org/protobuf/proto" ) @@ -20,9 +21,6 @@ func (this *apiComp) GetList(session comm.IUserSession, req *pb.NotifyGetListReq userexpand *pb.DBUserExpand notify []*pb.DBSystemNotify ) - defer func() { - session.SendMsg(string(this.module.GetType()), "getlist", &pb.NotifyGetListResp{LastReadTime: userexpand.Lastreadnotiftime, SysNotify: notify}) - }() if notify, err = this.module.modelNotify.GetFullNotify(); err != nil { code = pb.ErrorCode_DBError return @@ -30,9 +28,15 @@ func (this *apiComp) GetList(session comm.IUserSession, req *pb.NotifyGetListReq if session.GetUserId() != "" { if userexpand, err = this.module.modelNotify.GetUserExpand(session.GetUserId()); err != nil { code = pb.ErrorCode_DBError + return } } else { userexpand = &pb.DBUserExpand{} } + //修改最后公告读取时间 + this.module.modelNotify.ChanageUserExpand(session.GetUserId(), map[string]interface{}{ + "lastreadnotiftime": time.Now().Unix(), + }) + session.SendMsg(string(this.module.GetType()), "getlist", &pb.NotifyGetListResp{LastReadTime: userexpand.Lastreadnotiftime, SysNotify: notify}) return } diff --git a/pb/chat_db.pb.go b/pb/chat_db.pb.go index 8e562450d..c4399a68a 100644 --- a/pb/chat_db.pb.go +++ b/pb/chat_db.pb.go @@ -84,7 +84,7 @@ type DBChat struct { Channel ChatChannel `protobuf:"varint,2,opt,name=channel,proto3,enum=ChatChannel" json:"channel"` //频道 Suid string `protobuf:"bytes,3,opt,name=suid,proto3" json:"suid"` //发送用户id Ruid string `protobuf:"bytes,4,opt,name=ruid,proto3" json:"ruid"` //接收用户id channel == Private 有效 - AreaId string `protobuf:"bytes,5,opt,name=areaId,proto3" json:"areaId"` //区服id + AreaId int32 `protobuf:"varint,5,opt,name=areaId,proto3" json:"areaId"` //跨服频道Id UnionId string `protobuf:"bytes,6,opt,name=unionId,proto3" json:"unionId"` //工会id Headid int32 `protobuf:"varint,7,opt,name=headid,proto3" json:"headid"` //用户头像 Uname string `protobuf:"bytes,8,opt,name=uname,proto3" json:"uname"` //用户名 @@ -152,11 +152,11 @@ func (x *DBChat) GetRuid() string { return "" } -func (x *DBChat) GetAreaId() string { +func (x *DBChat) GetAreaId() int32 { if x != nil { return x.AreaId } - return "" + return 0 } func (x *DBChat) GetUnionId() string { @@ -205,7 +205,7 @@ var file_chat_chat_db_proto_rawDesc = []byte{ 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x75, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x72, 0x75, 0x69, 0x64, 0x12, - 0x16, 0x0a, 0x06, 0x61, 0x72, 0x65, 0x61, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x16, 0x0a, 0x06, 0x61, 0x72, 0x65, 0x61, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x61, 0x72, 0x65, 0x61, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x75, 0x6e, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x75, 0x6e, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, diff --git a/pb/chat_msg.pb.go b/pb/chat_msg.pb.go index fa5f33810..fe5826429 100644 --- a/pb/chat_msg.pb.go +++ b/pb/chat_msg.pb.go @@ -68,6 +68,93 @@ func (x *ChatMessagePush) GetChats() []*DBChat { return nil } +//申请跨服频道号 +type ChatCrossChannelReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ChatCrossChannelReq) Reset() { + *x = ChatCrossChannelReq{} + if protoimpl.UnsafeEnabled { + mi := &file_chat_chat_msg_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChatCrossChannelReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChatCrossChannelReq) ProtoMessage() {} + +func (x *ChatCrossChannelReq) ProtoReflect() protoreflect.Message { + mi := &file_chat_chat_msg_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChatCrossChannelReq.ProtoReflect.Descriptor instead. +func (*ChatCrossChannelReq) Descriptor() ([]byte, []int) { + return file_chat_chat_msg_proto_rawDescGZIP(), []int{1} +} + +//申请跨服频道号 回应 +type ChatCrossChannelResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ChannelId int32 `protobuf:"varint,1,opt,name=ChannelId,proto3" json:"ChannelId"` +} + +func (x *ChatCrossChannelResp) Reset() { + *x = ChatCrossChannelResp{} + if protoimpl.UnsafeEnabled { + mi := &file_chat_chat_msg_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChatCrossChannelResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChatCrossChannelResp) ProtoMessage() {} + +func (x *ChatCrossChannelResp) ProtoReflect() protoreflect.Message { + mi := &file_chat_chat_msg_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChatCrossChannelResp.ProtoReflect.Descriptor instead. +func (*ChatCrossChannelResp) Descriptor() ([]byte, []int) { + return file_chat_chat_msg_proto_rawDescGZIP(), []int{2} +} + +func (x *ChatCrossChannelResp) GetChannelId() int32 { + if x != nil { + return x.ChannelId + } + return 0 +} + //请求未读消息 type ChatGetListReq struct { state protoimpl.MessageState @@ -78,7 +165,7 @@ type ChatGetListReq struct { func (x *ChatGetListReq) Reset() { *x = ChatGetListReq{} if protoimpl.UnsafeEnabled { - mi := &file_chat_chat_msg_proto_msgTypes[1] + mi := &file_chat_chat_msg_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -91,7 +178,7 @@ func (x *ChatGetListReq) String() string { func (*ChatGetListReq) ProtoMessage() {} func (x *ChatGetListReq) ProtoReflect() protoreflect.Message { - mi := &file_chat_chat_msg_proto_msgTypes[1] + mi := &file_chat_chat_msg_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,7 +191,7 @@ func (x *ChatGetListReq) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatGetListReq.ProtoReflect.Descriptor instead. func (*ChatGetListReq) Descriptor() ([]byte, []int) { - return file_chat_chat_msg_proto_rawDescGZIP(), []int{1} + return file_chat_chat_msg_proto_rawDescGZIP(), []int{3} } type ChatGetListResp struct { @@ -118,7 +205,7 @@ type ChatGetListResp struct { func (x *ChatGetListResp) Reset() { *x = ChatGetListResp{} if protoimpl.UnsafeEnabled { - mi := &file_chat_chat_msg_proto_msgTypes[2] + mi := &file_chat_chat_msg_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -131,7 +218,7 @@ func (x *ChatGetListResp) String() string { func (*ChatGetListResp) ProtoMessage() {} func (x *ChatGetListResp) ProtoReflect() protoreflect.Message { - mi := &file_chat_chat_msg_proto_msgTypes[2] + mi := &file_chat_chat_msg_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -144,7 +231,7 @@ func (x *ChatGetListResp) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatGetListResp.ProtoReflect.Descriptor instead. func (*ChatGetListResp) Descriptor() ([]byte, []int) { - return file_chat_chat_msg_proto_rawDescGZIP(), []int{2} + return file_chat_chat_msg_proto_rawDescGZIP(), []int{4} } func (x *ChatGetListResp) GetChats() []*DBChat { @@ -168,7 +255,7 @@ type ChatSendReq struct { func (x *ChatSendReq) Reset() { *x = ChatSendReq{} if protoimpl.UnsafeEnabled { - mi := &file_chat_chat_msg_proto_msgTypes[3] + mi := &file_chat_chat_msg_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -181,7 +268,7 @@ func (x *ChatSendReq) String() string { func (*ChatSendReq) ProtoMessage() {} func (x *ChatSendReq) ProtoReflect() protoreflect.Message { - mi := &file_chat_chat_msg_proto_msgTypes[3] + mi := &file_chat_chat_msg_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -194,7 +281,7 @@ func (x *ChatSendReq) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatSendReq.ProtoReflect.Descriptor instead. func (*ChatSendReq) Descriptor() ([]byte, []int) { - return file_chat_chat_msg_proto_rawDescGZIP(), []int{3} + return file_chat_chat_msg_proto_rawDescGZIP(), []int{5} } func (x *ChatSendReq) GetChannel() ChatChannel { @@ -228,7 +315,7 @@ type ChatSendResp struct { func (x *ChatSendResp) Reset() { *x = ChatSendResp{} if protoimpl.UnsafeEnabled { - mi := &file_chat_chat_msg_proto_msgTypes[4] + mi := &file_chat_chat_msg_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -241,7 +328,7 @@ func (x *ChatSendResp) String() string { func (*ChatSendResp) ProtoMessage() {} func (x *ChatSendResp) ProtoReflect() protoreflect.Message { - mi := &file_chat_chat_msg_proto_msgTypes[4] + mi := &file_chat_chat_msg_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -254,7 +341,7 @@ func (x *ChatSendResp) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatSendResp.ProtoReflect.Descriptor instead. func (*ChatSendResp) Descriptor() ([]byte, []int) { - return file_chat_chat_msg_proto_rawDescGZIP(), []int{4} + return file_chat_chat_msg_proto_rawDescGZIP(), []int{6} } //跨服消息发送请求 @@ -270,7 +357,7 @@ type ChatSpanSendReq struct { func (x *ChatSpanSendReq) Reset() { *x = ChatSpanSendReq{} if protoimpl.UnsafeEnabled { - mi := &file_chat_chat_msg_proto_msgTypes[5] + mi := &file_chat_chat_msg_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -283,7 +370,7 @@ func (x *ChatSpanSendReq) String() string { func (*ChatSpanSendReq) ProtoMessage() {} func (x *ChatSpanSendReq) ProtoReflect() protoreflect.Message { - mi := &file_chat_chat_msg_proto_msgTypes[5] + mi := &file_chat_chat_msg_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -296,7 +383,7 @@ func (x *ChatSpanSendReq) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatSpanSendReq.ProtoReflect.Descriptor instead. func (*ChatSpanSendReq) Descriptor() ([]byte, []int) { - return file_chat_chat_msg_proto_rawDescGZIP(), []int{5} + return file_chat_chat_msg_proto_rawDescGZIP(), []int{7} } func (x *ChatSpanSendReq) GetChannel() ChatChannel { @@ -323,7 +410,7 @@ type ChatSpanSendResp struct { func (x *ChatSpanSendResp) Reset() { *x = ChatSpanSendResp{} if protoimpl.UnsafeEnabled { - mi := &file_chat_chat_msg_proto_msgTypes[6] + mi := &file_chat_chat_msg_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -336,7 +423,7 @@ func (x *ChatSpanSendResp) String() string { func (*ChatSpanSendResp) ProtoMessage() {} func (x *ChatSpanSendResp) ProtoReflect() protoreflect.Message { - mi := &file_chat_chat_msg_proto_msgTypes[6] + mi := &file_chat_chat_msg_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -349,7 +436,7 @@ func (x *ChatSpanSendResp) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatSpanSendResp.ProtoReflect.Descriptor instead. func (*ChatSpanSendResp) Descriptor() ([]byte, []int) { - return file_chat_chat_msg_proto_rawDescGZIP(), []int{6} + return file_chat_chat_msg_proto_rawDescGZIP(), []int{8} } var File_chat_chat_msg_proto protoreflect.FileDescriptor @@ -360,27 +447,32 @@ var file_chat_chat_msg_proto_rawDesc = []byte{ 0x5f, 0x64, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x30, 0x0a, 0x0f, 0x43, 0x68, 0x61, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x12, 0x1d, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x44, 0x42, - 0x43, 0x68, 0x61, 0x74, 0x52, 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x22, 0x10, 0x0a, 0x0e, 0x43, - 0x68, 0x61, 0x74, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x22, 0x30, 0x0a, - 0x0f, 0x43, 0x68, 0x61, 0x74, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, - 0x12, 0x1d, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x07, 0x2e, 0x44, 0x42, 0x43, 0x68, 0x61, 0x74, 0x52, 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x22, - 0x6b, 0x0a, 0x0b, 0x43, 0x68, 0x61, 0x74, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x26, - 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, - 0x0c, 0x2e, 0x43, 0x68, 0x61, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x07, 0x63, - 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, - 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, - 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x0e, 0x0a, 0x0c, - 0x43, 0x68, 0x61, 0x74, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x22, 0x53, 0x0a, 0x0f, - 0x43, 0x68, 0x61, 0x74, 0x53, 0x70, 0x61, 0x6e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, - 0x26, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, - 0x32, 0x0c, 0x2e, 0x43, 0x68, 0x61, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x07, - 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, - 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, - 0x74, 0x22, 0x12, 0x0a, 0x10, 0x43, 0x68, 0x61, 0x74, 0x53, 0x70, 0x61, 0x6e, 0x53, 0x65, 0x6e, - 0x64, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x43, 0x68, 0x61, 0x74, 0x52, 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x22, 0x15, 0x0a, 0x13, 0x43, + 0x68, 0x61, 0x74, 0x43, 0x72, 0x6f, 0x73, 0x73, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, + 0x65, 0x71, 0x22, 0x34, 0x0a, 0x14, 0x43, 0x68, 0x61, 0x74, 0x43, 0x72, 0x6f, 0x73, 0x73, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x12, 0x1c, 0x0a, 0x09, 0x43, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x64, 0x22, 0x10, 0x0a, 0x0e, 0x43, 0x68, 0x61, 0x74, + 0x47, 0x65, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x22, 0x30, 0x0a, 0x0f, 0x43, 0x68, + 0x61, 0x74, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x1d, 0x0a, + 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x44, + 0x42, 0x43, 0x68, 0x61, 0x74, 0x52, 0x05, 0x43, 0x68, 0x61, 0x74, 0x73, 0x22, 0x6b, 0x0a, 0x0b, + 0x43, 0x68, 0x61, 0x74, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x26, 0x0a, 0x07, 0x63, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x43, + 0x68, 0x61, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x49, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x49, 0x64, 0x12, + 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x0e, 0x0a, 0x0c, 0x43, 0x68, 0x61, + 0x74, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x22, 0x53, 0x0a, 0x0f, 0x43, 0x68, 0x61, + 0x74, 0x53, 0x70, 0x61, 0x6e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x12, 0x26, 0x0a, 0x07, + 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, + 0x43, 0x68, 0x61, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x07, 0x63, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x22, 0x12, + 0x0a, 0x10, 0x43, 0x68, 0x61, 0x74, 0x53, 0x70, 0x61, 0x6e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, + 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -395,28 +487,30 @@ func file_chat_chat_msg_proto_rawDescGZIP() []byte { return file_chat_chat_msg_proto_rawDescData } -var file_chat_chat_msg_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_chat_chat_msg_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_chat_chat_msg_proto_goTypes = []interface{}{ - (*ChatMessagePush)(nil), // 0: ChatMessagePush - (*ChatGetListReq)(nil), // 1: ChatGetListReq - (*ChatGetListResp)(nil), // 2: ChatGetListResp - (*ChatSendReq)(nil), // 3: ChatSendReq - (*ChatSendResp)(nil), // 4: ChatSendResp - (*ChatSpanSendReq)(nil), // 5: ChatSpanSendReq - (*ChatSpanSendResp)(nil), // 6: ChatSpanSendResp - (*DBChat)(nil), // 7: DBChat - (ChatChannel)(0), // 8: ChatChannel + (*ChatMessagePush)(nil), // 0: ChatMessagePush + (*ChatCrossChannelReq)(nil), // 1: ChatCrossChannelReq + (*ChatCrossChannelResp)(nil), // 2: ChatCrossChannelResp + (*ChatGetListReq)(nil), // 3: ChatGetListReq + (*ChatGetListResp)(nil), // 4: ChatGetListResp + (*ChatSendReq)(nil), // 5: ChatSendReq + (*ChatSendResp)(nil), // 6: ChatSendResp + (*ChatSpanSendReq)(nil), // 7: ChatSpanSendReq + (*ChatSpanSendResp)(nil), // 8: ChatSpanSendResp + (*DBChat)(nil), // 9: DBChat + (ChatChannel)(0), // 10: ChatChannel } var file_chat_chat_msg_proto_depIdxs = []int32{ - 7, // 0: ChatMessagePush.Chats:type_name -> DBChat - 7, // 1: ChatGetListResp.Chats:type_name -> DBChat - 8, // 2: ChatSendReq.channel:type_name -> ChatChannel - 8, // 3: ChatSpanSendReq.channel:type_name -> ChatChannel - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 9, // 0: ChatMessagePush.Chats:type_name -> DBChat + 9, // 1: ChatGetListResp.Chats:type_name -> DBChat + 10, // 2: ChatSendReq.channel:type_name -> ChatChannel + 10, // 3: ChatSpanSendReq.channel:type_name -> ChatChannel + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_chat_chat_msg_proto_init() } @@ -439,7 +533,7 @@ func file_chat_chat_msg_proto_init() { } } file_chat_chat_msg_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChatGetListReq); i { + switch v := v.(*ChatCrossChannelReq); i { case 0: return &v.state case 1: @@ -451,7 +545,7 @@ func file_chat_chat_msg_proto_init() { } } file_chat_chat_msg_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChatGetListResp); i { + switch v := v.(*ChatCrossChannelResp); i { case 0: return &v.state case 1: @@ -463,7 +557,7 @@ func file_chat_chat_msg_proto_init() { } } file_chat_chat_msg_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChatSendReq); i { + switch v := v.(*ChatGetListReq); i { case 0: return &v.state case 1: @@ -475,7 +569,7 @@ func file_chat_chat_msg_proto_init() { } } file_chat_chat_msg_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChatSendResp); i { + switch v := v.(*ChatGetListResp); i { case 0: return &v.state case 1: @@ -487,7 +581,7 @@ func file_chat_chat_msg_proto_init() { } } file_chat_chat_msg_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChatSpanSendReq); i { + switch v := v.(*ChatSendReq); i { case 0: return &v.state case 1: @@ -499,6 +593,30 @@ func file_chat_chat_msg_proto_init() { } } file_chat_chat_msg_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChatSendResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chat_chat_msg_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChatSpanSendReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chat_chat_msg_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ChatSpanSendResp); i { case 0: return &v.state @@ -517,7 +635,7 @@ func file_chat_chat_msg_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_chat_chat_msg_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/pb/comm.pb.go b/pb/comm.pb.go index 00d712538..a4b1f5abc 100644 --- a/pb/comm.pb.go +++ b/pb/comm.pb.go @@ -159,10 +159,11 @@ type AgentMessage struct { Ip string `protobuf:"bytes,1,opt,name=Ip,proto3" json:"Ip"` UserSessionId string `protobuf:"bytes,2,opt,name=UserSessionId,proto3" json:"UserSessionId"` UserId string `protobuf:"bytes,3,opt,name=UserId,proto3" json:"UserId"` - GatewayServiceId string `protobuf:"bytes,4,opt,name=GatewayServiceId,proto3" json:"GatewayServiceId"` - MainType string `protobuf:"bytes,5,opt,name=MainType,proto3" json:"MainType"` - SubType string `protobuf:"bytes,6,opt,name=SubType,proto3" json:"SubType"` - Message *anypb.Any `protobuf:"bytes,7,opt,name=Message,proto3" json:"Message"` + ServiceTag string `protobuf:"bytes,4,opt,name=ServiceTag,proto3" json:"ServiceTag"` + GatewayServiceId string `protobuf:"bytes,5,opt,name=GatewayServiceId,proto3" json:"GatewayServiceId"` + MainType string `protobuf:"bytes,6,opt,name=MainType,proto3" json:"MainType"` + SubType string `protobuf:"bytes,7,opt,name=SubType,proto3" json:"SubType"` + Message *anypb.Any `protobuf:"bytes,8,opt,name=Message,proto3" json:"Message"` } func (x *AgentMessage) Reset() { @@ -218,6 +219,13 @@ func (x *AgentMessage) GetUserId() string { return "" } +func (x *AgentMessage) GetServiceTag() string { + if x != nil { + return x.ServiceTag + } + return "" +} + func (x *AgentMessage) GetGatewayServiceId() string { if x != nil { return x.GatewayServiceId @@ -851,20 +859,22 @@ var file_comm_proto_rawDesc = []byte{ 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x65, 0x63, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x65, 0x63, 0x22, 0xee, 0x01, 0x0a, 0x0c, 0x41, 0x67, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x65, 0x63, 0x22, 0x8e, 0x02, 0x0a, 0x0c, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x70, 0x12, 0x24, 0x0a, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, 0x74, 0x65, - 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x52, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x54, 0x61, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x54, 0x61, 0x67, 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, 0x74, 0x65, + 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x4d, 0x61, 0x69, 0x6e, 0x54, 0x79, 0x70, 0x65, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x4d, 0x61, 0x69, 0x6e, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x18, 0x0a, 0x07, 0x53, 0x75, 0x62, 0x54, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x4d, 0x61, 0x69, 0x6e, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x53, 0x75, 0x62, 0x54, 0x79, 0x70, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x53, 0x75, 0x62, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2e, 0x0a, 0x07, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xad, 0x01, 0x0a, 0x0f, 0x52, 0x50, 0x43, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1e, diff --git a/pb/proto/chat/chat_db.proto b/pb/proto/chat/chat_db.proto index 2cf9d4645..1f75bddde 100644 --- a/pb/proto/chat/chat_db.proto +++ b/pb/proto/chat/chat_db.proto @@ -15,7 +15,7 @@ message DBChat { ChatChannel channel = 2; //频道 string suid =3; //发送用户id string ruid = 4; //接收用户id channel == Private 有效 - string areaId = 5; //区服id + int32 areaId = 5; //跨服频道Id string unionId = 6; //工会id int32 headid = 7; //用户头像 string uname = 8; //用户名 diff --git a/pb/proto/chat/chat_msg.proto b/pb/proto/chat/chat_msg.proto index bc666bbf8..bf4acff7e 100644 --- a/pb/proto/chat/chat_msg.proto +++ b/pb/proto/chat/chat_msg.proto @@ -7,6 +7,15 @@ message ChatMessagePush{ repeated DBChat Chats = 1; } +//申请跨服频道号 +message ChatCrossChannelReq { + +} +//申请跨服频道号 回应 +message ChatCrossChannelResp { + int32 ChannelId = 1; +} + //请求未读消息 message ChatGetListReq { diff --git a/pb/proto/comm.proto b/pb/proto/comm.proto index ba2de3d20..9d468fe91 100644 --- a/pb/proto/comm.proto +++ b/pb/proto/comm.proto @@ -17,10 +17,11 @@ message AgentMessage { string Ip = 1; string UserSessionId = 2; string UserId = 3; - string GatewayServiceId = 4; - string MainType = 5; - string SubType = 6; - google.protobuf.Any Message = 7; + string ServiceTag = 4; + string GatewayServiceId = 5; + string MainType = 6; + string SubType = 7; + google.protobuf.Any Message = 8; } // RPC 服务固定回复结构 diff --git a/pb/proto/user/user_db.proto b/pb/proto/user/user_db.proto index 17fadb160..3b6aef8a9 100644 --- a/pb/proto/user/user_db.proto +++ b/pb/proto/user/user_db.proto @@ -2,10 +2,11 @@ syntax = "proto3"; option go_package = ".;pb"; message CacheUser { - string uid = 1; - string SessionId = 2; - string GatewayServiceId = 3; - string ip = 4; + string uid = 1; //用户id + string SessionId = 2; //会话id + string ServiceTag = 3; //所在服务集群 区服id + string GatewayServiceId = 4; //所在网关服务id + string ip = 5; //远程ip // DB_UserData UserData = 4; //@go_tags(`json:",inline"`) } diff --git a/pb/user_db.pb.go b/pb/user_db.pb.go index e9badda16..737ae3002 100644 --- a/pb/user_db.pb.go +++ b/pb/user_db.pb.go @@ -25,10 +25,11 @@ type CacheUser struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid"` - SessionId string `protobuf:"bytes,2,opt,name=SessionId,proto3" json:"SessionId"` - GatewayServiceId string `protobuf:"bytes,3,opt,name=GatewayServiceId,proto3" json:"GatewayServiceId"` - Ip string `protobuf:"bytes,4,opt,name=ip,proto3" json:",inline"` // DB_UserData UserData = 4; // + Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid"` //用户id + SessionId string `protobuf:"bytes,2,opt,name=SessionId,proto3" json:"SessionId"` //会话id + ServiceTag string `protobuf:"bytes,3,opt,name=ServiceTag,proto3" json:"ServiceTag"` //所在服务集群 区服id + GatewayServiceId string `protobuf:"bytes,4,opt,name=GatewayServiceId,proto3" json:"GatewayServiceId"` //所在网关服务id + Ip string `protobuf:"bytes,5,opt,name=ip,proto3" json:"ip"` //远程ip } func (x *CacheUser) Reset() { @@ -77,6 +78,13 @@ func (x *CacheUser) GetSessionId() string { return "" } +func (x *CacheUser) GetServiceTag() string { + if x != nil { + return x.ServiceTag + } + return "" +} + func (x *CacheUser) GetGatewayServiceId() string { if x != nil { return x.GatewayServiceId @@ -278,42 +286,44 @@ var File_user_user_db_proto protoreflect.FileDescriptor var file_user_user_db_proto_rawDesc = []byte{ 0x0a, 0x12, 0x75, 0x73, 0x65, 0x72, 0x2f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x64, 0x62, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x77, 0x0a, 0x09, 0x43, 0x61, 0x63, 0x68, 0x65, 0x55, 0x73, 0x65, - 0x72, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, - 0x64, 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, 0x74, - 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x0e, 0x0a, - 0x02, 0x69, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0xa6, 0x03, - 0x0a, 0x06, 0x44, 0x42, 0x55, 0x73, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, - 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x62, 0x69, 0x6e, 0x64, 0x75, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x62, 0x69, 0x6e, 0x64, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, - 0x73, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, 0x1a, - 0x0a, 0x08, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x69, 0x70, 0x12, 0x20, 0x0a, 0x0b, 0x6c, 0x61, - 0x73, 0x74, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x69, 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x69, 0x70, 0x12, 0x14, 0x0a, 0x05, - 0x63, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x74, 0x69, - 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x18, - 0x0a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x74, 0x69, 0x6d, 0x65, - 0x12, 0x20, 0x0a, 0x0b, 0x66, 0x72, 0x69, 0x65, 0x6e, 0x64, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x18, - 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x66, 0x72, 0x69, 0x65, 0x6e, 0x64, 0x50, 0x6f, 0x69, - 0x6e, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, 0x18, 0x0c, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x06, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x67, 0x6f, - 0x6c, 0x64, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x67, 0x6f, 0x6c, 0x64, 0x12, 0x10, - 0x0a, 0x03, 0x65, 0x78, 0x70, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x65, 0x78, 0x70, - 0x12, 0x18, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x6c, 0x76, - 0x18, 0x10, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x6c, 0x76, 0x12, 0x10, 0x0a, 0x03, 0x76, 0x69, - 0x70, 0x18, 0x11, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x76, 0x69, 0x70, 0x12, 0x18, 0x0a, 0x07, - 0x64, 0x69, 0x61, 0x6d, 0x6f, 0x6e, 0x64, 0x18, 0x12, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x64, - 0x69, 0x61, 0x6d, 0x6f, 0x6e, 0x64, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x97, 0x01, 0x0a, 0x09, 0x43, 0x61, 0x63, 0x68, 0x65, 0x55, 0x73, + 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x54, 0x61, 0x67, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x54, + 0x61, 0x67, 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, + 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0xa6, + 0x03, 0x0a, 0x06, 0x44, 0x42, 0x55, 0x73, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x75, + 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, + 0x18, 0x0a, 0x07, 0x62, 0x69, 0x6e, 0x64, 0x75, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x62, 0x69, 0x6e, 0x64, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x73, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x69, 0x64, 0x12, + 0x1a, 0x0a, 0x08, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x69, 0x70, 0x12, 0x20, 0x0a, 0x0b, 0x6c, + 0x61, 0x73, 0x74, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x69, 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x69, 0x70, 0x12, 0x14, 0x0a, + 0x05, 0x63, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x74, + 0x69, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x74, 0x69, 0x6d, 0x65, + 0x18, 0x0a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x6c, 0x6f, 0x67, 0x69, 0x6e, 0x74, 0x69, 0x6d, + 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x66, 0x72, 0x69, 0x65, 0x6e, 0x64, 0x50, 0x6f, 0x69, 0x6e, 0x74, + 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x66, 0x72, 0x69, 0x65, 0x6e, 0x64, 0x50, 0x6f, + 0x69, 0x6e, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, 0x18, 0x0c, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x06, 0x61, 0x76, 0x61, 0x74, 0x61, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x67, + 0x6f, 0x6c, 0x64, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x67, 0x6f, 0x6c, 0x64, 0x12, + 0x10, 0x0a, 0x03, 0x65, 0x78, 0x70, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x65, 0x78, + 0x70, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x18, 0x0f, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x6c, + 0x76, 0x18, 0x10, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x6c, 0x76, 0x12, 0x10, 0x0a, 0x03, 0x76, + 0x69, 0x70, 0x18, 0x11, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x76, 0x69, 0x70, 0x12, 0x18, 0x0a, + 0x07, 0x64, 0x69, 0x61, 0x6d, 0x6f, 0x6e, 0x64, 0x18, 0x12, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, + 0x64, 0x69, 0x61, 0x6d, 0x6f, 0x6e, 0x64, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/services/comp_gateroute.go b/services/comp_gateroute.go index 180122b01..923b21f1e 100644 --- a/services/comp_gateroute.go +++ b/services/comp_gateroute.go @@ -128,7 +128,7 @@ func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessag this.mrlock.RUnlock() if ok { session := this.pools.Get().(comm.IUserSession) - session.SetSession(args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId) + session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId) defer func() { //回收 session.Reset() @@ -177,7 +177,7 @@ func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.Notice //获取用户的会话对象 func (this *SCompGateRoute) GetUserSession(udata *pb.CacheUser) (session comm.IUserSession) { session = this.pools.Get().(comm.IUserSession) - session.SetSession("", udata.SessionId, udata.GatewayServiceId, udata.Uid) + session.SetSession("", udata.SessionId, udata.ServiceTag, udata.GatewayServiceId, udata.Uid) return }