From 4e05b9f46198807d3616e0be153df8387a7dfa70 Mon Sep 17 00:00:00 2001 From: wuaho Date: Sat, 14 Aug 2021 11:04:23 +0800 Subject: [PATCH] 1 --- app.py | 1 + v2/transmitter.py | 35 +++++++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/app.py b/app.py index af71434..3c3ba6e 100644 --- a/app.py +++ b/app.py @@ -11,6 +11,7 @@ class XProcess(Process): def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None): super(XProcess, self).__init__() + self.daemon = True self.partition = partition self.lock = lock self.ipsearch = ipsearch diff --git a/v2/transmitter.py b/v2/transmitter.py index 060a0a3..e76a7a4 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -1,6 +1,7 @@ import json import os import re +import threading import time import traceback @@ -9,6 +10,28 @@ from .valid_data import * __all__ = 'Transmitter', +class Ping(threading.Thread): + def __init__(self, db_client, log): + threading.Thread.__init__(self) + self.daemon = True + self.ping_ts = 0 + self.time_out = 60 + self.log = log + self.db_client = db_client + + def run(self): + while True: + ts = int(time.time()) + if self.ping_ts + self.time_out < ts: + # 保持连接 + try: + self.ping_ts = ts + self.log.info('保持连接 ping') + self.db_client.execute('select 1') + except: + self.log.error('ping error') + + class Transmitter: def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0): self.db_client = db_client @@ -19,19 +42,16 @@ class Transmitter: self.lock = lock self.event_attr = event_attr self.p = p - self.ping_ts = 0 + + def start_ping(self): + t = Ping(self.db_client, self.log) + t.start() def add_source(self, handler, bulk_max=1000, time_out=60): self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} def check_send(self): ts = int(time.time()) - if self.ping_ts + 60 < ts: - # 保持连接 - self.ping_ts = ts - self.log.info('保持连接 ping') - self.db_client.execute('select 1') - for h, p in self.slots.items(): tb, buffer = h.buffer_pool buffer_size = len(buffer) @@ -39,7 +59,6 @@ class Transmitter: p['ts'] = ts yield tb, buffer - @staticmethod def flat_data(data: dict): if 'properties' in data: