init
This commit is contained in:
parent
bb8f33f38b
commit
3df9a6fb5c
2
.gitignore
vendored
2
.gitignore
vendored
@ -128,4 +128,4 @@ dmypy.json
|
|||||||
|
|
||||||
# Pyre type checker
|
# Pyre type checker
|
||||||
.pyre/
|
.pyre/
|
||||||
|
.idea
|
||||||
|
13
Pipfile
Normal file
13
Pipfile
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
[[source]]
|
||||||
|
url = "https://pypi.douban.com/simple"
|
||||||
|
verify_ssl = false
|
||||||
|
name = "pypi"
|
||||||
|
|
||||||
|
[packages]
|
||||||
|
kafka-python = "*"
|
||||||
|
clickhouse-driver = "*"
|
||||||
|
|
||||||
|
[dev-packages]
|
||||||
|
|
||||||
|
[requires]
|
||||||
|
python_version = "3.8"
|
0
__init__.py
Normal file
0
__init__.py
Normal file
1
ck/__init__.py
Normal file
1
ck/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
from ck import *
|
271
ck/ck.py
Normal file
271
ck/ck.py
Normal file
@ -0,0 +1,271 @@
|
|||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
|
||||||
|
import arrow
|
||||||
|
from clickhouse_driver.client import Client
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
|
from .struct_cache import StructCacheFile
|
||||||
|
from .robot import DDRobot
|
||||||
|
|
||||||
|
__all__ = ('CK',)
|
||||||
|
|
||||||
|
|
||||||
|
class DataHandler:
|
||||||
|
handler_link = []
|
||||||
|
|
||||||
|
def __init__(self, func):
|
||||||
|
DataHandler.handler_link.append(func)
|
||||||
|
|
||||||
|
|
||||||
|
class CK:
|
||||||
|
def __init__(self, client=Client(**settings.CK_CONFIG),
|
||||||
|
struct_cache=StructCacheFile(),
|
||||||
|
robot=DDRobot(),
|
||||||
|
bulk_max=1000):
|
||||||
|
|
||||||
|
self.client = client
|
||||||
|
self.struct_cache = struct_cache
|
||||||
|
self.robot = robot
|
||||||
|
self.type_dict = dict()
|
||||||
|
self.struct_dict = dict()
|
||||||
|
self.bulk_data = dict()
|
||||||
|
self.bulk_max = bulk_max
|
||||||
|
self.__send_ts = int(time.time())
|
||||||
|
|
||||||
|
def up_tb_struct(self, db, tb, data):
|
||||||
|
struct_dict = self.struct_dict.get(f'{db}_{tb}', {})
|
||||||
|
|
||||||
|
# 用于类型转换
|
||||||
|
type_dict = self.type_dict.setdefault(f'{db}_{tb}', {})
|
||||||
|
for k, v in struct_dict.items():
|
||||||
|
type_dict.setdefault(v, set())
|
||||||
|
type_dict[v].add(k)
|
||||||
|
# 更新结构记录
|
||||||
|
self.struct_cache.update(db, tb, data)
|
||||||
|
|
||||||
|
@DataHandler
|
||||||
|
def date2utc(self, db, tb, data: dict):
|
||||||
|
"""
|
||||||
|
日期置为utc时间
|
||||||
|
:param data:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
for k in self.type_dict[f'{db}_{tb}'].get('Nullable(DateTime(\'UTC\'))', set()) | self.type_dict[
|
||||||
|
f'{db}_{tb}'].get(
|
||||||
|
'DateTime(\'UTC\')', set()):
|
||||||
|
try:
|
||||||
|
data[k] = arrow.get(data[k]).shift(hours=-data.get('zone_offset', 8)).format('YYYY-MM-DD HH:mm:ss')
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@DataHandler
|
||||||
|
def array2str(self, db, tb, data: dict):
|
||||||
|
"""
|
||||||
|
数组里统一存字符串
|
||||||
|
:param data:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
for k in self.type_dict[f'{db}_{tb}'].get('Array(String)', set()):
|
||||||
|
try:
|
||||||
|
data[k] = [str(v) for v in data[k]]
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@DataHandler
|
||||||
|
def up_user(self, db, tb, data: dict):
|
||||||
|
"""
|
||||||
|
从视图中查出最新user info 与当前user 合并
|
||||||
|
:param db:
|
||||||
|
:param tb:
|
||||||
|
:param data:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if tb != 'user':
|
||||||
|
return
|
||||||
|
sql = f"SELECT * from {db}.user_view where account_id='{data['account_id']}'"
|
||||||
|
d, col = self.client.execute(sql, with_column_types=True)
|
||||||
|
if not d:
|
||||||
|
return
|
||||||
|
for v, c in zip(d[0], col):
|
||||||
|
c = c[0]
|
||||||
|
if c not in data:
|
||||||
|
data[c] = v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v
|
||||||
|
|
||||||
|
def __notify(self, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
预留机器人通知
|
||||||
|
:param args:
|
||||||
|
:param kwargs:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if self.robot:
|
||||||
|
self.robot.send(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_tb_struct_cache(self, db, tb):
|
||||||
|
"""
|
||||||
|
查一条记录 取字段 和类型
|
||||||
|
:param db:
|
||||||
|
:param tb:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if self.struct_dict.get(f'{db}_{tb}'):
|
||||||
|
return self.struct_dict.get(f'{db}_{tb}')
|
||||||
|
|
||||||
|
sql = f'select * from {db}.{tb} limit 1'
|
||||||
|
_, columns = self.client.execute(sql, with_column_types=True)
|
||||||
|
res = {item[0]: item[1] for item in columns}
|
||||||
|
self.struct_dict[f'{db}_{tb}'] = res
|
||||||
|
|
||||||
|
# s = self.client.execute(f'show create {db}.{tb}')[0][0]
|
||||||
|
# s = re.match(r'.*?\((.*?)\)\n?ENGINE', s, re.S).group(1)
|
||||||
|
# res = dict()
|
||||||
|
# for row in s.split('\n'):
|
||||||
|
# row = row.strip()
|
||||||
|
# if not row:
|
||||||
|
# continue
|
||||||
|
# row = re.sub("[,`]", '', row)
|
||||||
|
# k, t = row.split(' ')[:2]
|
||||||
|
# res[k] = t
|
||||||
|
#
|
||||||
|
# self.struct_dict[f'{db}_{tb}'] = res
|
||||||
|
self.up_tb_struct(db, tb, res)
|
||||||
|
return res
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_valid_date(date: str):
|
||||||
|
try:
|
||||||
|
res = arrow.get(date)
|
||||||
|
return res
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def update_user_view(self, db, tb):
|
||||||
|
"""
|
||||||
|
更新视图
|
||||||
|
:param db:
|
||||||
|
:param tb:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if tb != 'user':
|
||||||
|
return
|
||||||
|
sql = f"""drop table {db}.user_view;create view {db}.user_view as select *
|
||||||
|
from {db}.user
|
||||||
|
order by role_create_time desc
|
||||||
|
LIMIT 1 by account_id"""
|
||||||
|
self.client.execute(sql)
|
||||||
|
|
||||||
|
def alter_table(self, db, tb, data):
|
||||||
|
"""
|
||||||
|
数据库字段检查
|
||||||
|
添加新字段为第一次出现类型
|
||||||
|
|
||||||
|
如果要修改字段类型 存在类型转换问题。停止程序,删除列
|
||||||
|
:param db:
|
||||||
|
:param tb:
|
||||||
|
:param data:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
default_field = self.get_tb_struct_cache(db, tb)
|
||||||
|
keys = set(default_field)
|
||||||
|
for k, v in data.items():
|
||||||
|
if k not in default_field:
|
||||||
|
if isinstance(v, str):
|
||||||
|
if self.is_valid_date(v):
|
||||||
|
default_field[k] = "Nullable(DateTime('UTC'))"
|
||||||
|
else:
|
||||||
|
default_field[k] = 'Nullable(String)'
|
||||||
|
|
||||||
|
if isinstance(v, int):
|
||||||
|
default_field[k] = 'Nullable(UInt64)'
|
||||||
|
|
||||||
|
if isinstance(v, float):
|
||||||
|
default_field[k] = 'Nullable(Float32)'
|
||||||
|
|
||||||
|
if isinstance(v, list):
|
||||||
|
default_field[k] = 'Array(String)'
|
||||||
|
|
||||||
|
if isinstance(v, bool):
|
||||||
|
default_field[k] = 'Nullable(UInt8)'
|
||||||
|
|
||||||
|
sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
|
||||||
|
print(sql)
|
||||||
|
try:
|
||||||
|
self.client.execute(sql)
|
||||||
|
except Exception as e:
|
||||||
|
print(f'添加字段 {k} 失败')
|
||||||
|
default_field.pop(k)
|
||||||
|
else:
|
||||||
|
self.update_user_view(db, tb)
|
||||||
|
|
||||||
|
if set(default_field) - keys:
|
||||||
|
self.up_tb_struct(db, tb, default_field)
|
||||||
|
|
||||||
|
def __send(self, db, tb, data):
|
||||||
|
"""
|
||||||
|
sql写入需要字段名或全字段
|
||||||
|
这里 json 格式写入
|
||||||
|
超过错误允许率去掉那条记录再次递归
|
||||||
|
一般都是 类型不匹配错误
|
||||||
|
|
||||||
|
:param db:
|
||||||
|
:param tb:
|
||||||
|
:param data:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow '
|
||||||
|
sql = sql + '\n'.join(data)
|
||||||
|
try:
|
||||||
|
# 允许20%错误率
|
||||||
|
self.client.execute('set input_format_allow_errors_ratio=0.2')
|
||||||
|
self.client.execute(sql)
|
||||||
|
except Exception as e:
|
||||||
|
# 丢弃错误行 再次发送
|
||||||
|
if e.code == 26:
|
||||||
|
m = re.match('(.*)?Stack trace', e.message)
|
||||||
|
if m:
|
||||||
|
error_msg = m.group(1)
|
||||||
|
error_row = re.match('.*?errors out of (\d+) rows', error_msg)
|
||||||
|
if error_row:
|
||||||
|
error_row = int(error_row.group(1)) - 1
|
||||||
|
error_data = data.pop(error_row)
|
||||||
|
self.__notify(error_msg, error_data)
|
||||||
|
self.__send(db, tb, data)
|
||||||
|
else:
|
||||||
|
print(f'{db}.{tb}插入{len(data)}条')
|
||||||
|
|
||||||
|
finally:
|
||||||
|
data.clear()
|
||||||
|
|
||||||
|
def __add(self, db, tb, msg):
|
||||||
|
"""
|
||||||
|
列表缓存池
|
||||||
|
:param db:
|
||||||
|
:param tb:
|
||||||
|
:param msg:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
bulk_data = self.bulk_data.setdefault(f'{db}', {'event': [], 'user': []})
|
||||||
|
bulk_data[tb].append(json.dumps(msg))
|
||||||
|
ts = int(time.time())
|
||||||
|
# 满足其一条件 写入ck
|
||||||
|
if len(bulk_data[tb]) >= self.bulk_max or self.__send_ts + 60 <= ts:
|
||||||
|
self.__send_ts = ts
|
||||||
|
self.__send(db, tb, bulk_data[tb])
|
||||||
|
|
||||||
|
def send(self, db, tb, msg):
|
||||||
|
params = (db, tb, msg)
|
||||||
|
self.alter_table(*params)
|
||||||
|
# 数据加工链
|
||||||
|
[f(self, *params) for f in DataHandler.handler_link]
|
||||||
|
self.__add(*params)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
ck_client = CK()
|
9
ck/robot.py
Normal file
9
ck/robot.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
class DDRobot:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def send(self, *args, **kwargs):
|
||||||
|
if args:
|
||||||
|
print(args)
|
||||||
|
if kwargs:
|
||||||
|
print(kwargs)
|
11
ck/struct_cache.py
Normal file
11
ck/struct_cache.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class StructCacheFile:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db, tb, data):
|
||||||
|
with open(f'{db}_{tb}.json', 'w') as f:
|
||||||
|
json.dump(data, f)
|
13
common.py
Normal file
13
common.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
from kafka import KafkaConsumer
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
__all__ = 'consumer',
|
||||||
|
|
||||||
|
|
||||||
|
def consumer():
|
||||||
|
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
|
||||||
|
c.subscribe(settings.SUBSCRIBE_TOPIC)
|
||||||
|
for msg in c:
|
||||||
|
topic = msg.topic
|
||||||
|
val = msg.value
|
||||||
|
yield topic, val
|
29
main.py
Normal file
29
main.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from ck.ck import CK
|
||||||
|
from common import *
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
ck_client = CK()
|
||||||
|
|
||||||
|
|
||||||
|
def run():
|
||||||
|
for topic, msg in consumer():
|
||||||
|
# print(msg)
|
||||||
|
try:
|
||||||
|
db = settings.APPID_TO_CKDB.get(msg['app_id'])
|
||||||
|
if 'user' in msg['type']:
|
||||||
|
table = 'user'
|
||||||
|
elif 'track' in msg['type']:
|
||||||
|
table = 'event'
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
del msg['type']
|
||||||
|
|
||||||
|
ck_client.send(db, table, msg)
|
||||||
|
except Exception as e:
|
||||||
|
print(traceback.print_exc())
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
run()
|
38
settings.py
Normal file
38
settings.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
CK_CONFIG = {'host': '119.29.176.224',
|
||||||
|
'send_receive_timeout': 3}
|
||||||
|
|
||||||
|
SUBSCRIBE_TOPIC = ['legu_test']
|
||||||
|
|
||||||
|
KAFKA_CONSUMER_CONF = {
|
||||||
|
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||||
|
'value_deserializer': json.loads,
|
||||||
|
'group_id': 'legu_group'
|
||||||
|
}
|
||||||
|
|
||||||
|
TOPIC_TO_LEGU = {
|
||||||
|
'a77703e24e6643d08b74a4163a14f74c': 'legu_test',
|
||||||
|
'c3e0409ac18341149877b08f087db640': 'legu_test'
|
||||||
|
}
|
||||||
|
|
||||||
|
APPID_TO_CKDB = {
|
||||||
|
'a77703e24e6643d08b74a4163a14f74c': 'shjy',
|
||||||
|
'c3e0409ac18341149877b08f087db640': 'shjy'
|
||||||
|
}
|
||||||
|
|
||||||
|
REDIS_CONF = {
|
||||||
|
'host': '192.168.0.161',
|
||||||
|
'port': 6379,
|
||||||
|
'password': 'd1Gh*zp5',
|
||||||
|
'ck': 10 # ck
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class Debug(Config):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
settings = Debug
|
Loading…
Reference in New Issue
Block a user