diff --git a/app.py b/app.py index 6424826..56aa02d 100644 --- a/app.py +++ b/app.py @@ -47,6 +47,15 @@ class XProcess(Process): # continue obj = getattr(handler_event, type_) obj(msg) + elif type_ == settings.STOP_SIGNAL: + # 停止消费kafka + print(f'进程{self.partition} 等待90秒') + time.sleep(90) + print(f'进程{self.partition} 写入数据') + transmitter.run() + print(f'进程{self.partition} 结束') + + break else: continue diff --git a/settings.py b/settings.py index df0c107..40c0a60 100644 --- a/settings.py +++ b/settings.py @@ -29,6 +29,8 @@ class Config: 'decode_responses': True } + STOP_SIGNAL = 'stop_MntxuXMc' + class Debug(Config): pass