diff --git a/app.py b/app.py index c725ea6..f83ba0e 100644 --- a/app.py +++ b/app.py @@ -69,14 +69,13 @@ class XProcess(Process): self.log.info(type_) if data.get('#time', 0) + 3600 < int(time.time()): continue - kafka_client.commit() - kafka_client.close() # 停止消费kafka self.log.info(f'进程{self.partition} 等待90秒') time.sleep(90) self.log.info(f'进程{self.partition} 写入数据') transmitter.run(kafka_client) self.log.info(f'进程{self.partition} 结束') + kafka_client.close() break elif type_ == 'test':