Compare commits
87 Commits
Author | SHA1 | Date | |
---|---|---|---|
c4b287dfe7 | |||
![]() |
9b59106f4d | ||
![]() |
5500e8386e | ||
![]() |
19c64b5801 | ||
![]() |
e33a22d1d4 | ||
![]() |
1d6854beff | ||
![]() |
bf55fafe46 | ||
![]() |
dad6c0b072 | ||
![]() |
cf46c8b6d0 | ||
![]() |
d639660839 | ||
![]() |
e6331341a5 | ||
![]() |
17571d01d3 | ||
![]() |
25392daede | ||
![]() |
22fcd5e651 | ||
![]() |
4eea6929d7 | ||
![]() |
3b9ed0c98f | ||
![]() |
4176aae943 | ||
![]() |
debadeb774 | ||
![]() |
1b32bafe44 | ||
![]() |
80ec356516 | ||
![]() |
f198c68b4e | ||
![]() |
6abac2e9c5 | ||
![]() |
d655407df7 | ||
![]() |
f8a9938a77 | ||
![]() |
d361aba1a0 | ||
![]() |
8ae3568c15 | ||
![]() |
7fb03eb90c | ||
![]() |
4a670de26b | ||
![]() |
75bca68aeb | ||
![]() |
15591fd7ae | ||
![]() |
678b1c93c0 | ||
![]() |
8f1ba1040b | ||
![]() |
402def6734 | ||
![]() |
c81abb3006 | ||
![]() |
628f993e1b | ||
![]() |
73becdce74 | ||
![]() |
6603e14434 | ||
![]() |
69060898d7 | ||
![]() |
ba3b1b0370 | ||
![]() |
28a1b31523 | ||
![]() |
8e003a4137 | ||
![]() |
346290412a | ||
![]() |
da05dd532c | ||
![]() |
ba9bb564dc | ||
![]() |
53d9cd1594 | ||
![]() |
262222f6d2 | ||
![]() |
dfb489dd9c | ||
![]() |
f44b67d585 | ||
![]() |
1213b6162c | ||
![]() |
1814777322 | ||
![]() |
34ea378ebc | ||
![]() |
38890d3a5c | ||
![]() |
84c3b3442a | ||
![]() |
1c04efd709 | ||
![]() |
83a43bf3da | ||
![]() |
86aa16574e | ||
![]() |
eef92b3fdc | ||
![]() |
a6991c3421 | ||
![]() |
4e05b9f461 | ||
![]() |
0841c29438 | ||
![]() |
3ca8b92f90 | ||
![]() |
14d1ab3f0a | ||
![]() |
9340bf8e58 | ||
![]() |
199eb8dba7 | ||
![]() |
3b3e900930 | ||
![]() |
baf68b2832 | ||
![]() |
494fa8d74d | ||
![]() |
ce0d6d717e | ||
![]() |
79b47321c5 | ||
![]() |
009d5f0560 | ||
![]() |
132f6cdb5c | ||
![]() |
1dc9f42471 | ||
![]() |
f96f5da864 | ||
![]() |
3b1e73937d | ||
![]() |
41638d385c | ||
![]() |
e9d1bdb9b7 | ||
![]() |
f078d04279 | ||
![]() |
26c57a624e | ||
![]() |
d6cab58c45 | ||
![]() |
f2f4b5ed75 | ||
![]() |
60b285b21e | ||
![]() |
3173404d9a | ||
![]() |
7b2dd128c2 | ||
![]() |
6063dfc47c | ||
![]() |
d3e0901c35 | ||
![]() |
49f45afaaa | ||
![]() |
d4a0e5dcfc |
3
.gitignore
vendored
3
.gitignore
vendored
@ -129,3 +129,6 @@ dmypy.json
|
|||||||
# Pyre type checker
|
# Pyre type checker
|
||||||
.pyre/
|
.pyre/
|
||||||
.idea
|
.idea
|
||||||
|
# 不同游戏单独配置
|
||||||
|
settings.py
|
||||||
|
clear_up.py
|
||||||
|
12
Pipfile
12
Pipfile
@ -4,12 +4,16 @@ verify_ssl = false
|
|||||||
name = "pypi"
|
name = "pypi"
|
||||||
|
|
||||||
[packages]
|
[packages]
|
||||||
kafka-python = "*"
|
kafka-python = "2.0.2"
|
||||||
clickhouse-driver = "*"
|
clickhouse-driver = "0.2.2"
|
||||||
pipfile = "*"
|
pipfile = "0.0.2"
|
||||||
pandas = "*"
|
pandas = "1.3.3"
|
||||||
|
redis = "==3.5.3"
|
||||||
|
loguru = "==0.5.3"
|
||||||
|
|
||||||
[dev-packages]
|
[dev-packages]
|
||||||
|
|
||||||
[requires]
|
[requires]
|
||||||
python_version = "3.8"
|
python_version = "3.8"
|
||||||
|
|
||||||
|
|
||||||
|
60
app.py
60
app.py
@ -1,6 +1,9 @@
|
|||||||
|
# coding:utf-8
|
||||||
import time
|
import time
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
|
|
||||||
|
from kafka import TopicPartition
|
||||||
|
|
||||||
from settings import settings
|
from settings import settings
|
||||||
from v2 import *
|
from v2 import *
|
||||||
from v2.struct_cache import StructCacheFile, StructCacheRedis
|
from v2.struct_cache import StructCacheFile, StructCacheRedis
|
||||||
@ -10,6 +13,7 @@ class XProcess(Process):
|
|||||||
|
|
||||||
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
||||||
super(XProcess, self).__init__()
|
super(XProcess, self).__init__()
|
||||||
|
# self.daemon = True
|
||||||
self.partition = partition
|
self.partition = partition
|
||||||
self.lock = lock
|
self.lock = lock
|
||||||
self.ipsearch = ipsearch
|
self.ipsearch = ipsearch
|
||||||
@ -24,30 +28,64 @@ class XProcess(Process):
|
|||||||
handler_user = HandlerUser(db_client, settings.GAME)
|
handler_user = HandlerUser(db_client, settings.GAME)
|
||||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
||||||
self.partition)
|
self.partition)
|
||||||
transmitter.add_source(handler_event, 10000, 60)
|
transmitter.start_ping()
|
||||||
transmitter.add_source(handler_user, 99, 60)
|
transmitter.add_source(handler_event, 5000, 60)
|
||||||
last_ts = int(time.time())
|
transmitter.add_source(handler_user, 500, 60)
|
||||||
consumer = create_consumer(self.partition)
|
|
||||||
|
|
||||||
for topic, msg in consumer():
|
last_ts = int(time.time())
|
||||||
# print(msg)
|
consumer, kafka_client = create_consumer(self.partition)
|
||||||
type_ = msg['#type']
|
|
||||||
del msg['#type']
|
for msg in consumer():
|
||||||
|
data = msg.value
|
||||||
|
type_ = data['#type']
|
||||||
|
del data['#type']
|
||||||
ts = int(time.time())
|
ts = int(time.time())
|
||||||
|
try:
|
||||||
|
data['properties']['unique_id'] = f'{msg.topic}-{msg.partition}-{msg.offset}'
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if msg.topic == 'debug':
|
||||||
|
self.log.info(data)
|
||||||
|
|
||||||
if 'user' in type_:
|
if 'user' in type_:
|
||||||
# continue
|
# continue
|
||||||
obj = getattr(handler_user, type_)
|
obj = getattr(handler_user, type_)
|
||||||
handler_user.receive_data.append(UserAct(obj, msg))
|
handler_user.receive_data.append(UserAct(obj, data))
|
||||||
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
|
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
|
||||||
last_ts = ts
|
last_ts = ts
|
||||||
handler_user.execute()
|
handler_user.execute()
|
||||||
|
|
||||||
elif 'track' in type_:
|
elif 'track' in type_:
|
||||||
# continue
|
# continue
|
||||||
|
if data['#event_name'] == 'pay':
|
||||||
|
self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
|
||||||
|
|
||||||
obj = getattr(handler_event, type_)
|
obj = getattr(handler_event, type_)
|
||||||
obj(msg)
|
obj(data)
|
||||||
|
elif type_ == settings.STOP_SIGNAL:
|
||||||
|
# continue
|
||||||
|
# 1 小时内有效
|
||||||
|
self.log.info(type_)
|
||||||
|
if data.get('#time', 0) + 3600 < int(time.time()):
|
||||||
|
continue
|
||||||
|
# 停止消费kafka
|
||||||
|
self.log.info(f'进程{self.partition} 等待90秒')
|
||||||
|
time.sleep(90)
|
||||||
|
self.log.info(f'进程{self.partition} 写入数据')
|
||||||
|
transmitter.run(kafka_client)
|
||||||
|
self.log.info(f'进程{self.partition} 结束')
|
||||||
|
kafka_client.commit()
|
||||||
|
kafka_client.close()
|
||||||
|
|
||||||
|
break
|
||||||
|
elif type_ == 'test':
|
||||||
|
self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
transmitter.run()
|
transmitter.run(kafka_client)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
time.sleep(5)
|
||||||
|
self.log.info(f'消费分区{self.partition} 已结束。。。')
|
||||||
|
48
clear_up.py.template
Normal file
48
clear_up.py.template
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
"""
|
||||||
|
ÇåÀí²âÊÔÊý¾Ý
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
game = ''
|
||||||
|
db = settings.GAME
|
||||||
|
|
||||||
|
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from clickhouse_driver import Client
|
||||||
|
|
||||||
|
client = Client(**settings.CK_CONFIG)
|
||||||
|
|
||||||
|
df = pd.read_json(server_list_url)
|
||||||
|
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
|
||||||
|
serverid = tuple((str(i) for i in df['serverid'].to_list()))
|
||||||
|
|
||||||
|
sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
|
||||||
|
from {db}.event
|
||||||
|
where
|
||||||
|
`#event_time`>addDays(now('UTC'),-3) and (
|
||||||
|
lower(`#os`) = 'windows'
|
||||||
|
or svrindex not in {serverid}
|
||||||
|
)
|
||||||
|
|
||||||
|
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
|
||||||
|
having n = 1 limit 2000"""
|
||||||
|
|
||||||
|
data, columns = client.execute(
|
||||||
|
sql, columnar=True, with_column_types=True
|
||||||
|
)
|
||||||
|
if not data:
|
||||||
|
exit(0)
|
||||||
|
data_df = pd.DataFrame(
|
||||||
|
{col[0]: d for d, col in zip(data, columns)}
|
||||||
|
)
|
||||||
|
|
||||||
|
data_df.drop('n', axis=1, inplace=True)
|
||||||
|
data_df['sign'] = -1
|
||||||
|
data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
|
||||||
|
insert_sql = f'INSERT INTO {db}.event FORMAT JSONEachRow '
|
||||||
|
insert_sql = insert_sql + '\n'.join([json.dumps(item) for item in data_df.T.to_dict().values()])
|
||||||
|
|
||||||
|
client.execute(insert_sql)
|
1
create_database.sql
Normal file
1
create_database.sql
Normal file
@ -0,0 +1 @@
|
|||||||
|
create database xiangsu;
|
@ -1,23 +1,37 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
# ck数据库连接
|
# ck数据库连接
|
||||||
CK_CONFIG = {'host': '119.29.176.224',
|
CK_CONFIG = {'host': '139.159.159.3',
|
||||||
'send_receive_timeout': 30}
|
'port': 9654,
|
||||||
|
'user': 'legu',
|
||||||
|
'password': 'gncPASUwpYrc'
|
||||||
|
}
|
||||||
|
|
||||||
# 每个游戏不一样 游戏上报 kafka 主题
|
# 每个游戏不一样 游戏上报 kafka 主题
|
||||||
SUBSCRIBE_TOPIC = 'test2'
|
# *************
|
||||||
|
SUBSCRIBE_TOPIC = ''
|
||||||
|
|
||||||
KAFKA_CONSUMER_CONF = {
|
KAFKA_CONSUMER_CONF = {
|
||||||
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||||
'value_deserializer': json.loads,
|
'value_deserializer': json.loads,
|
||||||
'auto_offset_reset': 'earliest',
|
'auto_offset_reset': 'earliest',
|
||||||
|
'enable_auto_commit': False,
|
||||||
|
|
||||||
# 每个游戏不一样
|
# 每个游戏不一样
|
||||||
'group_id': 'legu_group3'
|
# *************
|
||||||
|
'group_id': ''
|
||||||
|
}
|
||||||
|
KAFKA_PRODUCER_CONF = {
|
||||||
|
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||||
|
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
|
||||||
}
|
}
|
||||||
# 游戏数据库名
|
# 游戏数据库名
|
||||||
GAME = 'shjy'
|
# *************
|
||||||
|
GAME = ''
|
||||||
|
|
||||||
|
STOP_SIGNAL = 'stop_MntxuXMc'
|
||||||
|
|
||||||
REDIS_CONF = {
|
REDIS_CONF = {
|
||||||
'host': '192.168.0.161',
|
'host': '192.168.0.161',
|
@ -1,6 +1,8 @@
|
|||||||
|
# coding:utf-8
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import redis
|
import redis
|
||||||
|
from kafka import TopicPartition
|
||||||
|
|
||||||
from settings import settings
|
from settings import settings
|
||||||
from v2 import *
|
from v2 import *
|
||||||
@ -16,30 +18,29 @@ from v2.log import logger
|
|||||||
rdb = redis.Redis(**settings.REDIS_CONF)
|
rdb = redis.Redis(**settings.REDIS_CONF)
|
||||||
event_attr = EventAttr(rdb)
|
event_attr = EventAttr(rdb)
|
||||||
|
|
||||||
|
partition = 0
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
db_client = CK(**settings.CK_CONFIG)
|
db_client = CK(**settings.CK_CONFIG)
|
||||||
sketch = Sketch(db_client)
|
sketch = Sketch(db_client)
|
||||||
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
|
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
|
||||||
handler_user = HandlerUser(db_client, settings.GAME)
|
handler_user = HandlerUser(db_client, settings.GAME)
|
||||||
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr)
|
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr,partition)
|
||||||
transmitter.add_source(handler_event, 10000, 60)
|
transmitter.add_source(handler_event, 1000, 10)
|
||||||
transmitter.add_source(handler_user, 1000, 60)
|
transmitter.add_source(handler_user, 1000, 10)
|
||||||
last_ts = int(time.time())
|
last_ts = int(time.time())
|
||||||
consumer = create_consumer(-1)
|
consumer, kafka_client = create_consumer(partition)
|
||||||
|
|
||||||
for topic, msg in consumer():
|
for msg in consumer():
|
||||||
# print(msg)
|
data = msg.value
|
||||||
type_ = msg['#type']
|
type_ = data['#type']
|
||||||
if msg['#app_id']!='c3e0409ac18341149877b08f087db640':
|
del data['#type']
|
||||||
print(msg)
|
|
||||||
del msg['#type']
|
|
||||||
ts = int(time.time())
|
ts = int(time.time())
|
||||||
|
|
||||||
if 'user' in type_:
|
if 'user' in type_:
|
||||||
# continue
|
# continue
|
||||||
obj = getattr(handler_user, type_)
|
obj = getattr(handler_user, type_)
|
||||||
handler_user.receive_data.append(UserAct(obj, msg))
|
handler_user.receive_data.append(UserAct(obj, data))
|
||||||
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
|
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
|
||||||
last_ts = ts
|
last_ts = ts
|
||||||
handler_user.execute()
|
handler_user.execute()
|
||||||
@ -47,11 +48,32 @@ def run():
|
|||||||
elif 'track' in type_:
|
elif 'track' in type_:
|
||||||
# continue
|
# continue
|
||||||
obj = getattr(handler_event, type_)
|
obj = getattr(handler_event, type_)
|
||||||
obj(msg)
|
obj(data)
|
||||||
|
elif type_ == settings.STOP_SIGNAL:
|
||||||
|
# continue
|
||||||
|
# 1 小时内有效
|
||||||
|
print(type_)
|
||||||
|
if data.get('#time', 0) + 3600 < int(time.time()):
|
||||||
|
continue
|
||||||
|
kafka_client.close()
|
||||||
|
# 停止消费kafka
|
||||||
|
print(f'进程{msg.partition} 等待90秒')
|
||||||
|
time.sleep(1)
|
||||||
|
print(f'进程{msg.partition} 写入数据')
|
||||||
|
transmitter.run(kafka_client)
|
||||||
|
print(f'进程{msg.partition} 结束')
|
||||||
|
|
||||||
|
break
|
||||||
|
elif type_ == 'test':
|
||||||
|
print(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
transmitter.run()
|
transmitter.run(kafka_client)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
time.sleep(5)
|
||||||
|
print(f'消费分区{partition} 已结束。。。')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
13
sync_event_name.py
Normal file
13
sync_event_name.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
import redis
|
||||||
|
from clickhouse_driver import Client
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
rdb = redis.Redis(**settings.REDIS_CONF)
|
||||||
|
|
||||||
|
client = Client(**settings.CK_CONFIG)
|
||||||
|
|
||||||
|
sql = f"""select distinct `#event_name` as v from {settings.GAME}.event"""
|
||||||
|
df = client.query_dataframe(sql)
|
||||||
|
data = df['v'].to_list()
|
||||||
|
rdb.sadd(f'{settings.GAME}_event_set', *data)
|
42
update_event_view.py.template
Normal file
42
update_event_view.py.template
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
# coding:utf-8
|
||||||
|
"""
|
||||||
|
更新事件表视图
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
game = ''
|
||||||
|
db = settings.GAME
|
||||||
|
svrid_file = f'{game}_svrid.json'
|
||||||
|
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from clickhouse_driver import Client
|
||||||
|
|
||||||
|
client = Client(**settings.CK_CONFIG)
|
||||||
|
|
||||||
|
df = pd.read_json(server_list_url)
|
||||||
|
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
|
||||||
|
serverid = tuple((str(i) for i in df['serverid'].to_list()))
|
||||||
|
|
||||||
|
# if os.path.exists(svrid_file):
|
||||||
|
# with open(svrid_file, 'r') as f:
|
||||||
|
# old_svrid = json.load(f)
|
||||||
|
# if set(old_svrid) == set(serverid):
|
||||||
|
# exit(0)
|
||||||
|
|
||||||
|
sql = f"""drop table if exists {db}.event_view"""
|
||||||
|
res1 = client.execute(sql)
|
||||||
|
# 筛选有效数据
|
||||||
|
sql = f"""create view {db}.event_view as
|
||||||
|
select *
|
||||||
|
from {db}.event
|
||||||
|
where (`#os`is null or lower(`#os`) != 'windows')
|
||||||
|
and svrindex in {serverid}
|
||||||
|
and not startsWith(`orderid`,'debugPay')
|
||||||
|
"""
|
||||||
|
res2 = client.execute(sql)
|
||||||
|
with open(svrid_file, 'w') as f:
|
||||||
|
json.dump(sorted(serverid), f)
|
@ -1,4 +1,5 @@
|
|||||||
create view shjy.user_view as select *
|
drop table if exists xiangsu.user_view;
|
||||||
from shjy.user
|
create view xiangsu.user_view as select *
|
||||||
|
from xiangsu.user
|
||||||
order by `#reg_time` desc
|
order by `#reg_time` desc
|
||||||
LIMIT 1 by `#account_id`
|
LIMIT 1 by `#account_id`
|
@ -6,17 +6,19 @@ from settings import settings
|
|||||||
__all__ = 'create_consumer',
|
__all__ = 'create_consumer',
|
||||||
|
|
||||||
|
|
||||||
def create_consumer(partition=-1):
|
def create_consumer(partition: int = -1):
|
||||||
|
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
|
||||||
|
|
||||||
def consumer():
|
def consumer():
|
||||||
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
|
if partition > -1:
|
||||||
if partition > 0:
|
|
||||||
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
|
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
|
||||||
else:
|
else:
|
||||||
c.subscribe([settings.SUBSCRIBE_TOPIC])
|
c.subscribe([settings.SUBSCRIBE_TOPIC])
|
||||||
for msg in c:
|
for msg in c:
|
||||||
# print(msg)
|
# print(msg)
|
||||||
topic = msg.topic
|
yield msg
|
||||||
val = msg.value
|
# topic = msg.topic
|
||||||
yield topic, val
|
# val = msg.value
|
||||||
|
# yield topic, val
|
||||||
|
|
||||||
return consumer
|
return consumer, c
|
||||||
|
17
v2/db.py
17
v2/db.py
@ -1,5 +1,6 @@
|
|||||||
__all__ = 'CK',
|
__all__ = 'CK',
|
||||||
|
|
||||||
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@ -15,12 +16,20 @@ class CK:
|
|||||||
self.args = args
|
self.args = args
|
||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
self.__client = self.__create_client()
|
self.__client = self.__create_client()
|
||||||
|
self.look = threading.Lock()
|
||||||
|
|
||||||
def __create_client(self):
|
def __create_client(self):
|
||||||
return Client(*self.args, **self.kwargs)
|
return Client(*self.args, **self.kwargs)
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
def execute(self, *args, **kwargs):
|
||||||
return self.__client.execute(*args, **kwargs)
|
try:
|
||||||
|
self.look.acquire(timeout=10)
|
||||||
|
res = self.__client.execute(*args, **kwargs)
|
||||||
|
except Exception as e:
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
self.look.release()
|
||||||
|
return res
|
||||||
|
|
||||||
def get_one(self, db, tb, try_cnt=3, **where):
|
def get_one(self, db, tb, try_cnt=3, **where):
|
||||||
|
|
||||||
@ -30,7 +39,7 @@ class CK:
|
|||||||
sql += ' limit 1'
|
sql += ' limit 1'
|
||||||
data = None
|
data = None
|
||||||
try:
|
try:
|
||||||
data, columns = self.__client.execute(sql, with_column_types=True)
|
data, columns = self.execute(sql, with_column_types=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
self.__client.disconnect()
|
self.__client.disconnect()
|
||||||
@ -62,8 +71,10 @@ class CK:
|
|||||||
sql += where
|
sql += where
|
||||||
data = None
|
data = None
|
||||||
try:
|
try:
|
||||||
data, columns = self.__client.execute(sql, columnar=True, with_column_types=True)
|
data, columns = self.execute(sql, columnar=True, with_column_types=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
print('*' * 50)
|
||||||
|
print(sql)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
if e.code == 60:
|
if e.code == 60:
|
||||||
return self.get_all(db, 'user', where, try_cnt - 1)
|
return self.get_all(db, 'user', where, try_cnt - 1)
|
||||||
|
@ -7,26 +7,12 @@ class EventAttr:
|
|||||||
def __init__(self, rdb: Redis):
|
def __init__(self, rdb: Redis):
|
||||||
self.rdb = rdb
|
self.rdb = rdb
|
||||||
|
|
||||||
def get_event_attr(self, key):
|
def set_event_name(self, key, *data):
|
||||||
attr = self.event_attr.get(key)
|
|
||||||
if not attr:
|
|
||||||
self.event_attr[key] = self.rdb.smembers(key) or set()
|
|
||||||
return self.event_attr[key]
|
|
||||||
|
|
||||||
def set_event_attr(self, key, *data):
|
|
||||||
self.rdb.sadd(key, *data)
|
self.rdb.sadd(key, *data)
|
||||||
self.event_attr[key] = data
|
|
||||||
|
|
||||||
def check_attr(self, db, data):
|
def add_event(self, db, data):
|
||||||
event_name = data.get('#event_name')
|
event_name = data.get('#event_name')
|
||||||
if not event_name:
|
if not event_name:
|
||||||
return
|
return
|
||||||
|
key = f'{db}_event_set'
|
||||||
key = f'{db}_event_{event_name}'
|
self.set_event_name(key, event_name)
|
||||||
|
|
||||||
attr = self.get_event_attr(key)
|
|
||||||
data_attr = set(data)
|
|
||||||
extra_attr = data_attr - attr
|
|
||||||
|
|
||||||
if extra_attr:
|
|
||||||
self.set_event_attr(key, *extra_attr)
|
|
||||||
|
41
v2/sketch.py
41
v2/sketch.py
@ -1,5 +1,4 @@
|
|||||||
import copy
|
from settings import settings
|
||||||
|
|
||||||
from .valid_data import *
|
from .valid_data import *
|
||||||
|
|
||||||
|
|
||||||
@ -9,6 +8,7 @@ class Sketch:
|
|||||||
self.struct_cache = struct_cache
|
self.struct_cache = struct_cache
|
||||||
self.__type_dict = dict()
|
self.__type_dict = dict()
|
||||||
self.__struct_dict = dict()
|
self.__struct_dict = dict()
|
||||||
|
self.init_tb_struct()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def type_dict(self):
|
def type_dict(self):
|
||||||
@ -30,20 +30,30 @@ class Sketch:
|
|||||||
if self.struct_cache:
|
if self.struct_cache:
|
||||||
self.struct_cache.update(db, tb, data)
|
self.struct_cache.update(db, tb, data)
|
||||||
|
|
||||||
|
def init_tb_struct_cache(self, db, tb):
|
||||||
|
sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
|
||||||
|
|
||||||
|
data, columns = self.db_client.execute(sql, with_column_types=True, columnar=True)
|
||||||
|
res = {k: v for k, v in zip(data[0], data[1])}
|
||||||
|
self.__struct_dict[f'{db}_{tb}'] = res
|
||||||
|
self.up_tb_struct(db, tb, res)
|
||||||
|
return res
|
||||||
|
|
||||||
|
def init_tb_struct(self):
|
||||||
|
self.init_tb_struct_cache(settings.GAME, 'event')
|
||||||
|
self.init_tb_struct_cache(settings.GAME, 'user')
|
||||||
|
|
||||||
def get_tb_struct_cache(self, db, tb):
|
def get_tb_struct_cache(self, db, tb):
|
||||||
"""
|
"""
|
||||||
查一条记录 取字段 和类型
|
取字段 和类型
|
||||||
:param db:
|
:param db:
|
||||||
:param tb:
|
:param tb:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
if self.__struct_dict.get(f'{db}_{tb}'):
|
if self.__struct_dict.get(f'{db}_{tb}'):
|
||||||
return self.__struct_dict.get(f'{db}_{tb}')
|
return self.__struct_dict.get(f'{db}_{tb}')
|
||||||
sql = f'select * from {db}.{tb} limit 1'
|
|
||||||
_, columns = self.db_client.execute(sql, with_column_types=True)
|
res = self.init_tb_struct_cache(db, tb)
|
||||||
res = {item[0]: item[1] for item in columns}
|
|
||||||
self.__struct_dict[f'{db}_{tb}'] = res
|
|
||||||
self.up_tb_struct(db, tb, res)
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def update_user_view(self, db, tb):
|
def update_user_view(self, db, tb):
|
||||||
@ -64,7 +74,7 @@ class Sketch:
|
|||||||
LIMIT 1 by `#account_id`"""
|
LIMIT 1 by `#account_id`"""
|
||||||
self.db_client.execute(sql)
|
self.db_client.execute(sql)
|
||||||
|
|
||||||
def alter_table(self, db, tb, data):
|
def alter_table(self, db, tb, data, try_cnt=10):
|
||||||
"""
|
"""
|
||||||
数据库字段检查
|
数据库字段检查
|
||||||
添加新字段为第一次出现类型
|
添加新字段为第一次出现类型
|
||||||
@ -88,7 +98,7 @@ class Sketch:
|
|||||||
default_field[k] = 'Nullable(String)'
|
default_field[k] = 'Nullable(String)'
|
||||||
|
|
||||||
if isinstance(v, int):
|
if isinstance(v, int):
|
||||||
default_field[k] = 'Nullable(UInt64)'
|
default_field[k] = 'Nullable(Int64)'
|
||||||
|
|
||||||
if isinstance(v, float):
|
if isinstance(v, float):
|
||||||
default_field[k] = 'Nullable(Float32)'
|
default_field[k] = 'Nullable(Float32)'
|
||||||
@ -99,13 +109,18 @@ class Sketch:
|
|||||||
if isinstance(v, bool):
|
if isinstance(v, bool):
|
||||||
default_field[k] = 'Nullable(UInt8)'
|
default_field[k] = 'Nullable(UInt8)'
|
||||||
|
|
||||||
sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
|
|
||||||
print(sql)
|
|
||||||
try:
|
try:
|
||||||
|
sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
|
||||||
|
print(sql)
|
||||||
self.db_client.execute(sql)
|
self.db_client.execute(sql)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'添加字段 {k} 失败')
|
print(f'添加字段 {k} 失败,同步数据库表结构')
|
||||||
|
# 读取数据库表结构并设置
|
||||||
|
self.init_tb_struct()
|
||||||
default_field.pop(k)
|
default_field.pop(k)
|
||||||
|
if try_cnt < 0:
|
||||||
|
raise e
|
||||||
|
return self.alter_table(db, tb, data, try_cnt=try_cnt - 1)
|
||||||
|
|
||||||
if set(default_field) - keys:
|
if set(default_field) - keys:
|
||||||
self.up_tb_struct(db, tb, default_field)
|
self.up_tb_struct(db, tb, default_field)
|
||||||
|
@ -1,14 +1,41 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
from .valid_data import *
|
from .valid_data import *
|
||||||
|
|
||||||
__all__ = 'Transmitter',
|
__all__ = 'Transmitter',
|
||||||
|
|
||||||
|
|
||||||
|
class Ping(threading.Thread):
|
||||||
|
def __init__(self, db_client, p, log):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.daemon = True
|
||||||
|
self.ping_ts = 0
|
||||||
|
self.time_out = 60
|
||||||
|
self.log = log
|
||||||
|
self.db_client = db_client
|
||||||
|
self.p = p
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
time.sleep(10)
|
||||||
|
ts = int(time.time())
|
||||||
|
if self.ping_ts + self.time_out < ts:
|
||||||
|
# 保持连接
|
||||||
|
try:
|
||||||
|
self.ping_ts = ts
|
||||||
|
self.log.info(f'保持连接{self.p} ping')
|
||||||
|
self.db_client.execute('select 1')
|
||||||
|
except:
|
||||||
|
self.log.error('ping error')
|
||||||
|
|
||||||
|
|
||||||
class Transmitter:
|
class Transmitter:
|
||||||
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
||||||
self.db_client = db_client
|
self.db_client = db_client
|
||||||
@ -20,12 +47,16 @@ class Transmitter:
|
|||||||
self.event_attr = event_attr
|
self.event_attr = event_attr
|
||||||
self.p = p
|
self.p = p
|
||||||
|
|
||||||
|
def start_ping(self):
|
||||||
|
t = Ping(self.db_client, self.p, self.log)
|
||||||
|
t.start()
|
||||||
|
|
||||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||||
|
|
||||||
def check_send(self):
|
def check_send(self):
|
||||||
|
ts = int(time.time())
|
||||||
for h, p in self.slots.items():
|
for h, p in self.slots.items():
|
||||||
ts = int(time.time())
|
|
||||||
tb, buffer = h.buffer_pool
|
tb, buffer = h.buffer_pool
|
||||||
buffer_size = len(buffer)
|
buffer_size = len(buffer)
|
||||||
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
||||||
@ -51,9 +82,11 @@ class Transmitter:
|
|||||||
self.db_client.execute(sql)
|
self.db_client.execute(sql)
|
||||||
self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}')
|
self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
self.log.error(traceback.format_exc())
|
||||||
# 丢弃错误行 再次发送
|
# 丢弃错误行 再次发送
|
||||||
if hasattr(e, 'code') and e.code == 26:
|
if hasattr(e, 'code') and e.code == 26:
|
||||||
m = re.match('(.*)?Stack trace', e.message)
|
m = re.match('(.*)?Stack trace', e.message)
|
||||||
|
self.log.error(data)
|
||||||
if m:
|
if m:
|
||||||
error_msg = m.group(1)
|
error_msg = m.group(1)
|
||||||
error_row = re.match('.*?errors out of (\d+) rows', error_msg)
|
error_row = re.match('.*?errors out of (\d+) rows', error_msg)
|
||||||
@ -75,10 +108,11 @@ class Transmitter:
|
|||||||
def check_table(self, db, tb, data):
|
def check_table(self, db, tb, data):
|
||||||
[self.sketch.alter_table(db, tb, item) for item in data]
|
[self.sketch.alter_table(db, tb, item) for item in data]
|
||||||
|
|
||||||
def set_event_attr(self, db, tb, data):
|
def collect_event(self, db, tb, data):
|
||||||
if tb != 'event':
|
if tb != 'event':
|
||||||
return
|
return
|
||||||
[self.event_attr.check_attr(db, item) for item in data]
|
|
||||||
|
[self.event_attr.add_event(db, item) for item in data]
|
||||||
|
|
||||||
def check_type(self, db, tb, data):
|
def check_type(self, db, tb, data):
|
||||||
struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
|
struct_dict = self.sketch.struct_dict[f'{db}_{tb}']
|
||||||
@ -96,14 +130,19 @@ class Transmitter:
|
|||||||
for key in del_keys:
|
for key in del_keys:
|
||||||
del item[key]
|
del item[key]
|
||||||
|
|
||||||
def run(self):
|
def run(self, kafka_client):
|
||||||
for tb, buffer in self.check_send():
|
for tb, buffer in self.check_send():
|
||||||
try:
|
try:
|
||||||
data = [self.flat_data(x) for x in buffer.values()]
|
data = [self.flat_data(x) for x in buffer.values()]
|
||||||
self.check_table(self.db_name, tb, data)
|
self.check_table(self.db_name, tb, data)
|
||||||
self.check_type(self.db_name, tb, data)
|
self.check_type(self.db_name, tb, data)
|
||||||
self.set_event_attr(self.db_name, tb, data)
|
self.collect_event(self.db_name, tb, data)
|
||||||
self.__send(self.db_name, tb, [json.dumps(item) for item in data])
|
self.__send(self.db_name, tb, [json.dumps(item) for item in data])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(e)
|
self.log.error(traceback.format_exc())
|
||||||
|
self.log.error(data)
|
||||||
buffer.clear()
|
buffer.clear()
|
||||||
|
try:
|
||||||
|
kafka_client.commit()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f'进程:{self.p} error:{e}')
|
||||||
|
@ -26,9 +26,10 @@ def is_valid_int(v, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def is_valid_srt(v, **kwargs):
|
def is_valid_srt(v, **kwargs):
|
||||||
if isinstance(v, str):
|
try:
|
||||||
return v
|
return str(v)
|
||||||
return None
|
except:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def is_valid_float(v, **kwargs):
|
def is_valid_float(v, **kwargs):
|
||||||
@ -39,15 +40,17 @@ def is_valid_float(v, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def is_valid_bool(v, **kwargs):
|
def is_valid_bool(v, **kwargs):
|
||||||
if isinstance(v, bool):
|
try:
|
||||||
return v
|
return bool(v)
|
||||||
return None
|
except:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def is_valid_array(v, **kwargs):
|
def is_valid_array(v, **kwargs):
|
||||||
if isinstance(v, list):
|
try:
|
||||||
return [str(i) for i in v]
|
return [str(i) for i in v]
|
||||||
return None
|
except:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def is_valid_ipv4(v, **kwargs):
|
def is_valid_ipv4(v, **kwargs):
|
||||||
|
13
充值视图.sql
Normal file
13
充值视图.sql
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
drop table if exists xiangsu.recharge_game;
|
||||||
|
create view xiangsu.recharge_game as
|
||||||
|
select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
|
||||||
|
`#os`,
|
||||||
|
`#bundle_id`,
|
||||||
|
owner_name,
|
||||||
|
channel,
|
||||||
|
arrayDistinct(groupArray(binduid)) as account,
|
||||||
|
length(account) as account_num,
|
||||||
|
sum(money) as money
|
||||||
|
from xiangsu.event
|
||||||
|
where `#event_name` = 'rechargeGame'
|
||||||
|
group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel
|
107
初始化事件表.sql
107
初始化事件表.sql
@ -1,54 +1,61 @@
|
|||||||
create table shjy.event
|
-- auto-generated definition
|
||||||
|
create table event
|
||||||
(
|
(
|
||||||
`#ip` Nullable(IPv4),
|
`#ip` Nullable(IPv4),
|
||||||
`#country` Nullable(String),
|
`#country` Nullable(String),
|
||||||
`#country_code` Nullable(String),
|
`#province` Nullable(String),
|
||||||
`#province` Nullable(String),
|
`#city` Nullable(String),
|
||||||
`#city` Nullable(String),
|
`#os` Nullable(String),
|
||||||
`#os_version` Nullable(String),
|
`#device_id` Nullable(String),
|
||||||
`#manufacturer` Nullable(String),
|
`#screen_height` Nullable(UInt16),
|
||||||
`#os` Nullable(String),
|
`#screen_width` Nullable(UInt16),
|
||||||
`#device_id` Nullable(String),
|
`#device_model` Nullable(String),
|
||||||
`#screen_height` Nullable(UInt16),
|
`#app_version` Nullable(String),
|
||||||
`#screen_width` Nullable(UInt16),
|
`#bundle_id` Nullable(String),
|
||||||
`#device_model` Nullable(String),
|
`#app_name` Nullable(String),
|
||||||
`#app_version` Nullable(String),
|
`#game_version` Nullable(String),
|
||||||
`#bundle_id` Nullable(String),
|
`#os_version` Nullable(String),
|
||||||
`#lib` Nullable(String),
|
`#network_type` Nullable(String),
|
||||||
`#lib_version` Nullable(String),
|
`#carrier` Nullable(String),
|
||||||
`#network_type` Nullable(String),
|
`#manufacturer` Nullable(String),
|
||||||
`#carrier` Nullable(String),
|
`#app_id` Nullable(String),
|
||||||
`#browser` Nullable(String),
|
`#account_id` String,
|
||||||
`#browser_version` Nullable(String),
|
`#distinct_id` Nullable(String),
|
||||||
`#duration` Nullable(String),
|
binduid Nullable(String),
|
||||||
`#url` Nullable(String),
|
channel Nullable(String),
|
||||||
`#url_path` Nullable(String),
|
owner_name String default '',
|
||||||
`#referrer` Nullable(String),
|
role_name Nullable(String),
|
||||||
`#referrer_host` Nullable(String),
|
exp Nullable(UInt64),
|
||||||
`#title` Nullable(String),
|
zhanli Nullable(UInt64),
|
||||||
`#screen_name` Nullable(String),
|
maxmapid Nullable(UInt16),
|
||||||
`#element_id` Nullable(String),
|
mapid Nullable(UInt16),
|
||||||
`#element_type` Nullable(String),
|
ghid Nullable(String),
|
||||||
`#resume_from_background` Nullable(String),
|
rmbmoney Nullable(UInt64),
|
||||||
`#element_selector` Nullable(String),
|
jinbi Nullable(UInt64),
|
||||||
`#element_position` Nullable(String),
|
svrindex Nullable(String),
|
||||||
`#element_content` Nullable(String),
|
lv Nullable(UInt16),
|
||||||
`#scene` Nullable(String),
|
vip Nullable(UInt16),
|
||||||
`#mp_platform` Nullable(String),
|
game Nullable(String),
|
||||||
`#app_crashed_reason` Nullable(String),
|
|
||||||
`#zone_offset` Int8 default 8,
|
|
||||||
`#event_id` String,
|
|
||||||
|
|
||||||
`#event_time` DateTime('UTC'),
|
`#zone_offset` Int8 default 8,
|
||||||
`#account_id` String,
|
`#event_time` DateTime('UTC'),
|
||||||
`#distinct_id` Nullable(String),
|
`#event_name` String,
|
||||||
`#event_name` String,
|
`#server_time` DateTime('UTC') default now(),
|
||||||
`#server_time` DateTime('UTC') default now(),
|
|
||||||
|
|
||||||
|
unitPrice Nullable(UInt32),
|
||||||
|
money Nullable(String),
|
||||||
|
islishishouci Nullable(UInt8),
|
||||||
|
isdangrishouci Nullable(UInt8),
|
||||||
|
is_today_reg Nullable(UInt8),
|
||||||
|
orderid Nullable(String),
|
||||||
|
proid Nullable(String),
|
||||||
|
step_id Nullable(UInt16),
|
||||||
|
step_group Nullable(UInt16),
|
||||||
|
guide_start_time Nullable(UInt32),
|
||||||
|
online_ts Nullable(UInt16),
|
||||||
|
`#time` Nullable(DateTime('UTC'))
|
||||||
|
)
|
||||||
|
engine = ReplacingMergeTree PARTITION BY toYYYYMMDD(`#event_time`)
|
||||||
|
ORDER BY (owner_name, `#event_name`, `#event_time`, `#account_id`)
|
||||||
|
SETTINGS index_granularity = 8192;
|
||||||
|
|
||||||
`sign` Int8 default 1
|
|
||||||
|
|
||||||
) ENGINE = CollapsingMergeTree(sign)
|
|
||||||
PARTITION BY toYYYYMMDD(`#event_time`)
|
|
||||||
order by (`#account_id`, `#event_time`, `#event_name`)
|
|
||||||
-- TTL event_time + toIntervalDay(365)
|
|
@ -1,9 +1,10 @@
|
|||||||
create table shjy.user
|
drop table if exists xiangsu.user;
|
||||||
|
create table xiangsu.user
|
||||||
(
|
(
|
||||||
|
|
||||||
`#reg_time` DateTime('UTC'),
|
`#reg_time` DateTime('UTC'),
|
||||||
`#account_id` String,
|
`#account_id` String,
|
||||||
`svrindex` UInt16,
|
`svrindex` String,
|
||||||
|
|
||||||
`#zone_offset` Int8 default 8,
|
`#zone_offset` Int8 default 8,
|
||||||
`#server_time` DateTime('UTC') default now()
|
`#server_time` DateTime('UTC') default now()
|
||||||
|
12
新用户视图.sql
Normal file
12
新用户视图.sql
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
drop table if exists xiangsu.new_account;
|
||||||
|
create view xiangsu.new_account as
|
||||||
|
select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
|
||||||
|
`#os`,
|
||||||
|
`#bundle_id`,
|
||||||
|
owner_name,
|
||||||
|
channel,
|
||||||
|
groupArray(`binduid`) as account,
|
||||||
|
length(account) as num
|
||||||
|
from xiangsu.event
|
||||||
|
where role_idx = 1
|
||||||
|
group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel
|
23
新账号付费.sql
Normal file
23
新账号付费.sql
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
drop table if exists xiangsu.new_account_recharge;
|
||||||
|
create view xiangsu.new_account_recharge as (select date,
|
||||||
|
`#os`,
|
||||||
|
`#bundle_id`,
|
||||||
|
owner_name,
|
||||||
|
channel,
|
||||||
|
uniqExact(binduid) as accout_num,
|
||||||
|
sum(money) as money
|
||||||
|
from (select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
|
||||||
|
`#os`,
|
||||||
|
`#bundle_id`,
|
||||||
|
owner_name,
|
||||||
|
channel,
|
||||||
|
binduid,
|
||||||
|
money
|
||||||
|
from xiangsu.event
|
||||||
|
where `#event_name` = 'rechargeGame') as tb1
|
||||||
|
right join (select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
|
||||||
|
binduid
|
||||||
|
from xiangsu.event
|
||||||
|
where role_idx = 1) as tb2
|
||||||
|
on tb1.date = tb2.date and tb2.binduid = tb1.binduid
|
||||||
|
group by date, `#os`, `#bundle_id`, owner_name, channel)
|
11
活跃账号视图.sql
Normal file
11
活跃账号视图.sql
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
drop table if exists xiangsu.active_account;
|
||||||
|
create view xiangsu.active_account as
|
||||||
|
select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
|
||||||
|
`#os`,
|
||||||
|
`#bundle_id`,
|
||||||
|
owner_name,
|
||||||
|
channel,
|
||||||
|
uniqCombined(binduid) as num,
|
||||||
|
arrayDistinct(groupArray(binduid)) as account
|
||||||
|
from xiangsu.event
|
||||||
|
group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel
|
Loading…
Reference in New Issue
Block a user