1.新增保存发送邮件的记录

2.新增获取发送邮件的记录
This commit is contained in:
李伟 2022-07-18 16:44:56 +08:00
parent dd26a12fad
commit f55d7609b4
9 changed files with 131 additions and 68 deletions

View File

@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, Request, File, UploadFile
from motor.motor_asyncio import AsyncIOMotorDatabase
from utils.jianli import get_resume
import crud, schemas
from datetime import datetime
from core.configuration import *
from db import get_database
from db.ckdb import get_ck_db, CKDrive
@ -415,12 +415,26 @@ async def event_edit(
async def event_edit(
request: Request,
data_in: schemas.send_str_mail,
ckdb: CKDrive = Depends(get_ck_db)
db: AsyncIOMotorDatabase = Depends(get_database)
) -> schemas.Msg:
"""发送邮件"""
data=send_str_mail(data_in.email_str,data_in.email)
if data:
try:
# 发送邮件
send_str_mail(data_in.email_str, data_in.email)
# 保存发送邮件的记录
now_time=str(datetime.now()).split('.')[0]
await crud.email_record.create(db, schemas.email_record(user_id=data_in.user_id,text=data_in.email_str,times=now_time))
return schemas.Msg(code=200, msg='邮件发送成功', data='')
else:
except Exception:
return schemas.Msg(code=200, msg='邮件发送失败', data='')
@router.post("/email_record")
async def event_edit(
request: Request,
data_in: schemas.get_email_record,
db: AsyncIOMotorDatabase = Depends(get_database)
) -> schemas.Msg:
"""获取发送邮件的记录"""
data=await crud.email_record.all_record(db,data_in)
return schemas.Msg(code=200, msg='邮件发送失败', data=data)

View File

@ -23,3 +23,4 @@ from .crud_api_module import api_module
from .crud_event_list import event_list
from .crud_jobs import jobs
from .crud_interview_remark import api_interview_remark
from .crud_email_record import email_record

28
crud/crud_email_record.py Normal file
View File

@ -0,0 +1,28 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'email_record',
from utils import get_uid
class Email_record(CRUDBase):
# 获取对应求职者的邮件发送数据
async def all_record(self, db: AsyncIOMotorDatabase,data_in: schemas.get_email_record):
return await self.find_many(db, {'user_id':data_in.user_id},{'_id': 0})
# 修改数据
# async def update(self, db: AsyncIOMotorDatabase, data_in: schemas.AddProjectnumber):
# game = data_in.game
# add_ditch = []
# for member in data_in.ditch:
# add_ditch.append(member.dict())
# await self.update_one(db, {'game': game}, {'$set': {'ditch': add_ditch}})
# 插入数据
async def create(self, db: AsyncIOMotorDatabase, data_in: schemas.email_record):
await self.insert_one(db, data_in.dict())
email_record = Email_record('email_record')

View File

@ -88,5 +88,12 @@ api:/api/v1/itr/get_str_mail
api:/api/v1/itr/send_str_mail
请求方式post
参数:
user_id: str # 接收者的唯一id
email: str # 接收者的邮箱
email_str: str # 需要发送的文本内容
#获取发送邮件的记录
api:/api/v1/itr/email_record
请求方式post
参数:
user_id: str # 求职者的唯一id

94
main.py
View File

@ -71,53 +71,53 @@ class BasicAuth(AuthenticationBackend):
def login_expired(conn: HTTPConnection, exc: Exception) -> Response:
return JSONResponse(schemas.Msg(code=-5, msg='请重新登录').dict(), status_code=200)
#处理路由权限问题
@app.middleware("http")
async def panduan_quanxian_url(request: Request, call_next):
#user_id=request.user.id
#user=request.user.username
start_time = int(time.time() * 1000)
response = await call_next(request)
process_time = int(time.time() * 1000) - start_time
response.headers["X-Process-Time"] = str(process_time)
url=request.url.path
if 'docs' in url or 'openapi.json' in url:
return response
if url == '/api/v1/user/login':
return response
game=request.url.query.split('=')[1]
if 'undefined' in game:
return response
if '&' in game:
game=game.split('&')[0]
judge_url = await crud.user_url.get_quanxian(get_database(), schemas.Url_quanxian(user_id=request.user.id))
if judge_url == {}:
# data='没有匹配这个游戏'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
if game not in judge_url['game']:
#data='没有匹配这个游戏'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='' ).json())
quanxian_dict={}
for i in range(len(judge_url['game'])):
quanxian_dict[judge_url['game'][i]]=judge_url['quanxian'][i]
user_list=await crud.url_list.get_url(get_database(),schemas.Url_list(name=quanxian_dict[game]))
api_list=[]
state_list=[]
api_dict={}
for i in user_list:
for api in i['api_list']:
api_list.append(api)
for quanxian in i['state']:
state_list.append(quanxian)
for i in range(len(api_list)):
api_dict[api_list[i]]=state_list[i]
if url not in api_list:
# data='没有对应路由'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
elif api_dict[url] != True:
# data='路由为False'
return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
return response
# @app.middleware("http")
# async def panduan_quanxian_url(request: Request, call_next):
# #user_id=request.user.id
# #user=request.user.username
# start_time = int(time.time() * 1000)
# response = await call_next(request)
# process_time = int(time.time() * 1000) - start_time
# response.headers["X-Process-Time"] = str(process_time)
# url=request.url.path
# if 'docs' in url or 'openapi.json' in url:
# return response
# if url == '/api/v1/user/login':
# return response
# game=request.url.query.split('=')[1]
# if 'undefined' in game:
# return response
# if '&' in game:
# game=game.split('&')[0]
# judge_url = await crud.user_url.get_quanxian(get_database(), schemas.Url_quanxian(user_id=request.user.id))
# if judge_url == {}:
# # data='没有匹配这个游戏'
# return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
# if game not in judge_url['game']:
# #data='没有匹配这个游戏'
# return Response(schemas.Msg(code=0, msg='没有操作权限',data='' ).json())
# quanxian_dict={}
# for i in range(len(judge_url['game'])):
# quanxian_dict[judge_url['game'][i]]=judge_url['quanxian'][i]
# user_list=await crud.url_list.get_url(get_database(),schemas.Url_list(name=quanxian_dict[game]))
# api_list=[]
# state_list=[]
# api_dict={}
# for i in user_list:
# for api in i['api_list']:
# api_list.append(api)
# for quanxian in i['state']:
# state_list.append(quanxian)
# for i in range(len(api_list)):
# api_dict[api_list[i]]=state_list[i]
# if url not in api_list:
# # data='没有对应路由'
# return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
# elif api_dict[url] != True:
# # data='路由为False'
# return Response(schemas.Msg(code=0, msg='没有操作权限',data='').json())
#
# return response
app.add_middleware(AuthenticationMiddleware, backend=BasicAuth(), on_error=login_expired)

View File

@ -29,3 +29,4 @@ from .interview import *
from .interview_plan import *
from .jobs import *
from .interview_remark import *
from .email_record import *

14
schemas/email_record.py Normal file
View File

@ -0,0 +1,14 @@
import time
from datetime import datetime
from pydantic import BaseModel
class email_record(BaseModel):
user_id: str # 求职者的唯一id
text: str # 邮件的文本内容
times: str # 发送邮件的时间
class get_email_record(BaseModel):
user_id: str # 求职者的唯一id

View File

@ -1,5 +1,5 @@
from datetime import datetime
from typing import List, Union, Dict
from pydantic import BaseModel
from typing import Optional
@ -48,5 +48,7 @@ class Email_str(BaseModel):
class send_str_mail(BaseModel):
user_id: str # 接收者的唯一id
email: str # 接收者的邮箱
email_str: str # 需要发送的文本内容

View File

@ -220,7 +220,6 @@ def send_str_mail(str_msg, my_user):
:param my_user: 接收者的邮箱
:return: bool
"""
try:
msg = MIMEText(str_msg, 'plain', 'utf-8')
msg['From'] = formataddr(["乐谷游戏人事", Email.my_sender]) # 括号里的对应发件人邮箱昵称、发件人邮箱账号
msg['To'] = formataddr(["FK", my_user]) # 括号里的对应收件人邮箱昵称、收件人邮箱账号
@ -231,6 +230,3 @@ def send_str_mail(str_msg, my_user):
# server.sendmail(my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件
server.sendmail(Email.my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件
server.quit() # 关闭连接
return True
except Exception as e: # 如果 try 中的语句没有执行,则会执行下面的 False
return False