Compare commits
14 Commits
- 234402527b
- b14354f32a
- 19801a09eb
- 4c8c74e673
- ff79538302
- b11dae3ed5
- 8ad16163a9
- f95e5bdb0b
- e58621a4d2
- 60eaf813df
- 83790bed72
- 76c25d994c
- fab52c7282
- 1bdb13b620
**Pipfile** (12 changes)
```diff
@@ -4,16 +4,12 @@ verify_ssl = false
 name = "pypi"
 
 [packages]
-kafka-python = "2.0.2"
-clickhouse-driver = "0.2.2"
-pipfile = "0.0.2"
-pandas = "1.3.3"
-redis = "==3.5.3"
-loguru = "==0.5.3"
+kafka-python = "*"
+clickhouse-driver = "*"
+pipfile = "*"
+pandas = "*"
 
 [dev-packages]
 
 [requires]
 python_version = "3.8"
```
**app.py** (49 changes)
```diff
@@ -2,8 +2,6 @@
 import time
 from multiprocessing import Process
 
-from kafka import TopicPartition
-
 from settings import settings
 from v2 import *
 from v2.struct_cache import StructCacheFile, StructCacheRedis
@@ -35,57 +33,38 @@ class XProcess(Process):
         last_ts = int(time.time())
         consumer, kafka_client = create_consumer(self.partition)
 
-        for msg in consumer():
-            data = msg.value
-            type_ = data['#type']
-            del data['#type']
+        for topic, msg in consumer():
+            # print(msg)
+            type_ = msg['#type']
+            del msg['#type']
             ts = int(time.time())
-            try:
-                data['properties']['unique_id'] = f'{msg.topic}-{msg.partition}-{msg.offset}'
-            except:
-                pass
-
-            if msg.topic == 'debug':
-                self.log.info(data)
 
             if 'user' in type_:
                 # continue
                 obj = getattr(handler_user, type_)
-                handler_user.receive_data.append(UserAct(obj, data))
+                handler_user.receive_data.append(UserAct(obj, msg))
                 if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
                     last_ts = ts
                     handler_user.execute()
 
             elif 'track' in type_:
                 # continue
-                if data['#event_name'] == 'pay':
-                    self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
-
                 obj = getattr(handler_event, type_)
-                obj(data)
+                obj(msg)
             elif type_ == settings.STOP_SIGNAL:
                 # continue
                 # only valid within 1 hour
                 self.log.info(type_)
-                if data.get('#time', 0) + 3600 < int(time.time()):
+                if msg.get('#time', 0) + 3600 < int(time.time()):
                     continue
-                # stop consuming Kafka
-                self.log.info(f'进程{self.partition} 等待90秒')
-                time.sleep(90)
-                self.log.info(f'进程{self.partition} 写入数据')
-                transmitter.run(kafka_client)
-                self.log.info(f'进程{self.partition} 结束')
-                kafka_client.commit()
                 kafka_client.close()
+                # stop consuming Kafka
+                print(f'进程{self.partition} 等待90秒')
+                time.sleep(90)
+                print(f'进程{self.partition} 写入数据')
+                transmitter.run()
+                print(f'进程{self.partition} 结束')
 
                 break
-            elif type_ == 'test':
-                self.log.info(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
             else:
                 continue
 
-            transmitter.run(kafka_client)
 
         while True:
             time.sleep(5)
             self.log.info(f'消费分区{self.partition} 已结束。。。')
+            transmitter.run()
```
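After this change `msg` is the already-deserialized event dict rather than a kafka-python `ConsumerRecord`, so the record metadata the old loop used (`msg.topic`, `msg.partition`, `msg.offset`, and the `unique_id` built from them) is no longer reachable inside the loop; `create_consumer` now yields `(topic, value)` pairs (see the consumer diff further down). A minimal sketch of the new dispatch shape, with the handler objects assumed from context:

```python
# Sketch of the new consume/dispatch loop: consumer() yields (topic, value)
# pairs, where value is the already-deserialized event dict.
import time

def dispatch(consumer, handler_user, handler_event, stop_signal):
    last_ts = int(time.time())
    for topic, msg in consumer():
        type_ = msg.pop('#type')                # e.g. 'user_set' or 'track_pay'
        ts = int(time.time())
        if 'user' in type_:
            handler_user.receive_data.append(msg)
            # flush user rows in batches of 1000, or at least once a minute
            if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
                last_ts = ts
                handler_user.execute()
        elif 'track' in type_:
            getattr(handler_event, type_)(msg)  # handler method named after the type
        elif type_ == stop_signal:
            break                               # drain/shutdown handled by the caller
```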
```diff
@@ -1,13 +1,8 @@
 """
 Clean up test data
 """
 import json
 
 from settings import settings
 
-game = ''
-db = settings.GAME
-
+game = 'shanhai'
 server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
 
 import pandas as pd
@@ -20,13 +15,10 @@ df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
 serverid = tuple((str(i) for i in df['serverid'].to_list()))
 
 sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
-from {db}.event
+from {game}.event
 where
 `#event_time`>addDays(now('UTC'),-3) and (
 lower(`#os`) = 'windows'
 or svrindex not in {serverid}
 )
 group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
 having n = 1 limit 2000"""
@@ -42,7 +34,7 @@ data_df = pd.DataFrame(
 data_df.drop('n', axis=1, inplace=True)
 data_df['sign'] = -1
 data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
-insert_sql = f'INSERT INTO {db}.event FORMAT JSONEachRow '
+insert_sql = f'INSERT INTO {game}.event FORMAT JSONEachRow '
 insert_sql = insert_sql + '\n'.join([json.dumps(item) for item in data_df.T.to_dict().values()])
 
 client.execute(insert_sql)
```
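The re-inserted rows carry `sign = -1`, so they only cancel their originals under a collapsing engine — see the `初始化事件表.sql` diff below, which switches the event table to CollapsingMergeTree. As a side note, `clickhouse-driver` can also take rows as a list of dicts, which avoids hand-building the `JSONEachRow` payload; a minimal sketch, assuming the same `client` and `data_df`:

```python
# Sketch: insert the cancel rows (sign = -1) as dicts instead of a raw
# JSONEachRow string. Column names are passed explicitly; clickhouse-driver
# serializes the values over the native protocol.
rows = list(data_df.T.to_dict().values())        # one dict per row, sign already -1
columns = ', '.join(f'`{c}`' for c in data_df.columns)
client.execute(f'INSERT INTO shanhai.event ({columns}) VALUES', rows)
```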
```diff
@@ -1 +0,0 @@
-create database xiangsu;
```
```diff
@@ -5,31 +5,27 @@ class Config:
     # ClickHouse connection
     CK_CONFIG = {'host': '139.159.159.3',
                  'port': 9654,
                  'user': 'legu',
                  'password': 'gncPASUwpYrc'
                  }
 
     # differs per game: the Kafka topic the game reports to
     # *************
-    SUBSCRIBE_TOPIC = ''
+    SUBSCRIBE_TOPIC = 'shanhai'
 
     KAFKA_CONSUMER_CONF = {
         'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
         'value_deserializer': json.loads,
         'auto_offset_reset': 'earliest',
-        'enable_auto_commit': False,
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 10000,
 
         # differs per game
         # *************
-        'group_id': ''
+        'group_id': 'shanhai_group'
     }
     KAFKA_PRODUCER_CONF = {
         'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
         'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
     }
     # game database name
     # *************
-    GAME = ''
+    GAME = 'shanhai'
 
     STOP_SIGNAL = 'stop_MntxuXMc'
```
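Switching `enable_auto_commit` to `True` with a 10 s `auto_commit_interval_ms` moves offset management from the manual `kafka_client.commit()` calls (removed in the `app.py` and Transmitter diffs) into the client itself. A minimal sketch of how kafka-python consumes with this config, assuming the same settings module; `handle` is a placeholder, not a function from this repo:

```python
# Sketch: consume with auto-committed offsets (kafka-python).
# With enable_auto_commit=True, offsets are committed in the background every
# auto_commit_interval_ms; a crash can replay up to ~10 s of messages, so the
# downstream writes should tolerate duplicates.
from kafka import KafkaConsumer
from settings import settings

consumer = KafkaConsumer(settings.SUBSCRIBE_TOPIC, **settings.KAFKA_CONSUMER_CONF)
for record in consumer:
    handle(record.value)   # `handle` stands in for the real pipeline
```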
```diff
@@ -1,8 +1,6 @@
 # coding:utf-8
 import time
 
 import redis
-from kafka import TopicPartition
-
 from settings import settings
 from v2 import *
@@ -18,29 +16,30 @@ from v2.log import logger
 rdb = redis.Redis(**settings.REDIS_CONF)
 event_attr = EventAttr(rdb)
 
 partition = 0
 
 
 def run():
     db_client = CK(**settings.CK_CONFIG)
     sketch = Sketch(db_client)
     handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
     handler_user = HandlerUser(db_client, settings.GAME)
-    transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr, partition)
-    transmitter.add_source(handler_event, 1000, 10)
-    transmitter.add_source(handler_user, 1000, 10)
+    transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr)
+    transmitter.add_source(handler_event, 1, 1)
+    transmitter.add_source(handler_user, 1, 1)
     last_ts = int(time.time())
-    consumer, kafka_client = create_consumer(partition)
+    consumer = create_consumer(-1)
 
-    for msg in consumer():
-        data = msg.value
-        type_ = data['#type']
-        del data['#type']
+    for topic, msg in consumer():
+        # print(msg)
+        type_ = msg['#type']
+        if msg['#app_id'] != 'c3e0409ac18341149877b08f087db640':
+            print(msg)
+        del msg['#type']
         ts = int(time.time())
 
         if 'user' in type_:
             # continue
             obj = getattr(handler_user, type_)
-            handler_user.receive_data.append(UserAct(obj, data))
+            handler_user.receive_data.append(UserAct(obj, msg))
             if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
                 last_ts = ts
                 handler_user.execute()
@@ -48,32 +47,11 @@ def run():
         elif 'track' in type_:
             # continue
             obj = getattr(handler_event, type_)
-            obj(data)
-        elif type_ == settings.STOP_SIGNAL:
-            # continue
-            # only valid within 1 hour
-            print(type_)
-            if data.get('#time', 0) + 3600 < int(time.time()):
-                continue
-            kafka_client.close()
-            # stop consuming Kafka
-            print(f'进程{msg.partition} 等待90秒')
-            time.sleep(1)
-            print(f'进程{msg.partition} 写入数据')
-            transmitter.run(kafka_client)
-            print(f'进程{msg.partition} 结束')
-
-            break
-        elif type_ == 'test':
-            print(f'topid->{msg.topic} | partition->{msg.partition} | offset->{msg.offset} | data-> {data}')
+            obj(msg)
         else:
             continue
 
-        transmitter.run(kafka_client)
-
     while True:
         time.sleep(5)
         print(f'消费分区{partition} 已结束。。。')
+        transmitter.run()
 
 
 if __name__ == '__main__':
```
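The STOP_SIGNAL branch deleted here survives in `XProcess.run` (see the `app.py` diff above): on receiving the signal, a worker closes its Kafka client, waits 90 s so in-flight batches on other partitions settle, flushes, and exits. A condensed sketch of that drain pattern, with hypothetical names:

```python
# Sketch: graceful drain on a broadcast stop signal (hypothetical names).
# The signal message carries its send time; stale signals (> 1 h) are ignored.
import time

def handle_stop(msg, kafka_client, transmitter, grace_seconds=90):
    if msg.get('#time', 0) + 3600 < int(time.time()):
        return False                  # stale signal: keep consuming
    kafka_client.close()              # stop fetching new messages
    time.sleep(grace_seconds)         # let peer partitions reach the same signal
    transmitter.run()                 # flush everything still buffered
    return True                       # caller breaks its consume loop
```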
**stop.py** (new file, 21 lines)
```diff
@@ -0,0 +1,21 @@
+# coding:utf-8
+import time
+import traceback
+
+from kafka import KafkaProducer
+
+from settings import settings
+
+producer = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
+
+msg = {
+    '#type': settings.STOP_SIGNAL,
+    '#time': int(time.time())
+}
+for i in range(0, 16):
+    print(f'发送分区{i}')
+    future1 = producer.send(settings.GAME, msg, partition=i)
+    try:
+        future1.get(timeout=10)
+    except Exception as e:  # a failed send raises kafka_errors
+        traceback.format_exc()
```
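One catch in this new file: `traceback.format_exc()` only returns the traceback string, so the `except` branch silently discards the error. A minimal sketch of the same send loop that actually surfaces failures, using the same settings module and the standard kafka-python API:

```python
# Sketch: print (rather than discard) the traceback when a send fails.
import traceback
from kafka.errors import KafkaError

for i in range(16):
    future = producer.send(settings.GAME, msg, partition=i)
    try:
        future.get(timeout=10)        # block until the broker acks
    except KafkaError:
        traceback.print_exc()         # format_exc() alone is a no-op here
```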
```diff
@@ -1,13 +0,0 @@
-import redis
-from clickhouse_driver import Client
-
-from settings import settings
-
-rdb = redis.Redis(**settings.REDIS_CONF)
-
-client = Client(**settings.CK_CONFIG)
-
-sql = f"""select distinct `#event_name` as v from {settings.GAME}.event"""
-df = client.query_dataframe(sql)
-data = df['v'].to_list()
-rdb.sadd(f'{settings.GAME}_event_set', *data)
```
```diff
@@ -1,42 +0,0 @@
-# coding:utf-8
-"""
-Update the event-table view
-"""
-import json
-import os
-
-from settings import settings
-
-game = ''
-db = settings.GAME
-svrid_file = f'{game}_svrid.json'
-server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
-
-import pandas as pd
-from clickhouse_driver import Client
-
-client = Client(**settings.CK_CONFIG)
-
-df = pd.read_json(server_list_url)
-df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
-serverid = tuple((str(i) for i in df['serverid'].to_list()))
-
-# if os.path.exists(svrid_file):
-#     with open(svrid_file, 'r') as f:
-#         old_svrid = json.load(f)
-#     if set(old_svrid) == set(serverid):
-#         exit(0)
-
-sql = f"""drop table if exists {db}.event_view"""
-res1 = client.execute(sql)
-# keep only valid rows
-sql = f"""create view {db}.event_view as
-select *
-from {db}.event
-where (`#os` is null or lower(`#os`) != 'windows')
-and svrindex in {serverid}
-and not startsWith(`orderid`,'debugPay')
-"""
-res2 = client.execute(sql)
-with open(svrid_file, 'w') as f:
-    json.dump(sorted(serverid), f)
```
```diff
@@ -1,5 +1,5 @@
-drop table if exists xiangsu.user_view;
-create view xiangsu.user_view as select *
-from xiangsu.user
+drop table if exists shanhai.user_view;
+create view shanhai.user_view as select *
+from shjy.user
 order by `#reg_time` desc
 LIMIT 1 by `#account_id`
```
```diff
@@ -6,19 +6,18 @@ from settings import settings
 __all__ = 'create_consumer',
 
 
-def create_consumer(partition: int = -1):
+def create_consumer(partition=-1):
     c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
 
     def consumer():
-        if partition > -1:
+        if partition > 0:
             c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
         else:
             c.subscribe([settings.SUBSCRIBE_TOPIC])
         for msg in c:
             # print(msg)
-            yield msg
-            # topic = msg.topic
-            # val = msg.value
-            # yield topic, val
+            topic = msg.topic
+            val = msg.value
+            yield topic, val
 
     return consumer, c
```
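Two details worth flagging in this hunk: the guard changed from `partition > -1` to `partition > 0`, which silently moves partition 0 from the explicit `assign` path onto the group `subscribe` path, and `create_consumer` still returns the `(consumer, c)` pair while the new `run()` above binds the result to a single name. A sketch that keeps the old partition-0 behaviour, assuming the same settings module:

```python
# Sketch: treat every non-negative partition as an explicit assignment;
# only negative values fall back to consumer-group subscription.
from kafka import KafkaConsumer, TopicPartition
from settings import settings

def create_consumer(partition=-1):
    c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)

    def consumer():
        if partition >= 0:    # partition 0 is a valid assignment target
            c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
        else:
            c.subscribe([settings.SUBSCRIBE_TOPIC])
        for msg in c:
            yield msg.topic, msg.value

    return consumer, c        # call sites must unpack both values
```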
**v2/db.py** (17 changes)
```diff
@@ -1,6 +1,5 @@
 __all__ = 'CK',
 
-import threading
 import traceback
 
 import pandas as pd
@@ -16,20 +15,12 @@ class CK:
         self.args = args
         self.kwargs = kwargs
         self.__client = self.__create_client()
-        self.look = threading.Lock()
 
     def __create_client(self):
         return Client(*self.args, **self.kwargs)
 
     def execute(self, *args, **kwargs):
-        try:
-            self.look.acquire(timeout=10)
-            res = self.__client.execute(*args, **kwargs)
-        except Exception as e:
-            raise e
-        finally:
-            self.look.release()
-        return res
+        return self.__client.execute(*args, **kwargs)
 
     def get_one(self, db, tb, try_cnt=3, **where):
 
@@ -39,7 +30,7 @@ class CK:
         sql += ' limit 1'
         data = None
         try:
-            data, columns = self.execute(sql, with_column_types=True)
+            data, columns = self.__client.execute(sql, with_column_types=True)
         except Exception as e:
             traceback.print_exc()
             self.__client.disconnect()
@@ -71,9 +62,9 @@ class CK:
         sql += where
         data = None
         try:
-            data, columns = self.execute(sql, columnar=True, with_column_types=True)
+            data, columns = self.__client.execute(sql, columnar=True, with_column_types=True)
         except Exception as e:
-            print('*' * 50)
+            print('*'*50)
             print(sql)
             traceback.print_exc()
             if e.code == 60:
```
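This hunk drops the `threading.Lock` around `execute`, so a `CK` instance is no longer safe to share across threads (the `Ping` keep-alive thread in the Transmitter diff below is one such sharer). If the serialization is still wanted, a `with` block is the tidier form of the deleted acquire/release pair; a minimal sketch, assuming `clickhouse_driver.Client`:

```python
# Sketch: lock-guarded execute using a context manager instead of the
# deleted try/finally acquire/release pair.
import threading
from clickhouse_driver import Client

class CK:
    def __init__(self, *args, **kwargs):
        self.__client = Client(*args, **kwargs)
        self.__lock = threading.Lock()

    def execute(self, *args, **kwargs):
        with self.__lock:             # released even if execute raises
            return self.__client.execute(*args, **kwargs)
```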
```diff
@@ -7,12 +7,26 @@ class EventAttr:
     def __init__(self, rdb: Redis):
         self.rdb = rdb
 
     def set_event_name(self, key, *data):
         self.rdb.sadd(key, *data)
 
+    def get_event_attr(self, key):
+        attr = self.event_attr.get(key)
+        if not attr:
+            self.event_attr[key] = self.rdb.smembers(key) or set()
+        return set(self.event_attr[key])
+
-    def add_event(self, db, data):
+    def set_event_attr(self, key, *data):
+        self.rdb.sadd(key, *data)
+        self.event_attr[key] = data
+
+    def check_attr(self, db, data):
         event_name = data.get('#event_name')
         if not event_name:
             return
         key = f'{db}_event_set'
         self.set_event_name(key, event_name)
+
+        key = f'{db}_event_{event_name}'
+
+        attr = self.get_event_attr(key)
+        data_attr = set(data)
+        extra_attr = data_attr - attr
+
+        if extra_attr:
+            self.set_event_attr(key, *extra_attr)
```
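`check_attr` keeps one Redis set per event (`{db}_event_{name}`) holding the attribute keys seen so far, with an in-process cache (`self.event_attr`, initialized outside this hunk) in front of it, and pushes only the set difference (`data_attr - attr`). A minimal usage sketch, assuming a reachable Redis and the class as diffed; the event values are hypothetical:

```python
# Sketch: registering event attributes as they stream in (hypothetical values).
import redis

rdb = redis.Redis(host='localhost', port=6379)   # assumed local instance
event_attr = EventAttr(rdb)

event = {'#event_name': 'pay', '#account_id': '42', 'money': 648}
event_attr.check_attr('shanhai', event)
# Redis now holds: shanhai_event_set -> {'pay'}
#                  shanhai_event_pay -> {'#event_name', '#account_id', 'money'}
```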
```diff
@@ -98,7 +98,7 @@ class Sketch:
                 default_field[k] = 'Nullable(String)'
 
             if isinstance(v, int):
-                default_field[k] = 'Nullable(Int64)'
+                default_field[k] = 'Nullable(UInt64)'
 
             if isinstance(v, float):
                 default_field[k] = 'Nullable(Float32)'
@@ -109,9 +109,9 @@ class Sketch:
             if isinstance(v, bool):
                 default_field[k] = 'Nullable(UInt8)'
 
-            sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
-            print(sql)
             try:
+                sql = f'alter table {db}.{tb} add column `{k}` {default_field[k]}'
+                print(sql)
                 self.db_client.execute(sql)
             except Exception as e:
                 print(f'添加字段 {k} 失败,同步数据库表结构')
```
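The schema-evolution trick here is small enough to isolate: infer a ClickHouse column type from the first Python value seen for a new key, then `ALTER TABLE ... ADD COLUMN`. One side effect of switching the `int` branch from `Int64` to `UInt64` is that negative integer values will likely be rejected at insert time. A condensed sketch of the inference, mirroring the diffed branches:

```python
# Sketch: ClickHouse type inference from a sample Python value, mirroring
# the diffed branches. bool wins because it is tested last, even though
# bool is a subclass of int in Python.
def infer_ck_type(v):
    t = 'Nullable(String)'
    if isinstance(v, int):
        t = 'Nullable(UInt64)'   # was Int64; negative values no longer fit
    if isinstance(v, float):
        t = 'Nullable(Float32)'
    if isinstance(v, bool):
        t = 'Nullable(UInt8)'
    return t

assert infer_ck_type(True) == 'Nullable(UInt8)'
assert infer_ck_type(7) == 'Nullable(UInt64)'
```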
```diff
@@ -5,8 +5,6 @@ import threading
 import time
 import traceback
-
-
 from settings import settings
 from .valid_data import *
 
 __all__ = 'Transmitter',
@@ -24,7 +22,6 @@ class Ping(threading.Thread):
-
     def run(self):
         while True:
             time.sleep(10)
             ts = int(time.time())
             if self.ping_ts + self.time_out < ts:
                 # keep the connection alive
@@ -82,11 +79,9 @@ class Transmitter:
                 self.db_client.execute(sql)
                 self.log.info(f'进程{self.p} 写入耗时 {int(time.time() * 1000) - ts}')
             except Exception as e:
-                self.log.error(traceback.format_exc())
                 # drop the bad rows and resend
                 if hasattr(e, 'code') and e.code == 26:
                     m = re.match('(.*)?Stack trace', e.message)
+                    self.log.error(data)
                     if m:
                         error_msg = m.group(1)
                         error_row = re.match('.*?errors out of (\d+) rows', error_msg)
@@ -108,11 +103,10 @@ class Transmitter:
     def check_table(self, db, tb, data):
         [self.sketch.alter_table(db, tb, item) for item in data]
 
-    def collect_event(self, db, tb, data):
+    def set_event_attr(self, db, tb, data):
         if tb != 'event':
             return
 
-        [self.event_attr.add_event(db, item) for item in data]
+        [self.event_attr.check_attr(db, item) for item in data]
 
     def check_type(self, db, tb, data):
@@ -130,19 +124,14 @@ class Transmitter:
         for key in del_keys:
             del item[key]
 
-    def run(self, kafka_client):
+    def run(self):
         for tb, buffer in self.check_send():
             try:
                 data = [self.flat_data(x) for x in buffer.values()]
                 self.check_table(self.db_name, tb, data)
                 self.check_type(self.db_name, tb, data)
-                self.collect_event(self.db_name, tb, data)
+                self.set_event_attr(self.db_name, tb, data)
                 self.__send(self.db_name, tb, [json.dumps(item) for item in data])
             except Exception as e:
                 self.log.error(traceback.format_exc())
                 self.log.error(data)
                 self.log.error(e)
             buffer.clear()
-            try:
-                kafka_client.commit()
-            except Exception as e:
-                self.log.error(f'进程:{self.p} error:{e}')
```
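With `kafka_client.commit()` gone from `run`, delivery tracking now rests entirely on the consumer's auto-commit (see the settings diff above). The flush pipeline itself is unchanged and compact enough to restate with comments; a minimal sketch of the per-buffer flow, using the helper names from the diff:

```python
# Sketch: Transmitter.run flush pipeline (helper names from the diff).
def run(self):
    for tb, buffer in self.check_send():              # buffers full or stale
        try:
            data = [self.flat_data(x) for x in buffer.values()]
            self.check_table(self.db_name, tb, data)      # ALTER for new columns
            self.check_type(self.db_name, tb, data)       # coerce/drop bad fields
            self.set_event_attr(self.db_name, tb, data)   # sync attr sets to Redis
            self.__send(self.db_name, tb, [json.dumps(item) for item in data])
        except Exception:
            self.log.error(traceback.format_exc())
        buffer.clear()                                # drop the batch either way
```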
**充值视图.sql** ("recharge view", 13 changes)
```diff
@@ -1,13 +0,0 @@
-drop table if exists xiangsu.recharge_game;
-create view xiangsu.recharge_game as
-select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
-       `#os`,
-       `#bundle_id`,
-       owner_name,
-       channel,
-       arrayDistinct(groupArray(binduid)) as account,
-       length(account) as account_num,
-       sum(money) as money
-from xiangsu.event
-where `#event_name` = 'rechargeGame'
-group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel
```
**初始化事件表.sql** ("initialize event table", 108 changes)
```diff
@@ -1,61 +1,55 @@
--- auto-generated definition
-create table event
+drop table if exists shanhai.event;
+create table shanhai.event
 (
-    `#ip` Nullable(IPv4),
-    `#country` Nullable(String),
-    `#province` Nullable(String),
-    `#city` Nullable(String),
-    `#os` Nullable(String),
-    `#device_id` Nullable(String),
-    `#screen_height` Nullable(UInt16),
-    `#screen_width` Nullable(UInt16),
-    `#device_model` Nullable(String),
-    `#app_version` Nullable(String),
-    `#bundle_id` Nullable(String),
-    `#app_name` Nullable(String),
-    `#game_version` Nullable(String),
-    `#os_version` Nullable(String),
-    `#network_type` Nullable(String),
-    `#carrier` Nullable(String),
-    `#manufacturer` Nullable(String),
-    `#app_id` Nullable(String),
-    `#account_id` String,
-    `#distinct_id` Nullable(String),
-    binduid Nullable(String),
-    channel Nullable(String),
-    owner_name String default '',
-    role_name Nullable(String),
-    exp Nullable(UInt64),
-    zhanli Nullable(UInt64),
-    maxmapid Nullable(UInt16),
-    mapid Nullable(UInt16),
-    ghid Nullable(String),
-    rmbmoney Nullable(UInt64),
-    jinbi Nullable(UInt64),
-    svrindex Nullable(String),
-    lv Nullable(UInt16),
-    vip Nullable(UInt16),
-    game Nullable(String),
+    `#ip` Nullable(IPv4),
+    `#country` Nullable(String),
+    `#country_code` Nullable(String),
+    `#province` Nullable(String),
+    `#city` Nullable(String),
+    `#os_version` Nullable(String),
+    `#manufacturer` Nullable(String),
+    `#os` Nullable(String),
+    `#device_id` Nullable(String),
+    `#screen_height` Nullable(UInt16),
+    `#screen_width` Nullable(UInt16),
+    `#device_model` Nullable(String),
+    `#app_version` Nullable(String),
+    `#bundle_id` Nullable(String),
+    `#lib` Nullable(String),
+    `#lib_version` Nullable(String),
+    `#network_type` Nullable(String),
+    `#carrier` Nullable(String),
+    `#browser` Nullable(String),
+    `#browser_version` Nullable(String),
+    `#duration` Nullable(String),
+    `#url` Nullable(String),
+    `#url_path` Nullable(String),
+    `#referrer` Nullable(String),
+    `#referrer_host` Nullable(String),
+    `#title` Nullable(String),
+    `#screen_name` Nullable(String),
+    `#element_id` Nullable(String),
+    `#element_type` Nullable(String),
+    `#resume_from_background` Nullable(String),
+    `#element_selector` Nullable(String),
+    `#element_position` Nullable(String),
+    `#element_content` Nullable(String),
+    `#scene` Nullable(String),
+    `#mp_platform` Nullable(String),
+    `#app_crashed_reason` Nullable(String),
+    `#zone_offset` Int8 default 8,
+    `#event_id` String,
 
-    `#zone_offset` Int8 default 8,
-    `#event_time` DateTime('UTC'),
-    `#event_name` String,
-    `#server_time` DateTime('UTC') default now(),
+    `#event_time` DateTime('UTC'),
+    `#account_id` String,
+    `#distinct_id` Nullable(String),
+    `#event_name` String,
+    `#server_time` DateTime('UTC') default now(),
 
     unitPrice Nullable(UInt32),
     money Nullable(String),
     islishishouci Nullable(UInt8),
     isdangrishouci Nullable(UInt8),
     is_today_reg Nullable(UInt8),
     orderid Nullable(String),
     proid Nullable(String),
     step_id Nullable(UInt16),
     step_group Nullable(UInt16),
     guide_start_time Nullable(UInt32),
     online_ts Nullable(UInt16),
     `#time` Nullable(DateTime('UTC'))
-)
-engine = ReplacingMergeTree PARTITION BY toYYYYMMDD(`#event_time`)
-    ORDER BY (owner_name, `#event_name`, `#event_time`, `#account_id`)
-    SETTINGS index_granularity = 8192;
+
+    `sign` Int8 default 1
+
+) ENGINE = CollapsingMergeTree(sign)
+    PARTITION BY toYYYYMMDD(`#event_time`)
+    order by (`#account_id`, `#event_time`, `#event_name`)
+-- TTL event_time + toIntervalDay(365)
```
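The engine swap is the heart of this migration: the old table's ReplacingMergeTree dedupes rows that share a sort key, while CollapsingMergeTree(sign) lets a later row with `sign = -1` cancel a previously inserted `sign = 1` row during background merges — exactly the mechanism the test-data cleanup script above relies on. A minimal round-trip sketch with clickhouse-driver, using a hypothetical demo table and a reachable server:

```python
# Sketch: CollapsingMergeTree cancellation on a tiny demo table
# (hypothetical names; assumes a reachable ClickHouse server).
from clickhouse_driver import Client

client = Client(host='localhost')
client.execute('''
    create table if not exists demo_ev (
        account_id String, event_name String, sign Int8 default 1
    ) engine = CollapsingMergeTree(sign) order by (account_id, event_name)
''')
client.execute('INSERT INTO demo_ev VALUES', [('42', 'pay', 1)])    # state row
client.execute('INSERT INTO demo_ev VALUES', [('42', 'pay', -1)])   # cancel row

# Both rows coexist until a merge runs; aggregate by sign (or query with
# FINAL) to observe the collapsed state.
print(client.execute('select sum(sign) from demo_ev'))   # [(0,)]
```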
```diff
@@ -1,10 +1,10 @@
-drop table if exists xiangsu.user;
-create table xiangsu.user
+drop table if exists shanhai.user;
+create table shanhai.user
 (
 
     `#reg_time` DateTime('UTC'),
     `#account_id` String,
-    `svrindex` String,
+    `svrindex` UInt16,
 
     `#zone_offset` Int8 default 8,
     `#server_time` DateTime('UTC') default now()
```
**新用户视图.sql** ("new-user view", 12 changes)
```diff
@@ -1,12 +0,0 @@
-drop table if exists xiangsu.new_account;
-create view xiangsu.new_account as
-select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
-       `#os`,
-       `#bundle_id`,
-       owner_name,
-       channel,
-       groupArray(`binduid`) as account,
-       length(account) as num
-from xiangsu.event
-where role_idx = 1
-group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel
```
**新账号付费.sql** ("new-account payment", 23 changes)
```diff
@@ -1,23 +0,0 @@
-drop table if exists xiangsu.new_account_recharge;
-create view xiangsu.new_account_recharge as (select date,
-        `#os`,
-        `#bundle_id`,
-        owner_name,
-        channel,
-        uniqExact(binduid) as accout_num,
-        sum(money) as money
- from (select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
-              `#os`,
-              `#bundle_id`,
-              owner_name,
-              channel,
-              binduid,
-              money
-       from xiangsu.event
-       where `#event_name` = 'rechargeGame') as tb1
-          right join (select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
-                             binduid
-                      from xiangsu.event
-                      where role_idx = 1) as tb2
-                     on tb1.date = tb2.date and tb2.binduid = tb1.binduid
- group by date, `#os`, `#bundle_id`, owner_name, channel)
```
**活跃账号视图.sql** ("active-account view", 11 changes)
```diff
@@ -1,11 +0,0 @@
-drop table if exists xiangsu.active_account;
-create view xiangsu.active_account as
-select toDate(addHours(`#event_time`, `#zone_offset`)) as date,
-       `#os`,
-       `#bundle_id`,
-       owner_name,
-       channel,
-       uniqCombined(binduid) as num,
-       arrayDistinct(groupArray(binduid)) as account
-from xiangsu.event
-group by toDate(addHours(`#event_time`, `#zone_offset`)), `#os`, `#bundle_id`, owner_name, channel
```