From 8a1c4140087d10818c3d80337708ffbb8255a137 Mon Sep 17 00:00:00 2001 From: wuaho Date: Sat, 14 Aug 2021 11:30:10 +0800 Subject: [PATCH] 1 --- app.py | 3 +++ settings.py | 2 +- v2/transmitter.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/app.py b/app.py index af71434..e58ee45 100644 --- a/app.py +++ b/app.py @@ -11,6 +11,7 @@ class XProcess(Process): def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None): super(XProcess, self).__init__() + # self.daemon = True self.partition = partition self.lock = lock self.ipsearch = ipsearch @@ -25,8 +26,10 @@ class XProcess(Process): handler_user = HandlerUser(db_client, settings.GAME) transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr, self.partition) + transmitter.start_ping() transmitter.add_source(handler_event, 5000, 60) transmitter.add_source(handler_user, 500, 60) + last_ts = int(time.time()) consumer, kafka_client = create_consumer(self.partition) diff --git a/settings.py b/settings.py index edec1fa..a8149ad 100644 --- a/settings.py +++ b/settings.py @@ -5,7 +5,7 @@ class Config: # ck数据库连接 CK_CONFIG = {'host': '139.159.159.3', 'port': 9654, - 'send_receive_timeout': 30} + } # 每个游戏不一样 游戏上报 kafka 主题 SUBSCRIBE_TOPIC = 'zhengba_test' diff --git a/v2/transmitter.py b/v2/transmitter.py index 329ba0e..226d85e 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -1,6 +1,7 @@ import json import os import re +import threading import time import traceback @@ -9,6 +10,29 @@ from .valid_data import * __all__ = 'Transmitter', +class Ping(threading.Thread): + def __init__(self, db_client, p, log): + threading.Thread.__init__(self) + self.daemon = True + self.ping_ts = 0 + self.time_out = 60 + self.log = log + self.db_client = db_client + self.p = p + + def run(self): + while True: + ts = int(time.time()) + if self.ping_ts + self.time_out < ts: + # 保持连接 + try: + self.ping_ts = ts + self.log.info(f'保持连接{self.p} ping') + self.db_client.execute('select 1') + except: + self.log.error('ping error') + + class Transmitter: def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0): self.db_client = db_client @@ -20,12 +44,16 @@ class Transmitter: self.event_attr = event_attr self.p = p + def start_ping(self): + t = Ping(self.db_client, self.p, self.log) + t.start() + def add_source(self, handler, bulk_max=1000, time_out=60): self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} def check_send(self): + ts = int(time.time()) for h, p in self.slots.items(): - ts = int(time.time()) tb, buffer = h.buffer_pool buffer_size = len(buffer) if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0: