过滤查询时间戳相等

This commit is contained in:
wuhao 2020-12-11 11:52:58 +08:00
parent 81343908e4
commit 95855fb148
2 changed files with 11 additions and 4 deletions

View File

@ -1,5 +1,6 @@
from pymongo import UpdateOne from pymongo import UpdateOne
from pydantic import Field from pydantic import Field
import pandas as pd
from .task import Task from .task import Task
from utils import * from utils import *
@ -16,11 +17,14 @@ class SummaryFunc(Task):
need: list = Field(None, title='消耗') need: list = Field(None, title='消耗')
ftype: str = Field(..., title='功能') ftype: str = Field(..., title='功能')
data: dict = Field(None, title='功能数据') data: dict = Field(None, title='功能数据')
cdate: int = Field(..., title='当天0点')
def cleaning(self, cursor_list): def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict for cursor in cursor_list: # type:dict
for event_coll, ts in cursor.items(): # type:str,dict for event_coll, ts in cursor.items(): # type:str,dict
logger.info(f'开始处理{self.game_name} 处理 {event_coll} ...') if ts['cursor_st'] == ts['cursor_et']:
continue
logger.info(f'开始处理{self.game_name} 处理 {event_coll} 游标 {ts}')
where = { where = {
'_event_name': 'Func', '_event_name': 'Func',
'_event_time': { '_event_time': {
@ -33,10 +37,13 @@ class SummaryFunc(Task):
bulk_data = [] bulk_data = []
for item in self.local_db[event_coll].find(where, projection): for item in self.local_db[event_coll].find(where, projection):
try: try:
item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp())
model = self.Model(**item) model = self.Model(**item)
data = model.dict() data = model.dict()
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
except Exception as e: except Exception as e:
logger.error(e) # logger.error(f'ftype {item["ftype"]} msg:{e}')
pass
self.remote_db[self.task_name].bulk_write(bulk_data, ordered=False) self.remote_db[self.task_name].bulk_write(bulk_data, ordered=False)
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et']) self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])

View File

@ -46,11 +46,11 @@ class Task(metaclass=abc.ABCMeta):
elif int(time.time()) - last_ts > time_out: elif int(time.time()) - last_ts > time_out:
# 任务超时 # 任务超时
# todo 钉钉通知 # todo 钉钉通知
logger.info('钉钉通知') logger.info(f'{self.game_name} 钉钉通知')
return False return False
else: else:
# 正在运行没超时 # 正在运行没超时
logger.info('正在运行没超时') logger.info(f'{self.game_name} 正在运行没超时')
return False return False
def set_run_ts(self): def set_run_ts(self):