142 lines
4.5 KiB
Python
142 lines
4.5 KiB
Python
import json
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
import traceback
|
|
|
|
from .valid_data import *
|
|
|
|
# Public API of this module: ``import *`` exports only Transmitter; the Ping
# keep-alive thread is started through Transmitter.start_ping instead.
__all__ = ('Transmitter',)
|
|
|
|
|
|
class Ping(threading.Thread):
    """Daemon thread that periodically issues ``select 1`` to keep a connection alive.

    Every ``interval`` seconds the thread wakes up. Once more than ``time_out``
    seconds have passed since the previous ping attempt, it sends
    ``select 1`` through ``db_client``. Call :meth:`stop` to end the loop.

    NOTE(review): ``db_client`` is shared with the owning Transmitter, which
    calls ``execute`` from another thread. If the client is not thread-safe,
    these calls can interleave; confirm and guard with a lock if needed.
    """

    def __init__(self, db_client, p, log, time_out=60, interval=10):
        """
        :param db_client: object exposing ``execute(sql)``.
        :param p: worker/process identifier, used only in log messages.
        :param log: logger exposing ``info`` and ``error``.
        :param time_out: minimum number of seconds between two ping attempts.
        :param interval: number of seconds to wait between checks.
        """
        super().__init__(daemon=True)
        # Epoch seconds of the last ping attempt; 0 makes the first check ping.
        self.ping_ts = 0
        self.time_out = time_out
        self.interval = interval
        self.log = log
        self.db_client = db_client
        self.p = p
        self._stop_event = threading.Event()

    def stop(self):
        """Ask :meth:`run` to exit; the loop ends at its next wake-up."""
        self._stop_event.set()

    def run(self):
        """Loop until :meth:`stop` is called, pinging whenever ``time_out`` has elapsed."""
        # Event.wait works as an interruptible sleep. It returns False on
        # timeout (keep looping) and True once stop() has been called.
        while not self._stop_event.wait(self.interval):
            ts = int(time.time())
            if self.ping_ts + self.time_out >= ts:
                continue
            # Record the attempt before executing, so a failing ping is retried
            # only after another full time_out rather than on every wake-up.
            self.ping_ts = ts
            try:
                self.log.info(f'保持连接{self.p} ping')
                self.db_client.execute('select 1')
            except Exception:
                # Best-effort keep-alive: log the cause and try again later.
                self.log.error('ping error\n' + traceback.format_exc())
|
|
|
|
|
|
class Transmitter:
    """Flush buffered rows from registered sources into the database in batches.

    Every handler registered with :meth:`add_source` exposes ``buffer_pool``
    as a ``(table, buffer)`` pair. ``buffer`` is a mapping whose values are row
    dicts; it must support ``len``, ``values`` and ``clear``. :meth:`run`
    picks the buffers that are due (see :meth:`check_send`), prepares their
    rows and inserts them with ``INSERT ... FORMAT JSONEachRow``.
    """

    def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
        """
        :param db_client: object exposing ``execute(sql)``.
        :param db_name: database that receives every insert.
        :param sketch: schema helper providing ``alter_table(db, tb, row)`` and
            ``struct_dict`` (``'<db>_<tb>'`` -> column -> key into TYPE_CK2PY).
        :param log: logger exposing ``info``, ``warning`` and ``error``.
        :param lock: stored on the instance but not used by this class.
        :param event_attr: collector providing ``add_event(db, row)``.
        :param p: worker/process identifier, used only in log messages.
        """
        self.db_client = db_client
        self.db_name = db_name
        self.sketch = sketch
        # handler -> {'bulk_max': rows, 'time_out': seconds, 'ts': last flush (epoch s)}
        self.slots = {}
        self.log = log
        self.lock = lock
        self.event_attr = event_attr
        self.p = p

    def start_ping(self):
        """Start a keep-alive Ping thread on ``db_client`` and return it.

        NOTE(review): that thread calls ``db_client.execute`` concurrently with
        :meth:`run`; confirm the client is safe to share between threads.
        """
        thread = Ping(self.db_client, self.p, self.log)
        thread.start()
        return thread

    def add_source(self, handler, bulk_max=1000, time_out=60):
        """Register ``handler`` so that :meth:`run` flushes its buffer.

        :param handler: object whose ``buffer_pool`` is a ``(table, buffer)`` pair.
        :param bulk_max: flush once the buffer holds at least this many rows.
        :param time_out: flush once this many seconds have passed since the last flush.
        """
        self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, 'ts': int(time.time())}

    def check_send(self):
        """Yield ``(table, buffer)`` for every non-empty buffer that is due.

        A buffer is due when it holds at least ``bulk_max`` rows or when its
        ``time_out`` has elapsed since the last flush. The slot timestamp is
        reset for every buffer that is yielded.
        """
        now = int(time.time())
        for handler, slot in self.slots.items():
            tb, buffer = handler.buffer_pool
            size = len(buffer)
            if size == 0:
                continue
            if slot['ts'] + slot['time_out'] <= now or size >= slot['bulk_max']:
                slot['ts'] = now
                yield tb, buffer

    @staticmethod
    def flat_data(data: dict):
        """Merge the nested ``properties`` dict into ``data`` in place and return it.

        Keys from ``properties`` overwrite top-level keys of the same name.
        An empty or ``None`` ``properties`` entry is simply removed.
        """
        properties = data.pop('properties', None)
        if properties:
            data.update(properties)
        return data

    @staticmethod
    def _failed_row_index(exc, n_rows):
        """Return the 0-based index of the row blamed by ``exc``, or ``None``.

        Only error code 26 is handled. The row number N is read from the
        ``"errors out of N rows"`` fragment that precedes any ``Stack trace``
        part of the message, and is treated as the 1-based position of the
        failing row (NOTE(review): confirm against the server's message
        format). Indices outside ``range(n_rows)`` yield ``None``.
        """
        if getattr(exc, 'code', None) != 26:
            return None
        message = getattr(exc, 'message', None) or str(exc)
        head = message.partition('Stack trace')[0]
        match = re.search(r'errors out of (\d+) rows', head)
        if match is None:
            return None
        index = int(match.group(1)) - 1
        return index if 0 <= index < n_rows else None

    def __send(self, db, tb, data):
        """Insert JSON rows into ``db.tb``, dropping rows the server rejects.

        The session allows up to 20% malformed rows
        (``input_format_allow_errors_ratio=0.2``). If the insert still fails
        with error code 26 and the message names the offending row, that row
        is removed from ``data`` (in place) and the insert is retried with the
        remaining rows. Any other failure is logged and the batch is dropped.

        :param data: list of JSON-encoded rows; mutated when rows are dropped.
        """
        # Iterative retry: recursing once per rejected row overflows the stack
        # on large batches.
        while data:
            sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow ' + '\n'.join(data)
            started_ms = int(time.time() * 1000)
            try:
                self.db_client.execute('set input_format_allow_errors_ratio=0.2')
                self.db_client.execute(sql)
            except Exception as e:
                bad_row = self._failed_row_index(e, len(data))
                if bad_row is None:
                    # Not a recoverable per-row error: report it and give up.
                    self.log.error(traceback.format_exc())
                    return
                dropped = data.pop(bad_row)
                self.log.warning(f'{db}.{tb} dropped bad row {bad_row}: {dropped[:500]}')
                continue
            self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - started_ms}')
            self.log.info(f'{db}.{tb}插入{len(data)}条')
            return

    def check_table(self, db, tb, data):
        """Call ``sketch.alter_table`` for every row of ``data``.

        Presumably this lets the table schema follow the rows' columns
        (TODO confirm against the sketch implementation).
        """
        for item in data:
            self.sketch.alter_table(db, tb, item)

    def collect_event(self, db, tb, data):
        """Pass every row of the ``event`` table to ``event_attr.add_event``; other tables are ignored."""
        if tb != 'event':
            return
        for item in data:
            self.event_attr.add_event(db, item)

    def check_type(self, db, tb, data):
        """Coerce every row's values to the column types recorded for ``db.tb``, in place.

        Keys whose value is ``None`` are removed. Every other value is replaced
        by ``TYPE_CK2PY[column_type](value, **row)``. If that converter returns
        ``None``, the key is removed and a type-mismatch warning is logged.
        NOTE(review): this assumes converters signal a mismatch by returning
        ``None``. A column missing from ``struct_dict`` raises KeyError.
        """
        struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
        for item in data:
            del_keys = set()
            # Only values are reassigned while iterating; keys are deleted
            # afterwards because a dict must not change size during iteration.
            for k, v in item.items():
                if v is None:
                    del_keys.add(k)
                    continue
                converted = TYPE_CK2PY[struct_dict[k]](v, **item)
                item[k] = converted
                if converted is None:
                    del_keys.add(k)
                    self.log.warning(f'{k} {type(v)} 类型不一致')
            for key in del_keys:
                del item[key]

    def run(self):
        """Flush every due buffer: flatten, sync schema, coerce types, record events, insert.

        An exception while preparing or sending a batch is logged with its
        traceback and that batch is dropped. Each flushed buffer is cleared
        afterwards either way.
        """
        for tb, buffer in self.check_send():
            try:
                data = [self.flat_data(x) for x in buffer.values()]
                self.check_table(self.db_name, tb, data)
                self.check_type(self.db_name, tb, data)
                self.collect_event(self.db_name, tb, data)
                self.__send(self.db_name, tb, [json.dumps(item) for item in data])
            except Exception:
                self.log.error(traceback.format_exc())
            buffer.clear()
|