diff --git a/main.py b/main.py
index 0d6f6dc..f041f67 100644
--- a/main.py
+++ b/main.py
@@ -7,7 +7,7 @@ db_client = CK(**settings.CK_CONFIG)
 sketch = Sketch(db_client)
 handler_event = HandlerEvent(db_client, settings.GAME)
 handler_user = HandlerUser(db_client, settings.GAME)
-transmitter = Transmitter(db_client, sketch)
+transmitter = Transmitter(db_client, settings.GAME, sketch)


 def run():
@@ -24,11 +24,10 @@ def run():
             i = 0
         type_ = msg['#type']
         del msg['#type']
-        db = settings.APPID_TO_CKDB.get(msg['#app_id'])
         if 'user' in type_:
             # continue
             obj = getattr(handler_user, type_)
-            handler_user.receive_data.append(User(obj, db, msg))
+            handler_user.receive_data.append(UserAct(obj, msg))
             if len(handler_user.receive_data) >= 1000:
                 handler_user.execute()

@@ -36,7 +35,7 @@ def run():
         elif 'track' in type_:
             # continue
             obj = getattr(handler_event, type_)
-            obj(db, msg)
+            obj(msg)
         else:
             continue

diff --git a/settings.py b/settings.py
index b4b139b..0c8cc14 100644
--- a/settings.py
+++ b/settings.py
@@ -10,22 +10,11 @@ class Config:
     KAFKA_CONSUMER_CONF = {
         'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
         'value_deserializer': json.loads,
-        # 'group_id': 'legu_group'
-        'group_id': 'ta2legu'
-    }
-
-    TOPIC_TO_LEGU = {
-        'a77703e24e6643d08b74a4163a14f74c': 'legu_test',
-        'c3e0409ac18341149877b08f087db640': 'legu_test'
+        'group_id': 'legu_group'
     }

     GAME = 'shjy'

-    APPID_TO_CKDB = {
-        'a77703e24e6643d08b74a4163a14f74c': 'shjy',
-        'c3e0409ac18341149877b08f087db640': 'shjy'
-    }
-
     REDIS_CONF = {
         'host': '192.168.0.161',
         'port': 6379,
diff --git a/v2/handler_event.py b/v2/handler_event.py
index 878369d..2e0ce6d 100644
--- a/v2/handler_event.py
+++ b/v2/handler_event.py
@@ -6,7 +6,7 @@ __all__ = 'HandlerEvent',
 class HandlerEvent:
     tb = 'event'

-    def __init__(self, db_client,db_name):
+    def __init__(self, db_client, db_name):
         self.event = dict()
         self.db_client = db_client
         self.db_name = db_name
@@ -23,32 +23,29 @@ class HandlerEvent:
         for k, v in b['properties'].items():
             a[k] = v

-    def track(self, db, data):
-        event = self.event.setdefault(db, {})
-        event[len(event)] = data
+    def track(self, data):
+        self.event[len(self.event)] = data

-    def track_update(self, db, data):
+    def track_update(self, data):
         if '#event_id' not in data:
             return
-        old_event = self.db_client.get_one(db, self.tb, {'#event_id': data['#event_id']})
+        old_event = self.db_client.get_one(self.db_name, self.tb, {'#event_id': data['#event_id']})
         old_event['sign'] = -1
-        event = self.event.setdefault(db, {})
-        event[len(event)] = old_event
+        self.event[len(self.event)] = old_event

         new_event = copy.deepcopy(old_event)
         self.merge_update(new_event, data)
         new_event['sign'] = 1
-        event[len(event)] = new_event
+        self.event[len(self.event)] = new_event

-    def track_overwrite(self, db, data):
+    def track_overwrite(self, data):
         if '#event_id' not in data:
             return
-        old_event = self.db_client.get_one(db, self.tb, {'#event_id': data['#event_id']})
+        old_event = self.db_client.get_one(self.db_name, self.tb, {'#event_id': data['#event_id']})
         old_event['sign'] = -1
-        event = self.event.setdefault(db, {})
-        event[len(event)] = old_event
+        self.event[len(self.event)] = old_event

-        event[len(event)] = data
+        self.event[len(self.event)] = data

     @property
     def buffer_pool(self):
diff --git a/v2/handler_user.py b/v2/handler_user.py
index 53ff0ac..1829699 100644
--- a/v2/handler_user.py
+++ b/v2/handler_user.py
@@ -1,8 +1,8 @@
 from collections import namedtuple

-__all__ = 'HandlerUser', 'User'
+__all__ = 'HandlerUser', 'UserAct'

-User = namedtuple('User', ['obj', 'db', 'msg'])
+UserAct = namedtuple('User', ['obj', 'msg'])


 class HandlerUser:
@@ -16,35 +16,34 @@ class HandlerUser:
         self.db_name = db_name

     def execute(self):
-        account_ids = set(item.msg.get('#account_id') for item in self.receive_data) - set(
-            self.users.setdefault(self.db_name, {}))
+        account_ids = set(item.msg.get('#account_id') for item in self.receive_data) - set(self.users)
         if not account_ids:
             return
         self.get_users(account_ids)
         for item in self.receive_data:
-            item.obj(item.db, item.msg)
+            item.obj(item.msg)
         self.receive_data.clear()

     def get_users(self, account_ids: set):
         where = f'`#account_id` in {tuple(account_ids)}'
         res = self.db_client.get_all(self.db_name, 'user_view', where)
         for item in res.values():
-            self.users.setdefault(self.db_name, {}).setdefault(item['#account_id'], item)
+            self.users[item['#account_id']] = item

-    def get_user(self, db, account_id, data=None):
-        user = self.users.get(db, {}).get(account_id)
+    def get_user(self, account_id, data=None):
+        user = self.users.get(account_id)
         if user:
             return user
-        user = self.db_client.get_one(db, f'{self.tb}_view', **{'#account_id': account_id})
+        user = self.db_client.get_one(self.db_name, f'{self.tb}_view', **{'#account_id': account_id})
         if user:
-            self.users.setdefault(db, {})[account_id] = user
+            self.users[account_id] = user
             return user
         if not isinstance(data, dict):
             return

         user = dict()
         self.merge(user, data)
-        self.users.setdefault(db, {})[account_id] = user
+        self.users[account_id] = user
         return user

     def merge(self, a: dict, b: dict):
@@ -100,7 +99,7 @@ class HandlerUser:
             if k in a:
                 del a[k]

-    def user_set(self, db, data: dict):
+    def user_set(self, data: dict):
         """
         注意
         data 结构包含 properties
@@ -111,28 +110,28 @@ class HandlerUser:
         :return:
         """
         account_id = data[self.user_key]
-        user = self.get_user(db, account_id, data)
+        user = self.get_user(account_id, data)
         self.merge(user, data)

-    def user_setOnce(self, db: str, data: dict):
+    def user_setOnce(self, data: dict):
         account_id = data[self.user_key]
-        user = self.get_user(db, account_id, data)
+        user = self.get_user(account_id, data)
         self.merge_once(user, data)

-    def user_add(self, db: str, data: dict):
+    def user_add(self, data: dict):
         account_id = data[self.user_key]
-        user = self.get_user(db, account_id, data)
+        user = self.get_user(account_id, data)
         self.merge_add(user, data)

-    def user_unset(self, db: str, data: dict):
+    def user_unset(self, data: dict):
         account_id = data[self.user_key]
-        user = self.get_user(db, account_id)
+        user = self.get_user(account_id)
         self.merge_unset(user, data)

-    def user_append(self, db: str, data: dict):
+    def user_append(self, data: dict):
         pass

-    def user_del(self, db: str, data: dict):
+    def user_del(self, data: dict):
         pass

     @property
diff --git a/v2/transmitter.py b/v2/transmitter.py
index 5b24da4..6dfe78d 100644
--- a/v2/transmitter.py
+++ b/v2/transmitter.py
@@ -8,8 +8,9 @@ __all__ = 'Transmitter',

 class Transmitter:

-    def __init__(self, db_client, sketch):
+    def __init__(self, db_client, db_name, sketch):
         self.db_client = db_client
+        self.db_name = db_name
         self.sketch = sketch
         self.ts = int(time.time())
         self.slots = dict()
@@ -20,11 +21,11 @@
     def check_send(self):
         for h, p in self.slots.items():
             ts = int(time.time())
-            tb, pool = h.buffer_pool
-            for db, buffer in pool.items():
-                if len(buffer) >= p['bulk_max'] or self.ts + p['time_out'] <= ts:
-                    self.ts = ts
-                    yield db, tb, buffer
+            tb, buffer = h.buffer_pool
+            buffer_size = len(buffer)
+            if (self.ts + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
+                self.ts = ts
+                yield tb, buffer

     @staticmethod
     def flat_data(data: dict):
@@ -59,11 +60,6 @@
         [self.sketch.alter_table(db, tb, item) for item in data]

     def check_type(self, db, tb, data):
-        # import cProfile, pstats
-        # from io import StringIO
-        #
-        # pr = cProfile.Profile()
-        # pr.enable()
         struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
         for item in data:
             del_keys = set()
@@ -79,31 +75,10 @@
             for key in del_keys:
                 del item[key]

-        # pr.disable()
-        # s = StringIO()
-        # ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
-        # ps.print_stats()
-        # print(s.getvalue())
-
     def run(self):
-
-
-        for db, tb, buffer in self.check_send():
-            # print('*' * 50)
-            # print(1, int(time.time() * 1000))
+        for tb, buffer in self.check_send():
             data = [self.flat_data(x) for x in buffer.values()]
-            # print(2, int(time.time() * 1000))
-
-            self.check_table(db, tb, data)
-            # print(3, int(time.time() * 1000))
-
-            self.check_type(db, tb, data)
-            # print(4, int(time.time() * 1000))
-
-            self.__send(db, tb, [json.dumps(item) for item in data])
-            # print(5, int(time.time() * 1000))
-
+            self.check_table(self.db_name, tb, data)
+            self.check_type(self.db_name, tb, data)
+            self.__send(self.db_name, tb, [json.dumps(item) for item in data])
             buffer.clear()
-            # print(6, int(time.time() * 1000))
-
-
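Not part of the diff itself: a minimal, self-contained sketch of how main.run batches user operations after this change, using the two-field UserAct tuple (the old db slot is gone because HandlerUser is bound to the single settings.GAME database at construction time). HandlerUserStub, its counter, and the generated messages are illustrative stand-ins, not code from the repository; only the append/threshold/execute pattern mirrors main.run.

from collections import namedtuple

# UserAct carries the bound handler method plus the raw message; the db field
# from the old User tuple is no longer needed.
UserAct = namedtuple('User', ['obj', 'msg'])


class HandlerUserStub:
    """Illustrative stand-in for HandlerUser (not the real class)."""

    def __init__(self):
        self.receive_data = []
        self.applied = 0

    def user_set(self, msg):
        self.applied += 1          # the real handler merges msg into the cached user

    def execute(self):
        for item in self.receive_data:
            item.obj(item.msg)     # replay each queued operation
        self.receive_data.clear()


handler_user = HandlerUserStub()
for i in range(2500):
    msg = {'#type': 'user_set', '#account_id': i}
    obj = getattr(handler_user, msg['#type'])
    handler_user.receive_data.append(UserAct(obj, msg))
    if len(handler_user.receive_data) >= 1000:    # same flush threshold main.run uses
        handler_user.execute()
handler_user.execute()                            # drain whatever is left
print(handler_user.applied)                       # 2500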
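Also not part of the diff: a sketch of the flush rule Transmitter.check_send applies now that buffer_pool returns a single (table, buffer) pair instead of a per-database pool. DummyHandler, MiniTransmitter, add_source, and the literal thresholds are assumptions made for the example; only the condition inside check_send mirrors the patched code: flush when the buffer is non-empty and either the size threshold or the timeout has been reached.

import time


class DummyHandler:
    """Stands in for HandlerEvent/HandlerUser: buffer_pool yields (table, buffer)."""

    def __init__(self, tb):
        self.tb = tb
        self.buffer = {}

    @property
    def buffer_pool(self):
        return self.tb, self.buffer


class MiniTransmitter:
    def __init__(self):
        self.ts = int(time.time())
        self.slots = {}

    def add_source(self, handler, bulk_max, time_out):
        # hypothetical registration helper; the repo wires slots differently
        self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out}

    def check_send(self):
        for h, p in self.slots.items():
            ts = int(time.time())
            tb, buffer = h.buffer_pool
            buffer_size = len(buffer)
            # non-empty AND (timed out OR over the bulk threshold)
            if (self.ts + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
                self.ts = ts
                yield tb, buffer


if __name__ == '__main__':
    events = DummyHandler('event')
    tm = MiniTransmitter()
    tm.add_source(events, bulk_max=3, time_out=60)

    events.buffer.update({0: {'a': 1}, 1: {'a': 2}, 2: {'a': 3}})  # hits bulk_max
    for tb, buf in tm.check_send():
        print(f'flush {len(buf)} rows into table {tb!r}')
        buf.clear()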