上传网络优化

This commit is contained in:
liwei 2023-07-10 18:15:02 +08:00
parent 82dfe43457
commit d7a0279330
3 changed files with 107 additions and 28 deletions

View File

@ -9,15 +9,15 @@ import (
)
func newSys(options *Options) (sys ISys, err error) {
if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端
sys, err = newService(options)
return
}
// if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端
// sys, err = newService(options)
// return
// }
if options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端
sys, err = newClient(options)
return
}
// if options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端
// sys, err = newClient(options)
// return
// }
var (
service ISys
client ISys
@ -83,6 +83,16 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st
defer cancel()
}
if this.options.RpcxStartType == RpcxStartByService {
err = this.service.Call(ctx, servicePath, serviceMethod, args, reply)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
err = this.client.Call(ctx, servicePath, serviceMethod, args, reply)
return
}
//先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接
err = this.service.Call(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
@ -101,6 +111,15 @@ func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMeth
defer cancel()
}
if this.options.RpcxStartType == RpcxStartByService {
err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
err = this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply)
return
}
err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply)
@ -110,6 +129,16 @@ func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMeth
// 异步调用
func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
if this.options.RpcxStartType == RpcxStartByService {
call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
call, err = this.client.Go(ctx, servicePath, serviceMethod, args, reply, done)
return
}
call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done)
@ -126,7 +155,14 @@ func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, serv
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
if this.options.RpcxStartType == RpcxStartByService {
err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
err = this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
return
}
err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
@ -143,7 +179,14 @@ func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string,
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
if this.options.RpcxStartType == RpcxStartByService {
err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
err = this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
return
}
err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
@ -153,6 +196,15 @@ func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string,
// 跨服异步调用
func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
if this.options.RpcxStartType == RpcxStartByService {
call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
call, err = this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
return
}
call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
@ -169,7 +221,14 @@ func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serv
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
if this.options.RpcxStartType == RpcxStartByService {
err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
return
}
if this.options.RpcxStartType == RpcxStartByClient {
err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
return
}
err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)

View File

@ -34,7 +34,7 @@ func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
wsConn: conn,
sessionId: id.NewUUId(),
uId: "",
writeChan: make(chan []byte, 2),
writeChan: make(chan [][]byte, 2),
closeSignal: make(chan bool),
state: 1,
protoMsg: make(map[string]int64, 0),
@ -52,7 +52,7 @@ type Agent struct {
sessionId string
uId string
wId string
writeChan chan []byte
writeChan chan [][]byte
closeSignal chan bool
state int32 //状态 0 关闭 1 运行 2 关闭中
wg sync.WaitGroup
@ -153,14 +153,15 @@ locp:
select {
case <-this.closeSignal:
break locp
case msg, ok := <-this.writeChan:
case msgs, ok := <-this.writeChan:
if ok {
for _, msg := range msgs {
//data, err = proto.Marshal(msg)
if err = this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
this.gateway.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err)
go this.Close()
}
}
} else {
go this.Close()
}
@ -278,18 +279,37 @@ func (this *Agent) WriteMsg(msg *pb.UserMessage) (err error) {
var (
data []byte
)
if data, err = proto.Marshal(msg); err == nil {
this.writeChan <- data
this.writeChan <- [][]byte{data}
}
return
}
func (this *Agent) WriteMsgs(msgs []*pb.UserMessage) (err error) {
if atomic.LoadInt32(&this.state) != 1 {
return
}
var (
datas [][]byte = make([][]byte, 0)
data []byte
)
for _, msg := range msgs {
if data, err = proto.Marshal(msg); err == nil {
datas = append(datas, data)
}
}
this.writeChan <- datas
return
}
func (this *Agent) WriteBytes(data []byte) (err error) {
if atomic.LoadInt32(&this.state) != 1 {
err = fmt.Errorf("Uid%s Staet:%d", this.uId, this.state)
return
}
this.writeChan <- data
this.writeChan <- [][]byte{data}
return
}
@ -413,9 +433,9 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
if v.MainType == msg.MainType && v.SubType == msg.SubType {
v.MsgId = msg.MsgId
}
if err = this.WriteMsg(v); err != nil {
return
}
if err = this.WriteMsgs(reply.Reply); err != nil {
return
}
}
return nil

View File

@ -129,12 +129,12 @@ func (this *ParkourComp) match() {
if !atomic.CompareAndSwapInt32(&this.timerlock, 1, 2) { //正在执行,就不要在进来了
return
}
startime := time.Now()
// startime := time.Now()
defer func() {
atomic.StoreInt32(&this.timerlock, 1) //执行完毕释放
log.Debug("Parkour Match",
log.Field{Key: "t", Value: time.Since(startime).Milliseconds()},
)
// log.Debug("Parkour Match",
// log.Field{Key: "t", Value: time.Since(startime).Milliseconds()},
// )
}()
// this.module.Errorf("捕羊大赛 定时匹配,%d", this.total)