import time import hmac import hashlib import urllib.parse import base64 import requests from db import get_local_db __all__ = ('ddsend_msg',) local_db = get_local_db('admin_game') dd_conf = local_db['game'].find_one() or {} msg_set = set() def get_sign(): timestamp = str(round(time.time() * 1000)) secret = dd_conf.get('dd_secret') or 'SEC8de0abcb83a3c387fb168995be58913f6e5fcc3612a02686fcc9107d25809930' secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return timestamp, sign def ddsend_msg(msg): hash_msg = hmac.new(msg.encode('utf-8'), digestmod=hashlib.sha256).digest() if hash_msg not in msg_set: msg_set.add(hash_msg) else: return data = { 'msgtype': 'text', 'text': { 'content': msg } } timestamp, sign = get_sign() webhook = dd_conf.get('dd_webhook') or 'https://oapi.dingtalk.com/robot/send?access_token' \ '=9d620df1653de39f28bf6b059e0fff9978ede3185645e4917c52d754a050b458 ' url = f'{webhook}×tamp={timestamp}&sign={sign}' resp = requests.post(url, json=data) if __name__ == '__main__': ddsend_msg('大家好,我是叉叉,一个冒的感情的机器人')