183 lines
7.5 KiB
Python
183 lines
7.5 KiB
Python
import time
|
|
|
|
from loguru import logger
|
|
|
|
from update_data import LoadData
|
|
|
|
|
|
class LoadUserData(LoadData):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.mongo_client = kwargs.get('mongo_client')
|
|
self.game_name = kwargs.get('game_name')
|
|
self.s_db_df = kwargs.get('s_db_df')
|
|
self.gm_mysql = None
|
|
self.channel_set = set()
|
|
self.owner_channel_set = set()
|
|
|
|
async def get_data(self) -> list:
|
|
"""
|
|
获取这组 登陆数据
|
|
:return: list
|
|
"""
|
|
user_info_data = []
|
|
|
|
try:
|
|
|
|
self.gm_key = self.game_name + '_user_' + self.mongo_client.address[0]
|
|
self.last_act_time = await self.get_kv(self.gm_key)
|
|
|
|
where = {
|
|
'logintime': {'$gte': self.last_act_time, '$lt': self.act_time}
|
|
}
|
|
projection = {
|
|
'_id': False,
|
|
'uid': True,
|
|
'name': True,
|
|
'logintime': True,
|
|
'lastloginip': True,
|
|
'ext_channel': True,
|
|
'ext_owner': True,
|
|
'binduid': True,
|
|
'lv': True,
|
|
'ctime': True,
|
|
'vip': True,
|
|
'sid': True
|
|
}
|
|
|
|
for s in self.s_db_df.T.to_dict().values():
|
|
sid = s['serverid']
|
|
db = s['db']
|
|
where['uid'] = {"$regex": "^" + str(sid) + "_"}
|
|
user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection)
|
|
data_list = await user_info_cursor.to_list(length=65535)
|
|
for user in data_list:
|
|
user['lasttime'] = user['logintime']
|
|
user['vip'] = user.get('vip', 0)
|
|
self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn')))
|
|
self.owner_channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn'), user.get('ext_owner', 'unkonwn')))
|
|
user_info_data.extend(data_list)
|
|
logger.info(f'db:{db}')
|
|
logger.info(f'区服id:{sid}')
|
|
logger.info(f'更新数量:{len(data_list)}')
|
|
logger.info('-' * 50)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
self.gm_key = None
|
|
|
|
return user_info_data
|
|
|
|
async def update_data(self, users: list, try_num=3):
|
|
if not users:
|
|
return
|
|
async with self.gm_db_pool.acquire() as conn:
|
|
async with conn.cursor() as cur:
|
|
# 记录 userinfo
|
|
sql_values = []
|
|
is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
|
|
role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
|
|
update_sql = """insert into `gm_game_user` (
|
|
game,
|
|
serverid,
|
|
binduid,
|
|
uid,
|
|
`name`,
|
|
owner,
|
|
channel,
|
|
ctime,
|
|
lv,
|
|
vip,
|
|
rolenum,
|
|
lastlogintime,
|
|
lastip
|
|
) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
|
|
on duplicate key update
|
|
lv = values(lv),
|
|
`name` = values(`name`),
|
|
vip = values(vip),
|
|
lastlogintime = values(lastlogintime),
|
|
lastip = values(lastip),
|
|
game = values(game),
|
|
serverid = values(serverid),
|
|
uid = values(uid)
|
|
"""
|
|
for u in users:
|
|
# 判断角色是否存在
|
|
await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid']))
|
|
is_role = await cur.fetchone()
|
|
# 账号有多少个角色
|
|
await cur.execute(role_count_sql, (self.game_name, u['binduid']))
|
|
role_cont = await cur.fetchone()
|
|
role_cont = role_cont[0]
|
|
if not is_role:
|
|
role_cont += 1
|
|
sql_values.append(
|
|
(
|
|
self.game_name,
|
|
u['sid'],
|
|
u['binduid'],
|
|
u['uid'],
|
|
u['name'],
|
|
u.get('ext_owner', 'unkonwn'),
|
|
u.get('ext_channel', 'ext_channel'),
|
|
u['ctime'],
|
|
u['lv'],
|
|
u['vip'],
|
|
role_cont,
|
|
int(time.time()),
|
|
u.get('lastloginip', '')
|
|
)
|
|
)
|
|
await cur.executemany(update_sql, sql_values)
|
|
await conn.commit()
|
|
|
|
# 记录渠道
|
|
update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s)
|
|
on duplicate key update
|
|
game=values(game),
|
|
channel=values(channel)
|
|
"""
|
|
await cur.executemany(update_channel_sql, self.channel_set)
|
|
await conn.commit()
|
|
|
|
# 记录发行和渠道
|
|
update_owner_channel_sql = """insert into `gm_owner_channel` (game,channel,owner) values (%s,%s,%s)
|
|
on duplicate key update
|
|
game=values(game),
|
|
channel=values(channel),
|
|
owner=values(owner)
|
|
"""
|
|
await cur.executemany(update_owner_channel_sql, self.owner_channel_set)
|
|
await conn.commit()
|
|
|
|
# 记录登录
|
|
sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
|
|
for u in users:
|
|
date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
|
|
await cur.execute(sql, [
|
|
self.game_name,
|
|
u['sid'],
|
|
u['uid'],
|
|
date
|
|
])
|
|
is_login = await cur.fetchone()
|
|
if not is_login:
|
|
sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
|
|
await cur.execute(sql1, [
|
|
self.game_name,
|
|
u['sid'],
|
|
u['uid'],
|
|
date,
|
|
1
|
|
])
|
|
await conn.commit()
|
|
else:
|
|
sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"
|
|
await cur.execute(sql1, [
|
|
self.game_name,
|
|
u['sid'],
|
|
u['uid'],
|
|
date
|
|
])
|
|
await conn.commit()
|