钉钉的封装方法

This commit is contained in:
李伟 2022-07-15 18:20:10 +08:00
parent 2b57c5db5a
commit 063645e0a1

177
utils/dingding.py Normal file
View File

@ -0,0 +1,177 @@
import json
import requests
from db.redisdb import get_redis_pool
redisdb = get_redis_pool()
def get_token():
"""
获取钉钉token
:return:
"""
corpid = 'dingd10b2fc689700431'
corpsecret = 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3'
url = f"https://oapi.dingtalk.com/gettoken"
# 请求头
headers = {'Content-type': 'text/html;charset=utf-8', 'Access-Control-Allow-Origin': '*'}
# query查询参数
query = {
'corpid': corpid,
'corpsecret': corpsecret
}
r = requests.get(url=url, params=query, headers=headers)
date = r.json()
tokens = date['access_token']
# print(tokens)
return tokens
def get_uid(dept_id):
"""
获取钉钉部门的成员名和uid
:return:
"""
url = "https://oapi.dingtalk.com/topapi/user/listsimple"
query = {
'access_token': get_redistoken()
}
data = {
'dept_id': dept_id,
'cursor': 0,
'size': 100
}
r = requests.post(url=url, params=query, data=data)
datas = r.json()
return datas['result']['list']
def get_zi():
"""
获取钉钉的所有部门列表
:return:
"""
url = "https://oapi.dingtalk.com/department/list"
query = {
'access_token': get_redistoken()
}
r = requests.get(url=url, params=query)
datas = r.json()
return datas['department']
def send_date(name, job, times, hr, interview_name, userid_list):
"""
发送消息至钉钉
:param name: 面试者姓名
:param job: 面试岗位
:param times: 面试时间
:param hr: hr名字
:param interview_name: 面试官
:param userid_list: 要发送的人的uid列表
:return:
"""
userid_list = ','.join(userid_list)
url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"
query = {
'access_token': get_redistoken()
}
data = {
'agent_id': 792756727,
"to_all_user": "false",
'userid_list': userid_list,
"msg": {
"msgtype": "text",
"text": {
"content": f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}\nHR{hr}\n面试官:{interview_name}"
}
}
}
json_data = json.dumps(data)
# 发送消息到钉钉
requests.post(url=url, params=query, data=json_data)
def get_redistoken():
"""
获取redis中存的token如没有重新获取再存数据库中再获取
:return: str
"""
redistoken = redisdb.get('token')
if redistoken == None:
# token = '16524325693811559'
token = get_token()
redisdb.set(name='token', value=token, ex=3600) # 1小时 3600
redistoken = redisdb.get('token')
return redistoken.decode()
def qujian_time(start_time, end_time):
"""
把两个时间变成区间
:param start_time: '2022-07-01 10:00:00'
:param end_time: '2022-07-01 10:30:00'
:return: '2022-07-01 10:00:00~10:30:00'
"""
timess = str(end_time).split(' ')[-1]
return str(start_time) + '~' + timess
def get_section():
"""
获取分组后的部门id通过id查询每个部门人的id
:param bumen_data: 上级部门id list
:return: dict[list]
"""
section = {}
bumen_data = get_zi()
for i in bumen_data:
if i.get('parentid') == 1:
section[i['id']] = i['name']
da_dict = {}
for k, v in section.items():
da_s = []
for i in bumen_data:
if i.get('parentid') == k:
da_s.append(i.get('id'))
if v == '启新工作室':
da_s.append(k)
if v != 'NBD':
da_dict[v] = da_s
return da_dict
def get_all_uid():
"""
获取所有部门的钉钉用户id
:return: list
"""
date_s = get_section()
res_c = []
for k, v in date_s.items():
res = {}
res['section_name'] = k
res_list = []
for i in v:
user_list = get_uid(i)
for nu in user_list:
res_list.append(nu)
res['user_id'] = res_list
res_c.append(res)
return res_c
def get_redis_alluid():
"""
获取redis中存的所有部门钉钉用户id如没有重新获取再存数据库中再获取
:return: list
"""
redisuid = redisdb.get('user_id')
if redisuid == None:
user_list = get_all_uid()
user_list = json.dumps(user_list)
redisdb.set(name='user_id', value=user_list, ex=86400) # 存一天时间
redisuid = redisdb.get('user_id')
user=redisuid.decode()
return json.loads(user)