停止入库信号
This commit is contained in:
parent
132f6cdb5c
commit
009d5f0560
15
app.py
15
app.py
@ -1,3 +1,4 @@
|
||||
# coding:utf-8
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
@ -27,7 +28,7 @@ class XProcess(Process):
|
||||
transmitter.add_source(handler_event, 5000, 60)
|
||||
transmitter.add_source(handler_user, 500, 60)
|
||||
last_ts = int(time.time())
|
||||
consumer = create_consumer(self.partition)
|
||||
consumer, kafka_client = create_consumer(self.partition)
|
||||
|
||||
for topic, msg in consumer():
|
||||
# print(msg)
|
||||
@ -48,12 +49,16 @@ class XProcess(Process):
|
||||
obj = getattr(handler_event, type_)
|
||||
obj(msg)
|
||||
elif type_ == settings.STOP_SIGNAL:
|
||||
# 停止消费kafka
|
||||
print(f'进程{self.partition} 等待90秒')
|
||||
# 1 小时内有效
|
||||
if msg.get('#time', 0) + 3600 < int(time.time()):
|
||||
continue
|
||||
kafka_client.close()
|
||||
# 停止消费kafka
|
||||
print(f'进程{self.partition} 等待90秒')
|
||||
time.sleep(90)
|
||||
print(f'进程{self.partition} 写入数据')
|
||||
print(f'进程{self.partition} 写入数据')
|
||||
transmitter.run()
|
||||
print(f'进程{self.partition} 结束')
|
||||
print(f'进程{self.partition} 结束')
|
||||
|
||||
break
|
||||
else:
|
||||
|
10
settings.py
10
settings.py
@ -14,13 +14,21 @@ class Config:
|
||||
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||
'value_deserializer': json.loads,
|
||||
'auto_offset_reset': 'earliest',
|
||||
'enable_auto_commit': True,
|
||||
'auto_commit_interval_ms': 10000,
|
||||
|
||||
# 每个游戏不一样
|
||||
'group_id': 'zhengba_consumer_group'
|
||||
}
|
||||
KAFKA_PRODUCER_CONF = {
|
||||
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
|
||||
}
|
||||
# 游戏数据库名
|
||||
GAME = 'zhengba'
|
||||
|
||||
STOP_SIGNAL = 'stop_MntxuXMc'
|
||||
|
||||
REDIS_CONF = {
|
||||
'host': '192.168.0.161',
|
||||
'port': 6379,
|
||||
@ -29,8 +37,6 @@ class Config:
|
||||
'decode_responses': True
|
||||
}
|
||||
|
||||
STOP_SIGNAL = 'stop_MntxuXMc'
|
||||
|
||||
|
||||
class Debug(Config):
|
||||
pass
|
||||
|
@ -7,8 +7,9 @@ __all__ = 'create_consumer',
|
||||
|
||||
|
||||
def create_consumer(partition=-1):
|
||||
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
|
||||
|
||||
def consumer():
|
||||
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
|
||||
if partition > 0:
|
||||
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
|
||||
else:
|
||||
@ -17,8 +18,6 @@ def create_consumer(partition=-1):
|
||||
# print(msg)
|
||||
topic = msg.topic
|
||||
val = msg.value
|
||||
if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'):
|
||||
continue
|
||||
yield topic, val
|
||||
|
||||
return consumer
|
||||
return consumer, c
|
||||
|
Loading…
Reference in New Issue
Block a user