"""Build ClickHouse SQL for event-analysis queries from a request payload."""
import datetime
import logging
from typing import List, Tuple

import sqlalchemy as sa
from sqlalchemy.sql import func
from sqlalchemy import create_engine, column, and_, desc, table, or_
import pandas as pd

logger = logging.getLogger(__name__)

# Maps the integer ``zone_time`` sent in the request (presumably hours east of
# UTC — TODO confirm) to the IANA zone name used by ClickHouse and pandas.
TIME_ZONE_MAP = {
    8: 'Asia/Shanghai'
}

# Maps a time-grain id to the pandas frequency alias used with ``pd.date_range``
# to enumerate the expected time buckets.
PROPHET_TIME_GRAIN_MAP = {
    "PT1S": "S",
    "PT1M": "min",
    "PT5M": "5min",
    "PT10M": "10min",
    "PT15M": "15min",
    "PT0.5H": "30min",
    "PT1H": "H",
    "P1D": "D",
    "P1W": "W",
    "P1M": "M",
}

# Maps a time-grain id to a builder that converts ``col`` to time zone ``zone``
# and truncates it to the start of its bucket using ClickHouse functions. Every
# expression is labelled ``date`` because the generated query orders by that name.
TIME_GRAIN_EXPRESSIONS = {
    'PT1S': lambda col, zone: func.toStartOfSecond(func.toTimeZone(col, zone)).label('date'),
    'PT1M': lambda col, zone: func.toStartOfMinute(func.toTimeZone(col, zone)).label('date'),
    'PT5M': lambda col, zone: func.toStartOfFiveMinute(func.toTimeZone(col, zone)).label('date'),
    'PT10M': lambda col, zone: func.toStartOfTenMinutes(func.toTimeZone(col, zone)).label('date'),
    'PT15M': lambda col, zone: func.toStartOfFifteenMinutes(func.toTimeZone(col, zone)).label('date'),
    # ClickHouse has no dedicated 30-minute helper, so use the generic interval form.
    'PT0.5H': lambda col, zone: func.toStartOfInterval(
        func.toTimeZone(col, zone), sa.literal_column('INTERVAL 30 minute')
    ).label('date'),
    'PT1H': lambda col, zone: func.toStartOfHour(func.toTimeZone(col, zone)).label('date'),
    'P1D': lambda col, zone: func.toStartOfDay(func.toTimeZone(col, zone)).label('date'),
    'P1W': lambda col, zone: func.toMonday(func.toTimeZone(col, zone)).label('date'),
    'P1M': lambda col, zone: func.toStartOfMonth(func.toTimeZone(col, zone)).label('date'),
}

# Filter comparators that take exactly one operand (the first entry of ``ftv``).
_SINGLE_VALUE_COMPARATORS = {
    '>=': lambda col, value: col >= value,
    '<=': lambda col, value: col <= value,
    '>': lambda col, value: col > value,
    '<': lambda col, value: col < value,
    '!=': lambda col, value: col != value,
}


class ToSql:
    """Translate an event-analysis request into ClickHouse SQL strings.

    ``data`` must provide an ``eventView`` dict (date range, ``zone_time``,
    ``timeParticleSize``, global ``filters``, ``groupBy``) and an ``events``
    list (one entry per metric to compute).
    """

    def __init__(self, data: dict, db_name: str, table_name: str, columns: List[str]):
        self.db_name = db_name
        # In this module the engine is only used as the dialect for ``qry.compile``.
        self.engine = create_engine('clickhouse://')
        self.columns = self.gen_columns(columns)
        self.event_view = data.get('eventView')
        self.events = data.get('events')
        self.table = sa.table(table_name, *self.columns.values(), schema=self.db_name)

    def gen_columns(self, columns):
        """Return a dict mapping each column name to a SQLAlchemy ``column`` object."""
        return {col: column(col) for col in columns}

    def _column(self, col_id: str):
        """Return the column object for ``col_id``.

        Raises:
            KeyError: if ``col_id`` was not among the ``columns`` given to ``__init__``.
        """
        try:
            return self.columns[col_id]
        except KeyError:
            raise KeyError(f'unknown column: {col_id!r}') from None

    def get_zone_time(self) -> int:
        """Return ``eventView['zone_time']`` as an int (key into ``TIME_ZONE_MAP``)."""
        return int(self.event_view.get('zone_time'))

    def get_date_range(self) -> Tuple[str, str]:
        """Return the ``(startTime, endTime)`` pair from the event view."""
        start_data: str = self.event_view.get('startTime')
        end_data: str = self.event_view.get('endTime')
        return start_data, end_data

    def get_global_filters(self):
        """Return the filters applied to every event (empty list when absent)."""
        return self.event_view.get('filters') or []

    def get_group_by(self) -> List[str]:
        """Return the column ids to group by (empty list when ``groupBy`` is absent)."""
        return [item['column_id'] for item in self.event_view.get('groupBy') or []]

    def get_time_particle_size(self):
        """Return the requested time-grain id, defaulting to daily (``'P1D'``)."""
        return self.event_view.get('timeParticleSize') or 'P1D'

    def _build_filter_clause(self, item: dict):
        """Translate one filter dict (``column_id``, ``comparator_id``, ``ftv``) into a SQL condition.

        Raises:
            KeyError: unknown ``column_id``.
            ValueError: unsupported ``comparator_id``, or a value comparator with an empty ``ftv``.
        """
        col = self._column(item['column_id'])
        comparator = item['comparator_id']
        # Null checks take no operand, so ``ftv`` is not required for them.
        if comparator == 'is not null':
            return col.isnot(None)
        if comparator == 'is null':
            return col.is_(None)
        if comparator != '==' and comparator not in _SINGLE_VALUE_COMPARATORS:
            raise ValueError(f'unsupported comparator: {comparator!r}')
        ftv = item.get('ftv') or []
        if not ftv:
            raise ValueError(
                f'comparator {comparator!r} on column {item["column_id"]!r} requires at least one value'
            )
        if comparator == '==':
            # Several values mean "equal to any of them".
            if len(ftv) > 1:
                return or_(*[col == v for v in ftv])
            return col == ftv[0]
        return _SINGLE_VALUE_COMPARATORS[comparator](col, ftv[0])

    def _build_value_expr(self, event: dict):
        """Return the aggregate expression, labelled ``values``, selected by ``event['analysis']``."""
        analysis = event['analysis']
        if analysis == 'total_count':
            expr = func.count()
        elif analysis == 'touch_user_count':
            expr = func.count(sa.distinct(self._column('#account_id')))
        elif analysis == 'touch_user_avg':
            # Event count per distinct account, rounded to 2 decimals.
            expr = func.round(
                (func.count() / func.count(sa.distinct(self._column('#account_id')))), 2
            )
        elif analysis == 'distinct_count':
            expr = func.count(sa.distinct(self._column(event['event_attr_id'])))
        else:
            # Any other name is emitted verbatim as the SQL function applied to the
            # event attribute (presumably sum/avg/max/min — confirm with callers).
            expr = getattr(func, analysis)(self._column(event['event_attr_id']))
        return expr.label('values')

    def get_sql_query_event_model(self, limit: int = 1000):
        """Build one SQL query per event; only the event table is queried.

        Args:
            limit: maximum number of rows each query returns (default 1000).

        Returns:
            A list with one dict per entry of ``self.events``, each holding:
            ``sql`` (compiled SQL with literal values inlined), ``groupby`` (names of
            the grouping expressions, starting with ``'date'``), ``date_range`` (the
            bucket timestamps from ``pd.date_range(...).tolist()``) and ``event_name``.

        Raises:
            KeyError: unsupported time grain or ``zone_time``, or an unknown column id.
            ValueError: unsupported filter comparator or a value filter with no values.
        """
        time_particle_size = self.get_time_particle_size()
        if time_particle_size not in TIME_GRAIN_EXPRESSIONS:
            raise KeyError(
                f'unsupported timeParticleSize {time_particle_size!r}; '
                f'expected one of {sorted(TIME_GRAIN_EXPRESSIONS)}'
            )
        zone_offset = self.get_zone_time()
        try:
            time_zone = TIME_ZONE_MAP[zone_offset]
        except KeyError:
            raise KeyError(f'unsupported zone_time: {zone_offset!r}') from None
        start_data, end_data = self.get_date_range()

        event_time = self._column('#event_time')
        # The bucketed event time is always the first selected/grouping expression.
        select_exprs = [TIME_GRAIN_EXPRESSIONS[time_particle_size](event_time, time_zone)]
        select_exprs += [self._column(col_id) for col_id in self.get_group_by()]
        groupby = [expr.name for expr in select_exprs]
        date_range = pd.date_range(
            start_data, end_data, freq=PROPHET_TIME_GRAIN_MAP[time_particle_size], tz=time_zone
        ).tolist()
        global_filters = self.get_global_filters()

        sqls = []
        for event in self.events:
            event_name = event['event_name']
            where = [
                event_time >= start_data,
                event_time <= end_data,
                self._column('#event_name') == event_name,
            ]
            filters = (event.get('filters') or []) + global_filters
            where.extend(self._build_filter_clause(item) for item in filters)

            qry = sa.select(select_exprs + [self._build_value_expr(event)])
            qry = qry.where(and_(*where))
            qry = qry.group_by(*select_exprs)
            qry = qry.order_by(column('date'))
            qry = qry.limit(limit)
            qry = qry.select_from(self.table)
            sql = str(qry.compile(self.engine, compile_kwargs={"literal_binds": True}))
            logger.debug('generated SQL: %s', sql)
            sqls.append({'sql': sql,
                         'groupby': groupby,
                         'date_range': date_range,
                         'event_name': event_name
                         })
        return sqls