diff --git a/app.py b/app.py index 6424826..af71434 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,4 @@ +# coding:utf-8 import time from multiprocessing import Process @@ -27,7 +28,7 @@ class XProcess(Process): transmitter.add_source(handler_event, 5000, 60) transmitter.add_source(handler_user, 500, 60) last_ts = int(time.time()) - consumer = create_consumer(self.partition) + consumer, kafka_client = create_consumer(self.partition) for topic, msg in consumer(): # print(msg) @@ -47,6 +48,19 @@ class XProcess(Process): # continue obj = getattr(handler_event, type_) obj(msg) + elif type_ == settings.STOP_SIGNAL: + # 1 小时内有效 + if msg.get('#time', 0) + 3600 < int(time.time()): + continue + kafka_client.close() + # 停止消费kafka + print(f'进程{self.partition} 等待90秒') + time.sleep(90) + print(f'进程{self.partition} 写入数据') + transmitter.run() + print(f'进程{self.partition} 结束') + + break else: continue diff --git a/main.py b/main.py index c605202..52687ee 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +#coding:utf-8 import os import redis diff --git a/settings.py b/settings.py index 75c27aa..62341e5 100644 --- a/settings.py +++ b/settings.py @@ -1,5 +1,6 @@ import json + class Config: # ck数据库连接 CK_CONFIG = {'host': '139.159.159.3', @@ -13,13 +14,21 @@ class Config: 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_deserializer': json.loads, 'auto_offset_reset': 'earliest', + 'enable_auto_commit': True, + 'auto_commit_interval_ms': 10000, # 每个游戏不一样 'group_id': 'shanhai_group' } + KAFKA_PRODUCER_CONF = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + } # 游戏数据库名 GAME = 'shanhai' + STOP_SIGNAL = 'stop_MntxuXMc' + REDIS_CONF = { 'host': '192.168.0.161', 'port': 6379, diff --git a/stop.py b/stop.py new file mode 100644 index 0000000..f2160fb --- /dev/null +++ b/stop.py @@ -0,0 +1,21 @@ +# coding:utf-8 +import time +import traceback + +from kafka import KafkaProducer + +from settings import settings + +producer = KafkaProducer(**settings.KAFKA_PRODUCER_CONF) + +msg = { + '#type': settings.STOP_SIGNAL, + '#time': int(time.time()) +} +for i in range(0,16): + print(f'发送分区{i}') + future1 = producer.send(settings.GAME, msg, partition=i) + try: + future1.get(timeout=10) + except Exception as e: # 发送失败抛出kafka_errors + traceback.format_exc() diff --git a/v2/consumer.py b/v2/consumer.py index 897ac58..8e3cfcd 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -7,8 +7,9 @@ __all__ = 'create_consumer', def create_consumer(partition=-1): + c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) + def consumer(): - c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) if partition > 0: c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) else: @@ -19,4 +20,4 @@ def create_consumer(partition=-1): val = msg.value yield topic, val - return consumer + return consumer, c