1.导出所有简历面试安排

2.获取职位名称
3.面试情况详情
This commit is contained in:
李伟 2022-07-12 17:45:01 +08:00
parent e1c82b1858
commit bab4e071aa
5 changed files with 256 additions and 20 deletions

View File

@ -17,6 +17,7 @@ import crud, schemas
from common import *
from api import deps
from core.configuration import *
from db import get_database
from db.ckdb import get_ck_db, CKDrive, ckdb
from db.redisdb import get_redis_pool, RedisDrive
@ -24,7 +25,7 @@ from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.interview_zsgc import InterviewDo
from utils import DfToStream, get_bijiao
from utils import DfToStream, get_bijiao, get_time, qujian_time, Download_xlsx
router = APIRouter()
@ -154,3 +155,113 @@ async def event_edit(
"""新增职位"""
await crud.jobs.insert_job(db, data_in)
return schemas.Msg(code=200, msg='ok', data='')
@router.post("/condition")
async def interview_insert(
request: Request,
data_in: schemas.Interview,
ckdb: CKDrive = Depends(get_ck_db)
) -> schemas.Msg:
""" 面试情况 """
res = data_in.date
strs = []
for k, v in res.items():
if v != '':
if 'int' in str(type(v)):
str_s = f"{k} = {v}"
strs.append(str_s)
else:
str_s = f"{k} = '{v}'"
strs.append(str_s)
where = ' and '.join(strs)
# 当前日期
times = get_time()
# 今天的面试
if data_in.time_type == 'now':
# 查询返回的数据一共多少条
len_sql = f"""select uid from HR.resumes where {where} and toDate(star_time) == '{times}' ORDER BY event_time"""
sql = f"""select interview_round,interview_type,star_time,end_time,name,phone,job_name,hr_name,
feedback,interview_name from HR.resumes where {where} and toDate(star_time) == '{times}' ORDER BY event_time
LIMIT 10 OFFSET {(data_in.pages - 1) * 10}"""
# 明天及之后的面试
elif data_in.time_type == 'tomorrow':
len_sql = f"""select uid from HR.resumes where {where} and toDate(star_time) > '{times}' ORDER BY event_time"""
sql = f"""select interview_round,interview_type,star_time,end_time,name,phone,job_name,hr_name,
feedback,interview_name from HR.resumes where {where} and toDate(star_time) > '{times}' ORDER BY event_time
LIMIT 10 OFFSET {(data_in.pages - 1) * 10}"""
# 昨天及以前的面试
else:
len_sql = f"""select uid from HR.resumes where {where} and toDate(star_time) < '{times}' ORDER BY event_time"""
sql = f"""select interview_round,interview_type,star_time,end_time,name,phone,job_name,hr_name,
feedback,interview_name from HR.resumes where {where} and toDate(star_time) < '{times}' ORDER BY event_time
LIMIT 10 OFFSET {(data_in.pages - 1) * 10}"""
if where == '':
len_sql = len_sql.replace('where and', 'where', 1)
sql = sql.replace('where and', 'where', 1)
# 返回数据条数
df_len = await ckdb.query_dataframe(len_sql)
len_date = len(df_len)
df = await ckdb.query_dataframe(sql)
if df.empty:
return schemas.Msg(code=-9, msg='无数据', data='')
data = {'lens': len_date,
'columns': df.columns.tolist(),
'values': df.values.tolist()
}
return schemas.Msg(code=200, msg='ok', data=data)
@router.post("/get_job")
async def event_edit(
request: Request,
data_in: schemas.Jobs = None,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""获取职位名称"""
# 获取对应条件的职位
res = await crud.jobs.all_fields(db, data_in)
# 获取职位数量
num = len(res)
job_list = []
job_id = []
for i in res:
# 获取职位名称
job_list.append(i['job_name'])
job_id.append(i['job_id'])
data = {
'num': num,
'job_list': job_list,
'job_id': job_id
}
return schemas.Msg(code=200, msg='ok', data=data)
@router.post("/download_interview")
async def download_inter(
request: Request,
data_in: schemas.Timesinter,
ckdb: CKDrive = Depends(get_ck_db)):
"""导出所有面试安排"""
sql = f"""SELECT interview_type,interview_stage,star_time,end_time,name,phone,job_name,hr_name,interview_name,
interview_sign FROM HR.resumes WHERE toDate(star_time) >= '{data_in.start_time}' and toDate(star_time) <= '{data_in.end_time}'"""
df = await ckdb.query_dataframe(sql)
# xlsx表名
xlsx_name =data_in.start_time + '~' + data_in.end_time + '(包含起止日)'
datas = []
for i in range(len(df)):
one_data = []
one_data.append(interview_type_dict[df['interview_type'][i]])
one_data.append(interview_stage_dict[df['interview_stage'][i]])
df_time = qujian_time(df['star_time'][i], df['end_time'][i])
one_data.append(df_time)
one_data.append(df['name'][i])
one_data.append(df['phone'][i])
one_data.append(df['job_name'][i])
one_data.append(df['hr_name'][i])
one_data.append(df['interview_name'][i])
one_data.append(interview_sign_dict[df['interview_sign'][i]])
datas.append(one_data)
columns = ['面试类型', '面试阶段', '面试时间', '候选人', '联系方式', '应聘职位', '面试负责人', '面试官', '面试签到']
dfmi = pd.DataFrame(data=datas, columns=columns)
Download = Download_xlsx(dfmi, xlsx_name)
return Download

20
core/configuration.py Normal file
View File

@ -0,0 +1,20 @@
# 基础的配置文件,用来映射数据返回给前端
# 面试类型
interview_type_dict = {0: '线上面试', 1: '线下面试'}
# 面试签到
interview_sign_dict = {1: '已签到', 0: '未签到'}
# 面试反馈
feedback_dict = {1: '已反馈', 0: '未反馈'}
# 面试轮次
interview_round_idct = {1: '初试', 2: '复试', 3: '终试'}
# 简历阶段
interview_stage_dict = {1: '初筛', 2: '用人部门复筛', 3: '初试', 4: '复试', 5: '沟通offer', 6: '待入职', 7: '淘汰', 8: '在职', 9: '离职'}
# 简历来源
owner_name_dict = {1: '前程无忧', 2: '人才库', 3: '智联招聘', 4: 'Boss直聘', 5: '58同城'}
# 学历
education_dict = {1: '大专', 2: '本科', 3: '研究生', 4: '博士', 5: '硕士'}
# 推荐状态
mmended_state_dict = {0: '未推荐', 1: '已推荐'}
# 当前面试状态
interview_state_dict = {1: '待安排', 2: '面试中', 3: '已结束'}

53
liwei_接口文档.md Normal file
View File

@ -0,0 +1,53 @@
#面试情况
api:/api/v1/itr/condition
请求方式post
参数:选择全部时传空字符串
job_name: str # 应聘职位
hr_name: str # 面试负责人
interview_name: str # 面试官
interview_type: str # 面试类型
interview_sign: int # 面试签到
feedback: int # 面试反馈
interview_round: int # 面试轮次
pages: int = 1 # 分页的当前页
time_type: str # 要查询的时间范围类型
#获取职位名称
api:/api/v1/itr/get_job
请求方式:post
参数:可不传
#新增职位
api:/api/v1/itr/add_job
请求方式post
参数:
job_id: str = int(time.time()) # 职位的唯一id
job_name: str # 职位名称
job_sector: str # 职位部门
job_nature: str # 职位性质
job_priority: str # 职位优先级
owner_name: str # 渠道
principal: str # 负责人
patronn: str # 协助人
start_time: datetime = datetime.now() # 开始招聘时间
function_type: str # 职能类型
filtering_rules: bool # 是否使用筛选规则
hiring_needs: bool # 是否关联招聘需求
auto_repeater: bool # 是否自动转发
cacsi_count: int = 0 # 面试满意度反馈次数
state: bool = True # 职位招聘状态
job_num: int # 招聘人数
education: str # 学历要求
job_rank: str # 职位级别
work_exp: str # 工作经验
report_obj: str # 汇报对象
min_money: int # 薪资范围min
max_money: int # 薪资范围max
requirement: str # 需求描述
#导出面试安排
api:/api/v1/itr/download_interview
请求方式post
参数:
start_time: str # 开始时间
end_time: str # 结束时间

View File

@ -5,12 +5,19 @@ from typing import Optional
class Interview(BaseModel):
job_name: str =None # 应聘职位
hr_name: str # 面试负责人
interview_name: str # 面试官
interview_type: str # 面试类型
interview_sign: int # 面试签到
feedback: int # 面试反馈
interview_round: int # 面试轮次
date: dict # 要查询的条件
pages: int = 1 # 分页的当前页
time_type: str # 要查询的时间范围类型
# date包含如下参数,如选择全部的则传空字符串
# job_name: str # 应聘职位
# hr_name: str # 面试负责人
# interview_name: str # 面试官
# interview_type: int # 面试类型
# interview_sign: int # 面试签到
# feedback: int # 面试反馈
# interview_round: int # 面试轮次
class Timesinter(BaseModel):
start_time: str # 开始时间
end_time: str # 结束时间

View File

@ -2,6 +2,8 @@ import random
import time
import datetime
import pandas as pd
def get_uid():
return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:]
@ -22,7 +24,9 @@ def estimate_data(data_type):
return "Nullable(DateTime('UTC'))"
else:
return "Nullable(String)"
#将字典变成字符串
# 将字典变成字符串
def dict_to_str(dic):
c = str()
b = 0
@ -36,18 +40,21 @@ def dict_to_str(dic):
c += "\"%s\":\"%s\"}" % (k, v)
return c
def getEveryDay(begin_date,end_date):
def getEveryDay(begin_date, end_date):
# 前闭后闭
date_list = []
begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d")
end_date = datetime.datetime.strptime(end_date,"%Y-%m-%d")
end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d")
while begin_date <= end_date:
date_str = begin_date.strftime("%Y-%m-%d")
date_list.append(date_str)
begin_date += datetime.timedelta(days=1)
return date_list
#print(getEveryDay('2016-01-01','2017-05-11'))
def Download_xlsx(df,name):
# print(getEveryDay('2016-01-01','2017-05-11'))
def Download_xlsx(df, name):
"""
下载功能
name为文件名
@ -56,14 +63,15 @@ def Download_xlsx(df,name):
import mimetypes
from utils import DfToStream
from fastapi.responses import StreamingResponse
file_name=quote(f'{name}.xlsx')
file_name = quote(f'{name}.xlsx')
mime = mimetypes.guess_type(file_name)[0]
df_to_stream = DfToStream((df, name))
with df_to_stream as d:
export = d.to_stream()
Download=StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
Download = StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'})
return Download
def jiange_insert(list_date):
"""
间隔1条插入一条数据插入数据
@ -76,6 +84,7 @@ def jiange_insert(list_date):
i += 2
return list_date
def create_df(resp):
"""
分布分析外部下载功能的df数据
@ -113,7 +122,9 @@ def create_df(resp):
columns.insert(0, '事件发生时间')
df = pd.DataFrame(data=date, columns=columns)
return df
def create_neidf(resp,columnName):
def create_neidf(resp, columnName):
"""
分布分析内部下载功能的df数据
"""
@ -151,12 +162,46 @@ def create_neidf(resp,columnName):
df = pd.DataFrame(data=date, columns=columns)
return df
def random_hex():
"""
生成16位随机数
:return: 随机数
"""
result = hex(random.randint(0,16**16)).replace('0x','').upper()
if(len(result)<16):
result = '0'*(16-len(result))+result
return result
result = hex(random.randint(0, 16 ** 16)).replace('0x', '').upper()
if (len(result) < 16):
result = '0' * (16 - len(result)) + result
return result
def get_time(fmt: str = '%Y-%m-%d') -> str:
'''
获取当前时间
'''
ts = time.time()
ta = time.localtime(ts)
t = time.strftime(fmt, ta)
return t
def import_excel(data, columns, name):
"""
导出数据到xlsx表里面
:param data: 需要导出的数据
:param columns: df的表名 ['a','b','c']
:param name: 文件名'随机数'
:return:
"""
zh = pd.DataFrame(data, columns=columns)
zh.to_excel(f'{name}.xlsx', index=False, header=True)
def qujian_time(start_time, end_time):
"""
把两个时间变成区间
:param start_time: '2022-07-01 10:00:00'
:param end_time: '2022-07-01 10:30:00'
:return: '2022-07-01 10:00:00~10:30:00'
"""
timess = str(end_time).split(' ')[-1]
return str(start_time) + '~' + timess