邮箱未读邮件读取

This commit is contained in:
Àî×ÚÕñ 2022-10-12 17:32:04 +08:00
parent 14c6136567
commit aa0f0183fc
5 changed files with 527 additions and 4 deletions

View File

@ -1,26 +1,36 @@
import operator import operator
import os import os
import re import re,json
import pandas as pd import pandas as pd
from copy import deepcopy from copy import deepcopy
from fastapi import APIRouter, Depends, Request, File, UploadFile from fastapi import APIRouter, Depends, Request, File, UploadFile
from motor.motor_asyncio import AsyncIOMotorDatabase from motor.motor_asyncio import AsyncIOMotorDatabase
from utils.re_to_jianli import fmt_txt, getText_pdf
from api import deps from api import deps
from utils.dingding import get_redis_alluid, send_dates from utils.dingding import get_redis_alluid, send_dates
from utils.jianli import get_resume from utils.jianli import get_resume
from utils.func import get_every_days, get_every_weeks, get_every_months from utils.func import get_every_days, get_every_weeks, get_every_months, doc2pdf, get_uid
import crud, schemas import crud, schemas
from datetime import datetime from datetime import datetime
from core.configuration import * from core.configuration import *
from db import get_database from db import get_database
from db.ckdb import get_ck_db, CKDrive from db.ckdb import get_ck_db, CKDrive
import os
import shutil
from obs import ObsClient
from imbox import Imbox
from models.interview_zsgc import InterviewDo from models.interview_zsgc import InterviewDo
from utils import get_time, qujian_time, Download_xlsx, send_str_mail from utils import get_time, qujian_time, Download_xlsx, send_str_mail
router = APIRouter() router = APIRouter()
# 创建ObsClient实例
obsClient = ObsClient(
access_key_id='UPEO770G619UPU8TU61Y',
secret_access_key='M7zVRT1pjRtGSZ2TOZwKBRoVJLeWAOf633kHaNcu',
server='obs.cn-east-2.myhuaweicloud.com'
)
# 修改报表参数 # 修改报表参数
@router.post("/set_form_setting") @router.post("/set_form_setting")
@ -2669,3 +2679,447 @@ async def find_count_info(
'table_data': table_data 'table_data': table_data
} }
return schemas.Msg(code=200, msg='ok', data=res_data) return schemas.Msg(code=200, msg='ok', data=res_data)
# 通用报表
@router.post("/currency_form")
async def currency_form(
request: Request,
interview: InterviewDo = Depends(InterviewDo),
ck_db: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 通用模板报表 """
await interview.init()
res = interview.year_job_form_sql()
sql = res['sql']
data = await ck_db.execute(sql)
# 获取报表相关参数
table_id = interview.data_in.get('table_id', '')
if not table_id:
return schemas.Msg(code=200, msg='报表数据错误', data={})
table_data = await crud.api_interview_tables.get_one_table(db, table_id)
if not table_data:
return schemas.Msg(code=200, msg='报表数据错误', data={})
level_list = {
'count_id': '月份'
}
# 导出报表
if interview.out_form == 'out':
xls_name = '在职人员年份分布' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
chk_data = []
columns = list(level_list.values())
key_dict = {v: k for k, v in level_list.items()}
for i in data:
add_data = []
for key in columns:
add_data.append(i.get(key_dict[key], ''))
true_add_data = deepcopy(add_data)
chk_data.append(true_add_data)
chk_df = pd.DataFrame(data=chk_data, columns=columns)
download_xls = Download_xlsx(chk_df, xls_name)
return download_xls
def email_user(dir_name, mail_dict):
file_list = [] # 简历文件
# 获取当前文件所在目录的绝对路径
abs_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dir_path = '{0}/{1}'.format(abs_path, dir_name)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
password = mail_dict['pwd']
host = mail_dict['host']
mail = mail_dict['mail']
# IMAP服务器地址邮箱地址密码是否打开SSL加密
with Imbox(host, mail, password, ssl=True) as qq_box:
# 未读邮件
all_box_messages = qq_box.messages(unread=True)
for uid, message in all_box_messages:
# 标题
title = message.subject
# 发件人
sent_from = message.sent_from
dir_path1 = dir_path + '/{0}'.format(sent_from[0]['name'] + '_' + title)
if os.path.exists(dir_path1):
continue
# 下载附件
if message.attachments:
os.makedirs(dir_path1)
os.makedirs(dir_path1 + '/form')
os.makedirs(dir_path1 + '/other_files')
for attachment in message.attachments:
with open(attachment['filename'], 'wb') as f:
f.write(attachment['content'].getvalue())
f.close()
end_str = attachment['filename'].split('.')[-1]
# 简历存储地址
file_name = attachment['filename']
if end_str.lower() in ['pdf', 'doc', 'docx']:
shutil.move(file_name, dir_path1 + '/form/')
file_list.append([dir_path1 + '/form/' + file_name, dir_path1 + '/other_files/'])
# 其他附件存储地址
else:
shutil.move(file_name, dir_path1 + '/other_files/')
# 标记为已读
qq_box.mark_seen(uid)
return file_list
# 获取hr邮箱简历
@router.post("/chk_email_files")
async def chk_email_files(
request: Request,
ck_db: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database)
# current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 获取hr邮箱简历 """
mail_user = {
'3a0c9e4958d12e': { # 李宗振 user表的_id
'mail': '1986461823@qq.com', # 接收简历的邮箱
'host': 'imap.qq.com', # imap服务器地址
'pwd': 'hoosihokeaqkifdf' # 第三方提供的安全码
}
# '吴操': {
# 'mail': '2787668634@qq.com',
# 'host': 'imap.qq.com',
# 'pwd': 'rqysfgolyvjpddhh'
# }
}
for user, user_dict in mail_user.items():
# 读取到的简历列表
file_list = email_user(user, user_dict)
if not file_list:
continue
# 遍历转存华为云
for file in file_list:
# 简历文件
file0 = file[0]
# 附件目录
file1 = file[1]
filename = file0.split('/')[-1]
path_data = file0.split('/' + filename)[0]
if not file0.endswith('pdf'):
file0, fil = doc2pdf(file0, path_data, filename)
uid = get_uid()
res = obsClient.putFile('legu-cdn-source', 'hrms_bata/' + uid + '.pdf', file0)
if res.status < 300:
# 地址
url = res.body.objectUrl
insert_data = {
'email_id': uid,
'hr_id': user,
'file_url': url,
'is_chk': 0,
'other_file': file1,
'interview_file': file0
}
# 存mongodb数据库作为待处理简历数据
await crud.api_chk_emails.insert_chk_emails(db, insert_data)
return schemas.Msg(code=200, msg='ok', data='')
# hr待处理简历预览
@router.post("/exist_chk_file")
async def exist_chk_file(
request: Request,
hr_id: str,
ck_db: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 获取hr待处理简历数据 """
data = await crud.api_chk_emails.get_chk_emails(db, hr_id)
return schemas.Msg(code=200, msg='ok', data=data)
# hr待处理简历处理
@router.post("/hr_do_file")
async def hr_do_file(
request: Request,
data_in: schemas.UpdateEmails,
ck_db: CKDrive = Depends(get_ck_db),
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
""" 获取hr待处理简历数据 """
fn = data_in.update_data.get('interview_file', '')
other_file = data_in.update_data.get('other_file', '')
split_str = other_file.split('/')[-2]
delete_dir = other_file.split('/' + split_str)[0]
if data_in.is_pass == 0:
# 删除对应的文件
shutil.rmtree(delete_dir)
return schemas.Msg(code=200, msg='ok', data={})
if not fn:
return schemas.Msg(code=-9, msg='简历文件有误', data={})
other_file_list = []
if other_file:
file_list = os.listdir(other_file)
if file_list:
for file in file_list:
try:
res_other = obsClient.putFile('legu-cdn-source', 'hrms/accessory/' + file, other_file + file)
if res_other.status < 300:
# 地址
url = res_other.body.objectUrl
other_file_list.append(url)
except:
pass
continue
try:
# 简历初始文档
data_mode = {
"interview_name": "",
"interview_type": 1,
"interview_sign": 0,
"hope_money": "",
"feedback": 0,
"interview_round": 0,
"event_time": datetime.now(),
"name": "",
"phone": "",
"job_name": "",
"hr_name": "",
"work_exp": 0,
"interview_stage": 1,
"owner_name": 2,
"education": 1,
"work_undergo": [],
"project_undergo": [],
"work_list": [],
"school": "",
"at_school": "",
"specialty": "",
"specialty_do": [],
"mmended_state": 0,
"mail": "",
"account": "",
"id_card": "",
"gender": "",
"age": 0,
"gam": "",
"interview_state": 1,
"counts": 1,
"nation": "",
"review": "",
"upgrade": [],
"come_time": "",
"now_money": "",
"men_state": 1,
"teacher_state": 1,
"teacher_back": 1,
"offer_state": 1,
"offer_exam_state": 1,
"notice_state": 1,
"pass_why": 0,
"pass_text": "",
"now_address": "",
"language": [],
"remembrance": [],
"file_url": '',
"hr_manner": 2,
"resume_affix_id": other_file_list,
}
uid = get_uid()
data_mode['uid'] = uid
# 存数据
chk_txt = getText_pdf(fn)
data = fmt_txt(chk_txt)
education = data['education']
# 学历int转化
education_int = {
'大专': 1,
'本科': 2,
'研究生': 3,
'博士': 4,
'硕士': 5,
}
if education and isinstance(education, str):
data['education'] = education_int.get(education, 1)
age = data['age']
if not age:
data['age'] = 20
# 年龄int转化
if age and isinstance(age, str):
true_age = re.search(r"\d+\.?\d*", age)
if len(true_age.group()) > 2:
data['age'] = 20
else:
data['age'] = int(true_age.group())
work_exp = data['work_exp']
if not work_exp:
data['work_exp'] = 0
# 工作经验float转化
if work_exp and isinstance(work_exp, str):
true_work_exp = re.search(r"\d+\.?\d*", work_exp)
if len(true_work_exp.group()) > 3:
data['work_exp'] = 0
else:
data['work_exp'] = float(true_work_exp.group())
data_mode.update(data)
# 转json字符串
if 'remembrance_list' in data_mode:
remembrance = data_mode.pop('remembrance_list')
data_mode['remembrance'] = remembrance
if 'language_list' in data_mode:
language = data_mode.pop('language_list')
data_mode['language'] = language
if 'project_undergo' in data_mode:
if data_mode.get('project_undergo', []):
data_mode['project_undergo'] = [json.dumps(i) for i in data_mode['project_undergo']]
else:
data_mode['project_undergo'] = []
if 'work_list' in data_mode:
if data_mode.get('work_list', []):
data_mode['work_list'] = [json.dumps(i) for i in data_mode['work_list']]
else:
data_mode['work_list'] = []
if 'language' in data_mode:
if data_mode.get('language', []):
data_mode['language'] = [json.dumps(i) for i in data_mode['language']]
else:
data_mode['language'] = []
if 'remembrance' in data_mode:
if data_mode.get('remembrance', []):
data_mode['remembrance'] = [json.dumps(i) for i in data_mode['remembrance']]
else:
data_mode['remembrance'] = []
# 字符串转datetime
if data_mode.get('in_time', ''):
chk_in_time = data_mode['in_time'].replace('-', '/').replace('.', '/')
if len(chk_in_time.split('/')) == 2:
data_mode['in_time'] = str(datetime.strptime(chk_in_time, "%Y/%m").date())
if len(chk_in_time.split('/')) == 3:
data_mode['in_time'] = str(datetime.strptime(chk_in_time, "%Y/%m/%d").date())
if data_mode.get('out_time', ''):
chk_out_time = data_mode['out_time'].replace('-', '/').replace('.', '/')
if len(chk_out_time.split('/')) == 2:
data_mode['out_time'] = str(datetime.strptime(chk_out_time, "%Y/%m").date())
if len(chk_out_time.split('/')) == 3:
data_mode['out_time'] = str(datetime.strptime(chk_out_time, "%Y/%m/%d").date())
if data_mode.get('birthday', ''):
chk_birthday = data_mode['birthday'].replace('-', '/').replace('.', '/')
if len(chk_birthday.split('/')) == 2:
data_mode['birthday'] = str(datetime.strptime(chk_birthday, "%Y/%m").date())
if len(chk_birthday.split('/')) == 3:
data_mode['birthday'] = str(datetime.strptime(chk_birthday, "%Y/%m/%d").date())
if data_mode.get('star_time', ''):
chk_star_time = data_mode['star_time'].replace('-', '/').replace('.', '/')
if len(chk_star_time.split('/')) == 2:
data_mode['star_time'] = str(datetime.strptime(chk_star_time, "%Y/%m").date())
if len(chk_star_time.split('/')) == 3:
data_mode['star_time'] = str(datetime.strptime(chk_star_time, "%Y/%m/%d").date())
if data_mode.get('end_time', ''):
chk_end_time = data_mode['end_time'].replace('-', '/').replace('.', '/')
if len(chk_end_time.split('/')) == 2:
data_mode['end_time'] = str(datetime.strptime(chk_end_time, "%Y/%m").date())
if len(chk_end_time.split('/')) == 3:
data_mode['end_time'] = str(datetime.strptime(chk_end_time, "%Y/%m/%d").date())
if data_mode.get('graduate_time', ''):
chk_graduate = data_mode['graduate_time'].replace('-', '/').replace('.', '/')
if len(chk_graduate.split('/')) == 2:
data_mode['graduate_time'] = str(datetime.strptime(chk_graduate, "%Y/%m").date())
if len(chk_graduate.split('/')) == 3:
data_mode['graduate_time'] = str(datetime.strptime(chk_graduate, "%Y/%m/%d").date())
work_list = data['work_list']
language = data['language']
project_undergo = data['project_undergo']
remembrance = data['remembrance']
# 简历查重,姓名,手机号,性别name,phone,gender
find_name = data['name']
find_phone = data['phone']
find_gender = data['gender']
where = {}
if find_name:
where.update({
'name': find_name
})
if find_phone:
where.update({
'phone': find_phone
})
if find_gender:
where.update({
'gender': find_gender
})
whereStr = ''
for key, value in where.items():
if isinstance(value, str):
if not value.strip():
continue
if whereStr:
whereStr += 'and ' + str(key) + ' = ' + "'" + value + "'" + ' '
else:
whereStr += str(key) + ' = ' + "'" + value + "'" + ' '
continue
if whereStr:
whereStr += 'and ' + str(key) + ' = ' + str(value) + ' '
else:
whereStr += str(key) + ' = ' + str(value) + ' '
whereStr = whereStr.strip()
sql = f"select uid from HR.resumes where {whereStr}"
is_in_data = await ck_db.execute(sql)
exist = 0
if is_in_data:
exist = 1
uid = list(is_in_data.values())[0]['uid']
# os.rename(path_data + '/' + filename, path_data + '/' + find_phone + '.pdf')
res = obsClient.putFile('legu-cdn-source', 'hrms/' + uid + '.pdf', fn)
if res.status < 300:
# 地址
url = res.body.objectUrl
data_mode['file_url'] = url
data['file_url'] = url
res_data = {
'data': data,
'file_url': url,
'uid': uid,
'exist': exist,
'project_undergo': project_undergo,
'work_list': work_list,
'language_list': language,
'remembrance_list': remembrance
}
if exist:
# 更新待处理简历状态
await crud.api_chk_emails.update_chk_emails(db, data_in)
shutil.rmtree(delete_dir)
return schemas.Msg(code=0, msg='ok', data=res_data)
sql = f"insert into HR.resumes(interview_name, interview_type, interview_sign, hope_money, feedback," \
f" interview_round, event_time, uid, name, phone, job_name, hr_name, work_exp, interview_stage, owner_name," \
f" education, work_undergo, project_undergo, work_list, school, at_school, specialty, specialty_do, " \
f"mmended_state, mail, account, id_card, gender, age, gam, interview_state, counts, nation, come_time," \
f" review, upgrade, now_money, men_state, teacher_state, teacher_back, offer_state, offer_exam_state," \
f" notice_state, pass_why, pass_text, now_address,language,remembrance, file_url, hr_manner, resume_affix_id) values"
await ck_db.execute_dict(sql, [data_mode])
# 更新待处理简历状态
await crud.api_chk_emails.update_chk_emails(db, data_in)
shutil.rmtree(delete_dir)
return schemas.Msg(code=0, msg='ok', data=res_data)
else:
print('errorCode:', res.errorCode)
print('errorMessage:', res.errorMessage)
return schemas.Msg(code=400, msg='上传华为云失败', data=None)
except:
return schemas.Msg(code=400, msg='上传华为云失败或者解析失败', data=None)
if __name__ == '__main__':
print(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))

View File

@ -25,6 +25,7 @@ from .crud_jobs import jobs
from .crud_interview_remark import api_interview_remark from .crud_interview_remark import api_interview_remark
from .crud_interview_modes import api_interview_modes from .crud_interview_modes import api_interview_modes
from .crud_interview_tables import api_interview_tables from .crud_interview_tables import api_interview_tables
from .crud_chk_emails import api_chk_emails
from .crud_email_record import email_record from .crud_email_record import email_record
from .crud_operate_log import operate_log from .crud_operate_log import operate_log
from .crud_interview_record import interview_record from .crud_interview_record import interview_record

35
crud/crud_chk_emails.py Normal file
View File

@ -0,0 +1,35 @@
from motor.motor_asyncio import AsyncIOMotorDatabase
import schemas
from crud.base import CRUDBase
__all__ = 'api_chk_emails',
class ApiChkEmails(CRUDBase):
# 获取hr所有待处理简历
async def get_chk_emails(self, db: AsyncIOMotorDatabase, hr_id):
where = {
'hr_id': hr_id,
'is_chk': 0
}
return await self.find_many(db, where, {'_id': 0})
# 插入一条全新的待处理简历
async def insert_chk_emails(self, db: AsyncIOMotorDatabase, data):
return await self.insert_one(db, data)
# 更新一条待处理简历信息
async def update_chk_emails(self, db: AsyncIOMotorDatabase, data_in: schemas.UpdateEmails):
return await self.update_one(db, {'email_id': data_in.email_id}, {'$set': data_in.update_data})
# 查询待处理简历信息
async def get_emails(self, db: AsyncIOMotorDatabase, where):
return await self.find_many(db, where, {'_id': 0})
# 获取一条待处理简历信息
async def get_one_email(self, db: AsyncIOMotorDatabase, email_id):
return await self.find_one(db, {'email_id': email_id}, {'_id': 0})
api_chk_emails = ApiChkEmails('chk_emails')

View File

@ -33,6 +33,7 @@ from .email_record import *
from .operate_log import * from .operate_log import *
from .interview_modes import * from .interview_modes import *
from .interview_tables import * from .interview_tables import *
from .chk_emails import *
from .interview_record import * from .interview_record import *
from .worker import * from .worker import *
from .owner_info import * from .owner_info import *

32
schemas/chk_emails.py Normal file
View File

@ -0,0 +1,32 @@
import time
import random
from datetime import datetime
from pydantic import BaseModel
def get_id():
return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:]
# 待处理简历模板
class InsertEmails(BaseModel):
email_id: str # 唯一id
is_chk: int = 0 # 是否已处理
hr_id: str # hr_id
file_url: str # 简历华为云地址
other_file: str # 简历附件文件夹地址
mode_time: datetime = datetime.now() # 修改时间
# 查询待处理简历
class FindEmails(BaseModel):
hr_id: str # 简历所属hr的uid
where: dict = {} # 其他条件
# 更新待处理简历
class UpdateEmails(BaseModel):
email_id: str # 唯一id
is_pass: int = 1 # 唯一id
update_data: dict # 更新的数据