上传跨服平台对接

This commit is contained in:
liwei1dao 2023-01-30 17:59:03 +08:00
parent 689f6368cf
commit 5f77fc4a81
13 changed files with 319 additions and 131 deletions

View File

@ -34,7 +34,7 @@ type DBConfig struct {
type CrossConfigs map[string]*CrossConfig
type CrossConfig struct {
AreaId string //区服id
LoaclDB *DBConfig //本地db
ServiceList map[string]*DBConfig
AreaId string //区服id
DBConfig //本地db
// ServiceList map[string]*DBConfig
}

View File

@ -7,6 +7,7 @@ import (
"go_dreamfactory/comm"
"go_dreamfactory/pb"
"go_dreamfactory/sys/configure"
"go_dreamfactory/sys/db"
"go_dreamfactory/utils"
"strings"
"sync"
@ -275,14 +276,14 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
paths := strings.Split(rule, "/")
if len(paths) == 3 {
if paths[0] == "~" {
serviceTag = this.gateway.CrossServiceTag()
serviceTag = db.CrossTag()
} else {
serviceTag = paths[0]
}
servicePath = fmt.Sprintf("%s/%s", paths[1], paths[2])
} else if len(paths) == 2 {
if paths[0] == "~" {
serviceTag = this.gateway.CrossServiceTag()
serviceTag = db.CrossTag()
servicePath = paths[1]
} else {
servicePath = rule

View File

@ -5,6 +5,7 @@ import (
"fmt"
"go_dreamfactory/comm"
"go_dreamfactory/pb"
"go_dreamfactory/sys/db"
"sync"
"go_dreamfactory/lego/base"
@ -65,18 +66,18 @@ func (this *AgentMgrComp) DisConnect(a IAgent) {
}, nil); err != nil {
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
}
if this.options.SpanServiceTag != "" {
//推送跨服集群处理
if _, err := this.service.AcrossClusterRpcGo(context.Background(), this.options.SpanServiceTag, comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{
Ip: a.IP(),
ServiceTag: this.service.GetTag(),
GatewayServiceId: this.service.GetId(),
UserSessionId: a.SessionId(),
UserId: a.UserId(),
}, nil); err != nil {
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
}
// if this.options.SpanServiceTag != "" {
//推送跨服集群处理
if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{
Ip: a.IP(),
ServiceTag: this.service.GetTag(),
GatewayServiceId: this.service.GetId(),
UserSessionId: a.SessionId(),
UserId: a.UserId(),
}, nil); err != nil {
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
}
// }
}
}
@ -85,18 +86,18 @@ func (this *AgentMgrComp) Bind(ctx context.Context, args *pb.AgentBuildReq, repl
if a, ok := this.agents.Load(args.UserSessionId); ok {
agent := a.(IAgent)
agent.Bind(args.UserId, args.WorkerId)
if this.options.SpanServiceTag != "" { //跨服集群配置存在 推送通知过去
//推送跨服集群处理
if _, err := this.service.AcrossClusterRpcGo(context.Background(), this.options.SpanServiceTag, comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{
Ip: agent.IP(),
ServiceTag: this.service.GetTag(),
GatewayServiceId: this.service.GetId(),
UserSessionId: agent.SessionId(),
UserId: agent.UserId(),
}, nil); err != nil {
log.Errorf("uId:%s Rpc_NoticeUserLogin err:%v", agent.UserId(), err)
}
// if this.options.SpanServiceTag != "" { //跨服集群配置存在 推送通知过去
//推送跨服集群处理
if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{
Ip: agent.IP(),
ServiceTag: this.service.GetTag(),
GatewayServiceId: this.service.GetId(),
UserSessionId: agent.SessionId(),
UserId: agent.UserId(),
}, nil); err != nil {
log.Errorf("uId:%s Rpc_NoticeUserLogin err:%v", agent.UserId(), err)
}
// }
} else {
reply.Code = pb.ErrorCode_UserSessionNobeing
reply.ErrorMessage = pb.GetErrorCodeMsg(pb.ErrorCode_UserSessionNobeing)

View File

@ -27,7 +27,6 @@ type (
core.IModule
log.ILogger
Service() base.IRPCXService
CrossServiceTag() string
Connect(a IAgent)
DisConnect(a IAgent)
GetMsgDistribute(msgmid, msguid string) (rule string, ok bool)

View File

@ -47,11 +47,6 @@ func (this *Gateway) Service() base.IRPCXService {
return this.service
}
//跨服集群标签
func (this *Gateway) CrossServiceTag() string {
return this.options.SpanServiceTag
}
// Init 模块初始化函数
func (this *Gateway) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options)

View File

@ -14,9 +14,9 @@ import (
type (
Options struct {
modules.Options
GinDebug bool //web引擎日志开关
ListenPort int //websocket 监听端口
SpanServiceTag string //跨服集群
GinDebug bool //web引擎日志开关
ListenPort int //websocket 监听端口
// SpanServiceTag string //跨服集群
}
)

View File

@ -25,7 +25,7 @@ type DBCombatUser struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid"`
Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid"` //uid
Level map[int32]*DBCombatLevel `protobuf:"bytes,2,rep,name=level,proto3" json:"level" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` //关卡信息
}

View File

@ -1621,6 +1621,150 @@ func (x *RPCRTaskReq) GetParam() []int32 {
return nil
}
//服务列表信息
type ServiceDBInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Serverid string `protobuf:"bytes,1,opt,name=serverid,proto3" json:"serverid" bson:"serverid"`
ServerName string `protobuf:"bytes,2,opt,name=serverName,proto3" json:"serverName" bson:"serverName"`
Owner string `protobuf:"bytes,3,opt,name=owner,proto3" json:"owner" bson:"owner"`
Cross string `protobuf:"bytes,4,opt,name=cross,proto3" json:"cross" bson:"cross"`
CrossId string `protobuf:"bytes,5,opt,name=crossId,proto3" json:"crossId" bson:"crossId"`
Singleserver string `protobuf:"bytes,6,opt,name=singleserver,proto3" json:"singleserver" bson:"singleserver"`
Opentime int64 `protobuf:"varint,7,opt,name=opentime,proto3" json:"opentime" bson:"opentime"`
RedisIsCluster bool `protobuf:"varint,8,opt,name=redisIsCluster,proto3" json:"redisIsCluster" bson:"redisIsCluster"`
RedisAddr []string `protobuf:"bytes,9,rep,name=redisAddr,proto3" json:"redisAddr" bson:"redisAddr"`
RedisPassword string `protobuf:"bytes,10,opt,name=redisPassword,proto3" json:"redisPassword" bson:"redisPassword"`
RedisDb int32 `protobuf:"varint,11,opt,name=redisDb,proto3" json:"redisDb" bson:"redisDb"`
MongodbUrl string `protobuf:"bytes,12,opt,name=MongodbUrl,proto3" json:"MongodbUrl" bson:"MongodbUrl"`
MongodbDatabase string `protobuf:"bytes,13,opt,name=mongodbDatabase,proto3" json:"mongodbDatabase" bson:"mongodbDatabase"`
}
func (x *ServiceDBInfo) Reset() {
*x = ServiceDBInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_comm_proto_msgTypes[25]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ServiceDBInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ServiceDBInfo) ProtoMessage() {}
func (x *ServiceDBInfo) ProtoReflect() protoreflect.Message {
mi := &file_comm_proto_msgTypes[25]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ServiceDBInfo.ProtoReflect.Descriptor instead.
func (*ServiceDBInfo) Descriptor() ([]byte, []int) {
return file_comm_proto_rawDescGZIP(), []int{25}
}
func (x *ServiceDBInfo) GetServerid() string {
if x != nil {
return x.Serverid
}
return ""
}
func (x *ServiceDBInfo) GetServerName() string {
if x != nil {
return x.ServerName
}
return ""
}
func (x *ServiceDBInfo) GetOwner() string {
if x != nil {
return x.Owner
}
return ""
}
func (x *ServiceDBInfo) GetCross() string {
if x != nil {
return x.Cross
}
return ""
}
func (x *ServiceDBInfo) GetCrossId() string {
if x != nil {
return x.CrossId
}
return ""
}
func (x *ServiceDBInfo) GetSingleserver() string {
if x != nil {
return x.Singleserver
}
return ""
}
func (x *ServiceDBInfo) GetOpentime() int64 {
if x != nil {
return x.Opentime
}
return 0
}
func (x *ServiceDBInfo) GetRedisIsCluster() bool {
if x != nil {
return x.RedisIsCluster
}
return false
}
func (x *ServiceDBInfo) GetRedisAddr() []string {
if x != nil {
return x.RedisAddr
}
return nil
}
func (x *ServiceDBInfo) GetRedisPassword() string {
if x != nil {
return x.RedisPassword
}
return ""
}
func (x *ServiceDBInfo) GetRedisDb() int32 {
if x != nil {
return x.RedisDb
}
return 0
}
func (x *ServiceDBInfo) GetMongodbUrl() string {
if x != nil {
return x.MongodbUrl
}
return ""
}
func (x *ServiceDBInfo) GetMongodbDatabase() string {
if x != nil {
return x.MongodbDatabase
}
return ""
}
var File_comm_proto protoreflect.FileDescriptor
var file_comm_proto_rawDesc = []byte{
@ -1773,13 +1917,39 @@ var file_comm_proto_rawDesc = []byte{
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x74,
0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x74,
0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d,
0x18, 0x03, 0x20, 0x03, 0x28, 0x05, 0x52, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x2a, 0x43, 0x0a,
0x12, 0x48, 0x65, 0x72, 0x6f, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x54,
0x79, 0x70, 0x65, 0x12, 0x06, 0x0a, 0x02, 0x48, 0x70, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x41,
0x74, 0x6b, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x44, 0x65, 0x66, 0x10, 0x02, 0x12, 0x09, 0x0a,
0x05, 0x53, 0x70, 0x65, 0x65, 0x64, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04, 0x43, 0x72, 0x69, 0x74,
0x10, 0x04, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x18, 0x03, 0x20, 0x03, 0x28, 0x05, 0x52, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x22, 0xa1, 0x03,
0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x42, 0x49, 0x6e, 0x66, 0x6f, 0x12,
0x1a, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x69, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x73,
0x65, 0x72, 0x76, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f,
0x77, 0x6e, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65,
0x72, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x72, 0x6f, 0x73, 0x73,
0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x72, 0x6f, 0x73, 0x73, 0x49,
0x64, 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x73,
0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x69, 0x6d,
0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x69, 0x6d,
0x65, 0x12, 0x26, 0x0a, 0x0e, 0x72, 0x65, 0x64, 0x69, 0x73, 0x49, 0x73, 0x43, 0x6c, 0x75, 0x73,
0x74, 0x65, 0x72, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x72, 0x65, 0x64, 0x69, 0x73,
0x49, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x64,
0x69, 0x73, 0x41, 0x64, 0x64, 0x72, 0x18, 0x09, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65,
0x64, 0x69, 0x73, 0x41, 0x64, 0x64, 0x72, 0x12, 0x24, 0x0a, 0x0d, 0x72, 0x65, 0x64, 0x69, 0x73,
0x50, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
0x72, 0x65, 0x64, 0x69, 0x73, 0x50, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x12, 0x18, 0x0a,
0x07, 0x72, 0x65, 0x64, 0x69, 0x73, 0x44, 0x62, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07,
0x72, 0x65, 0x64, 0x69, 0x73, 0x44, 0x62, 0x12, 0x1e, 0x0a, 0x0a, 0x4d, 0x6f, 0x6e, 0x67, 0x6f,
0x64, 0x62, 0x55, 0x72, 0x6c, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x4d, 0x6f, 0x6e,
0x67, 0x6f, 0x64, 0x62, 0x55, 0x72, 0x6c, 0x12, 0x28, 0x0a, 0x0f, 0x6d, 0x6f, 0x6e, 0x67, 0x6f,
0x64, 0x62, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0f, 0x6d, 0x6f, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
0x65, 0x2a, 0x43, 0x0a, 0x12, 0x48, 0x65, 0x72, 0x6f, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75,
0x74, 0x65, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x06, 0x0a, 0x02, 0x48, 0x70, 0x10, 0x00, 0x12,
0x07, 0x0a, 0x03, 0x41, 0x74, 0x6b, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x44, 0x65, 0x66, 0x10,
0x02, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x70, 0x65, 0x65, 0x64, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04,
0x43, 0x72, 0x69, 0x74, 0x10, 0x04, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1795,7 +1965,7 @@ func file_comm_proto_rawDescGZIP() []byte {
}
var file_comm_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 25)
var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 26)
var file_comm_proto_goTypes = []interface{}{
(HeroAttributesType)(0), // 0: HeroAttributesType
(*UserMessage)(nil), // 1: UserMessage
@ -1823,18 +1993,19 @@ var file_comm_proto_goTypes = []interface{}{
(*RPCGeneralReqA3)(nil), // 23: RPCGeneralReqA3
(*RPCGeneralReqA4)(nil), // 24: RPCGeneralReqA4
(*RPCRTaskReq)(nil), // 25: RPCRTaskReq
(*anypb.Any)(nil), // 26: google.protobuf.Any
(ErrorCode)(0), // 27: ErrorCode
(*ServiceDBInfo)(nil), // 26: ServiceDBInfo
(*anypb.Any)(nil), // 27: google.protobuf.Any
(ErrorCode)(0), // 28: ErrorCode
}
var file_comm_proto_depIdxs = []int32{
26, // 0: UserMessage.data:type_name -> google.protobuf.Any
26, // 1: AgentMessage.Message:type_name -> google.protobuf.Any
27, // 2: RPCMessageReply.Code:type_name -> ErrorCode
26, // 3: RPCMessageReply.ErrorData:type_name -> google.protobuf.Any
27, // 0: UserMessage.data:type_name -> google.protobuf.Any
27, // 1: AgentMessage.Message:type_name -> google.protobuf.Any
28, // 2: RPCMessageReply.Code:type_name -> ErrorCode
27, // 3: RPCMessageReply.ErrorData:type_name -> google.protobuf.Any
1, // 4: RPCMessageReply.Reply:type_name -> UserMessage
1, // 5: AgentSendMessageReq.Reply:type_name -> UserMessage
26, // 6: BatchMessageReq.Data:type_name -> google.protobuf.Any
26, // 7: BroadCastMessageReq.Data:type_name -> google.protobuf.Any
27, // 6: BatchMessageReq.Data:type_name -> google.protobuf.Any
27, // 7: BroadCastMessageReq.Data:type_name -> google.protobuf.Any
8, // [8:8] is the sub-list for method output_type
8, // [8:8] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
@ -2149,6 +2320,18 @@ func file_comm_proto_init() {
return nil
}
}
file_comm_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ServiceDBInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -2156,7 +2339,7 @@ func file_comm_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_comm_proto_rawDesc,
NumEnums: 1,
NumMessages: 25,
NumMessages: 26,
NumExtensions: 0,
NumServices: 0,
},

View File

@ -298,8 +298,7 @@ func convertServiceSttings(config *comm.GameConfig, id int, stype string, ip str
sseting.Type = comm.Service_Gateway
sseting.Sys["rpcx"]["RpcxStartType"] = 1
sseting.Modules["gateway"] = map[string]interface{}{
"ListenPort": lport,
"SpanServiceTag": config.BelongCrossServerId,
"ListenPort": lport,
}
break
case comm.Service_Worker: //业务服务
@ -352,7 +351,7 @@ func convertServiceSttings(config *comm.GameConfig, id int, stype string, ip str
}
sseting.Sys["db"] = map[string]interface{}{
"IsCross": config.IsCross,
"CrossTag": config.BelongCrossServerId,
"CrossChannel": config.BelongCrossServerId,
"RedisIsCluster": config.LoaclDB.RedisIsCluster,
"RedisAddr": config.LoaclDB.RedisAddr,
"RedisPassword": config.LoaclDB.RedisPassword,

View File

@ -2,12 +2,15 @@ package main
import (
"flag"
"fmt"
"go_dreamfactory/modules/gateway"
"go_dreamfactory/services"
"go_dreamfactory/sys/db"
"go_dreamfactory/lego"
"go_dreamfactory/lego/base/rpcx"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/log"
)
/*
@ -45,4 +48,11 @@ type Service struct {
func (this *Service) InitSys() {
this.ServiceBase.InitSys()
//存储系统
if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil {
panic(fmt.Sprintf("init sys.db err: %s", err.Error()))
} else {
log.Infof("init sys.db success!")
}
}

View File

@ -140,7 +140,7 @@ func (this *Service) InitSys() {
log.Infof("init sys.timewheel success!")
}
//存储系统
if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil {
if err := db.OnInit(this.GetSettings().Sys["db"], db.SetServiceId(this.GetTag())); err != nil {
panic(fmt.Sprintf("init sys.db err: %s", err.Error()))
} else {
log.Infof("init sys.db success!")

View File

@ -1,13 +1,18 @@
package db
import (
"context"
"errors"
"fmt"
"go_dreamfactory/comm"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/utils/codec/json"
"go_dreamfactory/pb"
"io/ioutil"
"os"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
func newSys(options *Options) (sys *DB, err error) {
@ -20,10 +25,11 @@ func newSys(options *Options) (sys *DB, err error) {
}
type DB struct {
options *Options
local *DBConn
cross *DBConn
servers map[string]*DBConn
options *Options
local *DBConn
cross *DBConn
servers map[string]*DBConn
crossTag string
}
func (this *DB) init() (err error) {
@ -58,44 +64,29 @@ func (this *DB) readercrossconf(path string) (err error) {
if err = json.Unmarshal(byteValue, &config); err != nil {
return
}
if cf, ok := config[this.options.CrossTag]; !ok {
err = fmt.Errorf("no found Crossconfig:%s", this.options.CrossTag)
if cf, ok := config[this.options.CrossChannel]; !ok {
err = fmt.Errorf("no found Crossconfig:%s", this.options.CrossChannel)
return
} else {
this.crossTag = cf.AreaId
if !this.options.IsCross {
if this.cross, err = newDBConn(this.options.Log, DBConfig{
RedisIsCluster: cf.LoaclDB.RedisIsCluster,
RedisAddr: cf.LoaclDB.RedisAddr,
RedisPassword: cf.LoaclDB.RedisPassword,
RedisDB: cf.LoaclDB.RedisDB,
MongodbUrl: cf.LoaclDB.MongodbUrl,
MongodbDatabase: cf.LoaclDB.MongodbDatabase,
RedisIsCluster: cf.RedisIsCluster,
RedisAddr: cf.RedisAddr,
RedisPassword: cf.RedisPassword,
RedisDB: cf.RedisDB,
MongodbUrl: cf.MongodbUrl,
MongodbDatabase: cf.MongodbDatabase,
}); err != nil {
log.Error("comment db err!",
log.Field{Key: "stag", Value: cf.AreaId},
log.Field{Key: "db", Value: cf.LoaclDB},
log.Field{Key: "cf", Value: cf},
log.Field{Key: "err", Value: err.Error()},
)
return
}
} else {
for k, v := range cf.ServiceList {
if this.servers[k], err = newDBConn(this.options.Log, DBConfig{
RedisIsCluster: v.RedisIsCluster,
RedisAddr: v.RedisAddr,
RedisPassword: v.RedisPassword,
RedisDB: v.RedisDB,
MongodbUrl: v.MongodbUrl,
MongodbDatabase: v.MongodbDatabase,
}); err != nil {
log.Error("comment db err!",
log.Field{Key: "stag", Value: cf.AreaId},
log.Field{Key: "db", Value: cf.LoaclDB},
log.Field{Key: "err", Value: err.Error()},
)
return
}
}
err = this.ConnectServiceList()
}
}
}
@ -104,52 +95,53 @@ func (this *DB) readercrossconf(path string) (err error) {
//同步服务列表
func (this *DB) SyncServiceList() (err error) {
config := make(comm.CrossConfigs, 0)
var (
jsonFile *os.File
byteValue []byte
)
if jsonFile, err = os.Open(this.options.CrossConfig); err != nil {
return
} else {
defer jsonFile.Close()
if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
return
}
if err = json.Unmarshal(byteValue, &config); err != nil {
return
}
if cf, ok := config[this.options.CrossTag]; !ok {
err = fmt.Errorf("no found Crossconfig:%s", this.options.CrossTag)
return
} else {
if this.options.IsCross {
for k, v := range cf.ServiceList {
if _, ok := this.servers[k]; !ok {
if this.servers[k], err = newDBConn(this.options.Log, DBConfig{
RedisIsCluster: v.RedisIsCluster,
RedisAddr: v.RedisAddr,
RedisPassword: v.RedisPassword,
RedisDB: v.RedisDB,
MongodbUrl: v.MongodbUrl,
MongodbDatabase: v.MongodbDatabase,
}); err != nil {
log.Error("comment db err!",
log.Field{Key: "stag", Value: cf.AreaId},
log.Field{Key: "db", Value: cf.LoaclDB},
log.Field{Key: "err", Value: err.Error()},
)
return
}
}
if this.options.IsCross {
err = this.ConnectServiceList()
}
return
}
}
} else {
err = errors.New("curr service is not cross!")
func (this *DB) ConnectServiceList() (err error) {
if this.local == nil {
err = errors.New("LocalDBConn on init")
return
}
var (
c *mongo.Cursor
ss []*pb.ServiceDBInfo
)
if c, err = this.local.Mgo.Find("serverdata", bson.M{"cross": this.options.CrossChannel}); err != nil {
log.Errorf("ConnectServiceList err:%v", err)
} else {
ss = make([]*pb.ServiceDBInfo, 0)
for c.Next(context.Background()) {
temp := &pb.ServiceDBInfo{}
if err = c.Decode(temp); err == nil {
ss = append(ss, temp)
}
}
}
for _, v := range ss {
if _, ok := this.servers[v.Serverid]; !ok && v.Serverid != this.options.ServiceId {
if this.servers[v.Serverid], err = newDBConn(this.options.Log, DBConfig{
RedisIsCluster: v.RedisIsCluster,
RedisAddr: v.RedisAddr,
RedisPassword: v.RedisPassword,
RedisDB: int(v.RedisDb),
MongodbUrl: v.MongodbUrl,
MongodbDatabase: v.MongodbDatabase,
}); err != nil {
log.Error("comment db err!",
log.Field{Key: "stag", Value: v.Serverid},
log.Field{Key: "db", Value: v},
log.Field{Key: "err", Value: err.Error()},
)
return
}
}
}
return
}
@ -160,11 +152,13 @@ func (this *DB) Local() (conn *DBConn, err error) {
}
return
}
func (this *DB) IsCross() bool {
return this.options.IsCross
}
func (this *DB) CrossTag() string {
return this.options.CrossTag
return this.crossTag
}
func (this *DB) Cross() (conn *DBConn, err error) {

View File

@ -18,8 +18,9 @@ type DBConfig struct {
type Option func(*Options)
type Options struct {
ServiceId string //服务id
IsCross bool //是否是跨服
CrossTag string //跨服区服id
CrossChannel string //跨服渠道
RedisIsCluster bool //是否是集群
RedisAddr []string //redis 的集群地址
RedisPassword string //redis的密码
@ -31,6 +32,11 @@ type Options struct {
Log log.ILogger
}
func SetServiceId(v string) Option {
return func(o *Options) {
o.ServiceId = v
}
}
func SetDebug(v bool) Option {
return func(o *Options) {
o.Debug = v