xbackend/db/ckdb.py
李伟 49d5f163f5 1.新增路径分析节点详情
2.新增路径分析节点详情里面的用户详情
2022-05-19 15:40:28 +08:00

108 lines
4.1 KiB
Python

import asyncio
import datetime
from aioch import Client
import pandas as pd
class CKDrive:
    """Async query helper that runs SQL through a shared pool of clients.

    Every query borrows a client object from ``ClientPool``, awaits its
    ``execute`` coroutine and puts the client back afterwards.  This class
    never creates clients itself; the pool has to be filled by other code.

    NOTE(review): the convenience methods interpolate ``db``, ``tb``,
    ``field`` and ``where`` straight into the SQL text, so callers must only
    pass trusted values.
    """

    # Idle clients available for checkout (shared by all instances).
    ClientPool = set()
    # Seconds to wait before re-checking an empty pool.
    RETRY_INTERVAL = 1
    # Only the first MAX_TRACE_STEPS events of a path produce node labels.
    MAX_TRACE_STEPS = 10

    @classmethod
    async def _execute(cls, *args, typ_cnt=5, **kwargs):
        """Run ``client.execute(*args, **kwargs)`` on a pooled client.

        While the pool is empty the call waits ``RETRY_INTERVAL`` seconds and
        checks again, at most ``typ_cnt + 1`` times.

        :param typ_cnt: remaining retry budget while the pool is empty.
        :return: whatever the client's ``execute`` coroutine returns.
        :raises Exception: if the pool is still empty once the retry budget
            is used up; any error raised by the client propagates unchanged.
        """
        # Wait in a loop instead of recursing: the statement must be executed
        # exactly once, after a client has become available.
        while not cls.ClientPool:
            if typ_cnt < 0:
                raise Exception('连接池耗尽')
            await asyncio.sleep(cls.RETRY_INTERVAL)
            typ_cnt -= 1

        # No await between the emptiness check and pop(), so another task
        # cannot drain the pool in between.
        client = cls.ClientPool.pop()
        try:
            return await client.execute(*args, **kwargs)
        finally:
            # Always hand the client back, even when the query failed.
            cls.ClientPool.add(client)

    @staticmethod
    def _to_frame(data, columns):
        """Build a DataFrame from a column-oriented result.

        :param data: sequence of column value sequences.
        :param columns: sequence whose items carry the column name at index 0.
        """
        names = [col[0] for col in columns]
        if not data:
            # Keep the column names so callers can still select by name
            # when the query matched no rows.
            return pd.DataFrame(columns=names)
        return pd.DataFrame(dict(zip(names, data)))

    async def execute(self, sql: str) -> dict:
        """Run ``sql`` and return ``{row_index: {column_name: value}}``."""
        df = await self.query_dataframe(sql)
        return df.T.to_dict()

    async def query_dataframe(self, sql: str):
        """Run ``sql`` and return the result as a ``pandas.DataFrame``."""
        data, columns = await self._execute(sql, with_column_types=True, columnar=True)
        return self._to_frame(data, columns)

    @classmethod
    def _trace_labels(cls, path, name_dict):
        """Return the distinct node labels of one event path, in first-seen order.

        Each of the first ``MAX_TRACE_STEPS`` events yields ``"<name>-<step>"``
        for itself and ``"<name>-<step + 1>"`` for its successor; the successor
        of the last event is the literal '流失'.  Names are looked up in
        ``name_dict`` and fall back to the raw value.
        """
        labels = {}  # dict used as an insertion-ordered set
        last_step = len(path) - 1
        for step, event in enumerate(path[:cls.MAX_TRACE_STEPS]):
            following = path[step + 1] if step < last_step else '流失'
            # Show the mapped display name when one is available.
            labels[f'{name_dict.get(event, event)}-{step}'] = None
            labels[f'{name_dict.get(following, following)}-{step + 1}'] = None
        return list(labels)

    async def query_data_trace(self, sql: str, name_dict: dict) -> dict:
        """Run a path query and group the second column by path node.

        The first result column is read as one event sequence per row and the
        second column as the value belonging to that row.

        :param name_dict: mapping from raw event name to display name.
        :return: ``{node_label: [distinct second-column values]}`` with the
            values in first-seen order; ``{}`` when the result is empty.
        """
        data, _ = await self._execute(sql, with_column_types=True, columnar=True)
        if not data:
            return {}

        grouped = {}
        for path, row_value in zip(data[0], data[1]):
            for label in self._trace_labels(path, name_dict):
                # Inner dict acts as an ordered set: de-duplicates in O(1)
                # and keeps the output order deterministic.
                grouped.setdefault(label, {})[row_value] = None
        return {label: list(values) for label, values in grouped.items()}

    async def count(self, db: str, tb: str):
        """Return the number of rows in ``db.tb``."""
        sql = f'select count() as `count` from {db}.{tb}'
        res = await self.execute(sql)
        return res[0]['count']

    async def distinct_count(self, db: str, tb: str, field: str):
        """Return the number of distinct values of ``field`` in ``db.tb``."""
        sql = f'select count(distinct `{field}`) as `count` from {db}.{tb}'
        res = await self.execute(sql)
        return res[0]['count']

    async def field_count(self, db: str, tb: str):
        """Return how many columns ``system.columns`` lists for ``db.tb``."""
        sql = f"select count(name) as `count` from system.columns where database='{db}' and table='{tb}'"
        res = await self.execute(sql)
        return res[0]['count']

    async def distinct(self, db: str, tb: str, field: str, where: str = '1'):
        """Return the distinct values of ``field`` in ``db.tb`` as a list.

        :param where: raw SQL condition; the default ``'1'`` matches every row.
        """
        sql = f'select distinct `{field}` as v from {db}.{tb} where {where}'
        res = await self.query_dataframe(sql)
        return res['v'].to_list()

    async def yesterday_event_count(self, db: str):
        """Return ``{event_name: {'v': count}}`` for yesterday's events.

        The window is [yesterday 00:00:00, today 00:00:00), based on
        ``datetime.date.today()``.
        """
        today = datetime.date.today()
        yesterday = today - datetime.timedelta(days=1)
        # Formatting a date with a time pattern yields midnight (00:00:00).
        today_str = today.strftime('%Y-%m-%d %H:%M:%S')
        yesterday_str = yesterday.strftime('%Y-%m-%d %H:%M:%S')
        sql = f"select `#event_name` as event_name, count() as v from {db}.event where `#event_time`>='{yesterday_str}' and `#event_time`<'{today_str}' group by `#event_name`"
        df = await self.query_dataframe(sql)
        return df.set_index('event_name').T.to_dict()

    async def get_columns(self, db: str, tb: str):
        """Return a view of ``{'name': ..., 'type': ...}`` dicts, one per column of ``db.tb``."""
        sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
        df = await self.query_dataframe(sql)
        return df.T.to_dict().values()
# Module-level CKDrive instance, created at import time and handed out by
# get_ck_db().
ckdb = CKDrive()


def get_ck_db() -> CKDrive:
    """Return the module-level ``ckdb`` instance."""
    return ckdb