diff --git a/README.md b/README.md index e81e0db..23c6ce2 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,18 @@ xdata 数据清洗 4. 标记为运行状态。run=1 5. 清洗数据入库。 6. 设置本次操作游标。 -7. 标记运行结束。run=0 \ No newline at end of file +7. 标记运行结束。run=0 + + + +## 注意事项 + +事件时间和入库时间 偏差 + +eg: + +事件A时间在 39分57秒发生;入库时间在 40分32秒; + +任务执行时间在 40分0秒,清洗30分0秒~40分0秒的数据,事件A还未入库造成遗漏。 + +解决办法:以入库时间为游标 \ No newline at end of file diff --git a/task/first_recharge.py b/task/first_recharge.py index 1397b24..4422d82 100644 --- a/task/first_recharge.py +++ b/task/first_recharge.py @@ -18,6 +18,7 @@ class FirstRecharge(Task): role_level: int = Field(None, title='角色等级') role_vip: int = Field(None, title='vip等级') role_stage: IntStr = Field(None, title='关卡') + event_time: int = Field(..., title="事件时间", alias='_event_time') money: IntFloat = Field(..., title='金额') game_role_id: str = Field(..., min_length=1, title='角色id', alias='_game_role_id') orderid: str = Field(..., min_length=1, title='订单号') @@ -35,7 +36,7 @@ class FirstRecharge(Task): continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') where = { - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], } @@ -45,8 +46,8 @@ class FirstRecharge(Task): bulk_data = [] for item in self.local_db[source_coll].find(where, projection): try: - item['cdate'] = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \ - .normalize().timestamp()) + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) model = self.Model(**item) data = model.dict(by_alias=True) _game_role_id = data.pop('_game_role_id') diff --git a/task/summary_assets.py b/task/summary_assets.py index b810bc6..f578a58 100644 --- a/task/summary_assets.py +++ b/task/summary_assets.py @@ -57,7 +57,7 @@ class SummaryAssets(Task): '$or': [{'prize.a': {'$in': list(a)}, 'prize.t': {'$in': list(t)}}, {'need.a': {'$in': list(a)}, 'need.t': {'$in': list(t)}}], '_event_name': 'res', - '_event_time': { + '_ut': { 
'$gte': ts['cursor_st'], '$lt': ts['cursor_et'], } @@ -65,11 +65,11 @@ class SummaryAssets(Task): projection = self.Model.get_fields() bulk_data = [] - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) for item in self.local_db[source_coll].find(where, projection): try: - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) model = self.Model(**item) data = model.dict(by_alias=True) bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) diff --git a/task/summary_func.py b/task/summary_func.py index e223c75..47a6978 100644 --- a/task/summary_func.py +++ b/task/summary_func.py @@ -30,7 +30,7 @@ class SummaryFunc(Task): continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') where = { '_event_name': 'Func', - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], @@ -39,10 +39,10 @@ class SummaryFunc(Task): projection = self.Model.get_fields() bulk_data = [] - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) for item in self.local_db[source_coll].find(where, projection): try: - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) model = self.Model(**item) data = model.dict(by_alias=True) bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) diff --git a/task/summary_funnel.py b/task/summary_funnel.py index 89f1bfd..cb1290f 100644 --- a/task/summary_funnel.py +++ b/task/summary_funnel.py @@ -33,17 +33,17 @@ class SummaryFunnel(Task): where = { '_event_name': 'Guide', 'step': {'$in': step_list}, - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], } } bulk_data = [] - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) for item in 
self.local_db[source_coll].find(where): # 所有字段 try: - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) model = self.Model(**item) data = model.dict(by_alias=True) data.update(item) diff --git a/task/summary_join_hd.py b/task/summary_join_hd.py index 9eff6e1..f7094da 100644 --- a/task/summary_join_hd.py +++ b/task/summary_join_hd.py @@ -41,20 +41,20 @@ class SummaryJoinHd(Task): continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') where = { '_event_name': 'res', 'function': 'hdgetprize', - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], } } - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) projection = self.Model.get_fields() projection.extend(['function_data', 'function_detail']) bulk_data = [] for item in self.local_db[source_coll].find(where, projection): try: - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) item['htype'] = str(item['function_detail']) item['hd_idx'] = 0 if isinstance(item['function_data'], dict): diff --git a/task/summary_open_hd.py b/task/summary_open_hd.py index c5ffd76..f595675 100644 --- a/task/summary_open_hd.py +++ b/task/summary_open_hd.py @@ -29,7 +29,7 @@ class SummaryOpenHd(Task): '_event_name': 'Activity', 'act': 'click', 'htype': {"$exists": 1}, - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], } @@ -37,11 +37,11 @@ class SummaryOpenHd(Task): projection = self.Model.get_fields() bulk_data = [] - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) for item in self.local_db[source_coll].find(where, projection): try: - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) item['htype'] = str(item['htype']) model = 
self.Model(**item) data = model.dict(by_alias=True) diff --git a/task/summary_pay.py b/task/summary_pay.py index 9ab6ef9..b3e3038 100644 --- a/task/summary_pay.py +++ b/task/summary_pay.py @@ -39,13 +39,11 @@ class SummaryPay(Task): continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') where = { - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], '$lt': ts['cursor_et'], } } - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \ - .normalize().timestamp()) projection = self.Model.get_fields() bulk_data = [] for item in self.local_db[source_coll].find(where, projection): @@ -54,7 +52,8 @@ class SummaryPay(Task): if orderid.startswith('GM_') or \ orderid.startswith('debugPay'): continue - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) user_info = self.local_db['user'].find_one({'_game_role_id': item['_game_role_id']}, projection) for k, v in user_info.items(): item[k] = item.get(k) or user_info[k] diff --git a/task/summary_shopbuy.py b/task/summary_shopbuy.py index a749510..f3e0249 100644 --- a/task/summary_shopbuy.py +++ b/task/summary_shopbuy.py @@ -36,7 +36,7 @@ class SummaryShopbuy(Task): continue logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') where = { '_event_name': 'Shop', "act": "buy", - '_event_time': { + '_ut': { '$gte': ts['cursor_st'], @@ -46,10 +46,10 @@ class SummaryShopbuy(Task): projection = self.Model.get_fields() bulk_data = [] - cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) for item in self.local_db[source_coll].find(where, projection): try: - item['cdate'] = cdate + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) item['needa'] = item['need'][0]['a'] item['needt'] = item['need'][0]['t'] item['needn'] = item['need'][0]['n']