import asyncio
import datetime

from aioch import Client
import pandas as pd


class CKDrive:
    """Async ClickHouse helper that runs queries on a shared pool of clients.

    ``ClientPool`` is a class-level set shared by every instance. It starts
    empty here, so it is presumably filled elsewhere (TODO confirm where).

    NOTE(review): database/table/field names are interpolated directly into
    the SQL text without escaping, so callers must only pass trusted
    identifiers.
    """

    # Shared set of idle clients. A client is removed from the set while a
    # query runs on it and is put back afterwards.
    ClientPool = set()

    @classmethod
    async def _execute(cls, *args, typ_cnt=5, **kwargs):
        """Run ``client.execute(*args, **kwargs)`` on a client taken from the pool.

        If the pool is empty, sleep one second and check again. This is
        repeated up to ``typ_cnt + 1`` times before giving up.

        :param typ_cnt: retry budget; a negative value means "do not wait".
        :return: whatever ``client.execute`` returns.
        :raises Exception: ``'连接池耗尽'`` ("connection pool exhausted") when no
            client became available within the retry budget.
        """
        retries_left = typ_cnt
        while not cls.ClientPool:
            if retries_left < 0:
                raise Exception('连接池耗尽')
            await asyncio.sleep(1)
            retries_left -= 1

        # There is no await between the emptiness check above and pop(), so on
        # a single event loop no other coroutine can drain the pool in between.
        client = cls.ClientPool.pop()
        try:
            return await client.execute(*args, **kwargs)
        finally:
            # Always hand the client back, even if the query raised.
            cls.ClientPool.add(client)

    async def execute(self, sql) -> dict:
        """Run ``sql`` and return its rows as ``{row_position: {column: value}}``.

        Row positions are 0-based. An empty result gives ``{}``.
        """
        df = await self.query_dataframe(sql)
        return df.T.to_dict()

    async def query_dataframe(self, sql):
        """Run ``sql`` and return the result as a ``pandas.DataFrame``.

        Columns are named after the query's result columns. An empty result
        still yields a (zero-row) DataFrame carrying those column names.
        """
        data, columns = await self._execute(sql, with_column_types=True, columnar=True)
        names = [col[0] for col in columns]
        if not data:
            # With columnar=True the driver returns an empty list when no rows
            # match. Keep the schema so callers can still index by column name.
            return pd.DataFrame(columns=names)
        return pd.DataFrame({name: values for name, values in zip(names, data)})

    async def count(self, db: str, tb: str):
        """Return the total number of rows in ``{db}.{tb}``."""
        sql = f'select count() as `count` from {db}.{tb}'
        res = await self.execute(sql)
        return res[0]['count']

    async def distinct_count(self, db: str, tb: str, field: str):
        """Return the number of distinct values of ``field`` in ``{db}.{tb}``."""
        sql = f'select count(distinct `{field}`) as `count` from {db}.{tb}'
        res = await self.execute(sql)
        return res[0]['count']

    async def field_count(self, db: str, tb: str):
        """Return the number of columns of ``{db}.{tb}``, as listed in ``system.columns``."""
        sql = f"select count(name) as `count` from system.columns where database='{db}' and table='{tb}'"
        res = await self.execute(sql)
        return res[0]['count']

    async def distinct(self, db: str, tb: str, field: str):
        """Return the distinct values of ``field`` in ``{db}.{tb}`` as a list.

        An empty table gives ``[]``.
        """
        sql = f'select distinct `{field}` as v from {db}.{tb}'
        res = await self.query_dataframe(sql)
        return res['v'].to_list()

    async def yesterday_event_count(self, db: str):
        """Count yesterday's rows in ``{db}.event``, grouped by ``#event_name``.

        "Yesterday" is the half-open range ``[yesterday 00:00:00,
        today 00:00:00)``, built from the local date returned by
        ``datetime.date.today()``.

        :return: ``{event_name: {'v': count}}``; ``{}`` when there were no events.
        """
        today = datetime.date.today()
        yesterday = today - datetime.timedelta(days=1)
        # date.strftime renders %H:%M:%S as 00:00:00, i.e. midnight.
        today_str = today.strftime('%Y-%m-%d %H:%M:%S')
        yesterday_str = yesterday.strftime('%Y-%m-%d %H:%M:%S')
        sql = (
            f"select `#event_name` as event_name, count() as v from {db}.event "
            f"where `#event_time`>='{yesterday_str}' and `#event_time`<'{today_str}' "
            "group by `#event_name`"
        )
        df = await self.query_dataframe(sql)
        return df.set_index('event_name').T.to_dict()


# Module-level shared instance handed out by get_ck_db().
ckdb = CKDrive()


def get_ck_db() -> CKDrive:
    """Return the module-level shared ``CKDrive`` instance."""
    return ckdb