db 固定

This commit is contained in:
wuaho 2021-04-26 17:23:36 +08:00
parent 431af3aff1
commit 544675bebc
5 changed files with 46 additions and 87 deletions

View File

@ -7,7 +7,7 @@ db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client) sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME) handler_event = HandlerEvent(db_client, settings.GAME)
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, sketch) transmitter = Transmitter(db_client, settings.GAME, sketch)
def run(): def run():
@ -24,11 +24,10 @@ def run():
i = 0 i = 0
type_ = msg['#type'] type_ = msg['#type']
del msg['#type'] del msg['#type']
db = settings.APPID_TO_CKDB.get(msg['#app_id'])
if 'user' in type_: if 'user' in type_:
# continue # continue
obj = getattr(handler_user, type_) obj = getattr(handler_user, type_)
handler_user.receive_data.append(User(obj, db, msg)) handler_user.receive_data.append(UserAct(obj, msg))
if len(handler_user.receive_data) >= 1000: if len(handler_user.receive_data) >= 1000:
handler_user.execute() handler_user.execute()
@ -36,7 +35,7 @@ def run():
elif 'track' in type_: elif 'track' in type_:
# continue # continue
obj = getattr(handler_event, type_) obj = getattr(handler_event, type_)
obj(db, msg) obj(msg)
else: else:
continue continue

View File

@ -10,22 +10,11 @@ class Config:
KAFKA_CONSUMER_CONF = { KAFKA_CONSUMER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads, 'value_deserializer': json.loads,
# 'group_id': 'legu_group' 'group_id': 'legu_group'
'group_id': 'ta2legu'
}
TOPIC_TO_LEGU = {
'a77703e24e6643d08b74a4163a14f74c': 'legu_test',
'c3e0409ac18341149877b08f087db640': 'legu_test'
} }
GAME = 'shjy' GAME = 'shjy'
APPID_TO_CKDB = {
'a77703e24e6643d08b74a4163a14f74c': 'shjy',
'c3e0409ac18341149877b08f087db640': 'shjy'
}
REDIS_CONF = { REDIS_CONF = {
'host': '192.168.0.161', 'host': '192.168.0.161',
'port': 6379, 'port': 6379,

View File

@ -23,32 +23,29 @@ class HandlerEvent:
for k, v in b['properties'].items(): for k, v in b['properties'].items():
a[k] = v a[k] = v
def track(self, db, data): def track(self, data):
event = self.event.setdefault(db, {}) self.event[len(self.event)] = data
event[len(event)] = data
def track_update(self, db, data): def track_update(self, data):
if '#event_id' not in data: if '#event_id' not in data:
return return
old_event = self.db_client.get_one(db, self.tb, {'#event_id': data['#event_id']}) old_event = self.db_client.get_one(self.db_name, self.tb, {'#event_id': data['#event_id']})
old_event['sign'] = -1 old_event['sign'] = -1
event = self.event.setdefault(db, {}) self.event[len(self.event)] = old_event
event[len(event)] = old_event
new_event = copy.deepcopy(old_event) new_event = copy.deepcopy(old_event)
self.merge_update(new_event, data) self.merge_update(new_event, data)
new_event['sign'] = 1 new_event['sign'] = 1
event[len(event)] = new_event self.event[len(self.event)] = new_event
def track_overwrite(self, db, data): def track_overwrite(self, data):
if '#event_id' not in data: if '#event_id' not in data:
return return
old_event = self.db_client.get_one(db, self.tb, {'#event_id': data['#event_id']}) old_event = self.db_client.get_one(self.db_name, self.tb, {'#event_id': data['#event_id']})
old_event['sign'] = -1 old_event['sign'] = -1
event = self.event.setdefault(db, {}) self.event[len(self.event)] = old_event
event[len(event)] = old_event
event[len(event)] = data self.event[len(self.event)] = data
@property @property
def buffer_pool(self): def buffer_pool(self):

View File

@ -1,8 +1,8 @@
from collections import namedtuple from collections import namedtuple
__all__ = 'HandlerUser', 'User' __all__ = 'HandlerUser', 'UserAct'
User = namedtuple('User', ['obj', 'db', 'msg']) UserAct = namedtuple('User', ['obj', 'msg'])
class HandlerUser: class HandlerUser:
@ -16,35 +16,34 @@ class HandlerUser:
self.db_name = db_name self.db_name = db_name
def execute(self): def execute(self):
account_ids = set(item.msg.get('#account_id') for item in self.receive_data) - set( account_ids = set(item.msg.get('#account_id') for item in self.receive_data) - set(self.users)
self.users.setdefault(self.db_name, {}))
if not account_ids: if not account_ids:
return return
self.get_users(account_ids) self.get_users(account_ids)
for item in self.receive_data: for item in self.receive_data:
item.obj(item.db, item.msg) item.obj(item.msg)
self.receive_data.clear() self.receive_data.clear()
def get_users(self, account_ids: set): def get_users(self, account_ids: set):
where = f'`#account_id` in {tuple(account_ids)}' where = f'`#account_id` in {tuple(account_ids)}'
res = self.db_client.get_all(self.db_name, 'user_view', where) res = self.db_client.get_all(self.db_name, 'user_view', where)
for item in res.values(): for item in res.values():
self.users.setdefault(self.db_name, {}).setdefault(item['#account_id'], item) self.users[item['#account_id']] = item
def get_user(self, db, account_id, data=None): def get_user(self, account_id, data=None):
user = self.users.get(db, {}).get(account_id) user = self.users.get(account_id)
if user: if user:
return user return user
user = self.db_client.get_one(db, f'{self.tb}_view', **{'#account_id': account_id}) user = self.db_client.get_one(self.db_name, f'{self.tb}_view', **{'#account_id': account_id})
if user: if user:
self.users.setdefault(db, {})[account_id] = user self.users[account_id] = user
return user return user
if not isinstance(data, dict): if not isinstance(data, dict):
return return
user = dict() user = dict()
self.merge(user, data) self.merge(user, data)
self.users.setdefault(db, {})[account_id] = user self.users[account_id] = user
return user return user
def merge(self, a: dict, b: dict): def merge(self, a: dict, b: dict):
@ -100,7 +99,7 @@ class HandlerUser:
if k in a: if k in a:
del a[k] del a[k]
def user_set(self, db, data: dict): def user_set(self, data: dict):
""" """
注意 注意
data 结构包含 properties data 结构包含 properties
@ -111,28 +110,28 @@ class HandlerUser:
:return: :return:
""" """
account_id = data[self.user_key] account_id = data[self.user_key]
user = self.get_user(db, account_id, data) user = self.get_user(account_id, data)
self.merge(user, data) self.merge(user, data)
def user_setOnce(self, db: str, data: dict): def user_setOnce(self, data: dict):
account_id = data[self.user_key] account_id = data[self.user_key]
user = self.get_user(db, account_id, data) user = self.get_user(account_id, data)
self.merge_once(user, data) self.merge_once(user, data)
def user_add(self, db: str, data: dict): def user_add(self, data: dict):
account_id = data[self.user_key] account_id = data[self.user_key]
user = self.get_user(db, account_id, data) user = self.get_user(account_id, data)
self.merge_add(user, data) self.merge_add(user, data)
def user_unset(self, db: str, data: dict): def user_unset(self, data: dict):
account_id = data[self.user_key] account_id = data[self.user_key]
user = self.get_user(db, account_id) user = self.get_user(account_id)
self.merge_unset(user, data) self.merge_unset(user, data)
def user_append(self, db: str, data: dict): def user_append(self, data: dict):
pass pass
def user_del(self, db: str, data: dict): def user_del(self, data: dict):
pass pass
@property @property

View File

@ -8,8 +8,9 @@ __all__ = 'Transmitter',
class Transmitter: class Transmitter:
def __init__(self, db_client, sketch): def __init__(self, db_client, db_name, sketch):
self.db_client = db_client self.db_client = db_client
self.db_name = db_name
self.sketch = sketch self.sketch = sketch
self.ts = int(time.time()) self.ts = int(time.time())
self.slots = dict() self.slots = dict()
@ -20,11 +21,11 @@ class Transmitter:
def check_send(self): def check_send(self):
for h, p in self.slots.items(): for h, p in self.slots.items():
ts = int(time.time()) ts = int(time.time())
tb, pool = h.buffer_pool tb, buffer = h.buffer_pool
for db, buffer in pool.items(): buffer_size = len(buffer)
if len(buffer) >= p['bulk_max'] or self.ts + p['time_out'] <= ts: if (self.ts + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
self.ts = ts self.ts = ts
yield db, tb, buffer yield tb, buffer
@staticmethod @staticmethod
def flat_data(data: dict): def flat_data(data: dict):
@ -59,11 +60,6 @@ class Transmitter:
[self.sketch.alter_table(db, tb, item) for item in data] [self.sketch.alter_table(db, tb, item) for item in data]
def check_type(self, db, tb, data): def check_type(self, db, tb, data):
# import cProfile, pstats
# from io import StringIO
#
# pr = cProfile.Profile()
# pr.enable()
struct_dict = self.sketch.struct_dict[f'{db}_{tb}'] struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
for item in data: for item in data:
del_keys = set() del_keys = set()
@ -79,31 +75,10 @@ class Transmitter:
for key in del_keys: for key in del_keys:
del item[key] del item[key]
# pr.disable()
# s = StringIO()
# ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
# ps.print_stats()
# print(s.getvalue())
def run(self): def run(self):
for tb, buffer in self.check_send():
for db, tb, buffer in self.check_send():
# print('*' * 50)
# print(1, int(time.time() * 1000))
data = [self.flat_data(x) for x in buffer.values()] data = [self.flat_data(x) for x in buffer.values()]
# print(2, int(time.time() * 1000)) self.check_table(self.db_name, tb, data)
self.check_type(self.db_name, tb, data)
self.check_table(db, tb, data) self.__send(self.db_name, tb, [json.dumps(item) for item in data])
# print(3, int(time.time() * 1000))
self.check_type(db, tb, data)
# print(4, int(time.time() * 1000))
self.__send(db, tb, [json.dumps(item) for item in data])
# print(5, int(time.time() * 1000))
buffer.clear() buffer.clear()
# print(6, int(time.time() * 1000))