添加data_id

This commit is contained in:
wuaho 2021-10-15 16:29:44 +08:00
parent 0c05a58509
commit 0a49a7b119
6 changed files with 125 additions and 11 deletions

14
app.py
View File

@ -1,9 +1,10 @@
import copy import copy
import datetime import datetime
import os
import schedule import schedule
from utils import create_consumer, PostData from utils import create_consumer, PostData,get_data_id
from core import settings from core import settings
consumer, client = create_consumer() consumer, client = create_consumer()
@ -24,9 +25,12 @@ def handler_userid(properties):
properties['user_id'] = properties['binduid'] properties['user_id'] = properties['binduid']
def run(): def run():
post_data = PostData() post_data = PostData()
schedule.every(60).seconds.do(post_data.post) schedule.every(60).seconds.do(post_data.post)
idx = get_data_id()
for msg in consumer(): for msg in consumer():
data: dict = msg.value data: dict = msg.value
if data.get('#type') != 'track': if data.get('#type') != 'track':
@ -44,8 +48,12 @@ def run():
if event_name not in settings.legu_to_sm_event: if event_name not in settings.legu_to_sm_event:
continue continue
if 'shoumeng' not in data.get('owner_name'): # if 'shoumeng' not in (data.get('owner_name') or ''):
continue # continue
idx += 1
data['data_id'] = idx
send_data = copy.deepcopy(settings.template_data) send_data = copy.deepcopy(settings.template_data)

View File

@ -64,7 +64,8 @@ class GlobalConfig(BaseSettings):
'lv': 'role_rank', 'lv': 'role_rank',
'vip': 'role_vip', 'vip': 'role_vip',
# '':'castle_rank', # 城堡等级 # '':'castle_rank', # 城堡等级
'exp': 'exp' 'exp': 'exp',
'data_id': 'data_id'
} }
# 事件属性映射 # 事件属性映射
@ -195,11 +196,11 @@ class DevConfig(GlobalConfig):
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads, 'value_deserializer': json.loads,
'auto_offset_reset': 'earliest', 'auto_offset_reset': 'earliest',
# 'enable_auto_commit': True, 'enable_auto_commit': True,
# 'auto_commit_interval_ms': 10000, 'auto_commit_interval_ms': 10000,
# 每个游戏不一样 # 每个游戏不一样
'group_id': 'shoumeng_xiangsu_debug' # 'shou_meng_xiangsu' 'group_id': 'shoumeng_xiangsu_debug' # '测试'
} }
SUBSCRIBE_TOPIC: str = 'debug' SUBSCRIBE_TOPIC: str = 'debug'
@ -216,11 +217,11 @@ class ProdConfig(GlobalConfig):
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads, 'value_deserializer': json.loads,
'auto_offset_reset': 'earliest', 'auto_offset_reset': 'earliest',
# 'enable_auto_commit': True, 'enable_auto_commit': True,
# 'auto_commit_interval_ms': 10000, 'auto_commit_interval_ms': 10000,
# 每个游戏不一样 # 每个游戏不一样
'group_id': 'shou_meng_xiangsu2' # 'shou_meng_xiangsu' 'group_id': 'shou_meng_xiangsu2' # '正式的'
} }
SUBSCRIBE_TOPIC = 'xiangsu' SUBSCRIBE_TOPIC = 'xiangsu'

72
resend.py Normal file
View File

@ -0,0 +1,72 @@
import copy
from clickhouse_driver import Client
import pandas as pd
from core import settings
from utils import PostData
post_data = PostData()
def handler_os(properties):
os_: str = properties.get('#os', 'Android')
if os_.lower() == 'ios':
properties['#os'] = 102
properties['#os'] = 101
def handler_money(properties):
properties['unitPrice'] = int(properties.get('unitPrice', 0) / 100)
def handler_userid(properties):
properties['user_id'] = properties['binduid']
client = Client(**{'host': '139.159.159.3',
'port': 9654,
'user': 'legu',
'password': 'gncPASUwpYrc'
})
dates = pd.date_range('2021-09-29 00:00:00', '2021-09-30 23:59:59', freq='30T')
for i in range(len(dates) - 1):
s = dates[i].strftime('%Y-%m-%d %H:%M:%S')
e = (dates[i + 1] - pd.Timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
sql = f"""select * from xiangsu.event where addHours(`#event_time`,8) between '{s}' and '{e}' """
print(sql)
data, columns = client.execute(sql, with_column_types=True, columnar=True)
df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
if df.empty:
continue
df['#event_time'] = df['#event_time'].apply(lambda x: int(x.timestamp()))
datas = df.T.to_dict().values()
for data in datas:
event_name = data.get('#event_name')
if event_name not in settings.legu_to_sm_event:
continue
if 'shoumeng' not in data.get('owner_name'):
continue
send_data = copy.deepcopy(settings.template_data)
handler_os(data)
if event_name == 'pay':
handler_money(data)
handler_userid(data)
send_data['event']['event_name'] = settings.legu_to_sm_event[event_name]
send_data['event']['event_time'] = data['#event_time']
for k, v in settings.legu_to_sm_attr_base.items():
send_data['data'][v] = data.get(k)
for k, v in settings.legu_to_sm_attr.get(event_name, dict()).items():
send_data['data'][v] = data.get(k)
if post_data.is_upload():
post_data.post()
post_data.add(send_data)
post_data.post()

View File

@ -1,2 +1,3 @@
from .consumer import * from .consumer import *
from .post_data import PostData from .post_data import PostData
from .data_idx import *

27
utils/data_idx.py Normal file
View File

@ -0,0 +1,27 @@
import os
def get_data_id():
data_id_file = 'data_id.txt'
if not os.path.exists(data_id_file):
with open(data_id_file, 'w') as f:
idx = 0
f.write(f'{idx:0>15d}\n')
f.write(f'{idx:0>15d}\n')
with open(data_id_file, 'rb') as f:
f.seek(-16, 2)
s = f.read().decode()
idx = int(s)
print(idx)
return idx
def set_data_id(idx: int):
data_id_file = 'data_id.txt'
with open(data_id_file, 'a') as f:
f.write(f'{idx:0>15d}\n')
if __name__ == '__main__':
get_data_id()

View File

@ -8,6 +8,7 @@ import time
import requests import requests
from core import settings from core import settings
from .data_idx import set_data_id
requests.packages.urllib3.disable_warnings() requests.packages.urllib3.disable_warnings()
@ -52,8 +53,12 @@ class PostData:
except Exception as e: except Exception as e:
print(e) print(e)
finally: finally:
self.set_idx()
self.clear_data() self.clear_data()
def set_idx(self):
set_data_id(self.data_list[-1]['data']['data_id'])
def clear_data(self): def clear_data(self):
self.data_list.clear() self.data_list.clear()