import json
import pprint

import redis
import requests

from core.config import Settings

# NOTE: the imported Settings class is rebound to its instance here; the name
# is kept as-is because other modules may import these module-level names.
Settings = Settings()

host = Settings.REDIS_CONF.get('host')
port = Settings.REDIS_CONF.get('port')
db = Settings.REDIS_CONF.get('db')
password = Settings.REDIS_CONF.get('password')
# Password authentication is currently disabled; to enable it use:
# redisdb = redis.Redis(host=host, port=port, db=db,password=password)
redisdb = redis.Redis(host=host, port=port, db=db)

# Seconds to wait on any DingTalk HTTP call. requests has no default timeout,
# so without this a stalled connection would block the caller forever.
_REQUEST_TIMEOUT = 10

# Agent id of the DingTalk application used to send work notifications.
_AGENT_ID = 792756727


def get_token():
    """
    Request a fresh DingTalk access token from the gettoken endpoint.

    :return: the value of ``access_token`` in the JSON response
    :raises KeyError: if the response carries no ``access_token``
    """
    # NOTE(review): credentials are hard-coded in source control; consider
    # moving them into Settings / environment configuration.
    corpid = 'dingd10b2fc689700431'
    corpsecret = 'eL2gsI4Hgqx__FC8T5Bjmue_zQoyG5sm-SJqEDdS4NmKPTzFLQ2G93CX24VP2_n3'
    url = "https://oapi.dingtalk.com/gettoken"
    # Request headers
    headers = {
        'Content-type': 'text/html;charset=utf-8',
        'Access-Control-Allow-Origin': '*',
    }
    # Query-string parameters
    query = {
        'corpid': corpid,
        'corpsecret': corpsecret,
    }
    r = requests.get(url=url, params=query, headers=headers,
                     timeout=_REQUEST_TIMEOUT)
    payload = r.json()
    return payload['access_token']


def get_uid(dept_id):
    """
    Fetch the members (name and userid entries) of one DingTalk department.

    The endpoint is paginated; every page is followed so that departments
    with more than 100 members are returned in full.

    :param dept_id: department id to list
    :return: list with the concatenated ``result.list`` entries of all pages
    :raises KeyError: if a response lacks ``result`` or ``result.list``
    """
    url = "https://oapi.dingtalk.com/topapi/user/listsimple"
    query = {
        'access_token': get_redistoken()
    }
    members = []
    cursor = 0
    while True:
        data = {
            'dept_id': dept_id,
            'cursor': cursor,
            'size': 100
        }
        r = requests.post(url=url, params=query, data=data,
                          timeout=_REQUEST_TIMEOUT)
        result = r.json()['result']
        members.extend(result['list'])
        next_cursor = result.get('next_cursor')
        # Stop when the API reports no further page; the extra checks guard
        # against looping forever on a missing or non-advancing cursor.
        if not result.get('has_more') or next_cursor is None \
                or next_cursor == cursor:
            break
        cursor = next_cursor
    return members


def get_zi():
    """
    Fetch the list of all DingTalk departments.

    :return: the ``department`` entry of the JSON response
    :raises KeyError: if the response carries no ``department``
    """
    url = "https://oapi.dingtalk.com/department/list"
    query = {
        'access_token': get_redistoken()
    }
    r = requests.get(url=url, params=query, timeout=_REQUEST_TIMEOUT)
    datas = r.json()
    return datas['department']


def send_date(name, job, times, hr, interview_name, userid_list):
    """
    Send an interview notification to DingTalk users.

    :param name: candidate name
    :param job: position being interviewed for
    :param times: interview time
    :param hr: HR contact name
    :param interview_name: interviewer name
    :param userid_list: list of recipient userids
    :return: None
    """
    content = f"【面试通知】\n面试者:{name}\n面试岗位:{job}\n面试时间:{times}\nHR:{hr}\n面试官:{interview_name}"
    # The request is identical to the generic sender apart from the text.
    send_dates(content, userid_list)


def get_redistoken():
    """
    Return the DingTalk access token, using Redis as a one-hour cache.

    On a cache miss a new token is requested, stored under the ``token`` key
    and returned directly, without reading it back from Redis.

    :return: str
    """
    cached = redisdb.get('token')
    if cached is not None:
        return cached.decode()
    token = get_token()
    redisdb.set(name='token', value=token, ex=3600)  # 1 hour
    return token


def qujian_time(start_time, end_time):
    """
    Combine two timestamps into a single interval string.

    :param start_time: '2022-07-01 10:00:00'
    :param end_time: '2022-07-01 10:30:00'
    :return: '2022-07-01 10:00:00~10:30:00'
    """
    # Keep only the part of end_time after its last space.
    end_clock = str(end_time).split(' ')[-1]
    return f"{start_time}~{end_clock}"


def get_section():
    """
    Group department ids under their top-level department.

    Top-level departments are those whose ``parentid`` is 1. Each one maps
    to the ids of its direct children; '启新工作室' additionally includes its
    own id, and the department named 'NBD' is left out.

    :return: dict mapping top-level department name to a list of ids
    """
    bumen_data = get_zi()

    # One pass builds both lookups instead of rescanning per department.
    top_level = {}
    children = {}
    for dept in bumen_data:
        parent_id = dept.get('parentid')
        if parent_id == 1:
            top_level[dept['id']] = dept['name']
        children.setdefault(parent_id, []).append(dept.get('id'))

    da_dict = {}
    for dept_id, dept_name in top_level.items():
        if dept_name == 'NBD':
            continue
        dept_ids = list(children.get(dept_id, []))
        if dept_name == '启新工作室':
            dept_ids.append(dept_id)
        da_dict[dept_name] = dept_ids
    return da_dict


def get_all_uid():
    """
    Collect the DingTalk members of every department group.

    :return: list of dicts with ``section_name`` (group name) and
        ``user_id`` (member entries as returned by get_uid)
    """
    res_c = []
    for section_name, dept_ids in get_section().items():
        members = []
        for dept_id in dept_ids:
            members.extend(get_uid(dept_id))
        res_c.append({'section_name': section_name, 'user_id': members})
    return res_c


def _cached_json(key, producer, ttl):
    """Return the JSON value cached in Redis under key, filling it from producer() on a miss."""
    cached = redisdb.get(key)
    if cached is not None:
        return json.loads(cached.decode())
    serialized = json.dumps(producer())
    redisdb.set(name=key, value=serialized, ex=ttl)
    # Decode what was stored so a miss returns the same shape as a later hit.
    return json.loads(serialized)


def get_redis_alluid():
    """
    Return all department members, cached in Redis for one day.

    On a cache miss the data is rebuilt with get_all_uid and stored under
    the ``user_id`` key.

    :return: list  # entries include the department group they belong to
    """
    return _cached_json('user_id', get_all_uid, 86400)  # kept for one day


def send_dates(content, userid_list):
    """
    Send a plain-text work notification to DingTalk users.

    :param content: text to send
    :param userid_list: list of recipient userids
    :return: None
    """
    userid_list = ','.join(userid_list)
    url = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"
    query = {
        'access_token': get_redistoken()
    }
    data = {
        'agent_id': _AGENT_ID,
        "to_all_user": "false",
        'userid_list': userid_list,
        "msg": {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
    }
    json_data = json.dumps(data)
    # Post the message to DingTalk
    requests.post(url=url, params=query, data=json_data,
                  timeout=_REQUEST_TIMEOUT)


def Unionid(unionid):
    """
    Look up a user's userid from their unionid.

    :param unionid: identifier passed as ``unionid`` to the getbyunionid API
    :return: the userid, or None when the response has no ``result``/``userid``
    """
    url = "https://oapi.dingtalk.com/topapi/user/getbyunionid"
    query = {
        'access_token': get_redistoken()
    }
    data = {'unionid': unionid}
    r = requests.post(url=url, params=query, data=data,
                      timeout=_REQUEST_TIMEOUT)
    datas = r.json()
    # A response without "result" yields None instead of an AttributeError.
    return (datas.get('result') or {}).get('userid')


def user_details(userid):
    """
    Fetch the details of a user by userid.

    :param userid: DingTalk userid
    :return: the decoded JSON response
    """
    url = "https://oapi.dingtalk.com/topapi/v2/user/get"
    query = {
        'access_token': get_redistoken()
    }
    data = {'userid': userid}
    r = requests.post(url=url, params=query, data=data,
                      timeout=_REQUEST_TIMEOUT)
    return r.json()


def get_user_list():
    """
    Return the userids of all members of all department groups as a flat list.
    """
    user_list = []
    for section in get_redis_alluid():
        user_list.extend(
            member.get('userid') for member in section.get('user_id')
        )
    return user_list


def get_alluid_list():
    """
    Return the flat list of all userids, cached in Redis for one day.

    On a cache miss the list is rebuilt with get_user_list and stored under
    the ``user_id_list`` key.

    :return: list  # userids only, nothing else
    """
    return _cached_json('user_id_list', get_user_list, 86400)  # kept for one day


if __name__ == '__main__':
    # a = user_details('16371426094531014')
    a = get_alluid_list()
    pprint.pprint(a)