153 lines
6.0 KiB
Python
153 lines
6.0 KiB
Python
import datetime
|
|
from typing import List, Tuple
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.sql import func
|
|
from sqlalchemy import create_engine, column, and_, desc, table, or_
|
|
import pandas as pd
|
|
|
|
# Maps the integer `zone_time` sent in the request (presumably the UTC offset
# in hours -- confirm with the caller) to an IANA time zone name.
# Only offset 8 is mapped; any other value raises KeyError at lookup time.
TIME_ZONE_MAP = {8: 'Asia/Shanghai'}
|
|
|
|
# ISO-8601 duration (the request's `timeParticleSize`) -> pandas offset alias,
# used as the `freq` argument of `pd.date_range`.
# NOTE(review): pandas anchors 'W' on Sunday and 'M' on month end, while
# TIME_GRAIN_EXPRESSIONS buckets with toMonday / toStartOfMonth -- confirm that
# consumers of the generated date range reconcile the two.
PROPHET_TIME_GRAIN_MAP = {
    # sub-minute / minute grains
    'PT1S': 'S',
    'PT1M': 'min',
    'PT5M': '5min',
    'PT10M': '10min',
    'PT15M': '15min',
    'PT0.5H': '30min',
    # hour and calendar grains
    'PT1H': 'H',
    'P1D': 'D',
    'P1W': 'W',
    'P1M': 'M',
}
|
|
|
|
# ISO-8601 duration -> factory building the time-bucket SELECT expression.
# Every factory takes (col, zone): the column is first converted to `zone`
# with ClickHouse `toTimeZone`, then truncated to the start of the bucket,
# and the result is labelled 'date' (the query later orders by that label).
TIME_GRAIN_EXPRESSIONS = {
    'PT1S': lambda col, zone: func.toStartOfSecond(func.toTimeZone(col, zone)).label('date'),
    'PT1M': lambda col, zone: func.toStartOfMinute(func.toTimeZone(col, zone)).label('date'),
    'PT5M': lambda col, zone: func.toStartOfFiveMinute(func.toTimeZone(col, zone)).label('date'),
    'PT10M': lambda col, zone: func.toStartOfTenMinutes(func.toTimeZone(col, zone)).label('date'),
    'PT15M': lambda col, zone: func.toStartOfFifteenMinutes(func.toTimeZone(col, zone)).label('date'),
    # ClickHouse has no dedicated 30-minute truncation function, so use the
    # generic toStartOfInterval. PROPHET_TIME_GRAIN_MAP already lists this
    # grain; without an entry here a 'PT0.5H' request fails with KeyError.
    'PT0.5H': lambda col, zone: func.toStartOfInterval(
        func.toTimeZone(col, zone), sa.text('INTERVAL 30 minute')).label('date'),
    'PT1H': lambda col, zone: func.toStartOfHour(func.toTimeZone(col, zone)).label('date'),
    'P1D': lambda col, zone: func.toStartOfDay(func.toTimeZone(col, zone)).label('date'),
    'P1W': lambda col, zone: func.toMonday(func.toTimeZone(col, zone)).label('date'),
    'P1M': lambda col, zone: func.toStartOfMonth(func.toTimeZone(col, zone)).label('date'),
}
|
|
|
|
|
|
class ToSql:
    """Translate an event-analysis request into ClickHouse SQL strings.

    The request payload (`data`) is read through two keys: ``eventView``
    (time range, time zone, grain, global filters, group-by columns) and
    ``events`` (one entry per event to analyse). `get_sql_query_event_model`
    renders one SQL statement per event.
    """

    # Comparators that compare the column against the first filter value.
    _BINARY_COMPARATORS = {
        '==': lambda col, value: col == value,
        '!=': lambda col, value: col != value,
        '>=': lambda col, value: col >= value,
        '<=': lambda col, value: col <= value,
        '>': lambda col, value: col > value,
        '<': lambda col, value: col < value,
    }

    def __init__(self, data: dict, db_name: str, table_name: str, columns: List[str]):
        """Store the request and build the table/column objects.

        Args:
            data: Request payload; ``eventView`` and ``events`` are read from it.
            db_name: Schema (database) the table lives in.
            table_name: Name of the event table.
            columns: Names of every column that may be selected or filtered on.
        """
        self.db_name = db_name
        # Within this class the engine is only passed to `compile()` to
        # select the ClickHouse dialect; no statement is executed here.
        self.engine = create_engine('clickhouse://')
        self.columns = self.gen_columns(columns)
        # Fall back to empty containers so a missing/None key does not make
        # every accessor below fail with AttributeError/TypeError.
        self.event_view = data.get('eventView') or {}
        self.events = data.get('events') or []

        self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name)

    def gen_columns(self, columns):
        """Return a ``{name: sqlalchemy column}`` mapping for `columns`."""
        return {col: column(col) for col in columns}

    def get_zone_time(self) -> int:
        """Return the request's ``zone_time`` as an int.

        Raises:
            TypeError: if ``zone_time`` is missing (``int(None)``).
            ValueError: if it is not parseable as an integer.
        """
        return int(self.event_view.get('zone_time'))

    def get_date_range(self) -> Tuple[str, str]:
        """Return ``(startTime, endTime)`` exactly as given in the request."""
        start_data: str = self.event_view.get('startTime')
        end_data: str = self.event_view.get('endTime')

        return start_data, end_data

    def get_global_filters(self) -> list:
        """Return the filters that apply to every event (empty list if none)."""
        return self.event_view.get('filters') or []

    def get_group_by(self) -> List[str]:
        """Return the ``column_id`` of every group-by entry.

        A missing or ``None`` ``groupBy`` yields an empty list instead of
        raising ``TypeError``.
        """
        return [item['column_id'] for item in self.event_view.get('groupBy') or []]

    def get_time_particle_size(self) -> str:
        """Return the requested time grain, defaulting to ``'P1D'``."""
        return self.event_view.get('timeParticleSize') or 'P1D'

    def _filter_clause(self, item):
        """Build the WHERE condition for one filter dict.

        Returns ``None`` for a comparator that is not recognised, so the
        caller skips it (unknown comparators have always been ignored).

        Raises:
            KeyError: if ``column_id`` is not a known column. Failing loudly
                avoids comparing against ``None``, which for ``==`` would
                silently produce a constant-false condition.
        """
        col = self.columns[item['column_id']]
        comparator = item['comparator_id']

        # Null checks need no filter value.
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)

        compare = self._BINARY_COMPARATORS.get(comparator)
        if compare is None:
            return None

        ftv = item['ftv']
        # Only '==' fans out over several values; the other comparators use
        # the first value only.
        if comparator == '==' and len(ftv) > 1:
            return or_(*[col == v for v in ftv])
        return compare(col, ftv[0])

    def _aggregate_expr(self, event):
        """Return the (unlabelled) aggregate expression for `event`."""
        analysis = event['analysis']
        if analysis == 'total_count':
            return func.count()
        if analysis == 'touch_user_count':
            return func.count(sa.distinct(self.columns['#account_id']))
        if analysis == 'touch_user_avg':
            # Events per distinct account, rounded to two decimals.
            return func.round(
                func.count() / func.count(sa.distinct(self.columns['#account_id'])), 2)

        attr = self.columns[event['event_attr_id']]
        if analysis == 'distinct_count':
            return func.count(sa.distinct(attr))
        # NOTE(review): `analysis` comes from the request and is used verbatim
        # as the SQL function name -- consider restricting it to a whitelist.
        return getattr(func, analysis)(attr)

    def get_sql_query_event_model(self) -> List[dict]:
        """Build one SQL statement per requested event (event table only).

        Returns:
            A list with one dict per event, holding ``sql`` (the compiled
            statement with literal values inlined), ``groupby`` (names of the
            selected grouping columns, ``'date'`` first), ``date_range`` (the
            list produced by ``pd.date_range`` for the requested span, grain
            and time zone) and ``event_name``.

        Raises:
            KeyError: for an unmapped time zone or time grain, or for a
                group-by/filter column that is not in ``self.columns``.
        """
        time_particle_size = self.get_time_particle_size()
        start_data, end_data = self.get_date_range()
        time_zone = TIME_ZONE_MAP[self.get_zone_time()]
        event_time = self.columns['#event_time']

        # The time bucket always comes first, followed by the group-by columns.
        select_exprs = [TIME_GRAIN_EXPRESSIONS[time_particle_size](event_time, time_zone)]
        select_exprs += [self.columns[name] for name in self.get_group_by()]
        groupby = [expr.name for expr in select_exprs]

        date_range = pd.date_range(
            start_data,
            end_data,
            freq=PROPHET_TIME_GRAIN_MAP[time_particle_size],
            tz=time_zone,
        ).tolist()

        global_filters = self.get_global_filters()
        sqls = []
        for event in self.events:
            event_name = event['event_name']
            where = [
                event_time >= start_data,
                event_time <= end_data,
                self.columns['#event_name'] == event_name,
            ]
            # Event-specific filters first, then the global ones.
            for item in (event.get('filters') or []) + global_filters:
                clause = self._filter_clause(item)
                if clause is not None:
                    where.append(clause)

            qry = sa.select(select_exprs + [self._aggregate_expr(event).label('values')])
            qry = qry.where(and_(*where))
            qry = qry.group_by(*select_exprs)
            qry = qry.order_by(column('date'))
            qry = qry.limit(1000)
            qry = qry.select_from(self.table)

            sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True}))
            print(sql)
            sqls.append({
                'sql': sql,
                'groupby': groupby,
                'date_range': date_range,
                'event_name': event_name,
            })

        return sqls
|