package services import ( "context" "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" "go_dreamfactory/sys/configure" "go_dreamfactory/sys/db" "reflect" "time" "go_dreamfactory/lego/core" "go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/sys/event" "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/utils/mapstructure" "google.golang.org/protobuf/proto" ) // 用户协议处理函数注册的反射对象 type msghandle struct { rcvr reflect.Value msgType reflect.Type //消息请求类型 handle reflect.Method //处理函数 } // 组件参数 type CompOptions struct { } func (this *CompOptions) LoadConfig(settings map[string]interface{}) (err error) { if settings != nil { err = mapstructure.Decode(settings, this) } return } /* 服务网关组件 用于接收网关服务发送过来的消息 */ func NewGateRouteComp() comm.ISC_GateRouteComp { comp := new(SCompGateRoute) return comp } // 服务网关组件 type SCompGateRoute struct { cbase.ServiceCompBase options *CompOptions service comm.IService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口 msghandles map[string]*msghandle //处理函数的管理对象 } // 设置服务组件名称 方便业务模块中获取此组件对象 func (this *SCompGateRoute) GetName() core.S_Comps { return comm.SC_ServiceGateRouteComp } func (this *SCompGateRoute) NewOptions() (options core.ICompOptions) { return new(CompOptions) } // 组件初始化函数 func (this *SCompGateRoute) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) { err = this.ServiceCompBase.Init(service, comp, options) this.options = options.(*CompOptions) this.service = service.(comm.IService) this.msghandles = make(map[string]*msghandle) return err } // 组件启动时注册rpc服务监听 func (this *SCompGateRoute) Start() (err error) { this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口 this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserLogin), this.NoticeUserLogin) //注册用户登录通知 this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserCreate), this.NoticeUserCreate) //注册用户离线创角 this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserClose), this.NoticeUserClose) //注册用户离线通知 this.service.RegisterFunctionName(string(comm.Rpc_ConfigureUpDate), this.ConfigureUpDate) //注册配置更新 if db.IsCross() { //跨服环境 this.service.RegisterFunctionName(string(comm.Rpc_DBSyncCross), this.DBSyncCross) //注册配置更新 } err = this.ServiceCompBase.Start() return } // 业务模块注册用户消息处理路由 func (this *SCompGateRoute) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, handele reflect.Method) { //log.Debugf("注册用户路由【%s】", methodName) _, ok := this.msghandles[methodName] if ok { log.Errorf("重复 注册网关消息【%s】", methodName) return } this.msghandles[methodName] = &msghandle{ rcvr: comp, msgType: msg, handle: handele, } } // Rpc_GatewayRoute服务接口的接收函数 func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) (err error) { method := fmt.Sprintf("%s.%s", args.MainType, args.SubType) // defer func() { //程序异常 收集异常信息传递给前端显示 // if r := recover(); r != nil { // buf := make([]byte, 4096) // l := runtime.Stack(buf, false) // reply.Code = pb.ErrorCode_Exception // reply.ErrorMessage = fmt.Sprintf("%v: %s", r, buf[:l]) // log.Errorf("[Handle Api] m:%s reply:%s", method, reply) // } // }() //获取用户消息处理函数 // log.Debug("ReceiveMsg", // log.Field{Key: "args", Value: args.String()}, // ) reply.ServiceId = this.service.GetId() msghandle, ok := this.msghandles[method] if ok { session := this.service.GetUserSession() session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId) defer func() { //回收 session.Reset() this.service.PutUserSession(session) }() //序列化用户消息对象 var msg proto.Message if msg, err = args.Message.UnmarshalNew(); err != nil { log.Errorf("[Handle Api] UserMessage:%s Unmarshal err:%v", method, err) return err } //执行处理流 stime := time.Now() handlereturn := msghandle.handle.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(session), reflect.ValueOf(msg)}) errdata := handlereturn[0] if !errdata.IsNil() { //处理返货错误码 返回用户错误信息 //data, _ := anypb.New(errdata.(proto.Message)) reply.ErrorData = errdata.Interface().(*pb.ErrorData) // log.Errorf("[Handle Api] t:%v m:%s req:%v reply:%v", time.Since(stime), method, msg, reply) log.Error("[Handle Api]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "m", Value: method}, log.Field{Key: "uid", Value: args.UserId}, log.Field{Key: "req", Value: msg}, log.Field{Key: "reply", Value: reply.String()}, ) } else { reply.Reply = session.Polls() // log.Debugf("[Handle Api] t:%v m:%s uid:%s req:%v reply:%v", time.Since(stime), method, args.UserId, msg, reply) log.Debug("[Handle Api]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "m", Value: method}, log.Field{Key: "uid", Value: args.UserId}, log.Field{Key: "req", Value: msg}, log.Field{Key: "reply", Value: reply.String()}, ) } } else { //未找到消息处理函数 log.Errorf("[Handle Api] no found handle %s", method) reply.ErrorData = &pb.ErrorData{ Code: pb.ErrorCode_ReqParameterError, Title: pb.ErrorCode_ReqParameterError.ToString(), } } return nil } // RPC_NoticeUserClose 接收用户登录通知 func (this *SCompGateRoute) NoticeUserLogin(ctx context.Context, args *pb.NoticeUserLoginReq, reply *pb.NoticeUserLoginResp) error { log.Debug("RPC_NoticeUserLogin", log.Field{Key: "args", Value: args}) conn, err := db.Local() if err != nil { log.Errorf("[RPC] NoticeUserLogin err: %v", err) return err } model := db.NewDBModelByExpired(comm.TableSession, conn) user := &pb.CacheUser{ Uid: args.UserId, SessionId: args.UserSessionId, ServiceTag: args.ServiceTag, GatewayServiceId: args.GatewayServiceId, Ip: args.Ip, } model.AddList(comm.RDS_EMPTY, args.UserId, user, db.SetDBMgoLog(false)) session := this.GetUserSession(user) defer this.PutUserSession(session) event.TriggerEvent(comm.EventUserLogin, session) session.Push() reply.WorkerSId = this.service.GetId() return nil } func (this *SCompGateRoute) NoticeUserCreate(ctx context.Context, args *pb.NoticeUserCreateReq, reply *pb.RPCMessageReply) error { log.Debug("RPC_NoticeUserCreate", log.Field{Key: "args", Value: args}) event.TriggerEvent(comm.EventCreateUser, args.UserId) return nil } // RPC_NoticeUserClose 接收用户离线通知 func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.NoticeUserCloseReq, reply *pb.RPCMessageReply) error { // session := this.pools.Get().(comm.IUserSession) // session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId) log.Debug("RPC_NoticeUserClose", log.Field{Key: "args", Value: args}) event.TriggerEvent(comm.EventUserOffline, args.UserId, args.UserSessionId) return nil } // RPC_ConfigureUpDate 接收配置更新消息 func (this *SCompGateRoute) ConfigureUpDate(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) error { log.Debugln("RPC_ConfigureUpDate") configure.Update() return nil } // RPC_DBSyncCross 接收配置更新消息 func (this *SCompGateRoute) DBSyncCross(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) (err error) { log.Debugln("RPC_DBSyncCross") err = db.SyncServiceList() return } // 获取用户的会话对象 func (this *SCompGateRoute) GetUserSession(udata *pb.CacheUser) (session comm.IUserSession) { session = this.service.GetUserSession() session.SetSession(udata.Ip, udata.SessionId, udata.ServiceTag, udata.GatewayServiceId, udata.Uid) return } // 获取用户的会话对象 func (this *SCompGateRoute) PutUserSession(session comm.IUserSession) { this.service.PutUserSession(session) return }