sync_game_data/load_data.py.bak
2020-09-12 18:03:05 +08:00

221 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import time
import aiomysql
import abc
class LoadData(metaclass=abc.ABCMeta):
    """Abstract fetch-then-store job.

    Subclasses implement ``get_data`` (read from the source) and
    ``update_data`` (write to the destination); ``run`` chains the two.
    """

    @abc.abstractmethod
    async def get_data(self):
        """Fetch and return the data that should be synchronised."""

    @abc.abstractmethod
    async def update_data(self, data):
        """Persist ``data`` (whatever ``get_data`` returned)."""

    async def run(self):
        """Fetch the data once and hand it straight to ``update_data``."""
        await self.update_data(await self.get_data())
class LoadUserData(LoadData):
    """Copy recently logged-in users from the game's MongoDB into the GM MySQL tables.

    ``get_data`` reads ``userinfo`` documents per game server; ``update_data``
    writes them to ``gm_game_user``, ``gm_game_channel_list`` and
    ``gm_game_login``.
    """

    # Placeholder used when a user document has no owner / channel.
    # NOTE(review): the spelling "unkonwn" is kept as-is because it is the
    # value this code already writes; renaming it would presumably split
    # existing rows - confirm before correcting the typo.
    _UNKNOWN = 'unkonwn'

    def __init__(self, *args, **kwargs):
        """Store the collaborators passed as keyword arguments.

        Keyword Args:
            mongo_client: async Mongo client, indexed as ``client[db]['userinfo']``.
            gm_db_pool: connection pool exposing ``acquire()`` (aiomysql-style).
            game_name: value written to the ``game`` column of every row.
            s_db_df: table of servers with ``serverid`` and ``db`` columns
                (presumably a pandas DataFrame - only ``.T.to_dict()`` is used).
        """
        self.mongo_client = kwargs.get('mongo_client')
        self.gm_db_pool = kwargs.get('gm_db_pool')
        self.game_name = kwargs.get('game_name')
        self.s_db_df = kwargs.get('s_db_df')
        self.gm_mysql = None  # not used by any method in this class
        # (game, channel) pairs seen so far; grows across get_data() calls.
        self.channel_set = set()

    async def get_data(self) -> list:
        """Fetch the login data for every configured server.

        For each server row, users whose ``uid`` starts with ``"<serverid>_"``
        and whose ``logintime`` is newer than the cut-off are loaded (at most
        65535 per server). Each user dict gets ``lasttime`` (copy of
        ``logintime``) and a defaulted ``vip``; the user's channel is added to
        ``self.channel_set``.

        :return: list of user dicts from all servers
        """
        last_act_time = int(time.time()) - 6000  # TODO: test value
        projection = {
            '_id': False,
            'uid': True,
            'name': True,
            'logintime': True,
            'ext_channel': True,
            'ext_owner': True,
            'binduid': True,
            'lv': True,
            'ctime': True,
            'vip': True,
            'sid': True,
        }
        user_info_data = []
        for s in self.s_db_df.T.to_dict().values():
            sid = s['serverid']
            db = s['db']
            # Build a fresh filter per server instead of mutating a shared dict.
            where = {
                'logintime': {'$gt': last_act_time},
                'uid': {"$regex": "^" + str(sid) + "_"},
            }
            user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection)
            data_list = await user_info_cursor.to_list(length=65535)
            for user in data_list:
                user['lasttime'] = user['logintime']
                user['vip'] = user.get('vip', 0)
                self.channel_set.add(
                    (self.game_name, user.get('ext_channel', self._UNKNOWN))
                )
            user_info_data.extend(data_list)
            print(f'db:{s["db"]}')
            print(f'区服id:{s["serverid"]}')
            print(f'更新数量:{len(data_list)}')
            print('-' * 50)
        return user_info_data

    async def update_data(self, users: list, try_num=3):
        """Write the fetched users, their channels and their logins to MySQL.

        :param users: user dicts as returned by ``get_data``; nothing is done
            when the list is empty.
        :param try_num: accepted for backward compatibility; currently unused.
        """
        if not users:
            return
        async with self.gm_db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await self._upsert_users(conn, cur, users)
                await self._upsert_channels(conn, cur)
                await self._record_logins(conn, cur, users)

    async def _upsert_users(self, conn, cur, users):
        """Insert or refresh one ``gm_game_user`` row per user, then commit."""
        is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
        role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
        update_sql = """insert into `gm_game_user` (
            game,
            serverid,
            binduid,
            uid,
            `name`,
            owner,
            channel,
            ctime,
            lv,
            vip,
            rolenum,
            lastlogintime,
            lastip
            ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            on duplicate key update
            lv = values(lv),
            `name` = values(`name`),
            vip = values(vip),
            lastlogintime = values(lastlogintime),
            lastip = values(lastip),
            game = values(game),
            serverid = values(serverid),
            uid = values(uid)
        """
        sql_values = []
        for u in users:
            # Does this role already exist?
            await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid']))
            is_role = await cur.fetchone()
            # How many roles does the account (binduid) already have?
            await cur.execute(role_count_sql, (self.game_name, u['binduid']))
            role_cont = (await cur.fetchone())[0]
            if not is_role:
                # The role about to be inserted counts as well.
                role_cont += 1
            sql_values.append(
                (
                    self.game_name,
                    u['sid'],
                    u['binduid'],
                    u['uid'],
                    u['name'],
                    u.get('ext_owner', self._UNKNOWN),
                    # Same fallback as get_data() uses for channel_set, so the
                    # user row and gm_game_channel_list agree on the channel.
                    u.get('ext_channel', self._UNKNOWN),
                    u['ctime'],
                    u['lv'],
                    u['vip'],
                    role_cont,
                    int(time.time()),
                    u.get('lastloginip', ''),
                )
            )
        await cur.executemany(update_sql, sql_values)
        await conn.commit()

    async def _upsert_channels(self, conn, cur):
        """Insert every (game, channel) pair collected so far, then commit."""
        update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s)
            on duplicate key update
            game=values(game),
            channel=values(channel)
        """
        await cur.executemany(update_channel_sql, self.channel_set)
        await conn.commit()

    async def _record_logins(self, conn, cur, users):
        """Create or increment the per-day ``gm_game_login`` counter for each user.

        TODO: within one day every sync run adds 1, even without a new login.
        """
        select_sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
        insert_sql = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
        bump_sql = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"
        for u in users:
            # Local-time calendar day of the user's last login.
            date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
            key = [self.game_name, u['sid'], u['uid'], date]
            await cur.execute(select_sql, key)
            is_login = await cur.fetchone()
            if not is_login:
                await cur.execute(insert_sql, key + [1])
            else:
                await cur.execute(bump_sql, key)
            await conn.commit()
class LoadPayData(LoadData):
    """Copy completed payment records from the pay MySQL database into ``gm_game_paylist``."""

    def __init__(self, *args, **kwargs):
        """Store the collaborators passed as keyword arguments.

        Keyword Args:
            pay_db_conf: dict of keyword arguments for ``aiomysql.connect``.
            gm_db_pool: connection pool exposing ``acquire()`` (aiomysql-style).
            game_name: value written to the ``game`` column of every row.
        """
        self.pay_mysql_conf = kwargs.get('pay_db_conf')
        self.gm_db_pool = kwargs.get('gm_db_pool')
        self.game_name = kwargs.get('game_name')
        # Reference time for the query window; fixed at construction.
        self.lastActTime = int(time.time())  # TODO: for testing

    async def get_data(self):
        """Fetch pay records for all servers.

        Selects ``paylist`` rows with ``state=2`` and ``ctime`` no older than
        one hour before ``self.lastActTime``, and copies each row's ``data2``
        into a ``data`` key.

        :return: the fetched rows as dicts
        """
        sql = "select * from paylist where state=2 and ctime>=%s"
        pay_mysql = await aiomysql.connect(**self.pay_mysql_conf, loop=asyncio.get_event_loop())
        # try/finally so the cursor and the dedicated connection are released
        # even when the query fails.
        try:
            cur = await pay_mysql.cursor(aiomysql.DictCursor)
            try:
                await cur.execute(sql, (self.lastActTime - 3600,))
                rss = await cur.fetchall()
            finally:
                await cur.close()
        finally:
            pay_mysql.close()
        for p in rss:
            p['data'] = p['data2']
        return rss

    async def update_data(self, data):
        """Insert each pay record into ``gm_game_paylist``, committing row by row.

        A row that fails to insert is printed and skipped so the remaining
        rows are still processed (presumably this is how already-imported
        orders are tolerated - confirm against the table's unique keys).

        :param data: rows as returned by ``get_data``; nothing is done when empty.
        """
        if not data:
            # Nothing to write: do not take a connection from the pool.
            return
        sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s"
        async with self.gm_db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                for u in data:
                    try:
                        await cur.execute(sql, (
                            self.game_name,
                            u['serverid'],
                            u['uid'],
                            u['orderid'],
                            u['payorder'],
                            u['money'],
                            u['proid'],
                            u['ctime'],
                            u['data'],
                            u['paytype'],
                        ))
                        await conn.commit()
                    except Exception as e:
                        # Best-effort import: report and continue with the next row.
                        print(e)