This commit is contained in:
kf_wuhao 2021-01-18 19:54:41 +08:00
parent e6f210a11b
commit 4877426a45
6 changed files with 57 additions and 23 deletions

View File

@ -1,2 +1,2 @@
from .field_type import (IntStr, IntFloat, MdbObjectId) from .field_type import (IntStr, IntFloat, MdbObjectId)
from .model import (GBaseModel, BaseModel) from .model import (GBaseModel, BaseModel, ValidationError)

View File

@ -2,7 +2,7 @@ from pydantic import BaseModel as BModel
from pydantic import Field from pydantic import Field
from model.field_type import IntStr from model.field_type import IntStr
from pydantic.error_wrappers import ValidationError
class BaseModel(BModel): class BaseModel(BModel):
@classmethod @classmethod
@ -11,7 +11,7 @@ class BaseModel(BModel):
class GBaseModel(BaseModel): class GBaseModel(BaseModel):
platform: str = Field(None, title="平台", alias='_platform') platform: str = Field(..., min_length=1, title="平台", alias='_platform')
channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name') channel_name: str = Field(..., min_length=1, title="channel", alias='_channel_name')
owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name') owner_name: str = Field(..., min_length=1, title="owner", alias='_owner_name')
channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid') channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid')

View File

@ -4,7 +4,7 @@ import pymongo
from pymongo import UpdateOne from pymongo import UpdateOne
from pydantic import Field from pydantic import Field
from model import BaseModel from model import BaseModel, ValidationError
from .task import Task from .task import Task
from utils import * from utils import *
@ -15,9 +15,9 @@ class AddUserFlag(Task):
""" """
class Model(BaseModel): class Model(BaseModel):
game_role_id: str = Field(..., min_length=1, title="角色id", alias='_game_role_id') game_role_id: str = Field(..., min_length=5, title="角色id", alias='_game_role_id')
device_id: str = Field(..., min_length=1, title='设备id', alias='_device_id') device_id: str = Field(..., min_length=5, title='设备id', alias='_device_id')
channel_uid: str = Field(..., min_length=1, title="channel_uid", alias='_channel_uid') channel_uid: str = Field(..., min_length=5, title="channel_uid", alias='_channel_uid')
role_create_time: int = Field(..., title="注册时间戳") role_create_time: int = Field(..., title="注册时间戳")
def cleaning(self, cursor_list): def cleaning(self, cursor_list):
@ -49,9 +49,15 @@ class AddUserFlag(Task):
bulk_data.append( bulk_data.append(
UpdateOne({'_game_role_id': model.game_role_id}, UpdateOne({'_game_role_id': model.game_role_id},
{'$set': {'is_new_channel_uid': 1}})) {'$set': {'is_new_channel_uid': 1}}))
except ValidationError as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段验证异常 {msg}\n{e.json()}')
logger.error(repr(e))
except Exception as e: except Exception as e:
logger.error(f'msg:{e}') msg = traceback.format_exc()
# pass ddsend_msg(f'未知异常 {msg}')
logger.error(repr(e))
# 处理设备 # 处理设备
@ -74,9 +80,15 @@ class AddUserFlag(Task):
bulk_data.append( bulk_data.append(
UpdateOne({'_game_role_id': model.game_role_id}, UpdateOne({'_game_role_id': model.game_role_id},
{'$set': {'is_new_device': 1}})) {'$set': {'is_new_device': 1}}))
except ValidationError as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段验证异常 {msg}\n{e.json()}')
logger.error(repr(e))
except Exception as e: except Exception as e:
logger.error(f'msg:{e}') msg = traceback.format_exc()
pass ddsend_msg(f'未知异常 {msg}')
logger.error(repr(e))
# 记录第一次登录设备id # 记录第一次登录设备id
where = { where = {
@ -92,9 +104,15 @@ class AddUserFlag(Task):
model = self.Model(**item) model = self.Model(**item)
bulk_data.append( bulk_data.append(
UpdateOne({'_game_role_id': model.game_role_id}, {'$set': {'_first_device_id': model.device_id}})) UpdateOne({'_game_role_id': model.game_role_id}, {'$set': {'_first_device_id': model.device_id}}))
except ValidationError as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段验证异常 {msg}\n{e.json()}')
logger.error(repr(e))
except Exception as e: except Exception as e:
msg = traceback.format_exc() msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}') ddsend_msg(f'未知异常 {msg}')
logger.error(repr(e)) logger.error(repr(e))
if bulk_data: if bulk_data:

View File

@ -6,7 +6,7 @@ import pandas as pd
from .task import Task from .task import Task
from utils import * from utils import *
from model import IntStr, IntFloat, BaseModel from model import IntStr, IntFloat, BaseModel, ValidationError
class FirstRecharge(Task): class FirstRecharge(Task):
@ -25,9 +25,6 @@ class FirstRecharge(Task):
proid: str = Field(..., min_length=1, title='计费点') proid: str = Field(..., min_length=1, title='计费点')
cdate: int = Field(..., title='当天0点') cdate: int = Field(..., title='当天0点')
@classmethod
def get_fields(cls):
return [v.alias for v in cls.__fields__.values()]
def cleaning(self, cursor_list): def cleaning(self, cursor_list):
for cursor in cursor_list: # type:dict for cursor in cursor_list: # type:dict
@ -54,9 +51,14 @@ class FirstRecharge(Task):
bulk_data.append( bulk_data.append(
UpdateOne({'_game_role_id': _game_role_id, 'is_recharge': {'$exists': False}}, UpdateOne({'_game_role_id': _game_role_id, 'is_recharge': {'$exists': False}},
{'$set': {'is_recharge': data}})) {'$set': {'is_recharge': data}}))
except ValidationError as e:
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段验证异常 {msg}\n{e.json()}')
logger.error(repr(e))
except Exception as e: except Exception as e:
msg = traceback.format_exc() msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}') ddsend_msg(f'未知异常 {msg}')
logger.error(repr(e)) logger.error(repr(e))
if bulk_data: if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -6,7 +6,7 @@ import pandas as pd
from .task import Task from .task import Task
from utils import * from utils import *
from model import GBaseModel from model import GBaseModel, ValidationError
class SummaryLogin(Task): class SummaryLogin(Task):
@ -56,9 +56,17 @@ class SummaryLogin(Task):
bulk_data.append( bulk_data.append(
UpdateOne({'cdate': cdate, '_game_role_id': data['_game_role_id']}, {'$set': data}, UpdateOne({'cdate': cdate, '_game_role_id': data['_game_role_id']}, {'$set': data},
upsert=True)) upsert=True))
except ValidationError as e:
bulk_data.append(
UpdateOne({'cdate': cdate, '_game_role_id': item['_game_role_id']}, {'$set': item},
upsert=True))
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段验证异常 {msg}\n{e.json()}')
logger.error(repr(e))
except Exception as e: except Exception as e:
msg = traceback.format_exc() msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段异常 {msg}') ddsend_msg(f'未知异常 {msg}')
logger.error(repr(e)) logger.error(repr(e))
if bulk_data: if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)

View File

@ -6,7 +6,7 @@ import pandas as pd
from .task import Task from .task import Task
from utils import * from utils import *
from model import IntFloat, GBaseModel from model import IntFloat, GBaseModel, ValidationError
class SummaryPay(Task): class SummaryPay(Task):
@ -61,11 +61,17 @@ class SummaryPay(Task):
model = self.Model(**item) model = self.Model(**item)
data = model.dict(by_alias=True) data = model.dict(by_alias=True)
bulk_data.append(UpdateOne({'orderid': model.orderid}, {'$set': data}, upsert=True)) bulk_data.append(UpdateOne({'orderid': model.orderid}, {'$set': data}, upsert=True))
except ValidationError as e:
bulk_data.append(UpdateOne({'orderid': item['orderid']}, {'$set': item}, upsert=True))
msg = traceback.format_exc()
ddsend_msg(f'{self.game_name}.{source_coll}字段验证异常 {msg}\n{e.json()}')
logger.error(repr(e))
except Exception as e: except Exception as e:
msg = traceback.format_exc() msg = traceback.format_exc()
ddsend_msg(f'严重警告!!!{self.game_name}.{source_coll}字段异常 {msg}') ddsend_msg(f'未知异常 {msg}')
bulk_data.append(UpdateOne({'orderid': item['orderid']}, {'$set': item}, upsert=True))
logger.error(repr(e)) logger.error(repr(e))
if bulk_data: if bulk_data:
self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False) self.local_db[self.dest_coll].bulk_write(bulk_data, ordered=False)
self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False) self.remote_db[self.dest_coll].bulk_write(bulk_data, ordered=False)