diff --git a/app.py b/app.py index 7a6059b..3173289 100644 --- a/app.py +++ b/app.py @@ -22,7 +22,8 @@ class XProcess(Process): sketch = Sketch(db_client, StructCacheRedis(self.rdb)) handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch) handler_user = HandlerUser(db_client, settings.GAME) - transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr) + transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr, + self.partition) transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_user, 99, 60) last_ts = int(time.time()) diff --git a/v2/transmitter.py b/v2/transmitter.py index 606e4e8..329ba0e 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -10,7 +10,7 @@ __all__ = 'Transmitter', class Transmitter: - def __init__(self, db_client, db_name, sketch, log, lock, event_attr): + def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0): self.db_client = db_client self.db_name = db_name self.sketch = sketch @@ -18,6 +18,7 @@ class Transmitter: self.log = log self.lock = lock self.event_attr = event_attr + self.p = p def add_source(self, handler, bulk_max=1000, time_out=60): self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} @@ -48,7 +49,7 @@ class Transmitter: # print(f'{ts} {os.getpid()} 获得锁') self.db_client.execute('set input_format_allow_errors_ratio=0.2') self.db_client.execute(sql) - self.log.info(f'写入耗时 {int(time.time() * 1000) - ts}') + self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}') except Exception as e: # 丢弃错误行 再次发送 if hasattr(e, 'code') and e.code == 26: