update
parent eb08c99f46
commit 8777d35081
models/to_sql.py (155 deletions)
@@ -1,155 +0,0 @@
from typing import List, Tuple

import sqlalchemy as sa
from sqlalchemy.sql import func
from sqlalchemy import create_engine, column, and_, desc, table, or_, select
import pandas as pd

from core.config import settings


class ToSql:
    """Builds per-event ClickHouse SQL statements from an analysis request payload."""

    def __init__(self, data: dict, db_name: str, event_columns: List[str], user_columns: List[str]):
        self.db_name = db_name
        self.engine = create_engine('clickhouse://')
        self.event_view = data.get('eventView')
        self.events = data.get('events')

        self.event_columns = self.gen_columns(event_columns)
        self.user_columns = self.gen_columns(user_columns)

        self.event_table = sa.table('event', *self.event_columns.values(), schema=self.db_name)
        self.user_table = sa.table('user_view', *self.user_columns.values(), schema=self.db_name)

    def gen_columns(self, columns):
        # Map each column name to a lightweight SQLAlchemy column() expression.
        return {col: column(col) for col in columns}

    def get_zone_time(self):
        # Timezone offset in hours, applied to #event_time via addHours().
        return int(self.event_view.get('zone_time'))

    def get_date_range(self) -> Tuple[str, str]:
        start_date: str = self.event_view.get('startTime')
        end_date: str = self.event_view.get('endTime')

        return start_date, end_date

    def get_global_filters(self):
        return self.event_view.get('filters') or []

    def get_group_by(self):
        return [item['column_id'] for item in self.event_view.get('groupBy') or []]

    def get_time_particle_size(self):
        # ISO-8601 duration of the time bucket; defaults to one day.
        return self.event_view.get('timeParticleSize') or 'P1D'
    def get_sql_query_event_model(self):
        sqls = []
        select_exprs = self.get_group_by()
        select_exprs = [self.event_columns.get(item) for item in select_exprs]
        time_particle_size = self.get_time_particle_size()
        start_date, end_date = self.get_date_range()
        time_zone = self.get_zone_time()
        # Prepend the time-grain bucket expression (labelled 'date') to the group-by columns.
        select_exprs.insert(0, settings.TIME_GRAIN_EXPRESSIONS[time_particle_size](self.event_columns['#event_time'],
                                                                                   time_zone))
        date_range = pd.date_range(start_date, end_date, freq=settings.PROPHET_TIME_GRAIN_MAP[time_particle_size],
                                   tz='UTC').tolist()
        groupby = [item.name for item in select_exprs]

        for event in self.events:
            event_name = event['event_name']
            base_where = [
                func.addHours(self.event_columns['#event_time'], time_zone) >= start_date,
                func.addHours(self.event_columns['#event_time'], time_zone) <= end_date,
                self.event_columns['#event_name'] == event_name
            ]
            analysis = event['analysis']
            filters = event['filters'] + self.get_global_filters()

            event_filter = []
            user_filter = []

            for item in filters:
                if item['table_type'] == 'user':
                    where = user_filter
                elif item['table_type'] == 'event':
                    where = event_filter
                else:
                    continue

                col = getattr(self, f'{item["table_type"]}_columns').get(item['column_id'])

                comparator = item['comparator_id']
                ftv = item['ftv']
                if comparator == '==':
                    if len(ftv) > 1:
                        where.append(or_(*[col == v for v in ftv]))
                    else:
                        where.append(col == ftv[0])
                elif comparator == '>=':
                    where.append(col >= ftv[0])
                elif comparator == '<=':
                    where.append(col <= ftv[0])
                elif comparator == '>':
                    where.append(col > ftv[0])
                elif comparator == '<':
                    where.append(col < ftv[0])
                elif comparator == 'is not null':
                    where.append(col.isnot(None))
                elif comparator == 'is null':
                    where.append(col.is_(None))
                elif comparator == '!=':
                    where.append(col != ftv[0])

            # Subquery of account ids that satisfy the user-level filters.
            subq = sa.select([self.user_columns.get('#account_id')])
            subq = subq.where(and_(*user_filter))

            if analysis == 'total_count':
                selected = select_exprs + [func.count().label('values')]
            elif analysis == 'touch_user_count':
                selected = select_exprs + [
                    func.count(sa.distinct(self.event_columns['#account_id'])).label('values')]
            elif analysis == 'touch_user_avg':
                selected = select_exprs + [
                    func.round((func.count() / func.count(sa.distinct(self.event_columns['#account_id']))), 2).label(
                        'values')]
            elif analysis == 'distinct_count':
                selected = select_exprs + [
                    func.count(sa.distinct(self.event_columns[event['event_attr_id']])).label('values')]
            else:
                # Any other analysis id is treated as an aggregate function name (sum, avg, max, ...).
                selected = select_exprs + [
                    func.round(getattr(func, analysis)(self.event_columns[event['event_attr_id']]), 2).label(
                        'values')]

            if user_filter:
                # Restrict events to accounts returned by the user-filter subquery.
                account_id = column('#account_id')
                on_clause = [self.event_columns.get('#account_id') == account_id]
                qry = self.event_table.join(subq.alias('user'), and_(*on_clause))

                qry = sa.select(selected).select_from(qry)
                qry = qry.where(and_(*event_filter, *base_where))
            else:
                qry = sa.select(selected)
                qry = qry.where(and_(*event_filter, *base_where))

            qry = qry.group_by(*select_exprs)
            # The time-grain expression is labelled 'date', so ordering by it keeps buckets sorted.
            qry = qry.order_by(column('date'))
            qry = qry.limit(1000)

            # self.event_table.join(self.user_table, and_(*on_clause))
            # qry = qry.select_from(self.event_table)
            sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True}))
            print(sql)
            sqls.append({'sql': sql,
                         'groupby': groupby,
                         'date_range': date_range,
                         'event_name': event_name
                         })

        return sqls
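
The deleted module relies on two mappings from core.config.settings that are not included in this commit: TIME_GRAIN_EXPRESSIONS and PROPHET_TIME_GRAIN_MAP. The sketch below shows one plausible shape for them, inferred only from how they are called above: each grain entry is a callable taking the #event_time column and an hour offset and returning a bucket expression labelled 'date' (the query orders by column('date')), and each pandas entry is a frequency string for pd.date_range. The actual definitions in core/config.py may differ.

# Hypothetical sketch only; the real values live in core/config.py.
from sqlalchemy.sql import func

# ISO-8601 grain -> callable(event_time_column, hour_offset) -> bucket expression labelled 'date'.
TIME_GRAIN_EXPRESSIONS = {
    'PT1H': lambda col, zone: func.toStartOfHour(func.addHours(col, zone)).label('date'),
    'P1D': lambda col, zone: func.toDate(func.addHours(col, zone)).label('date'),
    'P1W': lambda col, zone: func.toStartOfWeek(func.addHours(col, zone)).label('date'),
    'P1M': lambda col, zone: func.toStartOfMonth(func.addHours(col, zone)).label('date'),
}

# ISO-8601 grain -> pandas frequency string for pd.date_range().
PROPHET_TIME_GRAIN_MAP = {
    'PT1H': 'H',
    'P1D': 'D',
    'P1W': 'W',
    'P1M': 'MS',
}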
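The shape of the data payload the constructor expects can be read off the accessors above. The example below is hypothetical (names and values are illustrative, not taken from the project) and only documents which keys the class actually reads.

# Hypothetical payload, reconstructed from the accessors; values are illustrative only.
data = {
    'eventView': {
        'startTime': '2021-05-01',
        'endTime': '2021-05-07',
        'zone_time': 8,                      # hour offset applied through func.addHours()
        'timeParticleSize': 'P1D',           # optional; defaults to 'P1D'
        'groupBy': [{'column_id': 'country'}],
        'filters': [],                       # global filters, same item shape as event filters
    },
    'events': [
        {
            'event_name': 'login',
            'analysis': 'total_count',       # or touch_user_count / touch_user_avg / distinct_count / sum, avg, ...
            'event_attr_id': 'pay_amount',   # only read for distinct_count and aggregate analyses
            'filters': [
                {'table_type': 'event', 'column_id': 'country', 'comparator_id': '==', 'ftv': ['US', 'DE']},
                {'table_type': 'user', 'column_id': 'channel', 'comparator_id': 'is not null', 'ftv': []},
            ],
        },
    ],
}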
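For reference, the query-construction pattern the class uses (lightweight table()/column() objects, an aggregate labelled 'values', and compile with literal_binds to render SQL text) can be reproduced standalone. The sketch below is an approximation, not the project's code: it uses SQLAlchemy's default dialect and the 1.4+/2.0 select() calling style instead of the clickhouse:// engine, and the table and column names are invented.

import sqlalchemy as sa
from sqlalchemy import column, and_
from sqlalchemy.sql import func

# Lightweight table handle, mirroring how ToSql builds its event table.
event_cols = {name: column(name) for name in ('#event_time', '#event_name', '#account_id', 'country')}
event_table = sa.table('event', *event_cols.values(), schema='demo_db')

# A total_count-style query: time bucket + group-by column + count() labelled 'values'.
selected = [
    func.toDate(func.addHours(event_cols['#event_time'], 8)).label('date'),
    event_cols['country'],
    func.count().label('values'),
]
qry = (
    sa.select(*selected)
    .select_from(event_table)
    .where(and_(event_cols['#event_name'] == 'login'))
    .group_by(*selected[:2])
    .order_by(column('date'))
    .limit(1000)
)

# literal_binds inlines parameter values so the statement can be shipped as plain text.
print(qry.compile(compile_kwargs={"literal_binds": True}))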