This commit is contained in:
kf_wuhao 2021-01-11 19:43:57 +08:00
parent 8c7ee6c944
commit 4a1e6fa18c
3 changed files with 133 additions and 0 deletions

View File

@ -71,5 +71,10 @@
"source_coll": "event", "source_coll": "event",
"dest_coll": "summary_open_hd", "dest_coll": "summary_open_hd",
"task_name": "summary_open_hd" "task_name": "summary_open_hd"
},
"summary_assets": {
"source_coll": "event",
"dest_coll": "summary_assets",
"task_name": "summary_assets"
} }
} }

View File

@ -0,0 +1,47 @@
import json
from db import get_local_db
s = """item 899 通用合同
item 945 总监礼盒
item 983 生产许可证
item 1000 扩建卡
item 1001 公司扩建卡
item 2009 高级星探卡
item 2012 明星星探卡
item 2026 1小时加速卡
item 3000 铝合金
item 3001 齿轮
item 6022 宠物激素
item 6030 涂装喷漆
item 6031 涂装零件
item 11006 红色歌手合同
item 43005 歌王自选礼盒
attr aixin 爱心
attr fenhong 分红
attr jinbi 金币
attr jipiao 机票
attr license 许可证
attr reputation 名望
attr rmbmoney 钻石"""
data = [
]
for i in s.split('\n'):
j = i.split('\t')
tmp = {
'a': j[0],
't': j[1],
'name': j[2]
}
data.append(tmp)
update_data = {
'pname': 'assets_filter',
'data': data
}
db = get_local_db('game_geshouccs')
db['attr'].update_one({'pname': 'assets_filter'}, {'$set': update_data}, upsert=True)

81
task/summary_assets.py Normal file
View File

@ -0,0 +1,81 @@
import traceback
from typing import List, Dict
from pymongo import UpdateOne
from pydantic import Field
import pandas as pd
from .task import Task
from utils import *
from model import BaseModel, MdbObjectId, IntStr
class SummaryAssets(Task):
"""
资源产出和消耗
"""
class Model(BaseModel):
id: MdbObjectId = Field(..., title="id", alias='_id')
channel_name: str = Field(None, title="channel", alias='_channel_name')
owner_name: str = Field(None, title="owner", alias='_owner_name')
channel_uid: str = Field(None, title="channel_uid", alias='_channel_uid')
device_id: str = Field(None, title='device_id', alias='_device_id')
district_server_id: int = Field(None, title="区服id", alias='_district_server_id')
game_role_id: str = Field(None, title="角色id", alias='_game_role_id')
event_time: int = Field(..., title="事件时间", alias='_event_time')
role_create_time: int = Field(None, title="角色创建时间")
role_level: int = Field(None, title="角色等级")
role_vip: int = Field(None, title="角色vip等级")
role_stage: IntStr = Field(None, title="关卡")
prize: List[Dict] = Field(None, title='奖励')
need: List[Dict] = Field(None, title='消耗')
cdate: int = Field(..., title='当天0点')
def cleaning(self, cursor_list):
# 查询要清洗的资源
assets_filter = (self.local_db['attr'].find_one({'pname': 'assets_filter'}) or dict()).get('data')
if not assets_filter:
msg = f'{self.game_name} 请先设置要分析的资源'
ddsend_msg(msg)
logger.warning(msg)
return
a = set()
t = set()
for item in assets_filter:
a.add(item['a'])
t.add(item['t'])
for cursor in cursor_list: # type:dict
for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']:
continue
logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = {
'$or': [{'prize.a': {'$in': list(a)}, 'prize.t': {'$in': list(t)}},
{'need.a': {'$in': list(a)}, 'need.t': {'$in': list(t)}}],
'_event_name': 'res',
'_event_time': {
'$gte': ts['cursor_st'],
'$lt': ts['cursor_et'],
}
}
projection = self.Model.get_fields()
bulk_data = []
cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp())
for item in self.local_db[source_coll].find(where, projection):
try:
item['cdate'] = cdate
model = self.Model(**item)
data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True))
except Exception as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}')
logger.error(repr(e))
if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])