From 9aa46f12a8de2acdd1a4f92b67280e5478a74f00 Mon Sep 17 00:00:00 2001 From: wuaho Date: Sat, 8 May 2021 18:24:31 +0800 Subject: [PATCH] 1 --- app.py | 48 ++++++++++++++++++++++++++++++++++++++ main.py | 49 ++++----------------------------------- settings.py | 7 +++--- v2/consumer.py | 25 +++++++++++++------- v2/db.py | 57 +++++++++++++++++++++++++++++++++++----------- v2/handler_user.py | 12 ++++++---- v2/transmitter.py | 39 +++++++++++++++++++++---------- v2/valid_data.py | 1 - 8 files changed, 153 insertions(+), 85 deletions(-) create mode 100644 app.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..c3f39da --- /dev/null +++ b/app.py @@ -0,0 +1,48 @@ +import time +from multiprocessing import Process + +from settings import settings +from v2 import * + + +class XProcess(Process): + + def __init__(self, partition, lock): + super(XProcess, self).__init__() + self.partition = partition + self.lock = lock + + + def run(self): + db_client = CK(**settings.CK_CONFIG) + sketch = Sketch(db_client) + handler_event = HandlerEvent(db_client, settings.GAME) + handler_user = HandlerUser(db_client, settings.GAME) + transmitter = Transmitter(db_client, settings.GAME, sketch, self.lock) + transmitter.add_source(handler_event, 10000, 60) + transmitter.add_source(handler_user, 1000, 60) + last_ts = int(time.time()) + consumer = create_consumer(self.partition) + + for topic, msg in consumer(): + # print(msg) + type_ = msg['#type'] + del msg['#type'] + ts = int(time.time()) + + if 'user' in type_: + # continue + obj = getattr(handler_user, type_) + handler_user.receive_data.append(UserAct(obj, msg)) + if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: + last_ts = ts + handler_user.execute() + + elif 'track' in type_: + # continue + obj = getattr(handler_event, type_) + obj(msg) + else: + continue + + transmitter.run() diff --git a/main.py b/main.py index f041f67..a6ffcbf 100644 --- a/main.py +++ b/main.py @@ -1,46 +1,7 @@ -import time - -from settings import settings -from v2 import * - -db_client = CK(**settings.CK_CONFIG) -sketch = Sketch(db_client) -handler_event = HandlerEvent(db_client, settings.GAME) -handler_user = HandlerUser(db_client, settings.GAME) -transmitter = Transmitter(db_client, settings.GAME, sketch) - - -def run(): - transmitter.add_source(handler_event, 1000, 60) - transmitter.add_source(handler_user, 500, 60) - i = 0 - ts = time.time() * 1000 - for topic, msg in consumer(): - i += 1 - if i > 10000: - print(time.time() * 1000 - ts) - ts = time.time() * 1000 - - i = 0 - type_ = msg['#type'] - del msg['#type'] - if 'user' in type_: - # continue - obj = getattr(handler_user, type_) - handler_user.receive_data.append(UserAct(obj, msg)) - if len(handler_user.receive_data) >= 1000: - handler_user.execute() - - - elif 'track' in type_: - # continue - obj = getattr(handler_event, type_) - obj(msg) - else: - continue - - transmitter.run() - +from app import XProcess +from multiprocessing import Lock if __name__ == '__main__': - run() + lock = Lock() + for i in range(0, 16): + XProcess(i, lock).start() diff --git a/settings.py b/settings.py index 0c8cc14..27a326e 100644 --- a/settings.py +++ b/settings.py @@ -3,14 +3,15 @@ import json class Config: CK_CONFIG = {'host': '119.29.176.224', - 'send_receive_timeout': 3} + 'send_receive_timeout': 30} - SUBSCRIBE_TOPIC = ['test', 'test2'] + SUBSCRIBE_TOPIC = 'test2' KAFKA_CONSUMER_CONF = { 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_deserializer': json.loads, - 'group_id': 'legu_group' + 'auto_offset_reset': 'earliest', + 'group_id': 'legu_group3' } GAME = 'shjy' diff --git a/v2/consumer.py b/v2/consumer.py index 77f753f..897ac58 100644 --- a/v2/consumer.py +++ b/v2/consumer.py @@ -1,13 +1,22 @@ from kafka import KafkaConsumer +from kafka import TopicPartition + from settings import settings -__all__ = 'consumer', +__all__ = 'create_consumer', -def consumer(): - c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) - c.subscribe(settings.SUBSCRIBE_TOPIC) - for msg in c: - topic = msg.topic - val = msg.value - yield topic, val +def create_consumer(partition=-1): + def consumer(): + c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) + if partition > 0: + c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) + else: + c.subscribe([settings.SUBSCRIBE_TOPIC]) + for msg in c: + # print(msg) + topic = msg.topic + val = msg.value + yield topic, val + + return consumer diff --git a/v2/db.py b/v2/db.py index a21cf2a..49fdfd1 100644 --- a/v2/db.py +++ b/v2/db.py @@ -1,5 +1,7 @@ __all__ = 'CK', +import traceback + import pandas as pd from datetime import datetime from datetime import timedelta @@ -8,23 +10,35 @@ from clickhouse_driver import Client from pandas import DatetimeTZDtype -class CK(Client): +class CK: def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + self.args = args + self.kwargs = kwargs + self.__client = self.__create_client() + + def __create_client(self): + return Client(*self.args, **self.kwargs) + + def execute(self, *args, **kwargs): + return self.__client.execute(*args, **kwargs) + + def get_one(self, db, tb, try_cnt=3, **where): - def get_one(self, db, tb, **where) -> dict: - """ - 注意 还原时区 - :param db: - :param tb: - :param where: - :return: - """ sql = f"select * from {db}.{tb} where 1" for k, v in where.items(): sql += f" and `{k}`='{v}'" sql += ' limit 1' - data, columns = self.execute(sql, with_column_types=True) + data = None + try: + data, columns = self.__client.execute(sql, with_column_types=True) + except Exception as e: + traceback.print_exc() + self.__client.disconnect() + self.__client = self.__create_client() + if try_cnt > 0: + self.get_one(db, tb, try_cnt - 1, **where) + else: + return None res = dict() if data: data = {k[0]: v for k, v in zip(columns, data[0])} @@ -34,8 +48,9 @@ class CK(Client): else: res[k] = v return res + return None - def get_all(self, db, tb, where: str) -> dict: + def get_all(self, db, tb, where: str, try_cnt=3): """ 注意 还原时区 :param db: @@ -45,7 +60,23 @@ class CK(Client): """ sql = f"select * from {db}.{tb} where " sql += where - data, columns = self.execute(sql, columnar=True, with_column_types=True) + data = None + try: + data, columns = self.__client.execute(sql, columnar=True, with_column_types=True) + except Exception as e: + traceback.print_exc() + self.__client.disconnect() + self.__client = self.__create_client() + if try_cnt > 0: + self.get_all(db, tb, where, try_cnt - 1) + + # 异常导致导致 避免认为用户不存在 + if data is None: + return None + + if not data: + return dict() + df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)}) tz = df['#zone_offset'].apply(lambda x: timedelta(hours=x)) for t_type in df.select_dtypes(include=[DatetimeTZDtype]): diff --git a/v2/handler_user.py b/v2/handler_user.py index 1829699..680d701 100644 --- a/v2/handler_user.py +++ b/v2/handler_user.py @@ -27,6 +27,9 @@ class HandlerUser: def get_users(self, account_ids: set): where = f'`#account_id` in {tuple(account_ids)}' res = self.db_client.get_all(self.db_name, 'user_view', where) + if res is None: + print('ck查询出错了') + return for item in res.values(): self.users[item['#account_id']] = item @@ -35,13 +38,14 @@ class HandlerUser: if user: return user - user = self.db_client.get_one(self.db_name, f'{self.tb}_view', **{'#account_id': account_id}) - if user: - self.users[account_id] = user - return user + # user = self.db_client.get_one(self.db_name, f'{self.tb}_view', **{'#account_id': account_id}) + # if user: + # self.users[account_id] = user + # return user if not isinstance(data, dict): return user = dict() + data['#reg_time'] = data['#time'] self.merge(user, data) self.users[account_id] = user return user diff --git a/v2/transmitter.py b/v2/transmitter.py index 6dfe78d..6ba0072 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -1,6 +1,8 @@ import json +import os import re import time +import traceback from .valid_data import * @@ -8,23 +10,23 @@ __all__ = 'Transmitter', class Transmitter: - def __init__(self, db_client, db_name, sketch): + def __init__(self, db_client, db_name, sketch, lock): self.db_client = db_client self.db_name = db_name self.sketch = sketch - self.ts = int(time.time()) self.slots = dict() + self.lock = lock def add_source(self, handler, bulk_max=1000, time_out=60): - self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out} + self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} def check_send(self): for h, p in self.slots.items(): ts = int(time.time()) tb, buffer = h.buffer_pool buffer_size = len(buffer) - if (self.ts + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0: - self.ts = ts + if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0: + p['ts'] = ts yield tb, buffer @staticmethod @@ -39,11 +41,15 @@ class Transmitter: sql = sql + '\n'.join(data) try: # 允许20%错误率 + # self.lock.acquire() + ts = int(time.time() * 1000) + # print(f'{ts} {os.getpid()} 获得锁') self.db_client.execute('set input_format_allow_errors_ratio=0.2') self.db_client.execute(sql) + print(f'写入耗时 {int(time.time() * 1000) - ts}') except Exception as e: # 丢弃错误行 再次发送 - if e.code == 26: + if hasattr(e, 'code') and e.code == 26: m = re.match('(.*)?Stack trace', e.message) if m: error_msg = m.group(1) @@ -52,9 +58,15 @@ class Transmitter: error_row = int(error_row.group(1)) - 1 error_data = data.pop(error_row) self.__send(db, tb, data) - else: + else: + # pass + traceback.print_exc() + finally: pass - # print(f'{db}.{tb}插入{len(data)}条') + # print(f' {os.getpid()} 释放锁') + # self.lock.release() + + print(f'{db}.{tb}插入{len(data)}条') def check_table(self, db, tb, data): [self.sketch.alter_table(db, tb, item) for item in data] @@ -77,8 +89,11 @@ class Transmitter: def run(self): for tb, buffer in self.check_send(): - data = [self.flat_data(x) for x in buffer.values()] - self.check_table(self.db_name, tb, data) - self.check_type(self.db_name, tb, data) - self.__send(self.db_name, tb, [json.dumps(item) for item in data]) + try: + data = [self.flat_data(x) for x in buffer.values()] + self.check_table(self.db_name, tb, data) + self.check_type(self.db_name, tb, data) + self.__send(self.db_name, tb, [json.dumps(item) for item in data]) + except: + pass buffer.clear() diff --git a/v2/valid_data.py b/v2/valid_data.py index 2df001f..d1cfa0b 100644 --- a/v2/valid_data.py +++ b/v2/valid_data.py @@ -52,7 +52,6 @@ def is_valid_array(v, **kwargs): def is_valid_ipv4(v, **kwargs): try: - return v return str(IPv4Address(v)) except: return None