This commit is contained in:
wuaho 2021-08-14 11:04:23 +08:00
parent 0841c29438
commit 4e05b9f461
2 changed files with 28 additions and 8 deletions

1
app.py
View File

@ -11,6 +11,7 @@ class XProcess(Process):
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
super(XProcess, self).__init__()
self.daemon = True
self.partition = partition
self.lock = lock
self.ipsearch = ipsearch

View File

@ -1,6 +1,7 @@
import json
import os
import re
import threading
import time
import traceback
@ -9,6 +10,28 @@ from .valid_data import *
__all__ = 'Transmitter',
class Ping(threading.Thread):
def __init__(self, db_client, log):
threading.Thread.__init__(self)
self.daemon = True
self.ping_ts = 0
self.time_out = 60
self.log = log
self.db_client = db_client
def run(self):
while True:
ts = int(time.time())
if self.ping_ts + self.time_out < ts:
# 保持连接
try:
self.ping_ts = ts
self.log.info('保持连接 ping')
self.db_client.execute('select 1')
except:
self.log.error('ping error')
class Transmitter:
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
self.db_client = db_client
@ -19,19 +42,16 @@ class Transmitter:
self.lock = lock
self.event_attr = event_attr
self.p = p
self.ping_ts = 0
def start_ping(self):
t = Ping(self.db_client, self.log)
t.start()
def add_source(self, handler, bulk_max=1000, time_out=60):
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
def check_send(self):
ts = int(time.time())
if self.ping_ts + 60 < ts:
# 保持连接
self.ping_ts = ts
self.log.info('保持连接 ping')
self.db_client.execute('select 1')
for h, p in self.slots.items():
tb, buffer = h.buffer_pool
buffer_size = len(buffer)
@ -39,7 +59,6 @@ class Transmitter:
p['ts'] = ts
yield tb, buffer
@staticmethod
def flat_data(data: dict):
if 'properties' in data: