This commit is contained in:
kf_wuhao 2021-01-11 09:58:33 +08:00
parent 2be81a0511
commit a035dff338
17 changed files with 235 additions and 44 deletions

View File

@ -50,5 +50,16 @@
"source_coll": "user",
"dest_coll": "user",
"task_name": "sync_user"
},
"summary_funnel": {
"source_coll": "event",
"dest_coll": "summary_funnel",
"task_name": "summary_funnel"
},
"summary_online_time": {
"source_coll": "event",
"dest_coll": "summary_online_time",
"task_name": "summary_online_time",
"freq": "D"
}
}

View File

@ -1,2 +1,2 @@
from .field_type import (IntStr, IntFloat, MdbObjectId)
from .model import (GBaseModel, )
from .model import (GBaseModel, BaseModel)

View File

@ -1,10 +1,10 @@
from pydantic import BaseModel, Field
from model.field_type import MdbObjectId, IntStr
from model.field_type import IntStr
class GBaseModel(BaseModel):
id: MdbObjectId = Field(..., title="平台", alias='_id')
platform: str = Field(None, title="平台", alias='_platform')
channel_name: str = Field(None, title="channel", alias='_channel_name')
owner_name: str = Field(None, title="owner", alias='_owner_name')
@ -23,6 +23,11 @@ class GBaseModel(BaseModel):
return [v.alias for v in cls.__fields__.values()]
class BaseModel(BaseModel):
@classmethod
def get_fields(cls):
return [v.alias for v in cls.__fields__.values()]
if __name__ == '__main__':
obj = GBaseModel(_id="5fd0f4812de17aeba6c1a373", role_level='2', aaa=123, _platform=13566, _event_time=123456789)

View File

@ -1,4 +1,5 @@
import os
import sys
class Config:
@ -12,10 +13,14 @@ class Config:
class Production(Config):
DB_PREFIX = 'game'
run_model = 'production'
class Debug(Config):
DB_PREFIX = 'debug'
run_model = 'debug'
settings = Production
if sys.platform.startswith('win'):
settings = Debug

View File

@ -1,7 +1,10 @@
import traceback
import pymongo
from pymongo import UpdateOne
from pydantic import BaseModel, Field
from pydantic import Field
from model import BaseModel
from .task import Task
from utils import *
@ -17,10 +20,6 @@ class AddUserFlag(Task):
channel_uid: str = Field(..., title="channel_uid", alias='_channel_uid')
role_create_time: int = Field(..., title="注册时间戳")
@classmethod
def get_fields(cls):
return [v.alias for v in cls.__fields__.values()]
def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
@ -94,8 +93,9 @@ class AddUserFlag(Task):
bulk_data.append(
UpdateOne({'_game_role_id': model.game_role_id}, {'$set': {'_first_device_id': model.device_id}}))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -1,10 +1,12 @@
import traceback
from pymongo import UpdateOne
from pydantic import BaseModel, Field
from pydantic import Field
import pandas as pd
from .task import Task
from utils import *
from model import IntStr, IntFloat
from model import IntStr, IntFloat, BaseModel
class FirstRecharge(Task):
@ -52,8 +54,9 @@ class FirstRecharge(Task):
UpdateOne({'_game_role_id': _game_role_id, 'is_recharge': {'$exists': False}},
{'$set': {'is_recharge': data}}))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -1,7 +1,8 @@
import requests
from pydantic import BaseModel, Field
from pydantic import Field
from db import get_local_db
from model import BaseModel
from .task import Task
from utils import *

View File

@ -1,7 +1,10 @@
import traceback
import pymongo
from pymongo import UpdateOne
from pydantic import BaseModel, Field
from pydantic import Field
from model import BaseModel
from .task import Task
from utils import *
@ -15,10 +18,6 @@ class RepairGunfu(Task):
game_role_id: str = Field(None, title="角色id", alias='_game_role_id')
device_id: str = Field(..., title='设备id', alias='_device_id')
@classmethod
def get_fields(cls):
return [v.alias for v in cls.__fields__.values()]
def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
@ -49,8 +48,9 @@ class RepairGunfu(Task):
UpdateOne({'_game_role_id': model.game_role_id},
{'$set': {'gunfu_num': role_cnt + 1}}, upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -1,8 +1,9 @@
import time
from pydantic import Field, BaseModel
from pydantic import Field
import pandas as pd
from model import BaseModel
from .task import Task
from utils import *
import numpy as np
@ -163,10 +164,6 @@ class Summary3(Task):
is_new_device: int = Field(None, title="新设备")
is_new_channel_uid: int = Field(None, title="新账号")
@classmethod
def get_fields(cls):
return [v.alias for v in cls.__fields__.values()]
def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict

View File

@ -1,10 +1,12 @@
import traceback
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
from .task import Task
from utils import *
from model import GBaseModel
from model import GBaseModel, MdbObjectId
class SummaryFunc(Task):
@ -13,6 +15,7 @@ class SummaryFunc(Task):
"""
class Model(GBaseModel):
id: MdbObjectId = Field(..., title="id", alias='_id')
prize: list = Field(None, title='奖励')
need: list = Field(None, title='消耗')
ftype: str = Field(..., title='功能')
@ -43,8 +46,9 @@ class SummaryFunc(Task):
data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
except Exception as e:
logger.error(f'ftype {item["ftype"]} msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'ftype {item["ftype"]} 字段异常 {msg}')
logger.error(f'ftype {item["ftype"]} 字段异常 {msg}')
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

58
task/summary_funnel.py Normal file
View File

@ -0,0 +1,58 @@
import traceback
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
from .task import Task
from utils import *
from model import GBaseModel, IntStr, MdbObjectId
class SummaryFunnel(Task):
"""
功能分析
"""
class Model(GBaseModel):
id: MdbObjectId = Field(..., title="id", alias='_id')
cdate: int = Field(..., title='当天0点')
def cleaning(self, cursor_list):
funnel_conf = self.local_db['attr'].find_one({'pname': 'funnel'}, {'data': True})
if not funnel_conf:
ddsend_msg(f'{self.game_name} 请设置漏斗配置')
return
step_list = list({step['cond']['step'] for step in funnel_conf['data']['steps'] if step['cond']['step']})
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']:
continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = {
'_event_name': 'Guide',
'step': {'$in': step_list},
'_event_time': {
'$gte': ts['cursor_st'],
'$lt': ts['cursor_et'],
}
}
bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where): # 所有字段
try:
item['cdate'] = cdate
model = self.Model(**item)
data = model.dict(by_alias=True)
data.update(item)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
except Exception as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])

View File

@ -1,3 +1,5 @@
import traceback
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
@ -51,13 +53,13 @@ class SummaryLogin(Task):
item['cdate'] = cdate
model = self.Model(**item)
data = model.dict(by_alias=True)
data.pop('_id')
bulk_data.append(
UpdateOne({'cdate': cdate, '_game_role_id': data['_game_role_id']}, {'$set': data},
upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -0,0 +1,96 @@
import time
import traceback
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
import numpy as np
from model import BaseModel
from .task import Task
from utils import *
class SummaryOnlineTime(Task):
"""
在线时长
"""
class Model(BaseModel):
cdate: int = Field(..., title='当天0点')
platform: str = Field(..., min_length=1, title="平台", alias='_platform')
channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')
device_id: str = Field(..., min_length=1, title='device_id', alias='_device_id')
district_server_id: int = Field(..., title="区服id", alias='_district_server_id')
game_role_id: str = Field(..., min_length=1, title="角色id", alias='_game_role_id')
# 处理天游标
def get_cursor(self):
if not self.cursor_st:
self.cursor_st = self.task_info.get('cursor_et')
if self.cursor_st and self.cursor_st >= int(
pd.Timestamp(time.time(), unit='s', tz=self.timezone).normalize().timestamp()):
self.cursor_st -= 86400
if not self.cursor_st:
self.cursor_st = int(time.time()) - 86400
super().get_cursor()
def generate_cursor_time(self):
date_index = pd.date_range(pd.Timestamp(self.cursor_st, unit='s', tz=self.timezone).normalize(),
pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize(), freq=self.freq)
df = pd.DataFrame(index=date_index[:-1])
df['st'] = df.index
df['et'] = np.append(df.index[1:], [pd.Timestamp(self.cursor_et, unit='s', tz=self.timezone).normalize()])
return df
def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']:
continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
# 查出该游标活跃用户
pipeline = [
{
'$match': {
'_event_name': 'TimeSpending'
}
}, {
'$group': {
'_id': '$_game_role_id',
'ts': {
'$sum': '$ts'
}
}
}
]
group_ts = self.local_db[source_coll].aggregate(pipeline=pipeline)
role_ts = {role['_id']: role['ts'] for role in group_ts}
projection = self.Model.get_fields()
where = {
'_game_role_id': {'$in': list(role_ts)}
}
bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db['user'].find(where, projection):
try:
item['cdate'] = cdate
model = self.Model(**item)
data = model.dict(by_alias=True)
data['ts'] = role_ts[data['_game_role_id']]
bulk_data.append(
UpdateOne({'_game_role_id': data['_game_role_id'], 'cdate': data['cdate']}, {'$set': data},
upsert=True))
logger.debug(f'处理 {data["_game_role_id"]}')
except Exception as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
logger.debug(f'准备写入{len(bulk_data)}')
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
logger.debug('写入完成')
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])

View File

@ -1,5 +1,7 @@
import traceback
from pymongo import UpdateOne
from pydantic import BaseModel, Field
from pydantic import Field
import pandas as pd
from .task import Task
@ -58,11 +60,11 @@ class SummaryPay(Task):
item[k] = item.get(k) or user_info[k]
model = self.Model(**item)
data = model.dict(by_alias=True)
data.pop('_id')
bulk_data.append(UpdateOne({'orderid': model.orderid}, {'$set': data}, upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -1,3 +1,4 @@
import traceback
from typing import List
from pymongo import UpdateOne
@ -6,7 +7,7 @@ import pandas as pd
from .task import Task
from utils import *
from model import GBaseModel, IntStr
from model import GBaseModel, IntStr, MdbObjectId
class SummaryShopbuy(Task):
@ -15,6 +16,7 @@ class SummaryShopbuy(Task):
"""
class Model(GBaseModel):
id: MdbObjectId = Field(..., title="id", alias='_id')
cdate: int = Field(..., title='当天0点')
prize: List[dict] = Field(None, title='奖励')
need: List[dict] = Field(None, title='消耗')
@ -58,8 +60,9 @@ class SummaryShopbuy(Task):
data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -1,3 +1,5 @@
import traceback
from pymongo import UpdateOne
from .task import Task
@ -27,8 +29,9 @@ class SyncUser(Task):
UpdateOne({'_game_role_id': item['_game_role_id']},
{'$set': item}, upsert=True))
except Exception as e:
logger.error(f'msg:{e}')
# pass
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])

View File

@ -33,6 +33,7 @@ class Task(metaclass=abc.ABCMeta):
'name': self.task_name
}
self.task_info = self.get_task_info()
logger.debug(f'初始化完成 当前{settings.run_model}模式')
def get_task_info(self):
task_info = self.task_coll.find_one(self.task_where) or {}
@ -157,7 +158,7 @@ class Task(metaclass=abc.ABCMeta):
pass
def run(self):
if not self.check_run():
if settings.run_model == 'production' and not self.check_run():
return '运行中...'
self.set_run_ts()
self.set_run_status(True)