import asyncio import time import aiomysql import abc class LoadData(metaclass=abc.ABCMeta): @abc.abstractmethod async def get_data(self): pass @abc.abstractmethod async def update_data(self, data): pass async def run(self): data = await self.get_data() await self.update_data(data) class LoadUserData(LoadData): def __init__(self, *args, **kwargs): self.mongo_client = kwargs.get('mongo_client') self.gm_db_pool = kwargs.get('gm_db_pool') self.game_name = kwargs.get('game_name') self.s_db_df = kwargs.get('s_db_df') self.gm_mysql = None self.channel_set = set() async def get_data(self) -> list: """ 获取这组 登陆数据 :return: list """ last_act_time = int(time.time()) - 6000 # todo 测试 where = { 'logintime': {'$gt': last_act_time} } projection = { '_id': False, 'uid': True, 'name': True, 'logintime': True, 'ext_channel': True, 'ext_owner': True, 'binduid': True, 'lv': True, 'ctime': True, 'vip': True, 'sid': True } user_info_data = [] for s in self.s_db_df.T.to_dict().values(): sid = s['serverid'] db = s['db'] where['uid'] = {"$regex": "^" + str(sid) + "_"} user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection) data_list = await user_info_cursor.to_list(length=65535) for user in data_list: user['lasttime'] = user['logintime'] user['vip'] = user.get('vip', 0) self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn'))) user_info_data.extend(data_list) print(f'db:{s["db"]}') print(f'区服id:{s["serverid"]}') print(f'更新数量:{len(data_list)}') print('-' * 50) return user_info_data async def update_data(self, users: list, try_num=3): if not users: return async with self.gm_db_pool.acquire() as conn: async with conn.cursor() as cur: # 记录 userinfo sql_values = [] is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s' role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s" update_sql = """insert into `gm_game_user` ( game, serverid, binduid, uid, `name`, owner, channel, ctime, lv, vip, rolenum, lastlogintime, lastip ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) on duplicate key update lv = values(lv), `name` = values(`name`), vip = values(vip), lastlogintime = values(lastlogintime), lastip = values(lastip), game = values(game), serverid = values(serverid), uid = values(uid) """ for u in users: # 判断角色是否存在 await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid'])) is_role = await cur.fetchone() # 账号有多少个角色 await cur.execute(role_count_sql, (self.game_name, u['binduid'])) role_cont = await cur.fetchone() role_cont = role_cont[0] if not is_role: role_cont += 1 sql_values.append( ( self.game_name, u['sid'], u['binduid'], u['uid'], u['name'], u.get('ext_owner', 'unkonwn'), u.get('ext_channel', 'ext_channel'), u['ctime'], u['lv'], u['vip'], role_cont, int(time.time()), u.get('lastloginip', '') ) ) await cur.executemany(update_sql, sql_values) await conn.commit() # 记录渠道 update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s) on duplicate key update game=values(game), channel=values(channel) """ await cur.executemany(update_channel_sql, self.channel_set) await conn.commit() # 记录登录 TODO 1天内每次更新都加1??? sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s" for u in users: date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime']))) await cur.execute(sql, [ self.game_name, u['sid'], u['uid'], date ]) is_login = await cur.fetchone() if not is_login: sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s" await cur.execute(sql1, [ self.game_name, u['sid'], u['uid'], date, 1 ]) await conn.commit() else: sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s" await cur.execute(sql1, [ self.game_name, u['sid'], u['uid'], date ]) await conn.commit() class LoadPayData(LoadData): def __init__(self, *args, **kwargs): self.pay_mysql_conf = kwargs.get('pay_db_conf') self.gm_db_pool = kwargs.get('gm_db_pool') self.game_name = kwargs.get('game_name') self.lastActTime = int(time.time()) # todo 测试用 async def get_data(self): # 获取充值数据 所有区服 pay_mysql = await aiomysql.connect(**self.pay_mysql_conf, loop=asyncio.get_event_loop()) cur = await pay_mysql.cursor(aiomysql.DictCursor) sql = "select * from paylist where state=2 and ctime>=%s" await cur.execute(sql, (self.lastActTime - 3600,)) rss = await cur.fetchall() for p in rss: p['data'] = p['data2'] await cur.close() pay_mysql.close() return rss async def update_data(self, data): sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s" async with self.gm_db_pool.acquire() as conn: async with conn.cursor() as cur: for u in data: try: await cur.execute(sql, ( self.game_name, u['serverid'], u['uid'], u['orderid'], u['payorder'], u['money'], u['proid'], u['ctime'], u['data'], u['paytype'] )) await conn.commit() except Exception as e: print(e)