prs_server/api/api_v1/endpoints/interview.py

105 lines
3.1 KiB
Python

import datetime
import mimetypes
from collections import defaultdict
import time
from urllib.parse import quote
import re
from clickhouse_driver import Client
import pandas as pd
import numpy as np
from fastapi import APIRouter, Depends, Request
from motor.motor_asyncio import AsyncIOMotorDatabase
from pandas import DataFrame
from starlette.responses import StreamingResponse
import crud, schemas
from common import *
from api import deps
from db import get_database
from db.ckdb import get_ck_db, CKDrive, ckdb
from db.redisdb import get_redis_pool, RedisDrive
from models.behavior_analysis import BehaviorAnalysis
from models.user_analysis import UserAnalysis
from models.interview_zsgc import InterviewDo
from utils import DfToStream, get_bijiao
router = APIRouter()
# 面试查询
@router.post("/interview_find")
async def interview_find(
request: Request,
interview: InterviewDo = Depends(InterviewDo),
db: CKDrive = Depends(get_ck_db),
) -> schemas.Msg:
""" interview面试数据查询 """
await interview.init()
res = interview.find_interview_sql()
sql = res['sql']
data = await db.execute(sql)
if not data:
return schemas.Msg(code=-9, msg='无数据', data=None)
return schemas.Msg(code=200, msg='ok', data=data)
# 面试修改
@router.post("/interview_update")
async def interview_update(
request: Request,
interview: InterviewDo = Depends(InterviewDo),
db: CKDrive = Depends(get_ck_db),
) -> schemas.Msg:
""" interview面试数据更新 """
await interview.init()
res = interview.update_interview_sql()
sql = res['sql']
try:
data = await db.execute(sql)
except:
return schemas.Msg(code=-9, msg='数据有误', data=None)
return schemas.Msg(code=200, msg='ok', data=data)
# 写入面试数据
@router.post("/interview_insert")
async def interview_insert(
request: Request,
interview: InterviewDo = Depends(InterviewDo),
db: CKDrive = Depends(get_ck_db),
) -> schemas.Msg:
""" interview面试数据写入 """
await interview.init()
res = interview.insert_interview_sql()
sql = res['sql']
insert_data = res['insert_data']
data = await db.execute_dict(sql, insert_data)
return schemas.Msg(code=200, msg='ok', data=data)
# @router.post("/interview_insert")
# async def interview_insert(
# request: Request,
# data_in: schemas.Interview,
# ckdb: CKDrive = Depends(get_ck_db),
# ) -> schemas.Msg:
# """ 面试情况 """
# await interview.init()
# res = interview.insert_interview_sql()
# sql = res['sql']
# insert_data = res['insert_data']
# data = await db.execute_dict(sql, insert_data)
# return schemas.Msg(code=200, msg='ok', data=data)
@router.post("/add_job")
async def event_edit(
request: Request,
data_in: schemas.Ins_Job,
db: AsyncIOMotorDatabase = Depends(get_database),
current_user: schemas.UserDB = Depends(deps.get_current_user)
) -> schemas.Msg:
"""新增职位"""
await crud.jobs.insert_job(db, data_in)
return schemas.Msg(code=200, msg='ok', data='')