# coding:utf-8 import asyncio import threading from aioch import Client import pandas as pd __all__ = ('CKDrive',) class CKDrive: _instance_lock = threading.Lock() connect_pool = set() def __init__(self, pool_size=1, *args, **kwargs): pass def __new__(cls, *args, **kwargs): if not hasattr(CKDrive, "_instance"): with CKDrive._instance_lock: if not hasattr(CKDrive, "_instance"): CKDrive._instance = object.__new__(cls) return CKDrive._instance def connected_pool(self, pool_size=1, *args, **kwargs): if self.connect_pool: return for i in range(pool_size): client = Client(*args, **kwargs) self.connect_pool.add(client) async def __execute(self, *args, typ_cnt=5, **kwargs): if not self.connect_pool: if typ_cnt < 0: raise Exception('连接池耗尽') await asyncio.sleep(1) await self.__execute(*args, **kwargs, typ_cnt=typ_cnt - 1) client = None try: client = self.connect_pool.pop() res = await client.execute(*args, **kwargs) except Exception as e: raise e else: return res finally: if client is not None: self.connect_pool.add(client) async def execute(self, sql) -> dict: data, columns = await self.__execute(sql, with_column_types=True, columnar=True) df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)}) return df.T.to_dict() async def query_dataframe(self, sql) -> pd.DataFrame: data, columns = await self.__execute(sql, with_column_types=True, columnar=True) df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)}) return df