上传战斗rpc消息矿设计

This commit is contained in:
liwei1dao 2022-11-24 16:30:14 +08:00
parent 85c174a498
commit 679e79abdd
2 changed files with 78 additions and 9 deletions

View File

@ -1,12 +1,28 @@
package battle package battle
import (
"go_dreamfactory/lego/sys/log"
"google.golang.org/protobuf/proto"
)
//异步返回结构 //异步返回结构
type MessageCall struct { type MessageCall struct {
Method string Method string
Metadata map[string]string Metadata map[string]string
ResMetadata map[string]string ResMetadata map[string]string
Args interface{} //请求参数 Args proto.Message //请求参数
Reply interface{} //返回参数 Reply proto.Message //返回参数
Error error //错误信息 Error error //错误信息
Done chan *MessageCall Done chan *MessageCall
} }
func (call *MessageCall) done(log log.Ilogf) {
select {
case call.Done <- call:
// ok
default:
log.Debugf("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}

View File

@ -1,13 +1,16 @@
package battle package battle
import ( import (
"context"
"go_dreamfactory/modules" "go_dreamfactory/modules"
"go_dreamfactory/pb" "go_dreamfactory/pb"
"sync"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
) )
/* /*
@ -19,6 +22,9 @@ type clientComp struct {
service core.IService service core.IService
module *Battle module *Battle
conn *websocket.Conn conn *websocket.Conn
seq uint64
pendingmutex sync.Mutex
pending map[uint64]*MessageCall //高并发回调
} }
//组件初始化接口 //组件初始化接口
@ -26,6 +32,7 @@ func (this *clientComp) Init(service core.IService, module core.IModule, comp co
this.MCompGate.Init(service, module, comp, options) this.MCompGate.Init(service, module, comp, options)
this.options = options.(*Options) this.options = options.(*Options)
this.module = module.(*Battle) this.module = module.(*Battle)
this.pending = make(map[uint64]*MessageCall)
return return
} }
@ -37,8 +44,43 @@ func (this *clientComp) Start() (err error) {
} }
//校验战斗过程 //校验战斗过程
func (this *clientComp) CheckBattle() { func (this *clientComp) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
call := new(MessageCall)
call.Done = make(chan *MessageCall, 10)
call.Args = req
call.Reply = reply
this.pendingmutex.Lock()
seq := this.seq
this.seq++
this.pending[seq] = call
this.pendingmutex.Unlock()
msg := &pb.BattleRpcMessage{
Rid: seq,
Method: method,
}
msg.Data, _ = anypb.New(req)
data, _ := proto.Marshal(msg)
if err = this.conn.WriteMessage(websocket.BinaryMessage, data); err != nil {
this.pendingmutex.Lock()
delete(this.pending, seq)
this.pendingmutex.Unlock()
return
}
select {
case <-ctx.Done(): // cancel by context
this.pendingmutex.Lock()
call := this.pending[seq]
delete(this.pending, seq)
this.pendingmutex.Unlock()
if call != nil {
call.Error = ctx.Err()
call.done(this.options.Log)
}
return ctx.Err()
case call := <-call.Done:
err = call.Error
}
return
} }
func (this *clientComp) run() { func (this *clientComp) run() {
@ -57,5 +99,16 @@ locp:
this.module.Errorf("client Unmarshal err:%v", err) this.module.Errorf("client Unmarshal err:%v", err)
break locp break locp
} }
go this.handleresponse(msg)
} }
} }
func (this *clientComp) handleresponse(resp *pb.BattleRpcMessage) {
var call *MessageCall
this.pendingmutex.Lock()
call = this.pending[resp.Rid]
delete(this.pending, resp.Rid)
this.pendingmutex.Unlock()
call.Error = resp.Data.UnmarshalTo(call.Reply)
call.done(this.options.Log)
}