From 4a1e6fa18c5224129d7e4b5b328585b84c1cd0e7 Mon Sep 17 00:00:00 2001 From: kf_wuhao <15392746632@qq.com> Date: Mon, 11 Jan 2021 19:43:57 +0800 Subject: [PATCH] update --- config.json | 5 +++ script/歌手道具监控.py | 47 ++++++++++++++++++++++++ task/summary_assets.py | 81 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+) create mode 100644 script/歌手道具监控.py create mode 100644 task/summary_assets.py diff --git a/config.json b/config.json index e832c75..2ff19bd 100644 --- a/config.json +++ b/config.json @@ -71,5 +71,10 @@ "source_coll": "event", "dest_coll": "summary_open_hd", "task_name": "summary_open_hd" + }, + "summary_assets": { + "source_coll": "event", + "dest_coll": "summary_assets", + "task_name": "summary_assets" } } \ No newline at end of file diff --git a/script/歌手道具监控.py b/script/歌手道具监控.py new file mode 100644 index 0000000..8509910 --- /dev/null +++ b/script/歌手道具监控.py @@ -0,0 +1,47 @@ +import json + +from db import get_local_db + +s = """item 899 通用合同 +item 945 总监礼盒 +item 983 生产许可证 +item 1000 扩建卡 +item 1001 公司扩建卡 +item 2009 高级星探卡 +item 2012 明星星探卡 +item 2026 1小时加速卡 +item 3000 铝合金 +item 3001 齿轮 +item 6022 宠物激素 +item 6030 涂装喷漆 +item 6031 涂装零件 +item 11006 红色歌手合同 +item 43005 歌王自选礼盒 +attr aixin 爱心 +attr fenhong 分红 +attr jinbi 金币 +attr jipiao 机票 +attr license 许可证 +attr reputation 名望 +attr rmbmoney 钻石""" + +data = [ + +] + +for i in s.split('\n'): + j = i.split('\t') + tmp = { + 'a': j[0], + 't': j[1], + 'name': j[2] + } + data.append(tmp) + +update_data = { + 'pname': 'assets_filter', + 'data': data +} + +db = get_local_db('game_geshouccs') +db['attr'].update_one({'pname': 'assets_filter'}, {'$set': update_data}, upsert=True) diff --git a/task/summary_assets.py b/task/summary_assets.py new file mode 100644 index 0000000..c3e6473 --- /dev/null +++ b/task/summary_assets.py @@ -0,0 +1,81 @@ +import traceback +from typing import List, Dict + +from pymongo import UpdateOne +from pydantic import Field +import pandas as pd + +from .task import Task +from utils import * +from model import BaseModel, MdbObjectId, IntStr + + +class SummaryAssets(Task): + """ + 资源产出和消耗 + """ + + class Model(BaseModel): + id: MdbObjectId = Field(..., title="id", alias='_id') + channel_name: str = Field(None, title="channel", alias='_channel_name') + owner_name: str = Field(None, title="owner", alias='_owner_name') + channel_uid: str = Field(None, title="channel_uid", alias='_channel_uid') + device_id: str = Field(None, title='device_id', alias='_device_id') + district_server_id: int = Field(None, title="区服id", alias='_district_server_id') + game_role_id: str = Field(None, title="角色id", alias='_game_role_id') + event_time: int = Field(..., title="事件时间", alias='_event_time') + role_create_time: int = Field(None, title="角色创建时间") + role_level: int = Field(None, title="角色等级") + role_vip: int = Field(None, title="角色vip等级") + role_stage: IntStr = Field(None, title="关卡") + prize: List[Dict] = Field(None, title='奖励') + need: List[Dict] = Field(None, title='消耗') + cdate: int = Field(..., title='当天0点') + + def cleaning(self, cursor_list): + # 查询要清洗的资源 + assets_filter = (self.local_db['attr'].find_one({'pname': 'assets_filter'}) or dict()).get('data') + if not assets_filter: + msg = f'{self.game_name} 请先设置要分析的资源' + ddsend_msg(msg) + logger.warning(msg) + return + a = set() + t = set() + for item in assets_filter: + a.add(item['a']) + t.add(item['t']) + + for cursor in cursor_list: # type:dict + for source_coll, ts in cursor.items(): # type:str,dict + if ts['cursor_st'] == ts['cursor_et']: + continue + logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}') + where = { + '$or': [{'prize.a': {'$in': list(a)}, 'prize.t': {'$in': list(t)}}, + {'need.a': {'$in': list(a)}, 'need.t': {'$in': list(t)}}], + '_event_name': 'res', + '_event_time': { + '$gte': ts['cursor_st'], + '$lt': ts['cursor_et'], + } + } + + projection = self.Model.get_fields() + bulk_data = [] + cdate = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone).normalize().timestamp()) + + for item in self.local_db[source_coll].find(where, projection): + try: + item['cdate'] = cdate + model = self.Model(**item) + data = model.dict(by_alias=True) + bulk_data.append(UpdateOne({'_id': data['_id']}, {'$set': data}, upsert=True)) + except Exception as e: + msg = traceback.format_exc() + ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}') + logger.error(repr(e)) + if bulk_data: + self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False) + self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])