1
This commit is contained in:
parent
c6b50804d3
commit
8a1c414008
3
app.py
3
app.py
@ -11,6 +11,7 @@ class XProcess(Process):
|
||||
|
||||
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
||||
super(XProcess, self).__init__()
|
||||
# self.daemon = True
|
||||
self.partition = partition
|
||||
self.lock = lock
|
||||
self.ipsearch = ipsearch
|
||||
@ -25,8 +26,10 @@ class XProcess(Process):
|
||||
handler_user = HandlerUser(db_client, settings.GAME)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
||||
self.partition)
|
||||
transmitter.start_ping()
|
||||
transmitter.add_source(handler_event, 5000, 60)
|
||||
transmitter.add_source(handler_user, 500, 60)
|
||||
|
||||
last_ts = int(time.time())
|
||||
consumer, kafka_client = create_consumer(self.partition)
|
||||
|
||||
|
@ -5,7 +5,7 @@ class Config:
|
||||
# ck数据库连接
|
||||
CK_CONFIG = {'host': '139.159.159.3',
|
||||
'port': 9654,
|
||||
'send_receive_timeout': 30}
|
||||
}
|
||||
|
||||
# 每个游戏不一样 游戏上报 kafka 主题
|
||||
SUBSCRIBE_TOPIC = 'zhengba_test'
|
||||
|
@ -1,6 +1,7 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
@ -9,6 +10,29 @@ from .valid_data import *
|
||||
__all__ = 'Transmitter',
|
||||
|
||||
|
||||
class Ping(threading.Thread):
|
||||
def __init__(self, db_client, p, log):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.ping_ts = 0
|
||||
self.time_out = 60
|
||||
self.log = log
|
||||
self.db_client = db_client
|
||||
self.p = p
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
ts = int(time.time())
|
||||
if self.ping_ts + self.time_out < ts:
|
||||
# 保持连接
|
||||
try:
|
||||
self.ping_ts = ts
|
||||
self.log.info(f'保持连接{self.p} ping')
|
||||
self.db_client.execute('select 1')
|
||||
except:
|
||||
self.log.error('ping error')
|
||||
|
||||
|
||||
class Transmitter:
|
||||
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
||||
self.db_client = db_client
|
||||
@ -20,12 +44,16 @@ class Transmitter:
|
||||
self.event_attr = event_attr
|
||||
self.p = p
|
||||
|
||||
def start_ping(self):
|
||||
t = Ping(self.db_client, self.p, self.log)
|
||||
t.start()
|
||||
|
||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||
|
||||
def check_send(self):
|
||||
for h, p in self.slots.items():
|
||||
ts = int(time.time())
|
||||
for h, p in self.slots.items():
|
||||
tb, buffer = h.buffer_pool
|
||||
buffer_size = len(buffer)
|
||||
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
||||
|
Loading…
Reference in New Issue
Block a user