import json
import re
import threading
import time
import traceback

from kafka import TopicPartition, OffsetAndMetadata

from settings import settings
from .valid_data import *

__all__ = ('Transmitter',)


class Ping(threading.Thread):
    """Daemon thread that keeps the ClickHouse connection alive."""

    def __init__(self, db_client, p, log):
        threading.Thread.__init__(self)
        self.daemon = True
        self.ping_ts = 0
        self.time_out = 60
        self.log = log
        self.db_client = db_client
        self.p = p

    def run(self):
        while True:
            time.sleep(10)
            ts = int(time.time())
            if self.ping_ts + self.time_out < ts:
                # keep the connection alive with a no-op query
                try:
                    self.ping_ts = ts
                    self.log.info(f'keepalive ping, partition {self.p}')
                    self.db_client.execute('select 1')
                except Exception:
                    self.log.error('ping error')


class Transmitter:
    """Flushes buffered Kafka messages into ClickHouse in bulk."""

    def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
        self.db_client = db_client
        self.db_name = db_name
        self.sketch = sketch
        self.slots = dict()
        self.log = log
        self.lock = lock
        self.event_attr = event_attr
        self.p = p
        self.topic_p = TopicPartition(settings.SUBSCRIBE_TOPIC, p)

    def start_ping(self):
        t = Ping(self.db_client, self.p, self.log)
        t.start()

    def add_source(self, handler, bulk_max=1000, time_out=60):
        self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, 'ts': int(time.time())}

    def check_send(self):
        # yield every buffer that is full enough or old enough to flush
        ts = int(time.time())
        for h, p in self.slots.items():
            tb, buffer = h.buffer_pool
            buffer_size = len(buffer)
            if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
                p['ts'] = ts
                yield tb, buffer

    @staticmethod
    def flat_data(data: dict):
        # lift the nested `properties` dict up to the top level
        if 'properties' in data:
            properties = data.pop('properties')
            data.update(properties)
        return data

    def __send(self, db, tb, data):
        sql = f'INSERT INTO {db}.{tb} FORMAT JSONEachRow '
        sql = sql + '\n'.join(data)
        try:
            # self.lock.acquire()  # cross-process lock, currently disabled
            # tolerate up to a 20% error ratio so one bad row does not kill the batch
            ts = int(time.time() * 1000)
            self.db_client.execute('set input_format_allow_errors_ratio=0.2')
            self.db_client.execute(sql)
            self.log.info(f'process {self.p} insert took {int(time.time() * 1000) - ts}ms')
        except Exception as e:
            self.log.error(traceback.format_exc())
            # error code 26: a row failed to parse; drop it and resend the rest
            if hasattr(e, 'code') and e.code == 26:
                # re.S: the driver error message spans multiple lines
                m = re.match(r'(.*)?Stack trace', e.message, re.S)
                self.log.error(data)
                if m:
                    error_msg = m.group(1)
                    error_row = re.match(r'.*?errors out of (\d+) rows', error_msg, re.S)
                    if error_row:
                        error_row = int(error_row.group(1)) - 1
                        error_data = data.pop(error_row)
                        self.log.error(f'dropped row: {error_data}')
                        self.__send(db, tb, data)
        finally:
            # self.lock.release()
            pass
        self.log.info(f'inserted {len(data)} rows into {db}.{tb}')

    def check_table(self, db, tb, data):
        # add any missing columns before inserting
        for item in data:
            self.sketch.alter_table(db, tb, item)

    def collect_event(self, db, tb, data):
        # record event attributes, but only for the `event` table
        if tb != 'event':
            return
        for item in data:
            self.event_attr.add_event(db, item)

    def check_type(self, db, tb, data):
        # coerce each field to its column type; drop fields that fail
        struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
        for item in data:
            del_keys = set()
            for k, v in item.items():
                if v is None:
                    del_keys.add(k)
                    continue
                type_ = struct_dict[k]
                item[k] = TYPE_CK2PY[type_](v, **item)
                if item[k] is None:
                    del_keys.add(k)
                    self.log.warning(f'{k}={v!r} does not match column type {type_}')
            for key in del_keys:
                del item[key]

    def run(self, kafka_client):
        for tb, buffer in self.check_send():
            try:
                data = [self.flat_data(x) for x in buffer.values()]
                self.check_table(self.db_name, tb, data)
                self.check_type(self.db_name, tb, data)
                self.collect_event(self.db_name, tb, data)
                self.__send(self.db_name, tb, [json.dumps(item) for item in data])
            except Exception:
                self.log.error(traceback.format_exc())
            buffer.clear()
            try:
                # re-commit the last committed offset for this partition
                offsets = kafka_client.committed(self.topic_p)
                kafka_client.commit({self.topic_p: OffsetAndMetadata(offsets, '')})
            except Exception as e:
                self.log.error(f'process {self.p} error: {e}')
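
# --------------------------------------------------------------------------
# Usage sketch (illustrative, not part of this module): one plausible way to
# wire a Transmitter into a consume loop. `sketch`, `log`, `lock`,
# `event_attr`, and `handler` are stand-ins for this project's own objects;
# `handler` is assumed to expose the `buffer_pool` pair (table_name, buffer)
# that check_send() reads, and `handler.receive` is a hypothetical method.
# clickhouse_driver.Client and kafka.KafkaConsumer are real APIs, but the
# wiring below is an assumption, not the project's actual entry point.
#
#   from clickhouse_driver import Client
#   from kafka import KafkaConsumer
#
#   db_client = Client(host='127.0.0.1')
#   consumer = KafkaConsumer(settings.SUBSCRIBE_TOPIC,
#                            enable_auto_commit=False)
#   tm = Transmitter(db_client, 'analytics', sketch, log, lock, event_attr, p=0)
#   tm.start_ping()
#   tm.add_source(handler, bulk_max=1000, time_out=60)
#   for msg in consumer:
#       handler.receive(json.loads(msg.value))  # buffer the message
#       tm.run(consumer)                        # flush + commit when due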