191 lines
6.4 KiB
Go
191 lines
6.4 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/pb"
|
|
"reflect"
|
|
"runtime"
|
|
"sync"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
|
|
"go_dreamfactory/lego/base"
|
|
"go_dreamfactory/lego/core"
|
|
"go_dreamfactory/lego/core/cbase"
|
|
"go_dreamfactory/lego/sys/event"
|
|
"go_dreamfactory/lego/sys/log"
|
|
"go_dreamfactory/lego/utils/codec"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/golang/protobuf/ptypes"
|
|
)
|
|
|
|
/*
|
|
服务网关组件 用于接收网关服务发送过来的消息
|
|
*/
|
|
|
|
func NewGateRouteComp() comm.ISC_GateRouteComp {
|
|
comp := new(SComp_GateRouteComp)
|
|
return comp
|
|
}
|
|
|
|
//用户协议处理函数注册的反射对象
|
|
type msghandle struct {
|
|
rcvr reflect.Value
|
|
msgType reflect.Type
|
|
fn reflect.Method
|
|
}
|
|
|
|
//服务网关组件
|
|
type SComp_GateRouteComp struct {
|
|
cbase.ServiceCompBase
|
|
service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口
|
|
mrlock sync.RWMutex //msghandles 对象的锁
|
|
msgcheck map[string]*msghandle //处理函数的校验接口
|
|
msghandles map[string]*msghandle //处理函数的管理对象
|
|
}
|
|
|
|
//设置服务组件名称 方便业务模块中获取此组件对象
|
|
func (this *SComp_GateRouteComp) GetName() core.S_Comps {
|
|
return comm.SC_ServiceGateRouteComp
|
|
}
|
|
|
|
//组件初始化函数
|
|
func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) {
|
|
err = this.ServiceCompBase.Init(service, comp, options)
|
|
this.service = service.(base.IRPCXService)
|
|
this.msgcheck = make(map[string]*msghandle)
|
|
this.msghandles = make(map[string]*msghandle)
|
|
return err
|
|
} //
|
|
|
|
//组件启动时注册rpc服务监听
|
|
func (this *SComp_GateRouteComp) Start() (err error) {
|
|
this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
|
|
this.service.RegisterFunctionName(string(comm.Rpc_NoticeUserClose), this.NoticeUserClose) //注册用户离线通知
|
|
err = this.ServiceCompBase.Start()
|
|
event.RegisterGO(core.Event_ServiceStartEnd, func() {
|
|
for k, v := range this.msghandles {
|
|
if v1, ok := this.msgcheck[k]; !ok {
|
|
log.Panicf("注册用户消息处理函数:%s 没有实现参数校验接口", k)
|
|
return
|
|
} else if v.msgType != v1.msgType {
|
|
log.Panicf("注册用户消息处理函数:%s 实现参数校验接口不一致 请检查代码!", k)
|
|
return
|
|
}
|
|
}
|
|
})
|
|
event.RegisterGO(core.Event_FindNewService, func() {
|
|
log.Debugf("find new service")
|
|
})
|
|
return
|
|
}
|
|
|
|
//业务模块注册用户消息处理路由
|
|
func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) {
|
|
log.Debugf("注册用户路由【%s】", methodName)
|
|
this.mrlock.RLock()
|
|
_, ok := this.msghandles[methodName]
|
|
this.mrlock.RUnlock()
|
|
if ok {
|
|
log.Errorf("重复 注册网关消息【%s】", methodName)
|
|
return
|
|
}
|
|
this.mrlock.Lock()
|
|
this.msghandles[methodName] = &msghandle{
|
|
rcvr: comp,
|
|
msgType: msg,
|
|
fn: fn,
|
|
}
|
|
this.mrlock.Unlock()
|
|
}
|
|
|
|
//业务模块注册用户消息处理路由
|
|
func (this *SComp_GateRouteComp) RegisterRouteCheck(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) {
|
|
log.Debugf("注册用户路由校验[%s]", methodName)
|
|
this.mrlock.RLock()
|
|
_, ok := this.msgcheck[methodName]
|
|
this.mrlock.RUnlock()
|
|
if ok {
|
|
log.Errorf("重复 注册用户路由校验[%s]", methodName)
|
|
return
|
|
}
|
|
this.mrlock.Lock()
|
|
this.msgcheck[methodName] = &msghandle{
|
|
rcvr: comp,
|
|
msgType: msg,
|
|
fn: fn,
|
|
}
|
|
this.mrlock.Unlock()
|
|
}
|
|
|
|
//Rpc_GatewayRoute服务接口的接收函数
|
|
func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error {
|
|
defer func() { //程序异常 收集异常信息传递给前端显示
|
|
if r := recover(); r != nil {
|
|
buf := make([]byte, 1024)
|
|
l := runtime.Stack(buf, false)
|
|
reply.Code = pb.ErrorCode_Exception
|
|
reply.Message = fmt.Sprintf("%v: %s", r, buf[:l])
|
|
log.Errorf("HandleUserMsg:%s err:%s", args.Method, reply.Message)
|
|
}
|
|
}()
|
|
log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
|
|
//获取用户消息处理函数
|
|
this.mrlock.RLock()
|
|
msghandle, ok := this.msghandles[args.Method]
|
|
msgcheck := this.msgcheck[args.Method]
|
|
this.mrlock.RUnlock()
|
|
if ok {
|
|
//封装用户会话
|
|
session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
|
|
//序列化用户消息对象
|
|
msg := reflect.New(msghandle.msgType.Elem()).Interface()
|
|
if err := ptypes.UnmarshalAny(args.Message, msg.(proto.Message)); err != nil {
|
|
log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
|
|
return err
|
|
}
|
|
|
|
//调用校验接口
|
|
checkreturn := msgcheck.fn.Func.Call([]reflect.Value{msgcheck.rcvr, reflect.ValueOf(session), reflect.ValueOf(msg)})
|
|
//读取校验结果 有错误直接返回错误码给用户
|
|
code := checkreturn[1].Interface().(comm.ErrorCode)
|
|
if code.Code != pb.ErrorCode_Success {
|
|
log.Errorf("HandleUserMsg:%s msg:%v code:%d", args.Method, msg, code)
|
|
reply.Code = code.Code
|
|
reply.Message = pb.GetErrorCodeMsg(code.Code)
|
|
if code.Data != nil { //处理错误附加数据 采用json 序列化为string
|
|
if d, err := jsoniter.Marshal(code.Data); err != nil {
|
|
log.Errorf("HandleUserMsg:%s msg:%v code:%d err:%v", args.Method, msg, code, err)
|
|
return nil
|
|
} else {
|
|
reply.Data = codec.BytesToString(d)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
//校验结果成功 处理临时数据转移
|
|
result := checkreturn[0].Interface().(map[string]interface{})
|
|
//调用用户处理函数
|
|
handlereturn := msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(session), reflect.ValueOf(result), reflect.ValueOf(msg)})
|
|
errcode := pb.ErrorCode(handlereturn[0].Int())
|
|
if errcode != pb.ErrorCode_Success { //处理返货错误码 返回用户错误信息
|
|
log.Errorf("HandleUserMsg:%s msg:%v code:%d", args.Method, msg, code)
|
|
reply.Code = errcode
|
|
reply.Message = pb.GetErrorCodeMsg(errcode)
|
|
return nil
|
|
}
|
|
} else { //未找到消息处理函数
|
|
reply.Code = pb.ErrorCode_ReqParameterError
|
|
}
|
|
return nil
|
|
}
|
|
|
|
//RPC_NoticeUserClose 接收用户离线通知
|
|
func (this *SComp_GateRouteComp) NoticeUserClose(ctx context.Context, args *pb.NoticeUserCloseReq, reply *pb.RPCMessageReply) error {
|
|
event.TriggerEvent(comm.Event_UserOffline, args.UserId)
|
|
return nil
|
|
}
|