221 lines
8.5 KiB
Python
221 lines
8.5 KiB
Python
import asyncio
|
||
import time
|
||
|
||
import aiomysql
|
||
import abc
|
||
|
||
|
||
class LoadData(metaclass=abc.ABCMeta):
    """Template for a two-step load job: fetch records, then persist them.

    Concrete jobs implement the two coroutines below; ``run`` chains them.
    """

    @abc.abstractmethod
    async def get_data(self):
        """Fetch the records to process (provided by subclasses)."""

    @abc.abstractmethod
    async def update_data(self, data):
        """Persist ``data`` as returned by ``get_data`` (provided by subclasses)."""

    async def run(self):
        """Await ``get_data`` and pass its result straight to ``update_data``."""
        await self.update_data(await self.get_data())
|
||
|
||
|
||
class LoadUserData(LoadData):
    """Sync recently logged-in users from per-server MongoDB into GM MySQL tables.

    ``get_data`` reads the ``userinfo`` collection of every server listed in
    ``s_db_df``; ``update_data`` writes the result to ``gm_game_user``,
    ``gm_game_channel_list`` and ``gm_game_login``.

    Keyword arguments read by the constructor: ``mongo_client``,
    ``gm_db_pool``, ``game_name`` and ``s_db_df`` (each defaults to ``None``).
    """

    # Fallback used when a user has no ``ext_channel`` / ``ext_owner``. The
    # misspelling is kept byte-for-byte because this value is persisted.
    UNKNOWN = 'unkonwn'
    # How far back, in seconds, ``get_data`` looks for logins (TODO: test value).
    LOGIN_WINDOW_SECONDS = 6000
    # Maximum number of documents fetched per server in one run.
    MAX_USERS_PER_SERVER = 65535

    _ROLE_EXISTS_SQL = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
    _ROLE_COUNT_SQL = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
    _UPSERT_USER_SQL = """insert into `gm_game_user` (
        game,
        serverid,
        binduid,
        uid,
        `name`,
        owner,
        channel,
        ctime,
        lv,
        vip,
        rolenum,
        lastlogintime,
        lastip
    ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    on duplicate key update
        lv = values(lv),
        `name` = values(`name`),
        vip = values(vip),
        lastlogintime = values(lastlogintime),
        lastip = values(lastip),
        game = values(game),
        serverid = values(serverid),
        uid = values(uid)
    """
    _UPSERT_CHANNEL_SQL = """insert into `gm_game_channel_list` (game,channel) values (%s,%s)
    on duplicate key update
        game=values(game),
        channel=values(channel)
    """
    _LOGIN_EXISTS_SQL = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
    _LOGIN_INSERT_SQL = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
    _LOGIN_BUMP_SQL = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"

    def __init__(self, *args, **kwargs):
        self.mongo_client = kwargs.get('mongo_client')
        self.gm_db_pool = kwargs.get('gm_db_pool')
        self.game_name = kwargs.get('game_name')
        self.s_db_df = kwargs.get('s_db_df')
        self.gm_mysql = None
        # (game, channel) pairs seen by get_data; written out by update_data.
        self.channel_set = set()

    async def get_data(self) -> list:
        """Fetch the users of every configured server that logged in recently.

        Each returned document gets ``lasttime`` (a copy of ``logintime``) and
        a ``vip`` value defaulting to 0. Every ``(game_name, channel)`` pair
        encountered is added to ``self.channel_set``.

        :return: list of user documents from all servers
        """
        since = int(time.time()) - self.LOGIN_WINDOW_SECONDS
        projection = {
            '_id': False,
            'uid': True,
            'name': True,
            'logintime': True,
            'ext_channel': True,
            'ext_owner': True,
            'binduid': True,
            'lv': True,
            'ctime': True,
            'vip': True,
            'sid': True,
            # update_data reads 'lastloginip'; without projecting it the value
            # written to `lastip` was always ''.
            # NOTE(review): confirm the field name against the userinfo schema.
            'lastloginip': True,
        }
        user_info_data = []
        for server in self.s_db_df.T.to_dict().values():
            sid = server['serverid']
            db = server['db']
            # Build a fresh filter per server instead of mutating a shared dict.
            where = {
                'logintime': {'$gt': since},
                'uid': {"$regex": "^" + str(sid) + "_"},
            }
            cursor = self.mongo_client[db]['userinfo'].find(where, projection)
            data_list = await cursor.to_list(length=self.MAX_USERS_PER_SERVER)
            for user in data_list:
                user['lasttime'] = user['logintime']
                user['vip'] = user.get('vip', 0)
                self.channel_set.add(
                    (self.game_name, user.get('ext_channel', self.UNKNOWN))
                )
            user_info_data.extend(data_list)
            print(f'db:{db}')
            print(f'区服id:{sid}')
            print(f'更新数量:{len(data_list)}')
            print('-' * 50)

        return user_info_data

    async def update_data(self, users: list, try_num=3):
        """Write ``users`` to the GM database; does nothing for an empty list.

        Steps, in order, on one pooled connection: upsert user rows, upsert
        the collected channels, then record per-day login counters.

        :param users: user documents as produced by ``get_data``
        :param try_num: unused; kept so existing callers keep working
        """
        if not users:
            return
        async with self.gm_db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await self._upsert_users(conn, cur, users)
                await self._upsert_channels(conn, cur)
                await self._record_logins(conn, cur, users)

    async def _upsert_users(self, conn, cur, users):
        """Insert or refresh one ``gm_game_user`` row per user, then commit."""
        rows = []
        for u in users:
            # Does this role already exist?
            await cur.execute(self._ROLE_EXISTS_SQL, (self.game_name, u['sid'], u['uid']))
            is_role = await cur.fetchone()
            # How many roles does the account already have?
            await cur.execute(self._ROLE_COUNT_SQL, (self.game_name, u['binduid']))
            role_count = (await cur.fetchone())[0]
            if not is_role:
                # The role about to be inserted is not counted yet.
                role_count += 1
            rows.append((
                self.game_name,
                u['sid'],
                u['binduid'],
                u['uid'],
                u['name'],
                u.get('ext_owner', self.UNKNOWN),
                # Same fallback as the channel list built in get_data, so the
                # user row and gm_game_channel_list agree on the channel name.
                u.get('ext_channel', self.UNKNOWN),
                u['ctime'],
                u['lv'],
                u['vip'],
                role_count,
                int(time.time()),
                u.get('lastloginip', ''),
            ))
        await cur.executemany(self._UPSERT_USER_SQL, rows)
        await conn.commit()

    async def _upsert_channels(self, conn, cur):
        """Register every collected (game, channel) pair, then commit."""
        await cur.executemany(self._UPSERT_CHANNEL_SQL, list(self.channel_set))
        await conn.commit()

    async def _record_logins(self, conn, cur, users):
        """Create or increment each user's ``gm_game_login`` row for the login date.

        TODO: every run within the same day adds 1 to ``logintimes`` for a
        user who is still inside the login window -- confirm this is intended.
        """
        for u in users:
            date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
            key = [self.game_name, u['sid'], u['uid'], date]
            await cur.execute(self._LOGIN_EXISTS_SQL, key)
            is_login = await cur.fetchone()
            if not is_login:
                await cur.execute(self._LOGIN_INSERT_SQL, key + [1])
            else:
                await cur.execute(self._LOGIN_BUMP_SQL, key)
            await conn.commit()
|
||
|
||
|
||
class LoadPayData(LoadData):
    """Copy recent ``paylist`` rows (state=2) from the pay database into ``gm_game_paylist``.

    Keyword arguments read by the constructor: ``pay_db_conf`` (keyword
    arguments for ``aiomysql.connect``), ``gm_db_pool`` and ``game_name``.
    """

    def __init__(self, *args, **kwargs):
        self.pay_mysql_conf = kwargs.get('pay_db_conf')
        self.gm_db_pool = kwargs.get('gm_db_pool')
        self.game_name = kwargs.get('game_name')
        self.lastActTime = int(time.time())  # TODO: test value

    async def get_data(self):
        """Fetch pay records for all servers.

        Selects ``paylist`` rows with ``state=2`` whose ``ctime`` is at most one
        hour older than ``self.lastActTime`` and copies each row's ``data2``
        column into a ``data`` key.

        :return: the fetched rows as dicts
        """
        sql = "select * from paylist where state=2 and ctime>=%s"
        pay_mysql = await aiomysql.connect(**self.pay_mysql_conf, loop=asyncio.get_event_loop())
        # try/finally so the cursor and the connection are released even when
        # the query fails; previously an error leaked both.
        try:
            cur = await pay_mysql.cursor(aiomysql.DictCursor)
            try:
                await cur.execute(sql, (self.lastActTime - 3600,))
                rss = await cur.fetchall()
            finally:
                await cur.close()
        finally:
            pay_mysql.close()

        for p in rss:
            p['data'] = p['data2']
        return rss

    async def update_data(self, data):
        """Insert each pay record into ``gm_game_paylist``, committing per row.

        Best-effort: a row that fails is printed and skipped so the remaining
        rows are still written. Does nothing when ``data`` is empty.

        :param data: rows as produced by ``get_data``
        """
        if not data:
            # Same guard as LoadUserData.update_data: no pooled connection is
            # taken when there is nothing to write.
            return
        sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s"
        async with self.gm_db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                for u in data:
                    try:
                        await cur.execute(sql, (
                            self.game_name,
                            u['serverid'],
                            u['uid'],
                            u['orderid'],
                            u['payorder'],
                            u['money'],
                            u['proid'],
                            u['ctime'],
                            u['data'],
                            u['paytype'],
                        ))
                        await conn.commit()
                    except Exception as e:
                        # NOTE(review): presumably this mostly absorbs rows
                        # re-imported by the overlapping one-hour lookback;
                        # confirm before narrowing the exception type.
                        print(e)
|