diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 42aa6f0a9..bc2a042a0 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -3,6 +3,7 @@ package rpcx import ( "context" "strings" + "time" "github.com/smallnest/rpcx/client" ) @@ -74,6 +75,14 @@ func (this *RPCX) UnregisterAll() (err error) { // 同步调用 func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + cancel context.CancelFunc + ) + if this.options.OutTime > 0 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) + defer cancel() + } + //先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接 err = this.service.Call(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { @@ -84,6 +93,14 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st // 广播调用 func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + cancel context.CancelFunc + ) + if this.options.OutTime > 0 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) + defer cancel() + } + err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) @@ -102,6 +119,14 @@ func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod stri // 跨服同步调用 func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + cancel context.CancelFunc + ) + if this.options.OutTime > 0 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) + defer cancel() + } + err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) @@ -111,6 +136,14 @@ func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, serv // 跨集群 广播 func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + cancel context.CancelFunc + ) + if this.options.OutTime > 0 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) + defer cancel() + } + err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) @@ -129,6 +162,14 @@ func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servic // 全集群广播 func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + cancel context.CancelFunc + ) + if this.options.OutTime > 0 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second) + defer cancel() + } + err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index cd221f15f..dfeacc74f 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -59,7 +59,7 @@ type Service struct { pending map[uint64]*client.Call } -//RPC 服务启动 +// RPC 服务启动 func (this *Service) Start() (err error) { go func() { if err = this.server.Serve("tcp", this.options.ServiceAddr); err != nil { @@ -69,13 +69,13 @@ func (this *Service) Start() (err error) { return } -//服务停止 +// 服务停止 func (this *Service) Stop() (err error) { err = this.server.Close() return } -//获取服务集群列表 +// 获取服务集群列表 func (this *Service) GetServiceTags() []string { this.selectormutex.RLock() tags := make([]string, len(this.selectors)) @@ -88,25 +88,25 @@ func (this *Service) GetServiceTags() []string { return tags } -//注册RPC 服务 +// 注册RPC 服务 func (this *Service) RegisterFunction(fn interface{}) (err error) { err = this.server.RegisterFunction(this.options.ServiceType, fn, this.metadata) return } -//注册RPC 服务 +// 注册RPC 服务 func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) { err = this.server.RegisterFunctionName(this.options.ServiceType, name, fn, this.metadata) return } -//注销 暂时不处理 +// 注销 暂时不处理 func (this *Service) UnregisterAll() (err error) { // err = this.server.UnregisterAll() return } -//同步调用远程服务 +// 同步调用远程服务 func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { var ( done *client.Call @@ -114,7 +114,7 @@ func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod ) seq := new(uint64) ctx = context.WithValue(ctx, seqKey{}, seq) - if conn, done, err = this.call(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { + if conn, done, err = this.call(&ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { return } select { @@ -142,19 +142,19 @@ func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod return } -//广播调用 +// 广播调用 func (this *Service) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.broadcast(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply) return } -//异步调用 远程服务 +// 异步调用 远程服务 func (this *Service) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) { - _, _call, err = this.call(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, done) + _, _call, err = this.call(&ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, done) return } -//跨服 同步调用 远程服务 +// 跨服 同步调用 远程服务 func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { var ( done *client.Call @@ -162,7 +162,7 @@ func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, s ) seq := new(uint64) ctx = context.WithValue(ctx, seqKey{}, seq) - if conn, done, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { + if conn, done, err = this.call(&ctx, clusterTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { return } select { @@ -190,25 +190,25 @@ func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, s return } -//跨集群 广播 +// 跨集群 广播 func (this *Service) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.broadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) return } -//跨服 异步调用 远程服务 +// 跨服 异步调用 远程服务 func (this *Service) AcrossClusterGo(ctx context.Context, clusterTag, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) { - _, _call, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + _, _call, err = this.call(&ctx, clusterTag, servicePath, serviceMethod, args, reply, done) return } -//全集群广播 +// 全集群广播 func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.clusterbroadcast(ctx, servicePath, serviceMethod, args, reply) return } -//监控rpc连接收到的请求消息 处理消息回调请求 +// 监控rpc连接收到的请求消息 处理消息回调请求 func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error { if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage { var ( @@ -291,14 +291,14 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e return nil } -//客户端配置 AutoConnect 的默认连接函数 +// 客户端配置 AutoConnect 的默认连接函数 func (this *Service) RpcxShakeHands(ctx context.Context, args *ServiceNode, reply *ServiceNode) error { // this.Debugf("RpcxShakeHands:%+v", ctx.Value(share.ReqMetaDataKey).(map[string]string)) return nil } -//执行远程调用 -func (this *Service) call(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) { +// 执行远程调用 +func (this *Service) call(ctx *context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) { var ( spath []string clientaddr string @@ -317,7 +317,7 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st ServiceMetaKey: this.metadata, } spath = strings.Split(servicePath, "/") - ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + *ctx = context.WithValue(*ctx, share.ReqMetaDataKey, map[string]string{ CallRoutRulesKey: servicePath, ServiceAddrKey: "tcp@" + this.options.ServiceAddr, ServiceMetaKey: this.metadata, @@ -330,7 +330,7 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st return } - if clientaddr = selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { + if clientaddr = selector.Select(*ctx, spath[0], serviceMethod, args); clientaddr == "" { err = fmt.Errorf("on found servicePath:%s", servicePath) return } @@ -354,11 +354,11 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st } } _call.Done = done - this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) + this.send(*ctx, conn, spath[0], serviceMethod, metadata, _call) return } -//执行远程调用 +// 执行远程调用 func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { var ( spath []string @@ -412,8 +412,9 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa _call.Args = args _call.Reply = reply _call.Done = make(chan *client.Call, 10) + seq := new(uint64) + ctx = context.WithValue(ctx, seqKey{}, seq) this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) - seq, _ := ctx.Value(seqKey{}).(*uint64) select { case <-ctx.Done(): // cancel by context this.mutex.Lock() @@ -457,7 +458,7 @@ check: return err } -//全集群广播 +// 全集群广播 func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { var ( spath []string @@ -563,7 +564,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s return } -//发送远程调用请求 +// 发送远程调用请求 func (this *Service) send(ctx context.Context, conn net.Conn, servicePath string, serviceMethod string, metadata map[string]string, call *client.Call) { defer func() { if call.Error != nil { diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index ae6d318b3..2d7d516e1 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -355,11 +355,9 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { } stime := time.Now() // this.gateway.Debugf("----------3 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType) - // ctx, _ := context.WithTimeout(context.Background(), time.Second*5) - ctx := context.Background() if len(serviceTag) == 0 { // this.gateway.Debugf("----------4 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType) - if err = this.gateway.Service().RpcCall(ctx, servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil { + if err = this.gateway.Service().RpcCall(context.Background(), servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil { this.gateway.Error("[UserResponse]", log.Field{Key: "uid", Value: this.uId}, log.Field{Key: "serviceTag", Value: serviceTag}, @@ -374,7 +372,7 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { if msg.ServicePath != "" { //客户端是否制定目标服务器 /wroker/woker0 servicePath = msg.ServicePath } - if err = this.gateway.Service().AcrossClusterRpcCall(ctx, serviceTag, servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil { + if err = this.gateway.Service().AcrossClusterRpcCall(context.Background(), serviceTag, servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil { this.gateway.Error("[UserResponse]", log.Field{Key: "uid", Value: this.uId}, log.Field{Key: "serviceTag", Value: serviceTag}, diff --git a/modules/parkour/module.go b/modules/parkour/module.go index c96d9c019..3fc235555 100644 --- a/modules/parkour/module.go +++ b/modules/parkour/module.go @@ -96,9 +96,8 @@ func (this *Parkour) AddMounts(session comm.IUserSession, mounts map[string]int3 // 匹配 func (this *Parkour) match(team *pb.DBParkour) (err error) { - ctx, _ := context.WithTimeout(context.Background(), time.Second*5) err = this.service.RpcCall( - ctx, + context.Background(), comm.Service_Mainte, string(comm.RPC_ParkourJoinMatch), &pb.RPCParkourJoinMatchReq{Captainid: team.Captainid, Member: team.Member}, diff --git a/modules/timer/parkour.go b/modules/timer/parkour.go index af31002ee..7a729828e 100644 --- a/modules/timer/parkour.go +++ b/modules/timer/parkour.go @@ -36,7 +36,7 @@ type ParkourComp struct { total int32 } -//组件初始化接口 +// 组件初始化接口 func (this *ParkourComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.MCompConfigure.Init(service, module, comp, options) this.service = service.(base.IRPCXService) @@ -46,7 +46,7 @@ func (this *ParkourComp) Init(service core.IService, module core.IModule, comp c return } -//自由跨服环境下开启此功能 +// 自由跨服环境下开启此功能 func (this *ParkourComp) Start() (err error) { err = this.MCompConfigure.Start() if db.IsCross() { @@ -63,7 +63,7 @@ func (this *ParkourComp) Start() (err error) { return } -//刷新推荐列表 +// 刷新推荐列表 func (this *ParkourComp) refreshlist() { var ( c *mongo.Cursor @@ -103,7 +103,7 @@ func (this *ParkourComp) refreshlist() { return } -//加入匹配中 +// 加入匹配中 func (this *ParkourComp) join(ctx context.Context, req *pb.RPCParkourJoinMatchReq, resp *pb.RPCParkourJoinMatchResp) (err error) { this.tlock.Lock() this.teams[req.Captainid] = req.Member @@ -115,7 +115,7 @@ func (this *ParkourComp) join(ctx context.Context, req *pb.RPCParkourJoinMatchRe return } -//加入匹配中 +// 加入匹配中 func (this *ParkourComp) cancel(ctx context.Context, req *pb.RPCParkourCancelMatchReq, resp *pb.RPCParkourCancelMatchResp) (err error) { this.tlock.Lock() delete(this.teams, req.Captainid) @@ -123,7 +123,7 @@ func (this *ParkourComp) cancel(ctx context.Context, req *pb.RPCParkourCancelMat return } -//定时匹配处理 +// 定时匹配处理 func (this *ParkourComp) match() { // this.module.Debug("执行一次匹配!") if !atomic.CompareAndSwapInt32(&this.timerlock, 1, 2) { //正在执行,就不要在进来了 @@ -324,8 +324,7 @@ func (this *ParkourComp) match() { } } - ctx, _ := context.WithTimeout(context.Background(), time.Second*2) - if err = this.service.RpcCall(ctx, + if err = this.service.RpcCall(context.Background(), comm.Service_Worker, string(comm.RPC_ParkourMatchSucc), &pb.RPCParkourMatchSuccReq{Red: reduser, Bule: buleuser}, diff --git a/modules/web/api_paydelivery.go b/modules/web/api_paydelivery.go index 64c27e481..d860b50ae 100644 --- a/modules/web/api_paydelivery.go +++ b/modules/web/api_paydelivery.go @@ -10,7 +10,6 @@ import ( "go_dreamfactory/pb" "go_dreamfactory/utils" "net/http" - "time" ) type PayDeliveryResults struct { @@ -18,7 +17,7 @@ type PayDeliveryResults struct { State int32 `json:"state"` } -//充值发货 +// 充值发货 func (this *Api_Comp) PayDelivery(c *engine.Context) { uid := c.Query("uid") pid := c.Query("productid") @@ -63,9 +62,8 @@ func (this *Api_Comp) PayDelivery(c *engine.Context) { log.Errorf("签名错误 sign:%s _sign:%s", sign, _sign) return } - ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) if err := this.module.service.RpcCall( - ctx, + context.TODO(), comm.Service_Worker, string(comm.Rpc_ModulePayDelivery), payreq, diff --git a/modules/wtask/api_info.go b/modules/wtask/api_info.go index cf07d7612..bc81b2c04 100644 --- a/modules/wtask/api_info.go +++ b/modules/wtask/api_info.go @@ -32,6 +32,9 @@ func (this *apiComp) Info(session comm.IUserSession, req *pb.WTaskInfoReq) (errd if progress, errdata = this.module.fishtask(session, wtask, false); errdata != nil { return } + if progress, errdata = this.module.pushtaskprogress(session, wtask, false); errdata != nil { + return + } session.SendMsg(string(this.module.GetType()), "info", &pb.WTaskInfoResp{Info: wtask, Accepts: progress}) if err = this.module.modelwtask.updateUserWTasks(session.GetUserId(), wtask); err != nil { errdata = &pb.ErrorData{ diff --git a/modules/wtask/module.go b/modules/wtask/module.go index e94c3125a..0eb06677d 100644 --- a/modules/wtask/module.go +++ b/modules/wtask/module.go @@ -467,7 +467,7 @@ func (this *WTask) inquireActivations(session comm.IUserSession, wtask *pb.DBWTa //有新任务接取 if ispush && changeActiva { - session.SendMsg(string(this.GetType()), "activations", &pb.WTaskActivationsChangePush{Activations: wtask.Activations}) + session.SendMsg(string(this.GetType()), "activationschange", &pb.WTaskActivationsChangePush{Activations: wtask.Activations}) } if changeAccept {