This commit is contained in:
wuaho 2021-04-24 16:00:28 +08:00
parent f0501b0615
commit 9be70d5411
18 changed files with 589 additions and 307 deletions

View File

View File

@ -1 +0,0 @@
from ck import *

274
ck/ck.py
View File

@ -1,274 +0,0 @@
import time
from datetime import datetime
import json
import re
import arrow
from clickhouse_driver.client import Client
from settings import settings
from .struct_cache import StructCacheFile
from .robot import DDRobot
__all__ = ('CK',)
class DataHandler:
    """Decorator that registers a function on a class-wide handler chain.

    CK methods decorated with ``@DataHandler`` are appended (in definition
    order) to ``handler_link`` and later run as a processing pipeline via
    ``[f(self, *params) for f in DataHandler.handler_link]``.
    """

    # Shared, deliberately class-level registry of decorated functions.
    handler_link = []

    def __init__(self, func):
        # Keep a reference so the decorated attribute stays usable.
        self.func = func
        DataHandler.handler_link.append(func)

    def __call__(self, *args, **kwargs):
        # Bug fix: decorating previously replaced the method with a
        # non-callable DataHandler instance, so a direct call like
        # ``ck.date2utc(...)`` raised TypeError. Delegate to the wrapped
        # function instead.
        return self.func(*args, **kwargs)
class CK:
    """Buffered ClickHouse writer.

    Batches incoming JSON messages per (db, table), auto-adds missing
    columns, runs the registered ``DataHandler`` transforms on each message,
    and flushes buffers with ``INSERT ... FORMAT JSONEachRow``.
    """

    def __init__(self, client=Client(**settings.CK_CONFIG),
                 struct_cache=StructCacheFile(),
                 robot=DDRobot(),
                 bulk_max=1000):
        # NOTE(review): default arguments are evaluated once at import time,
        # so every CK() created without arguments shares one Client /
        # StructCacheFile / DDRobot instance — confirm this is intended.
        self.client = client
        self.struct_cache = struct_cache
        self.robot = robot
        self.type_dict = dict()    # f'{db}_{tb}' -> {ck_type: set of column names}
        self.struct_dict = dict()  # f'{db}_{tb}' -> {column name: ck_type}
        self.bulk_data = dict()    # f'{db}' -> {'event': [...], 'user': [...]}
        self.bulk_max = bulk_max   # flush threshold (rows per table buffer)
        self.__send_ts = int(time.time())  # epoch seconds of the last flush

    def set_connect(self, client):
        # Swap the underlying connection (callers reconnect after network
        # errors by passing in a fresh Client).
        self.client = client

    def up_tb_struct(self, db, tb, data):
        """Rebuild the type lookup for db.tb and persist the structure."""
        struct_dict = self.struct_dict.get(f'{db}_{tb}', {})
        # Invert column->type into type->set(columns) for type conversion.
        type_dict = self.type_dict.setdefault(f'{db}_{tb}', {})
        for k, v in struct_dict.items():
            type_dict.setdefault(v, set())
            type_dict[v].add(k)
        # Update the persisted structure record.
        self.struct_cache.update(db, tb, data)

    @DataHandler
    def date2utc(self, db, tb, data: dict):
        """
        Shift date fields to UTC: subtract ``zone_offset`` hours (default 8)
        from every column typed DateTime('UTC') or its Nullable variant.
        :param data: one message dict, mutated in place
        :return:
        """
        for k in self.type_dict[f'{db}_{tb}'].get('Nullable(DateTime(\'UTC\'))', set()) | \
                self.type_dict[f'{db}_{tb}'].get('DateTime(\'UTC\')', set()):
            try:
                data[k] = arrow.get(data[k]).shift(hours=-data.get('zone_offset', 8)).format('YYYY-MM-DD HH:mm:ss')
            except:
                # Best effort: leave unparsable values untouched.
                pass

    @DataHandler
    def array2str(self, db, tb, data: dict):
        """
        Arrays are stored uniformly as strings: stringify each element of
        every Array(String) column.
        :param data: one message dict, mutated in place
        :return:
        """
        for k in self.type_dict[f'{db}_{tb}'].get('Array(String)', set()):
            try:
                data[k] = [str(v) for v in data[k]]
            except:
                # Best effort: skip values that are not iterable.
                pass

    @DataHandler
    def up_user(self, db, tb, data: dict):
        """
        Look up the latest user info from the user_view and merge it into
        the current user message (keys already present in ``data`` win).
        :param db:
        :param tb: only rows bound for the 'user' table are processed
        :param data: one message dict, mutated in place
        :return:
        """
        if tb != 'user':
            return
        # NOTE(review): account_id is interpolated directly into SQL —
        # injection risk if the value is not trusted.
        sql = f"SELECT * from {db}.user_view where account_id='{data['account_id']}'"
        d, col = self.client.execute(sql, with_column_types=True)
        if not d:
            return
        for v, c in zip(d[0], col):
            c = c[0]  # column name from the (name, type) tuple
            if c not in data:
                data[c] = v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v

    def __notify(self, *args, **kwargs):
        """
        Reserved hook for robot notifications.
        :param args:
        :param kwargs:
        :return:
        """
        if self.robot:
            self.robot.send(*args, **kwargs)
        else:
            pass

    def get_tb_struct_cache(self, db, tb):
        """
        Query one record to discover column names and types (cached).
        :param db:
        :param tb:
        :return: {column name: ClickHouse type string}
        """
        if self.struct_dict.get(f'{db}_{tb}'):
            return self.struct_dict.get(f'{db}_{tb}')
        sql = f'select * from {db}.{tb} limit 1'
        _, columns = self.client.execute(sql, with_column_types=True)
        res = {item[0]: item[1] for item in columns}
        self.struct_dict[f'{db}_{tb}'] = res
        # NOTE(review): an older approach that parsed `show create table`
        # output with regexes was removed here.
        self.up_tb_struct(db, tb, res)
        return res

    @staticmethod
    def is_valid_date(date: str):
        """Return the parsed Arrow object when ``date`` parses, else False."""
        try:
            res = arrow.get(date)
            return res
        except:
            return False

    def update_user_view(self, db, tb):
        """
        Recreate the user view (latest record per account_id).
        :param db:
        :param tb: only acts when tb == 'user'
        :return:
        """
        if tb != 'user':
            return
        # NOTE(review): this sends two statements in one execute() call —
        # confirm the driver actually supports multi-statement strings.
        sql = f"""drop table {db}.user_view;create view {db}.user_view as select *
from {db}.user
order by role_create_time desc
LIMIT 1 by account_id"""
        self.client.execute(sql)

    def alter_table(self, db, tb, data):
        """
        Schema check: add a column for every first-seen field, typed after
        its first observed value. To change an existing column's type, stop
        the program and drop the column manually — automatic type conversion
        is not attempted.
        :param db:
        :param tb:
        :param data: one message dict
        :return:
        """
        default_field = self.get_tb_struct_cache(db, tb)
        keys = set(default_field)
        for k, v in data.items():
            if k not in default_field:
                # Infer the ClickHouse type from the first observed value.
                if isinstance(v, str):
                    if self.is_valid_date(v):
                        default_field[k] = "Nullable(DateTime('UTC'))"
                    else:
                        default_field[k] = 'Nullable(String)'
                if isinstance(v, int):
                    default_field[k] = 'Nullable(UInt64)'
                if isinstance(v, float):
                    default_field[k] = 'Nullable(Float32)'
                if isinstance(v, list):
                    default_field[k] = 'Array(String)'
                if isinstance(v, bool):
                    # bool is also an int; being last, this branch wins.
                    default_field[k] = 'Nullable(UInt8)'
                # NOTE(review): if v is None (or another unhandled type) the
                # next line raises KeyError — confirm upstream filters nulls.
                sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
                print(sql)
                try:
                    self.client.execute(sql)
                except Exception as e:
                    print(f'添加字段 {k} 失败')
                    default_field.pop(k)
                else:
                    self.update_user_view(db, tb)
        if set(default_field) - keys:
            self.up_tb_struct(db, tb, default_field)

    def __send(self, db, tb, data):
        """
        JSONEachRow bulk insert. A plain SQL insert would require column
        names, so rows are written in JSON format instead. When the allowed
        error ratio is exceeded (usually a type mismatch), the offending row
        is dropped and the rest re-sent recursively.
        :param db:
        :param tb:
        :param data: list of JSON-encoded row strings; cleared on return
        :return:
        """
        if not data:
            return
        sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow '
        sql = sql + '\n'.join(data)
        try:
            # Allow up to a 20% error ratio.
            self.client.execute('set input_format_allow_errors_ratio=0.2')
            self.client.execute(sql)
        except Exception as e:
            # Drop the failing row and resend.
            # NOTE(review): `e.code` / `e.message` only exist on
            # clickhouse-driver server exceptions — other exception types
            # would raise AttributeError here; confirm that is acceptable.
            if e.code == 26:
                m = re.match('(.*)?Stack trace', e.message)
                if m:
                    error_msg = m.group(1)
                    error_row = re.match('.*?errors out of (\d+) rows', error_msg)
                    if error_row:
                        error_row = int(error_row.group(1)) - 1
                        error_data = data.pop(error_row)
                        self.__notify(error_msg, error_data)
                        self.__send(db, tb, data)
        else:
            print(f'{db}.{tb}插入{len(data)}')
        finally:
            data.clear()

    def __add(self, db, tb, msg):
        """
        Per-database buffer pool.
        :param db:
        :param tb: 'event' or 'user'
        :param msg: one message dict (JSON-encoded before buffering)
        :return:
        """
        bulk_data = self.bulk_data.setdefault(f'{db}', {'event': [], 'user': []})
        bulk_data[tb].append(json.dumps(msg))
        ts = int(time.time())
        # Flush when either condition holds: buffer full, or 60s elapsed.
        if len(bulk_data[tb]) >= self.bulk_max or self.__send_ts + 60 <= ts:
            self.__send_ts = ts
            self.__send(db, tb, bulk_data[tb])

    def send(self, db, tb, msg):
        """Public entry point: schema check, transform chain, then buffer."""
        params = (db, tb, msg)
        self.alter_table(*params)
        # Data processing chain (functions registered via @DataHandler).
        [f(self, *params) for f in DataHandler.handler_link]
        self.__add(*params)
# Manual smoke check: constructing CK opens the default client connection.
if __name__ == '__main__':
    ck_client = CK()

View File

@ -1,9 +0,0 @@
class DDRobot:
    """Placeholder notification robot: echoes whatever it is asked to send."""

    def __init__(self):
        # No configuration needed for the stub implementation.
        pass

    def send(self, *args, **kwargs):
        """Print the positional and keyword payloads, skipping empty ones."""
        for payload in (args, kwargs):
            if payload:
                print(payload)

44
main.py
View File

@ -1,33 +1,33 @@
import clickhouse_driver
from clickhouse_driver import Client
from ck.ck import CK
from common import *
from settings import settings from settings import settings
import traceback from v2 import *
ck_client = CK() db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client)
handler_user = HandlerUser(db_client)
transmitter = Transmitter(db_client, sketch)
def run(): def run():
transmitter.add_source(handler_event, 1000, 60)
transmitter.add_source(handler_user, 100, 60)
for topic, msg in consumer(): for topic, msg in consumer():
# print(msg) # print(msg)
try: type_ = msg['#type']
db = settings.APPID_TO_CKDB.get(msg['app_id']) db = settings.APPID_TO_CKDB.get(msg['#app_id'])
if 'user' in msg['type']: if 'user' in type_:
table = 'user' # continue
elif 'track' in msg['type']: obj = getattr(handler_user, type_)
table = 'event' elif 'track' in type_:
else: # continue
continue obj = getattr(handler_event, type_)
del msg['type'] else:
continue
ck_client.send(db, table, msg) del msg['#type']
except clickhouse_driver.errors.NetworkError: obj(db, msg)
ck_client.set_connect(Client(**settings.CK_CONFIG))
except Exception as e: transmitter.run()
print(traceback.print_exc())
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -5,7 +5,7 @@ class Config:
CK_CONFIG = {'host': '119.29.176.224', CK_CONFIG = {'host': '119.29.176.224',
'send_receive_timeout': 3} 'send_receive_timeout': 3}
SUBSCRIBE_TOPIC = ['legu_test'] SUBSCRIBE_TOPIC = ['test','test2']
KAFKA_CONSUMER_CONF = { KAFKA_CONSUMER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],

4
user_view.sql Normal file
View File

@ -0,0 +1,4 @@
-- Latest-record-per-account view over shjy.user.
-- The user table is a ReplacingMergeTree, which may hold several versions
-- of a row until merges run; pick the newest by `#role_create_time`.
create view shjy.user_view as select *
from shjy.user
order by `#role_create_time` desc
LIMIT 1 by `#account_id`

6
v2/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from .consumer import *
from .db import *
from .handler_user import *
from .handler_event import *
from .transmitter import *
from .sketch import *

34
v2/db.py Normal file
View File

@ -0,0 +1,34 @@
__all__ = 'CK',
from datetime import datetime
from datetime import timedelta
from clickhouse_driver import Client
class CK(Client):
    """Thin clickhouse-driver Client with a single-row fetch helper."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_one(self, db, tb, **where) -> dict:
        """
        Fetch one row of db.tb matching all equality conditions in ``where``.

        DateTime values are shifted back to local time using the row's
        `#zone_offset` column (hours). Bug fix: a row without that column
        previously raised KeyError; it now defaults to 8, matching the
        default used elsewhere (valid_data.is_valid_date).

        :param db: database name
        :param tb: table name
        :param where: column=value equality filters
        :return: {column: value}; empty dict when no row matches
        """
        # NOTE(review): values are interpolated directly into SQL — only use
        # with trusted input, or switch to parameterized queries.
        sql = f"select * from {db}.{tb} where 1"
        for k, v in where.items():
            sql += f" and `{k}`='{v}'"
        sql += ' limit 1'
        data, columns = self.execute(sql, with_column_types=True)
        res = dict()
        if data:
            data = {k[0]: v for k, v in zip(columns, data[0])}
            offset = data.get('#zone_offset', 8)
            for k, v in data.items():
                if isinstance(v, datetime):
                    # Restore the stored UTC timestamp to local time.
                    res[k] = (v + timedelta(hours=offset)).strftime('%Y-%m-%d %H:%M:%S')
                else:
                    res[k] = v
        return res

54
v2/handler_event.py Normal file
View File

@ -0,0 +1,54 @@
import copy
__all__ = 'HandlerEvent',
class HandlerEvent:
    """Buffers event rows per database before bulk insertion.

    Targets a CollapsingMergeTree(sign) table: an update/overwrite is
    emitted as the stored row with sign=-1 plus a replacement row.
    """

    tb = 'event'

    def __init__(self, db_client):
        self.event = dict()  # db -> {sequence index: event dict}
        self.db_client = db_client

    def merge_update(self, a: dict, b: dict):
        """
        Merge an updatable event: copy b['properties'] into a (b's keys win).
        :param a: target event dict, mutated in place
        :param b: incoming message carrying a 'properties' dict
        :return:
        """
        if 'properties' not in b or not isinstance(b['properties'], dict):
            return
        for k, v in b['properties'].items():
            a[k] = v

    def _append(self, db, data):
        # Buffer a row under the next sequence index for this db.
        event = self.event.setdefault(db, {})
        event[len(event)] = data

    def _fetch_retracted(self, db, event_id):
        # Load the stored version of the event and mark it for collapsing.
        # Bug fix: get_one takes the filter as keyword arguments; the old
        # code passed a dict as a positional argument, raising TypeError.
        old_event = self.db_client.get_one(db, self.tb, **{'#event_id': event_id})
        old_event['sign'] = -1
        return old_event

    def track(self, db, data):
        """Buffer a plain (append-only) event."""
        self._append(db, data)

    def track_update(self, db, data):
        """Update an existing event: retract the stored row, then buffer a
        copy with the new properties merged in and sign=1."""
        if '#event_id' not in data:
            return
        old_event = self._fetch_retracted(db, data['#event_id'])
        self._append(db, old_event)
        new_event = copy.deepcopy(old_event)
        self.merge_update(new_event, data)
        new_event['sign'] = 1
        self._append(db, new_event)

    def track_overwrite(self, db, data):
        """Overwrite an existing event: retract the stored row and buffer
        the incoming row as-is."""
        if '#event_id' not in data:
            return
        old_event = self._fetch_retracted(db, data['#event_id'])
        self._append(db, old_event)
        self._append(db, data)

    @property
    def buffer_pool(self):
        """(table name, per-db buffered rows) consumed by Transmitter."""
        return self.tb, self.event

118
v2/handler_user.py Normal file
View File

@ -0,0 +1,118 @@
__all__ = 'HandlerUser',
class HandlerUser:
    """Maintains an in-memory, per-database cache of user profiles and
    applies user_* operations (set / setOnce / add / unset) to them."""

    tb = 'user'
    user_key = '#account_id'

    def __init__(self, db_client):
        self.users = dict()  # db -> {account_id: flattened user profile}
        self.db_client = db_client

    def get_user(self, db, account_id, data=None):
        """
        Return the cached user, falling back to the user view in ClickHouse.
        When the user exists nowhere and ``data`` is a dict, a new profile
        is seeded from it; otherwise None is returned.
        """
        user = self.users.get(db, {}).get(account_id)
        if user:
            return user
        user = self.db_client.get_one(db, f'{self.tb}_view', **{'#account_id': account_id})
        if user:
            self.users.setdefault(db, {})[account_id] = user
            return user
        if not isinstance(data, dict):
            return
        user = dict()
        self.merge(user, data)
        self.users.setdefault(db, {})[account_id] = user
        return user

    def merge(self, a: dict, b: dict):
        """
        Merge b into a, flattening nested dicts; b's scalar values win.
        :param a: target, mutated in place
        :param b: source
        :return:
        """
        for k, v in b.items():
            if isinstance(v, dict):
                self.merge(a, v)
            else:
                a[k] = v

    def merge_once(self, a: dict, b: dict):
        """
        Like merge, but keys already set (non-None) in a are kept.
        :param a: target, mutated in place
        :param b: source
        :return:
        """
        for k, v in b.items():
            if isinstance(v, dict):
                self.merge_once(a, v)
            else:
                a[k] = v if a.get(k) is None else a[k]

    def merge_add(self, a: dict, b: dict):
        """
        Add the numeric values in b['properties'] onto a's counters.
        :param a: target, mutated in place
        :param b: source message with a 'properties' dict of ints
        :raises ValueError: when a property value is not an int
        """
        if 'properties' not in b or not isinstance(b['properties'], dict):
            return
        for k, v in b['properties'].items():
            if not isinstance(v, int):
                raise ValueError('需要提供数值类型累加')
            a[k] = a.setdefault(k, 0) + v

    def merge_unset(self, a: dict, b: dict):
        """
        Remove from a every key listed in b['properties'].
        :param a: target, mutated in place (fixed docstring: was db/data)
        :param b: source message with a 'properties' list of key names
        :return:
        """
        if 'properties' not in b or not isinstance(b['properties'], list):
            return
        for k in b['properties']:
            if k in a:
                del a[k]

    def user_set(self, db, data: dict):
        """
        Set profile fields: ``data`` (with its nested 'properties') is
        flattened into the user's single-level k/v profile.
        :param db: database name
        :param data: incoming user_set message
        :return:
        """
        account_id = data[self.user_key]
        user = self.get_user(db, account_id, data)
        self.merge(user, data)

    def user_setOnce(self, db: str, data: dict):
        """Set profile fields only where currently unset."""
        account_id = data[self.user_key]
        user = self.get_user(db, account_id, data)
        self.merge_once(user, data)

    def user_add(self, db: str, data: dict):
        """Accumulate numeric profile fields."""
        account_id = data[self.user_key]
        user = self.get_user(db, account_id, data)
        self.merge_add(user, data)

    def user_unset(self, db: str, data: dict):
        """Delete the listed profile fields, if the user is known."""
        account_id = data[self.user_key]
        user = self.get_user(db, account_id)
        # Bug fix: get_user may return None here (no seed data supplied);
        # the old code then crashed inside merge_unset.
        if user is None:
            return
        self.merge_unset(user, data)

    def user_append(self, db: str, data: dict):
        """Not implemented yet (list-append semantics)."""
        pass

    def user_del(self, db: str, data: dict):
        """Not implemented yet (delete-user semantics)."""
        pass

    @property
    def buffer_pool(self):
        """(table name, per-db user cache) consumed by Transmitter."""
        return self.tb, self.users

113
v2/sketch.py Normal file
View File

@ -0,0 +1,113 @@
import copy
from .valid_data import *
class Sketch:
    """Tracks table structures (column -> ClickHouse type) and evolves the
    schema by adding a column for every newly seen field."""

    def __init__(self, db_client, struct_cache=None):
        self.db_client = db_client
        self.struct_cache = struct_cache  # optional persistent structure store
        self.__type_dict = dict()    # f'{db}_{tb}' -> {ck_type: set of columns}
        self.__struct_dict = dict()  # f'{db}_{tb}' -> {column: ck_type}

    @property
    def type_dict(self):
        return self.__type_dict

    @property
    def struct_dict(self):
        return self.__struct_dict

    def up_tb_struct(self, db, tb, data):
        """Rebuild the type->columns lookup for db.tb and persist ``data``."""
        struct_dict = self.__struct_dict.get(f'{db}_{tb}', {})
        # Invert column->type into type->set(columns) for type conversion.
        type_dict = self.__type_dict.setdefault(f'{db}_{tb}', {})
        for k, v in struct_dict.items():
            type_dict.setdefault(v, set())
            type_dict[v].add(k)
        # Update the persisted structure record, if a cache is configured.
        if self.struct_cache:
            self.struct_cache.update(db, tb, data)

    def get_tb_struct_cache(self, db, tb):
        """
        Discover (and cache) db.tb's columns and types via a one-row query.
        :param db:
        :param tb:
        :return: {column name: ClickHouse type string}
        """
        if self.__struct_dict.get(f'{db}_{tb}'):
            return self.__struct_dict.get(f'{db}_{tb}')
        sql = f'select * from {db}.{tb} limit 1'
        _, columns = self.db_client.execute(sql, with_column_types=True)
        res = {item[0]: item[1] for item in columns}
        self.__struct_dict[f'{db}_{tb}'] = res
        self.up_tb_struct(db, tb, res)
        return res

    def update_user_view(self, db, tb):
        """
        Recreate db.user_view (latest record per account).
        :param db:
        :param tb: only acts when tb == 'user'
        :return:
        """
        if tb != 'user':
            return
        sql = f'drop table if exists {db}.user_view'
        self.db_client.execute(sql)
        sql = f"""create view {db}.user_view as select *
from {db}.user
order by `#role_create_time` desc
LIMIT 1 by `#account_id`"""
        self.db_client.execute(sql)

    @staticmethod
    def _infer_ck_type(v):
        # Map the first observed Python value to a ClickHouse column type.
        # bool is tested before int because bool subclasses int (the old
        # if-chain reached the same result by letting the last match win).
        if isinstance(v, bool):
            return 'Nullable(UInt8)'
        if isinstance(v, str):
            return "Nullable(DateTime('UTC'))" if is_valid_date(v) else 'Nullable(String)'
        if isinstance(v, int):
            return 'Nullable(UInt64)'
        if isinstance(v, float):
            return 'Nullable(Float32)'
        if isinstance(v, list):
            return 'Array(String)'
        return None  # unsupported type (e.g. None, dict)

    def alter_table(self, db, tb, data):
        """
        Schema check: add a column for every first-seen field, typed after
        its first observed value. Type changes are not attempted.
        :param db:
        :param tb:
        :param data: one flattened message dict
        :return:
        """
        default_field = self.get_tb_struct_cache(db, tb)
        keys = set(default_field)
        for k, v in data.items():
            if k in ('#type',):
                continue
            if k not in default_field:
                ck_type = self._infer_ck_type(v)
                if ck_type is None:
                    # Bug fix: a None (or other unsupported) value previously
                    # raised KeyError while formatting the ALTER statement.
                    continue
                default_field[k] = ck_type
                sql = f'alter table {db}.{tb} add column `{k}` {ck_type}'
                print(sql)
                try:
                    self.db_client.execute(sql)
                except Exception:
                    print(f'添加字段 {k} 失败')
                    default_field.pop(k)
                else:
                    self.update_user_view(db, tb)
        if set(default_field) - keys:
            self.up_tb_struct(db, tb, default_field)

78
v2/transmitter.py Normal file
View File

@ -0,0 +1,78 @@
import json
import re
from .valid_data import *
__all__ = 'Transmitter',
class Transmitter:
    """Drains handler buffer pools and bulk-inserts them into ClickHouse."""

    def __init__(self, db_client, sketch):
        self.db_client = db_client
        self.sketch = sketch
        # NOTE(review): `time` reaches this module only via
        # `from .valid_data import *` — add a direct `import time` to be safe.
        self.ts = int(time.time())  # timestamp of the last flush
        self.slots = dict()  # handler -> {'bulk_max': rows, 'time_out': seconds}

    def add_source(self, handler, bulk_max=1000, time_out=60):
        """Register a handler whose buffer_pool should be flushed."""
        self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out}

    def check_send(self):
        """Yield (db, tb, buffer) for every buffer due for flushing:
        size threshold reached, or time_out seconds elapsed."""
        for h, p in self.slots.items():
            ts = int(time.time())
            tb, pool = h.buffer_pool
            for db, buffer in pool.items():
                if len(buffer) >= p['bulk_max'] or self.ts + p['time_out'] <= ts:
                    self.ts = ts
                    yield db, tb, buffer

    @staticmethod
    def flat_data(data: dict):
        """Flatten a message by lifting 'properties' keys to the top level."""
        if 'properties' in data:
            properties = data.pop('properties')
            data.update(properties)
        return data

    def __send(self, db, tb, data):
        """
        Bulk insert JSON rows; on a server-side parse error (code 26) drop
        the offending row and resend the rest recursively.
        :param data: list of JSON-encoded row strings
        """
        sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow '
        sql = sql + '\n'.join(data)
        try:
            # Allow up to a 20% error ratio.
            self.db_client.execute('set input_format_allow_errors_ratio=0.2')
            self.db_client.execute(sql)
        except Exception as e:
            # Drop the failing row and resend.
            # NOTE(review): e.code/e.message exist only on clickhouse-driver
            # server exceptions; other exception types surface here as
            # AttributeError — confirm that is acceptable.
            if e.code == 26:
                m = re.match('(.*)?Stack trace', e.message)
                if m:
                    error_msg = m.group(1)
                    error_row = re.match(r'.*?errors out of (\d+) rows', error_msg)
                    if error_row:
                        error_row = int(error_row.group(1)) - 1
                        data.pop(error_row)  # discard the bad row
                        self.__send(db, tb, data)
        else:
            print(f'{db}.{tb}插入{len(data)}')

    def check_table(self, db, tb, data):
        """Ensure every field of every row exists as a column."""
        for item in data:
            self.sketch.alter_table(db, tb, item)

    def check_type(self, db, tb, data):
        """Coerce each value to its column type; drop values that fail."""
        struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
        for item in data:
            del_keys = set()
            for k, v in item.items():
                type_ = struct_dict.get(k)
                if type_ is None:
                    # Column was never created (e.g. its type could not be
                    # inferred) — drop the field rather than raise KeyError.
                    del_keys.add(k)
                    continue
                item[k] = TYPE_CK2PY[type_](v, **item)
                # Bug fix: the old code tested the ORIGINAL value (`v is
                # None`), so failed conversions left None in the row instead
                # of removing the field.
                if item[k] is None:
                    del_keys.add(k)
                    print(k, '类型不一致')
            for key in del_keys:
                del item[key]

    def run(self):
        """Flush every due buffer: evolve schema, validate types, insert."""
        for db, tb, buffer in self.check_send():
            data = [self.flat_data(x) for x in buffer.values()]
            self.check_table(db, tb, data)
            self.check_type(db, tb, data)
            self.__send(db, tb, [json.dumps(item) for item in data])
            buffer.clear()

92
v2/valid_data.py Normal file
View File

@ -0,0 +1,92 @@
import time
from datetime import datetime
from datetime import timedelta
from ipaddress import IPv4Address
def is_valid_date(v, **kwargs):
    """Parse ``v`` as 'YYYY-MM-DD HH:MM:SS', shift it to UTC by
    `#zone_offset` hours (default 8), and return the formatted string.

    :param v: candidate date string
    :param kwargs: full row; only `#zone_offset` is consulted
    :return: shifted 'YYYY-MM-DD HH:MM:SS' string, or None when invalid
    """
    try:
        date = datetime.strptime(v, "%Y-%m-%d %H:%M:%S")
        zone_offset = kwargs.get('#zone_offset', 8)
        return (date - timedelta(hours=zone_offset)).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        # Narrowed from a bare except: non-strings raise TypeError, bad
        # formats/offsets raise ValueError; anything else should surface.
        return None
def is_valid_int(v, **kwargs):
    """Coerce ``v`` to int; return None when not convertible.

    Extra row context arrives via **kwargs and is ignored.
    """
    try:
        return int(v)
    except (TypeError, ValueError):
        # Narrowed from a bare except so unrelated errors still surface.
        return None
def is_valid_srt(v, **kwargs):
    """Pass ``v`` through when it is already a str; otherwise None."""
    return v if isinstance(v, str) else None
def is_valid_float(v, **kwargs):
    """Coerce ``v`` to float; return None when not convertible.

    Extra row context arrives via **kwargs and is ignored.
    """
    try:
        return float(v)
    except (TypeError, ValueError):
        # Narrowed from a bare except so unrelated errors still surface.
        return None
def is_valid_bool(v, **kwargs):
    """Pass ``v`` through when it is a bool; otherwise None."""
    return v if isinstance(v, bool) else None
def is_valid_array(v, **kwargs):
    """Stringify each element of a list; return None for non-lists."""
    if not isinstance(v, list):
        return None
    return [str(i) for i in v]
def is_valid_ipv4(v, **kwargs):
    """Normalize ``v`` (dotted-quad string or int) to IPv4 text; None when
    invalid. Extra row context arrives via **kwargs and is ignored."""
    try:
        return str(IPv4Address(v))
    except (TypeError, ValueError):
        # Narrowed from a bare except; ipaddress raises AddressValueError
        # (a ValueError subclass) on malformed input.
        return None
# Dispatch table: ClickHouse column type -> validator/coercion function.
# Bug fix: the non-nullable integer types were wired to is_valid_srt (the
# string validator), so every plain UInt8/Int16/.../Int64 value was rejected.
# Also adds the concrete Float32/Float64 names actually produced by the
# schema code (Sketch emits 'Nullable(Float32)'), which previously caused a
# KeyError in Transmitter.check_type.
TYPE_CK2PY = {
    "DateTime('UTC')": is_valid_date,
    "Nullable(DateTime('UTC'))": is_valid_date,
    "DateTime()": is_valid_date,
    "Nullable(IPv4)": is_valid_ipv4,
    "IPv4": is_valid_ipv4,
    "String": is_valid_srt,
    "Nullable(String)": is_valid_srt,
    "Nullable(UInt8)": is_valid_int,
    "UInt8": is_valid_int,
    "Nullable(Int8)": is_valid_int,
    "Int8": is_valid_int,
    "Nullable(UInt16)": is_valid_int,
    "UInt16": is_valid_int,
    "Nullable(Int16)": is_valid_int,
    "Int16": is_valid_int,
    "Nullable(UInt32)": is_valid_int,
    "UInt32": is_valid_int,
    "Nullable(Int32)": is_valid_int,
    "Int32": is_valid_int,
    "Nullable(UInt64)": is_valid_int,
    "UInt64": is_valid_int,
    "Nullable(Int64)": is_valid_int,
    "Int64": is_valid_int,
    "Array(String)": is_valid_array,
    # Legacy bare "Float" keys kept for backward compatibility.
    "Nullable(Float)": is_valid_float,
    "Float": is_valid_float,
    "Nullable(Float32)": is_valid_float,
    "Float32": is_valid_float,
    "Nullable(Float64)": is_valid_float,
    "Float64": is_valid_float,
}

54
初始化事件表.sql Normal file
View File

@ -0,0 +1,54 @@
-- Event fact table: one row per tracked event (written by v2/handler_event).
-- Engine CollapsingMergeTree(sign): an update is written as the old row
-- with sign=-1 plus the replacement row with sign=1, collapsed at merge.
create table shjy.event
(
    `#ip` Nullable(IPv4),
    `#country` Nullable(String),
    `#country_code` Nullable(String),
    `#province` Nullable(String),
    `#city` Nullable(String),
    `#os_version` Nullable(String),
    `#manufacturer` Nullable(String),
    `#os` Nullable(String),
    `#device_id` Nullable(String),
    `#screen_height` Nullable(UInt16),
    `#screen_width` Nullable(UInt16),
    `#device_model` Nullable(String),
    `#app_version` Nullable(String),
    `#bundle_id` Nullable(String),
    `#lib` Nullable(String),
    `#lib_version` Nullable(String),
    `#network_type` Nullable(String),
    `#carrier` Nullable(String),
    `#browser` Nullable(String),
    `#browser_version` Nullable(String),
    `#duration` Nullable(String),
    `#url` Nullable(String),
    `#url_path` Nullable(String),
    `#referrer` Nullable(String),
    `#referrer_host` Nullable(String),
    `#title` Nullable(String),
    `#screen_name` Nullable(String),
    `#element_id` Nullable(String),
    `#element_type` Nullable(String),
    `#resume_from_background` Nullable(String),
    `#element_selector` Nullable(String),
    `#element_position` Nullable(String),
    `#element_content` Nullable(String),
    `#scene` Nullable(String),
    `#mp_platform` Nullable(String),
    `#app_crashed_reason` Nullable(String),
    -- Hours to add back when restoring local time from the stored UTC value.
    `#zone_offset` Int8 default 8,
    `#event_id` String,
    `#event_time` DateTime('UTC'),
    `#account_id` String,
    `#distinct_id` Nullable(String),
    `#event_name` String,
    `#server_time` DateTime('UTC') default now(),
    -- Collapsing marker: 1 = live row, -1 = retraction of an earlier row.
    `sign` Int8 default 1
) ENGINE = CollapsingMergeTree(sign)
-- Daily partitions by event time.
PARTITION BY toYYYYMMDD(`#event_time`)
order by (`#account_id`, `#event_time`, `#event_name`)
-- TTL event_time + toIntervalDay(365)

13
初始化用户表.sql Normal file
View File

@ -0,0 +1,13 @@
-- User profile table: one row per profile version, deduplicated at merge
-- time by ReplacingMergeTree; shjy.user_view selects the newest row per
-- `#account_id` (ordered by `#role_create_time`).
create table shjy.user
(
    `#role_create_time` DateTime('UTC'),
    `#account_id` String,
    `svrindex` UInt16,
    -- Hours to add back when restoring local time from the stored UTC value.
    `#zone_offset` Int8 default 8,
    `#server_time` DateTime('UTC') default now()
) ENGINE = ReplacingMergeTree()
PARTITION BY `svrindex`
order by `#account_id`