From 9be70d54115d39565778132e3fec0482f6b976fb Mon Sep 17 00:00:00 2001 From: wuaho Date: Sat, 24 Apr 2021 16:00:28 +0800 Subject: [PATCH] v2 --- __init__.py | 0 ck/__init__.py | 1 - ck/ck.py | 274 ------------------------------------ ck/robot.py | 9 -- main.py | 44 +++--- settings.py | 2 +- user_view.sql | 4 + v2/__init__.py | 6 + common.py => v2/consumer.py | 0 v2/db.py | 34 +++++ v2/handler_event.py | 54 +++++++ v2/handler_user.py | 118 ++++++++++++++++ v2/sketch.py | 113 +++++++++++++++ {ck => v2}/struct_cache.py | 0 v2/transmitter.py | 78 ++++++++++ v2/valid_data.py | 92 ++++++++++++ 初始化事件表.sql | 54 +++++++ 初始化用户表.sql | 13 ++ 18 files changed, 589 insertions(+), 307 deletions(-) delete mode 100644 __init__.py delete mode 100644 ck/__init__.py delete mode 100644 ck/ck.py delete mode 100644 ck/robot.py create mode 100644 user_view.sql create mode 100644 v2/__init__.py rename common.py => v2/consumer.py (100%) create mode 100644 v2/db.py create mode 100644 v2/handler_event.py create mode 100644 v2/handler_user.py create mode 100644 v2/sketch.py rename {ck => v2}/struct_cache.py (100%) create mode 100644 v2/transmitter.py create mode 100644 v2/valid_data.py create mode 100644 初始化事件表.sql create mode 100644 初始化用户表.sql diff --git a/__init__.py b/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/ck/__init__.py b/ck/__init__.py deleted file mode 100644 index 4f98d5a..0000000 --- a/ck/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from ck import * diff --git a/ck/ck.py b/ck/ck.py deleted file mode 100644 index 408afde..0000000 --- a/ck/ck.py +++ /dev/null @@ -1,274 +0,0 @@ -import time -from datetime import datetime -import json -import re - -import arrow -from clickhouse_driver.client import Client - -from settings import settings -from .struct_cache import StructCacheFile -from .robot import DDRobot - -__all__ = ('CK',) - - -class DataHandler: - handler_link = [] - - def __init__(self, func): - DataHandler.handler_link.append(func) - - -class CK: - def 
__init__(self, client=Client(**settings.CK_CONFIG), - struct_cache=StructCacheFile(), - robot=DDRobot(), - bulk_max=1000): - - self.client = client - self.struct_cache = struct_cache - self.robot = robot - self.type_dict = dict() - self.struct_dict = dict() - self.bulk_data = dict() - self.bulk_max = bulk_max - self.__send_ts = int(time.time()) - - def set_connect(self, client): - self.client = client - - def up_tb_struct(self, db, tb, data): - struct_dict = self.struct_dict.get(f'{db}_{tb}', {}) - - # 用于类型转换 - type_dict = self.type_dict.setdefault(f'{db}_{tb}', {}) - for k, v in struct_dict.items(): - type_dict.setdefault(v, set()) - type_dict[v].add(k) - # 更新结构记录 - self.struct_cache.update(db, tb, data) - - @DataHandler - def date2utc(self, db, tb, data: dict): - """ - 日期置为utc时间 - :param data: - :return: - """ - for k in self.type_dict[f'{db}_{tb}'].get('Nullable(DateTime(\'UTC\'))', set()) | self.type_dict[ - f'{db}_{tb}'].get( - 'DateTime(\'UTC\')', set()): - try: - data[k] = arrow.get(data[k]).shift(hours=-data.get('zone_offset', 8)).format('YYYY-MM-DD HH:mm:ss') - except: - pass - - @DataHandler - def array2str(self, db, tb, data: dict): - """ - 数组里统一存字符串 - :param data: - :return: - """ - for k in self.type_dict[f'{db}_{tb}'].get('Array(String)', set()): - try: - data[k] = [str(v) for v in data[k]] - except: - pass - - @DataHandler - def up_user(self, db, tb, data: dict): - """ - 从视图中查出最新user info 与当前user 合并 - :param db: - :param tb: - :param data: - :return: - """ - if tb != 'user': - return - sql = f"SELECT * from {db}.user_view where account_id='{data['account_id']}'" - d, col = self.client.execute(sql, with_column_types=True) - if not d: - return - for v, c in zip(d[0], col): - c = c[0] - if c not in data: - data[c] = v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v - - def __notify(self, *args, **kwargs): - """ - 预留机器人通知 - :param args: - :param kwargs: - :return: - """ - if self.robot: - self.robot.send(*args, **kwargs) - else: - pass - 
- def get_tb_struct_cache(self, db, tb): - """ - 查一条记录 取字段 和类型 - :param db: - :param tb: - :return: - """ - if self.struct_dict.get(f'{db}_{tb}'): - return self.struct_dict.get(f'{db}_{tb}') - - sql = f'select * from {db}.{tb} limit 1' - _, columns = self.client.execute(sql, with_column_types=True) - res = {item[0]: item[1] for item in columns} - self.struct_dict[f'{db}_{tb}'] = res - - # s = self.client.execute(f'show create {db}.{tb}')[0][0] - # s = re.match(r'.*?\((.*?)\)\n?ENGINE', s, re.S).group(1) - # res = dict() - # for row in s.split('\n'): - # row = row.strip() - # if not row: - # continue - # row = re.sub("[,`]", '', row) - # k, t = row.split(' ')[:2] - # res[k] = t - # - # self.struct_dict[f'{db}_{tb}'] = res - self.up_tb_struct(db, tb, res) - return res - - @staticmethod - def is_valid_date(date: str): - try: - res = arrow.get(date) - return res - except: - return False - - def update_user_view(self, db, tb): - """ - 更新视图 - :param db: - :param tb: - :return: - """ - if tb != 'user': - return - sql = f"""drop table {db}.user_view;create view {db}.user_view as select * - from {db}.user - order by role_create_time desc - LIMIT 1 by account_id""" - self.client.execute(sql) - - def alter_table(self, db, tb, data): - """ - 数据库字段检查 - 添加新字段为第一次出现类型 - - 如果要修改字段类型 存在类型转换问题。停止程序,删除列 - :param db: - :param tb: - :param data: - :return: - """ - default_field = self.get_tb_struct_cache(db, tb) - keys = set(default_field) - for k, v in data.items(): - if k not in default_field: - if isinstance(v, str): - if self.is_valid_date(v): - default_field[k] = "Nullable(DateTime('UTC'))" - else: - default_field[k] = 'Nullable(String)' - - if isinstance(v, int): - default_field[k] = 'Nullable(UInt64)' - - if isinstance(v, float): - default_field[k] = 'Nullable(Float32)' - - if isinstance(v, list): - default_field[k] = 'Array(String)' - - if isinstance(v, bool): - default_field[k] = 'Nullable(UInt8)' - - sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}' - 
print(sql) - try: - self.client.execute(sql) - except Exception as e: - print(f'添加字段 {k} 失败') - default_field.pop(k) - else: - self.update_user_view(db, tb) - - if set(default_field) - keys: - self.up_tb_struct(db, tb, default_field) - - def __send(self, db, tb, data): - """ - sql写入需要字段名或全字段 - 这里 json 格式写入 - 超过错误允许率去掉那条记录再次递归 - 一般都是 类型不匹配错误 - - :param db: - :param tb: - :param data: - :return: - """ - if not data: - return - sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow ' - sql = sql + '\n'.join(data) - try: - # 允许20%错误率 - self.client.execute('set input_format_allow_errors_ratio=0.2') - self.client.execute(sql) - except Exception as e: - # 丢弃错误行 再次发送 - if e.code == 26: - m = re.match('(.*)?Stack trace', e.message) - if m: - error_msg = m.group(1) - error_row = re.match('.*?errors out of (\d+) rows', error_msg) - if error_row: - error_row = int(error_row.group(1)) - 1 - error_data = data.pop(error_row) - self.__notify(error_msg, error_data) - self.__send(db, tb, data) - else: - print(f'{db}.{tb}插入{len(data)}条') - - finally: - data.clear() - - def __add(self, db, tb, msg): - """ - 列表缓存池 - :param db: - :param tb: - :param msg: - :return: - """ - bulk_data = self.bulk_data.setdefault(f'{db}', {'event': [], 'user': []}) - bulk_data[tb].append(json.dumps(msg)) - ts = int(time.time()) - # 满足其一条件 写入ck - if len(bulk_data[tb]) >= self.bulk_max or self.__send_ts + 60 <= ts: - self.__send_ts = ts - self.__send(db, tb, bulk_data[tb]) - - def send(self, db, tb, msg): - params = (db, tb, msg) - self.alter_table(*params) - # 数据加工链 - [f(self, *params) for f in DataHandler.handler_link] - self.__add(*params) - - -if __name__ == '__main__': - ck_client = CK() diff --git a/ck/robot.py b/ck/robot.py deleted file mode 100644 index 3c663ef..0000000 --- a/ck/robot.py +++ /dev/null @@ -1,9 +0,0 @@ -class DDRobot: - def __init__(self): - pass - - def send(self, *args, **kwargs): - if args: - print(args) - if kwargs: - print(kwargs) diff --git a/main.py b/main.py index d2f33df..cc19cdf 
100644 --- a/main.py +++ b/main.py @@ -1,33 +1,33 @@ -import clickhouse_driver -from clickhouse_driver import Client - -from ck.ck import CK -from common import * - from settings import settings -import traceback +from v2 import * -ck_client = CK() +db_client = CK(**settings.CK_CONFIG) +sketch = Sketch(db_client) +handler_event = HandlerEvent(db_client) +handler_user = HandlerUser(db_client) +transmitter = Transmitter(db_client, sketch) def run(): + transmitter.add_source(handler_event, 1000, 60) + transmitter.add_source(handler_user, 100, 60) for topic, msg in consumer(): # print(msg) - try: - db = settings.APPID_TO_CKDB.get(msg['app_id']) - if 'user' in msg['type']: - table = 'user' - elif 'track' in msg['type']: - table = 'event' - else: - continue - del msg['type'] + type_ = msg['#type'] + db = settings.APPID_TO_CKDB.get(msg['#app_id']) + if 'user' in type_: + # continue + obj = getattr(handler_user, type_) + elif 'track' in type_: + # continue + obj = getattr(handler_event, type_) + else: + continue - ck_client.send(db, table, msg) - except clickhouse_driver.errors.NetworkError: - ck_client.set_connect(Client(**settings.CK_CONFIG)) - except Exception as e: - print(traceback.print_exc()) + del msg['#type'] + obj(db, msg) + + transmitter.run() if __name__ == '__main__': diff --git a/settings.py b/settings.py index 35bd256..147969b 100644 --- a/settings.py +++ b/settings.py @@ -5,7 +5,7 @@ class Config: CK_CONFIG = {'host': '119.29.176.224', 'send_receive_timeout': 3} - SUBSCRIBE_TOPIC = ['legu_test'] + SUBSCRIBE_TOPIC = ['test','test2'] KAFKA_CONSUMER_CONF = { 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], diff --git a/user_view.sql b/user_view.sql new file mode 100644 index 0000000..819450f --- /dev/null +++ b/user_view.sql @@ -0,0 +1,4 @@ +create view shjy.user_view as select * + from shjy.user + order by `#role_create_time` desc + LIMIT 1 by `#account_id` \ No newline at end of file diff --git a/v2/__init__.py 
b/v2/__init__.py new file mode 100644 index 0000000..3c0c782 --- /dev/null +++ b/v2/__init__.py @@ -0,0 +1,6 @@ +from .consumer import * +from .db import * +from .handler_user import * +from .handler_event import * +from .transmitter import * +from .sketch import * \ No newline at end of file diff --git a/common.py b/v2/consumer.py similarity index 100% rename from common.py rename to v2/consumer.py diff --git a/v2/db.py b/v2/db.py new file mode 100644 index 0000000..b339aa2 --- /dev/null +++ b/v2/db.py @@ -0,0 +1,34 @@ +__all__ = 'CK', + +from datetime import datetime +from datetime import timedelta + +from clickhouse_driver import Client + + +class CK(Client): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_one(self, db, tb, **where) -> dict: + """ + 注意 还原时区 + :param db: + :param tb: + :param where: + :return: + """ + sql = f"select * from {db}.{tb} where 1" + for k, v in where.items(): + sql += f" and `{k}`='{v}'" + sql += ' limit 1' + data, columns = self.execute(sql, with_column_types=True) + res = dict() + if data: + data = {k[0]: v for k, v in zip(columns, data[0])} + for k, v in data.items(): + if isinstance(v, datetime): + res[k] = (v + timedelta(hours=data['#zone_offset'])).strftime('%Y-%m-%d %H:%M:%S') + else: + res[k] = v + return res diff --git a/v2/handler_event.py b/v2/handler_event.py new file mode 100644 index 0000000..fe06c54 --- /dev/null +++ b/v2/handler_event.py @@ -0,0 +1,54 @@ +import copy + +__all__ = 'HandlerEvent', + + +class HandlerEvent: + tb = 'event' + + def __init__(self, db_client): + self.event = dict() + self.db_client = db_client + + def merge_update(self, a: dict, b: dict): + """ + 可更新事件 合并 + :param a: + :param b: + :return: + """ + if 'properties' not in b or not isinstance(b['properties'], dict): + return + for k, v in b['properties'].items(): + a[k] = v + + def track(self, db, data): + event = self.event.setdefault(db, {}) + event[len(event)] = data + + def track_update(self, db, data): + 
if '#event_id' not in data: + return + old_event = self.db_client.get_one(db, self.tb, {'#event_id': data['#event_id']}) + old_event['sign'] = -1 + event = self.event.setdefault(db, {}) + event[len(event)] = old_event + + new_event = copy.deepcopy(old_event) + self.merge_update(new_event, data) + new_event['sign'] = 1 + event[len(event)] = new_event + + def track_overwrite(self, db, data): + if '#event_id' not in data: + return + old_event = self.db_client.get_one(db, self.tb, {'#event_id': data['#event_id']}) + old_event['sign'] = -1 + event = self.event.setdefault(db, {}) + event[len(event)] = old_event + + event[len(event)] = data + + @property + def buffer_pool(self): + return self.tb, self.event diff --git a/v2/handler_user.py b/v2/handler_user.py new file mode 100644 index 0000000..9f39706 --- /dev/null +++ b/v2/handler_user.py @@ -0,0 +1,118 @@ +__all__ = 'HandlerUser', + + +class HandlerUser: + tb = 'user' + user_key = '#account_id' + + def __init__(self, db_client): + self.users = dict() + self.db_client = db_client + + def get_user(self, db, account_id, data=None): + user = self.users.get(db, {}).get(account_id) + if user: + return user + + user = self.db_client.get_one(db, f'{self.tb}_view', **{'#account_id': account_id}) + if user: + self.users.setdefault(db, {})[account_id] = user + return user + if not isinstance(data, dict): + return + user = dict() + self.merge(user, data) + self.users.setdefault(db, {})[account_id] = user + return user + + def merge(self, a: dict, b: dict): + """ + 将b 合并到 a + :param a: + :param b: + :return: + """ + for k, v in b.items(): + if isinstance(v, dict): + self.merge(a, v) + else: + a[k] = v + + def merge_once(self, a: dict, b: dict): + """ + a 存在的字段不接受b的合并 + :param a: + :param b: + :return: + """ + for k, v in b.items(): + if isinstance(v, dict): + self.merge_once(a, v) + else: + a[k] = v if a.get(k) is None else a[k] + + def merge_add(self, a: dict, b: dict): + """ + b 的properties 属性累加到a + :param a: + :param b: + 
:return: + """ + if 'properties' not in b or not isinstance(b['properties'], dict): + return + for k, v in b['properties'].items(): + if not isinstance(v, int): + raise ValueError('需要提供数值类型累加') + a[k] = a.setdefault(k, 0) + v + + def merge_unset(self, a: dict, b: dict): + """ + 清除 a 中 b的properties + :param db: + :param data: + :return: + """ + if 'properties' not in b or not isinstance(b['properties'], list): + return + for k in b['properties']: + if k in a: + del a[k] + + def user_set(self, db, data: dict): + """ + 注意 + data 结构包含 properties + user 为一层 k v + 将data 合并到 user + :param db: + :param data: + :return: + """ + account_id = data[self.user_key] + user = self.get_user(db, account_id, data) + self.merge(user, data) + + def user_setOnce(self, db: str, data: dict): + account_id = data[self.user_key] + user = self.get_user(db, account_id, data) + self.merge_once(user, data) + + def user_add(self, db: str, data: dict): + account_id = data[self.user_key] + user = self.get_user(db, account_id, data) + self.merge_add(user, data) + + def user_unset(self, db: str, data: dict): + account_id = data[self.user_key] + user = self.get_user(db, account_id) + self.merge_unset(user, data) + + def user_append(self, db: str, data: dict): + pass + + def user_del(self, db: str, data: dict): + pass + + @property + def buffer_pool(self): + return self.tb, self.users diff --git a/v2/sketch.py b/v2/sketch.py new file mode 100644 index 0000000..a1f6be4 --- /dev/null +++ b/v2/sketch.py @@ -0,0 +1,113 @@ +import copy + +from .valid_data import * + + +class Sketch: + def __init__(self, db_client, struct_cache=None): + self.db_client = db_client + self.struct_cache = struct_cache + self.__type_dict = dict() + self.__struct_dict = dict() + + @property + def type_dict(self): + return self.__type_dict + + @property + def struct_dict(self): + return self.__struct_dict + + def up_tb_struct(self, db, tb, data): + struct_dict = self.__struct_dict.get(f'{db}_{tb}', {}) + + # 用于类型转换 + type_dict = 
self.__type_dict.setdefault(f'{db}_{tb}', {}) + for k, v in struct_dict.items(): + type_dict.setdefault(v, set()) + type_dict[v].add(k) + # 更新结构记录 + if self.struct_cache: + self.struct_cache.update(db, tb, data) + + def get_tb_struct_cache(self, db, tb): + """ + 查一条记录 取字段 和类型 + :param db: + :param tb: + :return: + """ + if self.__struct_dict.get(f'{db}_{tb}'): + return self.__struct_dict.get(f'{db}_{tb}') + sql = f'select * from {db}.{tb} limit 1' + _, columns = self.db_client.execute(sql, with_column_types=True) + res = {item[0]: item[1] for item in columns} + self.__struct_dict[f'{db}_{tb}'] = res + self.up_tb_struct(db, tb, res) + return res + + def update_user_view(self, db, tb): + """ + 更新视图 + :param db: + :param tb: + :return: + """ + if tb != 'user': + return + + sql = f'drop table if exists {db}.user_view' + self.db_client.execute(sql) + sql = f"""create view {db}.user_view as select * + from {db}.user + order by `#role_create_time` desc + LIMIT 1 by `#account_id`""" + self.db_client.execute(sql) + + def alter_table(self, db, tb, data): + """ + 数据库字段检查 + 添加新字段为第一次出现类型 + + 如果要修改字段类型 存在类型转换问题。停止程序,删除列 + :param db: + :param tb: + :param data: + :return: + """ + default_field = self.get_tb_struct_cache(db, tb) + keys = set(default_field) + for k, v in data.items(): + if k in ('#type',): + continue + if k not in default_field: + if isinstance(v, str): + if is_valid_date(v): + default_field[k] = "Nullable(DateTime('UTC'))" + else: + default_field[k] = 'Nullable(String)' + + if isinstance(v, int): + default_field[k] = 'Nullable(UInt64)' + + if isinstance(v, float): + default_field[k] = 'Nullable(Float32)' + + if isinstance(v, list): + default_field[k] = 'Array(String)' + + if isinstance(v, bool): + default_field[k] = 'Nullable(UInt8)' + + sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}' + print(sql) + try: + self.db_client.execute(sql) + except Exception as e: + print(f'添加字段 {k} 失败') + default_field.pop(k) + else: + self.update_user_view(db, tb) 
+ + if set(default_field) - keys: + self.up_tb_struct(db, tb, default_field) diff --git a/ck/struct_cache.py b/v2/struct_cache.py similarity index 100% rename from ck/struct_cache.py rename to v2/struct_cache.py diff --git a/v2/transmitter.py b/v2/transmitter.py new file mode 100644 index 0000000..f306aee --- /dev/null +++ b/v2/transmitter.py @@ -0,0 +1,78 @@ +import json +import re + +from .valid_data import * + +__all__ = 'Transmitter', + + +class Transmitter: + def __init__(self, db_client, sketch): + self.db_client = db_client + self.sketch = sketch + self.ts = int(time.time()) + self.slots = dict() + + def add_source(self, handler, bulk_max=1000, time_out=60): + self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out} + + def check_send(self): + for h, p in self.slots.items(): + ts = int(time.time()) + tb, pool = h.buffer_pool + for db, buffer in pool.items(): + if len(buffer) >= p['bulk_max'] or self.ts + p['time_out'] <= ts: + self.ts = ts + yield db, tb, buffer + + @staticmethod + def flat_data(data: dict): + if 'properties' in data: + properties = data.pop('properties') + data.update(properties) + return data + + def __send(self, db, tb, data): + sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow ' + sql = sql + '\n'.join(data) + try: + # 允许20%错误率 + self.db_client.execute('set input_format_allow_errors_ratio=0.2') + self.db_client.execute(sql) + except Exception as e: + # 丢弃错误行 再次发送 + if e.code == 26: + m = re.match('(.*)?Stack trace', e.message) + if m: + error_msg = m.group(1) + error_row = re.match('.*?errors out of (\d+) rows', error_msg) + if error_row: + error_row = int(error_row.group(1)) - 1 + error_data = data.pop(error_row) + self.__send(db, tb, data) + else: + print(f'{db}.{tb}插入{len(data)}条') + + def check_table(self, db, tb, data): + [self.sketch.alter_table(db, tb, item) for item in data] + + def check_type(self, db, tb, data): + struct_dict = self.sketch.struct_dict[f'{db}_{tb}'] + for item in data: + del_keys = set() + for k, v in 
item.items(): + type_ = struct_dict[k] + item[k] = TYPE_CK2PY[type_](v, **item) + if v is None: + del_keys.add(k) + print(k, '类型不一致') + for key in del_keys: + del item[key] + + def run(self): + for db, tb, buffer in self.check_send(): + data = [self.flat_data(x) for x in buffer.values()] + self.check_table(db, tb, data) + self.check_type(db, tb, data) + self.__send(db, tb, [json.dumps(item) for item in data]) + buffer.clear() diff --git a/v2/valid_data.py b/v2/valid_data.py new file mode 100644 index 0000000..0855900 --- /dev/null +++ b/v2/valid_data.py @@ -0,0 +1,92 @@ +import time +from datetime import datetime +from datetime import timedelta +from ipaddress import IPv4Address + + +def is_valid_date(v, **kwargs): + try: + date = datetime.strptime(v, "%Y-%m-%d %H:%M:%S") + zone_offset = kwargs.get('#zone_offset', 8) + return (date - timedelta(hours=zone_offset)).strftime("%Y-%m-%d %H:%M:%S") + except: + return None + + +def is_valid_int(v, **kwargs): + try: + return int(v) + except: + return None + + +def is_valid_srt(v, **kwargs): + if isinstance(v, str): + return v + return None + + +def is_valid_float(v, **kwargs): + try: + return float(v) + except: + return None + + +def is_valid_bool(v, **kwargs): + if isinstance(v, bool): + return v + return None + + +def is_valid_array(v, **kwargs): + if isinstance(v, list): + return [str(i) for i in v] + return None + + +def is_valid_ipv4(v, **kwargs): + try: + return str(IPv4Address(v)) + except: + return None + + +TYPE_CK2PY = { + "DateTime('UTC')": is_valid_date, + "Nullable(DateTime('UTC'))": is_valid_date, + "DateTime()": is_valid_date, + + "Nullable(IPv4)": is_valid_ipv4, + "IPv4": is_valid_ipv4, + + "String": is_valid_srt, + "Nullable(String)": is_valid_srt, + + "Nullable(UInt8)": is_valid_int, + "UInt8": is_valid_int, + + "Nullable(Int8)": is_valid_int, + "Int8": is_valid_int, + + "Nullable(UInt16)": is_valid_int, + "UInt16": is_valid_int, + + "Nullable(Int16)": is_valid_int, + "Int16": is_valid_int, + 
"Nullable(UInt32)": is_valid_int, + "UInt32": is_valid_int, + + "Nullable(UInt64)": is_valid_int, + "UInt64": is_valid_int, + + "Nullable(Int64)": is_valid_int, + "Int64": is_valid_int, + + "Array(String)": is_valid_array, + + "Nullable(Float)": is_valid_float, + "Float": is_valid_float, + +} diff --git a/初始化事件表.sql b/初始化事件表.sql new file mode 100644 index 0000000..e4e1d13 --- /dev/null +++ b/初始化事件表.sql @@ -0,0 +1,54 @@ +create table shjy.event +( + `#ip` Nullable(IPv4), + `#country` Nullable(String), + `#country_code` Nullable(String), + `#province` Nullable(String), + `#city` Nullable(String), + `#os_version` Nullable(String), + `#manufacturer` Nullable(String), + `#os` Nullable(String), + `#device_id` Nullable(String), + `#screen_height` Nullable(UInt16), + `#screen_width` Nullable(UInt16), + `#device_model` Nullable(String), + `#app_version` Nullable(String), + `#bundle_id` Nullable(String), + `#lib` Nullable(String), + `#lib_version` Nullable(String), + `#network_type` Nullable(String), + `#carrier` Nullable(String), + `#browser` Nullable(String), + `#browser_version` Nullable(String), + `#duration` Nullable(String), + `#url` Nullable(String), + `#url_path` Nullable(String), + `#referrer` Nullable(String), + `#referrer_host` Nullable(String), + `#title` Nullable(String), + `#screen_name` Nullable(String), + `#element_id` Nullable(String), + `#element_type` Nullable(String), + `#resume_from_background` Nullable(String), + `#element_selector` Nullable(String), + `#element_position` Nullable(String), + `#element_content` Nullable(String), + `#scene` Nullable(String), + `#mp_platform` Nullable(String), + `#app_crashed_reason` Nullable(String), + `#zone_offset` Int8 default 8, + `#event_id` String, + + `#event_time` DateTime('UTC'), + `#account_id` String, + `#distinct_id` Nullable(String), + `#event_name` String, + `#server_time` DateTime('UTC') default now(), + + + `sign` Int8 default 1 + +) ENGINE = CollapsingMergeTree(sign) + PARTITION BY 
toYYYYMMDD(`#event_time`) + order by (`#account_id`, `#event_time`, `#event_name`) +-- TTL event_time + toIntervalDay(365) \ No newline at end of file diff --git a/初始化用户表.sql b/初始化用户表.sql new file mode 100644 index 0000000..4feb96a --- /dev/null +++ b/初始化用户表.sql @@ -0,0 +1,13 @@ +create table shjy.user +( + + `#role_create_time` DateTime('UTC'), + `#account_id` String, + `svrindex` UInt16, + + `#zone_offset` Int8 default 8, + `#server_time` DateTime('UTC') default now() + +) ENGINE = ReplacingMergeTree() + PARTITION BY `svrindex` + order by `#account_id`