import random import time import datetime import pandas as pd from datetime import timedelta from datetime import datetime as p1 import calendar import crud import schemas from db import get_database def get_uid(): return hex(int(time.time() * 10 ** 7) + random.randint(0, 10000))[2:] # 获取筛选条件的包含关系 def get_bijiao(bijiao): if bijiao == '==' or bijiao == 'in' or bijiao == 'like' or bijiao == 'is not null': return "IN" elif bijiao == '!=' or bijiao == 'not like' or bijiao == 'is null': return 'NOT LIKE' # 判断传入的数据类型 def estimate_data(data_type): if data_type == 'int': return "Nullable(Int64)" elif data_type == 'ip': return "Nullable(DateTime('UTC'))" else: return "Nullable(String)" # 将字典变成字符串 def dict_to_str(dic): c = str() b = 0 for k, v in dic.items(): b += 1 if b == 1: c += "{\"%s\":\"%s\"," % (k, v) elif b != len(dic): c += "\"%s\":\"%s\"," % (k, v) else: c += "\"%s\":\"%s\"}" % (k, v) return c def getEveryDay(begin_date, end_date): # 前闭后闭 date_list = [] begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d") end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d") while begin_date <= end_date: date_str = begin_date.strftime("%Y-%m-%d") date_list.append(date_str) begin_date += datetime.timedelta(days=1) return date_list # print(getEveryDay('2016-01-01','2017-05-11')) def Download_xlsx(df, name): """ 下载功能 name为文件名 """ from urllib.parse import quote import mimetypes from utils import DfToStream from fastapi.responses import StreamingResponse file_name = quote(f'{name}.xlsx') mime = mimetypes.guess_type(file_name)[0] df_to_stream = DfToStream((df, name)) with df_to_stream as d: export = d.to_stream() Download = StreamingResponse(export, media_type=mime, headers={'Content-Disposition': f'filename="{file_name}"'}) return Download def jiange_insert(list_date): """ 间隔1条插入一条数据插入数据 :param day: list数据 :return: list """ i = 1 while i <= len(list_date): list_date.insert(i, '-') i += 2 return list_date def create_df(resp): """ 分布分析外部下载功能的df数据 """ columns = resp['label'] day = list(resp['list'].keys()) jiange_insert(day) date = [] day_nu = 0 for i in day: if i == '-': av = day[day_nu - 1] day_date = resp['list'][av]['总体'] else: day_date = resp['list'][i]['总体'] date_dict = {} n = 0 p = 0 if i == '-': date_dict['事件发生时间'] = '-' date_dict['总人数'] = '-' for nu in range(len(columns)): date_dict[columns[nu]] = day_date['p'][p] p += 1 date.append(date_dict) else: date_dict['事件发生时间'] = i date_dict['总人数'] = day_date['total'] for nu in range(len(columns)): date_dict[columns[nu]] = day_date['n'][n] n += 1 date.append(date_dict) day_nu += 1 columns.insert(0, '总人数') columns.insert(0, '事件发生时间') df = pd.DataFrame(data=date, columns=columns) return df def create_neidf(resp, columnName): """ 分布分析内部下载功能的df数据 """ columns = resp['label'] day = list(resp['list'].keys()) jiange_insert(day) date = [] day_nu = 0 for i in day: if i == '-': av = day[day_nu - 1] day_date = resp['list'][av] else: day_date = resp['list'][i] date_dict = {} n = 0 p = 0 if i == '-': date_dict[columnName] = '-' date_dict['全部用户数'] = '-' for nu in range(len(columns)): date_dict[columns[nu]] = day_date['p'][p] p += 1 date.append(date_dict) else: date_dict[columnName] = i date_dict['全部用户数'] = day_date['total'] for nu in range(len(columns)): date_dict[columns[nu]] = day_date['n'][n] n += 1 date.append(date_dict) day_nu += 1 columns.insert(0, '全部用户数') columns.insert(0, columnName) df = pd.DataFrame(data=date, columns=columns) return df def get_week(date_str=None): if date_str and isinstance(date_str, str): now_time = (p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + datetime.timedelta(days=1)).strftime( "%Y-%m-%d %H:%M:%S") else: now_time = p1.now().replace(hour=0, minute=0, second=0, microsecond=0) now_time = strptime(now_time) # 当前日期所在周的周一 week_start_time = now_time - timedelta(days=now_time.weekday() + 1, hours=now_time.hour, minutes=now_time.minute, seconds=now_time.second) # 当前日期所在周的周日 week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59) return week_start_time, week_end_time def strptime(date_string): """ 将字符串转换成datetime.datetime类型 :param date_string: '2022-05-29 23:59:59' :return: 2022-05-29 23:59:59 """ return p1.strptime(date_string, '%Y-%m-%d %H:%M:%S') def strptime1(date_str): """ 将字符串转换成datetime.datetime类型 :param date_string: '2022-05-29' :return: 2022-05-29 00:00:00 """ return p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") def start_end_month(time): """ 获取某个月的起始时间和结束时间 :param time: '2022-05-29' :return: """ now = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S") this_month_start = datetime.datetime(now.year, now.month, 1) this_month_end = datetime.datetime(now.year, now.month, calendar.monthrange(now.year, now.month)[1]) this_month_end1 = this_month_end + timedelta(hours=23, minutes=59, seconds=59) return this_month_start, this_month_end1 async def get_event(attr, game): """ :param attr: str 事件属性 :param game: str 游戏名 :return: str 事件名 """ res = await crud.event_point.all_event(get_database(), game) event_name = '' for i in res: if attr in i['event_attr']: event_name = i['event_name'] break return event_name def get_time(fmt: str = '%Y-%m-%d') -> str: ''' 获取当前时间 ''' ts = time.time() ta = time.localtime(ts) t = time.strftime(fmt, ta) return t def random_hex(length): """ 生成随机数 :param length: 想要几位随机数就传多少位 :return: 随机数 """ result = hex(random.randint(0, 16 ** length)).replace('0x', '').upper() if (len(result) < length): result = '0' * (length - len(result)) + result print(result) return result