From 8ae3568c1573279c5ded0337fd2e37349fd59596 Mon Sep 17 00:00:00 2001 From: wuaho Date: Sat, 18 Sep 2021 10:42:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 18 ++++++++++-------- single_process.py | 44 ++++++++++++++++++++++++++++++++------------ v2/consumer.py | 7 ++++--- 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/app.py b/app.py index b554f58..93b0578 100644 --- a/app.py +++ b/app.py @@ -2,6 +2,8 @@ import time from multiprocessing import Process +from kafka import TopicPartition + from settings import settings from v2 import * from v2.struct_cache import StructCacheFile, StructCacheRedis @@ -33,16 +35,16 @@ class XProcess(Process): last_ts = int(time.time()) consumer, kafka_client = create_consumer(self.partition) - for topic, msg in consumer(): - # print(msg) - type_ = msg['#type'] - del msg['#type'] + for msg in consumer(): + data = msg.value + type_ = data['#type'] + del data['#type'] ts = int(time.time()) if 'user' in type_: # continue obj = getattr(handler_user, type_) - handler_user.receive_data.append(UserAct(obj, msg)) + handler_user.receive_data.append(UserAct(obj, data)) if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: last_ts = ts handler_user.execute() @@ -50,12 +52,12 @@ class XProcess(Process): elif 'track' in type_: # continue obj = getattr(handler_event, type_) - obj(msg) + obj(data) elif type_ == settings.STOP_SIGNAL: # continue # 1 小时内有效 self.log.info(type_) - if msg.get('#time', 0) + 3600 < int(time.time()): + if data.get('#time', 0) + 3600 < int(time.time()): continue kafka_client.close() # 停止消费kafka @@ -67,7 +69,7 @@ class XProcess(Process): break elif type_ == 'test': - self.log.info(f'消费分区{self.partition} -> {msg}') + self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}') else: continue diff --git a/single_process.py b/single_process.py index bd081b4..eb87f1a 100644 --- a/single_process.py +++ b/single_process.py @@ -1,6 +1,8 @@ +# coding:utf-8 import time import redis +from kafka import TopicPartition from settings import settings from v2 import * @@ -16,6 +18,7 @@ from v2.log import logger rdb = redis.Redis(**settings.REDIS_CONF) event_attr = EventAttr(rdb) +partition = -1 def run(): db_client = CK(**settings.CK_CONFIG) @@ -23,25 +26,21 @@ def run(): handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_user = HandlerUser(db_client, settings.GAME) transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) - transmitter.add_source(handler_event, 10000, 10) + transmitter.add_source(handler_event, 1000, 10) transmitter.add_source(handler_user, 1000, 10) last_ts = int(time.time()) - consumer, kafka_client = create_consumer(-1) + consumer, kafka_client = create_consumer(partition) - for topic, msg in consumer(): - # print(msg) - type_ = msg['#type'] - # if msg['#event_name'] != 'pay': - # continue - # print(msg) - - del msg['#type'] + for msg in consumer(): + data = msg.value + type_ = data['#type'] + del data['#type'] ts = int(time.time()) if 'user' in type_: # continue obj = getattr(handler_user, type_) - handler_user.receive_data.append(UserAct(obj, msg)) + handler_user.receive_data.append(UserAct(obj, data)) if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: last_ts = ts handler_user.execute() @@ -49,12 +48,33 @@ def run(): elif 'track' in type_: # continue obj = getattr(handler_event, type_) - obj(msg) + obj(data) + elif type_ == settings.STOP_SIGNAL: + # continue + # 1 小时内有效 + print(type_) + if data.get('#time', 0) + 3600 < int(time.time()): + continue + kafka_client.close() + # 停止消费kafka + print(f'进程{msg.partition} 等待90秒') + time.sleep(1) + print(f'进程{msg.partition} 写入数据') + transmitter.run() + print(f'进程{msg.partition} 结束') + + break + elif type_ == 'test': + print(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}') else: continue transmitter.run() + while True: + time.sleep(5) + print(f'消费分区{partition} 已结束。。。') + if __name__ == '__main__': run() diff --git a/v2/consumer.py b/v2/consumer.py index 8e3cfcd..1c6c862 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -16,8 +16,9 @@ def create_consumer(partition=-1): c.subscribe([settings.SUBSCRIBE_TOPIC]) for msg in c: # print(msg) - topic = msg.topic - val = msg.value - yield topic, val + yield msg + # topic = msg.topic + # val = msg.value + # yield topic, val return consumer, c