to_ck/v2/db.py
2021-05-11 15:08:55 +08:00

88 lines
2.6 KiB
Python

# Public API of this module: only the CK wrapper class is exported.
__all__ = ('CK',)
import traceback
import pandas as pd
from datetime import datetime
from datetime import timedelta
from clickhouse_driver import Client
from pandas import DatetimeTZDtype
class CK:
    """Small wrapper around ``clickhouse_driver.Client`` with reconnecting reads.

    The constructor arguments are stored so that a fresh client can be built
    whenever a query fails. ``get_one`` and ``get_all`` print the traceback,
    reconnect and retry a bounded number of times instead of raising.
    """

    def __init__(self, *args, **kwargs):
        """Store the client arguments and open the first client.

        :param args: positional arguments forwarded to ``Client``.
        :param kwargs: keyword arguments forwarded to ``Client``.
        """
        self.args = args
        self.kwargs = kwargs
        self.__client = self.__create_client()

    def __create_client(self):
        """Build a new driver client from the stored constructor arguments."""
        return Client(*self.args, **self.kwargs)

    def __reconnect(self):
        """Disconnect the current client and replace it with a fresh one."""
        self.__client.disconnect()
        self.__client = self.__create_client()

    def execute(self, *args, **kwargs):
        """Forward the call unchanged to the underlying client's ``execute``."""
        return self.__client.execute(*args, **kwargs)

    def get_one(self, db, tb, try_cnt=3, **where):
        """Fetch a single row of ``db.tb`` matching all ``where`` equalities.

        :param db: database name, interpolated into the SQL text.
        :param tb: table name, interpolated into the SQL text.
        :param try_cnt: number of retries left after a failed attempt.
        :param where: ``column=value`` filters; every value is rendered as a
            single-quoted literal.
        :return: ``dict`` mapping column name to value, where ``datetime``
            values are shifted by the row's ``#zone_offset`` hours and
            formatted as ``'%Y-%m-%d %H:%M:%S'``; ``None`` when no row matches
            or when every attempt failed.
        """
        # NOTE(review): identifiers and values are interpolated straight into
        # the SQL text. Only pass trusted input, or escape it at the call site.
        conditions = ''.join(f" and `{k}`='{v}'" for k, v in where.items())
        sql = f"select * from {db}.{tb} where 1{conditions} limit 1"
        try:
            data, columns = self.__client.execute(sql, with_column_types=True)
        except Exception:
            traceback.print_exc()
            self.__reconnect()
            if try_cnt > 0:
                # Propagate the retry's result; dropping it made every retry
                # look like a miss to the caller.
                return self.get_one(db, tb, try_cnt - 1, **where)
            return None
        if not data:
            return None
        # Each entry of ``columns`` is (name, type); only the name is needed.
        row = {col[0]: value for col, value in zip(columns, data[0])}
        res = dict()
        for k, v in row.items():
            if isinstance(v, datetime):
                # The offset is looked up lazily, so rows without datetime
                # values do not need a ``#zone_offset`` column.
                shifted = v + timedelta(hours=row['#zone_offset'])
                res[k] = shifted.strftime('%Y-%m-%d %H:%M:%S')
            else:
                res[k] = v
        return res

    def get_all(self, db, tb, where: str, try_cnt=3):
        """Fetch every row of ``db.tb`` matching ``where``, restoring time zones.

        Timezone-aware datetime columns are shifted by each row's
        ``#zone_offset`` hours and formatted as ``'%Y-%m-%d %H:%M:%S'``.

        :param db: database name, interpolated into the SQL text.
        :param tb: table name, interpolated into the SQL text.
        :param where: raw SQL condition appended after ``where``.
        :param try_cnt: number of retries left after a failed attempt.
        :return: ``{row_index: {column: value}}``; an empty ``dict`` when the
            query returns no rows; ``None`` when every attempt failed, so that
            callers can tell a failure apart from "no such rows".
        """
        sql = f"select * from {db}.{tb} where " + where
        try:
            data, columns = self.__client.execute(
                sql, columnar=True, with_column_types=True)
        except Exception as e:
            traceback.print_exc()
            # Not every exception carries ``code`` (e.g. socket errors).
            # Code 60 is ClickHouse's UNKNOWN_TABLE: fall back to ``user``
            # once; if ``user`` itself is unknown, use the bounded retry
            # below instead of recursing forever.
            if getattr(e, 'code', None) == 60 and tb != 'user':
                return self.get_all(db, 'user', where, try_cnt - 1)
            self.__reconnect()
            if try_cnt > 0:
                return self.get_all(db, tb, where, try_cnt - 1)
            # Failure is reported as None rather than an empty result, so it
            # is not mistaken for "the user does not exist".
            return None
        if not data:
            return dict()
        # With columnar=True each element of ``data`` is one whole column.
        df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
        tz = df['#zone_offset'].apply(lambda x: timedelta(hours=x))
        # 'datetimetz' is the documented selector for tz-aware datetime columns.
        for t_type in df.select_dtypes(include=['datetimetz']):
            df[t_type] = (df[t_type] + tz).apply(
                lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        return df.T.to_dict()