91 lines
2.6 KiB
Go
91 lines
2.6 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/pb"
|
|
"reflect"
|
|
"sync"
|
|
|
|
"go_dreamfactory/lego/base"
|
|
"go_dreamfactory/lego/core"
|
|
"go_dreamfactory/lego/core/cbase"
|
|
"go_dreamfactory/lego/sys/log"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
func NewGateRouteComp() comm.ISC_GateRouteComp {
|
|
comp := new(SComp_GateRouteComp)
|
|
return comp
|
|
}
|
|
|
|
// msghandle is one entry of the gateway route table: everything needed to
// decode an incoming message and invoke its handler through reflection.
type msghandle struct {
	// rcvr is passed as the first argument (the method receiver) when fn is called.
	rcvr reflect.Value
	// msgType is the registered message type; the dispatcher calls Elem() on it
	// to allocate a fresh message, so it is expected to be a pointer type.
	msgType reflect.Type
	// fn is the handler method that gets invoked for the route.
	fn reflect.Method
}
|
|
|
|
type SComp_GateRouteComp struct {
|
|
cbase.ServiceCompBase
|
|
service base.IRPCXService
|
|
mrlock sync.RWMutex
|
|
msghandles map[string]*msghandle
|
|
}
|
|
|
|
func (this *SComp_GateRouteComp) GetName() core.S_Comps {
|
|
return comm.SC_ServiceGateRouteComp
|
|
}
|
|
|
|
func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) {
|
|
err = this.ServiceCompBase.Init(service, comp, options)
|
|
this.service = service.(base.IRPCXService)
|
|
this.msghandles = make(map[string]*msghandle)
|
|
return err
|
|
}
|
|
|
|
func (this *SComp_GateRouteComp) Start() (err error) {
|
|
this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
|
|
err = this.ServiceCompBase.Start()
|
|
return
|
|
}
|
|
|
|
//注册路由
|
|
func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) {
|
|
log.Debugf("注册用户路由【%s】", methodName)
|
|
this.mrlock.RLock()
|
|
_, ok := this.msghandles[methodName]
|
|
this.mrlock.RUnlock()
|
|
if ok {
|
|
log.Errorf("重复 注册网关消息【%s】", methodName)
|
|
return
|
|
}
|
|
this.mrlock.Lock()
|
|
this.msghandles[methodName] = &msghandle{
|
|
rcvr: comp,
|
|
msgType: msg,
|
|
fn: fn,
|
|
}
|
|
this.mrlock.Unlock()
|
|
}
|
|
|
|
func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error {
|
|
log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
|
|
this.mrlock.RLock()
|
|
msghandle, ok := this.msghandles[args.Method]
|
|
this.mrlock.RUnlock()
|
|
if ok {
|
|
session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
|
|
msg := reflect.New(msghandle.msgType.Elem()).Interface()
|
|
if err := proto.Unmarshal(args.Message, msg.(proto.Message)); err != nil {
|
|
log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
|
|
return err
|
|
}
|
|
msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
|
|
} else {
|
|
reply.Code = pb.ErrorCode_ReqParameterError
|
|
// reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_ReqParameterError)
|
|
}
|
|
return nil
|
|
}
|