From 8777d35081b6787b198f49ad8b087dbe1d83efb8 Mon Sep 17 00:00:00 2001 From: wuaho Date: Fri, 11 Jun 2021 17:03:19 +0800 Subject: [PATCH] update --- models/to_sql.py | 155 ----------------------------------------------- 1 file changed, 155 deletions(-) delete mode 100644 models/to_sql.py diff --git a/models/to_sql.py b/models/to_sql.py deleted file mode 100644 index 56ce5f6..0000000 --- a/models/to_sql.py +++ /dev/null @@ -1,155 +0,0 @@ -from typing import List, Tuple -import sqlalchemy as sa -from sqlalchemy.sql import func -from sqlalchemy import create_engine, column, and_, desc, table, or_, select -import pandas as pd - -from core.config import settings - - -class ToSql: - def __init__(self, data: dict, db_name: str, event_columns: List[str], user_columns: List[str]): - self.db_name = db_name - self.engine = create_engine('clickhouse://') - self.event_view = data.get('eventView') - self.events = data.get('events') - - self.event_columns = self.gen_columns(event_columns) - self.user_columns = self.gen_columns(user_columns) - - self.event_table = sa.table('event', *self.event_columns.values(), schema=self.db_name) - self.user_table = sa.table('user_view', *self.user_columns.values(), schema=self.db_name) - - def gen_columns(self, columns): - return {col: column(col) for col in columns} - - def get_zone_time(self): - return int(self.event_view.get('zone_time')) - - def get_date_range(self) -> Tuple[str, str]: - start_data: str = self.event_view.get('startTime') - end_data: str = self.event_view.get('endTime') - - return start_data, end_data - - def get_global_filters(self): - return self.event_view.get('filters') or [] - - def get_group_by(self): - # return self.event_view.get('groupBy') or [] - return [item['column_id'] for item in self.event_view.get('groupBy')] - - def get_time_particle_size(self): - return self.event_view.get('timeParticleSize') or 'P1D' - - def get_sql_query_event_model(self): - is_join_user = False - sqls = [] - select_exprs = self.get_group_by() - select_exprs = [self.event_columns.get(item) for item in select_exprs] - time_particle_size = self.get_time_particle_size() - start_data, end_data = self.get_date_range() - time_zone = self.get_zone_time() - select_exprs.insert(0, settings.TIME_GRAIN_EXPRESSIONS[time_particle_size](self.event_columns['#event_time'], - time_zone)) - date_range = pd.date_range(start_data, end_data, freq=settings.PROPHET_TIME_GRAIN_MAP[time_particle_size], - tz='UTC').tolist() - groupby = [item.name for item in select_exprs] - - for event in self.events: - event_name = event['event_name'] - base_where = [ - func.addHours(self.event_columns['#event_time'], time_zone) >= start_data, - func.addHours(self.event_columns['#event_time'], time_zone) <= end_data, - self.event_columns['#event_name'] == event_name - ] - analysis = event['analysis'] - filters = event['filters'] + self.get_global_filters() - - event_filter = [] - user_filter = [] - - for item in filters: - if item['table_type'] == 'user': - where = user_filter - elif item['table_type'] == 'event': - where = event_filter - - col = getattr(self, f'{item["table_type"]}_columns').get(item['column_id']) - - comparator = item['comparator_id'] - ftv = item['ftv'] - if comparator == '==': - if len(ftv) > 1: - where.append(or_(*[col == v for v in ftv])) - else: - where.append(col == ftv[0]) - elif comparator == '>=': - where.append(col >= ftv[0]) - elif comparator == '<=': - where.append(col <= ftv[0]) - elif comparator == '>': - where.append(col > ftv[0]) - elif comparator == '<': - where.append(col < ftv[0]) - - elif comparator == 'is not null': - where.append(col.isnot(None)) - elif comparator == 'is null': - where.append(col.is_(None)) - - elif comparator == '!=': - where.append(col != ftv[0]) - - # 查出用户过滤 - subq = sa.select([self.user_columns.get('#account_id')]) - subq = subq.where(and_(*user_filter)) - - if analysis == 'total_count': - selectd = select_exprs + [func.count().label('values')] - elif analysis == 'touch_user_count': - selectd = sa.select( - select_exprs + [func.count(sa.distinct(self.event_columns['#account_id'])).label('values')]) - elif analysis == 'touch_user_avg': - selectd = select_exprs + [ - func.round((func.count() / func.count(sa.distinct(self.event_columns['#account_id']))), 2).label( - 'values')] - - elif analysis == 'distinct_count': - selectd = select_exprs + [ - func.count(sa.distinct(self.event_columns[event['event_attr_id']])).label('values')] - else: - selectd = select_exprs + [ - func.round(getattr(func, analysis)(self.event_columns[event['event_attr_id']]), 2).label( - 'values')] - - account_id = column('#account_id') - - if user_filter: - sa.table('user', account_id) - on_clause = [self.event_columns.get('#account_id') == account_id] - qry = self.event_table.join(subq.alias('user'), and_(*on_clause)) - - qry = sa.select(selectd).select_from(qry) - qry = qry.where(and_(*event_filter, *base_where)) - else: - qry = sa.select(selectd) - qry = qry.where(and_(*event_filter, *base_where)) - - - qry = qry.group_by(*select_exprs) - qry = qry.order_by(column('date')) - qry = qry.limit(1000) - - # self.event_table.join(self.user_table, and_(*on_clause)) - # qry = qry.select_from(self.event_table) - sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True})) - print(sql) - sqls.append({'sql': sql, - 'groupby': groupby, - 'date_range': date_range, - 'event_name': event_name - - }) - - return sqls