import asyncio
import datetime

from aioch import Client
import pandas as pd


class CKDrive:
    """Async query helper that runs SQL through a shared pool of clients.

    Clients are taken from ``ClientPool`` for the duration of one query and
    put back afterwards. Nothing in this class fills the pool; it must be
    populated elsewhere before queries are issued.

    NOTE(review): database, table, field and ``where`` arguments are
    interpolated into the SQL text without escaping, so callers must only
    pass trusted values.
    """

    # Idle clients shared by every instance; a client is removed while it is
    # executing a query and added back when the query finishes or fails.
    ClientPool = set()

    @classmethod
    async def _execute(cls, *args, typ_cnt=5, **kwargs):
        """Run one query on a pooled client and return its result.

        Args:
            *args: Positional arguments forwarded to ``client.execute``.
            typ_cnt: Remaining one-second waits allowed while the pool is
                empty. A negative value means no waiting at all.
            **kwargs: Keyword arguments forwarded to ``client.execute``.

        Returns:
            Whatever ``client.execute`` returns.

        Raises:
            Exception: If the pool is still empty after the wait budget is
                used up (message: "connection pool exhausted", in Chinese).
        """
        # Wait iteratively for a free client. The previous recursive version
        # discarded the recursive call's result and then fell through, so the
        # query was executed again at every level after a wait.
        while not cls.ClientPool:
            if typ_cnt < 0:
                raise Exception('连接池耗尽')
            await asyncio.sleep(1)
            typ_cnt -= 1

        # No await happens between the emptiness check and pop(), so another
        # task on the same event loop cannot drain the pool in between.
        client = cls.ClientPool.pop()
        try:
            return await client.execute(*args, **kwargs)
        finally:
            # Always hand the client back, even when the query raised.
            cls.ClientPool.add(client)

    async def execute(self, sql) -> dict:
        """Run ``sql`` and return the rows as ``{row_index: {column: value}}``."""
        df = await self.query_dataframe(sql)
        return df.T.to_dict()

    async def query_dataframe(self, sql):
        """Run ``sql`` and return the result as a ``pandas.DataFrame``.

        The query is executed in columnar mode with column types, and the
        first element of each column descriptor is used as the column name.
        """
        data, columns = await self._execute(
            sql, with_column_types=True, columnar=True
        )
        return pd.DataFrame({col[0]: d for d, col in zip(data, columns)})

    async def query_data_trace(self, sql, name_dict):
        """Run a path query and group second-column values by path node.

        The first result column is treated as a sequence of event paths and
        the second as one value per path. For each of the first ten steps of
        a path, two node labels are produced: ``"<event>-<i>"`` for the step
        and ``"<next event>-<i + 1>"`` for its successor. The successor of
        the last step is the literal ``'流失'`` ("churned").

        Args:
            sql: Query text whose result has at least two columns.
            name_dict: Mapping from raw event name to display name; events
                missing from it keep their raw name.

        Returns:
            Dict mapping each node label to a list of the distinct
            second-column values of the paths containing that node. An empty
            dict is returned when the query yields no data.
        """
        data, columns = await self._execute(
            sql, with_column_types=True, columnar=True
        )
        res_dict = {}
        if not data:
            return res_dict

        for index, value in enumerate(data[0]):
            # Node labels of this path, de-duplicated, in first-seen order.
            event_next_list = []
            value_len = len(value)
            for i, key in enumerate(value):
                if i >= 10:
                    # Only the first ten steps of a path are considered.
                    break
                next_event = value[i + 1] if i < value_len - 1 else '流失'
                # Show the mapped display name when one is available.
                event_label = name_dict.get(key, key)
                next_label = name_dict.get(next_event, next_event)
                for node in (f'{event_label}-{i}', f'{next_label}-{i + 1}'):
                    if node not in event_next_list:
                        event_next_list.append(node)

            for event_name in event_next_list:
                if event_name not in res_dict:
                    res_dict[event_name] = [data[1][index]]
                    continue
                res_dict[event_name].append(data[1][index])
                # Drop duplicates; the resulting order is unspecified.
                res_dict[event_name] = list(set(res_dict[event_name]))
        return res_dict

    async def count(self, db: str, tb: str):
        """Return the number of rows in ``db.tb``."""
        sql = f'select count() as `count` from {db}.{tb}'
        res = await self.execute(sql)
        return res[0]['count']

    async def distinct_count(self, db: str, tb: str, field: str):
        """Return the number of distinct values of ``field`` in ``db.tb``."""
        sql = f'select count(distinct `{field}`) as `count` from {db}.{tb}'
        res = await self.execute(sql)
        return res[0]['count']

    async def field_count(self, db: str, tb: str):
        """Return how many columns ``system.columns`` lists for ``db.tb``."""
        sql = f"select count(name) as `count` from system.columns where database='{db}' and table='{tb}'"
        res = await self.execute(sql)
        return res[0]['count']

    async def distinct(self, db: str, tb: str, field: str, where: str = '1'):
        """Return the distinct values of ``field`` in ``db.tb`` as a list.

        Args:
            db: Database name.
            tb: Table name.
            field: Column whose distinct values are selected.
            where: Raw SQL condition; the default ``'1'`` matches every row.
        """
        sql = f'select distinct `{field}` as v from {db}.{tb} where {where}'
        res = await self.query_dataframe(sql)
        return res['v'].to_list()

    async def yesterday_event_count(self, db: str):
        """Return per-event row counts of ``db.event`` for yesterday.

        The window is ``[yesterday 00:00:00, today 00:00:00)`` based on
        ``datetime.date.today()``.

        Returns:
            Dict of the form ``{event_name: {'v': count}}``.
        """
        today = datetime.date.today()
        yesterday = today - datetime.timedelta(days=1)
        # Formatting a date with a time pattern yields midnight (00:00:00).
        today_str = today.strftime('%Y-%m-%d %H:%M:%S')
        yesterday_str = yesterday.strftime('%Y-%m-%d %H:%M:%S')
        sql = (
            f"select `#event_name` as event_name, count() as v from {db}.event "
            f"where `#event_time`>='{yesterday_str}' and `#event_time`<'{today_str}' "
            f"group by `#event_name`"
        )
        df = await self.query_dataframe(sql)
        return df.set_index('event_name').T.to_dict()

    async def get_columns(self, db: str, tb: str):
        """Return a view of ``{'name': ..., 'type': ...}`` dicts for ``db.tb``.

        One dict is produced per row that ``system.columns`` holds for the
        table.
        """
        sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
        df = await self.query_dataframe(sql)
        return df.T.to_dict().values()


# Module-level shared instance handed out by get_ck_db().
ckdb = CKDrive()


def get_ck_db() -> CKDrive:
    """Return the module-level shared ``CKDrive`` instance."""
    return ckdb