From d7a027933024114ec05a8c31925ffeed345d62fc Mon Sep 17 00:00:00 2001 From: liwei <2211068034@qq.com> Date: Mon, 10 Jul 2023 18:15:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E7=BD=91=E7=BB=9C=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/rpcx/rpcx.go | 81 ++++++++++++++++++++++++++++++++++------ modules/gateway/agent.go | 46 ++++++++++++++++------- modules/timer/parkour.go | 8 ++-- 3 files changed, 107 insertions(+), 28 deletions(-) diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index bc2a042a0..ba41d72cd 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -9,15 +9,15 @@ import ( ) func newSys(options *Options) (sys ISys, err error) { - if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 - sys, err = newService(options) - return - } + // if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 + // sys, err = newService(options) + // return + // } - if options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端 - sys, err = newClient(options) - return - } + // if options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端 + // sys, err = newClient(options) + // return + // } var ( service ISys client ISys @@ -83,6 +83,16 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st defer cancel() } + if this.options.RpcxStartType == RpcxStartByService { + err = this.service.Call(ctx, servicePath, serviceMethod, args, reply) + return + } + + if this.options.RpcxStartType == RpcxStartByClient { + err = this.client.Call(ctx, servicePath, serviceMethod, args, reply) + return + } + //先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接 err = this.service.Call(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { @@ -101,6 +111,15 @@ func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMeth defer cancel() } + if this.options.RpcxStartType == RpcxStartByService { + err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply) + return + } + if this.options.RpcxStartType == RpcxStartByClient { + err = this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) + return + } + err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) @@ -110,6 +129,16 @@ func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMeth // 异步调用 func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + + if this.options.RpcxStartType == RpcxStartByService { + call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done) + return + } + if this.options.RpcxStartType == RpcxStartByClient { + call, err = this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) + return + } + call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) @@ -126,7 +155,14 @@ func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, serv ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) defer cancel() } - + if this.options.RpcxStartType == RpcxStartByService { + err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + return + } + if this.options.RpcxStartType == RpcxStartByClient { + err = this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + return + } err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) @@ -143,7 +179,14 @@ func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) defer cancel() } - + if this.options.RpcxStartType == RpcxStartByService { + err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + return + } + if this.options.RpcxStartType == RpcxStartByClient { + err = this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + return + } err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) @@ -153,6 +196,15 @@ func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, // 跨服异步调用 func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + if this.options.RpcxStartType == RpcxStartByService { + call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + return + } + if this.options.RpcxStartType == RpcxStartByClient { + call, err = this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + return + } + call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) @@ -169,7 +221,14 @@ func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serv ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) defer cancel() } - + if this.options.RpcxStartType == RpcxStartByService { + err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + return + } + if this.options.RpcxStartType == RpcxStartByClient { + err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + return + } err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index 2d7d516e1..ee92aaa86 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -34,7 +34,7 @@ func newAgent(gateway IGateway, conn *websocket.Conn) *Agent { wsConn: conn, sessionId: id.NewUUId(), uId: "", - writeChan: make(chan []byte, 2), + writeChan: make(chan [][]byte, 2), closeSignal: make(chan bool), state: 1, protoMsg: make(map[string]int64, 0), @@ -52,7 +52,7 @@ type Agent struct { sessionId string uId string wId string - writeChan chan []byte + writeChan chan [][]byte closeSignal chan bool state int32 //状态 0 关闭 1 运行 2 关闭中 wg sync.WaitGroup @@ -153,14 +153,15 @@ locp: select { case <-this.closeSignal: break locp - case msg, ok := <-this.writeChan: + case msgs, ok := <-this.writeChan: if ok { - //data, err = proto.Marshal(msg) - if err = this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil { - this.gateway.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) - go this.Close() + for _, msg := range msgs { + //data, err = proto.Marshal(msg) + if err = this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil { + this.gateway.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) + go this.Close() + } } - } else { go this.Close() } @@ -278,18 +279,37 @@ func (this *Agent) WriteMsg(msg *pb.UserMessage) (err error) { var ( data []byte ) + if data, err = proto.Marshal(msg); err == nil { - this.writeChan <- data + this.writeChan <- [][]byte{data} } return } +func (this *Agent) WriteMsgs(msgs []*pb.UserMessage) (err error) { + if atomic.LoadInt32(&this.state) != 1 { + return + } + var ( + datas [][]byte = make([][]byte, 0) + data []byte + ) + for _, msg := range msgs { + if data, err = proto.Marshal(msg); err == nil { + datas = append(datas, data) + } + } + + this.writeChan <- datas + return +} + func (this *Agent) WriteBytes(data []byte) (err error) { if atomic.LoadInt32(&this.state) != 1 { err = fmt.Errorf("Uid%s Staet:%d", this.uId, this.state) return } - this.writeChan <- data + this.writeChan <- [][]byte{data} return } @@ -413,9 +433,9 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { if v.MainType == msg.MainType && v.SubType == msg.SubType { v.MsgId = msg.MsgId } - if err = this.WriteMsg(v); err != nil { - return - } + } + if err = this.WriteMsgs(reply.Reply); err != nil { + return } } return nil diff --git a/modules/timer/parkour.go b/modules/timer/parkour.go index 7a729828e..6883c1948 100644 --- a/modules/timer/parkour.go +++ b/modules/timer/parkour.go @@ -129,12 +129,12 @@ func (this *ParkourComp) match() { if !atomic.CompareAndSwapInt32(&this.timerlock, 1, 2) { //正在执行,就不要在进来了 return } - startime := time.Now() + // startime := time.Now() defer func() { atomic.StoreInt32(&this.timerlock, 1) //执行完毕释放 - log.Debug("Parkour Match", - log.Field{Key: "t", Value: time.Since(startime).Milliseconds()}, - ) + // log.Debug("Parkour Match", + // log.Field{Key: "t", Value: time.Since(startime).Milliseconds()}, + // ) }() // this.module.Errorf("捕羊大赛 定时匹配,%d", this.total)