This commit is contained in:
wuaho 2021-05-11 17:01:02 +08:00
parent 9597bac720
commit e12d4a0832
6 changed files with 27 additions and 11 deletions

8
app.py
View File

@ -8,18 +8,20 @@ from v2.struct_cache import StructCacheFile, StructCacheRedis
class XProcess(Process): class XProcess(Process):
def __init__(self, partition, lock, ipsearch): def __init__(self, partition, lock, ipsearch, log, rdb=None):
super(XProcess, self).__init__() super(XProcess, self).__init__()
self.partition = partition self.partition = partition
self.lock = lock self.lock = lock
self.ipsearch = ipsearch self.ipsearch = ipsearch
self.rdb = rdb
self.log = log
def run(self): def run(self):
db_client = CK(**settings.CK_CONFIG) db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client, StructCacheRedis()) sketch = Sketch(db_client, StructCacheRedis(self.rdb))
handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch) handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch)
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.lock) transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock)
transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_event, 10000, 60)
transmitter.add_source(handler_user, 1000, 60) transmitter.add_source(handler_user, 1000, 60)
last_ts = int(time.time()) last_ts = int(time.time())

View File

@ -1,8 +1,12 @@
import os import os
import redis
from app import XProcess from app import XProcess
from multiprocessing import Lock from multiprocessing import Lock
from settings import settings
from v2.ipregion import IpSearch, Ip2Region from v2.ipregion import IpSearch, Ip2Region
from v2.log import logger
if __name__ == '__main__': if __name__ == '__main__':
pid = os.getpid() pid = os.getpid()
@ -14,6 +18,8 @@ echo `pstree -p {pid}`|awk 'BEGIN{{ FS="(" ; RS=")" }} NF>1 {{ print $NF }}'|xar
f.write(stop_shell.format(pid=pid)) f.write(stop_shell.format(pid=pid))
lock = Lock() lock = Lock()
rdb = redis.Redis(**settings.REDIS_CONF)
ipsearch = IpSearch(Ip2Region, "ip2region.db") ipsearch = IpSearch(Ip2Region, "ip2region.db")
for i in range(0, 16): for i in range(0, 16):
XProcess(i, lock, ipsearch).start() XProcess(i, lock, ipsearch, logger, rdb).start()

View File

@ -1,12 +1,11 @@
import json import json
class Config: class Config:
# ck数据库连接 # ck数据库连接
CK_CONFIG = {'host': '119.29.176.224', CK_CONFIG = {'host': '119.29.176.224',
'send_receive_timeout': 30} 'send_receive_timeout': 30}
#每个游戏不一样 游戏上报 kafka 主题 # 每个游戏不一样 游戏上报 kafka 主题
SUBSCRIBE_TOPIC = 'test2' SUBSCRIBE_TOPIC = 'test2'
KAFKA_CONSUMER_CONF = { KAFKA_CONSUMER_CONF = {
@ -25,7 +24,7 @@ class Config:
'port': 6379, 'port': 6379,
'password': 'd1Gh*zp5', 'password': 'd1Gh*zp5',
'db': 1, # 存表结构 'db': 1, # 存表结构
'decode_responses':True 'decode_responses': True
} }

8
v2/log.py Normal file
View File

@ -0,0 +1,8 @@
import time
from loguru import logger
from settings import settings
__all__ = 'logger'
logger.add(settings.GAME, format="{time} {level} {message}", level="INFO", rotation=100, enqueue=True)

View File

@ -15,8 +15,8 @@ class StructCacheFile:
class StructCacheRedis: class StructCacheRedis:
def __init__(self): def __init__(self, rdb):
self.rdb = redis.Redis(**settings.REDIS_CONF) self.rdb = rdb
def update(self, db, tb, data): def update(self, db, tb, data):
self.rdb.set(f'{db}_{tb}', json.dumps(data)) self.rdb.set(f'{db}_{tb}', json.dumps(data))

View File

@ -10,11 +10,12 @@ __all__ = 'Transmitter',
class Transmitter: class Transmitter:
def __init__(self, db_client, db_name, sketch, lock): def __init__(self, db_client, db_name, sketch, log, lock):
self.db_client = db_client self.db_client = db_client
self.db_name = db_name self.db_name = db_name
self.sketch = sketch self.sketch = sketch
self.slots = dict() self.slots = dict()
self.log = log
self.lock = lock self.lock = lock
def add_source(self, handler, bulk_max=1000, time_out=60): def add_source(self, handler, bulk_max=1000, time_out=60):
@ -83,7 +84,7 @@ class Transmitter:
item[k] = TYPE_CK2PY[type_](v, **item) item[k] = TYPE_CK2PY[type_](v, **item)
if v is None: if v is None:
del_keys.add(k) del_keys.add(k)
print(k, '类型不一致') self.log.warning(f'{k} {type(k)} 类型不一致')
for key in del_keys: for key in del_keys:
del item[key] del item[key]