253 lines
8.8 KiB
Go
253 lines
8.8 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/pb"
|
|
"go_dreamfactory/sys/configure"
|
|
"go_dreamfactory/sys/db"
|
|
"reflect"
|
|
"time"
|
|
|
|
"go_dreamfactory/lego/core"
|
|
"go_dreamfactory/lego/core/cbase"
|
|
"go_dreamfactory/lego/sys/event"
|
|
"go_dreamfactory/lego/sys/log"
|
|
"go_dreamfactory/lego/utils/mapstructure"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// msghandle bundles the reflection data recorded for one registered user
// message route. ReceiveMsg invokes handle.Func with rcvr as its first
// argument, followed by the user session and the decoded request message.
type msghandle struct {
	rcvr    reflect.Value  // receiver value passed as the first argument of handle.Func
	msgType reflect.Type   // request message type supplied at registration
	handle  reflect.Method // handler method invoked for the route
}
|
|
|
|
// CompOptions holds the configurable options of the gateway route component.
type CompOptions struct {
	// MaxTime is the slow-call threshold that ReceiveMsg compares against a
	// handler's elapsed time in milliseconds. Successful calls that take at
	// least MaxTime are logged at error level instead of debug level. A value
	// of 0 disables that escalation.
	MaxTime int32
}
|
|
|
|
func (this *CompOptions) LoadConfig(settings map[string]interface{}) (err error) {
|
|
if settings != nil {
|
|
err = mapstructure.Decode(settings, this)
|
|
}
|
|
return
|
|
}
|
|
|
|
/*
|
|
服务网关组件 用于接收网关服务发送过来的消息
|
|
*/
|
|
|
|
func NewGateRouteComp() comm.ISC_GateRouteComp {
|
|
comp := new(SCompGateRoute)
|
|
return comp
|
|
}
|
|
|
|
// 服务网关组件
|
|
type SCompGateRoute struct {
|
|
cbase.ServiceCompBase
|
|
options *CompOptions
|
|
service comm.IService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口
|
|
msghandles map[string]*msghandle //处理函数的管理对象
|
|
}
|
|
|
|
// 设置服务组件名称 方便业务模块中获取此组件对象
|
|
func (this *SCompGateRoute) GetName() core.S_Comps {
|
|
return comm.SC_ServiceGateRouteComp
|
|
}
|
|
|
|
func (this *SCompGateRoute) NewOptions() (options core.ICompOptions) {
|
|
return new(CompOptions)
|
|
}
|
|
|
|
// 组件初始化函数
|
|
func (this *SCompGateRoute) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) {
|
|
err = this.ServiceCompBase.Init(service, comp, options)
|
|
this.options = options.(*CompOptions)
|
|
this.service = service.(comm.IService)
|
|
this.msghandles = make(map[string]*msghandle)
|
|
|
|
return err
|
|
}
|
|
|
|
// 组件启动时注册rpc服务监听
|
|
func (this *SCompGateRoute) Start() (err error) {
|
|
this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
|
|
this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserLogin), this.NoticeUserLogin) //注册用户登录通知
|
|
this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserCreate), this.NoticeUserCreate) //注册用户离线创角
|
|
this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserClose), this.NoticeUserClose) //注册用户离线通知
|
|
this.service.RegisterFunctionName(string(comm.Rpc_ConfigureUpDate), this.ConfigureUpDate) //注册配置更新
|
|
if db.IsCross() { //跨服环境
|
|
this.service.RegisterFunctionName(string(comm.Rpc_DBSyncCross), this.DBSyncCross) //注册配置更新
|
|
}
|
|
err = this.ServiceCompBase.Start()
|
|
return
|
|
}
|
|
|
|
// 业务模块注册用户消息处理路由
|
|
func (this *SCompGateRoute) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, handele reflect.Method) {
|
|
//log.Debugf("注册用户路由【%s】", methodName)
|
|
_, ok := this.msghandles[methodName]
|
|
if ok {
|
|
log.Errorf("重复 注册网关消息【%s】", methodName)
|
|
return
|
|
}
|
|
this.msghandles[methodName] = &msghandle{
|
|
rcvr: comp,
|
|
msgType: msg,
|
|
handle: handele,
|
|
}
|
|
}
|
|
|
|
// Rpc_GatewayRoute服务接口的接收函数
|
|
func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) (err error) {
|
|
method := fmt.Sprintf("%s.%s", args.MainType, args.SubType)
|
|
// defer func() { //程序异常 收集异常信息传递给前端显示
|
|
// if r := recover(); r != nil {
|
|
// buf := make([]byte, 4096)
|
|
// l := runtime.Stack(buf, false)
|
|
// reply.Code = pb.ErrorCode_Exception
|
|
// reply.ErrorMessage = fmt.Sprintf("%v: %s", r, buf[:l])
|
|
// log.Errorf("[Handle Api] m:%s reply:%s", method, reply)
|
|
// }
|
|
// }()
|
|
//获取用户消息处理函数
|
|
// log.Debug("ReceiveMsg",
|
|
// log.Field{Key: "args", Value: args.String()},
|
|
// )
|
|
reply.ServiceId = this.service.GetId()
|
|
msghandle, ok := this.msghandles[method]
|
|
if ok {
|
|
session := this.service.GetUserSession()
|
|
session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId)
|
|
|
|
defer func() { //回收
|
|
session.Reset()
|
|
this.service.PutUserSession(session)
|
|
}()
|
|
|
|
//序列化用户消息对象
|
|
var msg proto.Message
|
|
if msg, err = args.Message.UnmarshalNew(); err != nil {
|
|
log.Errorf("[Handle Api] UserMessage:%s Unmarshal err:%v", method, err)
|
|
return err
|
|
}
|
|
|
|
//执行处理流
|
|
stime := time.Now()
|
|
handlereturn := msghandle.handle.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(session), reflect.ValueOf(msg)})
|
|
errdata := handlereturn[0]
|
|
if !errdata.IsNil() { //处理返货错误码 返回用户错误信息
|
|
//data, _ := anypb.New(errdata.(proto.Message))
|
|
reply.ErrorData = errdata.Interface().(*pb.ErrorData)
|
|
// log.Errorf("[Handle Api] t:%v m:%s req:%v reply:%v", time.Since(stime), method, msg, reply)
|
|
log.Error("[Handle Api]",
|
|
log.Field{Key: "t", Value: time.Since(stime).Milliseconds()},
|
|
log.Field{Key: "m", Value: method},
|
|
log.Field{Key: "uid", Value: args.UserId},
|
|
log.Field{Key: "req", Value: msg},
|
|
log.Field{Key: "reply", Value: reply.String()},
|
|
)
|
|
} else {
|
|
reply.Reply = session.Polls()
|
|
// log.Debugf("[Handle Api] t:%v m:%s uid:%s req:%v reply:%v", time.Since(stime), method, args.UserId, msg, reply)
|
|
nt := time.Since(stime).Milliseconds()
|
|
if this.options.MaxTime == 0 || nt < int64(this.options.MaxTime) {
|
|
log.Debug("[Handle Api]",
|
|
log.Field{Key: "t", Value: time.Since(stime).Milliseconds()},
|
|
log.Field{Key: "m", Value: method},
|
|
log.Field{Key: "uid", Value: args.UserId},
|
|
log.Field{Key: "req", Value: msg},
|
|
log.Field{Key: "reply", Value: reply.String()},
|
|
)
|
|
} else {
|
|
log.Error("[Handle Api]",
|
|
log.Field{Key: "t", Value: time.Since(stime).Milliseconds()},
|
|
log.Field{Key: "m", Value: method},
|
|
log.Field{Key: "uid", Value: args.UserId},
|
|
log.Field{Key: "req", Value: msg},
|
|
log.Field{Key: "reply", Value: reply.String()},
|
|
)
|
|
}
|
|
}
|
|
} else { //未找到消息处理函数
|
|
log.Errorf("[Handle Api] no found handle %s", method)
|
|
reply.ErrorData = &pb.ErrorData{
|
|
Code: pb.ErrorCode_ReqParameterError,
|
|
Title: pb.ErrorCode_ReqParameterError.ToString(),
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RPC_NoticeUserClose 接收用户登录通知
|
|
func (this *SCompGateRoute) NoticeUserLogin(ctx context.Context, args *pb.NoticeUserLoginReq, reply *pb.NoticeUserLoginResp) error {
|
|
log.Debug("RPC_NoticeUserLogin", log.Field{Key: "args", Value: args})
|
|
conn, err := db.Local()
|
|
if err != nil {
|
|
log.Errorf("[RPC] NoticeUserLogin err: %v", err)
|
|
return err
|
|
}
|
|
model := db.NewDBModelByExpired(this.service.GetTag(), comm.TableSession, conn)
|
|
user := &pb.CacheUser{
|
|
Uid: args.UserId,
|
|
SessionId: args.UserSessionId,
|
|
ServiceTag: args.ServiceTag,
|
|
GatewayServiceId: args.GatewayServiceId,
|
|
Ip: args.Ip,
|
|
}
|
|
model.AddList(comm.RDS_EMPTY, args.UserId, user, db.SetDBMgoLog(false))
|
|
session := this.GetUserSession(user)
|
|
defer this.PutUserSession(session)
|
|
event.TriggerEvent(comm.EventUserLogin, session)
|
|
session.Push()
|
|
reply.WorkerSId = this.service.GetId()
|
|
return nil
|
|
}
|
|
|
|
func (this *SCompGateRoute) NoticeUserCreate(ctx context.Context, args *pb.NoticeUserCreateReq, reply *pb.RPCMessageReply) error {
|
|
log.Debug("RPC_NoticeUserCreate", log.Field{Key: "args", Value: args})
|
|
event.TriggerEvent(comm.EventCreateUser, args.UserId)
|
|
return nil
|
|
}
|
|
|
|
// RPC_NoticeUserClose 接收用户离线通知
|
|
func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.NoticeUserCloseReq, reply *pb.RPCMessageReply) error {
|
|
// session := this.pools.Get().(comm.IUserSession)
|
|
// session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId)
|
|
log.Debug("RPC_NoticeUserClose", log.Field{Key: "args", Value: args})
|
|
event.TriggerEvent(comm.EventUserOffline, args.UserId, args.UserSessionId)
|
|
return nil
|
|
}
|
|
|
|
// RPC_ConfigureUpDate 接收配置更新消息
|
|
func (this *SCompGateRoute) ConfigureUpDate(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) error {
|
|
log.Debugln("RPC_ConfigureUpDate")
|
|
configure.Update()
|
|
return nil
|
|
}
|
|
|
|
// RPC_DBSyncCross 接收配置更新消息
|
|
func (this *SCompGateRoute) DBSyncCross(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) (err error) {
|
|
log.Debugln("RPC_DBSyncCross")
|
|
err = db.SyncServiceList()
|
|
return
|
|
}
|
|
|
|
// 获取用户的会话对象
|
|
func (this *SCompGateRoute) GetUserSession(udata *pb.CacheUser) (session comm.IUserSession) {
|
|
session = this.service.GetUserSession()
|
|
session.SetSession(udata.Ip, udata.SessionId, udata.ServiceTag, udata.GatewayServiceId, udata.Uid)
|
|
return
|
|
}
|
|
|
|
// 获取用户的会话对象
|
|
func (this *SCompGateRoute) PutUserSession(session comm.IUserSession) {
|
|
this.service.PutUserSession(session)
|
|
return
|
|
}
|