Compare commits

..

6 Commits

Author SHA1 Message Date
6cb5391eb0 add summary3.py 2021-01-08 15:41:28 +08:00
c445cc0cb8 update 2020-12-26 18:16:50 +08:00
019b16a39a update 2020-12-26 18:08:08 +08:00
1f289c5ca9 update 2020-12-26 17:01:32 +08:00
1091240d45 update 2020-12-26 16:50:17 +08:00
fbf375bd45 update 2020-12-26 14:14:59 +08:00
6 changed files with 559 additions and 16 deletions

View File

@ -15,7 +15,7 @@
"task_name": "repair_gunfu"
},
"summary_login": {
"source_coll": "event",
"source_coll": "user",
"dest_coll": "summary_login",
"task_name": "summary_login"
},
@ -43,7 +43,8 @@
"source_coll": "user",
"dest_coll": "",
"task_name": "summary3",
"freq": "D"
"freq": "D",
"is_inc": true
},
"sync_user": {
"source_coll": "user",

View File

@ -1,6 +1,6 @@
from pydantic import BaseModel, Field
from model.field_type import MdbObjectId
from model.field_type import MdbObjectId, IntStr
class GBaseModel(BaseModel):
@ -16,7 +16,7 @@ class GBaseModel(BaseModel):
role_create_time: int = Field(None, title="角色创建时间")
role_level: int = Field(None, title="角色等级")
role_vip: int = Field(None, title="角色vip等级")
role_stage: int = Field(None, title="关卡")
role_stage: IntStr = Field(None, title="关卡")
@classmethod
def get_fields(cls):

View File

@ -75,9 +75,28 @@ class AddUserFlag(Task):
bulk_data.append(
UpdateOne({'_game_role_id': model.game_role_id},
{'$set': {'is_new_device': 1}}))
except Exception as e:
logger.error(f'msg:{e}')
pass
# 记录第一次登录设备id
where = {
'role_create_time': {
'$gte': ts['cursor_st'],
'$lt': ts['cursor_et'],
},
'_first_device_id': ''
}
for item in self.local_db[source_coll].find(where, projection):
try:
# 新设备
model = self.Model(**item)
bulk_data.append(
UpdateOne({'_game_role_id': model.game_role_id}, {'$set': {'_first_device_id': model.device_id}}))
except Exception as e:
logger.error(f'msg:{e}')
# pass
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

521
task/summary3.py Normal file
View File

@ -0,0 +1,521 @@
import time
from pydantic import Field, BaseModel
import pandas as pd
from .task import Task
from utils import *
import numpy as np
# LTV 天数
LTV_DAYS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
57, 58, 59, 60, 65, 70, 75, 80, 85, 90, 100, 110, 120, 150, 180, 210, 240, 270, 300, 330, 360]
# 留存的天数
RETAIN_DAYS = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55,
56,
57, 58, 59, 60, 65, 70, 75, 80, 85, 90, 100, 110, 120, 150, 180, 210, 240, 270, 300, 330, 360]
def role_struct(cdate, owner, channel, platform):
data = {
'cdate': cdate,
'_channel_name': channel,
'_owner_name': owner,
'_platform': platform,
'new_role_list': [],
'new_role': 0,
'now_pay_role_list': [],
'now_pay_role': 0,
'role_now_login': 0,
'pay_role_now_login': 0,
'now_pay_money': 0,
}
role_login_n = {'role_login_' + str(day): 0 for day in LTV_DAYS}
pay_role_login_n = {'pay_role_login_' + str(day): 0 for day in RETAIN_DAYS}
role_all_money_n = {'role_all_money_' + str(day): 0 for day in LTV_DAYS}
data.update(role_login_n)
data.update(pay_role_login_n)
data.update(role_all_money_n)
return data
def device_struct(cdate, owner, channel, platform):
data = {
'cdate': cdate,
'_channel_name': channel,
'_owner_name': owner,
'_platform': platform,
'new_device_list': [],
'new_device': 0,
'now_pay_device_list': [],
'now_pay_device': 0,
'device_now_login': 0,
'pay_device_now_login': 0,
'now_pay_money': 0,
}
device_login_n = {'device_login_' + str(day): 0 for day in RETAIN_DAYS}
pay_device_login_n = {'pay_device_login_' + str(day): 0 for day in RETAIN_DAYS}
device_all_money_n = {'device_all_money_' + str(day): 0 for day in LTV_DAYS}
data.update(device_login_n)
data.update(pay_device_login_n)
data.update(device_all_money_n)
return data
def account_struct(cdate, owner, channel, platform):
data = {
'cdate': cdate,
'_channel_name': channel,
'_owner_name': owner,
'_platform': platform,
'new_account_list': [],
'new_account': 0,
'now_pay_account_list': [],
'now_pay_account': 0,
'account_now_login': 0,
'pay_account_now_login': 0,
'now_pay_money': 0,
}
account_login_n = {'account_login_' + str(day): 0 for day in RETAIN_DAYS}
pay_account_login_n = {'pay_account_login_' + str(day): 0 for day in RETAIN_DAYS}
account_all_money_n = {'account_all_money_' + str(day): 0 for day in LTV_DAYS}
data.update(account_login_n)
data.update(pay_account_login_n)
data.update(account_all_money_n)
return data
class HandlerWrapper:
handler_link = []
def __init__(self, func):
self.func = func
HandlerWrapper.handler_link.append(func)
def __call__(self, *args, **kwargs):
self.func(*args, **kwargs)
class HandlerWrapperInc:
handler_link = []
def __init__(self, func):
self.func = func
HandlerWrapperInc.handler_link.append(func)
def __call__(self, *args, **kwargs):
self.func(*args, **kwargs)
class Summary3(Task):
"""
Summary3
"""
handler_link = []
# 处理天游标
def get_cursor(self):
if not self.cursor_st:
self.cursor_st = int(time.time()) - 86400
super().get_cursor()
def generate_cursor_time(self):
date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize(),
pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize(), freq=self.freq)
df = pd.DataFrame(index=date_index[:-1])
df['st'] = df.index
df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()])
return df
class Model(BaseModel):
game_role_id: str = Field(..., title="角色id", alias='_game_role_id')
platform: str = Field(..., min_length=1, title="平台", alias='_platform')
channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')
device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
district_server_id: int = Field(..., title="区服id", alias='_district_server_id')
role_create_time: int = Field(..., title="角色创建时间")
is_new_device: int = Field(None, title="新设备")
is_new_channel_uid: int = Field(None, title="新账号")
@classmethod
def get_fields(cls):
return [v.alias for v in cls.__fields__.values()]
def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']:
continue
logger.info(f'开始处理{self.game_name} 处理 游标 {ts}')
# 这一天新用户
where = {
'role_create_time': {
'$gte': ts['cursor_st'],
'$lt': ts['cursor_et'],
},
'_owner_name': {'$nin': ['dev', 'banshu', 'tishen']},
'_platform': {'$exists': True},
'_channel_name': {'$exists': True}
}
projection = self.Model.get_fields()
user_df = pd.DataFrame(self.local_db[self.source_coll].find(where, projection))
# self.handler_summary(user_df, ts['cursor_st'])
if self.is_inc:
self.handler_summary_inc(ts['cursor_st'])
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])
def handler_summary_inc(self, cdate):
params = (self, cdate)
list(map(lambda handler: handler(*params), HandlerWrapperInc.handler_link))
@HandlerWrapperInc
def handler_inc_init(self, cdate):
"""
设置初始值
"""
yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())
min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400
def update(cat):
cursor = self.local_db[f'summary_{cat}1'].find({'cdate': {'$gte': min_ts, '$lt': cdate}})
for doc in cursor:
doc_date = doc['cdate']
age_day = (cdate - doc_date) // 86400 + 1
data = {f'{cat}_all_money_{age_day}': doc[f'{cat}_all_money_{age_day - 1}']}
self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data})
update('role')
update('account')
update('device')
@HandlerWrapperInc
def handler_money_inc(self, cdate):
yesterday_ts = int(pd.Timestamp(cdate, unit='s',
tz=self.timezone).normalize().timestamp())
df = pd.DataFrame(self.local_db['summary_pay'].find({'cdate': yesterday_ts}, {
'_id': False,
'_game_role_id': True,
'_channel_uid': True,
'_device_id': True,
'role_create_time': True,
}))
if df.shape == (0, 0):
return
role_list = list(df['_game_role_id'].unique())
account_list = list(df['_channel_uid'].unique())
device_list = list(df['_device_id'].unique())
min_ts = yesterday_ts - max(RETAIN_DAYS + LTV_DAYS) * 86400
def update(cat, cat_list):
update_doc = dict()
for cid in cat_list:
data = self.local_db[f'summary_{cat}1'].find_one(
{'cdate': {'$gte': min_ts, '$lte': yesterday_ts}, f'new_{cat}_list': cid}, {
'_id': True,
'cdate': True,
f'now_pay_{cat}_list': True,
})
if not data:
continue
update_doc[data['_id']] = data
# 添加新充值用户
data[f'now_pay_{cat}_list'] = list(set(data[f'now_pay_{cat}_list']) | {cid})
data[f'now_pay_{cat}'] = len(data[f'now_pay_{cat}_list'])
self.local_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': data['_id']}, {'$set': data})
# 计算累计充值
for id_, doc in update_doc.items():
pipeline = [
{
'$match': {
"cdate": {'$gte': doc['cdate'], '$lte': cdate},
f'_game_{cat}_id': {'$in': doc[f'now_pay_{cat}_list']},
}
},
{'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
]
try:
sum_money = self.local_db['summary_pay'].aggregate(pipeline).next()
except StopIteration:
continue
age_day = (cdate - doc['cdate']) // 86400 + 1
if sum_money:
data = {f'{cat}_all_money_{age_day}': sum_money['sum_money']}
self.local_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': id_}, {'$set': data})
update('role', role_list)
update('account', account_list)
update('device', device_list)
@HandlerWrapperInc
def handler_login_inc(self, cdate):
yesterday_ts = int(pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize().timestamp())
df = pd.DataFrame(self.local_db['summary_login'].find({'cdate': yesterday_ts}, {
'_id': False,
'_game_role_id': True,
'_channel_uid': True,
'_device_id': True,
'role_create_time': True,
}))
if df.shape == (0, 0):
return
df = df[df['role_create_time'] > 0]
df['role_cdate'] = df['role_create_time'].apply(
lambda x: int(pd.Timestamp(x, unit='s', tz=self.timezone).normalize().timestamp()))
df.rename(columns={'_game_role_id': 'role_id', '_channel_uid': 'account_id', '_device_id': 'device_id'},
inplace=True)
def update(cat):
cursor = self.local_db[f'summary_{cat}1'].find(
{'cdate': role_cdate}, {
'_id': True,
f'now_pay_{cat}_list': True,
f'new_{cat}_list': True,
})
for doc in cursor:
data = dict()
age_day = (cdate - role_cdate) // 86400 + 1
n = df[df[f'{cat}_id'].isin(doc[f'now_pay_{cat}_list'])].shape[0]
if n:
data[f'pay_{cat}_login_{age_day}'] = n
n = df[df[f'{cat}_id'].isin(doc[f'new_{cat}_list'])].shape[0]
if n:
data[f'{cat}_login_{age_day}'] = n
if data:
self.local_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data})
self.remote_db[f'summary_{cat}1'].update_one({'_id': doc['_id']}, {'$set': data})
for role_cdate in df['role_cdate'].unique():
role_cdate = int(role_cdate)
update('role')
update('account')
update('device')
def handler_summary(self, user_df, cdate):
# 日期-owner-channel-platform 相同为一个文档
#
if user_df.shape == (0, 0):
return
user_group = user_df.groupby(['_owner_name', '_channel_name', '_platform'])
for group in user_group.groups:
df = user_group.get_group(group)
role_data = role_struct(cdate, *group)
device_data = device_struct(cdate, *group)
account_data = account_struct(cdate, *group)
params = (self, role_data, device_data, account_data, df, group, cdate)
list(map(lambda handler: handler(*params), HandlerWrapper.handler_link))
@HandlerWrapper
def handler_now(self, role_data, device_data, account_data, df, group, cdate):
role_data['new_role_list'] = list(df['_game_role_id'].unique())
role_data['new_role'] = len(role_data['new_role_list'])
device_data['new_device_list'] = list(df[df['is_new_device'] == 1]['_device_id'].unique())
device_data['new_device'] = len(device_data['new_device_list'])
account_data['new_account_list'] = list(df[df['is_new_channel_uid'] == 1]['_channel_uid'].unique())
account_data['new_account'] = len(account_data['new_account_list'])
@HandlerWrapper
def handler_pay(self, role_data, device_data, account_data, df, group, cdate):
where = {
'cdate': {'$gte': cdate}
}
all_pay_device_list = self.local_db['summary_pay'].distinct('_device_id', where)
all_pay_account_list = self.local_db['summary_pay'].distinct('_channel_uid', where)
all_pay_role_list = self.local_db['summary_pay'].distinct('_game_role_id', where)
pay_device_list = set(all_pay_device_list) & set(device_data['new_device_list'])
pay_account_list = set(all_pay_account_list) & set(account_data['new_account_list'])
pay_role_list = set(all_pay_role_list) & set(role_data['new_role_list'])
role_data['now_pay_role_list'] = list(pay_role_list)
role_data['now_pay_role'] = len(role_data['now_pay_role_list'])
account_data['now_pay_account_list'] = list(pay_account_list)
account_data['now_pay_account'] = len(account_data['now_pay_account_list'])
device_data['now_pay_device_list'] = list(pay_device_list)
device_data['now_pay_device'] = len(device_data['now_pay_device_list'])
@HandlerWrapper
def handler_pay_login(self, role_data, device_data, account_data, df, group, cdate):
st = int(pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize().timestamp())
et = st + 86400
role_data['pay_role_now_login'] = self.local_db['summary_login'].count({
'cdate': {'$gte': st, '$lt': et},
'_game_role_id': {'$in': role_data['now_pay_role_list']}
})
account_data['pay_account_now_login'] = self.local_db['summary_login'].count({
'cdate': {'$gte': st, '$lt': et},
'_channel_uid': {'$in': account_data['now_pay_account_list']}
})
device_data['pay_device_now_login'] = self.local_db['summary_login'].count({
'cdate': {'$gte': st, '$lt': et},
'_first_device_id': {'$in': device_data['now_pay_device_list']}
})
@HandlerWrapper
def handler_login(self, role_data, device_data, account_data, df, group, cdate):
ts = int(pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize().timestamp())
ts -= 86400
role_data['role_now_login'] = self.local_db['summary_login'].count({
'cdate': ts,
'_game_role_id': {'$in': role_data['new_role_list']}
})
account_data['account_now_login'] = self.local_db['summary_login'].count({
'cdate': ts,
'_channel_uid': {'$in': account_data['new_account_list']}
})
device_data['device_now_login'] = self.local_db['summary_login'].count({
'cdate': ts,
'_first_device_id': {'$in': device_data['new_device_list']}
})
@HandlerWrapper
def handler_login_day(self, role_data, device_data, account_data, df, group, cdate):
c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
for day in RETAIN_DAYS:
day_n = c_day + pd.Timedelta(days=(day - 1))
if day_n >= today:
continue
ts = int(day_n.timestamp())
role_data[f'role_login_{day}'] = self.local_db['summary_login'].count({
'cdate': ts,
'_game_role_id': {'$in': role_data['new_role_list']}
})
account_data[f'account_login_{day}'] = self.local_db['summary_login'].count({
'cdate': ts,
'_channel_uid': {'$in': account_data['new_account_list']}
})
device_data[f'device_login_{day}'] = self.local_db['summary_login'].count({
'cdate': ts,
'_first_device_id': {'$in': device_data['new_device_list']}
})
@HandlerWrapper
def handler_login_day_pay(self, role_data, device_data, account_data, df, group, cdate):
c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
for day in RETAIN_DAYS:
day_n = c_day + pd.Timedelta(days=(day - 1))
if day_n >= today:
continue
ts = int(day_n.timestamp())
role_data[f'pay_role_login_{day}'] = self.local_db['summary_login'].count({
'cdate': ts,
'_game_role_id': {'$in': role_data['now_pay_role_list']}
})
account_data[f'pay_account_login_{day}'] = self.local_db['summary_login'].count({
'cdate': ts,
'_channel_uid': {'$in': account_data['now_pay_account_list']}
})
device_data[f'pay_device_login_{day}'] = self.local_db['summary_login'].count({
'cdate': ts,
'_first_device_id': {'$in': device_data['now_pay_device_list']}
})
@HandlerWrapper
def handler_money_n(self, role_data, device_data, account_data, df, group, cdate):
c_day = pd.Timestamp(cdate, unit='s', tz=self.timezone).normalize()
today = pd.Timestamp(int(time.time()), unit='s', tz=self.timezone).normalize()
for day in LTV_DAYS:
day_n = c_day + pd.Timedelta(days=(day - 1))
if day_n >= today:
continue
ts = int(day_n.timestamp())
pipeline = [
{'$match': {'cdate': {'$gte': cdate, '$lte': ts},
'_game_role_id': {'$in': role_data['now_pay_role_list']}
}
},
{'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
]
tmp = list(self.local_db['summary_pay'].aggregate(pipeline)) or [{}]
role_data[f'role_all_money_{day}'] = tmp[0].get('sum_money', 0)
pipeline = [
{'$match': {'cdate': {'$gte': cdate, '$lte': ts},
'_channel_uid': {'$in': account_data['now_pay_account_list']}
}
},
{'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
]
tmp = list(self.local_db['summary_pay'].aggregate(pipeline)) or [{}]
account_data[f'account_all_money_{day}'] = tmp[0].get('sum_money', 0)
pipeline = [
{'$match': {'cdate': {'$gte': cdate, '$lte': ts},
'_first_device_id': {'$in': device_data['now_pay_device_list']}
}
},
{'$group': {'_id': None, 'sum_money': {'$sum': '$money'}}}
]
tmp = list(self.local_db['summary_pay'].aggregate(pipeline)) or [{}]
device_data[f'device_all_money_{day}'] = tmp[0].get('sum_money', 0)
@HandlerWrapper
def save_today(self, role_data, device_data, account_data, df, group, cdate):
def update(cat, data):
if not data.get(f'new_{cat}'):
return
obj_id = self.local_db[f'summary_{cat}1'].update_one(
{'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate},
{'$set': data}, upsert=True)
if obj_id.upserted_id:
data['_id'] = obj_id.upserted_id
self.remote_db[f'summary_{cat}1'].update_one(
{'_owner_name': group[0], '_channel_name': group[1], '_platform': group[2], 'cdate': cdate},
{'$set': data}, upsert=True)
update('role', role_data)
update('device', device_data)
update('account', account_data)

View File

@ -14,7 +14,7 @@ class SummaryLogin(Task):
class Model(GBaseModel):
cdate: int = Field(..., title='当天0点')
# first_device_id: str = Field(None, title='第一次登录设备id', alias='_first_device_id')
first_device_id: str = Field(None, title='第一次登录设备id', alias='_first_device_id')
manufacturer: str = Field(None, title='设备品牌', alias='_manufacturer')
model: str = Field(None, title='型号', alias='_model')
os_version: str = Field(None, title='系统版本', alias='_os_version')
@ -29,11 +29,10 @@ class SummaryLogin(Task):
continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = {
'_event_time': {
'_ut': {
'$gte': ts['cursor_st'],
'$lt': ts['cursor_et'],
},
'_event_name': 'TimeSpending' # 在线时长打点 30s
}
}
projection = self.Model.get_fields()
@ -47,14 +46,15 @@ class SummaryLogin(Task):
# 还没有记录的
role_set = set(role_list) - set(exists_role_list)
for role_id in role_set:
item = self.local_db[source_coll].find_one(
{'_game_role_id': role_id, '_event_name': 'TimeSpending'}, projection)
for item in self.local_db[source_coll].find({'_game_role_id': {'$in': list(role_set)}}, projection):
try:
item['cdate'] = cdate
model = self.Model(**item)
data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
data.pop('_id')
bulk_data.append(
UpdateOne({'cdate': cdate, '_game_role_id': data['_game_role_id']}, {'$set': data},
upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass

View File

@ -16,13 +16,16 @@ from utils import *
class Task(metaclass=abc.ABCMeta):
def __init__(self, *args, **kwargs):
self.game_name = kwargs.get('game_name')
self.game_db = f'game_{self.game_name}'
self.source_coll = kwargs.get('source_coll')
self.dest_coll = kwargs.get('dest_coll')
self.cursor_st = kwargs.get('st')
self.cursor_et = kwargs.get('et')
self.timezone = kwargs.get('timezone')
self.task_name = kwargs.get("task_name")
self.freq = kwargs.get('freq', '30T')
for k, v in kwargs.items():
self.__setattr__(k, v)
self.game_db = f'game_{self.game_name}'
self.local_db = get_local_db(self.game_db)
self.remote_db = get_remote_db(self.game_db)
self.task_coll = self.local_db['task2']
@ -31,8 +34,6 @@ class Task(metaclass=abc.ABCMeta):
}
self.task_info = self.get_task_info()
self.freq = kwargs.get('freq', '30T')
def get_task_info(self):
task_info = self.task_coll.find_one(self.task_where) or {}
return task_info
@ -109,7 +110,8 @@ class Task(metaclass=abc.ABCMeta):
def get_event_coll(self) -> list:
"""
根据游标时间戳 返回要处理的集合
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}}, {'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
:return: [{'event_2020-12-10': {'cursor_st': 1607608848, 'cursor_et': 1607610648}},
{'event_2020-12-10': {'cursor_st': 1607610648, 'cursor_et': 1607610791}}]
"""
df = self.generate_cursor_time()
df['event_coll_s'] = df['st'].apply(lambda x: f'event_{x.date().strftime("%Y-%m-%d")}')