This commit is contained in:
wuaho 2021-05-08 18:24:31 +08:00
parent de668a32be
commit 9aa46f12a8
8 changed files with 153 additions and 85 deletions

48
app.py Normal file
View File

@ -0,0 +1,48 @@
import time
from multiprocessing import Process
from settings import settings
from v2 import *
class XProcess(Process):
def __init__(self, partition, lock):
super(XProcess, self).__init__()
self.partition = partition
self.lock = lock
def run(self):
db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME)
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.lock)
transmitter.add_source(handler_event, 10000, 60)
transmitter.add_source(handler_user, 1000, 60)
last_ts = int(time.time())
consumer = create_consumer(self.partition)
for topic, msg in consumer():
# print(msg)
type_ = msg['#type']
del msg['#type']
ts = int(time.time())
if 'user' in type_:
# continue
obj = getattr(handler_user, type_)
handler_user.receive_data.append(UserAct(obj, msg))
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
last_ts = ts
handler_user.execute()
elif 'track' in type_:
# continue
obj = getattr(handler_event, type_)
obj(msg)
else:
continue
transmitter.run()

49
main.py
View File

@ -1,46 +1,7 @@
import time
from settings import settings
from v2 import *
db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME)
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch)
def run():
transmitter.add_source(handler_event, 1000, 60)
transmitter.add_source(handler_user, 500, 60)
i = 0
ts = time.time() * 1000
for topic, msg in consumer():
i += 1
if i > 10000:
print(time.time() * 1000 - ts)
ts = time.time() * 1000
i = 0
type_ = msg['#type']
del msg['#type']
if 'user' in type_:
# continue
obj = getattr(handler_user, type_)
handler_user.receive_data.append(UserAct(obj, msg))
if len(handler_user.receive_data) >= 1000:
handler_user.execute()
elif 'track' in type_:
# continue
obj = getattr(handler_event, type_)
obj(msg)
else:
continue
transmitter.run()
from app import XProcess
from multiprocessing import Lock
if __name__ == '__main__':
run()
lock = Lock()
for i in range(0, 16):
XProcess(i, lock).start()

View File

@ -3,14 +3,15 @@ import json
class Config:
CK_CONFIG = {'host': '119.29.176.224',
'send_receive_timeout': 3}
'send_receive_timeout': 30}
SUBSCRIBE_TOPIC = ['test', 'test2']
SUBSCRIBE_TOPIC = 'test2'
KAFKA_CONSUMER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads,
'group_id': 'legu_group'
'auto_offset_reset': 'earliest',
'group_id': 'legu_group3'
}
GAME = 'shjy'

View File

@ -1,13 +1,22 @@
from kafka import KafkaConsumer
from kafka import TopicPartition
from settings import settings
__all__ = 'consumer',
__all__ = 'create_consumer',
def consumer():
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
c.subscribe(settings.SUBSCRIBE_TOPIC)
for msg in c:
topic = msg.topic
val = msg.value
yield topic, val
def create_consumer(partition=-1):
def consumer():
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
if partition > 0:
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
else:
c.subscribe([settings.SUBSCRIBE_TOPIC])
for msg in c:
# print(msg)
topic = msg.topic
val = msg.value
yield topic, val
return consumer

View File

@ -1,5 +1,7 @@
__all__ = 'CK',
import traceback
import pandas as pd
from datetime import datetime
from datetime import timedelta
@ -8,23 +10,35 @@ from clickhouse_driver import Client
from pandas import DatetimeTZDtype
class CK(Client):
class CK:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = args
self.kwargs = kwargs
self.__client = self.__create_client()
def __create_client(self):
return Client(*self.args, **self.kwargs)
def execute(self, *args, **kwargs):
return self.__client.execute(*args, **kwargs)
def get_one(self, db, tb, try_cnt=3, **where):
def get_one(self, db, tb, **where) -> dict:
"""
注意 还原时区
:param db:
:param tb:
:param where:
:return:
"""
sql = f"select * from {db}.{tb} where 1"
for k, v in where.items():
sql += f" and `{k}`='{v}'"
sql += ' limit 1'
data, columns = self.execute(sql, with_column_types=True)
data = None
try:
data, columns = self.__client.execute(sql, with_column_types=True)
except Exception as e:
traceback.print_exc()
self.__client.disconnect()
self.__client = self.__create_client()
if try_cnt > 0:
self.get_one(db, tb, try_cnt - 1, **where)
else:
return None
res = dict()
if data:
data = {k[0]: v for k, v in zip(columns, data[0])}
@ -34,8 +48,9 @@ class CK(Client):
else:
res[k] = v
return res
return None
def get_all(self, db, tb, where: str) -> dict:
def get_all(self, db, tb, where: str, try_cnt=3):
"""
注意 还原时区
:param db:
@ -45,7 +60,23 @@ class CK(Client):
"""
sql = f"select * from {db}.{tb} where "
sql += where
data, columns = self.execute(sql, columnar=True, with_column_types=True)
data = None
try:
data, columns = self.__client.execute(sql, columnar=True, with_column_types=True)
except Exception as e:
traceback.print_exc()
self.__client.disconnect()
self.__client = self.__create_client()
if try_cnt > 0:
self.get_all(db, tb, where, try_cnt - 1)
# 异常导致导致 避免认为用户不存在
if data is None:
return None
if not data:
return dict()
df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
tz = df['#zone_offset'].apply(lambda x: timedelta(hours=x))
for t_type in df.select_dtypes(include=[DatetimeTZDtype]):

View File

@ -27,6 +27,9 @@ class HandlerUser:
def get_users(self, account_ids: set):
where = f'`#account_id` in {tuple(account_ids)}'
res = self.db_client.get_all(self.db_name, 'user_view', where)
if res is None:
print('ck查询出错了')
return
for item in res.values():
self.users[item['#account_id']] = item
@ -35,13 +38,14 @@ class HandlerUser:
if user:
return user
user = self.db_client.get_one(self.db_name, f'{self.tb}_view', **{'#account_id': account_id})
if user:
self.users[account_id] = user
return user
# user = self.db_client.get_one(self.db_name, f'{self.tb}_view', **{'#account_id': account_id})
# if user:
# self.users[account_id] = user
# return user
if not isinstance(data, dict):
return
user = dict()
data['#reg_time'] = data['#time']
self.merge(user, data)
self.users[account_id] = user
return user

View File

@ -1,6 +1,8 @@
import json
import os
import re
import time
import traceback
from .valid_data import *
@ -8,23 +10,23 @@ __all__ = 'Transmitter',
class Transmitter:
def __init__(self, db_client, db_name, sketch):
def __init__(self, db_client, db_name, sketch, lock):
self.db_client = db_client
self.db_name = db_name
self.sketch = sketch
self.ts = int(time.time())
self.slots = dict()
self.lock = lock
def add_source(self, handler, bulk_max=1000, time_out=60):
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out}
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
def check_send(self):
for h, p in self.slots.items():
ts = int(time.time())
tb, buffer = h.buffer_pool
buffer_size = len(buffer)
if (self.ts + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
self.ts = ts
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
p['ts'] = ts
yield tb, buffer
@staticmethod
@ -39,11 +41,15 @@ class Transmitter:
sql = sql + '\n'.join(data)
try:
# 允许20%错误率
# self.lock.acquire()
ts = int(time.time() * 1000)
# print(f'{ts} {os.getpid()} 获得锁')
self.db_client.execute('set input_format_allow_errors_ratio=0.2')
self.db_client.execute(sql)
print(f'写入耗时 {int(time.time() * 1000) - ts}')
except Exception as e:
# 丢弃错误行 再次发送
if e.code == 26:
if hasattr(e, 'code') and e.code == 26:
m = re.match('(.*)?Stack trace', e.message)
if m:
error_msg = m.group(1)
@ -52,9 +58,15 @@ class Transmitter:
error_row = int(error_row.group(1)) - 1
error_data = data.pop(error_row)
self.__send(db, tb, data)
else:
else:
# pass
traceback.print_exc()
finally:
pass
# print(f'{db}.{tb}插入{len(data)}条')
# print(f' {os.getpid()} 释放锁')
# self.lock.release()
print(f'{db}.{tb}插入{len(data)}')
def check_table(self, db, tb, data):
[self.sketch.alter_table(db, tb, item) for item in data]
@ -77,8 +89,11 @@ class Transmitter:
def run(self):
for tb, buffer in self.check_send():
data = [self.flat_data(x) for x in buffer.values()]
self.check_table(self.db_name, tb, data)
self.check_type(self.db_name, tb, data)
self.__send(self.db_name, tb, [json.dumps(item) for item in data])
try:
data = [self.flat_data(x) for x in buffer.values()]
self.check_table(self.db_name, tb, data)
self.check_type(self.db_name, tb, data)
self.__send(self.db_name, tb, [json.dumps(item) for item in data])
except:
pass
buffer.clear()

View File

@ -52,7 +52,6 @@ def is_valid_array(v, **kwargs):
def is_valid_ipv4(v, **kwargs):
try:
return v
return str(IPv4Address(v))
except:
return None