sync_game_data/update_data/update_user.py
2020-09-16 14:11:34 +08:00

183 lines
7.5 KiB
Python

import time
from loguru import logger
from update_data import LoadData
class LoadUserData(LoadData):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mongo_client = kwargs.get('mongo_client')
self.game_name = kwargs.get('game_name')
self.s_db_df = kwargs.get('s_db_df')
self.gm_mysql = None
self.channel_set = set()
self.owner_channel_set = set()
async def get_data(self) -> list:
"""
获取这组 登陆数据
:return: list
"""
user_info_data = []
try:
self.gm_key = self.game_name + '_user_' + self.mongo_client.address[0]
self.last_act_time = await self.get_kv(self.gm_key)
where = {
'logintime': {'$gte': self.last_act_time, '$lt': self.act_time}
}
projection = {
'_id': False,
'uid': True,
'name': True,
'logintime': True,
'lastloginip': True,
'ext_channel': True,
'ext_owner': True,
'binduid': True,
'lv': True,
'ctime': True,
'vip': True,
'sid': True
}
for s in self.s_db_df.T.to_dict().values():
sid = s['serverid']
db = s['db']
where['uid'] = {"$regex": "^" + str(sid) + "_"}
user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection)
data_list = await user_info_cursor.to_list(length=65535)
for user in data_list:
user['lasttime'] = user['logintime']
user['vip'] = user.get('vip', 0)
self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn')))
self.owner_channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn'), user.get('ext_owner', 'unkonwn')))
user_info_data.extend(data_list)
logger.info(f'db:{db}')
logger.info(f'区服id:{sid}')
logger.info(f'更新数量:{len(data_list)}')
logger.info('-' * 50)
except Exception as e:
logger.error(e)
self.gm_key = None
return user_info_data
async def update_data(self, users: list, try_num=3):
if not users:
return
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor() as cur:
# 记录 userinfo
sql_values = []
is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
update_sql = """insert into `gm_game_user` (
game,
serverid,
binduid,
uid,
`name`,
owner,
channel,
ctime,
lv,
vip,
rolenum,
lastlogintime,
lastip
) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
on duplicate key update
lv = values(lv),
`name` = values(`name`),
vip = values(vip),
lastlogintime = values(lastlogintime),
lastip = values(lastip),
game = values(game),
serverid = values(serverid),
uid = values(uid)
"""
for u in users:
# 判断角色是否存在
await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid']))
is_role = await cur.fetchone()
# 账号有多少个角色
await cur.execute(role_count_sql, (self.game_name, u['binduid']))
role_cont = await cur.fetchone()
role_cont = role_cont[0]
if not is_role:
role_cont += 1
sql_values.append(
(
self.game_name,
u['sid'],
u['binduid'],
u['uid'],
u['name'],
u.get('ext_owner', 'unkonwn'),
u.get('ext_channel', 'ext_channel'),
u['ctime'],
u['lv'],
u['vip'],
role_cont,
int(time.time()),
u.get('lastloginip', '')
)
)
await cur.executemany(update_sql, sql_values)
await conn.commit()
# 记录渠道
update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s)
on duplicate key update
game=values(game),
channel=values(channel)
"""
await cur.executemany(update_channel_sql, self.channel_set)
await conn.commit()
# 记录发行和渠道
update_owner_channel_sql = """insert into `gm_owner_channel` (game,channel,owner) values (%s,%s,%s)
on duplicate key update
game=values(game),
channel=values(channel),
owner=values(owner)
"""
await cur.executemany(update_owner_channel_sql, self.owner_channel_set)
await conn.commit()
# 记录登录
sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
for u in users:
date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
await cur.execute(sql, [
self.game_name,
u['sid'],
u['uid'],
date
])
is_login = await cur.fetchone()
if not is_login:
sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
await cur.execute(sql1, [
self.game_name,
u['sid'],
u['uid'],
date,
1
])
await conn.commit()
else:
sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"
await cur.execute(sql1, [
self.game_name,
u['sid'],
u['uid'],
date
])
await conn.commit()