Compare commits

...

87 Commits

Author SHA1 Message Date
c4b287dfe7 更新 '初始化事件表.sql' 2023-02-27 10:42:30 +08:00
wuaho
9b59106f4d 追踪错误 2021-10-27 09:56:38 +08:00
wuaho
5500e8386e 改为手动提交 2021-10-26 21:36:57 +08:00
wuaho
19c64b5801 改为手动提交 2021-10-26 21:36:14 +08:00
wuaho
e33a22d1d4 改为手动提交 2021-10-26 21:14:30 +08:00
wuaho
1d6854beff 改为手动提交 2021-10-26 20:43:45 +08:00
wuaho
bf55fafe46 改为手动提交 2021-10-26 20:35:54 +08:00
wuaho
dad6c0b072 改为手动提交 2021-10-26 19:55:54 +08:00
wuaho
cf46c8b6d0 改为手动提交 2021-10-26 19:25:35 +08:00
wuaho
d639660839 改为手动提交 2021-10-26 18:29:23 +08:00
wuaho
e6331341a5 改为手动提交 2021-10-26 17:31:20 +08:00
wuaho
17571d01d3 改为手动提交 2021-10-26 17:06:40 +08:00
wuaho
25392daede 改为手动提交 2021-10-26 11:20:45 +08:00
wuaho
22fcd5e651 改为手动提交 2021-10-26 10:40:44 +08:00
wuaho
4eea6929d7 add unique id 2021-10-21 10:32:00 +08:00
wuaho
3b9ed0c98f add unique id 2021-10-21 10:17:57 +08:00
wuaho
4176aae943 add unique id 2021-10-21 10:13:21 +08:00
wuaho
debadeb774 add unique id 2021-10-21 10:09:50 +08:00
wuaho
1b32bafe44 重订单问题加日志跟踪 2021-10-21 09:46:45 +08:00
wuaho
80ec356516 重订单问题加日志跟踪 2021-10-11 19:12:24 +08:00
wuaho
f198c68b4e pipenv 2021-09-29 11:39:28 +08:00
wuaho
6abac2e9c5 优化 2021-09-26 19:07:51 +08:00
wuaho
d655407df7 优化 2021-09-23 17:21:18 +08:00
wuaho
f8a9938a77 优化 2021-09-23 16:32:11 +08:00
wuaho
d361aba1a0 优化 2021-09-23 12:34:38 +08:00
wuaho
8ae3568c15 优化 2021-09-18 10:42:26 +08:00
wuaho
7fb03eb90c 测试消费 2021-09-18 10:19:34 +08:00
wuaho
4a670de26b 停止消费 2021-09-18 10:09:18 +08:00
wuaho
75bca68aeb 1 2021-09-17 19:19:41 +08:00
wuaho
15591fd7ae 1 2021-09-17 18:11:56 +08:00
wuaho
678b1c93c0 1 2021-09-17 16:54:06 +08:00
wuaho
8f1ba1040b 1 2021-09-09 21:31:12 +08:00
wuaho
402def6734 1 2021-09-09 21:10:07 +08:00
wuaho
c81abb3006 收集事件名 2021-09-09 21:08:27 +08:00
wuaho
628f993e1b 添加密码 2021-09-07 20:35:25 +08:00
wuaho
73becdce74 修改 表引擎 和排序键 2021-09-07 16:30:47 +08:00
wuaho
6603e14434 svrindex String 2021-08-24 20:24:20 +08:00
wuaho
69060898d7 视图过滤 2021-08-18 15:42:44 +08:00
wuaho
ba3b1b0370 视图过滤 2021-08-18 15:06:16 +08:00
wuaho
28a1b31523 视图过滤 2021-08-17 15:37:07 +08:00
wuaho
8e003a4137 视图过滤 2021-08-17 15:35:16 +08:00
wuaho
346290412a 视图过滤 2021-08-17 14:33:52 +08:00
wuaho
da05dd532c 1 2021-08-16 18:07:38 +08:00
wuaho
ba9bb564dc 1 2021-08-15 15:17:48 +08:00
wuaho
53d9cd1594 1 2021-08-14 20:35:32 +08:00
wuaho
262222f6d2 1 2021-08-14 20:35:06 +08:00
wuaho
dfb489dd9c 1 2021-08-14 15:39:11 +08:00
wuaho
f44b67d585 1 2021-08-14 15:16:28 +08:00
wuaho
1213b6162c 1 2021-08-14 15:07:44 +08:00
wuaho
1814777322 1 2021-08-14 15:04:34 +08:00
wuaho
34ea378ebc 1 2021-08-14 14:55:39 +08:00
wuaho
38890d3a5c 1 2021-08-14 14:33:52 +08:00
wuaho
84c3b3442a 1 2021-08-14 13:58:47 +08:00
wuaho
1c04efd709 1 2021-08-14 12:04:03 +08:00
wuaho
83a43bf3da 1 2021-08-14 11:57:36 +08:00
wuaho
86aa16574e 1 2021-08-14 11:19:50 +08:00
wuaho
eef92b3fdc 1 2021-08-14 11:15:12 +08:00
wuaho
a6991c3421 1 2021-08-14 11:11:01 +08:00
wuaho
4e05b9f461 1 2021-08-14 11:04:23 +08:00
wuaho
0841c29438 1 2021-08-14 09:31:57 +08:00
wuaho
3ca8b92f90 1 2021-08-14 09:29:46 +08:00
wuaho
14d1ab3f0a 1 2021-08-14 09:26:23 +08:00
wuaho
9340bf8e58 1 2021-08-14 09:25:36 +08:00
wuaho
199eb8dba7 1 2021-08-14 00:35:19 +08:00
wuaho
3b3e900930 1 2021-08-14 00:32:03 +08:00
wuaho
baf68b2832 数据清理 2021-08-11 20:08:48 +08:00
wuaho
494fa8d74d 1 2021-08-03 18:24:30 +08:00
wuaho
ce0d6d717e 1 2021-08-03 17:55:20 +08:00
wuaho
79b47321c5 像素 2021-08-03 17:51:21 +08:00
wuaho
009d5f0560 停止入库信号 2021-08-02 16:54:59 +08:00
wuaho
132f6cdb5c 停止入库信号 2021-08-02 13:37:38 +08:00
wuaho
1dc9f42471 gmhdgdt,gmhdtt 2021-07-28 20:37:33 +08:00
wuaho
f96f5da864 减少批量 2021-07-28 20:19:22 +08:00
wuaho
3b1e73937d 减少批量 2021-07-28 20:17:21 +08:00
wuaho
41638d385c 减少批量 2021-07-28 19:59:41 +08:00
wuaho
e9d1bdb9b7 减少批量 2021-07-28 19:48:35 +08:00
wuaho
f078d04279 同步表结构 2021-07-28 19:32:49 +08:00
wuaho
26c57a624e 清空表 2021-07-28 17:50:16 +08:00
wuaho
d6cab58c45 初始化结构 2021-07-26 23:39:38 +08:00
wuaho
f2f4b5ed75 视图 2021-07-21 16:20:04 +08:00
wuaho
60b285b21e 视图 2021-07-21 15:32:49 +08:00
wuaho
3173404d9a 充值视图 2021-07-20 13:51:19 +08:00
wuaho
7b2dd128c2 新用户视图 2021-07-20 09:58:05 +08:00
wuaho
6063dfc47c 迁移数据库 2021-07-10 01:54:18 +08:00
wuaho
d3e0901c35 迁移数据库 2021-07-09 21:11:42 +08:00
wuaho
49f45afaaa 争霸测试 2021-07-06 10:54:30 +08:00
wuaho
d4a0e5dcfc 争霸测试 2021-07-06 10:14:07 +08:00
23 changed files with 452 additions and 142 deletions

3
.gitignore vendored
View File

@ -129,3 +129,6 @@ dmypy.json
# Pyre type checker # Pyre type checker
.pyre/ .pyre/
.idea .idea
# 不同游戏单独配置
settings.py
clear_up.py

12
Pipfile
View File

@ -4,12 +4,16 @@ verify_ssl = false
name = "pypi" name = "pypi"
[packages] [packages]
kafka-python = "*" kafka-python = "2.0.2"
clickhouse-driver = "*" clickhouse-driver = "0.2.2"
pipfile = "*" pipfile = "0.0.2"
pandas = "*" pandas = "1.3.3"
redis = "==3.5.3"
loguru = "==0.5.3"
[dev-packages] [dev-packages]
[requires] [requires]
python_version = "3.8" python_version = "3.8"

60
app.py
View File

@ -1,6 +1,9 @@
# coding:utf-8
import time import time
from multiprocessing import Process from multiprocessing import Process
from kafka import TopicPartition
from settings import settings from settings import settings
from v2 import * from v2 import *
from v2.struct_cache import StructCacheFile, StructCacheRedis from v2.struct_cache import StructCacheFile, StructCacheRedis
@ -10,6 +13,7 @@ class XProcess(Process):
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None): def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
super(XProcess, self).__init__() super(XProcess, self).__init__()
# self.daemon = True
self.partition = partition self.partition = partition
self.lock = lock self.lock = lock
self.ipsearch = ipsearch self.ipsearch = ipsearch
@ -24,30 +28,64 @@ class XProcess(Process):
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr, transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
self.partition) self.partition)
transmitter.add_source(handler_event, 10000, 60) transmitter.start_ping()
transmitter.add_source(handler_user, 99, 60) transmitter.add_source(handler_event, 5000, 60)
last_ts = int(time.time()) transmitter.add_source(handler_user, 500, 60)
consumer = create_consumer(self.partition)
for topic, msg in consumer(): last_ts = int(time.time())
# print(msg) consumer, kafka_client = create_consumer(self.partition)
type_ = msg['#type']
del msg['#type'] for msg in consumer():
data = msg.value
type_ = data['#type']
del data['#type']
ts = int(time.time()) ts = int(time.time())
try:
data['properties']['unique_id'] = f'{msg.topic}-{msg.partition}-{msg.offset}'
except:
pass
if msg.topic == 'debug':
self.log.info(data)
if 'user' in type_: if 'user' in type_:
# continue # continue
obj = getattr(handler_user, type_) obj = getattr(handler_user, type_)
handler_user.receive_data.append(UserAct(obj, msg)) handler_user.receive_data.append(UserAct(obj, data))
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
last_ts = ts last_ts = ts
handler_user.execute() handler_user.execute()
elif 'track' in type_: elif 'track' in type_:
# continue # continue
if data['#event_name'] == 'pay':
self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
obj = getattr(handler_event, type_) obj = getattr(handler_event, type_)
obj(msg) obj(data)
elif type_ == settings.STOP_SIGNAL:
# continue
# 1 小时内有效
self.log.info(type_)
if data.get('#time', 0) + 3600 < int(time.time()):
continue
# 停止消费kafka
self.log.info(f'进程{self.partition} 等待90秒')
time.sleep(90)
self.log.info(f'进程{self.partition} 写入数据')
transmitter.run(kafka_client)
self.log.info(f'进程{self.partition} 结束')
kafka_client.commit()
kafka_client.close()
break
elif type_ == 'test':
self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
else: else:
continue continue
transmitter.run() transmitter.run(kafka_client)
while True:
time.sleep(5)
self.log.info(f'消费分区{self.partition} 已结束。。。')

48
clear_up.py.template Normal file
View File

@ -0,0 +1,48 @@
"""
ÇåÀí²âÊÔÊý¾Ý
"""
import json
from settings import settings
game = ''
db = settings.GAME
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
import pandas as pd
from clickhouse_driver import Client
client = Client(**settings.CK_CONFIG)
df = pd.read_json(server_list_url)
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
serverid = tuple((str(i) for i in df['serverid'].to_list()))
sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
from {db}.event
where
`#event_time`>addDays(now('UTC'),-3) and (
lower(`#os`) = 'windows'
or svrindex not in {serverid}
)
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
having n = 1 limit 2000"""
data, columns = client.execute(
sql, columnar=True, with_column_types=True
)
if not data:
exit(0)
data_df = pd.DataFrame(
{col[0]: d for d, col in zip(data, columns)}
)
data_df.drop('n', axis=1, inplace=True)
data_df['sign'] = -1
data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
insert_sql = f'INSERT INTO {db}.event FORMAT JSONEachRow '
insert_sql = insert_sql + '\n'.join([json.dumps(item) for item in data_df.T.to_dict().values()])
client.execute(insert_sql)

1
create_database.sql Normal file
View File

@ -0,0 +1 @@
create database xiangsu;

View File

@ -1,3 +1,4 @@
#coding:utf-8
import os import os
import redis import redis

View File

@ -1,23 +1,37 @@
import json import json
class Config: class Config:
# ck数据库连接 # ck数据库连接
CK_CONFIG = {'host': '119.29.176.224', CK_CONFIG = {'host': '139.159.159.3',
'send_receive_timeout': 30} 'port': 9654,
'user': 'legu',
'password': 'gncPASUwpYrc'
}
# 每个游戏不一样 游戏上报 kafka 主题 # 每个游戏不一样 游戏上报 kafka 主题
SUBSCRIBE_TOPIC = 'test2' # *************
SUBSCRIBE_TOPIC = ''
KAFKA_CONSUMER_CONF = { KAFKA_CONSUMER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads, 'value_deserializer': json.loads,
'auto_offset_reset': 'earliest', 'auto_offset_reset': 'earliest',
'enable_auto_commit': False,
# 每个游戏不一样 # 每个游戏不一样
'group_id': 'legu_group3' # *************
'group_id': ''
}
KAFKA_PRODUCER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
} }
# 游戏数据库名 # 游戏数据库名
GAME = 'shjy' # *************
GAME = ''
STOP_SIGNAL = 'stop_MntxuXMc'
REDIS_CONF = { REDIS_CONF = {
'host': '192.168.0.161', 'host': '192.168.0.161',

View File

@ -1,6 +1,8 @@
# coding:utf-8
import time import time
import redis import redis
from kafka import TopicPartition
from settings import settings from settings import settings
from v2 import * from v2 import *
@ -16,30 +18,29 @@ from v2.log import logger
rdb = redis.Redis(**settings.REDIS_CONF) rdb = redis.Redis(**settings.REDIS_CONF)
event_attr = EventAttr(rdb) event_attr = EventAttr(rdb)
partition = 0
def run(): def run():
db_client = CK(**settings.CK_CONFIG) db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client) sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr,partition)
transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_event, 1000, 10)
transmitter.add_source(handler_user, 1000, 60) transmitter.add_source(handler_user, 1000, 10)
last_ts = int(time.time()) last_ts = int(time.time())
consumer = create_consumer(-1) consumer, kafka_client = create_consumer(partition)
for topic, msg in consumer(): for msg in consumer():
# print(msg) data = msg.value
type_ = msg['#type'] type_ = data['#type']
if msg['#app_id']!='c3e0409ac18341149877b08f087db640': del data['#type']
print(msg)
del msg['#type']
ts = int(time.time()) ts = int(time.time())
if 'user' in type_: if 'user' in type_:
# continue # continue
obj = getattr(handler_user, type_) obj = getattr(handler_user, type_)
handler_user.receive_data.append(UserAct(obj, msg)) handler_user.receive_data.append(UserAct(obj, data))
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
last_ts = ts last_ts = ts
handler_user.execute() handler_user.execute()
@ -47,11 +48,32 @@ def run():
elif 'track' in type_: elif 'track' in type_:
# continue # continue
obj = getattr(handler_event, type_) obj = getattr(handler_event, type_)
obj(msg) obj(data)
elif type_ == settings.STOP_SIGNAL:
# continue
# 1 小时内有效
print(type_)
if data.get('#time', 0) + 3600 < int(time.time()):
continue
kafka_client.close()
# 停止消费kafka
print(f'进程{msg.partition} 等待90秒')
time.sleep(1)
print(f'进程{msg.partition} 写入数据')
transmitter.run(kafka_client)
print(f'进程{msg.partition} 结束')
break
elif type_ == 'test':
print(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
else: else:
continue continue
transmitter.run() transmitter.run(kafka_client)
while True:
time.sleep(5)
print(f'消费分区{partition} 已结束。。。')
if __name__ == '__main__': if __name__ == '__main__':

13
sync_event_name.py Normal file
View File

@ -0,0 +1,13 @@
import redis
from clickhouse_driver import Client
from settings import settings
rdb = redis.Redis(**settings.REDIS_CONF)
client = Client(**settings.CK_CONFIG)
sql = f"""select distinct `#event_name` as v from {settings.GAME}.event"""
df = client.query_dataframe(sql)
data = df['v'].to_list()
rdb.sadd(f'{settings.GAME}_event_set', *data)

View File

@ -0,0 +1,42 @@
# coding:utf-8
"""
更新事件表视图
"""
import json
import os
from settings import settings
game = ''
db = settings.GAME
svrid_file = f'{game}_svrid.json'
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
import pandas as pd
from clickhouse_driver import Client
client = Client(**settings.CK_CONFIG)
df = pd.read_json(server_list_url)
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
serverid = tuple((str(i) for i in df['serverid'].to_list()))
# if os.path.exists(svrid_file):
# with open(svrid_file, 'r') as f:
# old_svrid = json.load(f)
# if set(old_svrid) == set(serverid):
# exit(0)
sql = f"""drop table if exists {db}.event_view"""
res1 = client.execute(sql)
# 筛选有效数据
sql = f"""create view {db}.event_view as
select *
from {db}.event
where (`#os`is null or lower(`#os`) != 'windows')
and svrindex in {serverid}
and not startsWith(`orderid`,'debugPay')
"""
res2 = client.execute(sql)
with open(svrid_file, 'w') as f:
json.dump(sorted(serverid), f)

View File

@ -1,4 +1,5 @@
create view shjy.user_view as select * drop table if exists xiangsu.user_view;
from shjy.user create view xiangsu.user_view as select *
from xiangsu.user
order by `#reg_time` desc order by `#reg_time` desc
LIMIT 1 by `#account_id` LIMIT 1 by `#account_id`

View File

@ -6,17 +6,19 @@ from settings import settings
__all__ = 'create_consumer', __all__ = 'create_consumer',
def create_consumer(partition=-1): def create_consumer(partition: int = -1):
def consumer():
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
if partition > 0:
def consumer():
if partition > -1:
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
else: else:
c.subscribe([settings.SUBSCRIBE_TOPIC]) c.subscribe([settings.SUBSCRIBE_TOPIC])
for msg in c: for msg in c:
# print(msg) # print(msg)
topic = msg.topic yield msg
val = msg.value # topic = msg.topic
yield topic, val # val = msg.value
# yield topic, val
return consumer return consumer, c

View File

@ -1,5 +1,6 @@
__all__ = 'CK', __all__ = 'CK',
import threading
import traceback import traceback
import pandas as pd import pandas as pd
@ -15,12 +16,20 @@ class CK:
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
self.__client = self.__create_client() self.__client = self.__create_client()
self.look = threading.Lock()
def __create_client(self): def __create_client(self):
return Client(*self.args, **self.kwargs) return Client(*self.args, **self.kwargs)
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
return self.__client.execute(*args, **kwargs) try:
self.look.acquire(timeout=10)
res = self.__client.execute(*args, **kwargs)
except Exception as e:
raise e
finally:
self.look.release()
return res
def get_one(self, db, tb, try_cnt=3, **where): def get_one(self, db, tb, try_cnt=3, **where):
@ -30,7 +39,7 @@ class CK:
sql += ' limit 1' sql += ' limit 1'
data = None data = None
try: try:
data, columns = self.__client.execute(sql, with_column_types=True) data, columns = self.execute(sql, with_column_types=True)
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
self.__client.disconnect() self.__client.disconnect()
@ -62,8 +71,10 @@ class CK:
sql += where sql += where
data = None data = None
try: try:
data, columns = self.__client.execute(sql, columnar=True, with_column_types=True) data, columns = self.execute(sql, columnar=True, with_column_types=True)
except Exception as e: except Exception as e:
print('*' * 50)
print(sql)
traceback.print_exc() traceback.print_exc()
if e.code == 60: if e.code == 60:
return self.get_all(db, 'user', where, try_cnt - 1) return self.get_all(db, 'user', where, try_cnt - 1)

View File

@ -7,26 +7,12 @@ class EventAttr:
def __init__(self, rdb: Redis): def __init__(self, rdb: Redis):
self.rdb = rdb self.rdb = rdb
def get_event_attr(self, key): def set_event_name(self, key, *data):
attr = self.event_attr.get(key)
if not attr:
self.event_attr[key] = self.rdb.smembers(key) or set()
return self.event_attr[key]
def set_event_attr(self, key, *data):
self.rdb.sadd(key, *data) self.rdb.sadd(key, *data)
self.event_attr[key] = data
def check_attr(self, db, data): def add_event(self, db, data):
event_name = data.get('#event_name') event_name = data.get('#event_name')
if not event_name: if not event_name:
return return
key = f'{db}_event_set'
key = f'{db}_event_{event_name}' self.set_event_name(key, event_name)
attr = self.get_event_attr(key)
data_attr = set(data)
extra_attr = data_attr - attr
if extra_attr:
self.set_event_attr(key, *extra_attr)

View File

@ -1,5 +1,4 @@
import copy from settings import settings
from .valid_data import * from .valid_data import *
@ -9,6 +8,7 @@ class Sketch:
self.struct_cache = struct_cache self.struct_cache = struct_cache
self.__type_dict = dict() self.__type_dict = dict()
self.__struct_dict = dict() self.__struct_dict = dict()
self.init_tb_struct()
@property @property
def type_dict(self): def type_dict(self):
@ -30,20 +30,30 @@ class Sketch:
if self.struct_cache: if self.struct_cache:
self.struct_cache.update(db, tb, data) self.struct_cache.update(db, tb, data)
def init_tb_struct_cache(self, db, tb):
sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
data, columns = self.db_client.execute(sql, with_column_types=True, columnar=True)
res = {k: v for k, v in zip(data[0], data[1])}
self.__struct_dict[f'{db}_{tb}'] = res
self.up_tb_struct(db, tb, res)
return res
def init_tb_struct(self):
self.init_tb_struct_cache(settings.GAME, 'event')
self.init_tb_struct_cache(settings.GAME, 'user')
def get_tb_struct_cache(self, db, tb): def get_tb_struct_cache(self, db, tb):
""" """
查一条记录 取字段 和类型 取字段 和类型
:param db: :param db:
:param tb: :param tb:
:return: :return:
""" """
if self.__struct_dict.get(f'{db}_{tb}'): if self.__struct_dict.get(f'{db}_{tb}'):
return self.__struct_dict.get(f'{db}_{tb}') return self.__struct_dict.get(f'{db}_{tb}')
sql = f'select * from {db}.{tb} limit 1'
_, columns = self.db_client.execute(sql, with_column_types=True) res = self.init_tb_struct_cache(db, tb)
res = {item[0]: item[1] for item in columns}
self.__struct_dict[f'{db}_{tb}'] = res
self.up_tb_struct(db, tb, res)
return res return res
def update_user_view(self, db, tb): def update_user_view(self, db, tb):
@ -64,7 +74,7 @@ class Sketch:
LIMIT 1 by `#account_id`""" LIMIT 1 by `#account_id`"""
self.db_client.execute(sql) self.db_client.execute(sql)
def alter_table(self, db, tb, data): def alter_table(self, db, tb, data, try_cnt=10):
""" """
数据库字段检查 数据库字段检查
添加新字段为第一次出现类型 添加新字段为第一次出现类型
@ -88,7 +98,7 @@ class Sketch:
default_field[k] = 'Nullable(String)' default_field[k] = 'Nullable(String)'
if isinstance(v, int): if isinstance(v, int):
default_field[k] = 'Nullable(UInt64)' default_field[k] = 'Nullable(Int64)'
if isinstance(v, float): if isinstance(v, float):
default_field[k] = 'Nullable(Float32)' default_field[k] = 'Nullable(Float32)'
@ -99,13 +109,18 @@ class Sketch:
if isinstance(v, bool): if isinstance(v, bool):
default_field[k] = 'Nullable(UInt8)' default_field[k] = 'Nullable(UInt8)'
try:
sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}' sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
print(sql) print(sql)
try:
self.db_client.execute(sql) self.db_client.execute(sql)
except Exception as e: except Exception as e:
print(f'添加字段 {k} 失败') print(f'添加字段 {k} 失败,同步数据库表结构')
# 读取数据库表结构并设置
self.init_tb_struct()
default_field.pop(k) default_field.pop(k)
if try_cnt < 0:
raise e
return self.alter_table(db, tb, data, try_cnt=try_cnt - 1)
if set(default_field) - keys: if set(default_field) - keys:
self.up_tb_struct(db, tb, default_field) self.up_tb_struct(db, tb, default_field)

View File

@ -1,14 +1,41 @@
import json import json
import os import os
import re import re
import threading
import time import time
import traceback import traceback
from settings import settings
from .valid_data import * from .valid_data import *
__all__ = 'Transmitter', __all__ = 'Transmitter',
class Ping(threading.Thread):
def __init__(self, db_client, p, log):
threading.Thread.__init__(self)
self.daemon = True
self.ping_ts = 0
self.time_out = 60
self.log = log
self.db_client = db_client
self.p = p
def run(self):
while True:
time.sleep(10)
ts = int(time.time())
if self.ping_ts + self.time_out < ts:
# 保持连接
try:
self.ping_ts = ts
self.log.info(f'保持连接{self.p} ping')
self.db_client.execute('select 1')
except:
self.log.error('ping error')
class Transmitter: class Transmitter:
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0): def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
self.db_client = db_client self.db_client = db_client
@ -20,12 +47,16 @@ class Transmitter:
self.event_attr = event_attr self.event_attr = event_attr
self.p = p self.p = p
def start_ping(self):
t = Ping(self.db_client, self.p, self.log)
t.start()
def add_source(self, handler, bulk_max=1000, time_out=60): def add_source(self, handler, bulk_max=1000, time_out=60):
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
def check_send(self): def check_send(self):
for h, p in self.slots.items():
ts = int(time.time()) ts = int(time.time())
for h, p in self.slots.items():
tb, buffer = h.buffer_pool tb, buffer = h.buffer_pool
buffer_size = len(buffer) buffer_size = len(buffer)
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0: if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
@ -51,9 +82,11 @@ class Transmitter:
self.db_client.execute(sql) self.db_client.execute(sql)
self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}') self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}')
except Exception as e: except Exception as e:
self.log.error(traceback.format_exc())
# 丢弃错误行 再次发送 # 丢弃错误行 再次发送
if hasattr(e, 'code') and e.code == 26: if hasattr(e, 'code') and e.code == 26:
m = re.match('(.*)?Stack trace', e.message) m = re.match('(.*)?Stack trace', e.message)
self.log.error(data)
if m: if m:
error_msg = m.group(1) error_msg = m.group(1)
error_row = re.match('.*?errors out of (\d+) rows', error_msg) error_row = re.match('.*?errors out of (\d+) rows', error_msg)
@ -75,10 +108,11 @@ class Transmitter:
def check_table(self, db, tb, data): def check_table(self, db, tb, data):
[self.sketch.alter_table(db, tb, item) for item in data] [self.sketch.alter_table(db, tb, item) for item in data]
def set_event_attr(self, db, tb, data): def collect_event(self, db, tb, data):
if tb != 'event': if tb != 'event':
return return
[self.event_attr.check_attr(db, item) for item in data]
[self.event_attr.add_event(db, item) for item in data]
def check_type(self, db, tb, data): def check_type(self, db, tb, data):
struct_dict = self.sketch.struct_dict[f'{db}_{tb}'] struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
@ -96,14 +130,19 @@ class Transmitter:
for key in del_keys: for key in del_keys:
del item[key] del item[key]
def run(self): def run(self, kafka_client):
for tb, buffer in self.check_send(): for tb, buffer in self.check_send():
try: try:
data = [self.flat_data(x) for x in buffer.values()] data = [self.flat_data(x) for x in buffer.values()]
self.check_table(self.db_name, tb, data) self.check_table(self.db_name, tb, data)
self.check_type(self.db_name, tb, data) self.check_type(self.db_name, tb, data)
self.set_event_attr(self.db_name, tb, data) self.collect_event(self.db_name, tb, data)
self.__send(self.db_name, tb, [json.dumps(item) for item in data]) self.__send(self.db_name, tb, [json.dumps(item) for item in data])
except Exception as e: except Exception as e:
self.log.error(e) self.log.error(traceback.format_exc())
self.log.error(data)
buffer.clear() buffer.clear()
try:
kafka_client.commit()
except Exception as e:
self.log.error(f'进程:{self.p} error:{e}')

View File

@ -26,8 +26,9 @@ def is_valid_int(v, **kwargs):
def is_valid_srt(v, **kwargs): def is_valid_srt(v, **kwargs):
if isinstance(v, str): try:
return v return str(v)
except:
return None return None
@ -39,14 +40,16 @@ def is_valid_float(v, **kwargs):
def is_valid_bool(v, **kwargs): def is_valid_bool(v, **kwargs):
if isinstance(v, bool): try:
return v return bool(v)
except:
return None return None
def is_valid_array(v, **kwargs): def is_valid_array(v, **kwargs):
if isinstance(v, list): try:
return [str(i) for i in v] return [str(i) for i in v]
except:
return None return None

13
充值视图.sql Normal file
View File

@ -0,0 +1,13 @@
drop table if exists xiangsu.recharge_game;
create view xiangsu.recharge_game as
select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
`#os`,
`#bundle_id`,
owner_name,
channel,
arrayDistinct(groupArray(binduid)) as account,
length(account) as account_num,
sum(money) as money
from xiangsu.event
where `#event_name` = 'rechargeGame'
group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel

View File

@ -1,12 +1,10 @@
create table shjy.event -- auto-generated definition
create table event
( (
`#ip` Nullable(IPv4), `#ip` Nullable(IPv4),
`#country` Nullable(String), `#country` Nullable(String),
`#country_code` Nullable(String),
`#province` Nullable(String), `#province` Nullable(String),
`#city` Nullable(String), `#city` Nullable(String),
`#os_version` Nullable(String),
`#manufacturer` Nullable(String),
`#os` Nullable(String), `#os` Nullable(String),
`#device_id` Nullable(String), `#device_id` Nullable(String),
`#screen_height` Nullable(UInt16), `#screen_height` Nullable(UInt16),
@ -14,41 +12,50 @@ create table shjy.event
`#device_model` Nullable(String), `#device_model` Nullable(String),
`#app_version` Nullable(String), `#app_version` Nullable(String),
`#bundle_id` Nullable(String), `#bundle_id` Nullable(String),
`#lib` Nullable(String), `#app_name` Nullable(String),
`#lib_version` Nullable(String), `#game_version` Nullable(String),
`#os_version` Nullable(String),
`#network_type` Nullable(String), `#network_type` Nullable(String),
`#carrier` Nullable(String), `#carrier` Nullable(String),
`#browser` Nullable(String), `#manufacturer` Nullable(String),
`#browser_version` Nullable(String), `#app_id` Nullable(String),
`#duration` Nullable(String),
`#url` Nullable(String),
`#url_path` Nullable(String),
`#referrer` Nullable(String),
`#referrer_host` Nullable(String),
`#title` Nullable(String),
`#screen_name` Nullable(String),
`#element_id` Nullable(String),
`#element_type` Nullable(String),
`#resume_from_background` Nullable(String),
`#element_selector` Nullable(String),
`#element_position` Nullable(String),
`#element_content` Nullable(String),
`#scene` Nullable(String),
`#mp_platform` Nullable(String),
`#app_crashed_reason` Nullable(String),
`#zone_offset` Int8 default 8,
`#event_id` String,
`#event_time` DateTime('UTC'),
`#account_id` String, `#account_id` String,
`#distinct_id` Nullable(String), `#distinct_id` Nullable(String),
binduid Nullable(String),
channel Nullable(String),
owner_name String default '',
role_name Nullable(String),
exp Nullable(UInt64),
zhanli Nullable(UInt64),
maxmapid Nullable(UInt16),
mapid Nullable(UInt16),
ghid Nullable(String),
rmbmoney Nullable(UInt64),
jinbi Nullable(UInt64),
svrindex Nullable(String),
lv Nullable(UInt16),
vip Nullable(UInt16),
game Nullable(String),
`#zone_offset` Int8 default 8,
`#event_time` DateTime('UTC'),
`#event_name` String, `#event_name` String,
`#server_time` DateTime('UTC') default now(), `#server_time` DateTime('UTC') default now(),
unitPrice Nullable(UInt32),
money Nullable(String),
islishishouci Nullable(UInt8),
isdangrishouci Nullable(UInt8),
is_today_reg Nullable(UInt8),
orderid Nullable(String),
proid Nullable(String),
step_id Nullable(UInt16),
step_group Nullable(UInt16),
guide_start_time Nullable(UInt32),
online_ts Nullable(UInt16),
`#time` Nullable(DateTime('UTC'))
)
engine = ReplacingMergeTree PARTITION BY toYYYYMMDD(`#event_time`)
ORDER BY (owner_name, `#event_name`, `#event_time`, `#account_id`)
SETTINGS index_granularity = 8192;
`sign` Int8 default 1
) ENGINE = CollapsingMergeTree(sign)
PARTITION BY toYYYYMMDD(`#event_time`)
order by (`#account_id`, `#event_time`, `#event_name`)
-- TTL event_time + toIntervalDay(365)

View File

@ -1,9 +1,10 @@
create table shjy.user drop table if exists xiangsu.user;
create table xiangsu.user
( (
`#reg_time` DateTime('UTC'), `#reg_time` DateTime('UTC'),
`#account_id` String, `#account_id` String,
`svrindex` UInt16, `svrindex` String,
`#zone_offset` Int8 default 8, `#zone_offset` Int8 default 8,
`#server_time` DateTime('UTC') default now() `#server_time` DateTime('UTC') default now()

12
新用户视图.sql Normal file
View File

@ -0,0 +1,12 @@
drop table if exists xiangsu.new_account;
create view xiangsu.new_account as
select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
`#os`,
`#bundle_id`,
owner_name,
channel,
groupArray(`binduid`) as account,
length(account) as num
from xiangsu.event
where role_idx = 1
group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel

23
新账号付费.sql Normal file
View File

@ -0,0 +1,23 @@
drop table if exists xiangsu.new_account_recharge;
create view xiangsu.new_account_recharge as (select date,
`#os`,
`#bundle_id`,
owner_name,
channel,
uniqExact(binduid) as accout_num,
sum(money) as money
from (select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
`#os`,
`#bundle_id`,
owner_name,
channel,
binduid,
money
from xiangsu.event
where `#event_name` = 'rechargeGame') as tb1
right join (select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
binduid
from xiangsu.event
where role_idx = 1) as tb2
on tb1.date = tb2.date and tb2.binduid = tb1.binduid
group by date, `#os`, `#bundle_id`, owner_name, channel)

11
活跃账号视图.sql Normal file
View File

@ -0,0 +1,11 @@
drop table if exists xiangsu.active_account;
create view xiangsu.active_account as
select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
`#os`,
`#bundle_id`,
owner_name,
channel,
uniqCombined(binduid) as num,
arrayDistinct(groupArray(binduid)) as account
from xiangsu.event
group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel