go_dreamfactory/sys/db/db.go
2023-12-15 11:56:30 +08:00

235 lines
5.9 KiB
Go

package db
import (
"context"
"errors"
"fmt"
"go_dreamfactory/comm"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/sys/mgo"
"go_dreamfactory/lego/utils/codec/json"
"go_dreamfactory/pb"
"io/ioutil"
"os"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// newSys builds a DB around options, with an empty server-connection map, and
// runs init on it.
//
// The error reported by init is returned to the caller. sys is returned
// non-nil even when err is non-nil, so existing callers that keep the value
// continue to work; it must not be relied on after a failure.
func newSys(options *Options) (sys *DB, err error) {
	sys = &DB{
		options: options,
		servers: make(map[string]*DBConn),
	}
	// The result of init used to be dropped, which made a failed
	// initialisation indistinguishable from a successful one.
	err = sys.init()
	return
}
// DB groups the database connections used by this service.
type DB struct {
	// options is the configuration handed to newSys.
	options *Options

	// local is opened by init from the options' own Redis/MongoDB settings.
	// cross is opened by readercrossconf from the cross-channel configuration
	// when options.IsCross is false.
	local, cross *DBConn

	// servers holds connections to other servers, keyed by server id.
	servers map[string]*DBConn
	// crossTag is the AreaId read from the cross-channel configuration.
	crossTag string
}
// init opens the local connection from the Redis/MongoDB settings in
// this.options and then processes the cross configuration file named by
// options.CrossConfig.
//
// It returns the first error from newDBConn or readercrossconf; when
// newDBConn fails, readercrossconf is not called.
func (this *DB) init() (err error) {
	opts := this.options
	localCfg := DBConfig{
		RedisIsCluster:  opts.RedisIsCluster,
		RedisAddr:       opts.RedisAddr,
		RedisPassword:   opts.RedisPassword,
		RedisDB:         opts.RedisDB,
		MongodbUrl:      opts.MongodbUrl,
		MongodbDatabase: opts.MongodbDatabase,
	}
	this.local, err = newDBConn(opts.Log, localCfg)
	if err != nil {
		return err
	}
	return this.readercrossconf(opts.CrossConfig)
}
// readercrossconf reads the JSON cross configuration file at path, picks the
// entry for options.CrossChannel and records its AreaId as the cross tag.
//
// When options.IsCross is true it then connects to the servers listed for the
// channel (ConnectServiceList). Otherwise it opens the cross connection from
// the selected entry and registers this server's own database settings there
// (RegiestServerDBConn).
//
// It returns an error if the file cannot be opened, read or parsed, if the
// channel has no entry, or if any of the follow-up steps fail.
func (this *DB) readercrossconf(path string) (err error) {
	var (
		file *os.File
		raw  []byte
	)
	if file, err = os.Open(path); err != nil {
		return err
	}
	defer file.Close()

	if raw, err = ioutil.ReadAll(file); err != nil {
		return err
	}
	configs := make(comm.CrossConfigs, 0)
	if err = json.Unmarshal(raw, &configs); err != nil {
		return err
	}

	cf, found := configs[this.options.CrossChannel]
	if !found {
		return fmt.Errorf("no found Crossconfig:%s", this.options.CrossChannel)
	}
	this.crossTag = cf.AreaId

	if this.options.IsCross {
		return this.ConnectServiceList()
	}

	crossCfg := DBConfig{
		RedisIsCluster:  cf.RedisIsCluster,
		RedisAddr:       cf.RedisAddr,
		RedisPassword:   cf.RedisPassword,
		RedisDB:         cf.RedisDB,
		MongodbUrl:      cf.MongodbUrl,
		MongodbDatabase: cf.MongodbDatabase,
	}
	this.cross, err = newDBConn(this.options.Log, crossCfg)
	if err != nil {
		log.Error("comment db err!",
			log.Field{Key: "stag", Value: cf.AreaId},
			log.Field{Key: "cf", Value: cf},
			log.Field{Key: "err", Value: err.Error()},
		)
		return err
	}

	// Publish this server's own connection settings to the cross database.
	self := &pb.ServiceDBInfo{
		Serverid:        this.options.ServiceId,
		ServerName:      this.options.ServiceId,
		Owner:           this.options.CrossChannel,
		Cross:           this.options.CrossChannel,
		CrossId:         this.options.CrossChannel,
		RedisIsCluster:  this.options.RedisIsCluster,
		RedisAddr:       this.options.RedisAddr,
		RedisPassword:   this.options.RedisPassword,
		RedisDb:         int32(this.options.RedisDB),
		MongodbUrl:      this.options.MongodbUrl,
		MongodbDatabase: this.options.MongodbDatabase,
	}
	return this.RegiestServerDBConn(self)
}
// SyncServiceList refreshes the server connection list. It only does work
// when options.IsCross is true, in which case it returns the result of
// ConnectServiceList; otherwise it returns nil.
func (this *DB) SyncServiceList() (err error) {
	if !this.options.IsCross {
		return nil
	}
	return this.ConnectServiceList()
}
// ConnectServiceList queries the local MongoDB "serverdata" collection for
// every record whose "cross" field equals options.CrossChannel and opens a
// DBConn for each listed server that is not yet in this.servers. The record
// for this service's own id (options.ServiceId) is skipped.
//
// It returns an error when the local connection is missing, when the query or
// the cursor iteration fails, or when a connection to a listed server cannot
// be created. Records that fail to decode are logged and skipped. A failed
// connection is never stored in this.servers.
func (this *DB) ConnectServiceList() (err error) {
	if this.local == nil {
		err = errors.New("LocalDBConn on init")
		return
	}
	var (
		c  *mongo.Cursor
		ss []*pb.ServiceDBInfo
	)
	if c, err = this.local.Mgo.Find("serverdata", bson.M{"cross": this.options.CrossChannel}); err != nil {
		log.Errorf("ConnectServiceList err:%v", err)
		return
	}
	ctx := context.Background()
	// Release the cursor once the records have been read.
	defer c.Close(ctx)

	ss = make([]*pb.ServiceDBInfo, 0)
	for c.Next(ctx) {
		temp := &pb.ServiceDBInfo{}
		// A loop-local error keeps a skipped record from leaking into the
		// function's return value.
		if derr := c.Decode(temp); derr != nil {
			log.Errorf("ConnectServiceList decode err:%v", derr)
			continue
		}
		ss = append(ss, temp)
	}
	// Next returning false may mean the iteration failed, not that it ended.
	if err = c.Err(); err != nil {
		log.Errorf("ConnectServiceList err:%v", err)
		return
	}

	for _, v := range ss {
		if v.Serverid == this.options.ServiceId {
			continue
		}
		if _, ok := this.servers[v.Serverid]; ok {
			continue
		}
		var conn *DBConn
		if conn, err = newDBConn(this.options.Log, DBConfig{
			RedisIsCluster:  v.RedisIsCluster,
			RedisAddr:       v.RedisAddr,
			RedisPassword:   v.RedisPassword,
			RedisDB:         int(v.RedisDb),
			MongodbUrl:      v.MongodbUrl,
			MongodbDatabase: v.MongodbDatabase,
		}); err != nil {
			log.Error("comment db err!",
				log.Field{Key: "stag", Value: v.Serverid},
				log.Field{Key: "db", Value: v},
				log.Field{Key: "err", Value: err.Error()},
			)
			return
		}
		// Store only working connections: an entry for a failed one would
		// make later lookups return it as if it were usable.
		this.servers[v.Serverid] = conn
	}
	return
}
// Local returns the local connection, or a nil connection and an error when
// it has not been set.
func (this *DB) Local() (conn *DBConn, err error) {
	if this.local == nil {
		return nil, errors.New("LocalDBConn on init")
	}
	return this.local, nil
}
// IsCross returns the IsCross flag from the options this DB was created with.
func (this *DB) IsCross() bool {
return this.options.IsCross
}
// CrossTag returns the cross tag, which readercrossconf sets to the AreaId of
// the selected cross configuration entry.
func (this *DB) CrossTag() string {
return this.crossTag
}
// Cross returns the cross connection, or a nil connection and an error when
// it has not been set.
func (this *DB) Cross() (conn *DBConn, err error) {
	if this.cross == nil {
		return nil, errors.New("CrossDBConn on init")
	}
	return this.cross, nil
}
// ServerDBConn returns the connection stored in this.servers for stage. When
// there is no entry for stage it falls back to ConnectServerDBConn and
// returns that call's result.
func (this *DB) ServerDBConn(stage string) (conn *DBConn, err error) {
	if cached, hit := this.servers[stage]; hit {
		return cached, nil
	}
	return this.ConnectServerDBConn(stage)
}
// GetServerTags returns the keys of this.servers in map iteration order,
// which is not stable between calls. The result is an empty, non-nil slice
// when there are no entries.
func (this *DB) GetServerTags() []string {
	tags := make([]string, 0, len(this.servers))
	for tag := range this.servers {
		tags = append(tags, tag)
	}
	return tags
}
// ConnectServerDBConn connects to the database of the target server.
//
// It looks up the "serverdata" record matching options.CrossChannel and the
// server id stage in the local MongoDB, opens a DBConn from that record and
// stores it in this.servers under the record's Serverid.
//
// It returns a nil connection and an error when the local connection is
// missing, when the record cannot be found or decoded, or when the connection
// cannot be created. A failed connection is never stored in this.servers.
func (this *DB) ConnectServerDBConn(stage string) (conn *DBConn, err error) {
	// Same guard as ConnectServiceList: without it a missing local
	// connection would be a nil dereference.
	if this.local == nil {
		err = errors.New("LocalDBConn on init")
		return
	}
	temp := &pb.ServiceDBInfo{}
	if err = this.local.Mgo.FindOne("serverdata", bson.M{"cross": this.options.CrossChannel, "serverid": stage}).Decode(temp); err != nil {
		return
	}
	if conn, err = newDBConn(this.options.Log, DBConfig{
		RedisIsCluster:  temp.RedisIsCluster,
		RedisAddr:       temp.RedisAddr,
		RedisPassword:   temp.RedisPassword,
		RedisDB:         int(temp.RedisDb),
		MongodbUrl:      temp.MongodbUrl,
		MongodbDatabase: temp.MongodbDatabase,
	}); err != nil {
		log.Error("comment db err!",
			log.Field{Key: "stag", Value: temp.Serverid},
			log.Field{Key: "db", Value: temp},
			log.Field{Key: "err", Value: err.Error()},
		)
		// Never hand back, or cache, a connection that failed to open.
		conn = nil
		return
	}
	this.servers[temp.Serverid] = conn
	return
}
// RegiestServerDBConn registers this server's database settings with the
// cross server.
//
// It looks for a "serverdata" record in the cross MongoDB matching
// options.CrossChannel and info.Serverid, and inserts info when the lookup
// reports mgo.MongodbNil. When a record already exists nothing is written and
// nil is returned.
//
// It returns an error when the cross connection is missing, when the lookup
// fails for any reason other than mgo.MongodbNil, or when the insert fails.
func (this *DB) RegiestServerDBConn(info *pb.ServiceDBInfo) (err error) {
	if this.cross == nil {
		err = errors.New("CrossDBConn on init")
		return
	}
	findErr := this.cross.Mgo.FindOne("serverdata", bson.M{"cross": this.options.CrossChannel, "serverid": info.Serverid}).Err()
	if findErr == mgo.MongodbNil {
		_, err = this.cross.Mgo.InsertOne("serverdata", info)
		return
	}
	// findErr is nil when the record already exists. Any other value is a
	// real lookup failure and is reported instead of being treated as
	// "already registered".
	err = findErr
	return
}