prs_server/utils/dingding.py

279 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import pprint
import redis
import requests
from core.config import Settings
# Instantiate the project settings.
# NOTE(review): this rebinds the name `Settings` from the imported class to an
# instance, so the class itself is no longer reachable under that name here.
Settings = Settings()
# Redis connection parameters, read from the REDIS_CONF mapping on the settings.
host = Settings.REDIS_CONF.get('host')
port = Settings.REDIS_CONF.get('port')
db = Settings.REDIS_CONF.get('db')
password = Settings.REDIS_CONF.get('password')
# NOTE(review): `password` is read above but is not passed to the client below.
# A password-authenticated variant was left here disabled:
#   redis.Redis(host=host, port=port, db=db, password=password)
# Confirm whether connecting without a password is intentional.
redisdb = redis.Redis(host=host, port=port, db=db)
def get_token():
    """
    Request a fresh DingTalk access token from the ``gettoken`` endpoint.

    :return: the ``access_token`` string taken from the JSON response
    :raises requests.RequestException: on network failure, timeout, or a
        non-2xx HTTP status
    :raises KeyError: if the response JSON has no ``access_token`` field
    """
    # SECURITY(review): the app credentials are hard-coded in source. They
    # should be loaded from configuration and rotated.
    corpid = 'dingd10b2fc689700431'
    corpsecret = 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3'
    url = "https://oapi.dingtalk.com/gettoken"
    # Request headers
    headers = {'Content-type': 'text/html;charset=utf-8', 'Access-Control-Allow-Origin': '*'}
    # Query-string parameters
    query = {
        'corpid': corpid,
        'corpsecret': corpsecret,
    }
    # Without a timeout an unresponsive endpoint would block the caller forever.
    response = requests.get(url=url, params=query, headers=headers, timeout=10)
    # Fail loudly on HTTP-level errors instead of trying to parse an error page.
    response.raise_for_status()
    payload = response.json()
    return payload['access_token']
def get_uid(dept_id):
    """
    Fetch the basic member entries of one DingTalk department.

    The ``listsimple`` endpoint is paged (100 entries per request here). All
    pages are collected so departments with more than 100 members are not
    truncated.

    :param dept_id: id of the department to list
    :return: list of the member entries returned under ``result.list``
    :raises KeyError: if a response has no ``result`` / ``list`` field
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://oapi.dingtalk.com/topapi/user/listsimple"
    query = {
        'access_token': get_redistoken()
    }
    members = []
    cursor = 0
    while True:
        data = {
            'dept_id': dept_id,
            'cursor': cursor,
            'size': 100,
        }
        response = requests.post(url=url, params=query, data=data, timeout=10)
        result = response.json()['result']
        members.extend(result['list'])
        # Paging fields as described by the DingTalk API docs; if they are
        # absent the loop stops after the first page.
        next_cursor = result.get('next_cursor')
        if not result.get('has_more') or next_cursor is None or next_cursor == cursor:
            break
        cursor = next_cursor
    return members
def get_zi():
    """
    Fetch the department list from DingTalk.

    :return: the value of the ``department`` field of the JSON response
    :raises KeyError: if the response has no ``department`` field
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://oapi.dingtalk.com/department/list"
    query = {
        'access_token': get_redistoken()
    }
    # Without a timeout an unresponsive endpoint would block the caller forever.
    response = requests.get(url=url, params=query, timeout=10)
    payload = response.json()
    return payload['department']
def send_date(name, job, times, hr, interview_name, userid_list):
    """
    Send an interview-notification text message to DingTalk users.

    The request itself is made by ``send_dates``; this function only builds
    the message text, so both senders share one request implementation.

    :param name: candidate's name
    :param job: position being interviewed for
    :param times: interview time
    :param hr: HR contact's name
    :param interview_name: interviewer's name
    :param userid_list: list of userids to send the message to
    :return: None
    """
    content = f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}\nHR{hr}\n面试官:{interview_name}"
    send_dates(content, userid_list)
def get_redistoken():
    """
    Return the DingTalk access token, using Redis as a cache.

    The token stored under the ``token`` key is returned when present.
    Otherwise a new one is requested with ``get_token``, stored for one hour,
    and returned.

    :return: str
    """
    cached = redisdb.get('token')
    if cached is not None:
        return cached.decode()
    token = get_token()
    redisdb.set(name='token', value=token, ex=3600)  # expires after 1 hour
    # Return the fresh value directly: reading it back costs an extra round
    # trip and could yield None if the key vanished in between.
    return token
def qujian_time(start_time, end_time):
    """
    Combine a start and an end timestamp into a single range string.

    Both arguments are converted with ``str()``. From ``end_time`` only the
    text after its last space (the time-of-day part) is kept.

    :param start_time: e.g. '2022-07-01 10:00:00'
    :param end_time: e.g. '2022-07-01 10:30:00'
    :return: e.g. '2022-07-01 10:00:00~10:30:00'
    """
    _, _, end_clock = str(end_time).rpartition(' ')
    return '~'.join((str(start_time), end_clock))
def get_section():
    """
    Group department ids under their top-level department.

    Top-level departments are those whose ``parentid`` is 1. For each of them
    the ids of its direct children are collected. The department named
    '启新工作室' additionally includes its own id. The department named 'NBD'
    is left out of the first mapping but still appears in the second.

    :return: tuple ``(children_by_name, id_by_name)``: a dict mapping
        top-level department name to a list of department ids, and a dict
        mapping top-level department name to its own id
    """
    departments = get_zi()
    top_level = {
        dept['id']: dept['name']
        for dept in departments
        if dept.get('parentid') == 1
    }

    children_by_name = {}
    for parent_id, parent_name in top_level.items():
        child_ids = [
            dept.get('id')
            for dept in departments
            if dept.get('parentid') == parent_id
        ]
        if parent_name == '启新工作室':
            child_ids.append(parent_id)
        if parent_name == 'NBD':
            continue
        children_by_name[parent_name] = child_ids

    id_by_name = {dept_name: dept_id for dept_id, dept_name in top_level.items()}
    return children_by_name, id_by_name
def get_all_uid():
    """
    Collect the DingTalk member entries of every department group.

    :return: list of dicts, one per group from ``get_section``, with the keys
        ``section_name``, ``depid`` and ``user_id`` (the concatenated
        ``get_uid`` results for the group's department ids)
    """
    dept_children, dept_ids = get_section()
    sections = []
    for section_name, child_ids in dept_children.items():
        entry = {
            'section_name': section_name,
            'depid': dept_ids[section_name],
        }
        members = []
        for child_id in child_ids:
            members.extend(get_uid(child_id))
        entry['user_id'] = members
        sections.append(entry)
    return sections
def get_redis_alluid():
    """
    Return the per-department DingTalk user listing, using Redis as a cache.

    The JSON stored under the ``user_id`` key is returned when present.
    Otherwise the listing is rebuilt with ``get_all_uid``, stored for one day,
    and returned.

    :return: list  # each entry also carries its department information
    """
    cached = redisdb.get('user_id')
    if cached is not None:
        return json.loads(cached.decode())
    serialized = json.dumps(get_all_uid())
    redisdb.set(name='user_id', value=serialized, ex=86400)  # kept for one day
    # Decode the string just written instead of reading it back: this avoids
    # an extra round trip and a crash if the key vanished in between, while
    # still returning the same JSON-round-tripped shape as a cache hit.
    return json.loads(serialized)
def send_dates(content, userid_list):
    """
    Generic helper that sends a text message to DingTalk users.

    :param content: message text to send
    :param userid_list: list of userids to send the message to
    :return: None
    :raises requests.RequestException: on network failure or timeout
    """
    userid_list = ','.join(userid_list)
    url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"
    query = {
        'access_token': get_redistoken()
    }
    data = {
        'agent_id': 792756727,
        "to_all_user": "false",
        'userid_list': userid_list,
        "msg": {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
    }
    json_data = json.dumps(data)
    # Send the message to DingTalk. The timeout keeps an unresponsive endpoint
    # from blocking the caller forever.
    requests.post(url=url, params=query, data=json_data, timeout=10)
def Unionid(unionid):
    """
    Look up a user's userid from their unionid.

    :param unionid: the user's unique identifier within the enterprise account
    :return: the ``userid`` from the response's ``result``, or None when the
        response has no ``result`` or no ``userid``
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://oapi.dingtalk.com/topapi/user/getbyunionid"
    query = {
        'access_token': get_redistoken()
    }
    data = {'unionid': unionid}
    response = requests.post(url=url, params=query, data=data, timeout=10)
    # A response without `result` would otherwise crash with AttributeError on
    # None; treat it as "no userid".
    result = response.json().get('result') or {}
    return result.get('userid')
def user_details(userid):
    """
    Fetch a user's details by userid.

    :param userid: DingTalk userid
    :return: the decoded JSON response, unmodified
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://oapi.dingtalk.com/topapi/v2/user/get"
    query = {
        'access_token': get_redistoken()
    }
    data = {'userid': userid}
    # Without a timeout an unresponsive endpoint would block the caller forever.
    response = requests.post(url=url, params=query, data=data, timeout=10)
    return response.json()
def get_user_list():
    """
    Flatten the per-department listing into a plain list of userids.

    :return: list with the ``userid`` of every member entry, in listing order
    """
    return [
        member.get('userid')
        for section in get_redis_alluid()
        for member in section.get('user_id')
    ]
def get_alluid_list():
    """
    Return the flat list of DingTalk userids, using Redis as a cache.

    The JSON stored under the ``user_id_list`` key is returned when present.
    Otherwise the list is rebuilt with ``get_user_list``, stored for one day,
    and returned.

    :return: list  # userids only, nothing else
    """
    cached = redisdb.get('user_id_list')
    if cached is not None:
        return json.loads(cached.decode())
    serialized = json.dumps(get_user_list())
    redisdb.set(name='user_id_list', value=serialized, ex=86400)  # kept for one day
    # Decode the string just written instead of reading it back: this avoids
    # an extra round trip and a crash if the key vanished in between.
    return json.loads(serialized)
if __name__ == '__main__':
    # Manual check: print the member listing of every department group.
    # Single-user lookup alternative: user_details('16371426094531014')
    pprint.pprint(get_all_uid())