import asyncio

import aiomysql
import pymysql
from loguru import logger

from config import settings
from update_data import LoadData


class LoadPayData(LoadData):
    """Copy pay orders from a pay database into the ``gm_game_paylist`` table.

    Keyword arguments read here (all others are forwarded to ``LoadData``):

    * ``pay_db_conf``   -- dict of ``aiomysql.connect`` keyword arguments for
      the pay database; must contain a ``'host'`` entry.
    * ``game_name``     -- value written to the ``game`` column and used in
      log messages.
    * ``serverid_list`` -- container of server ids; see ``get_data``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pay_mysql_conf = kwargs.get('pay_db_conf')
        self.game_name = kwargs.get('game_name')
        self.serverid_list = kwargs.get('serverid_list')

    async def get_data(self):
        """Fetch pay orders (all servers) created since the stored checkpoint.

        Reads the checkpoint with ``self.get_kv`` under the key
        ``'pay_' + host`` and selects every ``paylist`` row with ``state=2``
        whose ``ctime`` is at least one hour before that checkpoint.

        Returns:
            The fetched rows as dicts.  On a connection failure the error is
            logged, ``self.gm_key`` is reset to ``None`` and ``[]`` is
            returned.

        Errors raised while querying propagate to the caller; the connection
        is closed in every case.
        """
        self.gm_key = 'pay_' + self.pay_mysql_conf['host']
        self.last_act_time = await self.get_kv(self.gm_key)
        try:
            pay_mysql = await aiomysql.connect(
                **self.pay_mysql_conf,
                loop=asyncio.get_event_loop(),
                connect_timeout=settings.TIMEOUT,
            )
        except Exception as e:
            logger.error(e)
            # NOTE(review): presumably tells the base class not to advance the
            # stored checkpoint -- confirm against LoadData.
            self.gm_key = None
            return []

        sql = "select * from paylist where state=2 and ctime>=%s"
        try:
            async with pay_mysql.cursor(aiomysql.DictCursor) as cur:
                # Re-read one hour before the checkpoint; orders that were
                # already copied are rejected by update_data's IntegrityError
                # handling.
                await cur.execute(sql, (self.last_act_time - 3600,))
                rss = await cur.fetchall()

            for p in rss:
                # NOTE(review): rows for servers outside serverid_list are
                # still returned, they only skip the data2 -> data copy.
                # Confirm whether they were meant to be filtered out.
                if p.get('serverid') not in self.serverid_list:
                    continue
                p['data'] = p['data2']
        finally:
            # Always release the connection, including when the query or the
            # post-processing above raises.
            pay_mysql.close()

        logger.info(f'{self.game_name}查询到{len(rss)}条订单')
        return rss

    async def update_data(self, data):
        """Insert the fetched orders into ``gm_game_paylist`` one at a time.

        Each row is committed individually.  A ``pymysql`` ``IntegrityError``
        is logged at debug level as "already inserted" and skipped.  Any other
        error is logged and ``self.gm_key`` is reset to ``None``; processing
        then continues with the next row.

        Args:
            data: iterable of dict rows as returned by ``get_data``.
        """
        if not data:
            logger.info(f'{self.game_name}没有订单数据')
            return

        sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s"
        async with self.gm_db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                for u in data:
                    try:
                        await cur.execute(sql, (
                            self.game_name,
                            u['serverid'],
                            u['uid'],
                            u['orderid'],
                            u['payorder'],
                            u['money'],
                            u['proid'],
                            u['ctime'],
                            u['data'],
                            u['paytype'],
                        ))
                        await conn.commit()
                    except pymysql.err.IntegrityError:
                        logger.debug(f'{self.game_name} {u["orderid"]}订单已插入')
                    except Exception as e:
                        logger.error(e)
                        self.gm_key = None