From 063645e0a1e0e2eede70311c19d4b6b8a400d957 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E4=BC=9F?= <250213850@qq.com> Date: Fri, 15 Jul 2022 18:20:10 +0800 Subject: [PATCH] =?UTF-8?q?=E9=92=89=E9=92=89=E7=9A=84=E5=B0=81=E8=A3=85?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/dingding.py | 177 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 utils/dingding.py diff --git a/utils/dingding.py b/utils/dingding.py new file mode 100644 index 0000000..8c640ca --- /dev/null +++ b/utils/dingding.py @@ -0,0 +1,177 @@ +import json +import requests +from db.redisdb import get_redis_pool + +redisdb = get_redis_pool() + + +def get_token(): + """ + 获取钉钉token + :return: + """ + corpid = 'dingd10b2fc689700431' + corpsecret = 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3' + url = f"https://oapi.dingtalk.com/gettoken" + # 请求头 + headers = {'Content-type': 'text/html;charset=utf-8', 'Access-Control-Allow-Origin': '*'} + # query查询参数 + query = { + 'corpid': corpid, + 'corpsecret': corpsecret + } + r = requests.get(url=url, params=query, headers=headers) + date = r.json() + tokens = date['access_token'] + # print(tokens) + return tokens + + +def get_uid(dept_id): + """ + 获取钉钉部门的成员名和uid + :return: + """ + url = "https://oapi.dingtalk.com/topapi/user/listsimple" + query = { + 'access_token': get_redistoken() + } + data = { + 'dept_id': dept_id, + 'cursor': 0, + 'size': 100 + } + r = requests.post(url=url, params=query, data=data) + datas = r.json() + return datas['result']['list'] + + +def get_zi(): + """ + 获取钉钉的所有部门列表 + :return: + """ + url = "https://oapi.dingtalk.com/department/list" + query = { + 'access_token': get_redistoken() + } + r = requests.get(url=url, params=query) + datas = r.json() + return datas['department'] + + +def send_date(name, job, times, hr, interview_name, userid_list): + """ + 发送消息至钉钉 + :param name: 面试者姓名 + :param job: 面试岗位 + :param times: 面试时间 + :param hr: hr名字 + :param interview_name: 面试官 + :param userid_list: 要发送的人的uid,列表 + :return: + """ + userid_list = ','.join(userid_list) + url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2" + query = { + 'access_token': get_redistoken() + } + data = { + 'agent_id': 792756727, + "to_all_user": "false", + 'userid_list': userid_list, + "msg": { + "msgtype": "text", + "text": { + "content": f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}\nHR:{hr}\n面试官:{interview_name}" + } + } + } + json_data = json.dumps(data) + # 发送消息到钉钉 + requests.post(url=url, params=query, data=json_data) + + +def get_redistoken(): + """ + 获取redis中存的token,如没有重新获取再存数据库中,再获取 + :return: str + """ + redistoken = redisdb.get('token') + if redistoken == None: + # token = '16524325693811559' + token = get_token() + redisdb.set(name='token', value=token, ex=3600) # 1小时 3600 + redistoken = redisdb.get('token') + return redistoken.decode() + + +def qujian_time(start_time, end_time): + """ + 把两个时间变成区间 + :param start_time: '2022-07-01 10:00:00' + :param end_time: '2022-07-01 10:30:00' + :return: '2022-07-01 10:00:00~10:30:00' + """ + timess = str(end_time).split(' ')[-1] + return str(start_time) + '~' + timess + + +def get_section(): + """ + 获取分组后的部门id,通过id查询每个部门人的id + :param bumen_data: 上级部门id list + :return: dict[list] + """ + section = {} + bumen_data = get_zi() + for i in bumen_data: + if i.get('parentid') == 1: + section[i['id']] = i['name'] + da_dict = {} + for k, v in section.items(): + da_s = [] + for i in bumen_data: + if i.get('parentid') == k: + da_s.append(i.get('id')) + if v == '启新工作室': + da_s.append(k) + if v != 'NBD': + da_dict[v] = da_s + return da_dict + + +def get_all_uid(): + """ + 获取所有部门的钉钉用户id + :return: list + """ + date_s = get_section() + res_c = [] + for k, v in date_s.items(): + res = {} + res['section_name'] = k + res_list = [] + for i in v: + user_list = get_uid(i) + for nu in user_list: + res_list.append(nu) + res['user_id'] = res_list + res_c.append(res) + return res_c + + +def get_redis_alluid(): + """ + 获取redis中存的所有部门钉钉用户id,如没有重新获取再存数据库中,再获取 + :return: list + """ + redisuid = redisdb.get('user_id') + if redisuid == None: + user_list = get_all_uid() + user_list = json.dumps(user_list) + redisdb.set(name='user_id', value=user_list, ex=86400) # 存一天时间 + redisuid = redisdb.get('user_id') + user=redisuid.decode() + return json.loads(user) +