data_cleaning/utils/dd_msg.py
2021-01-15 09:36:04 +08:00

60 lines
1.5 KiB
Python

import time
import hmac
import hashlib
import urllib.parse
import base64
import requests
from db import get_local_db
__all__ = ('ddsend_msg',)
from settings import settings
local_db = get_local_db('admin_game')
dd_conf = local_db['game'].find_one() or {}
msg_set = set()
def get_sign():
timestamp = str(round(time.time() * 1000))
secret = dd_conf.get('dd_secret') or 'SEC8de0abcb83a3c387fb168995be58913f6e5fcc3612a02686fcc9107d25809930'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return timestamp, sign
def ddsend_msg(msg):
if settings.run_model != 'production':
return
hash_msg = hmac.new(msg.encode('utf-8'), digestmod=hashlib.sha256).digest()
if hash_msg not in msg_set:
msg_set.add(hash_msg)
else:
return
data = {
'msgtype': 'text',
'text': {
'content': msg
}
}
timestamp, sign = get_sign()
webhook = dd_conf.get('dd_webhook') or 'https://oapi.dingtalk.com/robot/send?access_token' \
'=9d620df1653de39f28bf6b059e0fff9978ede3185645e4917c52d754a050b458 '
url = f'{webhook}&timestamp={timestamp}&sign={sign}'
resp = requests.post(url, json=data)
if __name__ == '__main__':
ddsend_msg('大家好,我是叉叉,一个冒的感情的机器人')