62 lines
1.8 KiB
Python
62 lines
1.8 KiB
Python
# coding:utf-8
|
|
|
|
|
|
import asyncio
|
|
import threading
|
|
|
|
from aioch import Client
|
|
import pandas as pd
|
|
|
|
__all__ = ('CKDrive',)
|
|
|
|
|
|
class CKDrive:
|
|
_instance_lock = threading.Lock()
|
|
connect_pool = set()
|
|
|
|
def __init__(self, pool_size=1, *args, **kwargs):
|
|
pass
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if not hasattr(CKDrive, "_instance"):
|
|
with CKDrive._instance_lock:
|
|
if not hasattr(CKDrive, "_instance"):
|
|
CKDrive._instance = object.__new__(cls)
|
|
return CKDrive._instance
|
|
|
|
def connected_pool(self, pool_size=1, *args, **kwargs):
|
|
if self.connect_pool:
|
|
return
|
|
for i in range(pool_size):
|
|
client = Client(*args, **kwargs)
|
|
self.connect_pool.add(client)
|
|
|
|
async def __execute(self, *args, typ_cnt=5, **kwargs):
|
|
if not self.connect_pool:
|
|
if typ_cnt < 0:
|
|
raise Exception('连接池耗尽')
|
|
|
|
await asyncio.sleep(1)
|
|
await self.__execute(*args, **kwargs, typ_cnt=typ_cnt - 1)
|
|
client = None
|
|
try:
|
|
client = self.connect_pool.pop()
|
|
res = await client.execute(*args, **kwargs)
|
|
except Exception as e:
|
|
raise e
|
|
else:
|
|
return res
|
|
finally:
|
|
if client is not None:
|
|
self.connect_pool.add(client)
|
|
|
|
async def execute(self, sql) -> dict:
|
|
data, columns = await self.__execute(sql, with_column_types=True, columnar=True)
|
|
df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
|
|
return df.T.to_dict()
|
|
|
|
async def query_dataframe(self, sql) -> pd.DataFrame:
|
|
data, columns = await self.__execute(sql, with_column_types=True, columnar=True)
|
|
df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
|
|
return df
|