停止消费

This commit is contained in:
wuaho 2021-09-18 10:09:18 +08:00
parent 75bca68aeb
commit 4a670de26b

9
app.py
View File

@ -54,15 +54,16 @@ class XProcess(Process):
elif type_ == settings.STOP_SIGNAL: elif type_ == settings.STOP_SIGNAL:
# continue # continue
# 1 小时内有效 # 1 小时内有效
self.log.info(type_)
if msg.get('#time', 0) + 3600 < int(time.time()): if msg.get('#time', 0) + 3600 < int(time.time()):
continue continue
kafka_client.close() kafka_client.close()
# 停止消费kafka # 停止消费kafka
print(f'进程{self.partition} 等待90秒') self.log.info(f'进程{self.partition} 等待90秒')
time.sleep(90) time.sleep(90)
print(f'进程{self.partition} 写入数据') self.log.info(f'进程{self.partition} 写入数据')
transmitter.run() transmitter.run()
print(f'进程{self.partition} 结束') self.log.info(f'进程{self.partition} 结束')
break break
else: else:
@ -72,4 +73,4 @@ class XProcess(Process):
while True: while True:
time.sleep(5) time.sleep(5)
print(f'消费分区{self.partition} 已结束。。。') self.log.info(f'消费分区{self.partition} 已结束。。。')