修复用户消息网关处理流代码

This commit is contained in:
liwei1dao 2022-06-15 11:12:58 +08:00
parent de050c8b78
commit ca0a418b4f
3 changed files with 22 additions and 11 deletions

View File

@ -1,6 +1,7 @@
package comm package comm
import ( import (
"context"
"go_dreamfactory/pb" "go_dreamfactory/pb"
"reflect" "reflect"
@ -52,6 +53,7 @@ const (
// 服务网关组件接口定义 // 服务网关组件接口定义
type ISC_GateRouteComp interface { type ISC_GateRouteComp interface {
core.IServiceComp core.IServiceComp
ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error
RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method)
RegisterRouteCheck(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) RegisterRouteCheck(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method)
} }

View File

@ -1,11 +1,14 @@
package pack package pack
import ( import (
"context"
"fmt" "fmt"
"go_dreamfactory/comm"
"go_dreamfactory/lego" "go_dreamfactory/lego"
"go_dreamfactory/lego/base/rpcx" "go_dreamfactory/lego/base/rpcx"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"go_dreamfactory/pb"
"go_dreamfactory/services" "go_dreamfactory/services"
"go_dreamfactory/sys/cache" "go_dreamfactory/sys/cache"
"go_dreamfactory/sys/configure" "go_dreamfactory/sys/configure"
@ -13,6 +16,8 @@ import (
"os" "os"
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/ptypes"
) )
func newService(ops ...rpcx.Option) core.IService { func newService(ops ...rpcx.Option) core.IService {
@ -46,27 +51,31 @@ func (this *TestService) InitSys() {
} }
} }
var service core.IService
var s_gateComp comm.ISC_GateRouteComp = services.NewGateRouteComp()
var module = new(Pack) var module = new(Pack)
//测试环境下初始化db和cache 系统 //测试环境下初始化db和cache 系统
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
s := newService( service = newService(
rpcx.SetConfPath("../../bin/conf/worker_1.yaml"), rpcx.SetConfPath("../../bin/conf/worker_1.yaml"),
rpcx.SetVersion("1.0.0.0"), rpcx.SetVersion("1.0.0.0"),
) )
s.OnInstallComp( //装备组件 service.OnInstallComp( //装备组件
services.NewGateRouteComp(), //此服务需要接受用户的消息 需要装备网关组件 s_gateComp, //此服务需要接受用户的消息 需要装备网关组件
) )
go func() { go func() {
lego.Run(s, //运行模块 lego.Run(service, //运行模块
module, module,
) )
}() }()
time.Sleep(time.Second) time.Sleep(time.Second * 3)
defer os.Exit(m.Run()) defer os.Exit(m.Run())
} }
func Test_Log(t *testing.T) { func Test_Log(t *testing.T) {
items, err := module.db_comp.Pack_QueryUserPack("liwei1dao") data, _ := ptypes.MarshalAny(&pb.GetlistReq{})
log.Debugf("item:%v err:%v", items, err) s_gateComp.ReceiveMsg(context.Background(), &pb.AgentMessage{Method: "pack.getlist", Message: data}, &pb.RPCMessageReply{})
// items, err := module.db_comp.Pack_QueryUserPack("liwei1dao")
// log.Debugf("item:%v err:%v", items, err)
} }

View File

@ -117,7 +117,7 @@ func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentM
log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method) log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
this.mrlock.RLock() this.mrlock.RLock()
msghandle, ok := this.msghandles[args.Method] msghandle, ok := this.msghandles[args.Method]
msgcheck := this.msghandles[args.Method] msgcheck := this.msgcheck[args.Method]
this.mrlock.RUnlock() this.mrlock.RUnlock()
if ok { if ok {
session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId) session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
@ -126,15 +126,15 @@ func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentM
log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err) log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
return err return err
} }
returnValues := msgcheck.fn.Func.Call([]reflect.Value{msgcheck.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)}) returnValues := msgcheck.fn.Func.Call([]reflect.Value{msgcheck.rcvr, reflect.ValueOf(session), reflect.ValueOf(msg)})
// The return value for the method is an error. // The return value for the method is an error.
code := returnValues[1].Int() code := returnValues[1].Int()
if pb.ErrorCode(code) != pb.ErrorCode_Success { if pb.ErrorCode(code) != pb.ErrorCode_Success {
log.Errorf("HandleUserMsg:%s msg:%v code:%d", args.Method, msg, code) log.Errorf("HandleUserMsg:%s msg:%v code:%d", args.Method, msg, code)
return nil return nil
} }
// result := .Interface().(map[string]interface{}) result := returnValues[0].Interface().(map[string]interface{})
msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(session), reflect.ValueOf(returnValues[0]), reflect.ValueOf(msg)}) msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(session), reflect.ValueOf(result), reflect.ValueOf(msg)})
} else { } else {
reply.Code = pb.ErrorCode_ReqParameterError reply.Code = pb.ErrorCode_ReqParameterError
// reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_ReqParameterError) // reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_ReqParameterError)