import time

from loguru import logger

from update_data import LoadData

# Placeholder stored when a user document carries no channel / owner.
# NOTE: the misspelling is intentional -- it is the value this loader has
# always written, so "correcting" it would split existing rows into two
# different channels.
_UNKNOWN = 'unkonwn'


class LoadUserData(LoadData):
    """Copy recently logged-in users from MongoDB into the GM MySQL tables.

    ``get_data`` collects the user documents whose ``logintime`` falls in the
    current window; ``update_data`` upserts them into ``gm_game_user``,
    registers the channels / owners that were seen and bumps the per-day
    login counters in ``gm_game_login``.

    ``self.get_kv``, ``self.act_time`` and ``self.gm_db_pool`` are not defined
    in this class -- presumably they are provided by ``LoadData``; verify
    against the base class.
    """

    def __init__(self, *args, **kwargs):
        """Store the Mongo client, game name and server/db table.

        Keyword Args:
            mongo_client: client indexed as ``client[db]['userinfo']``; its
                ``address[0]`` is used to build the checkpoint key.
            game_name: game identifier written into every MySQL row.
            s_db_df: table-like object whose rows expose ``serverid`` and
                ``db`` (assumed to be a pandas DataFrame -- TODO confirm).
        """
        super().__init__(*args, **kwargs)
        self.mongo_client = kwargs.get('mongo_client')
        self.game_name = kwargs.get('game_name')
        self.s_db_df = kwargs.get('s_db_df')
        self.gm_mysql = None
        # (game, channel) pairs seen by get_data.
        self.channel_set = set()
        # (game, channel, owner) triples seen by get_data.
        self.owner_channel_set = set()

    async def get_data(self) -> list:
        """Fetch the user documents that logged in during the current window.

        The window is ``[last_act_time, act_time)`` where ``last_act_time`` is
        read through ``self.get_kv`` under a key derived from the game name
        and the Mongo host.

        Side effects: sets ``self.gm_key`` and ``self.last_act_time``, and
        adds every channel / owner encountered to ``self.channel_set`` and
        ``self.owner_channel_set``.

        Returns:
            list: user dicts, each with ``lasttime`` (copy of ``logintime``)
            and a defaulted ``vip``. On failure the documents gathered so far
            are returned and ``self.gm_key`` is reset to ``None``.
        """
        user_info_data = []
        try:
            self.gm_key = self.game_name + '_user_' + self.mongo_client.address[0]
            self.last_act_time = await self.get_kv(self.gm_key)

            where = {
                'logintime': {'$gte': self.last_act_time, '$lt': self.act_time}
            }
            projection = {
                '_id': False,
                'uid': True,
                'name': True,
                'logintime': True,
                'lastloginip': True,
                'ext_channel': True,
                'ext_owner': True,
                'binduid': True,
                'lv': True,
                'ctime': True,
                'vip': True,
                'sid': True,
            }

            for s in self.s_db_df.T.to_dict().values():
                sid = s['serverid']
                db = s['db']

                # Only users of this server: uid is prefixed with "<sid>_".
                where['uid'] = {"$regex": "^" + str(sid) + "_"}
                user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection)
                # NOTE(review): at most 65535 documents are read per server;
                # anything beyond that in a single window is not fetched.
                data_list = await user_info_cursor.to_list(length=65535)

                for user in data_list:
                    user['lasttime'] = user['logintime']
                    user['vip'] = user.get('vip', 0)

                    channel = user.get('ext_channel', _UNKNOWN)
                    owner = user.get('ext_owner', _UNKNOWN)
                    self.channel_set.add((self.game_name, channel))
                    self.owner_channel_set.add((self.game_name, channel, owner))

                user_info_data.extend(data_list)

                logger.info(f'db:{db}')
                logger.info(f'区服id:{sid}')
                logger.info(f'更新数量:{len(data_list)}')
                logger.info('-' * 50)
        except Exception as e:
            # Best effort: keep whatever was collected, but log the full
            # traceback so the failure can actually be diagnosed.
            logger.exception(e)
            # Presumably a None key tells the caller not to advance the
            # checkpoint -- verify against LoadData.
            self.gm_key = None

        return user_info_data

    async def update_data(self, users: list, try_num=3):
        """Write the users returned by ``get_data`` into the GM MySQL tables.

        Steps, each committed separately:
          1. upsert every user into ``gm_game_user`` (with its role count);
          2. upsert the seen channels into ``gm_game_channel_list``;
          3. upsert the seen channel/owner pairs into ``gm_owner_channel``;
          4. insert or increment the per-day row in ``gm_game_login``.

        Args:
            users: user dicts as produced by ``get_data``. Nothing is done
                when the list is empty.
            try_num: currently unused; kept for interface compatibility.
        """
        if not users:
            return

        async with self.gm_db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                # ---- 1. user info -------------------------------------
                is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
                role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
                update_sql = """insert into `gm_game_user`
                    (
                        game, serverid, binduid, uid, `name`, owner, channel,
                        ctime, lv, vip, rolenum, lastlogintime, lastip
                    )
                    values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                    on duplicate key update
                        lv = values(lv),
                        `name` = values(`name`),
                        vip = values(vip),
                        lastlogintime = values(lastlogintime),
                        lastip = values(lastip),
                        game = values(game),
                        serverid = values(serverid),
                        uid = values(uid)
                """

                sql_values = []
                for u in users:
                    # Does this role already exist?
                    await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid']))
                    is_role = await cur.fetchone()

                    # How many roles does the account (binduid) already own?
                    await cur.execute(role_count_sql, (self.game_name, u['binduid']))
                    role_cont = (await cur.fetchone())[0]
                    if not is_role:
                        # The role about to be inserted counts as well.
                        role_cont += 1

                    sql_values.append(
                        (
                            self.game_name,
                            u['sid'],
                            u['binduid'],
                            u['uid'],
                            u['name'],
                            u.get('ext_owner', _UNKNOWN),
                            # Same placeholder as get_data, so this row
                            # matches the entry in gm_game_channel_list
                            # (was the literal 'ext_channel').
                            u.get('ext_channel', _UNKNOWN),
                            u['ctime'],
                            u['lv'],
                            u['vip'],
                            role_cont,
                            int(time.time()),
                            u.get('lastloginip', ''),
                        )
                    )
                await cur.executemany(update_sql, sql_values)
                await conn.commit()

                # ---- 2. channels --------------------------------------
                update_channel_sql = """insert into `gm_game_channel_list`
                    (game,channel)
                    values (%s,%s)
                    on duplicate key update
                        game=values(game),
                        channel=values(channel)
                """
                # executemany is documented to take a sequence of rows.
                await cur.executemany(update_channel_sql, list(self.channel_set))
                await conn.commit()

                # ---- 3. owner / channel pairs -------------------------
                update_owner_channel_sql = """insert into `gm_owner_channel`
                    (game,channel,owner)
                    values (%s,%s,%s)
                    on duplicate key update
                        game=values(game),
                        channel=values(channel),
                        owner=values(owner)
                """
                await cur.executemany(update_owner_channel_sql, list(self.owner_channel_set))
                await conn.commit()

                # ---- 4. daily login counters --------------------------
                sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
                for u in users:
                    # Day of the login, formatted in the process' local time.
                    date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
                    await cur.execute(sql, [self.game_name, u['sid'], u['uid'], date])
                    is_login = await cur.fetchone()

                    if not is_login:
                        # First login of that day: create the row.
                        sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
                        await cur.execute(sql1, [self.game_name, u['sid'], u['uid'], date, 1])
                        await conn.commit()
                    else:
                        # Row exists: bump the counter.
                        sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"
                        await cur.execute(sql1, [self.game_name, u['sid'], u['uid'], date])
                        await conn.commit()