1
This commit is contained in:
parent
63866424fb
commit
285119d27e
5
app.py
5
app.py
@ -8,20 +8,21 @@ from v2.struct_cache import StructCacheFile, StructCacheRedis
|
||||
|
||||
class XProcess(Process):
|
||||
|
||||
def __init__(self, partition, lock, ipsearch, log, rdb=None):
|
||||
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
||||
super(XProcess, self).__init__()
|
||||
self.partition = partition
|
||||
self.lock = lock
|
||||
self.ipsearch = ipsearch
|
||||
self.rdb = rdb
|
||||
self.log = log
|
||||
self.event_attr = event_attr
|
||||
|
||||
def run(self):
|
||||
db_client = CK(**settings.CK_CONFIG)
|
||||
sketch = Sketch(db_client, StructCacheRedis(self.rdb))
|
||||
handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch)
|
||||
handler_user = HandlerUser(db_client, settings.GAME)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr)
|
||||
transmitter.add_source(handler_event, 10000, 60)
|
||||
transmitter.add_source(handler_user, 1000, 60)
|
||||
last_ts = int(time.time())
|
||||
|
4
main.py
4
main.py
@ -5,6 +5,7 @@ from app import XProcess
|
||||
from multiprocessing import Lock
|
||||
|
||||
from settings import settings
|
||||
from v2.event_attr import EventAttr
|
||||
from v2.ipregion import IpSearch, Ip2Region
|
||||
from v2.log import logger
|
||||
|
||||
@ -19,7 +20,8 @@ echo `pstree -p {pid}`|awk 'BEGIN{{ FS="(" ; RS=")" }} NF>1 {{ print $NF }}'|xar
|
||||
|
||||
lock = Lock()
|
||||
rdb = redis.Redis(**settings.REDIS_CONF)
|
||||
event_attr = EventAttr(rdb)
|
||||
|
||||
ipsearch = IpSearch(Ip2Region, "ip2region.db")
|
||||
for i in range(0, 16):
|
||||
XProcess(i, lock, ipsearch, logger, rdb).start()
|
||||
XProcess(i, lock, ipsearch, logger, rdb, event_attr).start()
|
||||
|
@ -1,20 +1,28 @@
|
||||
import time
|
||||
|
||||
import redis
|
||||
|
||||
from settings import settings
|
||||
from v2 import *
|
||||
from v2.event_attr import EventAttr
|
||||
from v2.ipregion import IpSearch, Ip2Region
|
||||
from multiprocessing import Lock
|
||||
|
||||
ipsearch = IpSearch(Ip2Region, "ip2region.db")
|
||||
lock = Lock()
|
||||
|
||||
from v2.log import logger
|
||||
|
||||
rdb = redis.Redis(**settings.REDIS_CONF)
|
||||
event_attr = EventAttr(rdb)
|
||||
|
||||
|
||||
def run():
|
||||
db_client = CK(**settings.CK_CONFIG)
|
||||
sketch = Sketch(db_client)
|
||||
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
|
||||
handler_user = HandlerUser(db_client, settings.GAME)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, lock)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr)
|
||||
transmitter.add_source(handler_event, 10000, 60)
|
||||
transmitter.add_source(handler_user, 1000, 60)
|
||||
last_ts = int(time.time())
|
||||
@ -27,7 +35,7 @@ def run():
|
||||
ts = int(time.time())
|
||||
|
||||
if 'user' in type_:
|
||||
# continue
|
||||
continue
|
||||
obj = getattr(handler_user, type_)
|
||||
handler_user.receive_data.append(UserAct(obj, msg))
|
||||
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
|
||||
|
30
v2/event_attr.py
Normal file
30
v2/event_attr.py
Normal file
@ -0,0 +1,30 @@
|
||||
from redis import Redis
|
||||
|
||||
|
||||
class EventAttr:
|
||||
event_attr = dict()
|
||||
|
||||
def __init__(self, rdb: Redis):
|
||||
self.rdb = rdb
|
||||
|
||||
def get_event_attr(self, key):
|
||||
attr = self.event_attr.get(key)
|
||||
if not attr:
|
||||
self.event_attr[key] = self.rdb.get(key) or set()
|
||||
return self.event_attr[key]
|
||||
|
||||
def set_event_attr(self, key, *data):
|
||||
self.event_attr[key] = self.rdb.sadd(key, *data)
|
||||
|
||||
def check_attr(self, db, data):
|
||||
event_name = data.get('#event_name')
|
||||
if not event_name:
|
||||
return
|
||||
|
||||
key = f'{db}_event_{event_name}'
|
||||
|
||||
attr = self.get_event_attr(key)
|
||||
data_attr = set(data)
|
||||
extra_attr = data_attr - attr
|
||||
if extra_attr:
|
||||
self.set_event_attr(key, *extra_attr)
|
@ -10,13 +10,14 @@ __all__ = 'Transmitter',
|
||||
|
||||
|
||||
class Transmitter:
|
||||
def __init__(self, db_client, db_name, sketch, log, lock):
|
||||
def __init__(self, db_client, db_name, sketch, log, lock, event_attr):
|
||||
self.db_client = db_client
|
||||
self.db_name = db_name
|
||||
self.sketch = sketch
|
||||
self.slots = dict()
|
||||
self.log = log
|
||||
self.lock = lock
|
||||
self.event_attr = event_attr
|
||||
|
||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||
@ -73,6 +74,11 @@ class Transmitter:
|
||||
def check_table(self, db, tb, data):
|
||||
[self.sketch.alter_table(db, tb, item) for item in data]
|
||||
|
||||
def set_event_attr(self, db, tb, data):
|
||||
if tb != 'event':
|
||||
return
|
||||
[self.event_attr.check_attr(db, item) for item in data]
|
||||
|
||||
def check_type(self, db, tb, data):
|
||||
struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
|
||||
for item in data:
|
||||
@ -95,6 +101,7 @@ class Transmitter:
|
||||
data = [self.flat_data(x) for x in buffer.values()]
|
||||
self.check_table(self.db_name, tb, data)
|
||||
self.check_type(self.db_name, tb, data)
|
||||
self.set_event_attr(self.db_name, tb, data)
|
||||
self.__send(self.db_name, tb, [json.dumps(item) for item in data])
|
||||
except:
|
||||
pass
|
||||
|
Loading…
Reference in New Issue
Block a user