diff --git a/app.py b/app.py index e307140..b624073 100644 --- a/app.py +++ b/app.py @@ -8,18 +8,20 @@ from v2.struct_cache import StructCacheFile, StructCacheRedis class XProcess(Process): - def __init__(self, partition, lock, ipsearch): + def __init__(self, partition, lock, ipsearch, log, rdb=None): super(XProcess, self).__init__() self.partition = partition self.lock = lock self.ipsearch = ipsearch + self.rdb = rdb + self.log = log def run(self): db_client = CK(**settings.CK_CONFIG) - sketch = Sketch(db_client, StructCacheRedis()) + sketch = Sketch(db_client, StructCacheRedis(self.rdb)) handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch) handler_user = HandlerUser(db_client, settings.GAME) - transmitter = Transmitter(db_client, settings.GAME, sketch, self.lock) + transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock) transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_user, 1000, 60) last_ts = int(time.time()) diff --git a/main.py b/main.py index 5861504..b873b83 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,12 @@ import os +import redis + from app import XProcess from multiprocessing import Lock +from settings import settings from v2.ipregion import IpSearch, Ip2Region +from v2.log import logger if __name__ == '__main__': pid = os.getpid() @@ -14,6 +18,8 @@ echo `pstree -p {pid}`|awk 'BEGIN{{ FS="(" ; RS=")" }} NF>1 {{ print $NF }}'|xar f.write(stop_shell.format(pid=pid)) lock = Lock() + rdb = redis.Redis(**settings.REDIS_CONF) + ipsearch = IpSearch(Ip2Region, "ip2region.db") for i in range(0, 16): - XProcess(i, lock, ipsearch).start() + XProcess(i, lock, ipsearch, logger, rdb).start() diff --git a/settings.py b/settings.py index 51bcd49..7c4df1c 100644 --- a/settings.py +++ b/settings.py @@ -1,12 +1,11 @@ import json - class Config: # ck数据库连接 CK_CONFIG = {'host': '119.29.176.224', 'send_receive_timeout': 30} - #每个游戏不一样 游戏上报 kafka 主题 + # 每个游戏不一样 游戏上报 kafka 主题 SUBSCRIBE_TOPIC = 'test2' KAFKA_CONSUMER_CONF = { @@ -25,7 +24,7 @@ class Config: 'port': 6379, 'password': 'd1Gh*zp5', 'db': 1, # 存表结构 - 'decode_responses':True + 'decode_responses': True } diff --git a/v2/log.py b/v2/log.py new file mode 100644 index 0000000..67523ff --- /dev/null +++ b/v2/log.py @@ -0,0 +1,8 @@ +import time + +from loguru import logger + +from settings import settings + +__all__ = 'logger' +logger.add(settings.GAME, format="{time} {level} {message}", level="INFO", rotation=100, enqueue=True) diff --git a/v2/struct_cache.py b/v2/struct_cache.py index b397498..5e58653 100644 --- a/v2/struct_cache.py +++ b/v2/struct_cache.py @@ -15,8 +15,8 @@ class StructCacheFile: class StructCacheRedis: - def __init__(self): - self.rdb = redis.Redis(**settings.REDIS_CONF) + def __init__(self, rdb): + self.rdb = rdb def update(self, db, tb, data): self.rdb.set(f'{db}_{tb}', json.dumps(data)) diff --git a/v2/transmitter.py b/v2/transmitter.py index 6ba0072..99e86ae 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -10,11 +10,12 @@ __all__ = 'Transmitter', class Transmitter: - def __init__(self, db_client, db_name, sketch, lock): + def __init__(self, db_client, db_name, sketch, log, lock): self.db_client = db_client self.db_name = db_name self.sketch = sketch self.slots = dict() + self.log = log self.lock = lock def add_source(self, handler, bulk_max=1000, time_out=60): @@ -83,7 +84,7 @@ class Transmitter: item[k] = TYPE_CK2PY[type_](v, **item) if v is None: del_keys.add(k) - print(k, '类型不一致') + self.log.warning(f'{k} {type(k)} 类型不一致') for key in del_keys: del item[key]