上传战斗服务多节点负载均衡

This commit is contained in:
liwei1dao 2022-11-28 15:26:39 +08:00
parent 91a9c65d03
commit 016124983b
2 changed files with 43 additions and 19 deletions

View File

@ -7,6 +7,7 @@ import (
"sync" "sync"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/log"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -21,10 +22,8 @@ type clientComp struct {
options *Options options *Options
service core.IService service core.IService
module *Battle module *Battle
conn *websocket.Conn clinets []*client
seq uint64 i int
pendingmutex sync.Mutex
pending map[uint64]*MessageCall //高并发回调
} }
//组件初始化接口 //组件初始化接口
@ -32,19 +31,44 @@ func (this *clientComp) Init(service core.IService, module core.IModule, comp co
this.MCompGate.Init(service, module, comp, options) this.MCompGate.Init(service, module, comp, options)
this.options = options.(*Options) this.options = options.(*Options)
this.module = module.(*Battle) this.module = module.(*Battle)
this.pending = make(map[uint64]*MessageCall) this.clinets = make([]*client, len(this.options.BattleServerAddr))
return return
} }
func (this *clientComp) Start() (err error) { func (this *clientComp) Start() (err error) {
err = this.MCompGate.Start() err = this.MCompGate.Start()
dialer := websocket.Dialer{} for i, v := range this.options.BattleServerAddr {
this.conn, _, err = dialer.Dial(this.options.BattleServerAddr, nil) if this.clinets[i], err = newClient(v, this.options.Log); err != nil {
return
}
}
return return
} }
//校验战斗过程
func (this *clientComp) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) { func (this *clientComp) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
i := this.i % len(this.clinets)
this.i += 1
err = this.clinets[i].callBattle(ctx, method, req, reply)
return
}
func newClient(addr string, log log.ILogger) (c *client, err error) {
c = &client{log: log, pending: make(map[uint64]*MessageCall)}
dialer := websocket.Dialer{}
c.conn, _, err = dialer.Dial(addr, nil)
return
}
type client struct {
log log.ILogger
conn *websocket.Conn
seq uint64
pendingmutex sync.Mutex
pending map[uint64]*MessageCall //高并发回调
}
//校验战斗过程
func (this *client) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
call := new(MessageCall) call := new(MessageCall)
call.Done = make(chan *MessageCall, 10) call.Done = make(chan *MessageCall, 10)
call.Args = req call.Args = req
@ -74,7 +98,7 @@ func (this *clientComp) callBattle(ctx context.Context, method string, req proto
this.pendingmutex.Unlock() this.pendingmutex.Unlock()
if call != nil { if call != nil {
call.Error = ctx.Err() call.Error = ctx.Err()
call.done(this.options.Log) call.done(this.log)
} }
return ctx.Err() return ctx.Err()
case call := <-call.Done: case call := <-call.Done:
@ -83,7 +107,7 @@ func (this *clientComp) callBattle(ctx context.Context, method string, req proto
return return
} }
func (this *clientComp) run() { func (this *client) run() {
var ( var (
data []byte data []byte
msg *pb.BattleRpcMessage = &pb.BattleRpcMessage{} msg *pb.BattleRpcMessage = &pb.BattleRpcMessage{}
@ -92,23 +116,23 @@ func (this *clientComp) run() {
locp: locp:
for { for {
if _, data, err = this.conn.ReadMessage(); err != nil { if _, data, err = this.conn.ReadMessage(); err != nil {
this.module.Errorf("client err:%v", err) this.log.Errorf("client err:%v", err)
break locp break locp
} }
if err = proto.Unmarshal(data, msg); err != nil { if err = proto.Unmarshal(data, msg); err != nil {
this.module.Errorf("client Unmarshal err:%v", err) this.log.Errorf("client Unmarshal err:%v", err)
break locp break locp
} }
go this.handleresponse(msg) go this.handleresponse(msg)
} }
} }
func (this *clientComp) handleresponse(resp *pb.BattleRpcMessage) { func (this *client) handleresponse(resp *pb.BattleRpcMessage) {
var call *MessageCall var call *MessageCall
this.pendingmutex.Lock() this.pendingmutex.Lock()
call = this.pending[resp.Rid] call = this.pending[resp.Rid]
delete(this.pending, resp.Rid) delete(this.pending, resp.Rid)
this.pendingmutex.Unlock() this.pendingmutex.Unlock()
call.Error = resp.Data.UnmarshalTo(call.Reply) call.Error = resp.Data.UnmarshalTo(call.Reply)
call.done(this.options.Log) call.done(this.log)
} }

View File

@ -13,7 +13,7 @@ type (
} }
Options struct { Options struct {
modules.Options modules.Options
BattleServerAddr string BattleServerAddr []string
} }
) )
@ -25,7 +25,7 @@ func (this *Options) GetLog() log.ILogger {
return this.Log return this.Log
} }
func (this *Options) GetBattleServerAddr() string { func (this *Options) GetBattleServerAddr() []string {
return this.BattleServerAddr return this.BattleServerAddr
} }