From 95855fb14804b31207b65b170142b2287a3a74b6 Mon Sep 17 00:00:00 2001 From: wuhao <15392746632@qq.com> Date: Fri, 11 Dec 2020 11:52:58 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=87=E6=BB=A4=E6=9F=A5=E8=AF=A2=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E6=88=B3=E7=9B=B8=E7=AD=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- task/summary_func.py | 11 +++++++++-- task/task.py | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/task/summary_func.py b/task/summary_func.py index 0c0299a..73e491e 100644 --- a/task/summary_func.py +++ b/task/summary_func.py @@ -1,5 +1,6 @@ from pymongo import UpdateOne from pydantic import Field +import pandas as pd from .task import Task from utils import * @@ -16,11 +17,14 @@ class SummaryFunc(Task): need: list = Field(None, title='消耗') ftype: str = Field(..., title='功能') data: dict = Field(None, title='功能数据') + cdate: int = Field(..., title='当天0点') def cleaning(self, cursor_list): for cursor in cursor_list: # type:dict for event_coll, ts in cursor.items(): # type:str,dict - logger.info(f'开始处理{self.game_name} 处理 {event_coll} ...') + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 {event_coll} 游标 {ts}') where = { '_event_name': 'Func', '_event_time': { @@ -33,10 +37,13 @@ class SummaryFunc(Task): bulk_data = [] for item in self.local_db[event_coll].find(where, projection): try: + item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ + .normalize().timestamp()) model = self.Model(**item) data = model.dict() bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) except Exception as e: - logger.error(e) + # logger.error(f'ftype {item["ftype"]} msg:{e}') + pass self.remote_db[self.task_name].bulk_write(bulk_data, ordered=False) self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et']) diff --git a/task/task.py b/task/task.py index 3a7eeb3..c6232b1 100644 --- a/task/task.py +++ b/task/task.py @@ -46,11 +46,11 @@ class Task(metaclass=abc.ABCMeta): elif int(time.time()) - last_ts > time_out: # 任务超时 # todo 钉钉通知 - logger.info('钉钉通知') + logger.info(f'{self.game_name} 钉钉通知') return False else: # 正在运行没超时 - logger.info('正在运行没超时') + logger.info(f'{self.game_name} 正在运行没超时') return False def set_run_ts(self):