import operator import os import re, json import pandas as pd from copy import deepcopy from fastapi import APIRouter, Depends, Request, File, UploadFile from motor.motor_asyncio import AsyncIOMotorDatabase from utils.re_to_jianli import fmt_txt, getText_pdf from api import deps from utils.dingding import get_redis_alluid, send_dates from utils.jianli import get_resume from utils.func import get_every_days, get_every_weeks, get_every_months, doc2pdf, get_uid import crud, schemas from datetime import datetime from core.configuration import * from db import get_database from db.ckdb import get_ck_db, CKDrive import os import shutil from obs import ObsClient from imbox import Imbox from models.interview_zsgc import InterviewDo from utils import get_time, qujian_time, Download_xlsx, send_str_mail router = APIRouter() # 创建ObsClient实例 obsClient = ObsClient( access_key_id='UPEO770G619UPU8TU61Y', secret_access_key='M7zVRT1pjRtGSZ2TOZwKBRoVJLeWAOf633kHaNcu', server='obs.cn-east-2.myhuaweicloud.com' ) # 修改报表参数 @router.post("/set_form_setting") async def man_mass_form( request: Request, interview: InterviewDo = Depends(InterviewDo), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """修改报表参数""" await interview.init() table_id = interview.where.get('table_id', '') update_data = interview.data_in # 更新报表参数 if not table_id or not update_data: return schemas.Msg(code=-9, msg='参数有误', data='') await crud.api_interview_tables.update_tables(db, {'table_id': table_id, 'update_data': update_data}) return schemas.Msg(code=200, msg='ok', data='') # 候选人质量报表 @router.post("/man_mass_form") async def man_mass_form( request: Request, interview: InterviewDo = Depends(InterviewDo), db: CKDrive = Depends(get_ck_db), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await db.execute(sql) 
school_985 = ['清华大学', '北京大学', '中国人民大学', '北京理工大学', '北京航空航天大学', '中央民族大学', '北京师范大学', '中国农业大学', '天津大学', '南开大学', '复旦大学', '上海交通大学', '同济大学', '华东师范大学', '重庆大学', '四川大学', '电子科技大学', '湖南大学', '国防科技大学', '中南大学', '厦门大学'] school_211 = ['上海外国语大学', '东华大学', '上海财经大学', '华东理工大学', '上海大学', '天津医科大学', '吉林大学', '东北师范大学', '延边大学', '哈尔滨工业大学', '哈尔滨工程大学', '东北农业大学', '东北林业大学', '南京大学', '东南大学', '苏州大学', '中国矿业大学', '中国药科大学', '河海大学', '南京航空航天大学', '江南大学', '南京农业大学', '南京理工大学', '浙江大学', '中国科技大学', '安徽大学', '合肥工业大学', '福州大学', '南昌大学', '山东大学', '中国海洋大学', '石油大学', '湖南师范大学', '广西大学', '中山大学', '暨南大学', '华南理工大学', '华南师范大学', '广州中医药大学', '武汉大学', '华中科技大学', '中国地质大学', '武汉理工大学', '华中师范大学', '华中农业大学', '中南财经政法大学'] # 性别 gander = { '男': [], '女': [] } # 年龄 age = { '20-29': [], '30-39': [], '40-49': [], '50-59': [], '60-100': [], } # 学历 education = { '大专': [], '本科': [], '研究生': [], '博士': [], '硕士': [] } # 工作经验 work_exp = { '0-2': [], '3-6': [], '6-10': [], '10-20': [], '20-40': [] } # 所在地 account = { } # 毕业院校 school = { } # 就职公司 work_for = { } # 渠道 owner_name = { '前程无忧': [], '人才库': [], '智联招聘': [], 'Boss直聘': [], '58同城': [] } # 职位 job_name = { } # 学校类型 school_type = { '985': [], '211': [], '其他': [], } if not data: return schemas.Msg(code=200, msg='无数据', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_gander = interview_data.get('gander', '男') if i_gander == '男': gander['男'].append(i_name) else: gander['女'].append(i_name) i_age = interview_data.get('age', 20) if i_age <= 29: age['20-29'].append(i_name) elif 30 <= i_age <= 39: age['30-39'].append(i_name) elif 40 <= i_age <= 49: age['40-49'].append(i_name) elif 50 <= i_age <= 59: age['50-59'].append(i_name) else: age['60-100'].append(i_name) i_education = interview_data.get('education', 1) if i_education == 1: education['大专'].append(i_name) elif i_education == 2: education['本科'].append(i_name) elif i_education == 3: education['研究生'].append(i_name) elif i_education == 4: education['博士'].append(i_name) else: education['硕士'].append(i_name) i_owner_name 
= interview_data.get('owner_name', 2) if i_owner_name == 1: owner_name['前程无忧'].append(i_name) elif i_owner_name == 2: owner_name['人才库'].append(i_name) elif i_owner_name == 3: owner_name['智联招聘'].append(i_name) elif i_owner_name == 4: owner_name['Boss直聘'].append(i_name) else: owner_name['58同城'].append(i_name) i_school = interview_data.get('school', '') if i_school: if i_school in school: school[i_school].append(i_name) else: school[i_school] = [i_name] if i_school in school_985: school_type['985'].append(i_name) school_type['211'].append(i_name) elif i_school in school_211: school_type['211'].append(i_name) else: school_type['其他'].append(i_name) i_work_exp = interview_data.get('work_exp', 1) if i_work_exp <= 2: work_exp['0-2'].append(i_name) elif 3 <= i_work_exp < 6: work_exp['3-6'].append(i_name) elif 6 <= i_work_exp < 10: work_exp['6-10'].append(i_name) elif 10 <= i_work_exp < 20: work_exp['10-20'].append(i_name) else: work_exp['20-40'].append(i_name) i_job_name = interview_data.get('job_names', '') if i_job_name: if i_job_name in job_name: job_name[i_job_name].append(i_name) else: job_name[i_job_name] = [i_name] i_account = interview_data.get('account', '') if i_account: if i_account in account: account[i_account].append(i_name) else: account[i_account] = [i_name] i_work_list = interview_data.get('work_list', '') if i_work_list: data_work = eval(i_work_list[0]) i_work_for = data_work.get('name', '') if i_work_for: if i_work_for in work_for: work_for[i_work_for].append(i_name) else: work_for[i_work_for] = [i_name] res_msg = { 'gander': gander, 'age': age, 'education': education, 'work_exp': work_exp, 'account': account, 'school': school, 'work_for': work_for, 'owner_name': owner_name, 'job_name': job_name, 'school_type': school_type } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量性别报表 @router.post("/man_gander_form") async def man_gander_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: 
AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量性别报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 性别 gander = { '男': [], '女': [] } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '') i_gander = interview_data.get('gander', '男') if i_gander == '男': gander['男'].append(i_name) else: gander['女'].append(i_name) # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量性别' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') len1 = len(gander['男']) len2 = len(gander['女']) chk_data = [] columns = ['男', '女'] if len1 >= len2: for index, i in enumerate(gander['男']): if index >= len2: add_data = [i, ''] else: add_data = [i, gander['女'][index]] true_add_data = deepcopy(add_data) chk_data.append(true_add_data) else: for index, i in enumerate(gander['女']): if index >= len1: add_data = ['', i] else: add_data = [gander['男'][index], i] true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': gander, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量年龄报表 @router.post("/man_age_form") async def man_age_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量年龄报表 """ await interview.init() res = 
interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 年龄 age = { '20-29': [], '30-39': [], '40-49': [], '50-59': [], '60-100': [], } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '') i_age = interview_data.get('age', 20) if i_age <= 29: age['20-29'].append(i_name) elif 30 <= i_age <= 39: age['30-39'].append(i_name) elif 40 <= i_age <= 49: age['40-49'].append(i_name) elif 50 <= i_age <= 59: age['50-59'].append(i_name) else: age['60-100'].append(i_name) # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量年龄' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = ['20-29', '30-39', '40-49', '50-59', '60-100'] # 取出最大长度key max_key = max(columns, key=lambda x: len(age[x])) for index, i in enumerate(age[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(age[key]): add_data.append('') continue add_data.append(age[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': age, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量学历报表 @router.post("/man_education_form") async def man_education_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量学历报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await 
ck_db.execute(sql) # 学历 education = { '大专': [], '本科': [], '研究生': [], '博士': [], '硕士': [] } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_education = interview_data.get('education', 1) if i_education == 1: education['大专'].append(i_name) elif i_education == 2: education['本科'].append(i_name) elif i_education == 3: education['研究生'].append(i_name) elif i_education == 4: education['博士'].append(i_name) else: education['硕士'].append(i_name) # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量学历' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = ['大专', '本科', '研究生', '博士', '硕士'] # 取出最大长度key max_key = max(columns, key=lambda x: len(education[x])) for index, i in enumerate(education[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(education[key]): add_data.append('') continue add_data.append(education[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': education, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量工作经验报表 @router.post("/man_work_exp_form") async def man_work_exp_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量工作经验报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 工作经验 
work_exp = { '0-2': [], '3-6': [], '6-10': [], '10-20': [], '20-40': [] } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_work_exp = interview_data.get('work_exp', 1) if i_work_exp <= 2: work_exp['0-2'].append(i_name) elif 3 <= i_work_exp < 6: work_exp['3-6'].append(i_name) elif 6 <= i_work_exp < 10: work_exp['6-10'].append(i_name) elif 10 <= i_work_exp < 20: work_exp['10-20'].append(i_name) else: work_exp['20-40'].append(i_name) # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量工作经验' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = ['0-2', '3-6', '6-10', '10-20', '20-40'] # 取出最大长度key max_key = max(columns, key=lambda x: len(work_exp[x])) for index, i in enumerate(work_exp[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(work_exp[key]): add_data.append('') continue add_data.append(work_exp[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': work_exp, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量所在地报表 @router.post("/man_account_form") async def man_account_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量所在地报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 所在地 
account = { } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_account = interview_data.get('account', '') if i_account: if i_account in account: account[i_account].append(i_name) else: account[i_account] = [i_name] # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量所在地' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(account.keys()) # 取出最大长度key max_key = max(columns, key=lambda x: len(account[x])) for index, i in enumerate(account[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(account[key]): add_data.append('') continue add_data.append(account[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': account, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量毕业院校报表 @router.post("/man_school_form") async def man_school_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量毕业院校报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 毕业院校 school = { } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return 
schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_school = interview_data.get('school', '') if i_school: if i_school in school: school[i_school].append(i_name) else: school[i_school] = [i_name] # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量毕业院校' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(school.keys()) # 取出最大长度key max_key = max(columns, key=lambda x: len(school[x])) for index, i in enumerate(school[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(school[key]): add_data.append('') continue add_data.append(school[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': school, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量就职公司报表 @router.post("/man_work_for_form") async def man_work_for_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量就职公司报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 就职公司 work_for = { } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_work_list = interview_data.get('work_list', '') if i_work_list: data_work = eval(i_work_list[0]) i_work_for = 
data_work.get('name', '') if i_work_for: if i_work_for in work_for: work_for[i_work_for].append(i_name) else: work_for[i_work_for] = [i_name] # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量就职公司' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(work_for.keys()) # 取出最大长度key max_key = max(columns, key=lambda x: len(work_for[x])) for index, i in enumerate(work_for[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(work_for[key]): add_data.append('') continue add_data.append(work_for[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': work_for, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量渠道报表 @router.post("/man_owner_name_form") async def man_owner_name_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量渠道报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 渠道 owner_name = { '前程无忧': [], '人才库': [], '智联招聘': [], 'Boss直聘': [], '58同城': [] } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_owner_name = interview_data.get('owner_name', 2) if i_owner_name == 1: owner_name['前程无忧'].append(i_name) elif i_owner_name == 2: owner_name['人才库'].append(i_name) elif i_owner_name == 3: 
owner_name['智联招聘'].append(i_name) elif i_owner_name == 4: owner_name['Boss直聘'].append(i_name) else: owner_name['58同城'].append(i_name) # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量渠道' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(owner_name.keys()) # 取出最大长度key max_key = max(columns, key=lambda x: len(owner_name[x])) for index, i in enumerate(owner_name[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(owner_name[key]): add_data.append('') continue add_data.append(owner_name[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': owner_name, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量职位报表 @router.post("/man_job_name_form") async def man_job_name_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量职位报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 职位 job_name = { } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_job_name = interview_data.get('job_names', '') if i_job_name: if i_job_name in job_name: job_name[i_job_name].append(i_name) else: job_name[i_job_name] = [i_name] # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量职位' + 
interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(job_name.keys()) # 取出最大长度key max_key = max(columns, key=lambda x: len(job_name[x])) for index, i in enumerate(job_name[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(job_name[key]): add_data.append('') continue add_data.append(job_name[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': job_name, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人质量学校类型报表 @router.post("/man_school_type_form") async def man_school_type_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人质量学校类型报表 """ await interview.init() res = interview.get_man_mass_form_sql() sql = res['sql'] data = await ck_db.execute(sql) school_985 = ['清华大学', '北京大学', '中国人民大学', '北京理工大学', '北京航空航天大学', '中央民族大学', '北京师范大学', '中国农业大学', '天津大学', '南开大学', '复旦大学', '上海交通大学', '同济大学', '华东师范大学', '重庆大学', '四川大学', '电子科技大学', '湖南大学', '国防科技大学', '中南大学', '厦门大学'] school_211 = ['上海外国语大学', '东华大学', '上海财经大学', '华东理工大学', '上海大学', '天津医科大学', '吉林大学', '东北师范大学', '延边大学', '哈尔滨工业大学', '哈尔滨工程大学', '东北农业大学', '东北林业大学', '南京大学', '东南大学', '苏州大学', '中国矿业大学', '中国药科大学', '河海大学', '南京航空航天大学', '江南大学', '南京农业大学', '南京理工大学', '浙江大学', '中国科技大学', '安徽大学', '合肥工业大学', '福州大学', '南昌大学', '山东大学', '中国海洋大学', '石油大学', '湖南师范大学', '广西大学', '中山大学', '暨南大学', '华南理工大学', '华南师范大学', '广州中医药大学', '武汉大学', '华中科技大学', '中国地质大学', '武汉理工大学', '华中师范大学', '华中农业大学', '中南财经政法大学'] # 学校类型 school_type = { '985': [], '211': [], '其他': [], } # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', 
data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) # 处理数据 for interview_data in data.values(): i_name = interview_data.get('name', '默认名字') i_school = interview_data.get('school', '') if i_school: if i_school in school_985: school_type['985'].append(i_name) school_type['211'].append(i_name) elif i_school in school_211: school_type['211'].append(i_name) else: school_type['其他'].append(i_name) # 导出报表 if interview.out_form == 'out': xls_name = '候选人质量学校类型' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(school_type.keys()) # 取出最大长度key max_key = max(columns, key=lambda x: len(school_type[x])) for index, i in enumerate(school_type[max_key]): add_data = [] for key in columns: if key == max_key: add_data.append(i) continue if index >= len(school_type[key]): add_data.append('') continue add_data.append(school_type[key][index]) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_msg = { 'data': school_type, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 候选人明细报表 @router.post("/man_info_form") async def man_info_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 候选人明细报表 """ await interview.init() res = interview.get_man_info_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) for 
key, interview_data in data.items(): i_work_list = interview_data.get('work_list', '') if i_work_list: data_work = eval(i_work_list[0]) i_work_for = data_work.get('name', '') if i_work_for: data[key]['work_for'] = i_work_for continue data[key]['work_for'] = '' new_data = [v for k, v in data.items()] res_data = [] for data in new_data: age = data['age'] education = data['education'] work_exp = data['work_exp'] if age: data['age'] = str(age) + '岁' else: data['age'] = '20岁' if education: data['education'] = education_dict.get(education, '大专') if work_exp: data['work_exp'] = str(int(work_exp)) + '年' else: data['work_exp'] = '一年以内' new_res = {k: v for k, v in data.items() if k != 'work_list'} res_data.append(new_res) level_list = { "uid": "用户id", "age": "年龄", "gender": "性别", "name": "姓名", "education": "学历", "school": "毕业院校", "work_exp": "工作经验", "job_name": "应聘职位", "account": "所属地", "graduate_time": "毕业时间", "phone": "电话", "work_for": "上家公司" } # 导出报表 if interview.out_form == 'out': xls_name = '候选人明细' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = list(level_list.values()) key_dict = {v: k for k, v in level_list.items()} for i in res_data: add_data = [] for key in columns: add_data.append(i.get(key_dict[key], '')) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_true_data = { 'data': res_data, 'level_list': level_list } res_msg = { 'data': res_true_data, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_msg) # 职位阶段数据报表 @router.post("/every_stage_form") async def every_stage_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 职位阶段数据报表 """ all_jobs = await 
crud.jobs.all_some_field(db) res_msg = {} job_id_to_name = {} job_ids = [] for job_data in all_jobs: job_id = job_data['job_id'] job_ids.append(job_id) job_name = job_data['job_name'] job_id_to_name[job_id] = job_name res_job_data = deepcopy({k: v for k, v in job_data.items() if k != 'job_id'}) res_job_data.update({ 'start_num': 0, 'screen_num': 0, 'exam_num': 0, 'offer_num': 0, 'wait_work_num': 0, 'work_in_num': job_data['now_job_num'], }) res_job_data['principal'] = res_job_data['principal'][0]['name'] res_msg[job_name] = res_job_data await interview.init() res = interview.get_every_stage_form_sql(job_ids) sql = res['sql'] data = await ck_db.execute(sql) # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) for ck_data in data.values(): # 职位id不存在跳过 if ck_data['job_id'] not in job_id_to_name: continue ck_job_name = job_id_to_name[ck_data['job_id']] stage = ck_data['interview_stage'] if stage == 1: res_msg[ck_job_name]['start_num'] += 1 elif stage == 2: res_msg[ck_job_name]['screen_num'] += 1 elif stage == 3: res_msg[ck_job_name]['exam_num'] += 1 elif stage == 4: res_msg[ck_job_name]['offer_num'] += 1 elif stage == 5: res_msg[ck_job_name]['wait_work_num'] += 1 res_data = [] for k, v in res_msg.items(): start_time = v.get('start_time', '') if start_time: v['start_time'] = start_time.split(' ')[0] res_value = deepcopy(v) res_data.append(res_value) level_list = { "job_name": "职位", "job_sector": "部门", "principal": "负责人", "start_time": "招聘开始时间", "job_num": "招聘人数", "start_num": "初筛", "screen_num": "复筛", "exam_num": "面试", "offer_num": "offer", "wait_work_num": "待入职", "work_in_num": "已入职" } # 导出报表 if interview.out_form == 'out': xls_name = '职位阶段数据' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = 
# HR workload report
# Counter columns tracked for every (job, HR) pair, in output order.
_HR_WORK_COUNTERS = (
    'start_num', 'screen_num', 'exam_num', 'exam_set_num',
    'exam_ok_num', 'offer_num', 'wait_work_num', 'work_num',
)

# (minimum interview_stage, counter column) pairs; a row counts towards every
# column whose threshold it has reached.
_HR_WORK_STAGE_COLUMNS = (
    (1, 'start_num'),
    (2, 'screen_num'),
    (3, 'exam_num'),
    (4, 'offer_num'),
    (5, 'wait_work_num'),
    (7, 'work_num'),
)


def _new_hr_work_counters():
    """Return a fresh all-zero counter dict for one (job, HR) pair."""
    return dict.fromkeys(_HR_WORK_COUNTERS, 0)


@router.post("/hr_works_form")
async def hr_works_form(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Build the HR workload report.

    Rows returned by the ClickHouse query are grouped by job name and HR name
    and turned into funnel counters; interview records add the
    "interview created" / "interview signed-in" counters. The result is
    flattened to one row per (department, job, HR).

    Returns a ``schemas.Msg`` with the rows, the column labels and the table
    settings, or the value of ``Download_xlsx`` when ``interview.out_form``
    is ``'out'``.
    """
    await interview.init()
    res = interview.get_hr_works_form_sql()
    data = await ck_db.execute(res['sql'])

    # Report display settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    res_msg = {}          # job name -> HR name -> counters
    job_name_sector = {}  # job name -> department
    hr_names = []
    job_cache = {}        # job id -> job document, so each job is looked up once

    for ck_data in data.values():
        ck_job_id = ck_data['job_id']
        hr_name = ck_data['hr_name']
        # HR names are collected even when the job lookup below fails.
        if hr_name not in hr_names:
            hr_names.append(hr_name)

        if ck_job_id not in job_cache:
            job_cache[ck_job_id] = await crud.jobs.find_job_name(db, ck_job_id)
        job_data = job_cache[ck_job_id]
        if not job_data:
            continue

        job_name = job_data['job_name']
        job_name_sector[job_name] = job_data['job_sector']

        # setdefault keeps counters already collected when a second job id
        # maps to the same job name (previously they were reset).
        counters = res_msg.setdefault(job_name, {}).setdefault(
            hr_name, _new_hr_work_counters())

        stage = ck_data['interview_stage']
        for threshold, column in _HR_WORK_STAGE_COLUMNS:
            if stage >= threshold:
                counters[column] += 1

    # Interview records only contribute to jobs already present in the report.
    interview_records = await crud.interview_record.find_job_some(db, hr_names)
    for record in interview_records:
        record_job_name = record['job_names']
        record_hr_name = record['hr_name']
        record_interview_sign = record['interview_sign']
        if record_job_name not in res_msg:
            continue
        counters = res_msg[record_job_name].setdefault(
            record_hr_name, _new_hr_work_counters())
        counters['exam_set_num'] += 1
        if record_interview_sign:
            counters['exam_ok_num'] += 1

    # Group jobs by department; jobs without a known department go to '其他'.
    res_data = {}
    for job_name, per_hr in res_msg.items():
        sector = job_name_sector.get(job_name, '其他')
        res_data.setdefault(sector, {})[job_name] = per_hr

    # Flatten to one row per (department, job, HR).
    res_data1 = [
        {**counters, 'date_name': sector, 'job_name': job_name, 'name': hr_name}
        for sector, jobs in res_data.items()
        for job_name, per_hr in jobs.items()
        for hr_name, counters in per_hr.items()
    ]

    level_list = {
        "date_name": "部门",
        "job_name": "职位",
        "name": "HR",
        "start_num": "初筛",
        "screen_num": "复筛",
        "exam_num": "面试",
        "exam_set_num": "创建面试",
        "exam_ok_num": "面试签到",
        "offer_num": "offer",
        "wait_work_num": "待入职",
        "work_num": "已入职"
    }

    # Export as a spreadsheet
    if interview.out_form == 'out':
        # NOTE(review): both ends of the range use 'start_time'; the second
        # was presumably meant to be the end of the range - confirm the key.
        xls_name = 'hr工作量' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        columns = list(level_list.values())
        chk_data = [[row.get(key, '') for key in level_list] for row in res_data1]
        chk_df = pd.DataFrame(data=chk_data, columns=columns)
        return Download_xlsx(chk_df, xls_name)

    res_true_msg = {
        'data': res_data1,
        'level_list': level_list,
    }
    res_true_data = {
        'data': res_true_msg,
        'table_data': table_data
    }
    return schemas.Msg(code=200, msg='ok', data=res_true_data)
ck_db.execute(sql) # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) for ck_data in data.values(): # 职位id不存在跳过 if ck_data['job_id'] not in job_id_to_name: continue ck_job_name = job_id_to_name[ck_data['job_id']] stage = ck_data['interview_stage'] if stage >= 1: res_msg[ck_job_name]['start_num'] += 1 if stage >= 2: res_msg[ck_job_name]['screen_num'] += 1 if stage >= 3: res_msg[ck_job_name]['exam_num'] += 1 if stage >= 4: res_msg[ck_job_name]['offer_num'] += 1 if stage >= 5: res_msg[ck_job_name]['wait_work_num'] += 1 if stage >= 7: res_msg[ck_job_name]['work_in_num'] += 1 for res_name, res_data in res_msg.items(): chk_num1 = res_data['start_num'] chk_num2 = res_data['screen_num'] chk_num3 = res_data['exam_num'] chk_num4 = res_data['offer_num'] chk_num5 = res_data['wait_work_num'] chk_num7 = res_data['work_in_num'] if chk_num1 and chk_num2: chance_1 = '{:.2%}'.format(chk_num2 / chk_num1) else: chance_1 = '0%' if chk_num2 and chk_num3: chance_2 = '{:.2%}'.format(chk_num3 / chk_num2) else: chance_2 = '0%' if chk_num3 and chk_num4: chance_3 = '{:.2%}'.format(chk_num4 / chk_num3) else: chance_3 = '0%' if chk_num4 and chk_num5: chance_4 = '{:.2%}'.format(chk_num5 / chk_num4) else: chance_4 = '0%' if chk_num5 and chk_num7: chance_5 = '{:.2%}'.format(chk_num7 / chk_num5) else: chance_5 = '0%' res_msg[res_name].update({ 'chance_1': chance_1, 'chance_2': chance_2, 'chance_3': chance_3, 'chance_4': chance_4, 'chance_5': chance_5 }) res_data1 = [] for k, v in res_msg.items(): start_time = v.get('start_time', '') if start_time: v['start_time'] = start_time.split(' ')[0] res_value = deepcopy(v) res_data1.append(res_value) level_list = { "job_name": "职位", "job_sector": "部门", "principal": "负责人", "start_time": "招聘开始时间", "job_num": "招聘人数", "start_num": "初筛", 
# Recruitment funnel report
@router.post("/interview_funnel_form")
async def interview_funnel_form(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Count how many rows have reached each stage of the recruitment funnel.

    Returns a ``schemas.Msg`` with the per-stage totals and the table
    settings, or the value of ``Download_xlsx`` when ``interview.out_form``
    is ``'out'``.
    """
    await interview.init()
    query = interview.get_hr_works_form_sql()
    rows = await ck_db.execute(query['sql'])

    # Report display settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    # Funnel label -> minimum interview_stage needed to be counted under it.
    thresholds = {'初筛': 1, '复筛': 2, '面试': 3, 'offer': 4, '待入职': 5, '已入职': 7}
    stages = [row['interview_stage'] for row in rows.values()]
    res_msg = {
        label: sum(1 for stage in stages if stage >= minimum)
        for label, minimum in thresholds.items()
    }

    # Export as a spreadsheet
    if interview.out_form == 'out':
        xls_name = '招聘漏斗' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        columns = list(res_msg)
        single_row = [res_msg[label] for label in columns]
        chk_df = pd.DataFrame(data=[single_row], columns=columns)
        return Download_xlsx(chk_df, xls_name)

    return schemas.Msg(
        code=200,
        msg='ok',
        data={'data': res_msg, 'table_data': table_data},
    )
# Pending-onboarding count
@router.post("/interview_wait_in_form")
async def interview_wait_in_form(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Count rows whose ``interview_stage`` is at least 5 (pending onboarding).

    Returns a ``schemas.Msg`` with ``{'value': <count>}`` and the table
    settings, or the value of ``Download_xlsx`` when ``interview.out_form``
    is ``'out'``.
    """
    await interview.init()
    query = interview.get_hr_works_form_sql()
    rows = await ck_db.execute(query['sql'])

    # Report display settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    waiting = sum(1 for row in rows.values() if row['interview_stage'] >= 5)
    res_msg = {'value': waiting}

    # Export as a spreadsheet
    if interview.out_form == 'out':
        xls_name = '待入职数量' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        chk_df = pd.DataFrame(data=[[res_msg['value']]], columns=['待入职数量'])
        return Download_xlsx(chk_df, xls_name)

    return schemas.Msg(
        code=200,
        msg='ok',
        data={'data': res_msg, 'table_data': table_data},
    )
# Total headcount across open positions
@router.post("/interview_job_need_num_form")
async def interview_job_need_num_form(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Sum the ``job_num`` (requested headcount) of every job returned by crud.

    Returns a ``schemas.Msg`` with ``{'value': <total>}`` and the table
    settings, or the value of ``Download_xlsx`` when ``interview.out_form``
    is ``'out'``.
    """
    # The other report handlers that read data_in / where / out_form call
    # init() first; this one skipped it.
    await interview.init()

    job_data = await crud.jobs.all_some_field(db)

    # Report display settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    res_msg = {
        'value': sum(job['job_num'] for job in job_data)
    }

    # Export as a spreadsheet
    if interview.out_form == 'out':
        # NOTE(review): both ends of the range use 'start_time'; the second
        # was presumably meant to be the end of the range - confirm the key.
        xls_name = '招聘职位总人数' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        columns = ['招聘职位总人数']
        chk_df = pd.DataFrame(data=[[res_msg['value']]], columns=columns)
        return Download_xlsx(chk_df, xls_name)

    res_data = {
        'data': res_msg,
        'table_data': table_data
    }
    return schemas.Msg(code=200, msg='ok', data=res_data)
# Channel effectiveness report
@router.post("/owner_effect")
async def owner_effect(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Build the per-channel effectiveness report.

    For each resume channel this counts the rows that reached each funnel
    stage, attaches the yearly cost read from ``crud.owner_info`` and derives
    the cost per candidate (``year_money / count``).

    Returns a ``schemas.Msg`` with the rows, the column labels and the table
    settings, or the value of ``Download_xlsx`` when ``interview.out_form``
    is ``'out'``.
    """
    await interview.init()
    res = interview.get_owner_form_sql()
    data = await ck_db.execute(res['sql'])

    # Report display settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    # Resume channel code -> display name
    owner_dict = {1: '前程无忧', 2: '人才库', 3: '智联招聘', 4: 'Boss直聘', 5: '58同城', 6: '拉勾'}

    # Only the interview stage of each row is needed for the counters.
    stages_by_owner = {name: [] for name in owner_dict.values()}
    for row in data.values():
        owner_name = owner_dict.get(row['owner_name'])
        if owner_name is None:
            # An unknown channel code must not take the whole report down.
            continue
        stages_by_owner[owner_name].append(row['interview_stage'])

    def reached(stages, minimum):
        """Number of stages that are at least ``minimum``."""
        return sum(1 for stage in stages if stage >= minimum)

    res_msg = {}
    for owner_name, stages in stages_by_owner.items():
        res_msg[owner_name] = {
            'count': len(stages),
            'start_num': reached(stages, 1),   # first screening
            'screen_num': reached(stages, 2),  # second screening
            'exam_num': reached(stages, 3),    # interview
            'offer_num': reached(stages, 4),   # offer
            'work_num': reached(stages, 7),    # onboarded
            'year_money': 0,
            'one_money': 0,
        }

    year = interview.where.get('start_time', '2022').split('-')[0]
    owner_info = await crud.owner_info.find_owner_some(
        db, where={'year': year}, findlist=['owner_name', 'year_money'])
    for info in owner_info:
        stats = res_msg.get(info['owner_name'])
        if stats is None:
            continue
        stats['year_money'] = info['year_money']
        if stats['count'] != 0:
            # Cost per candidate. The previous expression indexed res_msg with
            # a non-existent 'count' field of the cost record and raised.
            stats['one_money'] = stats['year_money'] / stats['count']

    res_data = [{**stats, 'owner_name': name} for name, stats in res_msg.items()]

    level_list = {
        "owner_name": "渠道",
        "count": "总数",
        "start_num": "推荐简历",
        "screen_num": "有效简历",
        "exam_num": "到场面试数",
        "offer_num": "面试通过",
        "work_num": "入职人数",
        "year_money": "成本",
        "one_money": "人均成本",
    }

    # Export as a spreadsheet
    if interview.out_form == 'out':
        # NOTE(review): both ends of the range use 'start_time'; the second
        # was presumably meant to be the end of the range - confirm the key.
        xls_name = '渠道效果' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        columns = list(level_list.values())
        chk_data = [[row.get(key, '') for key in level_list] for row in res_data]
        chk_df = pd.DataFrame(data=chk_data, columns=columns)
        return Download_xlsx(chk_df, xls_name)

    res_true_msg = {
        'data': res_data,
        'level_list': level_list
    }
    res_true_data = {
        'data': res_true_msg,
        'table_data': table_data
    }
    return schemas.Msg(code=200, msg='ok', data=res_true_data)
res_msg[chk_date]['创建面试的申请数'] += 1 * count_num # 导出报表 if interview.out_form == 'out': xls_name = '招聘趋势分析' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = ['日期', '初筛简历数', '初筛', '创建面试的申请数'] for i, value in res_msg.items(): add_data = [i] for key in columns[1:]: add_data.append(value.get(key, '')) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_data = { 'data': res_msg, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_data) # 年度招聘趋势报表 @router.post("/year_trend_form") async def year_trend_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 年度招聘趋势报表 """ await interview.init() res = interview.get_year_form_sql() sql = res['sql'] data = await ck_db.execute(sql) # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) sdate = res['sdate'] edate = res['edate'] months = get_every_months(sdate, edate) res_msg = {} for i in months: res_msg[str(i)] = { 'start_num': 0, 'ok_num': 0, 'exam_num': 0, 'exam_pass_num': 0, 'offer_num': 0, 'work_num': 0, 'work_ok_num': 0, 'out_num': 0, 'out_self_num': 0, 'out_must_num': 0 } for chk_data in data.values(): chk_date = datetime.strftime(chk_data['date'], '%Y-%m') if chk_date not in res_msg: continue chk_stage = chk_data['interview_stage'] count_num = chk_data['value'] if chk_stage >= 1: res_msg[chk_date]['start_num'] += 1 * count_num if chk_stage >= 2: res_msg[chk_date]['ok_num'] += 1 * count_num if chk_stage >= 3: 
# Yearly per-position recruitment report
@router.post("/year_job_form")
async def year_job_form(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Build the per-period, per-department, per-job recruitment report.

    Query rows are bucketed by period (day / week / month, chosen by the
    ``chk_type`` returned with the SQL) and job id, turned into funnel
    counters, then regrouped by department and job name. Jobs for which
    ``crud.jobs.find_job_some`` returned no metadata are reported under
    department '其他' / job '其他'.

    Returns a ``schemas.Msg`` with the rows, the column labels and the table
    settings, or the value of ``Download_xlsx`` when ``interview.out_form``
    is ``'out'``.
    """
    await interview.init()
    res = interview.year_job_form_sql()
    data = await ck_db.execute(res['sql'])

    # Report display settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    # Resolve job name and department for every job id seen in the data.
    job_ids = list({row['job_id'] for row in data.values()})
    job_datas = await crud.jobs.find_job_some(db, job_ids)
    job_dict = {
        job['job_id']: {'name': job['job_name'], 'sector': job['job_sector']}
        for job in job_datas
    }

    sdate = res['sdate']
    edate = res['edate']
    chk_type = res['chk_type']
    if chk_type == 'day':
        periods = get_every_days(sdate, edate)
    elif chk_type == 'week':
        periods = get_every_weeks(sdate, edate)
    else:
        periods = get_every_months(sdate, edate)

    counter_names = ('start_num', 'ok_num', 'exam_num', 'exam_pass_num',
                     'offer_num', 'work_num', 'out_num')
    # (minimum interview_stage, counter column)
    stage_columns = ((1, 'start_num'), (2, 'ok_num'), (3, 'exam_num'),
                     (4, 'exam_pass_num'), (5, 'offer_num'), (7, 'work_num'),
                     (9, 'out_num'))

    res_msg = {
        str(period): {job_id: dict.fromkeys(counter_names, 0) for job_id in job_ids}
        for period in periods
    }

    # NOTE(review): 'week' rows are keyed by their exact day; this only
    # matches if get_every_weeks yields keys in the same '%Y-%m-%d' form.
    date_fmt = '%Y-%m' if chk_type == 'month' else '%Y-%m-%d'
    for chk_data in data.values():
        chk_date = datetime.strftime(chk_data['date'], date_fmt)
        if chk_date not in res_msg:
            continue
        chk_stage = chk_data['interview_stage']
        count_num = chk_data['value']
        counters = res_msg[chk_date][chk_data['job_id']]
        for threshold, column in stage_columns:
            if chk_stage >= threshold:
                counters[column] += count_num

    # Regroup by department / job name. Counters are summed so that job ids
    # sharing one label (in particular all unknown jobs under '其他') no
    # longer overwrite each other.
    res_data = {}
    for period, per_job in res_msg.items():
        sectors = res_data.setdefault(period, {})
        for job_id, counters in per_job.items():
            if job_id in job_dict:
                sector = job_dict[job_id]['sector']
                job_name = job_dict[job_id]['name']
            else:
                sector = job_name = '其他'
            merged = sectors.setdefault(sector, {}).setdefault(
                job_name, dict.fromkeys(counter_names, 0))
            for column in counter_names:
                merged[column] += counters[column]

    # Flatten to one row per (period, department, job).
    res_data1 = [
        {**counters, 'time': period, 'date_name': sector, 'job_name': job_name}
        for period, sectors in res_data.items()
        for sector, jobs in sectors.items()
        for job_name, counters in jobs.items()
    ]

    level_list = {
        "time": "时间",
        "date_name": "部门",
        "job_name": "职位",
        "start_num": "简历推荐数",
        "ok_num": "有效简历数",
        "exam_num": "到场面试数",
        "exam_pass_num": "面试通过",
        "offer_num": "offer发出数",
        "work_num": "入职人数",
        "out_num": "离职人数"
    }

    # Export as a spreadsheet
    if interview.out_form == 'out':
        # NOTE(review): both ends of the range use 'start_time'; the second
        # was presumably meant to be the end of the range - confirm the key.
        xls_name = '年度岗位招聘数据' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        columns = list(level_list.values())
        chk_rows = [[row.get(key, '') for key in level_list] for row in res_data1]
        chk_df = pd.DataFrame(data=chk_rows, columns=columns)
        return Download_xlsx(chk_df, xls_name)

    res_true_msg = {
        'data': res_data1,
        'level_list': level_list,
    }
    res_true_data = {
        'data': res_true_msg,
        'table_data': table_data
    }
    return schemas.Msg(code=200, msg='ok', data=res_true_data)
interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return schemas.Msg(code=200, msg='报表数据错误', data={}) sdate = res['sdate'] edate = res['edate'] months = get_every_months(sdate, edate) res_msg = { } for i in months: res_msg[str(i)] = {} res_msg[str(i)]['入职'] = 0 res_msg[str(i)]['离职'] = 0 # 入职数据处理 for in_data in data_in.values(): in_date = datetime.strftime(in_data['date'], '%Y-%m') if in_date not in res_msg: continue value = in_data['value'] res_msg[in_date]['入职'] += value # 离职数据处理 for out_data in data_out.values(): out_date = datetime.strftime(out_data['date'], '%Y-%m') if out_date not in res_msg: continue value = out_data['value'] res_msg[out_date]['离职'] += value # 导出报表 if interview.out_form == 'out': xls_name = '年度入离职数据' + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '') chk_data = [] columns = ['日期', '入职', '离职'] for i, value in res_msg.items(): add_data = [i] for key in columns[1:]: add_data.append(value.get(key, '')) true_add_data = deepcopy(add_data) chk_data.append(true_add_data) chk_df = pd.DataFrame(data=chk_data, columns=columns) download_xls = Download_xlsx(chk_df, xls_name) return download_xls res_data = { 'data': res_msg, 'table_data': table_data } return schemas.Msg(code=200, msg='ok', data=res_data) # 查询入职人员信息 @router.post("/find_worker_form") async def find_worker_form( request: Request, interview: InterviewDo = Depends(InterviewDo), ck_db: CKDrive = Depends(get_ck_db), db: AsyncIOMotorDatabase = Depends(get_database), current_user: schemas.UserDB = Depends(deps.get_current_user) ) -> schemas.Msg: """ 人才密度报表 """ await interview.init() # 获取报表相关参数 table_id = interview.data_in.get('table_id', '') if not table_id: return schemas.Msg(code=200, msg='报表数据错误', data={}) table_data = await crud.api_interview_tables.get_one_table(db, table_id) if not table_data: return 
schemas.Msg(code=200, msg='报表数据错误', data={}) find_column = interview.find_column if not interview.find_column: find_column = ['name', 'extension'] data = await crud.worker_info.find_worker_some(db, findlist=find_column) if not data: return schemas.Msg(code=-9, msg='无数据', data=None) new_data1 = [i for i in data if not i['extension'] or '岗位职级' not in i['extension']] new_data2 = [i for i in data if i['extension'] and '岗位职级' in i['extension']] res_msg = { '未评级': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == ''], 'p1-p3': [i['name'] for i in new_data2 if i['extension']['岗位职级'] in ['P1', 'P2', 'P3']], 'p4': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'P4'], 'p5': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'P5'], 'p6': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'P6'], 'p7': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'P7'], 'p8': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'P8'], 'm1': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'M1'], 'm2': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'M2'], 'm3': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'M3'], 'm4': [i['name'] for i in new_data2 if i['extension']['岗位职级'] == 'M4'], '总人数': len(data) } res_msg['未评级'].extend([i['name'] for i in new_data1]) res_data = [] res_msg1 = { 'title': '数量', '未评级': len(res_msg['未评级']), 'p1-p3': len(res_msg['p1-p3']), 'p4': len(res_msg['p4']), 'p5': len(res_msg['p5']), 'p6': len(res_msg['p6']), 'p7': len(res_msg['p7']), 'p8': len(res_msg['p8']), 'm1': len(res_msg['m1']), 'm2': len(res_msg['m2']), 'm3': len(res_msg['m3']), 'm4': len(res_msg['m4']), } res_data.append(res_msg1) res_msg2 = { 'title': '占比', '未评级': '{:.2%}'.format(res_msg1['未评级'] / res_msg['总人数']), 'p1-p3': '{:.2%}'.format(res_msg1['p1-p3'] / res_msg['总人数']), 'p4': '{:.2%}'.format(res_msg1['p4'] / res_msg['总人数']), 'p5': '{:.2%}'.format(res_msg1['p5'] / res_msg['总人数']), 'p6': '{:.2%}'.format(res_msg1['p6'] / 
# Query headcount statistics
@router.post("/find_count_info")
async def find_count_info(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Return the stored headcount statistics together with their column map.

    The report settings are looked up by ``table_id`` (read from
    ``interview.data_in``).  The column map (``level_list``) is derived from
    the record that has the most keys, so that no column is dropped when the
    records are not uniform; ``count_id`` is labelled ``'月份'``.

    When ``interview.out_form == 'out'`` the records are exported through
    ``Download_xlsx`` instead of being returned as JSON.

    Returns:
        ``schemas.Msg`` whose ``data`` holds ``{'data': {'data', 'level_list'},
        'table_data'}``, or the object returned by ``Download_xlsx`` on export.
        A missing ``table_id`` / unknown table yields msg ``'报表数据错误'``.
    """
    await interview.init()

    # Load the report settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})

    data = await crud.count_in_worker.find_count_some(db)
    records = data or []

    # Use the widest record as the column template.  The previous scan never
    # updated its running maximum, so it picked the last non-empty record
    # rather than the widest one, and it crashed on an empty result set.
    widest = max(records, key=len, default={})
    level_list = {key: key for key in widest}
    level_list['count_id'] = '月份'

    # Export the report
    if interview.out_form == 'out':
        # NOTE(review): 'start_time' is used for both ends of the range in the
        # file name; this looks like it was meant to be an end time - confirm.
        xls_name = '在职人员年份分布' + interview.where.get('start_time', '') + '~' + interview.where.get(
            'start_time', '')
        columns = list(level_list.values())
        source_keys = list(level_list)
        rows = [[record.get(key, '') for key in source_keys] for record in records]
        chk_df = pd.DataFrame(data=rows, columns=columns)
        return Download_xlsx(chk_df, xls_name)

    res_msg = {
        'data': data,
        'level_list': level_list
    }
    res_data = {
        'data': res_msg,
        'table_data': table_data
    }
    return schemas.Msg(code=200, msg='ok', data=res_data)
# Generic resume analysis report
@router.post("/currency_interview_form")
async def currency_interview_form(
        request: Request,
        interview: InterviewDo = Depends(InterviewDo),
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Run the generic resume report configured by ``table_id``.

    The SQL comes from ``interview.find_interview_every_sql()``; the columns to
    show are the entries of the ``'interview'`` column map whose keys appear in
    the report's ``find_column`` setting.

    When ``interview.out_form == 'out'`` the rows are exported through
    ``Download_xlsx`` instead of being returned as JSON.

    Returns:
        ``schemas.Msg`` whose ``data`` holds ``{'data': {'data', 'level_list'},
        'table_data'}``, or the object returned by ``Download_xlsx`` on export.
        A missing ``table_id`` / unknown table yields msg ``'报表数据错误'``.
    """
    await interview.init()
    res = interview.find_interview_every_sql()
    sql = res['sql']

    # Load the report settings
    table_id = interview.data_in.get('table_id', '')
    if not table_id:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    table_data = await crud.api_interview_tables.get_one_table(db, table_id)
    if not table_data:
        return schemas.Msg(code=200, msg='报表数据错误', data={})
    find_column = table_data.get('find_column', [])
    title = table_data.get('title', '')

    data = await ck_db.execute(sql)
    # Materialise the rows once: a dict view is not a JSON-serialisable
    # sequence, and an empty/None result must not crash the endpoint.
    rows = list(data.values()) if data else []

    level_list_data = await crud.api_find_column.get_one_column(db, where={'column_type': 'interview'})
    # A missing column-map document simply means "no known columns".
    level_list = {k: v for k, v in (level_list_data or {}).items() if k in find_column}

    # Export the report
    if interview.out_form == 'out':
        # NOTE(review): 'start_time' is used for both ends of the range in the
        # file name; this looks like it was meant to be an end time - confirm.
        xls_name = title + interview.where.get('start_time', '') + '~' + interview.where.get('start_time', '')
        columns = list(level_list.values())
        source_keys = list(level_list)
        chk_data = [[row.get(key, '') for key in source_keys] for row in rows]
        chk_df = pd.DataFrame(data=chk_data, columns=columns)
        return Download_xlsx(chk_df, xls_name)

    res_msg = {
        'data': rows,
        'level_list': level_list
    }
    res_data = {
        'data': res_msg,
        'table_data': table_data
    }
    return schemas.Msg(code=200, msg='ok', data=res_data)
def _safe_path_component(value):
    """Return *value* as text that cannot introduce extra path levels."""
    text = '' if value is None else str(value)
    for unsafe in ('\x00', '/', '\\'):
        text = text.replace(unsafe, '_')
    return text


# Read resumes from a mailbox
def email_user(dir_name, mail_dict):
    """Download the attachments of unread mails and return the resume files.

    For every unread message that has attachments a directory
    ``<project>/<dir_name>/<sender>_<subject>/`` is created with two
    sub-directories: ``form/`` for resumes (``pdf``/``doc``/``docx``) and
    ``other_files/`` for everything else.  Messages whose directory already
    exists are skipped.

    Sender name, subject and attachment file names come from the mail itself
    and are therefore untrusted: path separators in them are replaced so they
    can never escape the target directory.

    Args:
        dir_name: Sub-directory (under the project root) used for this mailbox.
        mail_dict: Mapping with the keys ``'host'`` (IMAP server), ``'mail'``
            (mailbox address) and ``'pwd'`` (password / app token).

    Returns:
        A list of ``[resume_path, other_files_dir]`` pairs, one per resume.
    """
    file_list = []  # resume files
    # Absolute path of the directory two levels above this file
    abs_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    dir_path = '{0}/{1}'.format(abs_path, dir_name)
    os.makedirs(dir_path, exist_ok=True)

    password = mail_dict['pwd']
    host = mail_dict['host']
    mail = mail_dict['mail']
    # IMAP server address, mailbox address, password, SSL enabled
    with Imbox(host, mail, password, ssl=True) as qq_box:
        # Unread messages only
        for uid, message in qq_box.messages(unread=True):
            sent_from = message.sent_from
            sender = sent_from[0].get('name') if sent_from else None
            title = getattr(message, 'subject', None)
            mail_dir = '{0}/{1}_{2}'.format(
                dir_path, _safe_path_component(sender), _safe_path_component(title))
            if os.path.exists(mail_dir):
                continue
            if not message.attachments:
                continue

            form_dir = mail_dir + '/form/'
            other_dir = mail_dir + '/other_files/'
            os.makedirs(form_dir)
            os.makedirs(other_dir)

            for attachment in message.attachments:
                file_name = _safe_path_component(attachment['filename'])
                if file_name in ('', '.', '..'):
                    # Nothing usable to name the file with.
                    continue
                is_resume = file_name.split('.')[-1].lower() in ('pdf', 'doc', 'docx')
                target_dir = form_dir if is_resume else other_dir
                # Write straight into the target directory instead of the
                # process working directory followed by a move.
                with open(target_dir + file_name, 'wb') as f:
                    f.write(attachment['content'].getvalue())
                if is_resume:
                    file_list.append([form_dir + file_name, other_dir])

            # Mark as read.
            # NOTE(review): only messages whose attachments were stored are
            # marked here; confirm that attachment-less mails should stay unread.
            qq_box.mark_seen(uid)
    return file_list
# Date-like resume fields that are normalised to ``YYYY-MM-DD`` strings.
_RESUME_DATE_FIELDS = ('in_time', 'out_time', 'birthday', 'star_time', 'end_time', 'graduate_time')
# List-valued resume fields whose items are stored as JSON strings.
_RESUME_JSON_LIST_FIELDS = ('project_undergo', 'work_list', 'language', 'remembrance')


def _first_number(text):
    """Return the first ``digits[.digits]`` run found in *text*, or ``None``."""
    match = re.search(r"\d+\.?\d*", text)
    return match.group() if match else None


def _parse_age(text):
    """Convert a free-text age to ``int``; fall back to 20 when implausible."""
    number = _first_number(text)
    if number is None or len(number) > 2:
        return 20
    try:
        return int(number)
    except ValueError:  # e.g. '3.' matches the pattern but is not an int
        return 20


def _parse_work_exp(text):
    """Convert free-text work experience to ``float``; fall back to 0."""
    number = _first_number(text)
    if number is None or len(number) > 3:
        return 0
    return float(number)


def _normalize_resume_date(text):
    """Turn ``Y-m``/``Y.m``/``Y/m`` (optionally with a day) into ``YYYY-MM-DD``.

    Values with any other number of parts are returned unchanged; a value with
    two or three parts that is not a valid date raises ``ValueError``.
    """
    slashed = text.replace('-', '/').replace('.', '/')
    parts = len(slashed.split('/'))
    if parts == 2:
        return str(datetime.strptime(slashed, "%Y/%m").date())
    if parts == 3:
        return str(datetime.strptime(slashed, "%Y/%m/%d").date())
    return text


def _sql_quote(value):
    """Return *value* as a single-quoted SQL string literal with escapes."""
    escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
    return "'" + escaped + "'"


def _build_where_clause(where):
    """Join ``key = value`` conditions with ``and``; blank strings are skipped."""
    conditions = []
    for key, value in where.items():
        if isinstance(value, str) and not value.strip():
            continue
        if isinstance(value, (int, float)):
            rendered = str(value)
        else:
            # Values come from parsed resume text, so never inline them raw.
            rendered = _sql_quote(value)
        conditions.append(str(key) + ' = ' + rendered)
    return ' and '.join(conditions)


# Process one pending resume for HR
@router.post("/hr_do_file")
async def hr_do_file(
        request: Request,
        data_in: schemas.UpdateEmails,
        ck_db: CKDrive = Depends(get_ck_db),
        db: AsyncIOMotorDatabase = Depends(get_database),
        current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
    """Accept or reject a pending resume that was fetched from the HR mailbox.

    ``data_in.update_data`` supplies ``interview_file`` (the resume PDF) and
    ``other_file`` (the directory holding the mail's other attachments).

    * ``data_in.is_pass == 0``: the mail's local directory is deleted.
    * otherwise: the other attachments are uploaded, the resume is parsed,
      checked for duplicates by name / phone / gender in ``HR.resumes``,
      uploaded, and - when it is not a duplicate - inserted.  The pending
      record is then updated and the local directory deleted.

    Returns:
        ``schemas.Msg`` with code 0 and the parsed data on success (``exist``
        is 1 for a duplicate), code 200 after a rejection, code -9 when no
        resume file was given, and code 400 when the upload or parsing fails.
    """
    fn = data_in.update_data.get('interview_file', '')
    other_file = data_in.update_data.get('other_file', '')
    # Directory of the mail: the parent of the attachments directory.  Derived
    # with dirname so an empty path or a missing trailing slash cannot crash
    # or resolve to the wrong level.
    # NOTE(review): this path comes from the request body and is passed to
    # shutil.rmtree; consider restricting it to the mail download root.
    delete_dir = os.path.dirname(other_file.rstrip('/')) if other_file else ''

    if data_in.is_pass == 0:
        # Rejected: delete the local files
        if delete_dir:
            shutil.rmtree(delete_dir)
        return schemas.Msg(code=200, msg='ok', data={})
    if not fn:
        return schemas.Msg(code=-9, msg='简历文件有误', data={})

    other_file_list = []
    if other_file:
        for attachment_name in os.listdir(other_file):
            # Best effort: one failed attachment must not block the resume.
            try:
                res_other = obsClient.putFile('legu-cdn-source', 'hrms/accessory/' + attachment_name,
                                              os.path.join(other_file, attachment_name))
                if res_other.status < 300:
                    other_file_list.append(res_other.body.objectUrl)
            except Exception as exc:
                print('upload attachment failed:', attachment_name, repr(exc))

    try:
        # Default resume document
        data_mode = {
            "interview_name": "",
            "interview_type": 1,
            "interview_sign": 0,
            "hope_money": "",
            "feedback": 0,
            "interview_round": 0,
            "event_time": datetime.now(),
            "name": "",
            "phone": "",
            "job_name": "",
            "hr_name": "",
            "work_exp": 0,
            "interview_stage": 1,
            "owner_name": 2,
            "education": 1,
            "work_undergo": [],
            "project_undergo": [],
            "work_list": [],
            "school": "",
            "at_school": "",
            "specialty": "",
            "specialty_do": [],
            "mmended_state": 0,
            "mail": "",
            "account": "",
            "id_card": "",
            "gender": "",
            "age": 0,
            "gam": "",
            "interview_state": 1,
            "counts": 1,
            "nation": "汉",
            "review": "",
            "upgrade": [],
            "come_time": "",
            "now_money": "",
            "men_state": 1,
            "teacher_state": 1,
            "teacher_back": 1,
            "offer_state": 1,
            "offer_exam_state": 1,
            "notice_state": 1,
            "pass_why": 0,
            "pass_text": "",
            "now_address": "",
            "language": [],
            "remembrance": [],
            "file_url": '',
            "hr_manner": 2,
            "resume_affix_id": other_file_list,
        }
        uid = get_uid()
        data_mode['uid'] = uid

        # Parse the resume
        chk_txt = getText_pdf(fn)
        data = fmt_txt(chk_txt)

        # Education label -> int code
        education = data['education']
        education_int = {
            '大专': 1,
            '本科': 2,
            '研究生': 3,
            '博士': 4,
            '硕士': 5,
        }
        if education and isinstance(education, str):
            data['education'] = education_int.get(education, 1)

        # Age -> int
        age = data['age']
        if not age:
            data['age'] = 20
        elif isinstance(age, str):
            data['age'] = _parse_age(age)

        # Work experience -> float
        work_exp = data['work_exp']
        if not work_exp:
            data['work_exp'] = 0
        elif isinstance(work_exp, str):
            data['work_exp'] = _parse_work_exp(work_exp)

        data_mode.update(data)

        # The parser's *_list keys override the plain ones
        if 'remembrance_list' in data_mode:
            data_mode['remembrance'] = data_mode.pop('remembrance_list')
        if 'language_list' in data_mode:
            data_mode['language'] = data_mode.pop('language_list')

        # List items are stored as JSON strings
        for field in _RESUME_JSON_LIST_FIELDS:
            items = data_mode.get(field, [])
            data_mode[field] = [json.dumps(item) for item in items] if items else []

        # Date strings -> YYYY-MM-DD
        for field in _RESUME_DATE_FIELDS:
            if data_mode.get(field, ''):
                data_mode[field] = _normalize_resume_date(data_mode[field])

        work_list = data['work_list']
        language = data['language']
        project_undergo = data['project_undergo']
        remembrance = data['remembrance']

        # Duplicate check on name, phone and gender
        where = {}
        if data['name']:
            where['name'] = data['name']
        if data['phone']:
            where['phone'] = data['phone']
        if data['gender']:
            where['gender'] = data['gender']
        where_str = _build_where_clause(where)
        if not where_str:
            # Nothing identifying was parsed: an empty WHERE would be invalid
            # SQL, so report the parse failure explicitly.
            return schemas.Msg(code=400, msg='上传华为云失败或者解析失败', data=None)
        sql = f"select uid from HR.resumes where {where_str}"
        is_in_data = await ck_db.execute(sql)
        exist = 0
        if is_in_data:
            exist = 1
            # Reuse the uid of the resume that is already stored
            uid = next(iter(is_in_data.values()))['uid']

        res = obsClient.putFile('legu-cdn-source', 'hrms/' + uid + '.pdf', fn)
        if res.status < 300:
            # Public address of the uploaded resume
            url = res.body.objectUrl
            data_mode['file_url'] = url
            data['file_url'] = url
            res_data = {
                'data': data,
                'file_url': url,
                'uid': uid,
                'exist': exist,
                'project_undergo': project_undergo,
                'work_list': work_list,
                'language_list': language,
                'remembrance_list': remembrance
            }
            if not exist:
                sql = (
                    "insert into HR.resumes(interview_name, interview_type, interview_sign, hope_money, feedback,"
                    " interview_round, event_time, uid, name, phone, job_name, hr_name, work_exp, interview_stage, owner_name,"
                    " education, work_undergo, project_undergo, work_list, school, at_school, specialty, specialty_do, "
                    "mmended_state, mail, account, id_card, gender, age, gam, interview_state, counts, nation, come_time,"
                    " review, upgrade, now_money, men_state, teacher_state, teacher_back, offer_state, offer_exam_state,"
                    " notice_state, pass_why, pass_text, now_address,language,remembrance, file_url, hr_manner, resume_affix_id) values"
                )
                await ck_db.execute_dict(sql, [data_mode])
            # Update the state of the pending resume
            await crud.api_chk_emails.update_chk_emails(db, data_in)
            if delete_dir:
                shutil.rmtree(delete_dir)
            return schemas.Msg(code=0, msg='ok', data=res_data)

        print('errorCode:', res.errorCode)
        print('errorMessage:', res.errorMessage)
        return schemas.Msg(code=400, msg='上传华为云失败', data=None)
    except Exception as exc:
        # Catch Exception (not a bare except) so task cancellation still
        # propagates, and leave a trace of what actually went wrong.
        print('hr_do_file failed:', repr(exc))
        return schemas.Msg(code=400, msg='上传华为云失败或者解析失败', data=None)


if __name__ == '__main__':
    print(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))