diff --git a/app.py b/app.py index 56aa02d..af71434 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,4 @@ +# coding:utf-8 import time from multiprocessing import Process @@ -27,7 +28,7 @@ class XProcess(Process): transmitter.add_source(handler_event, 5000, 60) transmitter.add_source(handler_user, 500, 60) last_ts = int(time.time()) - consumer = create_consumer(self.partition) + consumer, kafka_client = create_consumer(self.partition) for topic, msg in consumer(): # print(msg) @@ -48,12 +49,16 @@ class XProcess(Process): obj = getattr(handler_event, type_) obj(msg) elif type_ == settings.STOP_SIGNAL: - # 停止消费kafka - print(f'进程{self.partition} 等待90秒') + # 1 灏忔椂鍐呮湁鏁 + if msg.get('#time', 0) + 3600 < int(time.time()): + continue + kafka_client.close() + # 鍋滄娑堣垂kafka + print(f'杩涚▼{self.partition} 绛夊緟90绉') time.sleep(90) - print(f'进程{self.partition} 写入数据') + print(f'杩涚▼{self.partition} 鍐欏叆鏁版嵁') transmitter.run() - print(f'进程{self.partition} 结束') + print(f'杩涚▼{self.partition} 缁撴潫') break else: diff --git a/main.py b/main.py index c605202..52687ee 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +#coding:utf-8 import os import redis diff --git a/settings.py b/settings.py index 40c0a60..edec1fa 100644 --- a/settings.py +++ b/settings.py @@ -14,13 +14,21 @@ class Config: 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_deserializer': json.loads, 'auto_offset_reset': 'earliest', + 'enable_auto_commit': True, + 'auto_commit_interval_ms': 10000, # 姣忎釜娓告垙涓嶄竴鏍 'group_id': 'zhengba_consumer_group' } + KAFKA_PRODUCER_CONF = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + } # 娓告垙鏁版嵁搴撳悕 GAME = 'zhengba' + STOP_SIGNAL = 'stop_MntxuXMc' + REDIS_CONF = { 'host': '192.168.0.161', 'port': 6379, @@ -29,8 +37,6 @@ class Config: 'decode_responses': True } - STOP_SIGNAL = 'stop_MntxuXMc' - class Debug(Config): pass diff --git a/v2/consumer.py b/v2/consumer.py index 747cef4..8e3cfcd 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -7,8 +7,9 @@ __all__ = 'create_consumer', def create_consumer(partition=-1): + c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) + def consumer(): - c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) if partition > 0: c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) else: @@ -17,8 +18,6 @@ def create_consumer(partition=-1): # print(msg) topic = msg.topic val = msg.value - if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'): - continue yield topic, val - return consumer + return consumer, c