diff --git a/main.py b/main.py index cc19cdf..4567d29 100644 --- a/main.py +++ b/main.py @@ -1,32 +1,45 @@ +import time + from settings import settings from v2 import * db_client = CK(**settings.CK_CONFIG) sketch = Sketch(db_client) handler_event = HandlerEvent(db_client) -handler_user = HandlerUser(db_client) +handler_user = HandlerUser(db_client, settings.GAME) transmitter = Transmitter(db_client, sketch) def run(): transmitter.add_source(handler_event, 1000, 60) - transmitter.add_source(handler_user, 100, 60) + transmitter.add_source(handler_user, 500, 60) + i = 0 + ts = time.time() * 1000 for topic, msg in consumer(): - # print(msg) + i += 1 + if i > 10000: + print(time.time() * 1000-ts) + ts = time.time() * 1000 + + i = 0 type_ = msg['#type'] + del msg['#type'] db = settings.APPID_TO_CKDB.get(msg['#app_id']) if 'user' in type_: # continue obj = getattr(handler_user, type_) + handler_user.receive_data.append(User(obj, db, msg)) + if len(handler_user.receive_data) >= 1000: + handler_user.execute() + + elif 'track' in type_: # continue obj = getattr(handler_event, type_) + obj(db, msg) else: continue - del msg['#type'] - obj(db, msg) - transmitter.run() diff --git a/settings.py b/settings.py index 147969b..b4b139b 100644 --- a/settings.py +++ b/settings.py @@ -5,12 +5,13 @@ class Config: CK_CONFIG = {'host': '119.29.176.224', 'send_receive_timeout': 3} - SUBSCRIBE_TOPIC = ['test','test2'] + SUBSCRIBE_TOPIC = ['test', 'test2'] KAFKA_CONSUMER_CONF = { 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'value_deserializer': json.loads, - 'group_id': 'legu_group' + # 'group_id': 'legu_group' + 'group_id': 'ta2legu' } TOPIC_TO_LEGU = { @@ -18,6 +19,8 @@ class Config: 'c3e0409ac18341149877b08f087db640': 'legu_test' } + GAME = 'shjy' + APPID_TO_CKDB = { 'a77703e24e6643d08b74a4163a14f74c': 'shjy', 'c3e0409ac18341149877b08f087db640': 'shjy' diff --git a/v2/db.py b/v2/db.py index b339aa2..fe6d986 100644 --- a/v2/db.py +++ b/v2/db.py @@ 
-1,9 +1,13 @@ __all__ = 'CK', +import pandas as pd +import numpy as np from datetime import datetime from datetime import timedelta from clickhouse_driver import Client +from pandas import DatetimeTZDtype +from pandas import Timedelta class CK(Client): @@ -32,3 +36,21 @@ class CK(Client): else: res[k] = v return res + + def get_all(self, db, tb, where: str) -> dict: + """ + 注意 还原时区 + :param db: + :param tb: + :param where: + :return: + """ + sql = f"select * from {db}.{tb} where " + sql += where + data, columns = self.execute(sql, columnar=True, with_column_types=True) + df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)}) + tz = df['#zone_offset'].apply(lambda x: timedelta(hours=x)) + for t_type in df.select_dtypes(include=[DatetimeTZDtype]): + df[t_type] = (df[t_type] + tz).apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S')) + + return df.T.to_dict() diff --git a/v2/handler_event.py b/v2/handler_event.py index fe06c54..878369d 100644 --- a/v2/handler_event.py +++ b/v2/handler_event.py @@ -6,9 +6,10 @@ __all__ = 'HandlerEvent', class HandlerEvent: tb = 'event' - def __init__(self, db_client): + def __init__(self, db_client, db_name=None): self.event = dict() self.db_client = db_client + self.db_name = db_name def merge_update(self, a: dict, b: dict): """ diff --git a/v2/handler_user.py b/v2/handler_user.py index 9f39706..53ff0ac 100644 --- a/v2/handler_user.py +++ b/v2/handler_user.py @@ -1,13 +1,35 @@ -__all__ = 'HandlerUser', +from collections import namedtuple + +__all__ = 'HandlerUser', 'User' + +User = namedtuple('User', ['obj', 'db', 'msg']) class HandlerUser: tb = 'user' user_key = '#account_id' - def __init__(self, db_client): + def __init__(self, db_client, db_name): self.users = dict() self.db_client = db_client + self.receive_data = [] + self.db_name = db_name + + def execute(self): + account_ids = set(item.msg.get('#account_id') for item in self.receive_data) - set( + self.users.setdefault(self.db_name, {})) + if not account_ids: + return + 
self.get_users(account_ids) + for item in self.receive_data: + item.obj(item.db, item.msg) + self.receive_data.clear() + + def get_users(self, account_ids: set): + where = f'`#account_id` in {tuple(account_ids)}' + res = self.db_client.get_all(self.db_name, 'user_view', where) + for item in res.values(): + self.users.setdefault(self.db_name, {}).setdefault(item['#account_id'], item) def get_user(self, db, account_id, data=None): user = self.users.get(db, {}).get(account_id) diff --git a/v2/sketch.py b/v2/sketch.py index a1f6be4..dc010c2 100644 --- a/v2/sketch.py +++ b/v2/sketch.py @@ -106,8 +106,7 @@ class Sketch: except Exception as e: print(f'添加字段 {k} 失败') default_field.pop(k) - else: - self.update_user_view(db, tb) if set(default_field) - keys: self.up_tb_struct(db, tb, default_field) + self.update_user_view(db, tb) diff --git a/v2/transmitter.py b/v2/transmitter.py index f306aee..5b24da4 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -1,5 +1,6 @@ import json import re +import time from .valid_data import * @@ -51,16 +52,25 @@ class Transmitter: error_data = data.pop(error_row) self.__send(db, tb, data) else: - print(f'{db}.{tb}插入{len(data)}条') + pass + # print(f'{db}.{tb}插入{len(data)}条') def check_table(self, db, tb, data): [self.sketch.alter_table(db, tb, item) for item in data] def check_type(self, db, tb, data): + # import cProfile, pstats + # from io import StringIO + # + # pr = cProfile.Profile() + # pr.enable() struct_dict = self.sketch.struct_dict[f'{db}_{tb}'] for item in data: del_keys = set() for k, v in item.items(): + if v is None: + del_keys.add(k) + continue type_ = struct_dict[k] item[k] = TYPE_CK2PY[type_](v, **item) if v is None: @@ -69,10 +79,31 @@ class Transmitter: for key in del_keys: del item[key] + # pr.disable() + # s = StringIO() + # ps = pstats.Stats(pr, stream=s).sort_stats('cumulative') + # ps.print_stats() + # print(s.getvalue()) + def run(self): + + for db, tb, buffer in self.check_send(): + # print('*' * 50) + # 
print(1, int(time.time() * 1000)) data = [self.flat_data(x) for x in buffer.values()] + # print(2, int(time.time() * 1000)) + self.check_table(db, tb, data) + # print(3, int(time.time() * 1000)) + self.check_type(db, tb, data) + # print(4, int(time.time() * 1000)) + self.__send(db, tb, [json.dumps(item) for item in data]) + # print(5, int(time.time() * 1000)) + buffer.clear() + # print(6, int(time.time() * 1000)) + + diff --git a/v2/valid_data.py b/v2/valid_data.py index 0855900..2df001f 100644 --- a/v2/valid_data.py +++ b/v2/valid_data.py @@ -1,4 +1,3 @@ -import time from datetime import datetime from datetime import timedelta from ipaddress import IPv4Address @@ -6,7 +5,13 @@ from ipaddress import IPv4Address def is_valid_date(v, **kwargs): try: - date = datetime.strptime(v, "%Y-%m-%d %H:%M:%S") + date = datetime(int(v[:4]), + int(v[5:7]), + int(v[8:10]), + int(v[11:13]), + int(v[14:16]), + int(v[17:]) + ) zone_offset = kwargs.get('#zone_offset', 8) return (date - timedelta(hours=zone_offset)).strftime("%Y-%m-%d %H:%M:%S") except: return None @@ -47,6 +52,7 @@ def is_valid_array(v, **kwargs): def is_valid_ipv4(v, **kwargs): try: + return v  # FIXME(review): makes the IPv4Address validation below unreachable — confirm this perf shortcut is intentional return str(IPv4Address(v)) except: return None