xbackend/models/to_sql.py
2021-05-25 14:22:01 +08:00

98 lines
3.9 KiB
Python

from typing import List, Tuple
import sqlalchemy as sa
from sqlalchemy.sql import func
from sqlalchemy import create_engine, column, and_, desc, table, or_
class ToSql:
    """Render event-analysis requests as ClickHouse SQL strings.

    The request ``data`` is read through two keys:

    * ``eventView`` -- ``startTime`` / ``endTime``, optional global
      ``filters``, optional ``groupBy`` entries (each with a ``columnName``)
      and an optional ``timeParticleSize`` (defaults to ``'day'``).
    * ``events`` -- one entry per event to analyse, each with ``event_name``,
      ``analysis`` and optional ``filters``.

    ``get_sql_query_event_model`` produces one SQL string per ``events`` entry.
    """

    # Binary comparators accepted in a filter's ``comparator_id``.  ``==`` is
    # handled separately because it may carry several candidate values.
    _COMPARATORS = {
        '>=': lambda col, value: col >= value,
        '<=': lambda col, value: col <= value,
        '>': lambda col, value: col > value,
        '<': lambda col, value: col < value,
        '!=': lambda col, value: col != value,
    }

    def __init__(self, data: dict, db_name: str, table_name: str, columns: List[str]):
        """Store the request and build the table/column expressions.

        Args:
            data: Request payload; ``eventView`` and ``events`` are read from it.
            db_name: Schema (database) name the table is qualified with.
            table_name: Name of the table to select from.
            columns: Names of the columns that may be referenced in queries.
        """
        self.db_name = db_name
        # Within this class the engine is only used as the compile target
        # when the queries are rendered to strings.
        self.engine = create_engine('clickhouse://')
        self.columns = self.gen_columns(columns)
        self.event_view = data.get('eventView')
        self.events = data.get('events')
        # ``self.columns`` is a dict: pass its values, because unpacking the
        # dict itself would hand ``sa.table`` the column *names* (plain
        # strings) instead of column objects.
        self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name)

    def gen_columns(self, columns):
        """Return a ``{name: column(name)}`` mapping for the given names."""
        return {col: column(col) for col in columns}

    def get_date_range(self) -> Tuple[str, str]:
        """Return ``(startTime, endTime)`` as stored in the event view."""
        start_date: str = self.event_view.get('startTime')
        end_date: str = self.event_view.get('endTime')
        return start_date, end_date

    def get_global_filters(self):
        """Return the event view's ``filters`` list, or ``[]`` when absent/empty."""
        return self.event_view.get('filters') or []

    def get_group_by(self) -> List[str]:
        """Return the ``columnName`` of every ``groupBy`` entry.

        A missing or empty ``groupBy`` yields ``[]`` instead of raising.
        """
        return [item['columnName'] for item in self.event_view.get('groupBy') or []]

    def get_time_particle_size(self):
        """Return the event view's ``timeParticleSize``, defaulting to ``'day'``."""
        return self.event_view.get('timeParticleSize') or 'day'

    def _filter_clause(self, item):
        """Translate one filter dict into a WHERE clause.

        ``item`` is read through ``column_id``, ``comparator_id`` and ``ftv``
        (the list of filter values).  Returns ``None`` for a comparator that
        is not recognised, so the caller can skip it.
        """
        # NOTE(review): a ``column_id`` that was not declared resolves to None
        # here; consider rejecting such filters explicitly.
        col = self.columns.get(item['column_id'])
        comparator = item['comparator_id']
        ftv = item['ftv']
        # NOTE(review): an empty ``ftv`` raises IndexError below -- confirm
        # whether callers can send one.
        if comparator == '==':
            if len(ftv) > 1:
                # Several candidate values: match any of them.
                return or_(*[col == v for v in ftv])
            return col == ftv[0]
        compare = self._COMPARATORS.get(comparator)
        if compare is None:
            return None
        return compare(col, ftv[0])

    def _aggregate_expr(self, event):
        """Return the aggregate expression selected by ``event['analysis']``.

        Raises:
            ValueError: If ``analysis`` is not one of the named analyses and
                is not a plain identifier usable as a SQL function name.
        """
        analysis = event['analysis']
        if analysis == 'total_count':
            return func.count()

        if analysis in ('touch_user_count', 'touch_user_avg'):
            # Honour an account column named by the event, otherwise use the
            # '#account_id' column (same naming as '#event_time' /
            # '#event_name' used for the other built-in columns).
            account_col = self.columns[event.get('#account_id', '#account_id')]
            distinct_users = func.count(sa.distinct(account_col))
            if analysis == 'touch_user_count':
                return distinct_users
            # Occurrences per distinct account.  An aggregate nested inside
            # another aggregate (count(avg(...))) is rejected by ClickHouse.
            return func.count() / distinct_users

        attr_col = self.columns[event['event_attr_id']]
        if analysis == 'distinct_count':
            return func.count(sa.distinct(attr_col))

        # ``analysis`` comes from the caller-supplied event dict and becomes
        # the SQL function name, so only accept a plain identifier.
        if not (isinstance(analysis, str) and analysis.isidentifier()):
            raise ValueError('unsupported analysis: %r' % (analysis,))
        return getattr(func, analysis)(attr_col)

    def get_sql_query_event_model(self):
        """Build one SQL statement per requested event (event table only).

        Every statement selects the group-by columns, a ``date`` bucket when
        the time granularity is ``'day'``, and the event's aggregate; it is
        restricted to the date range, the event name and all event-level and
        global filters.

        Returns:
            A list of SQL strings, in the order of ``self.events``, compiled
            against ``self.engine`` with literal values inlined.
        """
        select_exprs = [self.columns.get(name) for name in self.get_group_by()]
        by_day = self.get_time_particle_size() == 'day'
        if by_day:
            select_exprs.append(func.toYYYYMMDD(self.columns['#event_time']).label('date'))

        start_date, end_date = self.get_date_range()
        global_filters = self.get_global_filters()
        event_time = self.columns['#event_time']

        sqls = []
        for event in self.events or []:
            where = [
                event_time >= start_date,
                event_time <= end_date,
                self.columns['#event_name'] == event['event_name'],
            ]
            for item in (event.get('filters') or []) + global_filters:
                clause = self._filter_clause(item)
                # Compare against None explicitly: SQL expressions do not
                # support truth-testing.
                if clause is not None:
                    where.append(clause)

            qry = sa.select(select_exprs + [self._aggregate_expr(event)])
            qry = qry.where(and_(*where))
            qry = qry.group_by(*select_exprs)
            if by_day:
                qry = qry.order_by(column('date'))
            qry = qry.select_from(self.table)
            sqls.append(str(qry.compile(self.engine, compile_kwargs={"literal_binds": True})))
        return sqls