From 679e79abdd5200dd4a15e4f4b122328dc29fc1c6 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Thu, 24 Nov 2022 16:30:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=88=98=E6=96=97rpc?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=9F=BF=E8=AE=BE=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/battle/call.go | 22 ++++++++++++-- modules/battle/client.go | 65 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/modules/battle/call.go b/modules/battle/call.go index 5f310a06f..fb8d99eae 100644 --- a/modules/battle/call.go +++ b/modules/battle/call.go @@ -1,12 +1,28 @@ package battle +import ( + "go_dreamfactory/lego/sys/log" + + "google.golang.org/protobuf/proto" +) + //异步返回结构 type MessageCall struct { Method string Metadata map[string]string ResMetadata map[string]string - Args interface{} //请求参数 - Reply interface{} //返回参数 - Error error //错误信息 + Args proto.Message //请求参数 + Reply proto.Message //返回参数 + Error error //错误信息 Done chan *MessageCall } + +func (call *MessageCall) done(log log.Ilogf) { + select { + case call.Done <- call: + // ok + default: + log.Debugf("rpc: discarding Call reply due to insufficient Done chan capacity") + + } +} diff --git a/modules/battle/client.go b/modules/battle/client.go index e58bdbd01..d9905a80d 100644 --- a/modules/battle/client.go +++ b/modules/battle/client.go @@ -1,13 +1,16 @@ package battle import ( + "context" "go_dreamfactory/modules" "go_dreamfactory/pb" + "sync" "go_dreamfactory/lego/core" "github.com/gorilla/websocket" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) /* @@ -15,10 +18,13 @@ import ( */ type clientComp struct { modules.MCompGate - options *Options - service core.IService - module *Battle - conn *websocket.Conn + options *Options + service core.IService + module *Battle + conn *websocket.Conn + seq uint64 + pendingmutex sync.Mutex + pending map[uint64]*MessageCall //高并发回调 } //组件初始化接口 @@ -26,6 +32,7 @@ func (this *clientComp) Init(service core.IService, module core.IModule, comp co this.MCompGate.Init(service, module, comp, options) this.options = options.(*Options) this.module = module.(*Battle) + this.pending = make(map[uint64]*MessageCall) return } @@ -37,8 +44,43 @@ func (this *clientComp) Start() (err error) { } //校验战斗过程 -func (this *clientComp) CheckBattle() { - +func (this *clientComp) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) { + call := new(MessageCall) + call.Done = make(chan *MessageCall, 10) + call.Args = req + call.Reply = reply + this.pendingmutex.Lock() + seq := this.seq + this.seq++ + this.pending[seq] = call + this.pendingmutex.Unlock() + msg := &pb.BattleRpcMessage{ + Rid: seq, + Method: method, + } + msg.Data, _ = anypb.New(req) + data, _ := proto.Marshal(msg) + if err = this.conn.WriteMessage(websocket.BinaryMessage, data); err != nil { + this.pendingmutex.Lock() + delete(this.pending, seq) + this.pendingmutex.Unlock() + return + } + select { + case <-ctx.Done(): // cancel by context + this.pendingmutex.Lock() + call := this.pending[seq] + delete(this.pending, seq) + this.pendingmutex.Unlock() + if call != nil { + call.Error = ctx.Err() + call.done(this.options.Log) + } + return ctx.Err() + case call := <-call.Done: + err = call.Error + } + return } func (this *clientComp) run() { @@ -57,5 +99,16 @@ locp: this.module.Errorf("client Unmarshal err:%v", err) break locp } + go this.handleresponse(msg) } } + +func (this *clientComp) handleresponse(resp *pb.BattleRpcMessage) { + var call *MessageCall + this.pendingmutex.Lock() + call = this.pending[resp.Rid] + delete(this.pending, resp.Rid) + this.pendingmutex.Unlock() + call.Error = resp.Data.UnmarshalTo(call.Reply) + call.done(this.options.Log) +}