This commit is contained in:
wuaho 2021-06-10 13:59:37 +08:00
parent e74afd7e45
commit 4f66ebbe64
2 changed files with 5 additions and 3 deletions

3
app.py
View File

@ -22,7 +22,8 @@ class XProcess(Process):
sketch = Sketch(db_client, StructCacheRedis(self.rdb))
handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch)
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
self.partition)
transmitter.add_source(handler_event, 10000, 60)
transmitter.add_source(handler_user, 99, 60)
last_ts = int(time.time())

View File

@ -10,7 +10,7 @@ __all__ = 'Transmitter',
class Transmitter:
def __init__(self, db_client, db_name, sketch, log, lock, event_attr):
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
self.db_client = db_client
self.db_name = db_name
self.sketch = sketch
@ -18,6 +18,7 @@ class Transmitter:
self.log = log
self.lock = lock
self.event_attr = event_attr
self.p = p
def add_source(self, handler, bulk_max=1000, time_out=60):
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
@ -48,7 +49,7 @@ class Transmitter:
# print(f'{ts} {os.getpid()} 获得锁')
self.db_client.execute('set input_format_allow_errors_ratio=0.2')
self.db_client.execute(sql)
self.log.info(f'写入耗时 {int(time.time() * 1000) - ts}')
self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}')
except Exception as e:
# 丢弃错误行 再次发送
if hasattr(e, 'code') and e.code == 26: