prs_server/utils/func.py

282 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import time
import datetime
import pandas as pd
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from datetime import timedelta
from datetime import datetime as p1
import calendar
from core.config import Email
def get_uid():
    """Return a short lowercase hexadecimal identifier string.

    The value is the current Unix time scaled by 10**7 and truncated to an
    integer, plus a random offset in [0, 10000], rendered as hex without
    the ``0x`` prefix. Uniqueness is not enforced anywhere in this function.
    """
    scaled_timestamp = int(time.time() * 10 ** 7)
    jitter = random.randint(0, 10000)
    return format(scaled_timestamp + jitter, 'x')
def get_bijiao(bijiao):
    """Map a filter comparison operator to its inclusion keyword.

    :param bijiao: comparison operator string taken from a filter condition
    :return: "IN" for '==', 'in', 'like' and 'is not null';
             'NOT LIKE' for '!=', 'not like' and 'is null';
             None for anything else
    """
    if bijiao in ('==', 'in', 'like', 'is not null'):
        return "IN"
    if bijiao in ('!=', 'not like', 'is null'):
        return 'NOT LIKE'
    return None
def estimate_data(data_type):
    """Translate a data type name into a nullable column type string.

    :param data_type: type name; 'int' and 'ip' are recognised
    :return: "Nullable(Int64)" for 'int', "Nullable(DateTime('UTC'))" for
             'ip', and "Nullable(String)" for every other value
    """
    # NOTE(review): 'ip' maps to a DateTime column type - confirm that this
    # pairing is intended.
    known_types = (
        ('int', "Nullable(Int64)"),
        ('ip', "Nullable(DateTime('UTC'))"),
    )
    for type_name, column_type in known_types:
        if data_type == type_name:
            return column_type
    return "Nullable(String)"
def dict_to_str(dic):
    """Render a dictionary as a compact ``{"key":"value",...}`` string.

    Every key and value is converted with ``%s`` and wrapped in double
    quotes; no escaping is applied, so the result is only valid JSON when
    keys and values contain no quotes or backslashes.

    A dictionary with exactly one item used to come back as ``{"k":"v",``
    (trailing comma, no closing brace); it is now closed properly.

    :param dic: mapping to render
    :return: the rendered string, or an empty string for an empty mapping
             (kept for backward compatibility)
    """
    if not dic:
        return str()
    pairs = ('"%s":"%s"' % (key, value) for key, value in dic.items())
    return "{" + ",".join(pairs) + "}"
def getEveryDay(begin_date, end_date):
    """List every calendar day between two dates, both ends included.

    Example: ``getEveryDay('2016-01-01', '2016-01-03')`` returns
    ``['2016-01-01', '2016-01-02', '2016-01-03']``.

    :param begin_date: first day, formatted as 'YYYY-MM-DD'
    :param end_date: last day, formatted as 'YYYY-MM-DD'
    :return: list of 'YYYY-MM-DD' strings in ascending order; empty when
             begin_date is later than end_date
    """
    day_format = "%Y-%m-%d"
    first_day = datetime.datetime.strptime(begin_date, day_format)
    last_day = datetime.datetime.strptime(end_date, day_format)
    span_days = (last_day - first_day).days
    return [
        (first_day + datetime.timedelta(days=offset)).strftime(day_format)
        for offset in range(span_days + 1)
    ]
def Download_xlsx(df, name):
    """Build a streaming download response for an xlsx export.

    :param df: data handed to ``DfToStream`` together with ``name``
    :param name: file name without the ``.xlsx`` extension
    :return: a ``StreamingResponse`` whose Content-Disposition header
             carries the URL-quoted file name
    """
    from urllib.parse import quote
    import mimetypes
    from utils import DfToStream
    from fastapi.responses import StreamingResponse

    quoted_name = quote(f'{name}.xlsx')
    # The media type is guessed from the file extension.
    media_type = mimetypes.guess_type(quoted_name)[0]
    with DfToStream((df, name)) as stream_writer:
        payload = stream_writer.to_stream()
    # NOTE(review): the header has no disposition type such as
    # "attachment;" - confirm that clients handle it as intended.
    return StreamingResponse(
        payload,
        media_type=media_type,
        headers={'Content-Disposition': f'filename="{quoted_name}"'},
    )
def jiange_insert(list_date):
    """Insert a '-' placeholder after every element of a list, in place.

    :param list_date: list to modify; it is mutated, not copied
    :return: the same list object, e.g. ['a', 'b'] becomes
             ['a', '-', 'b', '-']
    """
    # Slice assignment keeps the caller's list object while replacing its
    # contents in one pass.
    list_date[:] = [entry for item in list_date for entry in (item, '-')]
    return list_date
def create_df(resp):
    """Build the DataFrame for the distribution-analysis "external" download.

    For every key of ``resp['list']`` two rows are produced, in this order:

    1. a row holding the key, the ``'total'`` value and the ``'n'`` values;
    2. a separator row holding '-' in the first two columns and the ``'p'``
       values (presumably the percentages matching ``'n'`` - verify against
       the producer of ``resp``).

    Unlike the previous implementation, ``resp['label']`` is no longer
    modified, and a key that happens to equal '-' is treated as ordinary
    data instead of being mistaken for a separator row.

    :param resp: dict with ``'label'`` (list of column names) and ``'list'``
                 (mapping of key -> {'总体': {'total', 'n', 'p'}})
    :return: DataFrame with columns '事件发生时间', '总人数', then the labels
    :raises KeyError: if an expected key is missing from ``resp``
    :raises IndexError: if ``'n'`` or ``'p'`` is shorter than ``'label'``
    """
    # Work on a copy so the caller's label list stays untouched.
    labels = list(resp['label'])
    rows = []
    for day, entry in resp['list'].items():
        overall = entry['总体']
        count_row = {'事件发生时间': day, '总人数': overall['total']}
        for position, label in enumerate(labels):
            count_row[label] = overall['n'][position]
        rows.append(count_row)

        share_row = {'事件发生时间': '-', '总人数': '-'}
        for position, label in enumerate(labels):
            share_row[label] = overall['p'][position]
        rows.append(share_row)
    return pd.DataFrame(data=rows, columns=['事件发生时间', '总人数'] + labels)
def create_neidf(resp, columnName):
    """Build the DataFrame for the distribution-analysis "internal" download.

    For every key of ``resp['list']`` two rows are produced, in this order:

    1. a row holding the key, the ``'total'`` value and the ``'n'`` values;
    2. a separator row holding '-' in the first two columns and the ``'p'``
       values (presumably the percentages matching ``'n'`` - verify against
       the producer of ``resp``).

    Unlike the previous implementation, ``resp['label']`` is no longer
    modified, and a key that happens to equal '-' is treated as ordinary
    data instead of being mistaken for a separator row.

    :param resp: dict with ``'label'`` (list of column names) and ``'list'``
                 (mapping of key -> {'total', 'n', 'p'})
    :param columnName: header of the first column, which holds the keys
    :return: DataFrame with columns columnName, '全部用户数', then the labels
    :raises KeyError: if an expected key is missing from ``resp``
    :raises IndexError: if ``'n'`` or ``'p'`` is shorter than ``'label'``
    """
    # Work on a copy so the caller's label list stays untouched.
    labels = list(resp['label'])
    rows = []
    for group, entry in resp['list'].items():
        count_row = {columnName: group, '全部用户数': entry['total']}
        for position, label in enumerate(labels):
            count_row[label] = entry['n'][position]
        rows.append(count_row)

        share_row = {columnName: '-', '全部用户数': '-'}
        for position, label in enumerate(labels):
            share_row[label] = entry['p'][position]
        rows.append(share_row)
    return pd.DataFrame(data=rows, columns=[columnName, '全部用户数'] + labels)
def random_hex():
    """Generate a random 16-character uppercase hexadecimal string.

    The previous implementation drew from ``randint(0, 16 ** 16)``, whose
    upper bound is inclusive, so it could return 16**16 itself and yield a
    17-character result. Drawing exactly 64 random bits keeps the value
    within 16 hex digits.

    :return: 16 characters from 0-9A-F, zero-padded on the left
    """
    # 64 bits == 16 hex digits; '016X' zero-pads and upper-cases.
    return format(random.getrandbits(64), '016X')
def get_time(fmt: str = '%Y-%m-%d') -> str:
    """Return the current local time formatted with ``fmt``.

    :param fmt: ``time.strftime`` format string
    :return: the formatted current local time
    """
    return time.strftime(fmt, time.localtime())
def import_excel(data, columns, name):
    """Write data to ``<name>.xlsx``.

    Despite the function name, this exports data to a spreadsheet.

    :param data: rows to export
    :param columns: column headers, e.g. ['a', 'b', 'c']
    :param name: output file name without the extension
    :return: None
    """
    frame = pd.DataFrame(data, columns=columns)
    target_path = f'{name}.xlsx'
    frame.to_excel(target_path, index=False, header=True)
def qujian_time(start_time, end_time):
    """Join two timestamps into a single interval string.

    :param start_time: e.g. '2022-07-01 10:00:00'
    :param end_time: e.g. '2022-07-01 10:30:00'
    :return: the start timestamp, '~', and the part of the end timestamp
             after its last space, e.g. '2022-07-01 10:00:00~10:30:00'
    """
    # With no space in end_time, rpartition yields the whole string.
    end_clock = str(end_time).rpartition(' ')[2]
    return '~'.join((str(start_time), end_clock))
def send_str_mail(str_msg, my_user):
    """Send a plain-text e-mail through the QQ SMTP server.

    Sender address, password and subject come from ``Email`` in
    ``core.config``.

    The connection is managed by a ``with`` block so that it is closed
    even when login or sending fails; previously an exception in either
    step left the connection open.

    :param str_msg: body of the e-mail
    :param my_user: recipient e-mail address
    :return: None (the function has never returned a status value)
    :raises smtplib.SMTPException: on login or delivery failure
    """
    msg = MIMEText(str_msg, 'plain', 'utf-8')
    # formataddr takes a (display name, address) pair.
    msg['From'] = formataddr(["乐谷游戏人事", Email.my_sender])
    msg['To'] = formataddr(["FK", my_user])
    msg['Subject'] = Email.subject
    # Implicit-TLS SMTP on port 465; leaving the block issues QUIT and
    # closes the socket.
    with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
        server.login(Email.my_sender, Email.my_pass)
        server.sendmail(Email.my_sender, [my_user], msg.as_string())
def get_week(date_str=None):
    """Return the start and end of a Sunday-to-Saturday week.

    With a ``'YYYY-MM-DD'`` string, the week returned is the one that
    contains that date: it starts on Sunday 00:00:00 and ends on Saturday
    23:59:59.

    Without an argument (or with a non-string / empty value) the week is
    derived from today's date. Previously this path raised ``TypeError``
    because a ``datetime`` object was passed to a string parser.

    NOTE(review): the default path does not add the one-day shift used
    for string input, so when today is a Sunday it yields the week that
    ended yesterday - confirm which behaviour is wanted.

    :param date_str: optional date string formatted as 'YYYY-MM-DD'
    :return: tuple ``(week_start_time, week_end_time)`` of datetimes
    :raises ValueError: if ``date_str`` does not match 'YYYY-MM-DD'
    """
    if date_str and isinstance(date_str, str):
        # Shift by one day so that a Sunday maps to the week it starts.
        anchor = p1.strptime(date_str + " 00:00:00", "%Y-%m-%d %H:%M:%S") + timedelta(days=1)
    else:
        anchor = p1.now().replace(hour=0, minute=0, second=0, microsecond=0)
    # anchor is always at midnight, so only whole days are subtracted;
    # weekday() is 0 for Monday, hence the +1 to land on Sunday.
    week_start_time = anchor - timedelta(days=anchor.weekday() + 1)
    week_end_time = week_start_time + timedelta(days=6, hours=23, minutes=59, seconds=59)
    return week_start_time, week_end_time
def strptime(date_string):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime object.

    :param date_string: e.g. '2022-05-29 23:59:59'
    :return: the matching ``datetime.datetime``
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    parsed = p1.strptime(date_string, timestamp_format)
    return parsed
def strptime1(date_str):
    """Parse a 'YYYY-MM-DD' string into a datetime at midnight.

    :param date_str: e.g. '2022-05-29'
    :return: ``datetime.datetime`` for that day at 00:00:00
    """
    midnight_stamp = date_str + " 00:00:00"
    return p1.strptime(midnight_stamp, "%Y-%m-%d %H:%M:%S")
def start_end_month(time):
    """Return the first and last moment of the month containing a date.

    :param time: date string formatted as 'YYYY-MM-DD', e.g. '2022-05-29'
    :return: tuple of datetimes: the first day of the month at 00:00:00
             and the last day of the month at 23:59:59
    """
    day_in_month = p1.strptime(time + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    year, month = day_in_month.year, day_in_month.month
    # monthrange returns (weekday of the first day, number of days).
    last_day = calendar.monthrange(year, month)[1]
    month_start = datetime.datetime(year, month, 1)
    month_end = datetime.datetime(year, month, last_day, 23, 59, 59)
    return month_start, month_end