改为手动提交

This commit is contained in:
wuaho 2021-10-26 20:43:45 +08:00
parent bf55fafe46
commit 1d6854beff
3 changed files with 5 additions and 8 deletions

1
app.py
View File

@ -65,6 +65,7 @@ class XProcess(Process):
self.log.info(type_) self.log.info(type_)
if data.get('#time', 0) + 3600 < int(time.time()): if data.get('#time', 0) + 3600 < int(time.time()):
continue continue
kafka_client.commit()
kafka_client.close() kafka_client.close()
# 停止消费kafka # 停止消费kafka
self.log.info(f'进程{self.partition} 等待90秒') self.log.info(f'进程{self.partition} 等待90秒')

View File

@ -17,10 +17,6 @@ class Config:
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads, 'value_deserializer': json.loads,
'auto_offset_reset': 'earliest', 'auto_offset_reset': 'earliest',
'max_poll_interval_ms': 600000,
'session_timeout_ms': 600000,
'connections_max_idle_ms': 655000,
'request_timeout_ms': 605000,
'enable_auto_commit': False, 'enable_auto_commit': False,
# 每个游戏不一样 # 每个游戏不一样

View File

@ -18,14 +18,14 @@ from v2.log import logger
rdb = redis.Redis(**settings.REDIS_CONF) rdb = redis.Redis(**settings.REDIS_CONF)
event_attr = EventAttr(rdb) event_attr = EventAttr(rdb)
partition = -1 partition = 0
def run(): def run():
db_client = CK(**settings.CK_CONFIG) db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client) sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr,partition)
transmitter.add_source(handler_event, 1000, 10) transmitter.add_source(handler_event, 1000, 10)
transmitter.add_source(handler_user, 1000, 10) transmitter.add_source(handler_user, 1000, 10)
last_ts = int(time.time()) last_ts = int(time.time())
@ -60,7 +60,7 @@ def run():
print(f'进程{msg.partition} 等待90秒') print(f'进程{msg.partition} 等待90秒')
time.sleep(1) time.sleep(1)
print(f'进程{msg.partition} 写入数据') print(f'进程{msg.partition} 写入数据')
transmitter.run() transmitter.run(kafka_client)
print(f'进程{msg.partition} 结束') print(f'进程{msg.partition} 结束')
break break
@ -69,7 +69,7 @@ def run():
else: else:
continue continue
transmitter.run() transmitter.run(kafka_client)
while True: while True:
time.sleep(5) time.sleep(5)