import re from typing import Tuple import arrow import sqlalchemy as sa import json from fastapi import Depends import pandas as pd import numpy as np from sqlalchemy import func, or_, and_, not_ import crud import schemas from core.config import settings from db import get_database from db.redisdb import get_redis_pool, RedisDrive from models.user_label import UserClusterDef # 面试数据ck增删改查 class InterviewDo: def __init__(self, query_in: schemas.InterviewQuery): self.query_in = query_in self.find_column = set() self.data_in = {} self.where = {} async def init(self, *args, **kwargs): self.find_column = self.query_in.find_column self.data_in = self.query_in.data_in self.where = self.query_in.interview_query def insert_interview_sql(self): insert_data = [] if isinstance(self.data_in, dict): insert_data = [self.data_in] if isinstance(self.data_in, list): insert_data = self.data_in # sql = f"insert into test.interview(auth, state, name, phone, position, teacher_name, boss_name, stage, " \ # f"interview_type,interview_location, interview_time) values" sql = f"insert into test.interview(name, kardid) values" print(sql) return {'sql': sql, 'insert_data': insert_data } def update_interview_sql(self): updateStr = '' whereStr = '' for key, value in self.data_in.items(): updateStr += str(key) + ' = ' + str(value) + ' ' for key, value in self.where.items(): whereStr += str(key) + ' = ' + str(value) + ' ' updateStr = updateStr.strip() whereStr = whereStr.strip() sql = f"alter table test.interview update `{updateStr}` where `{whereStr}`" print(sql) return {'sql': sql, } def find_interview_sql(self): sql = f"select `{self.find_column}` from test.interview where {self.where}" print(sql) return {'sql': sql, }