# to_ck/v2/transmitter.py
# Last modified 2021-04-26 15:40:02 +08:00 (110 lines, 3.4 KiB, Python).

import json
import re
import time
from .valid_data import *
# Only the Transmitter class is part of this module's public API.
__all__ = ('Transmitter',)
class Transmitter:
    """Flush rows buffered by registered handlers into ClickHouse tables.

    Each handler registered with ``add_source`` exposes ``buffer_pool`` as a
    ``(table, pool)`` pair, where ``pool`` maps a database name to a dict-like
    buffer whose values are row dicts. ``run`` inserts every buffer that is due
    (see ``check_send``) using ``INSERT ... FORMAT JSONEachRow`` and then clears it.
    """

    def __init__(self, db_client, sketch):
        """Create a transmitter.

        Args:
            db_client: object whose ``execute(sql)`` runs one SQL statement.
            sketch: schema helper; must provide ``struct_dict`` (keyed by
                ``'<db>_<tb>'``, mapping column name -> column type) and
                ``alter_table(db, tb, row)``.
        """
        self.db_client = db_client
        self.sketch = sketch
        # Creation time in seconds; also the fallback timeout-clock start for
        # slots that were not registered through add_source.
        self.ts = int(time.time())
        # handler -> {'bulk_max': int, 'time_out': int, 'ts': last timed flush}
        self.slots = dict()

    def add_source(self, handler, bulk_max=1000, time_out=60):
        """Register ``handler`` for flushing.

        Args:
            handler: object exposing ``buffer_pool`` as ``(table, {db: buffer})``.
            bulk_max: flush a buffer as soon as it holds this many rows.
            time_out: flush all of the handler's non-empty buffers once this
                many seconds have passed since its last timed flush.
        """
        self.slots[handler] = {
            'bulk_max': bulk_max,
            'time_out': time_out,
            'ts': int(time.time()),
        }

    def check_send(self):
        """Yield ``(db, tb, buffer)`` for every buffer that is due to be flushed.

        A non-empty buffer is due when it holds at least ``bulk_max`` rows or
        when its handler's ``time_out`` has elapsed. The timeout clock is kept
        per handler and restarted only after a timeout pass, so flushing one
        buffer never postpones the timed flush of the others. Empty buffers are
        never yielded.
        """
        for handler, params in self.slots.items():
            now = int(time.time())
            tb, pool = handler.buffer_pool
            timed_out = params.get('ts', self.ts) + params['time_out'] <= now
            for db, buffer in pool.items():
                size = len(buffer)
                if size and (timed_out or size >= params['bulk_max']):
                    yield db, tb, buffer
            if timed_out:
                # Restart the clock only once every due buffer has been handed out.
                params['ts'] = now

    @staticmethod
    def flat_data(data: dict) -> dict:
        """Merge a nested ``properties`` dict into the top level of ``data``.

        ``data`` is modified in place and returned. Keys from ``properties``
        overwrite top-level keys of the same name; a missing or empty/None
        ``properties`` entry is simply removed.
        """
        properties = data.pop('properties', None)
        if properties:
            data.update(properties)
        return data

    @staticmethod
    def _failed_row_index(exc):
        """Return the 0-based index of the row that made an insert fail, or None.

        Only exceptions carrying ``code == 26`` are inspected. The index is
        taken from the "... errors out of N rows" text that precedes
        "Stack trace" in the message, as ``N - 1`` (this assumes N counts the
        rows read up to and including the failing one).
        """
        if getattr(exc, 'code', None) != 26:
            return None
        message = getattr(exc, 'message', None)
        if message is None:
            message = str(exc)
        head = re.match(r'(.*)?Stack trace', message)
        if not head:
            return None
        count = re.match(r'.*?errors out of (\d+) rows', head.group(1) or '')
        if not count:
            return None
        return int(count.group(1)) - 1

    def __send(self, db, tb, data):
        """Insert JSON rows into ``db.tb``, dropping rows the server rejects.

        Args:
            db: database name.
            tb: table name.
            data: list of JSON-encoded rows. Rows reported as unparsable are
                removed from this list in place, one per retry.

        Failures that cannot be attributed to a single row are printed and the
        remaining batch is discarded (best effort; nothing is raised).
        """
        while data:
            sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow ' + '\n'.join(data)
            try:
                # Tolerate up to 20% bad rows within a single insert.
                self.db_client.execute('set input_format_allow_errors_ratio=0.2')
                self.db_client.execute(sql)
                return
            except Exception as e:
                bad_row = self._failed_row_index(e)
                if bad_row is None or not 0 <= bad_row < len(data):
                    print(f'{db}.{tb}: dropping {len(data)} rows after insert error: {e!r}')
                    return
                # Drop the offending row and retry with the rest. Looping (not
                # recursing) keeps batches with many bad rows off the recursion limit.
                data.pop(bad_row)

    def check_table(self, db, tb, data):
        """Let the sketch adapt table ``db.tb`` to each row via ``alter_table``."""
        for item in data:
            self.sketch.alter_table(db, tb, item)

    def check_type(self, db, tb, data):
        """Convert row values in place to the types expected by ``db.tb``.

        Each non-None value is passed through ``TYPE_CK2PY[<column type>]``,
        with the column type looked up in ``self.sketch.struct_dict['<db>_<tb>']``
        and the whole row also given as keyword arguments. Keys whose value is
        None, either before or after conversion, are removed from the row.
        """
        struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
        for item in data:
            del_keys = set()
            for k, v in item.items():
                if v is None:
                    del_keys.add(k)
                    continue
                item[k] = TYPE_CK2PY[struct_dict[k]](v, **item)
                # A None result is treated as a type mismatch (presumably how the
                # TYPE_CK2PY converters reject a value - confirm in valid_data).
                if item[k] is None:
                    del_keys.add(k)
                    print(k, '类型不一致')
            for key in del_keys:
                del item[key]

    def run(self):
        """Flush every due buffer: flatten, sync schema, coerce types, insert, clear."""
        for db, tb, buffer in self.check_send():
            data = [self.flat_data(row) for row in buffer.values()]
            self.check_table(db, tb, data)
            self.check_type(db, tb, data)
            self.__send(db, tb, [json.dumps(row) for row in data])
            buffer.clear()