修改目录结构

This commit is contained in:
kf_wuhao 2020-12-23 15:06:44 +08:00
parent 0db503885b
commit 701aa6f545
11 changed files with 98 additions and 30 deletions

View File

@ -8,5 +8,10 @@
"source_coll": "paylist", "source_coll": "paylist",
"dest_coll": "user", "dest_coll": "user",
"task_name": "first_recharge" "task_name": "first_recharge"
},
"repair_gunfu": {
"source_coll": "user",
"dest_coll": "user",
"task_name": "repair_gunfu"
} }
} }

View File

@ -1,6 +1,5 @@
import pymongo import pymongo
from .model import GBaseModel
from settings import settings from settings import settings

View File

@ -1,4 +1,5 @@
import json import json
import os
import sys import sys
from multiprocessing import Pool from multiprocessing import Pool
@ -25,10 +26,11 @@ def run_task(kwargs):
if __name__ == '__main__': if __name__ == '__main__':
# eg: summary_func 0 0 # eg: summary_func 0 0
# eg: first_recharge 0 0 # eg: first_recharge 0 0
# eg: repair_gunfu 0 0
task_name, st, et = sys.argv[1:] task_name, st, et = sys.argv[1:]
st, et = int(st), int(et) st, et = int(st), int(et)
game_list = get_game() game_list = get_game()
with open('config.json', 'r', encoding='utf8') as f: with open(os.path.join(settings.ROOT_DIR, 'config.json'), 'r', encoding='utf8') as f:
task_conf = json.load(f) task_conf = json.load(f)
params = [] params = []
for item in game_list: for item in game_list:

2
model/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from .field_type import (IntStr, IntFloat, MdbObjectId)
from .model import (GBaseModel, )

20
model/field_type.py Normal file
View File

@ -0,0 +1,20 @@
from typing import TypeVar
from bson.objectid import ObjectId
# Constrained type variable: may be bound to int or to str.
IntStr = TypeVar('IntStr', int, str)
# Constrained type variable: may be bound to int or to float.
IntFloat = TypeVar('IntFloat', int, float)
class MdbObjectId(ObjectId):
    """ObjectId subclass that can be used as a validated model field type.

    It exposes the ``__get_validators__`` hook so a model field annotated
    with this type has its value passed through :meth:`validate`.
    """

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables for this type."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Convert ``v`` to an ``ObjectId``.

        :param v: any value accepted by the ``ObjectId`` constructor.
        :return: the resulting ``ObjectId`` instance.
        :raises TypeError: if ``v`` cannot be converted.
        """
        try:
            return ObjectId(v)
        except Exception as exc:
            # Catch Exception rather than everything, so KeyboardInterrupt and
            # SystemExit still propagate; chain the cause to keep the reason.
            raise TypeError('不能装换为 ObjectId') from exc

View File

@ -1,20 +1,6 @@
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from bson.objectid import ObjectId
from model.field_type import MdbObjectId
class MdbObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
try:
res = ObjectId(v)
except:
raise TypeError('不能装换为 ObjectId')
else:
return res
class GBaseModel(BaseModel): class GBaseModel(BaseModel):

View File

@ -1,9 +1,10 @@
from pymongo import UpdateOne from pymongo import UpdateOne
from pydantic import BaseModel, Field, validator from pydantic import BaseModel, Field
import pandas as pd import pandas as pd
from .task import Task from .task import Task
from utils import * from utils import *
from model import IntStr, IntFloat
class FirstRecharge(Task): class FirstRecharge(Task):
@ -27,10 +28,10 @@ class FirstRecharge(Task):
def cleaning(self, cursor_list): def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict for cursor in cursor_list: # type:dict
for event_coll, ts in cursor.items(): # type:str,dict for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']: if ts['cursor_st'] == ts['cursor_et']:
continue continue
logger.info(f'开始处理{self.game_name} 处理 {event_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_time': { '_event_time': {
'$gte': ts['cursor_st'], '$gte': ts['cursor_st'],
@ -40,7 +41,7 @@ class FirstRecharge(Task):
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
for item in self.local_db[event_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \ item['cdate'] = int(pd.Timestamp(ts['cursor_st'], unit='s', tz=self.timezone) \
.normalize().timestamp()) .normalize().timestamp())

58
task/repair_gunfu.py Normal file
View File

@ -0,0 +1,58 @@
import pymongo
from pymongo import UpdateOne
from pydantic import BaseModel, Field
from .task import Task
from utils import *
from model import MdbObjectId
class RepairGunfu(Task):
    """
    Backfill the ``gunfu_num`` field on roles that do not have one yet.

    Each role created inside a cursor window and lacking ``gunfu_num`` is
    given the number of roles on the same device that already have a
    ``gunfu_num``, plus one.
    """

    class Model(BaseModel):
        """Fields read from each source document."""
        id: MdbObjectId = Field(None, title='_id', alias='_id')
        device_id: str = Field(..., title='设备id', alias='_device_id')

        @classmethod
        def get_fields(cls):
            """Return the field aliases; used as the query projection."""
            return [v.alias for v in cls.__fields__.values()]

    def cleaning(self, cursor_list):
        """Assign ``gunfu_num`` to new roles for every cursor window.

        :param cursor_list: iterable of dicts mapping a source collection
            name to a dict holding ``cursor_st`` and ``cursor_et``.

        The queued updates are written to ``dest_coll`` in both the remote
        and the local database, then the cursor is advanced.
        """
        for cursor in cursor_list:  # type:dict
            for source_coll, ts in cursor.items():  # type:str,dict
                if ts['cursor_st'] == ts['cursor_et']:
                    continue

                logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
                where = {
                    'role_create_time': {
                        '$gte': ts['cursor_st'],
                        '$lt': ts['cursor_et'],
                    },
                    'gunfu_num': {'$exists': False}
                }
                projection = self.Model.get_fields()
                bulk_data = []
                # Updates are only written after the loop, so the database
                # count below cannot see roles numbered earlier in this same
                # window. Track them here per device to keep numbers unique.
                queued_per_device = {}

                new_roles = self.local_db[source_coll].find(where, projection).sort(
                    'role_create_time', pymongo.ASCENDING)
                for item in new_roles:
                    try:
                        # New role without a gunfu number.
                        model = self.Model(**item)
                        device_id = model.device_id
                        # Count this device's roles that already have a number.
                        role_cnt = self.local_db[source_coll].count_documents(
                            {'_device_id': device_id, 'gunfu_num': {'$exists': True}})
                        queued = queued_per_device.get(device_id, 0)
                        bulk_data.append(
                            UpdateOne({'_id': model.id},
                                      {'$set': {'gunfu_num': role_cnt + queued + 1}}, upsert=True))
                        queued_per_device[device_id] = queued + 1
                    except Exception as e:
                        # Best effort: log the bad record and keep going.
                        logger.error(f'msg:{e}')

                if bulk_data:
                    self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
                    self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

                # NOTE(review): the cursor is assumed to advance even when the
                # window produced no updates - confirm against the other tasks.
                self.set_cursor(cursor_st=ts['cursor_st'], cursor_et=ts['cursor_et'])

View File

@ -4,7 +4,7 @@ import pandas as pd
from .task import Task from .task import Task
from utils import * from utils import *
from db import GBaseModel from model import GBaseModel
class SummaryFunc(Task): class SummaryFunc(Task):
@ -21,10 +21,10 @@ class SummaryFunc(Task):
def cleaning(self, cursor_list): def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict for cursor in cursor_list: # type:dict
for event_coll, ts in cursor.items(): # type:str,dict for source_coll, ts in cursor.items(): # type:str,dict
if ts['cursor_st'] == ts['cursor_et']: if ts['cursor_st'] == ts['cursor_et']:
continue continue
logger.info(f'开始处理{self.game_name} 处理 {event_coll} 游标 {ts}') logger.info(f'开始处理{self.game_name} 处理 {source_coll} 游标 {ts}')
where = { where = {
'_event_name': 'Func', '_event_name': 'Func',
'_event_time': { '_event_time': {
@ -35,7 +35,7 @@ class SummaryFunc(Task):
projection = self.Model.get_fields() projection = self.Model.get_fields()
bulk_data = [] bulk_data = []
for item in self.local_db[event_coll].find(where, projection): for item in self.local_db[source_coll].find(where, projection):
try: try:
item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \ item['cdate'] = int(pd.Timestamp(item['_event_time'], unit='s', tz=self.timezone) \
.normalize().timestamp()) .normalize().timestamp())

View File

@ -1,5 +1,4 @@
from loguru import logger from loguru import logger
from .field_type import *
logger.add('/data/log/data_cleaning/log.log', format="{time} {level} {name}:{line} {message}", level="INFO", logger.add('/data/log/data_cleaning/log.log', format="{time} {level} {name}:{line} {message}", level="INFO",
rotation="100 MB", retention='7 days', rotation="100 MB", retention='7 days',

View File

@ -1,4 +0,0 @@
from typing import TypeVar
IntStr = TypeVar('IntStr', int, str)
IntFloat = TypeVar('IntFloat', int, float)