xbackend/utils/dingding.py
2022-11-11 15:47:35 +08:00

371 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import datetime
import json
import pprint
import time
from copy import deepcopy
import pymongo
import redis
import requests
from alibabacloud_dingtalk.todo_1_0.client import Client as dingtalktodo_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.todo_1_0 import models as dingtalktodo__1__0_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_tea_openapi import models as open_api_models
from core.config import Settings, Debug
# Module-level setup: instantiate the project configuration objects and open
# the shared Redis and MongoDB clients used by the helpers in this file.
# NOTE: the name ``Settings`` is rebound from the imported class to an
# instance of it, so the class itself is no longer reachable under that name.
Settings = Settings()
# ``Debug()`` is only used below to read ``DATABASE_URI`` for MongoDB.
Mongo = Debug()
# Redis connection parameters are read from the ``REDIS_CONF`` mapping;
# ``.get`` yields None for any key that is missing.
host = Settings.REDIS_CONF.get('host')
port = Settings.REDIS_CONF.get('port')
db = Settings.REDIS_CONF.get('db')
clientMongo = Mongo.DATABASE_URI
password = Settings.REDIS_CONF.get('password')
# Shared Redis client; the helpers below use it to cache the access token and
# the user-id lists.
redisdb = redis.Redis(host=host, port=port, db=db,password=password)
# Shared MongoDB client built from ``Debug().DATABASE_URI``.
myClient = pymongo.MongoClient(clientMongo)
def get_msec(day=None) -> int:
    """
    Return the millisecond Unix timestamp of 23:59:59 (local time) on a date.

    Note that this is the *end of the day*, not the current instant.

    :param day: a ``datetime.date`` (or ``datetime.datetime``) whose end of
        day is wanted. Defaults to today, which is the original behaviour.
    :return: milliseconds since the epoch as an int (a multiple of 1000)
    """
    if day is None:
        day = datetime.date.today()
    # Format and re-parse so the conversion goes through ``time.mktime`` and
    # therefore uses the local timezone of the process.
    end_of_day = day.strftime('%Y-%m-%d 23:59:59')
    seconds = int(time.mktime(time.strptime(end_of_day, "%Y-%m-%d %H:%M:%S")))
    return seconds * 1000
def get_token():
    """
    Request a DingTalk access token from the ``gettoken`` endpoint.

    :return: the ``access_token`` field of the JSON response
    """
    # Credentials of the outermost application.
    # NOTE(review): secrets are hard-coded in source; consider loading them
    # from configuration instead.
    credentials = {
        'corpid': 'dingd10b2fc689700431',
        'corpsecret': 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3',
    }
    # Credentials of a specific application (disabled alternative):
    # corpid = 'dinghujiejktl8lytdfo'
    # corpsecret = 'T5wXtmBZ0JD4ciWs1SP8ZWgRjnLiNFp4KJ0nRJpW_y621aPfu4qz631eCsapkFCh'
    request_headers = {
        'Content-type': 'text/html;charset=utf-8',
        'Access-Control-Allow-Origin': '*',
    }
    response = requests.get(
        url="https://oapi.dingtalk.com/gettoken",
        params=credentials,
        headers=request_headers,
    )
    payload = response.json()
    return payload['access_token']
def get_uid1(dept_id):
    """
    Fetch the basic user records (name and userid) of one DingTalk department.

    The ``listsimple`` endpoint is paginated. The previous implementation
    only requested the first page (cursor 0, size 100), so larger departments
    were truncated. This version keeps following ``next_cursor`` for as long
    as the response reports ``has_more``.

    :param dept_id: DingTalk department id
    :return: list with the entries of ``result.list`` from every page
    :raises KeyError: if a response lacks ``result`` or ``result.list``
    """
    url = "https://oapi.dingtalk.com/topapi/user/listsimple"
    query = {
        'access_token': get_redistoken()
    }
    users = []
    cursor = 0
    while True:
        data = {
            'dept_id': dept_id,
            'cursor': cursor,
            'size': 100
        }
        r = requests.post(url=url, params=query, data=data)
        result = r.json()['result']
        users.extend(result['list'] or [])
        next_cursor = result.get('next_cursor')
        # Stop on the last page, and also if the cursor fails to advance, so
        # a malformed response can never cause an endless loop.
        if not result.get('has_more') or next_cursor is None or next_cursor == cursor:
            return users
        cursor = next_cursor
def get_zi():
    """
    Fetch the DingTalk department list.

    :return: the ``department`` field of the JSON response
    """
    response = requests.get(
        url="https://oapi.dingtalk.com/department/list",
        params={'access_token': get_redistoken()},
    )
    return response.json()['department']
def send_date(name, job, times, hr, interview_name, userid_list):
    """
    Send an interview notification to DingTalk users.

    The message text is built here and the delivery is delegated to
    ``send_dates`` so both senders share one request implementation.

    :param name: candidate name
    :param job: position being interviewed for
    :param times: interview time
    :param hr: name of the HR contact
    :param interview_name: interviewer name
    :param userid_list: list of DingTalk userids to notify
    :return: None
    """
    # The "HR" line previously lacked the ":" separator used by every other
    # field of the message.
    content = (
        f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}"
        f"\nHR:{hr}\n面试官:{interview_name}"
    )
    send_dates(content, userid_list)
def get_redistoken():
    """
    Return the DingTalk access token, using Redis as a one-hour cache.

    If the ``token`` key is absent, a new token is requested with
    ``get_token``, stored with a 3600 second expiry and returned directly.
    It is not re-read from Redis, which saves a round trip and avoids
    decoding ``None`` if the key vanishes between the write and the read.

    :return: str
    """
    cached = redisdb.get('token')
    if cached is not None:
        return cached.decode()
    token = get_token()
    redisdb.set(name='token', value=token, ex=3600)  # cache for 1 hour
    return token
def qujian_time(start_time, end_time):
    """
    Join two timestamps into a single range string.

    Only the part of ``end_time`` after its last space is kept.

    :param start_time: '2022-07-01 10:00:00'
    :param end_time: '2022-07-01 10:30:00'
    :return: '2022-07-01 10:00:00~10:30:00'
    """
    _, _, end_clock = str(end_time).rpartition(' ')
    return '~'.join((str(start_time), end_clock))
def get_section():
    """
    Group DingTalk departments under their top-level parent.

    Top-level departments are those whose ``parentid`` is 1. For each of
    them the ids of its direct children are collected. The department named
    '启新工作室' also includes its own id, and the one named 'NBD' is left out
    of the grouping (but still appears in the name-to-id mapping).

    :return: tuple ``(children_by_name, id_by_name)``: a dict mapping each
        top-level department name to a list of department ids, and a dict
        mapping each top-level department name to its own id
    """
    departments = get_zi()
    top_level = {
        dept['id']: dept['name']
        for dept in departments
        if dept.get('parentid') == 1
    }

    children_by_name = {}
    for parent_id, parent_name in top_level.items():
        child_ids = [
            dept.get('id')
            for dept in departments
            if dept.get('parentid') == parent_id
        ]
        if parent_name == '启新工作室':
            child_ids.append(parent_id)
        if parent_name == 'NBD':
            continue
        children_by_name[parent_name] = child_ids

    id_by_name = {name: dept_id for dept_id, name in top_level.items()}
    return children_by_name, id_by_name
def get_all_uid():
    """
    Collect the DingTalk users of every grouped department.

    :return: list of dicts with keys ``section_name`` (top-level department
        name), ``depid`` (its id) and ``user_id`` (the user records returned
        by ``get_uid1`` for each department id in the group)
    """
    children_by_name, id_by_name = get_section()
    return [
        {
            'section_name': section_name,
            'depid': id_by_name[section_name],
            'user_id': [
                member
                for dept_id in dept_ids
                for member in get_uid1(dept_id)
            ],
        }
        for section_name, dept_ids in children_by_name.items()
    ]
def get_redis_alluid():
    """
    Return the per-department user listing, using Redis as a one-day cache.

    If the ``user_id`` key is absent, the listing is rebuilt with
    ``get_all_uid``, stored as JSON with an 86400 second expiry and returned
    directly. It is not re-read from Redis, which saves a round trip and
    avoids decoding ``None`` if the key vanishes between write and read.

    :return: list of dicts, each including the department it belongs to
    """
    cached = redisdb.get('user_id')
    if cached is not None:
        return json.loads(cached.decode())
    users = get_all_uid()
    redisdb.set(name='user_id', value=json.dumps(users), ex=86400)  # cache for one day
    return users
def send_dates(content, userid_list):
    """
    Generic helper that sends a text message to DingTalk users.

    :param content: message text to send
    :param userid_list: list of DingTalk userids to notify
    :return: None
    """
    recipients = ','.join(userid_list)
    payload = json.dumps({
        'agent_id': 792756727,
        "to_all_user": "false",
        'userid_list': recipients,
        "msg": {
            "msgtype": "text",
            "text": {"content": content},
        },
    })
    # Post the serialised message to DingTalk; the response is not inspected.
    requests.post(
        url="https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2",
        params={'access_token': get_redistoken()},
        data=payload,
    )
def Unionid(unionid):
    """
    Look up a user's userid from their unionid.

    :param unionid: unique identifier within the enterprise account
    :return: the ``userid`` field of the response's ``result`` object
    """
    response = requests.post(
        url="https://oapi.dingtalk.com/topapi/user/getbyunionid",
        params={'access_token': get_redistoken()},
        data={'unionid': unionid},
    )
    result = response.json().get('result')
    return result.get('userid')
def user_details(userid):
    """
    Fetch the details of a user by userid.

    :param userid: DingTalk userid
    :return: the decoded JSON response, unmodified
    """
    response = requests.post(
        url="https://oapi.dingtalk.com/topapi/v2/user/get",
        params={'access_token': get_redistoken()},
        data={'userid': userid},
    )
    return response.json()
def get_user_list():
    """
    Flatten the per-department listing into a plain list of userids.

    :return: list with the ``userid`` of every user of every department
    """
    return [
        member.get('userid')
        for department in get_redis_alluid()
        for member in department.get('user_id')
    ]
def get_alluid_list():
    """
    Return the flat list of userids, using Redis as a one-day cache.

    If the ``user_id_list`` key is absent, the list is rebuilt with
    ``get_user_list``, stored as JSON with an 86400 second expiry and
    returned directly. It is not re-read from Redis, which saves a round
    trip and avoids decoding ``None`` if the key vanishes between write and
    read.

    :return: list containing only userids
    """
    cached = redisdb.get('user_id_list')
    if cached is not None:
        return json.loads(cached.decode())
    user_ids = get_user_list()
    redisdb.set(name='user_id_list', value=json.dumps(user_ids), ex=86400)  # cache for one day
    return user_ids
class Sample:
    """Helpers for creating DingTalk to-do tasks through the todo_1_0 SDK client."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> dingtalktodo_1_0Client:
        """
        Build the to-do SDK client.

        The client is configured with the 'https' protocol and the 'central'
        region id.

        @return: Client
        @throws Exception
        """
        client_config = open_api_models.Config()
        client_config.protocol = 'https'
        client_config.region_id = 'central'
        return dingtalktodo_1_0Client(client_config)

    @staticmethod
    def create_task(
            subject: str, creator_id: str, description: str, executor_ids: list
    ) -> None:
        """
        Create a DingTalk to-do task.

        :param subject: to-do title
        :param creator_id: creator (unionid)
        :param description: to-do content
        :param executor_ids: executors (unionids)
        :return: None
        """
        todo_client = Sample.create_client()

        request_headers = dingtalktodo__1__0_models.CreateTodoTaskHeaders()
        request_headers.x_acs_dingtalk_access_token = get_redistoken()

        notify = dingtalktodo__1__0_models.CreateTodoTaskRequestNotifyConfigs(
            ding_notify='1'
        )
        # Optional request fields that are currently not sent: source_id
        # (business id), participant_ids (participants) and detail_url
        # (detail page links, built with CreateTodoTaskRequestDetailUrl).
        todo_request = dingtalktodo__1__0_models.CreateTodoTaskRequest(
            subject=subject,  # to-do title
            creator_id=creator_id,  # creator unionid
            description=description,  # maximum length 4096
            due_time=get_msec(),  # deadline of the to-do, in milliseconds
            executor_ids=executor_ids,  # executor unionids
            is_only_show_executor=True,
            priority=20,  # priority: normal
            notify_configs=notify,  # to-do notification settings
        )
        runtime_options = util_models.RuntimeOptions()
        # Errors raised by the SDK call propagate to the caller; there is
        # no try/except around it.
        todo_client.create_todo_task_with_options(
            creator_id, todo_request, request_headers, runtime_options
        )
if __name__ == '__main__':
    # Manual check: print the users of one department.
    # Alternative: pprint.pprint(get_all_uid())
    pprint.pprint(get_uid1('698233030'))