# coding:utf-8 import gzip import json import base64 import time import requests from core import settings requests.packages.urllib3.disable_warnings() class PostData: def __init__(self): self.data_list = [] def add(self, data): self.data_list.append(data) def get_data_length(self): return len(self.data_list) def is_upload(self): return len(self.data_list) > 900 def post(self): # print(int(time.time())) # print(len(self.data_list)) # print(json.dumps(post_data)) if not self.data_list: return gzip_data = gzip.compress(json.dumps(self.data_list).encode()) base64_data = base64.b64encode(gzip_data) # resp = requests.post(settings.SM_CHECK_API, base64_data, verify=False) # proxies = { # 'http': '127.0.0.1:8899', # 'https': '127.0.0.1:8899' # } try: print(json.dumps(self.data_list)) print('*' * 50) resp = requests.post(settings.SM_API, base64_data, verify=False) # resp = requests.post(settings.SM_CHECK_API, base64_data, verify=False) resp_text = resp.text print(resp_text) except Exception as e: print(e) finally: self.clear_data() def clear_data(self): self.data_list.clear() if __name__ == '__main__': pass # data = { # "cp_game_id": 682, # "category": "cp_api", # "event": { # "event_time": f"{int(time.time() * 1000)}", # "event_name": "role_rank" # }, # "data": { # "utc_time": 1506054735456, # "game_server": 1, # "platform_id": 101, # # "sm_user_id": "test_123456", # # "user_id": "14444444", # # "role_id": "14362455", # # "role_name": "无名", # # "role": "狂暴战士", # # "school": "玄月宗", # # "combat": 156784, # # "role_vip": 2, # # "before_rank": 2, # # "role_rank": 3 # # } # }