1
This commit is contained in:
parent
199eb8dba7
commit
9340bf8e58
@ -19,22 +19,23 @@ class Transmitter:
|
||||
self.lock = lock
|
||||
self.event_attr = event_attr
|
||||
self.p = p
|
||||
self.ping_ts = 0
|
||||
|
||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||
|
||||
def check_send(self):
|
||||
ts = int(time.time())
|
||||
for h, p in self.slots.items():
|
||||
ts = int(time.time())
|
||||
tb, buffer = h.buffer_pool
|
||||
buffer_size = len(buffer)
|
||||
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
||||
p['ts'] = ts
|
||||
yield tb, buffer
|
||||
elif p['ts'] + p['time_out'] <= ts:
|
||||
# 保持连接
|
||||
print('保持连接')
|
||||
self.db_client.execute('select 1')
|
||||
if self.ping_ts + 60 < ts:
|
||||
# 保持连接
|
||||
print('保持连接')
|
||||
self.db_client.execute('select 1')
|
||||
|
||||
@staticmethod
|
||||
def flat_data(data: dict):
|
||||
|
Loading…
Reference in New Issue
Block a user