go_dreamfactory/services/comp_gateroute.go
2022-07-01 11:58:59 +08:00

154 lines
5.1 KiB
Go

package services
import (
"context"
"fmt"
"go_dreamfactory/comm"
"go_dreamfactory/pb"
"reflect"
"runtime"
"sync"
"go_dreamfactory/lego/base"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/event"
"go_dreamfactory/lego/sys/log"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// NewGateRouteComp creates the service gateway route component, which is used
// to receive the messages sent over by the gateway service.
func NewGateRouteComp() comm.ISC_GateRouteComp {
	return &SCompGateRoute{}
}
// msghandle holds the reflection objects registered for one user protocol
// handler function.
type msghandle struct {
rcvr reflect.Value // receiver value, passed as the first argument when the handler is invoked
msgType reflect.Type // request message type
handle reflect.Method // handler method
}
// SCompGateRoute is the service gateway route component. It keeps the table of
// registered user message handlers and a cache of user sessions.
type SCompGateRoute struct {
cbase.ServiceCompBase
service base.IRPCXService // RPC service object; used to publish services and to call the interfaces of other services
mrlock sync.RWMutex // guards msghandles
msghandles map[string]*msghandle // registered handlers, keyed by the route name given to RegisterRoute
sessionslock sync.RWMutex // guards sessions
sessions map[string]comm.IUserSession // user sessions keyed by user session id; cached to avoid frequent re-creation
}
// GetName returns the name of this service component, so that business modules
// can look the component object up by it.
func (this *SCompGateRoute) GetName() core.S_Comps {
return comm.SC_ServiceGateRouteComp
}
// Init initializes the component.
//
// It first initializes the embedded ServiceCompBase; if that fails the error is
// returned immediately and nothing else is set up. The hosting service must
// implement base.IRPCXService, because the component needs it to register its
// RPC entry points; a service of any other type yields an error instead of a
// type-assertion panic. On success the handler table and the session cache are
// created empty.
func (this *SCompGateRoute) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) {
	if err = this.ServiceCompBase.Init(service, comp, options); err != nil {
		return err
	}
	rpcService, ok := service.(base.IRPCXService)
	if !ok {
		return fmt.Errorf("gate route component requires a base.IRPCXService, got %T", service)
	}
	this.service = rpcService
	this.msghandles = make(map[string]*msghandle)
	this.sessions = make(map[string]comm.IUserSession)
	return nil
}
// Start registers the component's RPC listeners on the service when the
// component starts, then starts the embedded ServiceCompBase and returns its
// result.
func (this *SCompGateRoute) Start() (err error) {
	// Receiver for messages routed from the gateway.
	this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg)
	// Receiver for user-offline notifications.
	this.service.RegisterFunctionName(string(comm.Rpc_NoticeUserClose), this.NoticeUserClose)
	return this.ServiceCompBase.Start()
}
// RegisterRoute lets a business module register the handler for one user
// message route.
//
// methodName is the route key, comp is the receiver the handler is invoked on,
// msg is the request message type and handele is the handler method. A route
// name that is already registered is rejected: the existing handler is kept and
// an error is logged.
//
// The duplicate check and the insertion happen inside a single write-locked
// section, so two concurrent registrations of the same name cannot both pass
// the check and silently overwrite each other.
func (this *SCompGateRoute) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, handele reflect.Method) {
	log.Debugf("注册用户路由【%s】", methodName)

	this.mrlock.Lock()
	_, exists := this.msghandles[methodName]
	if !exists {
		this.msghandles[methodName] = &msghandle{
			rcvr:    comp,
			msgType: msg,
			handle:  handele,
		}
	}
	this.mrlock.Unlock()

	// Log outside the critical section to keep the lock hold time minimal.
	if exists {
		log.Errorf("重复 注册网关消息【%s】", methodName)
	}
}
// ReceiveMsg is the receiving function of the Rpc_GatewayRoute service
// interface.
//
// It looks up the handler registered under "<MainType>.<SubType>", obtains (or
// creates and caches) the user session for args.UserSessionId, unmarshals the
// user message carried in args.Message and invokes the handler through
// reflection with (receiver, session, message).
//
// Results are reported through reply:
//   - no handler registered: reply.Code = ErrorCode_ReqParameterError;
//   - handler returned a non-success code: reply.Code / reply.ErrorMessage are
//     set, and reply.ErrorData carries the handler's error payload if any;
//   - handler succeeded: reply.Reply = session.Polls();
//   - a panic anywhere below: reply.Code = ErrorCode_Exception and
//     reply.ErrorMessage holds the panic value plus a stack snippet.
//
// The returned error is non-nil only when the user message cannot be
// unmarshalled.
func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) (err error) {
	defer func() { // On a panic, collect the failure details so they can be passed to the front end.
		if r := recover(); r != nil {
			buf := make([]byte, 1024)
			l := runtime.Stack(buf, false)
			reply.Code = pb.ErrorCode_Exception
			reply.ErrorMessage = fmt.Sprintf("%v: %s", r, buf[:l])
			log.Errorf("HandleUserMsg:[%s-%s] err:%s", args.MainType, args.SubType, reply.ErrorMessage)
		}
	}()
	method := fmt.Sprintf("%s.%s", args.MainType, args.SubType)

	// Look up the handler registered for this user message.
	this.mrlock.RLock()
	handler, ok := this.msghandles[method]
	this.mrlock.RUnlock()
	if !ok { // No handler registered for this route.
		log.Errorf("no found handle %s", method)
		reply.Code = pb.ErrorCode_ReqParameterError
		return nil
	}

	// Fetch the cached session, creating it on first use.
	this.sessionslock.RLock()
	session, ok := this.sessions[args.UserSessionId]
	this.sessionslock.RUnlock()
	if !ok {
		// Build the session outside the write lock, then re-check under the
		// lock so that concurrent first requests for one user all end up
		// sharing a single cached session.
		created := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
		this.sessionslock.Lock()
		if session, ok = this.sessions[args.UserSessionId]; !ok {
			session = created
			this.sessions[args.UserSessionId] = session
		}
		this.sessionslock.Unlock()
	}

	// Decode the user message object.
	var msg proto.Message
	if msg, err = args.Message.UnmarshalNew(); err != nil {
		log.Errorf("UserMessage:%s Unmarshal err:%v", method, err)
		return err
	}

	// Run the handler.
	log.Debugf("ReceiveMsg Message:[%s-%s]-%v user[%s-%s]", args.MainType, args.SubType, msg, args.UserSessionId, args.UserId)
	results := handler.handle.Func.Call([]reflect.Value{handler.rcvr, reflect.ValueOf(session), reflect.ValueOf(msg)})
	errcode := pb.ErrorCode(results[0].Int())
	if errcode == pb.ErrorCode_Success {
		reply.Reply = session.Polls()
		return nil
	}

	// The handler reported an error code: hand the error information back to the user.
	log.Errorf("HandleUserMsg:%s msg:%v code:%d", method, msg, errcode)
	reply.Code = errcode
	reply.ErrorMessage = pb.GetErrorCodeMsg(errcode)
	// The error payload is the handler's second return value (results[0] is
	// the integer error code and can never be a proto.Message).
	// NOTE(review): assumes handlers are declared as
	// func(session, req) (pb.ErrorCode, proto.Message) - confirm against the
	// registering modules. A missing or nil payload simply leaves ErrorData unset.
	if len(results) > 1 {
		if errdata, ok := results[1].Interface().(proto.Message); ok && errdata != nil {
			data, packErr := anypb.New(errdata)
			if packErr != nil {
				log.Errorf("HandleUserMsg:%s pack error data err:%v", method, packErr)
			} else {
				reply.ErrorData = data
			}
		}
	}
	return nil
}
// NoticeUserClose is the receiving function of the RPC_NoticeUserClose
// interface: it handles the notification that a user has gone offline.
// It triggers comm.EventUserOffline with the user's id and then removes the
// user's session from the session cache. It always returns nil.
func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.NoticeUserCloseReq, reply *pb.RPCMessageReply) error {
	// Fire the offline event before taking the lock, so listeners do not run
	// while the session cache is locked.
	event.TriggerEvent(comm.EventUserOffline, args.UserId)

	this.sessionslock.Lock()
	defer this.sessionslock.Unlock()
	delete(this.sessions, args.UserSessionId)
	return nil
}