shoumeng_xiangsu/utils/post_data.py
2021-10-15 16:29:44 +08:00

92 lines
2.2 KiB
Python

# coding:utf-8
import gzip
import json
import base64
import time
import requests
from core import settings
from .data_idx import set_data_id
# Disable urllib3's warnings as soon as this module is imported.
# NOTE(review): presumably this is here to silence the InsecureRequestWarning
# that PostData.post() would trigger by sending with verify=False -- confirm.
requests.packages.urllib3.disable_warnings()
class PostData:
    """Buffer records in memory and upload them to ``settings.SM_API`` in one batch.

    Records are collected with :meth:`add`. :meth:`post` serializes the whole
    buffer as a JSON array, gzip-compresses it, base64-encodes the result and
    sends it as the body of a single HTTP POST.
    """

    # is_upload() reports True once the buffer holds MORE than this many records.
    UPLOAD_THRESHOLD = 900

    def __init__(self):
        # Records waiting to be uploaded, in insertion order.
        self.data_list = []

    def add(self, data):
        """Append one record to the pending batch."""
        self.data_list.append(data)

    def get_data_length(self):
        """Return the number of records currently buffered."""
        return len(self.data_list)

    def is_upload(self):
        """Return True when more than ``UPLOAD_THRESHOLD`` records are buffered."""
        return len(self.data_list) > self.UPLOAD_THRESHOLD

    def post(self):
        """Upload the buffered records, then advance the id cursor and empty the buffer.

        Does nothing when the buffer is empty. Any exception raised while
        sending is printed and swallowed (best-effort delivery); afterwards
        :meth:`set_idx` and :meth:`clear_data` run whether or not the upload
        succeeded, so a failed batch is dropped rather than retried.

        Raises:
            Whatever ``json.dumps`` raises for a non-serializable record
            (the buffer is left untouched in that case), or whatever
            :meth:`set_idx` raises (the buffer is still cleared).
        """
        if not self.data_list:
            return

        # Serialize once; the same text is used for the payload and the debug print.
        json_text = json.dumps(self.data_list)
        payload = base64.b64encode(gzip.compress(json_text.encode()))

        try:
            print(json_text)
            print('*' * 50)
            # A commented-out variant previously posted to settings.SM_CHECK_API instead.
            # NOTE(review): no timeout is passed, so a stalled server blocks the
            # caller indefinitely -- consider adding one.
            resp = requests.post(settings.SM_API, payload, verify=False)
            print(resp.text)
        except Exception as e:
            print(e)
        finally:
            # Nested so the buffer is emptied even if set_idx() raises (e.g. the
            # last record has no data_id); otherwise the stale batch would stay
            # buffered and be sent again on the next call.
            try:
                self.set_idx()
            finally:
                self.clear_data()

    def set_idx(self):
        """Pass the ``data_id`` of the last buffered record to ``set_data_id``.

        Does nothing when the buffer is empty.

        Raises:
            KeyError/TypeError: if the last record has no ``['data']['data_id']``.
        """
        if not self.data_list:
            return
        set_data_id(self.data_list[-1]['data']['data_id'])

    def clear_data(self):
        """Remove every record from the buffer (in place)."""
        self.data_list.clear()
# Running this module directly does nothing: the guard body is a bare `pass`.
if __name__ == '__main__':
    pass
    # Commented-out sample record, kept for reference only.
    # NOTE(review): presumably an example of what is passed to PostData.add() --
    # confirm. As written it has no data["data_id"], which PostData.set_idx()
    # reads from the last buffered record.
    # data = {
    #     "cp_game_id": 682,
    #     "category": "cp_api",
    #     "event": {
    #         "event_time": f"{int(time.time() * 1000)}",
    #         "event_name": "role_rank"
    #     },
    #     "data": {
    #         "utc_time": 1506054735456,
    #         "game_server": 1,
    #         "platform_id": 101,
    #         # "sm_user_id": "test_123456",
    #         # "user_id": "14444444",
    #         # "role_id": "14362455",
    #         # "role_name": "Nameless",
    #         # "role": "Berserker",
    #         # "school": "Xuanyue Sect",
    #         # "combat": 156784,
    #         # "role_vip": 2,
    #         # "before_rank": 2,
    #         # "role_rank": 3
    #
    #     }
    # }