commit aba308cf5b707ec10965a15ba1b4d6b21d0401b3 Author: wuhao <15392746632@qq.com> Date: Sat Sep 12 18:03:05 2020 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7db61e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,104 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..73f69e0 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/deployment.xml b/.idea/deployment.xml new file mode 100644 index 0000000..831a06a --- /dev/null +++ b/.idea/deployment.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..ba506d2 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,22 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..8f12866 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..d221a0f --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/sync_game_data.iml b/.idea/sync_game_data.iml new file mode 100644 index 0000000..2eecd03 --- /dev/null +++ b/.idea/sync_game_data.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..b44b201 --- /dev/null +++ b/config.py @@ -0,0 +1,331 @@ +import os + +from loguru import logger + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +logger.add(os.path.join(BASE_DIR, 'log.log'), format="{time} {level} {name}:{line} {message}", level="INFO", + rotation="50 MB", retention='7 days', + enqueue=True) + + +class Config: + # 国外服务器连接时间较长 不要低于3s + TIMEOUT_MS = 5000 # 5秒 + TIMEOUT = 5 # 5秒 + KV_PREFIX = 'lastActTime_' + SERVER_LIST_URL = "http://gametools.legu.cc/?app=api&act=getServerList&showdb=1" + GAME_MANA_MYSQLDB = { + "host": "10.0.0.5", + "port": 3306, + "user": "root", + "password": "87251326", + "db": "gamemana" + } + + PAY_DB = { + 'zhengba': { + "host": "zhengbaapi.legu.cc", + "port": 3306, + "user": "root", + "password": "SH9Pjrcr52ZJJhDT", + "db": "zhengbadata" + }, + 'heros': { + "host": "homm1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "hommdata" + }, + 'fengshen': { + "host": "fs1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "fsdata" + }, + 'geshouh5': { + "host": "134.175.135.230", + "port": 3306, + "user": "root", + "password": "v4rNd4aeMMftNGf", + "db": "mzgeshou" + }, + 'geshouccs': { + "host": "gsccsapi.legu.cc", + "port": 3306, + "user": "root", + "password": "v4rNd4aeMMftNGf", + "db": "gsccsdata" + }, + 'junshibig5': { + "host": "junshibig5.legu.cc", + "port": 3306, + "user": "root", + "password": "i0OBWUsLlLiM", + "db": "jsbig5data" + }, + 'xiaoying': { + "host": "106.52.175.193", + "port": 3306, + "user": "root", + "password": "KyQCGS9UIrgs", + "db": "yingdata" + }, + 'xiaoyingmicro': { + "host": "ying.legu.cc", + "port": 3306, + "user": "root", + "password": "2lRZPbspGI5p", + "db": "yingdata" + }, + 'legusg': { + "host": "homm1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "legusgdata" + }, + 'legujs': { + "host": "39.104.110.225", + "port": 3306, + "user": "root", + "password": "PFxqdwDmPhvCUDDe", + "db": "legujsdata" + }, + 'daqin': { + "host": "daqinapi.legu.cc", + "port": 3306, + "user": "root", + "password": "XZKcs6eG4YMFFPqM", + "db": "daqin" + }, + 'mzmfmh5': { + "host": "mzhomms.legu.cc", + "port": 3306, + "user": "root", + "password": "jm8dKqsXcxcP42Cd", + "db": "mzhero" + }, + 'mth5': { + "host": "v3.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "mth5data" + }, + 'xpet': { + "host": "xpetapi.weirongwl.com", + "port": 3306, + "user": "root", + "password": "a5hsSZU8ELb77f64", + "db": "xpetdata" + }, + 'shaihai': { + "host": "42.194.158.84", + "port": 3306, + "user": "root", + "password": "EawHvVBmSxhbPKJX", + "db": "shanhaidata" + }, + 'wow': { + "host": "wow1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "wowdata" + }, + 'gods': { + "host": "god1.legu.cc", + "port": 3306, + "user": "root", + "password": "TwmFYZcLhNN3vvK6", + "db": "goddata" + }, + 'jiushen': { + "host": "jiushenapi.legu.cc", + "port": 3306, + "user": "root", + "password": "9XqfSduK6Wx7359S", + "db": "jiushendata" + }, + 'sanguo': { + "host": "sanguo1.legu.cc", + "port": 3306, + "user": "root", + "password": "mQaDz8wcvEaKEZAY", + "db": "sgdata" + }, + 'tank': { + "host": "tankapi.legu.cc", + "port": 3306, + "user": "root", + "password": "lG1x4cJWXyIm", + "db": "tankdata" + }, + 'mt': { + "host": "mtapi.legu.cc", + "port": 3306, + "user": "root", + "password": "juP4jBaq9VMGVH9F", + "db": "mtdata" + }, + 'zhengbaen': { + "host": "fbenapi.legu.cc", + "port": 3306, + "user": "root", + "password": "HjVMBrYzpfhxQcGy", + "db": "fbendata" + }, + 'zhengbavn': { + "host": "fbvnapi.legu.cc", + "port": 3306, + "user": "root", + "password": "q9n3fseQmtfztNnd", + "db": "fbvndata" + }, + 'zhengbakr': { + "host": "fbkrapi.legu.cc", + "port": 3306, + "user": "root", + "password": "XeuT9QNHyBuR8jE5", + "db": "fbkoredata" + }, + 'zhengbabig5': { + "host": "fbbig5api.legu.cc", + "port": 3306, + "user": "root", + "password": "4EyM8FyPh6waHhe5", + "db": "fbbig5data" + }, + 'jqzb': { + "host": "yaolingapi.legu.cc", + "port": 3306, + "user": "root", + "password": "xwjJ6mcMZuWHWs78", + "db": "yaolingdata" + }, + 'hommh5': { + "host": "hommweb1.legu.cc", + "port": 3306, + "user": "root", + "password": "jMNxqdDpGFLjM8wW", + "db": "hommh5data" + }, + 'king': { + "host": "homm1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "kingdata" + }, + 'zgpromvbt': { + "host": "dzgbt2019115.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "dzgbt2019115" + }, + 'dzgbt': { + "host": "homm1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "dzgbt" + }, + 'dasanguo': { + "host": "homm1.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "dsgdata" + }, + 'xiyou': { + "host": "ft5.legu.cc", + "port": 3306, + "user": "root", + "password": "iamciniao", + "db": "xydata" + }, + # 'wuxia___':{ + # "host":"127.0.0.1", #v3.legu.cc + # "port":3306, + # "user":"root", + # "password":"iamciniao", + # "db":"xydata" + # }, + # 'xycsb___':{ + # "host":"123.59.138.88", + # "port":3306, + # "user":"root", + # "password":"iamciniao", + # "db":"baigu" + # }, + # 'yxwhm___':{ + # "host":"115.159.98.163", + # "port":3306, + # "user":"root", + # "password":"iamciniao", + # "db":"xydata" + # }, + + 'mori': { + "host": "119.29.222.234", + "port": 3306, + "user": "root", + "password": "56EnChDEnT9cmC9w", + "db": "shenghuadata" + }, + 'mfm2': { + "host": "mfmccsapi.legu.cc", + "port": 3306, + "user": "root", + "password": "nyVcHvwFAPjn83yX", + "db": "mfmccsdata" + }, + 'zhengbath': { + "host": "zhengbathapi.legu.cc", + "port": 3306, + "user": "root", + "password": "q9n3fseQmtfztNnd", + "db": "zbthdata" + }, + 'geshouccsqd': { + "host": "gsccsqudaoapi.legu.cc", + "port": 3306, + "user": "root", + "password": "NJM4n4hJdQwPwr3d", + "db": "gsccsqddata" + }, + 'huixie': { + "host": "140.143.150.125", + "port": 3306, + "user": "root", + "password": "Qp4v9NBTbMzKKMuH", + "db": "huixiedata" + } + # 'mth5bt': { + # "host": "127.0.0.1", + # "port": 3306, + # "user": "root", + # "password": "iamciniao", + # "db": "mtbth5data" + # } + + } + + +class DevConfig(Config): + PAY_DB = { + 'mori': { + "host": "119.29.222.234", + "port": 3306, + "user": "root", + "password": "56EnChDEnT9cmC9w", + "db": "shenghuadata" + }, + } + + +settings = Config() diff --git a/load_data.py.bak b/load_data.py.bak new file mode 100644 index 0000000..1611aab --- /dev/null +++ b/load_data.py.bak @@ -0,0 +1,220 @@ +import asyncio +import time + +import aiomysql +import abc + + +class LoadData(metaclass=abc.ABCMeta): + + @abc.abstractmethod + async def get_data(self): + pass + + @abc.abstractmethod + async def update_data(self, data): + pass + + async def run(self): + data = await self.get_data() + await self.update_data(data) + + +class LoadUserData(LoadData): + def __init__(self, *args, **kwargs): + self.mongo_client = kwargs.get('mongo_client') + self.gm_db_pool = kwargs.get('gm_db_pool') + self.game_name = kwargs.get('game_name') + self.s_db_df = kwargs.get('s_db_df') + self.gm_mysql = None + self.channel_set = set() + + async def get_data(self) -> list: + """ + 获取这组 登陆数据 + :return: list + """ + + last_act_time = int(time.time()) - 6000 # todo 测试 + where = { + 'logintime': {'$gt': last_act_time} + } + projection = { + '_id': False, + 'uid': True, + 'name': True, + 'logintime': True, + 'ext_channel': True, + 'ext_owner': True, + 'binduid': True, + 'lv': True, + 'ctime': True, + 'vip': True, + 'sid': True + } + user_info_data = [] + for s in self.s_db_df.T.to_dict().values(): + + sid = s['serverid'] + db = s['db'] + where['uid'] = {"$regex": "^" + str(sid) + "_"} + user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection) + data_list = await user_info_cursor.to_list(length=65535) + for user in data_list: + user['lasttime'] = user['logintime'] + user['vip'] = user.get('vip', 0) + self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn'))) + user_info_data.extend(data_list) + print(f'db:{s["db"]}') + print(f'区服id:{s["serverid"]}') + print(f'更新数量:{len(data_list)}') + print('-' * 50) + + return user_info_data + + async def update_data(self, users: list, try_num=3): + if not users: + return + async with self.gm_db_pool.acquire() as conn: + async with conn.cursor() as cur: + # 记录 userinfo + sql_values = [] + is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s' + role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s" + update_sql = """insert into `gm_game_user` ( + game, + serverid, + binduid, + uid, + `name`, + owner, + channel, + ctime, + lv, + vip, + rolenum, + lastlogintime, + lastip + ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + on duplicate key update + lv = values(lv), + `name` = values(`name`), + vip = values(vip), + lastlogintime = values(lastlogintime), + lastip = values(lastip), + game = values(game), + serverid = values(serverid), + uid = values(uid) + """ + for u in users: + # 判断角色是否存在 + await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid'])) + is_role = await cur.fetchone() + # 账号有多少个角色 + await cur.execute(role_count_sql, (self.game_name, u['binduid'])) + role_cont = await cur.fetchone() + role_cont = role_cont[0] + if not is_role: + role_cont += 1 + sql_values.append( + ( + self.game_name, + u['sid'], + u['binduid'], + u['uid'], + u['name'], + u.get('ext_owner', 'unkonwn'), + u.get('ext_channel', 'ext_channel'), + u['ctime'], + u['lv'], + u['vip'], + role_cont, + int(time.time()), + u.get('lastloginip', '') + ) + ) + await cur.executemany(update_sql, sql_values) + await conn.commit() + + # 记录渠道 + update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s) + on duplicate key update + game=values(game), + channel=values(channel) + """ + await cur.executemany(update_channel_sql, self.channel_set) + await conn.commit() + + # 记录登录 TODO 1天内每次更新都加1??? + sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s" + for u in users: + date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime']))) + await cur.execute(sql, [ + self.game_name, + u['sid'], + u['uid'], + date + ]) + is_login = await cur.fetchone() + if not is_login: + sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s" + await cur.execute(sql1, [ + self.game_name, + u['sid'], + u['uid'], + date, + 1 + ]) + await conn.commit() + else: + sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s" + await cur.execute(sql1, [ + self.game_name, + u['sid'], + u['uid'], + date + ]) + await conn.commit() + + +class LoadPayData(LoadData): + def __init__(self, *args, **kwargs): + self.pay_mysql_conf = kwargs.get('pay_db_conf') + self.gm_db_pool = kwargs.get('gm_db_pool') + self.game_name = kwargs.get('game_name') + self.lastActTime = int(time.time()) # todo 测试用 + + async def get_data(self): + # 获取充值数据 所有区服 + pay_mysql = await aiomysql.connect(**self.pay_mysql_conf, loop=asyncio.get_event_loop()) + cur = await pay_mysql.cursor(aiomysql.DictCursor) + sql = "select * from paylist where state=2 and ctime>=%s" + await cur.execute(sql, (self.lastActTime - 3600,)) + rss = await cur.fetchall() + for p in rss: + p['data'] = p['data2'] + await cur.close() + pay_mysql.close() + return rss + + async def update_data(self, data): + sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s" + async with self.gm_db_pool.acquire() as conn: + async with conn.cursor() as cur: + for u in data: + try: + await cur.execute(sql, ( + self.game_name, + u['serverid'], + u['uid'], + u['orderid'], + u['payorder'], + u['money'], + u['proid'], + u['ctime'], + u['data'], + u['paytype'] + )) + await conn.commit() + except Exception as e: + print(e) diff --git a/main.py b/main.py new file mode 100644 index 0000000..bcde1b6 --- /dev/null +++ b/main.py @@ -0,0 +1,60 @@ +import asyncio +import time + +import aiomysql +from motor import motor_asyncio +from loguru import logger + +from config import settings +from update_data import LoadUserData, LoadPayData +from utils import get_server_list + +loop = asyncio.get_event_loop() + + +async def main(): + tasks = [] + server_df = get_server_list() + + # gm_db pool + gm_db_pool = await aiomysql.create_pool(**settings.GAME_MANA_MYSQLDB, maxsize=100, autocommit=True) + + for server, data in server_df: + # 这一组的 mongodb 相同 建立一次连接 + game_name = server[0] + + # 如果没配置订单数据库 就不处理这个游戏 + if game_name not in settings.PAY_DB: + continue + + mongo_client = motor_asyncio.AsyncIOMotorClient( + f'mongodb://{str(server[3])}:{str(server[4])}@{str(server[1])}:{int(server[2])}/?authSource=admin&readPreference=primary', + serverSelectionTimeoutMS=settings.TIMEOUT_MS, socketTimeoutMS=settings.TIMEOUT_MS + ) + + s_db_df = data[['db', 'serverid']] + + task = asyncio.create_task(LoadUserData(mongo_client=mongo_client, + gm_db_pool=gm_db_pool, + game_name=game_name, + s_db_df=s_db_df + ).run()) + tasks.append(task) + + # 更新支付信息 + for game_name, pay_db_conf in settings.PAY_DB.items(): + task = asyncio.create_task(LoadPayData(pay_db_conf=pay_db_conf, + gm_db_pool=gm_db_pool, + game_name=game_name, + ).run()) + tasks.append(task) + + await asyncio.gather(*tasks) + gm_db_pool.close() + await gm_db_pool.wait_closed() + + +if __name__ == '__main__': + st = time.time() + loop.run_until_complete(main()) + logger.info(f'共耗时{time.time() - st}秒') diff --git a/requements.txt b/requements.txt new file mode 100644 index 0000000..97595c5 --- /dev/null +++ b/requements.txt @@ -0,0 +1,20 @@ +aiomysql==0.0.20 +certifi==2020.6.20 +cffi==1.14.2 +chardet==3.0.4 +colorama==0.4.3 +cryptography==3.0 +idna==2.10 +loguru==0.5.2 +motor==2.2.0 +numpy==1.19.1 +pandas==1.1.0 +pycparser==2.20 +pymongo==3.11.0 +PyMySQL==0.9.2 +python-dateutil==2.8.1 +pytz==2020.1 +requests==2.24.0 +six==1.15.0 +urllib3==1.25.10 +win32-setctime==1.0.2 diff --git a/update_data/__init__.py b/update_data/__init__.py new file mode 100644 index 0000000..8ce5812 --- /dev/null +++ b/update_data/__init__.py @@ -0,0 +1,3 @@ +from .update_data import LoadData +from .update_pay import LoadPayData +from .update_user import LoadUserData \ No newline at end of file diff --git a/update_data/update_data.py b/update_data/update_data.py new file mode 100644 index 0000000..b134659 --- /dev/null +++ b/update_data/update_data.py @@ -0,0 +1,60 @@ +import abc +import time + +import aiomysql +from loguru import logger + +from config import settings + + +class LoadData(metaclass=abc.ABCMeta): + def __init__(self, *args, **kwargs): + self.gm_db_pool = kwargs.get('gm_db_pool') + self.gm_key = None + self.act_time = None + self.last_act_time = None + + async def get_kv(self, key, try_count=3): + try: + sql = 'select v from gm_kv where k=%s' + ts = int(time.time()) + async with self.gm_db_pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, (settings.KV_PREFIX + key,)) + res = (await cur.fetchone()) or {} + v = int(res.get('v') or ts) + self.last_act_time = v + if not res: + await self.set_kv(key, ts) + return v + except Exception as e: + if try_count > 0: + logger.warning(f'连接gm mysql失败 剩余次数{try_count - 1}') + return await self.get_kv(key, try_count - 1) + else: + raise e + + async def set_kv(self, key, value): + key = settings.KV_PREFIX + key + sql = 'replace into gm_kv set k=%s,v=%s' + async with self.gm_db_pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(sql, (key, value)) + await conn.commit() + + @abc.abstractmethod + async def get_data(self): + pass + + @abc.abstractmethod + async def update_data(self, data): + pass + + async def run(self): + ts = int(time.time()) + self.act_time = ts + + data = await self.get_data() + await self.update_data(data) + if self.gm_key: + await self.set_kv(self.gm_key, self.act_time) diff --git a/update_data/update_pay.py b/update_data/update_pay.py new file mode 100644 index 0000000..562ab9c --- /dev/null +++ b/update_data/update_pay.py @@ -0,0 +1,66 @@ +import asyncio + +import aiomysql +import pymysql +from loguru import logger + +from config import settings +from update_data import LoadData + + +class LoadPayData(LoadData): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.pay_mysql_conf = kwargs.get('pay_db_conf') + self.game_name = kwargs.get('game_name') + + async def get_data(self): + # 获取充值数据 所有区服 + self.gm_key = 'pay_' + self.pay_mysql_conf['host'] + self.last_act_time = await self.get_kv(self.gm_key) + try: + pay_mysql = await aiomysql.connect(**self.pay_mysql_conf, + loop=asyncio.get_event_loop(), + connect_timeout=settings.TIMEOUT) + except Exception as e: + logger.error(e) + self.gm_key = None + return [] + cur = await pay_mysql.cursor(aiomysql.DictCursor) + sql = "select * from paylist where state=2 and ctime>=%s" + await cur.execute(sql, (self.last_act_time - 3600,)) + rss = await cur.fetchall() + for p in rss: + p['data'] = p['data2'] + await cur.close() + pay_mysql.close() + logger.info(f'{self.game_name}查询到{len(rss)}条订单') + return rss + + async def update_data(self, data): + if not data: + logger.info(f'{self.game_name}没有订单数据') + return + sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s" + async with self.gm_db_pool.acquire() as conn: + async with conn.cursor() as cur: + for u in data: + try: + await cur.execute(sql, ( + self.game_name, + u['serverid'], + u['uid'], + u['orderid'], + u['payorder'], + u['money'], + u['proid'], + u['ctime'], + u['data'], + u['paytype'] + )) + await conn.commit() + except pymysql.err.IntegrityError: + logger.debug(f'{self.game_name} {u["orderid"]}订单已插入') + except Exception as e: + logger.error(e) + self.gm_key = None diff --git a/update_data/update_user.py b/update_data/update_user.py new file mode 100644 index 0000000..b5ca998 --- /dev/null +++ b/update_data/update_user.py @@ -0,0 +1,170 @@ +import time + +from loguru import logger + +from update_data import LoadData + + +class LoadUserData(LoadData): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.mongo_client = kwargs.get('mongo_client') + self.game_name = kwargs.get('game_name') + self.s_db_df = kwargs.get('s_db_df') + self.gm_mysql = None + self.channel_set = set() + + async def get_data(self) -> list: + """ + 获取这组 登陆数据 + :return: list + """ + user_info_data = [] + + try: + + self.gm_key = self.game_name + '_user_' + self.mongo_client.address[0] + self.last_act_time = await self.get_kv(self.gm_key) + + where = { + 'logintime': {'$gte': self.last_act_time, '$lt': self.act_time} + } + projection = { + '_id': False, + 'uid': True, + 'name': True, + 'logintime': True, + 'lastloginip': True, + 'ext_channel': True, + 'ext_owner': True, + 'binduid': True, + 'lv': True, + 'ctime': True, + 'vip': True, + 'sid': True + } + + for s in self.s_db_df.T.to_dict().values(): + sid = s['serverid'] + db = s['db'] + where['uid'] = {"$regex": "^" + str(sid) + "_"} + user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection) + data_list = await user_info_cursor.to_list(length=65535) + for user in data_list: + user['lasttime'] = user['logintime'] + user['vip'] = user.get('vip', 0) + self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn'))) + user_info_data.extend(data_list) + logger.info(f'db:{db}') + logger.info(f'区服id:{sid}') + logger.info(f'更新数量:{len(data_list)}') + logger.info('-' * 50) + except Exception as e: + logger.error(e) + self.gm_key = None + + return user_info_data + + async def update_data(self, users: list, try_num=3): + if not users: + return + async with self.gm_db_pool.acquire() as conn: + async with conn.cursor() as cur: + # 记录 userinfo + sql_values = [] + is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s' + role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s" + update_sql = """insert into `gm_game_user` ( + game, + serverid, + binduid, + uid, + `name`, + owner, + channel, + ctime, + lv, + vip, + rolenum, + lastlogintime, + lastip + ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + on duplicate key update + lv = values(lv), + `name` = values(`name`), + vip = values(vip), + lastlogintime = values(lastlogintime), + lastip = values(lastip), + game = values(game), + serverid = values(serverid), + uid = values(uid) + """ + for u in users: + # 判断角色是否存在 + await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid'])) + is_role = await cur.fetchone() + # 账号有多少个角色 + await cur.execute(role_count_sql, (self.game_name, u['binduid'])) + role_cont = await cur.fetchone() + role_cont = role_cont[0] + if not is_role: + role_cont += 1 + sql_values.append( + ( + self.game_name, + u['sid'], + u['binduid'], + u['uid'], + u['name'], + u.get('ext_owner', 'unkonwn'), + u.get('ext_channel', 'ext_channel'), + u['ctime'], + u['lv'], + u['vip'], + role_cont, + int(time.time()), + u.get('lastloginip', '') + ) + ) + await cur.executemany(update_sql, sql_values) + await conn.commit() + + # 记录渠道 + update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s) + on duplicate key update + game=values(game), + channel=values(channel) + """ + await cur.executemany(update_channel_sql, self.channel_set) + await conn.commit() + + # 记录登录 + sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s" + for u in users: + date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime']))) + await cur.execute(sql, [ + self.game_name, + u['sid'], + u['uid'], + date + ]) + is_login = await cur.fetchone() + if not is_login: + sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s" + await cur.execute(sql1, [ + self.game_name, + u['sid'], + u['uid'], + date, + 1 + ]) + await conn.commit() + else: + sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s" + await cur.execute(sql1, [ + self.game_name, + u['sid'], + u['uid'], + date + ]) + await conn.commit() diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..2d7ab8e --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1 @@ +from .get_server import get_server_list diff --git a/utils/get_server.py b/utils/get_server.py new file mode 100644 index 0000000..b5a26d1 --- /dev/null +++ b/utils/get_server.py @@ -0,0 +1,20 @@ +import pandas as pd + +from config import settings + +""" +不能使用(server_df['running'] == 1)过滤 +当停服且设置为维护状态该区服 running=0 该区服会过滤掉, +但以db host同步 lastActTime_**_host 并不会停止 +导致 该区服丢失一次数据 +""" +def get_server_list() -> pd.DataFrame: + server_df = pd.read_json(settings.SERVER_LIST_URL) + server_df = server_df[ + (server_df['debug'] != 1) + # & (server_df['running'] == 1) + & (server_df['dbuser'] != '') + & (~server_df['game'].isin(['xiyou', 'fengshen', 'hommh5', 'sanguo', 'gods'])) + ] + + return server_df.groupby(['game', 'dbhost', 'dbport', 'dbuser', 'dbpwd'])