停止入库信号

This commit is contained in:
wuaho 2021-08-02 16:48:11 +08:00
parent b11dae3ed5
commit ff79538302
5 changed files with 49 additions and 3 deletions

16
app.py
View File

@ -1,3 +1,4 @@
# coding:utf-8
import time
from multiprocessing import Process
@ -27,7 +28,7 @@ class XProcess(Process):
transmitter.add_source(handler_event, 5000, 60)
transmitter.add_source(handler_user, 500, 60)
last_ts = int(time.time())
consumer = create_consumer(self.partition)
consumer, kafka_client = create_consumer(self.partition)
for topic, msg in consumer():
# print(msg)
@ -47,6 +48,19 @@ class XProcess(Process):
# continue
obj = getattr(handler_event, type_)
obj(msg)
elif type_ == settings.STOP_SIGNAL:
# 1 小时内有效
if msg.get('#time', 0) + 3600 < int(time.time()):
continue
kafka_client.close()
# 停止消费kafka
print(f'进程{self.partition} 等待90秒')
time.sleep(90)
print(f'进程{self.partition} 写入数据')
transmitter.run()
print(f'进程{self.partition} 结束')
break
else:
continue

View File

@ -1,3 +1,4 @@
#coding:utf-8
import os
import redis

View File

@ -1,5 +1,6 @@
import json
class Config:
# ck数据库连接
CK_CONFIG = {'host': '139.159.159.3',
@ -13,13 +14,21 @@ class Config:
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads,
'auto_offset_reset': 'earliest',
'enable_auto_commit': True,
'auto_commit_interval_ms': 10000,
# 每个游戏不一样
'group_id': 'shanhai_group'
}
KAFKA_PRODUCER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
}
# 游戏数据库名
GAME = 'shanhai'
STOP_SIGNAL = 'stop_MntxuXMc'
REDIS_CONF = {
'host': '192.168.0.161',
'port': 6379,

21
stop.py Normal file
View File

@ -0,0 +1,21 @@
# coding:utf-8
import time
import traceback
from kafka import KafkaProducer
from settings import settings
producer = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
msg = {
'#type': settings.STOP_SIGNAL,
'#time': int(time.time())
}
for i in range(0,16):
print(f'发送分区{i}')
future1 = producer.send(settings.GAME, msg, partition=i)
try:
future1.get(timeout=10)
except Exception as e: # 发送失败抛出kafka_errors
traceback.format_exc()

View File

@ -7,8 +7,9 @@ __all__ = 'create_consumer',
def create_consumer(partition=-1):
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
def consumer():
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
if partition > 0:
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
else:
@ -19,4 +20,4 @@ def create_consumer(partition=-1):
val = msg.value
yield topic, val
return consumer
return consumer, c