From 8515c267504cf316f05edab3d594d60dbc897b51 Mon Sep 17 00:00:00 2001 From: liwei <2211068034@qq.com> Date: Fri, 7 Jul 2023 18:04:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/rpcx/options.go | 25 ++++++++++++++----------- lego/sys/rpcx/rpcx.go | 18 ++++++++++-------- modules/worldtask/model_worldtask.go | 2 +- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/lego/sys/rpcx/options.go b/lego/sys/rpcx/options.go index cb231ab8d..507a807dc 100644 --- a/lego/sys/rpcx/options.go +++ b/lego/sys/rpcx/options.go @@ -18,16 +18,17 @@ const ( type Option func(*Options) type Options struct { - ServiceTag string //集群标签 - ServiceType string //服务类型 - ServiceId string //服务id - ServiceVersion string //服务版本 - ServiceAddr string //服务地址 - ConsulServers []string //Consul集群服务地址 - RpcxStartType RpcxStartType //Rpcx启动类型 - AutoConnect bool //自动连接 客户端启动模式下 主动连接发现的节点服务器 - SerializeType protocol.SerializeType - Debug bool //日志是否开启 + ServiceTag string //集群标签 + ServiceType string //服务类型 + ServiceId string //服务id + ServiceVersion string //服务版本 + ServiceAddr string //服务地址 + ConsulServers []string //Consul集群服务地址 + RpcxStartType RpcxStartType //Rpcx启动类型 + AutoConnect bool //自动连接 客户端启动模式下 主动连接发现的节点服务器 + SerializeType protocol.SerializeType //序列化方式 + OutTime int32 //超时配置 单位秒 0 无超时限制 + Debug bool //日志是否开启 Log log.ILogger } @@ -67,7 +68,7 @@ func SetConsulServers(v []string) Option { } } -//设置启动类型 +// 设置启动类型 func SetRpcxStartType(v RpcxStartType) Option { return func(o *Options) { o.RpcxStartType = v @@ -89,6 +90,7 @@ func newOptions(config map[string]interface{}, opts ...Option) (options *Options options = &Options{ AutoConnect: true, SerializeType: protocol.MsgPack, + OutTime: 5, } if config != nil { mapstructure.Decode(config, options) @@ -111,6 +113,7 @@ func newOptionsByOption(opts ...Option) (options *Options, err error) { options = &Options{ AutoConnect: true, SerializeType: protocol.MsgPack, + OutTime: 5, } for _, o := range opts { o(options) diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index f3c913af8..42aa6f0a9 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -24,6 +24,7 @@ func newSys(options *Options) (sys ISys, err error) { service, err = newService(options) client, err = newClient(options) sys = &RPCX{ + options: options, service: service, client: client, } @@ -31,6 +32,7 @@ func newSys(options *Options) (sys ISys, err error) { } type RPCX struct { + options *Options service ISys client ISys } @@ -48,7 +50,7 @@ func (this *RPCX) Stop() (err error) { return } -//获取服务集群列表 +// 获取服务集群列表 func (this *RPCX) GetServiceTags() []string { return this.GetServiceTags() } @@ -70,7 +72,7 @@ func (this *RPCX) UnregisterAll() (err error) { return } -//同步调用 +// 同步调用 func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { //先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接 err = this.service.Call(ctx, servicePath, serviceMethod, args, reply) @@ -80,7 +82,7 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st return } -//广播调用 +// 广播调用 func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { @@ -89,7 +91,7 @@ func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMeth return } -//异步调用 +// 异步调用 func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done) if err != nil && strings.Contains(err.Error(), "on found") { @@ -98,7 +100,7 @@ func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod stri return } -//跨服同步调用 +// 跨服同步调用 func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { @@ -107,7 +109,7 @@ func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, serv return } -//跨集群 广播 +// 跨集群 广播 func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { @@ -116,7 +118,7 @@ func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, return } -//跨服异步调用 +// 跨服异步调用 func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) if err != nil && strings.Contains(err.Error(), "on found") { @@ -125,7 +127,7 @@ func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servic return } -//全集群广播 +// 全集群广播 func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { diff --git a/modules/worldtask/model_worldtask.go b/modules/worldtask/model_worldtask.go index 0463cc930..5c0993bcb 100644 --- a/modules/worldtask/model_worldtask.go +++ b/modules/worldtask/model_worldtask.go @@ -209,7 +209,7 @@ func (this *ModelWorldtask) taskFinishPush(session comm.IUserSession, userTask * this.moduleWorldtask.Debug("nextTaskIds", log.Field{Key: "nextTaskIds", Value: nextTaskIds}) nextTask := make(map[int32]*pb.Worldtasks) - if len(nextTask) > 0 { + if len(nextTaskIds) > 0 { for _, next := range nextTaskIds { ut := this.updateCurrentTaskCond(session.GetUserId(), u.Lv, userTask, curTaskConf.Key, next) if ut != nil {