From 285119d27e02dd2a95f85baded050faa97752a11 Mon Sep 17 00:00:00 2001 From: wuaho Date: Wed, 19 May 2021 15:27:07 +0800 Subject: [PATCH] 1 --- app.py | 5 +++-- main.py | 4 +++- single_process.py | 12 ++++++++++-- v2/event_attr.py | 30 ++++++++++++++++++++++++++++++ v2/transmitter.py | 9 ++++++++- 5 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 v2/event_attr.py diff --git a/app.py b/app.py index b624073..e803e22 100644 --- a/app.py +++ b/app.py @@ -8,20 +8,21 @@ from v2.struct_cache import StructCacheFile, StructCacheRedis class XProcess(Process): - def __init__(self, partition, lock, ipsearch, log, rdb=None): + def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None): super(XProcess, self).__init__() self.partition = partition self.lock = lock self.ipsearch = ipsearch self.rdb = rdb self.log = log + self.event_attr = event_attr def run(self): db_client = CK(**settings.CK_CONFIG) sketch = Sketch(db_client, StructCacheRedis(self.rdb)) handler_event = HandlerEvent(db_client, settings.GAME, self.ipsearch) handler_user = HandlerUser(db_client, settings.GAME) - transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock) + transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr) transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_user, 1000, 60) last_ts = int(time.time()) diff --git a/main.py b/main.py index b873b83..c605202 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ from app import XProcess from multiprocessing import Lock from settings import settings +from v2.event_attr import EventAttr from v2.ipregion import IpSearch, Ip2Region from v2.log import logger @@ -19,7 +20,8 @@ echo `pstree -p {pid}`|awk 'BEGIN{{ FS="(" ; RS=")" }} NF>1 {{ print $NF }}'|xar lock = Lock() rdb = redis.Redis(**settings.REDIS_CONF) + event_attr = EventAttr(rdb) ipsearch = IpSearch(Ip2Region, "ip2region.db") for i in range(0, 16): - XProcess(i, lock, ipsearch, logger, rdb).start() + XProcess(i, lock, ipsearch, logger, rdb, event_attr).start() diff --git a/single_process.py b/single_process.py index 2042b6c..03cfa98 100644 --- a/single_process.py +++ b/single_process.py @@ -1,20 +1,28 @@ import time +import redis + from settings import settings from v2 import * +from v2.event_attr import EventAttr from v2.ipregion import IpSearch, Ip2Region from multiprocessing import Lock ipsearch = IpSearch(Ip2Region, "ip2region.db") lock = Lock() +from v2.log import logger + +rdb = redis.Redis(**settings.REDIS_CONF) +event_attr = EventAttr(rdb) + def run(): db_client = CK(**settings.CK_CONFIG) sketch = Sketch(db_client) handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_user = HandlerUser(db_client, settings.GAME) - transmitter = Transmitter(db_client, settings.GAME, sketch, lock) + transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_user, 1000, 60) last_ts = int(time.time()) @@ -27,7 +35,7 @@ def run(): ts = int(time.time()) if 'user' in type_: - # continue + continue obj = getattr(handler_user, type_) handler_user.receive_data.append(UserAct(obj, msg)) if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: diff --git a/v2/event_attr.py b/v2/event_attr.py new file mode 100644 index 0000000..1c0648c --- /dev/null +++ b/v2/event_attr.py @@ -0,0 +1,30 @@ +from redis import Redis + + +class EventAttr: + event_attr = dict() + + def __init__(self, rdb: Redis): + self.rdb = rdb + + def get_event_attr(self, key): + attr = self.event_attr.get(key) + if not attr: + self.event_attr[key] = self.rdb.get(key) or set() + return self.event_attr[key] + + def set_event_attr(self, key, *data): + self.event_attr[key] = self.rdb.sadd(key, *data) + + def check_attr(self, db, data): + event_name = data.get('#event_name') + if not event_name: + return + + key = f'{db}_event_{event_name}' + + attr = self.get_event_attr(key) + data_attr = set(data) + extra_attr = data_attr - attr + if extra_attr: + self.set_event_attr(key, *extra_attr) diff --git a/v2/transmitter.py b/v2/transmitter.py index 430ca33..3c5190f 100644 --- a/v2/transmitter.py +++ b/v2/transmitter.py @@ -10,13 +10,14 @@ __all__ = 'Transmitter', class Transmitter: - def __init__(self, db_client, db_name, sketch, log, lock): + def __init__(self, db_client, db_name, sketch, log, lock, event_attr): self.db_client = db_client self.db_name = db_name self.sketch = sketch self.slots = dict() self.log = log self.lock = lock + self.event_attr = event_attr def add_source(self, handler, bulk_max=1000, time_out=60): self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} @@ -73,6 +74,11 @@ class Transmitter: def check_table(self, db, tb, data): [self.sketch.alter_table(db, tb, item) for item in data] + def set_event_attr(self, db, tb, data): + if tb != 'event': + return + [self.event_attr.check_attr(db, item) for item in data] + def check_type(self, db, tb, data): struct_dict = self.sketch.struct_dict[f'{db}_{tb}'] for item in data: @@ -95,6 +101,7 @@ class Transmitter: data = [self.flat_data(x) for x in buffer.values()] self.check_table(self.db_name, tb, data) self.check_type(self.db_name, tb, data) + self.set_event_attr(self.db_name, tb, data) self.__send(self.db_name, tb, [json.dumps(item) for item in data]) except: pass