上传网络优化和任务系统修复

This commit is contained in:
liwei 2023-07-10 16:57:16 +08:00
parent 574efd1a81
commit bf2f0eaef8
8 changed files with 86 additions and 47 deletions

View File

@ -3,6 +3,7 @@ package rpcx
import (
"context"
"strings"
"time"
"github.com/smallnest/rpcx/client"
)
@ -74,6 +75,14 @@ func (this *RPCX) UnregisterAll() (err error) {
// 同步调用
func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
cancel context.CancelFunc
)
if this.options.OutTime > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
//先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接
err = this.service.Call(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
@ -84,6 +93,14 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st
// 广播调用
func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
cancel context.CancelFunc
)
if this.options.OutTime > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply)
@ -102,6 +119,14 @@ func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod stri
// 跨服同步调用
func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
cancel context.CancelFunc
)
if this.options.OutTime > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
@ -111,6 +136,14 @@ func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, serv
// 跨集群 广播
func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
cancel context.CancelFunc
)
if this.options.OutTime > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
@ -129,6 +162,14 @@ func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servic
// 全集群广播
func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
cancel context.CancelFunc
)
if this.options.OutTime > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(this.options.OutTime)*time.Second)
defer cancel()
}
err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)

View File

@ -59,7 +59,7 @@ type Service struct {
pending map[uint64]*client.Call
}
//RPC 服务启动
// RPC 服务启动
func (this *Service) Start() (err error) {
go func() {
if err = this.server.Serve("tcp", this.options.ServiceAddr); err != nil {
@ -69,13 +69,13 @@ func (this *Service) Start() (err error) {
return
}
//服务停止
// 服务停止
func (this *Service) Stop() (err error) {
err = this.server.Close()
return
}
//获取服务集群列表
// 获取服务集群列表
func (this *Service) GetServiceTags() []string {
this.selectormutex.RLock()
tags := make([]string, len(this.selectors))
@ -88,25 +88,25 @@ func (this *Service) GetServiceTags() []string {
return tags
}
//注册RPC 服务
// 注册RPC 服务
func (this *Service) RegisterFunction(fn interface{}) (err error) {
err = this.server.RegisterFunction(this.options.ServiceType, fn, this.metadata)
return
}
//注册RPC 服务
// 注册RPC 服务
func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) {
err = this.server.RegisterFunctionName(this.options.ServiceType, name, fn, this.metadata)
return
}
//注销 暂时不处理
// 注销 暂时不处理
func (this *Service) UnregisterAll() (err error) {
// err = this.server.UnregisterAll()
return
}
//同步调用远程服务
// 同步调用远程服务
func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
done *client.Call
@ -114,7 +114,7 @@ func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod
)
seq := new(uint64)
ctx = context.WithValue(ctx, seqKey{}, seq)
if conn, done, err = this.call(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil {
if conn, done, err = this.call(&ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil {
return
}
select {
@ -142,19 +142,19 @@ func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod
return
}
//广播调用
// 广播调用
func (this *Service) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
err = this.broadcast(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply)
return
}
//异步调用 远程服务
// 异步调用 远程服务
func (this *Service) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) {
_, _call, err = this.call(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, done)
_, _call, err = this.call(&ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, done)
return
}
//跨服 同步调用 远程服务
// 跨服 同步调用 远程服务
func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
done *client.Call
@ -162,7 +162,7 @@ func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, s
)
seq := new(uint64)
ctx = context.WithValue(ctx, seqKey{}, seq)
if conn, done, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil {
if conn, done, err = this.call(&ctx, clusterTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil {
return
}
select {
@ -190,25 +190,25 @@ func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, s
return
}
//跨集群 广播
// 跨集群 广播
func (this *Service) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
err = this.broadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
return
}
//跨服 异步调用 远程服务
// 跨服 异步调用 远程服务
func (this *Service) AcrossClusterGo(ctx context.Context, clusterTag, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) {
_, _call, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
_, _call, err = this.call(&ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
return
}
//全集群广播
// 全集群广播
func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
err = this.clusterbroadcast(ctx, servicePath, serviceMethod, args, reply)
return
}
//监控rpc连接收到的请求消息 处理消息回调请求
// 监控rpc连接收到的请求消息 处理消息回调请求
func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error {
if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage {
var (
@ -291,14 +291,14 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e
return nil
}
//客户端配置 AutoConnect 的默认连接函数
// 客户端配置 AutoConnect 的默认连接函数
func (this *Service) RpcxShakeHands(ctx context.Context, args *ServiceNode, reply *ServiceNode) error {
// this.Debugf("RpcxShakeHands:%+v", ctx.Value(share.ReqMetaDataKey).(map[string]string))
return nil
}
//执行远程调用
func (this *Service) call(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) {
// 执行远程调用
func (this *Service) call(ctx *context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) {
var (
spath []string
clientaddr string
@ -317,7 +317,7 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st
ServiceMetaKey: this.metadata,
}
spath = strings.Split(servicePath, "/")
ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{
*ctx = context.WithValue(*ctx, share.ReqMetaDataKey, map[string]string{
CallRoutRulesKey: servicePath,
ServiceAddrKey: "tcp@" + this.options.ServiceAddr,
ServiceMetaKey: this.metadata,
@ -330,7 +330,7 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st
return
}
if clientaddr = selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" {
if clientaddr = selector.Select(*ctx, spath[0], serviceMethod, args); clientaddr == "" {
err = fmt.Errorf("on found servicePath:%s", servicePath)
return
}
@ -354,11 +354,11 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st
}
}
_call.Done = done
this.send(ctx, conn, spath[0], serviceMethod, metadata, _call)
this.send(*ctx, conn, spath[0], serviceMethod, metadata, _call)
return
}
//执行远程调用
// 执行远程调用
func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
spath []string
@ -412,8 +412,9 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa
_call.Args = args
_call.Reply = reply
_call.Done = make(chan *client.Call, 10)
seq := new(uint64)
ctx = context.WithValue(ctx, seqKey{}, seq)
this.send(ctx, conn, spath[0], serviceMethod, metadata, _call)
seq, _ := ctx.Value(seqKey{}).(*uint64)
select {
case <-ctx.Done(): // cancel by context
this.mutex.Lock()
@ -457,7 +458,7 @@ check:
return err
}
//全集群广播
// 全集群广播
func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
var (
spath []string
@ -563,7 +564,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
return
}
//发送远程调用请求
// 发送远程调用请求
func (this *Service) send(ctx context.Context, conn net.Conn, servicePath string, serviceMethod string, metadata map[string]string, call *client.Call) {
defer func() {
if call.Error != nil {

View File

@ -355,11 +355,9 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
}
stime := time.Now()
// this.gateway.Debugf("----------3 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType)
// ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
ctx := context.Background()
if len(serviceTag) == 0 {
// this.gateway.Debugf("----------4 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType)
if err = this.gateway.Service().RpcCall(ctx, servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil {
if err = this.gateway.Service().RpcCall(context.Background(), servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil {
this.gateway.Error("[UserResponse]",
log.Field{Key: "uid", Value: this.uId},
log.Field{Key: "serviceTag", Value: serviceTag},
@ -374,7 +372,7 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
if msg.ServicePath != "" { //客户端是否制定目标服务器 /wroker/woker0
servicePath = msg.ServicePath
}
if err = this.gateway.Service().AcrossClusterRpcCall(ctx, serviceTag, servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil {
if err = this.gateway.Service().AcrossClusterRpcCall(context.Background(), serviceTag, servicePath, string(comm.Rpc_GatewayRoute), req, reply); err != nil {
this.gateway.Error("[UserResponse]",
log.Field{Key: "uid", Value: this.uId},
log.Field{Key: "serviceTag", Value: serviceTag},

View File

@ -96,9 +96,8 @@ func (this *Parkour) AddMounts(session comm.IUserSession, mounts map[string]int3
// 匹配
func (this *Parkour) match(team *pb.DBParkour) (err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
err = this.service.RpcCall(
ctx,
context.Background(),
comm.Service_Mainte,
string(comm.RPC_ParkourJoinMatch),
&pb.RPCParkourJoinMatchReq{Captainid: team.Captainid, Member: team.Member},

View File

@ -36,7 +36,7 @@ type ParkourComp struct {
total int32
}
//组件初始化接口
// 组件初始化接口
func (this *ParkourComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.MCompConfigure.Init(service, module, comp, options)
this.service = service.(base.IRPCXService)
@ -46,7 +46,7 @@ func (this *ParkourComp) Init(service core.IService, module core.IModule, comp c
return
}
//自由跨服环境下开启此功能
// 自由跨服环境下开启此功能
func (this *ParkourComp) Start() (err error) {
err = this.MCompConfigure.Start()
if db.IsCross() {
@ -63,7 +63,7 @@ func (this *ParkourComp) Start() (err error) {
return
}
//刷新推荐列表
// 刷新推荐列表
func (this *ParkourComp) refreshlist() {
var (
c *mongo.Cursor
@ -103,7 +103,7 @@ func (this *ParkourComp) refreshlist() {
return
}
//加入匹配中
// 加入匹配中
func (this *ParkourComp) join(ctx context.Context, req *pb.RPCParkourJoinMatchReq, resp *pb.RPCParkourJoinMatchResp) (err error) {
this.tlock.Lock()
this.teams[req.Captainid] = req.Member
@ -115,7 +115,7 @@ func (this *ParkourComp) join(ctx context.Context, req *pb.RPCParkourJoinMatchRe
return
}
//加入匹配中
// 加入匹配中
func (this *ParkourComp) cancel(ctx context.Context, req *pb.RPCParkourCancelMatchReq, resp *pb.RPCParkourCancelMatchResp) (err error) {
this.tlock.Lock()
delete(this.teams, req.Captainid)
@ -123,7 +123,7 @@ func (this *ParkourComp) cancel(ctx context.Context, req *pb.RPCParkourCancelMat
return
}
//定时匹配处理
// 定时匹配处理
func (this *ParkourComp) match() {
// this.module.Debug("执行一次匹配!")
if !atomic.CompareAndSwapInt32(&this.timerlock, 1, 2) { //正在执行,就不要在进来了
@ -324,8 +324,7 @@ func (this *ParkourComp) match() {
}
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
if err = this.service.RpcCall(ctx,
if err = this.service.RpcCall(context.Background(),
comm.Service_Worker,
string(comm.RPC_ParkourMatchSucc),
&pb.RPCParkourMatchSuccReq{Red: reduser, Bule: buleuser},

View File

@ -10,7 +10,6 @@ import (
"go_dreamfactory/pb"
"go_dreamfactory/utils"
"net/http"
"time"
)
type PayDeliveryResults struct {
@ -18,7 +17,7 @@ type PayDeliveryResults struct {
State int32 `json:"state"`
}
//充值发货
// 充值发货
func (this *Api_Comp) PayDelivery(c *engine.Context) {
uid := c.Query("uid")
pid := c.Query("productid")
@ -63,9 +62,8 @@ func (this *Api_Comp) PayDelivery(c *engine.Context) {
log.Errorf("签名错误 sign:%s _sign:%s", sign, _sign)
return
}
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
if err := this.module.service.RpcCall(
ctx,
context.TODO(),
comm.Service_Worker,
string(comm.Rpc_ModulePayDelivery),
payreq,

View File

@ -32,6 +32,9 @@ func (this *apiComp) Info(session comm.IUserSession, req *pb.WTaskInfoReq) (errd
if progress, errdata = this.module.fishtask(session, wtask, false); errdata != nil {
return
}
if progress, errdata = this.module.pushtaskprogress(session, wtask, false); errdata != nil {
return
}
session.SendMsg(string(this.module.GetType()), "info", &pb.WTaskInfoResp{Info: wtask, Accepts: progress})
if err = this.module.modelwtask.updateUserWTasks(session.GetUserId(), wtask); err != nil {
errdata = &pb.ErrorData{

View File

@ -467,7 +467,7 @@ func (this *WTask) inquireActivations(session comm.IUserSession, wtask *pb.DBWTa
//有新任务接取
if ispush && changeActiva {
session.SendMsg(string(this.GetType()), "activations", &pb.WTaskActivationsChangePush{Activations: wtask.Activations})
session.SendMsg(string(this.GetType()), "activationschange", &pb.WTaskActivationsChangePush{Activations: wtask.Activations})
}
if changeAccept {