批量用户查询

This commit is contained in:
wuaho 2021-04-26 15:40:02 +08:00
parent 9be70d5411
commit 7334ad9169
8 changed files with 113 additions and 16 deletions

25
main.py
View File

@ -1,32 +1,45 @@
import time
from settings import settings
from v2 import *
db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client)
# HandlerEvent.__init__ takes (db_client, db_name); omitting db_name raises
# TypeError at import time, so pass the configured game database here too.
handler_event = HandlerEvent(db_client, settings.GAME)
handler_user = HandlerUser(db_client)
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, sketch)
def run():
transmitter.add_source(handler_event, 1000, 60)
transmitter.add_source(handler_user, 100, 60)
transmitter.add_source(handler_user, 500, 60)
i = 0
ts = time.time() * 1000
for topic, msg in consumer():
# print(msg)
i += 1
if i > 10000:
print(time.time() * 1000-ts)
ts = time.time() * 1000
i = 0
type_ = msg['#type']
del msg['#type']
db = settings.APPID_TO_CKDB.get(msg['#app_id'])
if 'user' in type_:
# continue
obj = getattr(handler_user, type_)
handler_user.receive_data.append(User(obj, db, msg))
if len(handler_user.receive_data) >= 1000:
handler_user.execute()
elif 'track' in type_:
# continue
obj = getattr(handler_event, type_)
obj(db, msg)
else:
continue
del msg['#type']
obj(db, msg)
transmitter.run()

View File

@ -5,12 +5,13 @@ class Config:
CK_CONFIG = {'host': '119.29.176.224',
'send_receive_timeout': 3}
SUBSCRIBE_TOPIC = ['test','test2']
SUBSCRIBE_TOPIC = ['test', 'test2']
KAFKA_CONSUMER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads,
'group_id': 'legu_group'
# 'group_id': 'legu_group'
'group_id': 'ta2legu'
}
TOPIC_TO_LEGU = {
@ -18,6 +19,8 @@ class Config:
'c3e0409ac18341149877b08f087db640': 'legu_test'
}
GAME = 'shjy'
APPID_TO_CKDB = {
'a77703e24e6643d08b74a4163a14f74c': 'shjy',
'c3e0409ac18341149877b08f087db640': 'shjy'

View File

@ -1,9 +1,13 @@
__all__ = 'CK',
import pandas as pd
import numpy as np
from datetime import datetime
from datetime import timedelta
from clickhouse_driver import Client
from pandas import DatetimeTZDtype
from pandas import Timedelta
class CK(Client):
@ -32,3 +36,21 @@ class CK(Client):
else:
res[k] = v
return res
def get_all(self, db, tb, where: str) -> dict:
    """
    Fetch every row of ``db.tb`` matching ``where`` as a dict of row dicts.

    Timezone-aware datetime columns are shifted by the row's
    ``#zone_offset`` (in hours) and rendered as ``'%Y-%m-%d %H:%M:%S'``
    strings, i.e. the stored instant is converted back to the row's own zone.

    :param db: database name, interpolated into the SQL as-is
    :param tb: table or view name, interpolated into the SQL as-is
    :param where: SQL condition appended verbatim after ``where``; the caller
        must quote any values it embeds
    :return: ``{row_index: {column: value}}``; an empty dict when no row matches
    """
    sql = f"select * from {db}.{tb} where {where}"
    data, columns = self.execute(sql, columnar=True, with_column_types=True)
    df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})

    # No matching rows means no columns at all, so '#zone_offset' would
    # raise KeyError below. An empty mapping is the natural answer.
    if df.empty:
        return {}

    tz = df['#zone_offset'].apply(lambda x: timedelta(hours=x))
    # 'datetimetz' is the documented selector for tz-aware columns; passing
    # the DatetimeTZDtype class itself is resolved differently across pandas
    # versions.
    for t_type in df.select_dtypes(include=['datetimetz']):
        df[t_type] = (df[t_type] + tz).apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))

    # Same {index: row} shape as df.T.to_dict(), without transposing the
    # frame (which copies it and can coerce column dtypes).
    return df.to_dict(orient='index')

View File

@ -6,9 +6,10 @@ __all__ = 'HandlerEvent',
class HandlerEvent:
tb = 'event'
def __init__(self, db_client):
def __init__(self, db_client,db_name):
self.event = dict()
self.db_client = db_client
self.db_name = db_name
def merge_update(self, a: dict, b: dict):
"""

View File

@ -1,13 +1,35 @@
__all__ = 'HandlerUser',
from collections import namedtuple
__all__ = 'HandlerUser', 'User'
User = namedtuple('User', ['obj', 'db', 'msg'])
class HandlerUser:
tb = 'user'
user_key = '#account_id'
def __init__(self, db_client):
def __init__(self, db_client, db_name):
self.users = dict()
self.db_client = db_client
self.receive_data = []
self.db_name = db_name
def execute(self):
    """
    Apply every buffered user message, loading unknown users first.

    Account ids found in ``receive_data`` that are not yet cached under
    ``self.users[self.db_name]`` are fetched in one batch via ``get_users``.
    Each buffered item is then applied with ``item.obj(item.db, item.msg)``
    and the buffer is emptied.
    """
    cached = self.users.setdefault(self.db_name, {})
    missing = {item.msg.get('#account_id') for item in self.receive_data} - set(cached)
    # A message without '#account_id' yields None, which cannot be looked up.
    missing.discard(None)

    # Only the lookup is skipped when everything is cached. The buffered
    # messages must still be applied and cleared, otherwise they pile up.
    if missing:
        self.get_users(missing)

    for item in self.receive_data:
        item.obj(item.db, item.msg)
    self.receive_data.clear()
def get_users(self, account_ids: set):
    """
    Load the given accounts from ``user_view`` into the per-database cache.

    Rows are stored under ``self.users[self.db_name][account_id]``; an
    account that is already cached is left untouched.

    :param account_ids: account ids to look up; ``None`` entries are ignored
    """

    def _literal(value):
        # Render one id as a SQL literal. Numbers stay bare; everything else
        # becomes a single-quoted string with backslash and quote escaped.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return str(value)
        text = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return f"'{text}'"

    literals = sorted(_literal(aid) for aid in account_ids if aid is not None)
    if not literals:
        return

    # Build the IN list explicitly. Interpolating tuple(...) leaks Python
    # repr into the SQL: a trailing comma for one element, double quotes
    # when an id contains "'", and the bare word None.
    where = f"`#account_id` in ({', '.join(literals)})"
    res = self.db_client.get_all(self.db_name, 'user_view', where)

    cache = self.users.setdefault(self.db_name, {})
    for item in res.values():
        cache.setdefault(item['#account_id'], item)
def get_user(self, db, account_id, data=None):
user = self.users.get(db, {}).get(account_id)

View File

@ -106,8 +106,7 @@ class Sketch:
except Exception as e:
print(f'添加字段 {k} 失败')
default_field.pop(k)
else:
self.update_user_view(db, tb)
if set(default_field) - keys:
self.up_tb_struct(db, tb, default_field)
self.update_user_view(db, tb)

View File

@ -1,5 +1,6 @@
import json
import re
import time
from .valid_data import *
@ -51,16 +52,25 @@ class Transmitter:
error_data = data.pop(error_row)
self.__send(db, tb, data)
else:
print(f'{db}.{tb}插入{len(data)}')
pass
# print(f'{db}.{tb}插入{len(data)}条')
def check_table(self, db, tb, data):
    """
    Run ``self.sketch.alter_table(db, tb, item)`` for every row in ``data``.

    :param db: database name passed through to the sketch
    :param tb: table name passed through to the sketch
    :param data: iterable of row dicts
    """
    # Plain loop: the calls are made for their side effect only, so there is
    # no reason to collect their return values in a list.
    for item in data:
        self.sketch.alter_table(db, tb, item)
def check_type(self, db, tb, data):
# import cProfile, pstats
# from io import StringIO
#
# pr = cProfile.Profile()
# pr.enable()
struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
for item in data:
del_keys = set()
for k, v in item.items():
if v is None:
del_keys.add(k)
continue
type_ = struct_dict[k]
item[k] = TYPE_CK2PY[type_](v, **item)
if v is None:
@ -69,10 +79,31 @@ class Transmitter:
for key in del_keys:
del item[key]
# pr.disable()
# s = StringIO()
# ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
# ps.print_stats()
# print(s.getvalue())
def run(self):
    """
    Flush each ``(db, tb, buffer)`` triple yielded by ``check_send``.

    The buffered values are flattened with ``flat_data``, passed through
    ``check_table`` and ``check_type``, serialised to JSON strings and handed
    to ``__send``. The buffer is cleared afterwards.
    """
    for db, tb, pending in self.check_send():
        rows = list(map(self.flat_data, pending.values()))
        self.check_table(db, tb, rows)
        self.check_type(db, tb, rows)
        payload = [json.dumps(row) for row in rows]
        self.__send(db, tb, payload)
        pending.clear()

View File

@ -1,4 +1,3 @@
import time
from datetime import datetime
from datetime import timedelta
from ipaddress import IPv4Address
@ -6,7 +5,13 @@ from ipaddress import IPv4Address
def is_valid_date(v, **kwargs):
try:
date = datetime.strptime(v, "%Y-%m-%d %H:%M:%S")
date = datetime(int(v[:4]),
int(v[5:7]),
int(v[8:10]),
int(v[11:13]),
int(v[14:16]),
int(v[17:])
)
zone_offset = kwargs.get('#zone_offset', 8)
return (date - timedelta(hours=zone_offset)).strftime("%Y-%m-%d %H:%M:%S")
except:
@ -47,6 +52,7 @@ def is_valid_array(v, **kwargs):
def is_valid_ipv4(v, **kwargs):
    """
    Return ``v`` as a dotted-quad IPv4 string, or ``None`` if it is not one.

    :param v: candidate address (anything ``ipaddress.IPv4Address`` accepts)
    :param kwargs: ignored; accepted so all validators share one call shape
    :return: normalised address string, or ``None`` when ``v`` is invalid
    """
    # An unconditional ``return v`` used to sit ahead of this check, which
    # made the validation unreachable and let any value through.
    try:
        return str(IPv4Address(v))
    except (ValueError, TypeError):
        # AddressValueError is a ValueError subclass.
        return None