This commit is contained in:
wuhao 2020-09-12 18:03:05 +08:00
commit aba308cf5b
19 changed files with 1136 additions and 0 deletions

104
.gitignore vendored Normal file
View File

@ -0,0 +1,104 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

14
.idea/deployment.xml generated Normal file
View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData">
<serverData>
<paths name="root@v3.legu.cc:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
</component>
</project>

View File

@ -0,0 +1,22 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="9">
<item index="0" class="java.lang.String" itemvalue="pymongo" />
<item index="1" class="java.lang.String" itemvalue="six" />
<item index="2" class="java.lang.String" itemvalue="arrow" />
<item index="3" class="java.lang.String" itemvalue="certifi" />
<item index="4" class="java.lang.String" itemvalue="python-dateutil" />
<item index="5" class="java.lang.String" itemvalue="asgiref" />
<item index="6" class="java.lang.String" itemvalue="requests" />
<item index="7" class="java.lang.String" itemvalue="django-cors-headers" />
<item index="8" class="java.lang.String" itemvalue="idna" />
</list>
</value>
</option>
</inspection_tool>
</profile>
</component>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (sync_game_data)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/sync_game_data.iml" filepath="$PROJECT_DIR$/.idea/sync_game_data.iml" />
</modules>
</component>
</project>

10
.idea/sync_game_data.iml generated Normal file
View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.8 (sync_game_data)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

331
config.py Normal file
View File

@ -0,0 +1,331 @@
import os
from loguru import logger
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
logger.add(os.path.join(BASE_DIR, 'log.log'), format="{time} {level} {name}:{line} {message}", level="INFO",
rotation="50 MB", retention='7 days',
enqueue=True)
class Config:
# 国外服务器连接时间较长 不要低于3s
TIMEOUT_MS = 5000 # 5秒
TIMEOUT = 5 # 5秒
KV_PREFIX = 'lastActTime_'
SERVER_LIST_URL = "http://gametools.legu.cc/?app=api&act=getServerList&showdb=1"
GAME_MANA_MYSQLDB = {
"host": "10.0.0.5",
"port": 3306,
"user": "root",
"password": "87251326",
"db": "gamemana"
}
PAY_DB = {
'zhengba': {
"host": "zhengbaapi.legu.cc",
"port": 3306,
"user": "root",
"password": "SH9Pjrcr52ZJJhDT",
"db": "zhengbadata"
},
'heros': {
"host": "homm1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "hommdata"
},
'fengshen': {
"host": "fs1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "fsdata"
},
'geshouh5': {
"host": "134.175.135.230",
"port": 3306,
"user": "root",
"password": "v4rNd4aeMMftNGf",
"db": "mzgeshou"
},
'geshouccs': {
"host": "gsccsapi.legu.cc",
"port": 3306,
"user": "root",
"password": "v4rNd4aeMMftNGf",
"db": "gsccsdata"
},
'junshibig5': {
"host": "junshibig5.legu.cc",
"port": 3306,
"user": "root",
"password": "i0OBWUsLlLiM",
"db": "jsbig5data"
},
'xiaoying': {
"host": "106.52.175.193",
"port": 3306,
"user": "root",
"password": "KyQCGS9UIrgs",
"db": "yingdata"
},
'xiaoyingmicro': {
"host": "ying.legu.cc",
"port": 3306,
"user": "root",
"password": "2lRZPbspGI5p",
"db": "yingdata"
},
'legusg': {
"host": "homm1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "legusgdata"
},
'legujs': {
"host": "39.104.110.225",
"port": 3306,
"user": "root",
"password": "PFxqdwDmPhvCUDDe",
"db": "legujsdata"
},
'daqin': {
"host": "daqinapi.legu.cc",
"port": 3306,
"user": "root",
"password": "XZKcs6eG4YMFFPqM",
"db": "daqin"
},
'mzmfmh5': {
"host": "mzhomms.legu.cc",
"port": 3306,
"user": "root",
"password": "jm8dKqsXcxcP42Cd",
"db": "mzhero"
},
'mth5': {
"host": "v3.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "mth5data"
},
'xpet': {
"host": "xpetapi.weirongwl.com",
"port": 3306,
"user": "root",
"password": "a5hsSZU8ELb77f64",
"db": "xpetdata"
},
'shaihai': {
"host": "42.194.158.84",
"port": 3306,
"user": "root",
"password": "EawHvVBmSxhbPKJX",
"db": "shanhaidata"
},
'wow': {
"host": "wow1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "wowdata"
},
'gods': {
"host": "god1.legu.cc",
"port": 3306,
"user": "root",
"password": "TwmFYZcLhNN3vvK6",
"db": "goddata"
},
'jiushen': {
"host": "jiushenapi.legu.cc",
"port": 3306,
"user": "root",
"password": "9XqfSduK6Wx7359S",
"db": "jiushendata"
},
'sanguo': {
"host": "sanguo1.legu.cc",
"port": 3306,
"user": "root",
"password": "mQaDz8wcvEaKEZAY",
"db": "sgdata"
},
'tank': {
"host": "tankapi.legu.cc",
"port": 3306,
"user": "root",
"password": "lG1x4cJWXyIm",
"db": "tankdata"
},
'mt': {
"host": "mtapi.legu.cc",
"port": 3306,
"user": "root",
"password": "juP4jBaq9VMGVH9F",
"db": "mtdata"
},
'zhengbaen': {
"host": "fbenapi.legu.cc",
"port": 3306,
"user": "root",
"password": "HjVMBrYzpfhxQcGy",
"db": "fbendata"
},
'zhengbavn': {
"host": "fbvnapi.legu.cc",
"port": 3306,
"user": "root",
"password": "q9n3fseQmtfztNnd",
"db": "fbvndata"
},
'zhengbakr': {
"host": "fbkrapi.legu.cc",
"port": 3306,
"user": "root",
"password": "XeuT9QNHyBuR8jE5",
"db": "fbkoredata"
},
'zhengbabig5': {
"host": "fbbig5api.legu.cc",
"port": 3306,
"user": "root",
"password": "4EyM8FyPh6waHhe5",
"db": "fbbig5data"
},
'jqzb': {
"host": "yaolingapi.legu.cc",
"port": 3306,
"user": "root",
"password": "xwjJ6mcMZuWHWs78",
"db": "yaolingdata"
},
'hommh5': {
"host": "hommweb1.legu.cc",
"port": 3306,
"user": "root",
"password": "jMNxqdDpGFLjM8wW",
"db": "hommh5data"
},
'king': {
"host": "homm1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "kingdata"
},
'zgpromvbt': {
"host": "dzgbt2019115.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "dzgbt2019115"
},
'dzgbt': {
"host": "homm1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "dzgbt"
},
'dasanguo': {
"host": "homm1.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "dsgdata"
},
'xiyou': {
"host": "ft5.legu.cc",
"port": 3306,
"user": "root",
"password": "iamciniao",
"db": "xydata"
},
# 'wuxia___':{
# "host":"127.0.0.1", #v3.legu.cc
# "port":3306,
# "user":"root",
# "password":"iamciniao",
# "db":"xydata"
# },
# 'xycsb___':{
# "host":"123.59.138.88",
# "port":3306,
# "user":"root",
# "password":"iamciniao",
# "db":"baigu"
# },
# 'yxwhm___':{
# "host":"115.159.98.163",
# "port":3306,
# "user":"root",
# "password":"iamciniao",
# "db":"xydata"
# },
'mori': {
"host": "119.29.222.234",
"port": 3306,
"user": "root",
"password": "56EnChDEnT9cmC9w",
"db": "shenghuadata"
},
'mfm2': {
"host": "mfmccsapi.legu.cc",
"port": 3306,
"user": "root",
"password": "nyVcHvwFAPjn83yX",
"db": "mfmccsdata"
},
'zhengbath': {
"host": "zhengbathapi.legu.cc",
"port": 3306,
"user": "root",
"password": "q9n3fseQmtfztNnd",
"db": "zbthdata"
},
'geshouccsqd': {
"host": "gsccsqudaoapi.legu.cc",
"port": 3306,
"user": "root",
"password": "NJM4n4hJdQwPwr3d",
"db": "gsccsqddata"
},
'huixie': {
"host": "140.143.150.125",
"port": 3306,
"user": "root",
"password": "Qp4v9NBTbMzKKMuH",
"db": "huixiedata"
}
# 'mth5bt': {
# "host": "127.0.0.1",
# "port": 3306,
# "user": "root",
# "password": "iamciniao",
# "db": "mtbth5data"
# }
}
class DevConfig(Config):
PAY_DB = {
'mori': {
"host": "119.29.222.234",
"port": 3306,
"user": "root",
"password": "56EnChDEnT9cmC9w",
"db": "shenghuadata"
},
}
settings = Config()

220
load_data.py.bak Normal file
View File

@ -0,0 +1,220 @@
import asyncio
import time
import aiomysql
import abc
class LoadData(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def get_data(self):
pass
@abc.abstractmethod
async def update_data(self, data):
pass
async def run(self):
data = await self.get_data()
await self.update_data(data)
class LoadUserData(LoadData):
def __init__(self, *args, **kwargs):
self.mongo_client = kwargs.get('mongo_client')
self.gm_db_pool = kwargs.get('gm_db_pool')
self.game_name = kwargs.get('game_name')
self.s_db_df = kwargs.get('s_db_df')
self.gm_mysql = None
self.channel_set = set()
async def get_data(self) -> list:
"""
获取这组 登陆数据
:return: list
"""
last_act_time = int(time.time()) - 6000 # todo 测试
where = {
'logintime': {'$gt': last_act_time}
}
projection = {
'_id': False,
'uid': True,
'name': True,
'logintime': True,
'ext_channel': True,
'ext_owner': True,
'binduid': True,
'lv': True,
'ctime': True,
'vip': True,
'sid': True
}
user_info_data = []
for s in self.s_db_df.T.to_dict().values():
sid = s['serverid']
db = s['db']
where['uid'] = {"$regex": "^" + str(sid) + "_"}
user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection)
data_list = await user_info_cursor.to_list(length=65535)
for user in data_list:
user['lasttime'] = user['logintime']
user['vip'] = user.get('vip', 0)
self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn')))
user_info_data.extend(data_list)
print(f'db:{s["db"]}')
print(f'区服id:{s["serverid"]}')
print(f'更新数量:{len(data_list)}')
print('-' * 50)
return user_info_data
async def update_data(self, users: list, try_num=3):
if not users:
return
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor() as cur:
# 记录 userinfo
sql_values = []
is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
update_sql = """insert into `gm_game_user` (
game,
serverid,
binduid,
uid,
`name`,
owner,
channel,
ctime,
lv,
vip,
rolenum,
lastlogintime,
lastip
) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
on duplicate key update
lv = values(lv),
`name` = values(`name`),
vip = values(vip),
lastlogintime = values(lastlogintime),
lastip = values(lastip),
game = values(game),
serverid = values(serverid),
uid = values(uid)
"""
for u in users:
# 判断角色是否存在
await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid']))
is_role = await cur.fetchone()
# 账号有多少个角色
await cur.execute(role_count_sql, (self.game_name, u['binduid']))
role_cont = await cur.fetchone()
role_cont = role_cont[0]
if not is_role:
role_cont += 1
sql_values.append(
(
self.game_name,
u['sid'],
u['binduid'],
u['uid'],
u['name'],
u.get('ext_owner', 'unkonwn'),
u.get('ext_channel', 'ext_channel'),
u['ctime'],
u['lv'],
u['vip'],
role_cont,
int(time.time()),
u.get('lastloginip', '')
)
)
await cur.executemany(update_sql, sql_values)
await conn.commit()
# 记录渠道
update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s)
on duplicate key update
game=values(game),
channel=values(channel)
"""
await cur.executemany(update_channel_sql, self.channel_set)
await conn.commit()
# 记录登录 TODO 1天内每次更新都加1
sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
for u in users:
date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
await cur.execute(sql, [
self.game_name,
u['sid'],
u['uid'],
date
])
is_login = await cur.fetchone()
if not is_login:
sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
await cur.execute(sql1, [
self.game_name,
u['sid'],
u['uid'],
date,
1
])
await conn.commit()
else:
sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"
await cur.execute(sql1, [
self.game_name,
u['sid'],
u['uid'],
date
])
await conn.commit()
class LoadPayData(LoadData):
def __init__(self, *args, **kwargs):
self.pay_mysql_conf = kwargs.get('pay_db_conf')
self.gm_db_pool = kwargs.get('gm_db_pool')
self.game_name = kwargs.get('game_name')
self.lastActTime = int(time.time()) # todo 测试用
async def get_data(self):
# 获取充值数据 所有区服
pay_mysql = await aiomysql.connect(**self.pay_mysql_conf, loop=asyncio.get_event_loop())
cur = await pay_mysql.cursor(aiomysql.DictCursor)
sql = "select * from paylist where state=2 and ctime>=%s"
await cur.execute(sql, (self.lastActTime - 3600,))
rss = await cur.fetchall()
for p in rss:
p['data'] = p['data2']
await cur.close()
pay_mysql.close()
return rss
async def update_data(self, data):
sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s"
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor() as cur:
for u in data:
try:
await cur.execute(sql, (
self.game_name,
u['serverid'],
u['uid'],
u['orderid'],
u['payorder'],
u['money'],
u['proid'],
u['ctime'],
u['data'],
u['paytype']
))
await conn.commit()
except Exception as e:
print(e)

60
main.py Normal file
View File

@ -0,0 +1,60 @@
import asyncio
import time
import aiomysql
from motor import motor_asyncio
from loguru import logger
from config import settings
from update_data import LoadUserData, LoadPayData
from utils import get_server_list
loop = asyncio.get_event_loop()
async def main():
tasks = []
server_df = get_server_list()
# gm_db pool
gm_db_pool = await aiomysql.create_pool(**settings.GAME_MANA_MYSQLDB, maxsize=100, autocommit=True)
for server, data in server_df:
# 这一组的 mongodb 相同 建立一次连接
game_name = server[0]
# 如果没配置订单数据库 就不处理这个游戏
if game_name not in settings.PAY_DB:
continue
mongo_client = motor_asyncio.AsyncIOMotorClient(
f'mongodb://{str(server[3])}:{str(server[4])}@{str(server[1])}:{int(server[2])}/?authSource=admin&readPreference=primary',
serverSelectionTimeoutMS=settings.TIMEOUT_MS, socketTimeoutMS=settings.TIMEOUT_MS
)
s_db_df = data[['db', 'serverid']]
task = asyncio.create_task(LoadUserData(mongo_client=mongo_client,
gm_db_pool=gm_db_pool,
game_name=game_name,
s_db_df=s_db_df
).run())
tasks.append(task)
# 更新支付信息
for game_name, pay_db_conf in settings.PAY_DB.items():
task = asyncio.create_task(LoadPayData(pay_db_conf=pay_db_conf,
gm_db_pool=gm_db_pool,
game_name=game_name,
).run())
tasks.append(task)
await asyncio.gather(*tasks)
gm_db_pool.close()
await gm_db_pool.wait_closed()
if __name__ == '__main__':
st = time.time()
loop.run_until_complete(main())
logger.info(f'共耗时{time.time() - st}')

20
requements.txt Normal file
View File

@ -0,0 +1,20 @@
aiomysql==0.0.20
certifi==2020.6.20
cffi==1.14.2
chardet==3.0.4
colorama==0.4.3
cryptography==3.0
idna==2.10
loguru==0.5.2
motor==2.2.0
numpy==1.19.1
pandas==1.1.0
pycparser==2.20
pymongo==3.11.0
PyMySQL==0.9.2
python-dateutil==2.8.1
pytz==2020.1
requests==2.24.0
six==1.15.0
urllib3==1.25.10
win32-setctime==1.0.2

3
update_data/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .update_data import LoadData
from .update_pay import LoadPayData
from .update_user import LoadUserData

View File

@ -0,0 +1,60 @@
import abc
import time
import aiomysql
from loguru import logger
from config import settings
class LoadData(metaclass=abc.ABCMeta):
def __init__(self, *args, **kwargs):
self.gm_db_pool = kwargs.get('gm_db_pool')
self.gm_key = None
self.act_time = None
self.last_act_time = None
async def get_kv(self, key, try_count=3):
try:
sql = 'select v from gm_kv where k=%s'
ts = int(time.time())
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, (settings.KV_PREFIX + key,))
res = (await cur.fetchone()) or {}
v = int(res.get('v') or ts)
self.last_act_time = v
if not res:
await self.set_kv(key, ts)
return v
except Exception as e:
if try_count > 0:
logger.warning(f'连接gm mysql失败 剩余次数{try_count - 1}')
return await self.get_kv(key, try_count - 1)
else:
raise e
async def set_kv(self, key, value):
key = settings.KV_PREFIX + key
sql = 'replace into gm_kv set k=%s,v=%s'
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, (key, value))
await conn.commit()
@abc.abstractmethod
async def get_data(self):
pass
@abc.abstractmethod
async def update_data(self, data):
pass
async def run(self):
ts = int(time.time())
self.act_time = ts
data = await self.get_data()
await self.update_data(data)
if self.gm_key:
await self.set_kv(self.gm_key, self.act_time)

66
update_data/update_pay.py Normal file
View File

@ -0,0 +1,66 @@
import asyncio
import aiomysql
import pymysql
from loguru import logger
from config import settings
from update_data import LoadData
class LoadPayData(LoadData):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pay_mysql_conf = kwargs.get('pay_db_conf')
self.game_name = kwargs.get('game_name')
async def get_data(self):
# 获取充值数据 所有区服
self.gm_key = 'pay_' + self.pay_mysql_conf['host']
self.last_act_time = await self.get_kv(self.gm_key)
try:
pay_mysql = await aiomysql.connect(**self.pay_mysql_conf,
loop=asyncio.get_event_loop(),
connect_timeout=settings.TIMEOUT)
except Exception as e:
logger.error(e)
self.gm_key = None
return []
cur = await pay_mysql.cursor(aiomysql.DictCursor)
sql = "select * from paylist where state=2 and ctime>=%s"
await cur.execute(sql, (self.last_act_time - 3600,))
rss = await cur.fetchall()
for p in rss:
p['data'] = p['data2']
await cur.close()
pay_mysql.close()
logger.info(f'{self.game_name}查询到{len(rss)}条订单')
return rss
async def update_data(self, data):
if not data:
logger.info(f'{self.game_name}没有订单数据')
return
sql = "insert into gm_game_paylist set game=%s,serverid=%s,uid=%s,gorder=%s,porder=%s,money=%s,proname=%s,ctime=%s,`data`=%s,paytype=%s"
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor() as cur:
for u in data:
try:
await cur.execute(sql, (
self.game_name,
u['serverid'],
u['uid'],
u['orderid'],
u['payorder'],
u['money'],
u['proid'],
u['ctime'],
u['data'],
u['paytype']
))
await conn.commit()
except pymysql.err.IntegrityError:
logger.debug(f'{self.game_name} {u["orderid"]}订单已插入')
except Exception as e:
logger.error(e)
self.gm_key = None

170
update_data/update_user.py Normal file
View File

@ -0,0 +1,170 @@
import time
from loguru import logger
from update_data import LoadData
class LoadUserData(LoadData):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mongo_client = kwargs.get('mongo_client')
self.game_name = kwargs.get('game_name')
self.s_db_df = kwargs.get('s_db_df')
self.gm_mysql = None
self.channel_set = set()
async def get_data(self) -> list:
"""
获取这组 登陆数据
:return: list
"""
user_info_data = []
try:
self.gm_key = self.game_name + '_user_' + self.mongo_client.address[0]
self.last_act_time = await self.get_kv(self.gm_key)
where = {
'logintime': {'$gte': self.last_act_time, '$lt': self.act_time}
}
projection = {
'_id': False,
'uid': True,
'name': True,
'logintime': True,
'lastloginip': True,
'ext_channel': True,
'ext_owner': True,
'binduid': True,
'lv': True,
'ctime': True,
'vip': True,
'sid': True
}
for s in self.s_db_df.T.to_dict().values():
sid = s['serverid']
db = s['db']
where['uid'] = {"$regex": "^" + str(sid) + "_"}
user_info_cursor = self.mongo_client[db]['userinfo'].find(where, projection)
data_list = await user_info_cursor.to_list(length=65535)
for user in data_list:
user['lasttime'] = user['logintime']
user['vip'] = user.get('vip', 0)
self.channel_set.add((self.game_name, user.get('ext_channel', 'unkonwn')))
user_info_data.extend(data_list)
logger.info(f'db:{db}')
logger.info(f'区服id:{sid}')
logger.info(f'更新数量:{len(data_list)}')
logger.info('-' * 50)
except Exception as e:
logger.error(e)
self.gm_key = None
return user_info_data
async def update_data(self, users: list, try_num=3):
if not users:
return
async with self.gm_db_pool.acquire() as conn:
async with conn.cursor() as cur:
# 记录 userinfo
sql_values = []
is_role_sql = 'select id from gm_game_user where game=%s and serverid=%s and uid=%s'
role_count_sql = "SELECT count(*) FROM `gm_game_user` WHERE game=%s and binduid=%s"
update_sql = """insert into `gm_game_user` (
game,
serverid,
binduid,
uid,
`name`,
owner,
channel,
ctime,
lv,
vip,
rolenum,
lastlogintime,
lastip
) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
on duplicate key update
lv = values(lv),
`name` = values(`name`),
vip = values(vip),
lastlogintime = values(lastlogintime),
lastip = values(lastip),
game = values(game),
serverid = values(serverid),
uid = values(uid)
"""
for u in users:
# 判断角色是否存在
await cur.execute(is_role_sql, (self.game_name, u['sid'], u['uid']))
is_role = await cur.fetchone()
# 账号有多少个角色
await cur.execute(role_count_sql, (self.game_name, u['binduid']))
role_cont = await cur.fetchone()
role_cont = role_cont[0]
if not is_role:
role_cont += 1
sql_values.append(
(
self.game_name,
u['sid'],
u['binduid'],
u['uid'],
u['name'],
u.get('ext_owner', 'unkonwn'),
u.get('ext_channel', 'ext_channel'),
u['ctime'],
u['lv'],
u['vip'],
role_cont,
int(time.time()),
u.get('lastloginip', '')
)
)
await cur.executemany(update_sql, sql_values)
await conn.commit()
# 记录渠道
update_channel_sql = """insert into `gm_game_channel_list` (game,channel) values (%s,%s)
on duplicate key update
game=values(game),
channel=values(channel)
"""
await cur.executemany(update_channel_sql, self.channel_set)
await conn.commit()
# 记录登录
sql = "select id from gm_game_login where game=%s and serverid=%s and uid=%s and cdate=%s"
for u in users:
date = time.strftime('%Y-%m-%d', time.localtime(int(u['lasttime'])))
await cur.execute(sql, [
self.game_name,
u['sid'],
u['uid'],
date
])
is_login = await cur.fetchone()
if not is_login:
sql1 = "insert into gm_game_login set game=%s,serverid=%s,uid=%s,cdate=%s,logintimes=%s"
await cur.execute(sql1, [
self.game_name,
u['sid'],
u['uid'],
date,
1
])
await conn.commit()
else:
sql1 = "update gm_game_login set logintimes=logintimes+1 where game=%s and serverid=%s and uid=%s and cdate=%s"
await cur.execute(sql1, [
self.game_name,
u['sid'],
u['uid'],
date
])
await conn.commit()

1
utils/__init__.py Normal file
View File

@ -0,0 +1 @@
from .get_server import get_server_list

20
utils/get_server.py Normal file
View File

@ -0,0 +1,20 @@
import pandas as pd
from config import settings
"""
不能使用(server_df['running'] == 1)过滤
当停服且设置为维护状态该区服 running=0 该区服会过滤掉
但以db host同步 lastActTime_**_host 并不会停止
导致 该区服丢失一次数据
"""
def get_server_list() -> pd.DataFrame:
server_df = pd.read_json(settings.SERVER_LIST_URL)
server_df = server_df[
(server_df['debug'] != 1)
# & (server_df['running'] == 1)
& (server_df['dbuser'] != '')
& (~server_df['game'].isin(['xiyou', 'fengshen', 'hommh5', 'sanguo', 'gods']))
]
return server_df.groupby(['game', 'dbhost', 'dbport', 'dbuser', 'dbpwd'])