# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
"""DingTalk helpers: access-token caching, department/user lookup,
work-notification sending and to-do task creation.

Access tokens and user-id lists are cached in Redis so the DingTalk HTTP
API is not queried on every call.
"""
import datetime
import json
import pprint
import time
from copy import deepcopy

import pymongo
import redis
import requests
from alibabacloud_dingtalk.todo_1_0 import models as dingtalktodo__1__0_models
from alibabacloud_dingtalk.todo_1_0.client import Client as dingtalktodo_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

from core.config import Settings, Debug

# NOTE: the instance deliberately rebinds the imported class name; other
# code may rely on ``Settings`` being the instance.
Settings = Settings()
Mongo = Debug()
host = Settings.REDIS_CONF.get('host')
port = Settings.REDIS_CONF.get('port')
db = Settings.REDIS_CONF.get('db')
clientMongo = Mongo.DATABASE_URI
password = Settings.REDIS_CONF.get('password')
redisdb = redis.Redis(host=host, port=port, db=db, password=password)
myClient = pymongo.MongoClient(clientMongo)

# Seconds to wait for DingTalk before giving up; without a timeout a
# stalled connection would block the caller forever.
_REQUEST_TIMEOUT = 10


def get_msec():
    """Return today's 23:59:59 (local time) as a millisecond timestamp.

    :return: int, milliseconds since the epoch
    """
    end_of_day = datetime.date.today().strftime('%Y-%m-%d 23:59:59')
    seconds = int(time.mktime(time.strptime(end_of_day, "%Y-%m-%d %H:%M:%S")))
    return seconds * 1000


def get_token():
    """Request a fresh DingTalk access token.

    :return: str, the access token
    :raises KeyError: if the response carries no ``access_token``
    """
    # SECURITY: credentials are hard-coded in source; they should be moved
    # to configuration / a secret store and rotated.
    # Configuration of the outermost application
    corpid = 'dingd10b2fc689700431'
    corpsecret = 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3'
    # Configuration of the specific application
    # corpid = 'dinghujiejktl8lytdfo'
    # corpsecret = 'T5wXtmBZ0JD4ciWs1SP8ZWgRjnLiNFp4KJ0nRJpW_y621aPfu4qz631eCsapkFCh'
    url = "https://oapi.dingtalk.com/gettoken"
    # Request headers
    headers = {'Content-type': 'text/html;charset=utf-8', 'Access-Control-Allow-Origin': '*'}
    # Query-string parameters
    query = {
        'corpid': corpid,
        'corpsecret': corpsecret
    }
    r = requests.get(url=url, params=query, headers=headers, timeout=_REQUEST_TIMEOUT)
    payload = r.json()
    if 'access_token' not in payload:
        # Same exception type as a plain lookup, but with the API's answer
        # attached so the failure can be diagnosed.
        raise KeyError(f"access_token missing from DingTalk gettoken response: {payload}")
    return payload['access_token']


def get_uid1(dept_id):
    """Return the members (name and userid) of one DingTalk department.

    Pages through the result while the API reports ``has_more``, so
    departments with more than 100 members are returned completely.

    :param dept_id: department id
    :return: list of member dicts as returned by the API
    :raises KeyError: if a response has no ``result`` / ``list``
    """
    url = "https://oapi.dingtalk.com/topapi/user/listsimple"
    query = {
        'access_token': get_redistoken()
    }
    members = []
    cursor = 0
    while True:
        data = {
            'dept_id': dept_id,
            'cursor': cursor,
            'size': 100
        }
        r = requests.post(url=url, params=query, data=data, timeout=_REQUEST_TIMEOUT)
        result = r.json()['result']
        members.extend(result['list'])
        next_cursor = result.get('next_cursor')
        # Stop on the last page; also stop if no cursor is supplied so a
        # malformed response cannot cause an endless loop.
        if not result.get('has_more') or next_cursor is None:
            break
        cursor = next_cursor
    return members


def get_zi():
    """Return the list of all DingTalk departments.

    :return: list of department dicts
    :raises KeyError: if the response has no ``department`` entry
    """
    url = "https://oapi.dingtalk.com/department/list"
    query = {
        'access_token': get_redistoken()
    }
    r = requests.get(url=url, params=query, timeout=_REQUEST_TIMEOUT)
    return r.json()['department']


def send_date(name, job, times, hr, interview_name, userid_list):
    """Send an interview notification to DingTalk users.

    :param name: interviewee name
    :param job: position interviewed for
    :param times: interview time
    :param hr: HR contact name
    :param interview_name: interviewer name
    :param userid_list: list of recipient userids
    :return: None
    """
    content = f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}\nHR:{hr}\n面试官:{interview_name}"
    send_dates(content, userid_list)


def get_redistoken():
    """Return the DingTalk access token, using Redis as a cache.

    On a cache miss a new token is requested, stored under ``token`` for
    one hour, and returned.

    :return: str
    """
    cached = redisdb.get('token')
    if cached is not None:
        return cached.decode()
    token = get_token()
    redisdb.set(name='token', value=token, ex=3600)  # cache for 1 hour
    # Return the fresh value directly instead of reading it back: a re-read
    # costs a round trip and can come back empty if the key was evicted.
    return token


def qujian_time(start_time, end_time):
    """Combine two timestamps into an interval string.

    :param start_time: e.g. '2022-07-01 10:00:00'
    :param end_time: e.g. '2022-07-01 10:30:00'
    :return: e.g. '2022-07-01 10:00:00~10:30:00'
    """
    end_clock = str(end_time).split(' ')[-1]
    return f"{start_time!s}~{end_clock}"


def get_section():
    """Group department ids under their top-level department.

    Top-level departments are those whose ``parentid`` is 1.

    :return: tuple ``(children, depid)`` where ``children`` maps each
        top-level department name (except 'NBD') to the ids of its direct
        sub-departments, and ``depid`` maps every top-level department
        name to its own id
    """
    departments = get_zi()
    section = {d['id']: d['name'] for d in departments if d.get('parentid') == 1}

    children = {}
    for top_id, top_name in section.items():
        sub_ids = [d.get('id') for d in departments if d.get('parentid') == top_id]
        if top_name == '启新工作室':
            # This department is also queried by its own id.
            sub_ids.append(top_id)
        if top_name != 'NBD':
            children[top_name] = sub_ids

    depid = {top_name: top_id for top_id, top_name in section.items()}
    return children, depid


def get_all_uid():
    """Fetch the DingTalk users of every grouped department.

    :return: list of dicts with keys ``section_name``, ``depid`` and
        ``user_id`` (the member dicts of all sub-departments)
    """
    children, depid = get_section()
    sections = []
    for section_name, dept_ids in children.items():
        members = []
        for dept_id in dept_ids:
            members.extend(get_uid1(dept_id))
        sections.append({
            'section_name': section_name,
            'depid': depid[section_name],
            'user_id': members,
        })
    return sections


def get_redis_alluid():
    """Return all departments with their users, using Redis as a cache.

    On a cache miss the data is fetched, stored as JSON under ``user_id``
    for one day, and returned.

    :return: list of per-department dicts (see ``get_all_uid``)
    """
    cached = redisdb.get('user_id')
    if cached is not None:
        return json.loads(cached.decode())
    sections = get_all_uid()
    redisdb.set(name='user_id', value=json.dumps(sections), ex=86400)  # cache for one day
    # Return the fresh value rather than re-reading a key that may already
    # have been evicted.
    return sections


def send_dates(content, userid_list):
    """Send a plain-text work notification to DingTalk users.

    :param content: message text
    :param userid_list: list of recipient userids
    :return: None
    """
    url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"
    query = {
        'access_token': get_redistoken()
    }
    data = {
        'agent_id': 792756727,
        "to_all_user": "false",
        'userid_list': ','.join(userid_list),
        "msg": {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
    }
    # Send the message to DingTalk
    requests.post(url=url, params=query, data=json.dumps(data), timeout=_REQUEST_TIMEOUT)


def Unionid(unionid):
    """Look up a user's userid from their unionid.

    :param unionid: the user's unique identifier within the enterprise account
    :return: the userid
    :raises AttributeError: if the response has no ``result`` entry
    """
    url = "https://oapi.dingtalk.com/topapi/user/getbyunionid"
    query = {
        'access_token': get_redistoken()
    }
    data = {'unionid': unionid}
    r = requests.post(url=url, params=query, data=data, timeout=_REQUEST_TIMEOUT)
    return r.json().get('result').get('userid')


def user_details(userid):
    """Fetch the details of a user by userid.

    :param userid: DingTalk userid
    :return: the decoded JSON response
    """
    url = "https://oapi.dingtalk.com/topapi/v2/user/get"
    query = {
        'access_token': get_redistoken()
    }
    data = {'userid': userid}
    r = requests.post(url=url, params=query, data=data, timeout=_REQUEST_TIMEOUT)
    return r.json()


def get_user_list():
    """Return the userids of all users across all grouped departments.

    :return: flat list of userids
    """
    user_ids = []
    for section in get_redis_alluid():
        user_ids.extend(member.get('userid') for member in section.get('user_id'))
    return user_ids


def get_alluid_list():
    """Return the flat userid list, using Redis as a cache.

    On a cache miss the list is built, stored as JSON under
    ``user_id_list`` for one day, and returned.

    :return: list containing only userids
    """
    cached = redisdb.get('user_id_list')
    if cached is not None:
        return json.loads(cached.decode())
    user_ids = get_user_list()
    redisdb.set(name='user_id_list', value=json.dumps(user_ids), ex=86400)  # cache for one day
    # Return the fresh value rather than re-reading a key that may already
    # have been evicted.
    return user_ids


class Sample:
    """Thin wrapper around the DingTalk to-do (todo_1_0) SDK client."""

    @staticmethod
    def create_client() -> dingtalktodo_1_0Client:
        """Build a to-do API client.

        @return: Client
        @throws Exception
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalktodo_1_0Client(config)

    @staticmethod
    def create_task(
            subject: str,
            creator_id: str,
            description: str,
            executor_ids: list
    ) -> None:
        """Create a DingTalk to-do task due at the end of today.

        :param subject: task title
        :param creator_id: unionid of the creator
        :param description: task body (maximum length 4096)
        :param executor_ids: unionids of the executors
        :return: None
        """
        client = Sample.create_client()
        create_todo_task_headers = dingtalktodo__1__0_models.CreateTodoTaskHeaders()
        create_todo_task_headers.x_acs_dingtalk_access_token = get_redistoken()
        notify_configs = dingtalktodo__1__0_models.CreateTodoTaskRequestNotifyConfigs(
            ding_notify='1'
        )
        # Optional request fields not used here: source_id (business id),
        # participant_ids (participants), detail_url (detail-page links).
        create_todo_task_request = dingtalktodo__1__0_models.CreateTodoTaskRequest(
            subject=subject,  # task title
            creator_id=creator_id,  # creator unionid
            description=description,  # maximum length 4096
            due_time=get_msec(),  # deadline in milliseconds
            executor_ids=executor_ids,  # executor unionids
            is_only_show_executor=True,
            priority=20,  # priority: normal
            notify_configs=notify_configs  # notification settings
        )
        client.create_todo_task_with_options(creator_id, create_todo_task_request,
                                             create_todo_task_headers, util_models.RuntimeOptions())


if __name__ == '__main__':
    a = get_uid1('698233030')
    pprint.pprint(a)