1.修改登录逻辑

This commit is contained in:
李伟 2022-11-11 15:17:35 +08:00
parent b6c5ce6187
commit 9611ce3c72
5 changed files with 424 additions and 8 deletions

View File

@ -1,6 +1,7 @@
from datetime import timedelta
from typing import Any
import redis
from fastapi import APIRouter, Body, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordRequestForm
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
@ -9,15 +10,40 @@ import crud, schemas
from api import deps
from core import security
from core.config import settings
from utils import get_uid
from db.redisdb import RedisDrive, get_redis_pool
from utils import get_uid, random_hex
from db import get_database
from utils.dingding import send_dates
router = APIRouter()
host = settings.REDIS_CONF.get('host')
port = settings.REDIS_CONF.get('port')
db = settings.REDIS_CONF.get('db')
redisdb = redis.Redis(host=host, port=port, db=db)
@router.post("/send_auth_code")
async def reset_password(request: Request,
data_in: schemas.send,
db: AsyncIOMotorDatabase = Depends(get_database),
rdb: RedisDrive = Depends(get_redis_pool)
):
"""发送验证码"""
res= await crud.user.get_by_user(db,data_in.name)
user_id=res['user_id']
# X平台登陆提醒你的验证码是7933
if res['types'] == 1: # 内部员工发送验证码
code=random_hex(4)
content=f'X平台登陆提醒你的验证码是{code}'
send_dates(content, [user_id]) # 发送验证码
redisdb.set(name=user_id, value=code, ex=120) # 120秒
return schemas.Msg(code=0, msg='ok',data=code)
@router.post("/login")
async def login(
# data: schemas.UserLogin,
#code:str,
data: OAuth2PasswordRequestForm = Depends(),
db: AsyncIOMotorDatabase = Depends(get_database)
) -> Any:
@ -27,6 +53,12 @@ async def login(
user = await crud.user.authenticate(db,
name=data.username, password=data.password
)
if user.types == 1: # 内部员工校验验证码
rdbcode=redisdb.get(user.user_id)
if rdbcode == None:
return schemas.Msg(code=-1, msg='验证码过期')
if rdbcode.decode() != data.scopes[0]:
return schemas.Msg(code=-1, msg='验证码错误')
if not user:
# raise HTTPException(status_code=400, detail="Incorrect name or password")
return schemas.Msg(code=-1, msg='密码或用户名错误')
@ -40,7 +72,7 @@ async def login(
'nickname': user.nickname,
'email': user.email,
'tel': user.tel,
'userid':user.id,
'userid': user.id,
'token': security.create_access_token(
expires_delta=access_token_expires, _id=str(user.id), email=user.email,
@ -150,12 +182,12 @@ async def all_account(
else:
new_account = schemas.UserCreate(name=name, password='123')
created.append(name)
#创建账户并返回id
id_one=await crud.user.create(db, new_account)
# 创建账户并返回id
id_one = await crud.user.create(db, new_account)
id.append(id_one)
res = {
'created_account': created,
'password': '123',
'id':id
'id': id
}
return schemas.Msg(code=0, msg='ok', data=res)

View File

@ -83,7 +83,7 @@ async def panduan_quanxian_url(request: Request, call_next):
url=request.url.path
if 'docs' in url or 'openapi.json' in url:
return response
if url == '/api/v1/user/login':
if url in ['/api/v1/user/login','/api/v1/user/send_auth_code']:
return response
game=request.url.query.split('=')[1]
if 'undefined' in game:
@ -162,5 +162,5 @@ async def add_process_time_header(request: Request, call_next):
if __name__ == '__main__':
#uvicorn.run(app='main:app', host="10.0.0.240", port=7899, reload=True, debug=True)
#uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True)
uvicorn.run(app='main:app', host="0.0.0.0", port=7899, reload=True, debug=True)

View File

@ -62,7 +62,8 @@ class UserDB(DBBase):
tel: Any = ''
last_login_ts: str = '尚未登录'
data_where: dict = dict()
user_id: str = ''
types: int = None
class UserDBRW(UserDB):
hashed_password: str

370
utils/dingding.py Normal file
View File

@ -0,0 +1,370 @@
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import datetime
import json
import pprint
import time
from copy import deepcopy
import pymongo
import redis
import requests
from alibabacloud_dingtalk.todo_1_0.client import Client as dingtalktodo_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.todo_1_0 import models as dingtalktodo__1__0_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_tea_openapi import models as open_api_models
from core.config import Settings, Debug
Settings = Settings()
Mongo = Debug()
host = Settings.REDIS_CONF.get('host')
port = Settings.REDIS_CONF.get('port')
db = Settings.REDIS_CONF.get('db')
clientMongo = Mongo.DATABASE_URI
password = Settings.REDIS_CONF.get('password')
# redisdb = redis.Redis(host=host, port=port, db=db,password=password)
redisdb = redis.Redis(host=host, port=port, db=db)
myClient = pymongo.MongoClient(clientMongo)
def get_msec():
"""
获取当前日期的毫秒级时间
:return:
"""
today = datetime.date.today().strftime('%Y-%m-%d 23:59:59')
t = int(time.mktime(time.strptime(today, "%Y-%m-%d %H:%M:%S")))
return int(round(t * 1000))
def get_token():
"""
获取钉钉token
:return:
"""
#最外部应用的配置
corpid = 'dingd10b2fc689700431'
corpsecret = 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3'
# 具体应用的配置
# corpid = 'dinghujiejktl8lytdfo'
# corpsecret = 'T5wXtmBZ0JD4ciWs1SP8ZWgRjnLiNFp4KJ0nRJpW_y621aPfu4qz631eCsapkFCh'
url = f"https://oapi.dingtalk.com/gettoken"
# 请求头
headers = {'Content-type': 'text/html;charset=utf-8', 'Access-Control-Allow-Origin': '*'}
# query查询参数
query = {
'corpid': corpid,
'corpsecret': corpsecret
}
r = requests.get(url=url, params=query, headers=headers)
date = r.json()
tokens = date['access_token']
# print(tokens)
return tokens
def get_uid1(dept_id):
"""
获取钉钉部门的成员名和uid
:return:
"""
url = "https://oapi.dingtalk.com/topapi/user/listsimple"
query = {
'access_token': get_redistoken()
}
data = {
'dept_id': dept_id,
'cursor': 0,
'size': 100
}
r = requests.post(url=url, params=query, data=data)
datas = r.json()
return datas['result']['list']
def get_zi():
"""
获取钉钉的所有部门列表
:return:
"""
url = "https://oapi.dingtalk.com/department/list"
query = {
'access_token': get_redistoken()
}
r = requests.get(url=url, params=query)
datas = r.json()
return datas['department']
def send_date(name, job, times, hr, interview_name, userid_list):
"""
发送消息至钉钉
:param name: 面试者姓名
:param job: 面试岗位
:param times: 面试时间
:param hr: hr名字
:param interview_name: 面试官
:param userid_list: 要发送的人的uid列表
:return:
"""
userid_list = ','.join(userid_list)
url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"
query = {
'access_token': get_redistoken()
}
data = {
'agent_id': 792756727,
"to_all_user": "false",
'userid_list': userid_list,
"msg": {
"msgtype": "text",
"text": {
"content": f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}\nHR{hr}\n面试官:{interview_name}"
}
}
}
json_data = json.dumps(data)
# 发送消息到钉钉
requests.post(url=url, params=query, data=json_data)
def get_redistoken():
"""
获取redis中存的token如没有重新获取再存数据库中再获取
:return: str
"""
redistoken = redisdb.get('token')
if redistoken == None:
# token = '16524325693811559'
token = get_token()
redisdb.set(name='token', value=token, ex=3600) # 1小时 3600
redistoken = redisdb.get('token')
return redistoken.decode()
def qujian_time(start_time, end_time):
"""
把两个时间变成区间
:param start_time: '2022-07-01 10:00:00'
:param end_time: '2022-07-01 10:30:00'
:return: '2022-07-01 10:00:00~10:30:00'
"""
timess = str(end_time).split(' ')[-1]
return str(start_time) + '~' + timess
def get_section():
"""
获取分组后的部门id通过id查询每个部门人的id
:param bumen_data: 上级部门id list
:return: dict[list]
"""
section = {}
bumen_data = get_zi()
for i in bumen_data:
if i.get('parentid') == 1:
section[i['id']] = i['name']
da_dict = {}
for k, v in section.items():
da_s = []
for i in bumen_data:
if i.get('parentid') == k:
da_s.append(i.get('id'))
if v == '启新工作室':
da_s.append(k)
if v != 'NBD':
da_dict[v] = da_s
depid = {}
for k, v in section.items():
depid[v] = k
return da_dict, depid
def get_all_uid():
"""
获取所有部门的钉钉用户id
:return: list
"""
date_s, depid = get_section()
res_c = []
for k, v in date_s.items():
res = {}
res['section_name'] = k
res['depid'] = depid[k]
res_list = []
for i in v:
user_list = get_uid1(i)
for nu in user_list:
res_list.append(nu)
res['user_id'] = res_list
res_c.append(res)
return res_c
def get_redis_alluid():
"""
获取redis中存的所有部门钉钉用户id如没有重新获取再存数据库中再获取
:return: list #包含有所在部门
"""
redisuid = redisdb.get('user_id')
if redisuid == None:
user_list = get_all_uid()
user_list = json.dumps(user_list)
redisdb.set(name='user_id', value=user_list, ex=86400) # 存一天时间
redisuid = redisdb.get('user_id')
user = redisuid.decode()
return json.loads(user)
def send_dates(content, userid_list):
"""
发送消息至钉钉的通用模板
:param content: 需要发送的内容
:param userid_list: 要发送的人的uid列表
:return:
"""
userid_list = ','.join(userid_list)
url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"
query = {
'access_token': get_redistoken()
}
data = {
'agent_id': 792756727,
"to_all_user": "false",
'userid_list': userid_list,
"msg": {
"msgtype": "text",
"text": {
"content": content
}
}
}
json_data = json.dumps(data)
# 发送消息到钉钉
requests.post(url=url, params=query, data=json_data)
def Unionid(unionid):
"""
根据unionid获取用户userid
:param unionid: 企业账号范围内的唯一标识
:return: 用户userid
"""
url = "https://oapi.dingtalk.com/topapi/user/getbyunionid"
query = {
'access_token': get_redistoken()
}
data = {'unionid': unionid}
r = requests.post(url=url, params=query, data=data)
datas = r.json()
return datas.get('result').get('userid')
def user_details(userid):
"""
根据userid获取用户详情
:param userid: 钉钉用户userid
:return:
"""
url = "https://oapi.dingtalk.com/topapi/v2/user/get"
query = {
'access_token': get_redistoken()
}
data = {'userid': userid}
r = requests.post(url=url, params=query, data=data)
datas = r.json()
return datas
def get_user_list():
"""
获取所有部门钉钉用户id,list返回
"""
user = get_redis_alluid()
user_list = []
for i in user:
for ii in i.get('user_id'):
user_list.append(ii.get('userid'))
return user_list
def get_alluid_list():
"""
获取redis中存的所有部门钉钉用户id如没有重新获取再存数据库中再获取
:return: list #直接列表里面包含user_id,没有其他的
"""
redisuid = redisdb.get('user_id_list')
if redisuid == None:
user_list = get_user_list()
user_list = json.dumps(user_list)
redisdb.set(name='user_id_list', value=user_list, ex=86400) # 存一天时间
redisuid = redisdb.get('user_id_list')
user = redisuid.decode()
return json.loads(user)
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> dingtalktodo_1_0Client:
"""
使用 Token 初始化账号Client
@return: Client
@throws Exception
"""
config = open_api_models.Config()
config.protocol = 'https'
config.region_id = 'central'
return dingtalktodo_1_0Client(config)
@staticmethod
def create_task(
subject: str, creator_id: str, description: str, executor_ids: list
) -> None:
"""
创建钉钉待办
:param subject: 待办标题
:param creator_id: 创建者
:param description: 待办内容
:param executor_ids: 执行者
:return:
"""
client = Sample.create_client()
create_todo_task_headers = dingtalktodo__1__0_models.CreateTodoTaskHeaders()
create_todo_task_headers.x_acs_dingtalk_access_token = get_redistoken()
notify_configs = dingtalktodo__1__0_models.CreateTodoTaskRequestNotifyConfigs(
ding_notify='1'
)
# detail_url = dingtalktodo__1__0_models.CreateTodoTaskRequestDetailUrl(
# app_url='https://www.dingtalk.com',
# pc_url='https://www.dingtalk.com'
# )
create_todo_task_request = dingtalktodo__1__0_models.CreateTodoTaskRequest(
# source_id='isv_dingtalkTodo1', #业务id
subject=subject, # 待办标题
creator_id=creator_id, # 创建者unionid
description=description, # 最大长度 4096
due_time=get_msec(), # 完成待办的截止日期 毫秒
executor_ids=executor_ids, # 执行者的unionid
# participant_ids=[
# '6IiSFiSMFoRLWvXgl1Xc82XgiEiE'
# ], #参与者
# detail_url=detail_url,#详情页跳转地址
is_only_show_executor=True,
priority=20, # 优先级,普通
notify_configs=notify_configs # 待办通知配置
)
client.create_todo_task_with_options(creator_id, create_todo_task_request,
create_todo_task_headers, util_models.RuntimeOptions())
# try:
# client.create_todo_task_with_options('PUoiinWIpa2yH2ymhiiGiP6g', create_todo_task_request,
# create_todo_task_headers, util_models.RuntimeOptions())
# except Exception as err:
# if not UtilClient.empty(err.code) and not UtilClient.empty(err.message):
# return err.message
if __name__ == '__main__':
a=get_uid1('698233030') #698233030
#a=get_all_uid()
pprint.pprint(a)

View File

@ -240,3 +240,16 @@ def get_time(fmt: str = '%Y-%m-%d') -> str:
ta = time.localtime(ts)
t = time.strftime(fmt, ta)
return t
def random_hex(length):
"""
生成随机数
:param length: 想要几位随机数就传多少位
:return: 随机数
"""
result = hex(random.randint(0, 16 ** length)).replace('0x', '').upper()
if (len(result) < length):
result = '0' * (length - len(result)) + result
print(result)
return result