From 1d6854beff46584e521cfed076e47a19d967dabe Mon Sep 17 00:00:00 2001 From: wuaho Date: Tue, 26 Oct 2021 20:43:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E4=B8=BA=E6=89=8B=E5=8A=A8=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 1 + settings.py.template | 4 ---- single_process.py | 8 ++++---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/app.py b/app.py index 15fe886..33ef189 100644 --- a/app.py +++ b/app.py @@ -65,6 +65,7 @@ class XProcess(Process): self.log.info(type_) if data.get('#time', 0) + 3600 < int(time.time()): continue + kafka_client.commit() kafka_client.close() # 停止消费kafka self.log.info(f'进程{self.partition} 等待90秒') diff --git a/settings.py.template b/settings.py.template index bb8cac2..85db4c0 100644 --- a/settings.py.template +++ b/settings.py.template @@ -17,10 +17,6 @@ class Config: 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_deserializer': json.loads, 'auto_offset_reset': 'earliest', - 'max_poll_interval_ms': 600000, - 'session_timeout_ms': 600000, - 'connections_max_idle_ms': 655000, - 'request_timeout_ms': 605000, 'enable_auto_commit': False, # 每个游戏不一样 diff --git a/single_process.py b/single_process.py index eb87f1a..1ca42a8 100644 --- a/single_process.py +++ b/single_process.py @@ -18,14 +18,14 @@ from v2.log import logger rdb = redis.Redis(**settings.REDIS_CONF) event_attr = EventAttr(rdb) -partition = -1 +partition = 0 def run(): db_client = CK(**settings.CK_CONFIG) sketch = Sketch(db_client) handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_user = HandlerUser(db_client, settings.GAME) - transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) + transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr,partition) transmitter.add_source(handler_event, 1000, 10) transmitter.add_source(handler_user, 1000, 10) last_ts = int(time.time()) @@ -60,7 +60,7 @@ def run(): print(f'进程{msg.partition} 等待90秒') time.sleep(1) print(f'进程{msg.partition} 写入数据') - transmitter.run() + transmitter.run(kafka_client) print(f'进程{msg.partition} 结束') break @@ -69,7 +69,7 @@ def run(): else: continue - transmitter.run() + transmitter.run(kafka_client) while True: time.sleep(5)