This commit is contained in:
kf_wuhao 2021-01-18 10:52:48 +08:00
parent 35739a1a92
commit c05e827970
9 changed files with 40 additions and 26 deletions

View File

@ -10,4 +10,18 @@ xdata 数据清洗
4. 标记为运行状态。run=1 4. 标记为运行状态。run=1
5. 清洗数据入库。 5. 清洗数据入库。
6. 设置本次操作游标。 6. 设置本次操作游标。
7. 标记运行结束。run=0 7. 标记运行结束。run=0
## 注意事项
事件时间和入库时间 偏差
eg
事件A时间在 39分57秒发生入库时间在 40分32秒
任务执行时间在 40分0秒清洗30分0秒~40分0秒的数据事件A还未入库造成遗漏。
解决办法已入库时间为游标

View File

@ -18,6 +18,7 @@ class FirstRecharge(Task):
role_level: int = Field(None, title='角色等级') role_level: int = Field(None, title='角色等级')
role_vip: int = Field(None, title='vip等级') role_vip: int = Field(None, title='vip等级')
role_stage: IntStr = Field(None, title='关卡') role_stage: IntStr = Field(None, title='关卡')
event_time: int = Field(..., title="事件时间", alias='_event_time')
money: IntFloat = Field(..., title='金额') money: IntFloat = Field(..., title='金额')
game_role_id: str = Field(..., min_length=1, title='角色id', alias='_game_role_id') game_role_id: str = Field(..., min_length=1, title='角色id', alias='_game_role_id')
orderid: str = Field(..., min_length=1, title='订单号') orderid: str = Field(..., min_length=1, title='订单号')
@ -35,7 +36,7 @@ class FirstRecharge(Task):
continue continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_time': { '_ut': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
} }
@ -45,8 +46,8 @@ class FirstRecharge(Task):
bulk_data = [] bulk_data = []
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \ item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp()) .normalize().timestamp())
model = self.Model(**item) model = self.Model(**item)
data = model.dict(by_alias=True) data = model.dict(by_alias=True)
_game_role_id = data.pop('_game_role_id') _game_role_id = data.pop('_game_role_id')

View File

@ -57,7 +57,7 @@ class SummaryAssets(Task):
'$or': [{'prize.a': {'$in': list(a)}, 'prize.t': {'$in': list(t)}}, '$or': [{'prize.a': {'$in': list(a)}, 'prize.t': {'$in': list(t)}},
{'need.a': {'$in': list(a)}, 'need.t': {'$in': list(t)}}], {'need.a': {'$in': list(a)}, 'need.t': {'$in': list(t)}}],
'_event_name': 'res', '_event_name': 'res',
'_event_time': { '_ut': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
} }
@ -65,11 +65,11 @@ class SummaryAssets(Task):
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
model = self.Model(**item) model = self.Model(**item)
data = model.dict(by_alias=True) data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))

View File

@ -30,7 +30,7 @@ class SummaryFunc(Task):
continue continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_name': 'Func', '_ut': 'Func',
'_event_time': { '_event_time': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
@ -39,10 +39,10 @@ class SummaryFunc(Task):
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
model = self.Model(**item) model = self.Model(**item)
data = model.dict(by_alias=True) data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))

View File

@ -33,17 +33,17 @@ class SummaryFunnel(Task):
where = { where = {
'_event_name': 'Guide', '_event_name': 'Guide',
'step': {'$in': step_list}, 'step': {'$in': step_list},
'_event_time': { '_ut': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
} }
} }
bulk_data = [] bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where): # 所有字段 for item in self.local_db[source_coll].find(where): # 所有字段
try: try:
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
model = self.Model(**item) model = self.Model(**item)
data = model.dict(by_alias=True) data = model.dict(by_alias=True)
data.update(item) data.update(item)

View File

@ -41,20 +41,20 @@ class SummaryJoinHd(Task):
continue continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_name': 'res', '_ut': 'res',
'function': 'hdgetprize', 'function': 'hdgetprize',
'_event_time': { '_event_time': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
} }
} }
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
projection = self.Model.get_fields() projection = self.Model.get_fields()
projection.extend(['function_data', 'function_detail']) projection.extend(['function_data', 'function_detail'])
bulk_data = [] bulk_data = []
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
item['htype'] = str(item['function_detail']) item['htype'] = str(item['function_detail'])
item['hd_idx'] = 0 item['hd_idx'] = 0
if isinstance(item['function_data'], dict): if isinstance(item['function_data'], dict):

View File

@ -29,7 +29,7 @@ class SummaryOpenHd(Task):
'_event_name': 'Activity', '_event_name': 'Activity',
'act': 'click', 'act': 'click',
'htype': {"$exists": 1}, 'htype': {"$exists": 1},
'_event_time': { '_ut': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
} }
@ -37,11 +37,11 @@ class SummaryOpenHd(Task):
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
item['htype'] = str(item['htype']) item['htype'] = str(item['htype'])
model = self.Model(**item) model = self.Model(**item)
data = model.dict(by_alias=True) data = model.dict(by_alias=True)

View File

@ -39,13 +39,11 @@ class SummaryPay(Task):
continue continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_time': { '_ut': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
'$lt': ts['cursor_et'], '$lt': ts['cursor_et'],
} }
} }
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \
.normalize().timestamp())
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
@ -54,7 +52,8 @@ class SummaryPay(Task):
if orderid.startswith('GM_') or \ if orderid.startswith('GM_') or \
orderid.startswith('debugPay'): orderid.startswith('debugPay'):
continue continue
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
user_info = self.local_db['user'].find_one({'_game_role_id': item['_game_role_id']}, projection) user_info = self.local_db['user'].find_one({'_game_role_id': item['_game_role_id']}, projection)
for k, v in user_info.items(): for k, v in user_info.items():
item[k] = item.get(k) or user_info[k] item[k] = item.get(k) or user_info[k]

View File

@ -36,7 +36,7 @@ class SummaryShopbuy(Task):
continue continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_name': 'Shop', '_ut': 'Shop',
"act": "buy", "act": "buy",
'_event_time': { '_event_time': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
@ -46,10 +46,10 @@ class SummaryShopbuy(Task):
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = cdate item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
item['needa'] = item['need'][0]['a'] item['needa'] = item['need'][0]['a']
item['needt'] = item['need'][0]['t'] item['needt'] = item['need'][0]['t']
item['needn'] = item['need'][0]['n'] item['needn'] = item['need'][0]['n']