diff --git a/app.py b/app.py index 15fe886..33ef189 100644 --- a/app.py +++ b/app.py @@ -65,6 +65,7 @@ class XProcess(Process): self.log.info(type_) if data.get('#time', 0) + 3600 < int(time.time()): continue + kafka_client.commit() kafka_client.close() # 停止消费kafka self.log.info(f'进程{self.partition} 等待90秒') diff --git a/settings.py.template b/settings.py.template index bb8cac2..85db4c0 100644 --- a/settings.py.template +++ b/settings.py.template @@ -17,10 +17,6 @@ class Config: 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_deserializer': json.loads, 'auto_offset_reset': 'earliest', - 'max_poll_interval_ms': 600000, - 'session_timeout_ms': 600000, - 'connections_max_idle_ms': 655000, - 'request_timeout_ms': 605000, 'enable_auto_commit': False, # 每个游戏不一样 diff --git a/single_process.py b/single_process.py index eb87f1a..1ca42a8 100644 --- a/single_process.py +++ b/single_process.py @@ -18,14 +18,14 @@ from v2.log import logger rdb = redis.Redis(**settings.REDIS_CONF) event_attr = EventAttr(rdb) -partition = -1 +partition = 0 def run(): db_client = CK(**settings.CK_CONFIG) sketch = Sketch(db_client) handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_user = HandlerUser(db_client, settings.GAME) - transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) + transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr,partition) transmitter.add_source(handler_event, 1000, 10) transmitter.add_source(handler_user, 1000, 10) last_ts = int(time.time()) @@ -60,7 +60,7 @@ def run(): print(f'进程{msg.partition} 等待90秒') time.sleep(1) print(f'进程{msg.partition} 写入数据') - transmitter.run() + transmitter.run(kafka_client) print(f'进程{msg.partition} 结束') break @@ -69,7 +69,7 @@ def run(): else: continue - transmitter.run() + transmitter.run(kafka_client) while True: time.sleep(5)