diff --git a/.gitignore b/.gitignore
index 13d1490..dd1d1f0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -128,4 +128,4 @@ dmypy.json
 
 # Pyre type checker
 .pyre/
-
+.idea
diff --git a/Pipfile b/Pipfile
new file mode 100644
index 0000000..8993b7d
--- /dev/null
+++ b/Pipfile
@@ -0,0 +1,15 @@
+[[source]]
+url = "https://pypi.douban.com/simple"
+# NOTE(review): TLS verification is off for this index; confirm it is intended.
+verify_ssl = false
+name = "pypi"
+
+[packages]
+kafka-python = "*"
+clickhouse-driver = "*"
+arrow = "*"
+
+[dev-packages]
+
+[requires]
+python_version = "3.8"
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ck/__init__.py b/ck/__init__.py
new file mode 100644
index 0000000..4f98d5a
--- /dev/null
+++ b/ck/__init__.py
@@ -0,0 +1 @@
+from .ck import *
diff --git a/ck/ck.py b/ck/ck.py
new file mode 100644
index 0000000..72de08c
--- /dev/null
+++ b/ck/ck.py
@@ -0,0 +1,322 @@
+import time
+from datetime import datetime
+import json
+import re
+
+import arrow
+from clickhouse_driver.client import Client
+
+from settings import settings
+from .struct_cache import StructCacheFile
+from .robot import DDRobot
+
+__all__ = ('CK',)
+
+# Marks "argument not supplied" so that callers can still pass None explicitly
+# (for example robot=None to switch notifications off).
+_UNSET = object()
+
+
+class DataHandler:
+    """
+    Decorator class that registers a function in the data processing chain.
+
+    Every decorated function is appended to handler_link; CK.send calls each
+    of them as f(ck, db, tb, data). The decorated name itself is rebound to
+    the DataHandler instance, so handlers are only reached via handler_link.
+    """
+    handler_link = []
+
+    def __init__(self, func):
+        DataHandler.handler_link.append(func)
+
+
+class CK:
+    """Buffered ClickHouse writer that adds missing columns on the fly."""
+
+    def __init__(self, client=_UNSET,
+                 struct_cache=_UNSET,
+                 robot=_UNSET,
+                 bulk_max=1000):
+        """
+        :param client: ClickHouse client; built from settings.CK_CONFIG if omitted
+        :param struct_cache: object with update(db, tb, data); StructCacheFile if omitted
+        :param robot: object with send(*args, **kwargs); DDRobot if omitted,
+                      a falsy value disables notifications
+        :param bulk_max: number of buffered rows that triggers a flush
+        """
+        # Defaults are created per instance: default argument values are
+        # evaluated once, when the function is defined, and would be shared.
+        self.client = Client(**settings.CK_CONFIG) if client is _UNSET else client
+        self.struct_cache = StructCacheFile() if struct_cache is _UNSET else struct_cache
+        self.robot = DDRobot() if robot is _UNSET else robot
+        self.type_dict = dict()
+        self.struct_dict = dict()
+        self.bulk_data = dict()
+        self.bulk_max = bulk_max
+        self.__send_ts = int(time.time())
+
+    def up_tb_struct(self, db, tb, data):
+        """
+        Rebuild the type -> column names index and persist the structure
+        :param db: database name
+        :param tb: table name
+        :param data: column -> type mapping handed to the struct cache
+        :return:
+        """
+        struct_dict = self.struct_dict.get(f'{db}_{tb}', {})
+
+        # Used for type conversion
+        type_dict = self.type_dict.setdefault(f'{db}_{tb}', {})
+        for k, v in struct_dict.items():
+            type_dict.setdefault(v, set()).add(k)
+        # Update the structure record
+        self.struct_cache.update(db, tb, data)
+
+    @DataHandler
+    def date2utc(self, db, tb, data: dict):
+        """
+        Convert date columns to UTC time, in place
+        :param data: row being written; zone_offset hours (default 8) are subtracted
+        :return:
+        """
+        type_dict = self.type_dict[f'{db}_{tb}']
+        keys = (type_dict.get("Nullable(DateTime('UTC'))", set())
+                | type_dict.get("DateTime('UTC')", set()))
+        for k in keys:
+            try:
+                data[k] = arrow.get(data[k]).shift(hours=-data.get('zone_offset', 8)).format('YYYY-MM-DD HH:mm:ss')
+            except Exception:
+                # Best effort: missing or unparseable values are left as they are
+                pass
+
+    @DataHandler
+    def array2str(self, db, tb, data: dict):
+        """
+        Store every array element as a string
+        :param data: row being written, modified in place
+        :return:
+        """
+        for k in self.type_dict[f'{db}_{tb}'].get('Array(String)', set()):
+            try:
+                data[k] = [str(v) for v in data[k]]
+            except Exception:
+                # Best effort: missing or non-iterable values are left as they are
+                pass
+
+    @DataHandler
+    def up_user(self, db, tb, data: dict):
+        """
+        Fetch the latest user info from the view and merge it into the current user
+        :param db: database name
+        :param tb: table name; only 'user' is processed
+        :param data: row being written; missing columns are filled in place
+        :return:
+        """
+        if tb != 'user':
+            return
+        # account_id comes from the incoming message, so it is sent as a query
+        # parameter instead of being formatted into the SQL text.
+        sql = f"SELECT * from {db}.user_view where account_id=%(account_id)s"
+        params = {'account_id': str(data['account_id'])}
+        d, col = self.client.execute(sql, params, with_column_types=True)
+        if not d:
+            return
+        for v, c in zip(d[0], col):
+            c = c[0]
+            if c not in data:
+                data[c] = v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v
+
+    def __notify(self, *args, **kwargs):
+        """
+        Notification hook, forwarded to the robot when one is configured
+        :param args:
+        :param kwargs:
+        :return:
+        """
+        if self.robot:
+            self.robot.send(*args, **kwargs)
+
+    def get_tb_struct_cache(self, db, tb):
+        """
+        Query one record to obtain the column names and types
+        :param db: database name
+        :param tb: table name
+        :return: column -> type mapping, cached per table
+        """
+        cached = self.struct_dict.get(f'{db}_{tb}')
+        if cached:
+            return cached
+
+        sql = f'select * from {db}.{tb} limit 1'
+        _, columns = self.client.execute(sql, with_column_types=True)
+        res = {item[0]: item[1] for item in columns}
+        self.struct_dict[f'{db}_{tb}'] = res
+        self.up_tb_struct(db, tb, res)
+        return res
+
+    @staticmethod
+    def is_valid_date(date: str):
+        """
+        Parse date with arrow
+        :param date: candidate date string
+        :return: the parsed arrow object, or False when parsing fails
+        """
+        try:
+            return arrow.get(date)
+        except Exception:
+            return False
+
+    @staticmethod
+    def _guess_column_type(value):
+        """
+        Map a Python value to the ClickHouse type of a new column
+        :param value: first value seen for the column
+        :return: type name, or None when no type can be derived
+        """
+        # bool is tested before int because bool is a subclass of int
+        if isinstance(value, bool):
+            return 'Nullable(UInt8)'
+        if isinstance(value, str):
+            if CK.is_valid_date(value):
+                return "Nullable(DateTime('UTC'))"
+            return 'Nullable(String)'
+        if isinstance(value, int):
+            return 'Nullable(UInt64)'
+        if isinstance(value, float):
+            return 'Nullable(Float32)'
+        if isinstance(value, list):
+            return 'Array(String)'
+        return None
+
+    def update_user_view(self, db, tb):
+        """
+        Refresh the view
+        :param db: database name
+        :param tb: table name; only 'user' is processed
+        :return:
+        """
+        if tb != 'user':
+            return
+        # Sent as two statements: each execute() call is expected to carry
+        # exactly one query.
+        self.client.execute(f'drop table if exists {db}.user_view')
+        sql = f"""create view {db}.user_view as select *
+            from {db}.user
+            order by role_create_time desc
+            LIMIT 1 by account_id"""
+        self.client.execute(sql)
+
+    def alter_table(self, db, tb, data):
+        """
+        Check the table columns
+        A new column gets the type of its first occurrence
+
+        Changing a column type runs into conversion problems: stop the program and drop the column
+        :param db: database name
+        :param tb: table name
+        :param data: row whose keys are compared with the known columns
+        :return:
+        """
+        default_field = self.get_tb_struct_cache(db, tb)
+        keys = set(default_field)
+        for k, v in data.items():
+            if k in default_field:
+                continue
+            col_type = self._guess_column_type(v)
+            if col_type is None:
+                # None, dict, ...: no column type can be derived from this value
+                continue
+
+            sql = f'alter table {db}.{tb} add column `{k}` {col_type}'
+            print(sql)
+            try:
+                self.client.execute(sql)
+            except Exception:
+                print(f'添加字段 {k} 失败')
+            else:
+                default_field[k] = col_type
+                self.update_user_view(db, tb)
+
+        if set(default_field) - keys:
+            self.up_tb_struct(db, tb, default_field)
+
+    def __send(self, db, tb, data):
+        """
+        An SQL insert needs the column names or all columns,
+        so rows are written in JSON format here.
+        When the allowed error ratio is exceeded, the bad row is removed and the rest is sent again.
+        Usually this is a type mismatch error.
+
+        :param db: database name
+        :param tb: table name
+        :param data: list of JSON encoded rows; always emptied before returning
+        :return:
+        """
+        if not data:
+            return
+        sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow '
+        sql = sql + '\n'.join(data)
+        try:
+            # Allow a 20% error ratio
+            self.client.execute('set input_format_allow_errors_ratio=0.2')
+            self.client.execute(sql)
+        except Exception as e:
+            # Drop the bad row and send again. Not every exception carries
+            # code/message, hence getattr.
+            retried = False
+            if getattr(e, 'code', None) == 26:
+                m = re.match('(.*)?Stack trace', getattr(e, 'message', None) or '')
+                if m:
+                    error_msg = m.group(1) or ''
+                    error_row = re.match(r'.*?errors out of (\d+) rows', error_msg)
+                    if error_row:
+                        error_row = int(error_row.group(1)) - 1
+                        if 0 <= error_row < len(data):
+                            error_data = data.pop(error_row)
+                            self.__notify(error_msg, error_data)
+                            self.__send(db, tb, data)
+                            retried = True
+            if not retried:
+                # The batch is discarded by the finally clause; say so.
+                print(f'{db}.{tb} insert failed, {len(data)} rows dropped: {e}')
+        else:
+            print(f'{db}.{tb}插入{len(data)}条')
+        finally:
+            data.clear()
+
+    def __add(self, db, tb, msg):
+        """
+        List buffer pool
+        :param db: database name
+        :param tb: table name
+        :param msg: row to buffer
+        :return:
+        """
+        bulk_data = self.bulk_data.setdefault(f'{db}', {'event': [], 'user': []})
+        rows = bulk_data.setdefault(tb, [])
+        rows.append(json.dumps(msg))
+        ts = int(time.time())
+        # Write to ClickHouse as soon as either condition holds
+        if len(rows) >= self.bulk_max or self.__send_ts + 60 <= ts:
+            self.__send_ts = ts
+            self.__send(db, tb, rows)
+
+    def send(self, db, tb, msg):
+        """
+        Sync the table columns, run the handler chain, then buffer the row
+        :param db: database name
+        :param tb: table name
+        :param msg: row to write; handlers modify it in place
+        :return:
+        """
+        params = (db, tb, msg)
+        self.alter_table(*params)
+        # Data processing chain
+        for f in DataHandler.handler_link:
+            f(self, *params)
+        self.__add(*params)
+
+
+if __name__ == '__main__':
+    ck_client = CK()
diff --git a/ck/robot.py b/ck/robot.py
new file mode 100644
index 0000000..3c663ef
--- /dev/null
+++ b/ck/robot.py
@@ -0,0 +1,12 @@
+class DDRobot:
+    """Placeholder notifier that prints what it is asked to send."""
+
+    def __init__(self):
+        pass
+
+    def send(self, *args, **kwargs):
+        """Print the positional and keyword arguments that are not empty."""
+        if args:
+            print(args)
+        if kwargs:
+            print(kwargs)
diff --git a/ck/struct_cache.py b/ck/struct_cache.py
new file mode 100644
index 0000000..17f06ec
--- /dev/null
+++ b/ck/struct_cache.py
@@ -0,0 +1,14 @@
+import json
+
+
+class StructCacheFile:
+    """Stores a table structure as a JSON file in the working directory."""
+
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def update(db, tb, data):
+        """Overwrite {db}_{tb}.json with data serialised as JSON."""
+        with open(f'{db}_{tb}.json', 'w') as f:
+            json.dump(data, f)
diff --git a/common.py b/common.py
new file mode 100644
index 0000000..77f753f
--- /dev/null
+++ b/common.py
@@ -0,0 +1,12 @@
+from kafka import KafkaConsumer
+from settings import settings
+
+__all__ = 'consumer',
+
+
+def consumer():
+    """Yield (topic, value) for every message of the subscribed topics."""
+    c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
+    c.subscribe(settings.SUBSCRIBE_TOPIC)
+    for msg in c:
+        yield msg.topic, msg.value
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..018e06e
--- /dev/null
+++ b/main.py
@@ -0,0 +1,34 @@
+from ck.ck import CK
+from common import consumer
+
+from settings import settings
+import traceback
+
+ck_client = CK()
+
+
+def run():
+    """Consume messages and write user/track records to ClickHouse."""
+    for topic, msg in consumer():
+        try:
+            db = settings.APPID_TO_CKDB.get(msg['app_id'])
+            if db is None:
+                # Unknown app_id: there is no database to write to
+                continue
+            if 'user' in msg['type']:
+                table = 'user'
+            elif 'track' in msg['type']:
+                table = 'event'
+            else:
+                continue
+            del msg['type']
+
+            ck_client.send(db, table, msg)
+        except Exception:
+            # Keep consuming after a bad message. print_exc() writes the
+            # traceback itself and returns None, so it is not wrapped in print().
+            traceback.print_exc()
+
+
+if __name__ == '__main__':
+    run()
diff --git a/settings.py b/settings.py
new file mode 100644
index 0000000..35bd256
--- /dev/null
+++ b/settings.py
@@ -0,0 +1,42 @@
+import json
+
+
+class Config:
+    """Base settings: ClickHouse, Kafka and Redis connection parameters."""
+
+    # NOTE(review): hosts and the Redis password below are committed in
+    # plain text; consider loading them from the environment instead.
+    CK_CONFIG = {'host': '119.29.176.224',
+                 'send_receive_timeout': 3}
+
+    SUBSCRIBE_TOPIC = ['legu_test']
+
+    KAFKA_CONSUMER_CONF = {
+        'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
+        'value_deserializer': json.loads,
+        'group_id': 'legu_group'
+    }
+
+    TOPIC_TO_LEGU = {
+        'a77703e24e6643d08b74a4163a14f74c': 'legu_test',
+        'c3e0409ac18341149877b08f087db640': 'legu_test'
+    }
+
+    APPID_TO_CKDB = {
+        'a77703e24e6643d08b74a4163a14f74c': 'shjy',
+        'c3e0409ac18341149877b08f087db640': 'shjy'
+    }
+
+    REDIS_CONF = {
+        'host': '192.168.0.161',
+        'port': 6379,
+        'password': 'd1Gh*zp5',
+        'ck': 10  # ck
+    }
+
+
+class Debug(Config):
+    """Debug environment; currently identical to Config."""
+
+
+settings = Debug