"""Thin wrapper around ``clickhouse_driver.Client`` with retrying helpers."""

__all__ = 'CK',

import traceback
from datetime import datetime
from datetime import timedelta

import pandas as pd
from clickhouse_driver import Client
from pandas import DatetimeTZDtype


class CK:
    """ClickHouse client wrapper that reconnects and retries failed reads.

    The positional and keyword arguments given to the constructor are stored
    and forwarded verbatim to ``clickhouse_driver.Client`` every time a
    connection is (re)created.
    """

    # ClickHouse server error code for "table does not exist" (UNKNOWN_TABLE).
    _UNKNOWN_TABLE_CODE = 60

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.__client = self.__create_client()

    def __create_client(self):
        """Build a new driver client from the stored constructor arguments."""
        return Client(*self.args, **self.kwargs)

    def __reconnect(self):
        """Drop the current connection and replace it with a fresh client."""
        self.__client.disconnect()
        self.__client = self.__create_client()

    def execute(self, *args, **kwargs):
        """Forward the call unchanged to the underlying client's ``execute``."""
        return self.__client.execute(*args, **kwargs)

    def get_one(self, db, tb, try_cnt: int = 3, **where):
        """Fetch a single row from ``db.tb`` matching all ``where`` equalities.

        Every ``datetime`` value in the row is shifted by the row's
        ``#zone_offset`` column (interpreted as hours) and rendered as
        ``'%Y-%m-%d %H:%M:%S'``; other values are returned untouched.

        :param db: database name (interpolated into the SQL as-is)
        :param tb: table name (interpolated into the SQL as-is)
        :param try_cnt: how many more attempts are allowed after a failure
        :param where: ``column=value`` filters; each value is stringified and
            compared as a quoted string literal
        :return: dict mapping column name to value, or ``None`` when no row
            matched or every attempt failed
        """
        sql = f"select * from {db}.{tb} where 1"
        params = {}
        if where:
            # The driver substitutes parameters with ``query % params``, so a
            # literal '%' coming from an identifier has to be doubled.
            sql = sql.replace('%', '%%')
            for idx, (col, val) in enumerate(where.items()):
                name = f"p{idx}"
                safe_col = str(col).replace('%', '%%')
                sql += f" and `{safe_col}`=%({name})s"
                # str() keeps the original "always a quoted literal" behavior
                # while the driver takes care of escaping quotes/backslashes.
                params[name] = str(val)
        sql += ' limit 1'

        try:
            data, columns = self.__client.execute(
                sql, params or None, with_column_types=True
            )
        except Exception:
            traceback.print_exc()
            self.__reconnect()
            if try_cnt > 0:
                # The retried result must be returned, otherwise it is lost.
                return self.get_one(db, tb, try_cnt - 1, **where)
            return None

        if not data:
            return None

        row = {col[0]: val for col, val in zip(columns, data[0])}
        res = {}
        for key, val in row.items():
            if isinstance(val, datetime):
                shifted = val + timedelta(hours=row['#zone_offset'])
                res[key] = shifted.strftime('%Y-%m-%d %H:%M:%S')
            else:
                res[key] = val
        return res

    def get_all(self, db, tb, where: str, try_cnt: int = 3):
        """Fetch every row of ``db.tb`` matching a raw ``where`` clause.

        Note: time zones are restored. Columns selected by
        ``select_dtypes(include=[DatetimeTZDtype])`` are shifted by the
        per-row ``#zone_offset`` column (interpreted as hours) and rendered
        as ``'%Y-%m-%d %H:%M:%S'`` strings.

        If the server reports that the table does not exist, the query is
        retried once against the ``user`` table of the same database.

        :param db: database name (interpolated into the SQL as-is)
        :param tb: table name (interpolated into the SQL as-is)
        :param where: raw SQL condition appended after ``where``; the caller
            is responsible for making it safe
        :param try_cnt: how many more attempts are allowed after a failure
        :return: ``None`` when the query could not be executed, an empty dict
            when nothing matched, otherwise a dict keyed by row index whose
            values are ``{column: value}`` dicts
        """
        sql = f"select * from {db}.{tb} where "
        sql += where

        try:
            data, columns = self.__client.execute(
                sql, columnar=True, with_column_types=True
            )
        except Exception as e:
            traceback.print_exc()
            # Not every exception carries a ``code`` attribute (e.g. socket
            # errors), so read it defensively.
            if getattr(e, 'code', None) == self._UNKNOWN_TABLE_CODE:
                if tb != 'user':
                    return self.get_all(db, 'user', where, try_cnt - 1)
                # The fallback table is missing too: retrying cannot help and
                # recursing again would never terminate.
                return None
            self.__reconnect()
            if try_cnt > 0:
                return self.get_all(db, tb, where, try_cnt - 1)
            # Failure is reported as None (not an empty dict) so callers do
            # not mistake an error for "the user does not exist".
            return None

        if data is None:
            return None
        if not data:
            return dict()

        df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
        tz = df['#zone_offset'].apply(lambda x: timedelta(hours=x))
        for t_type in df.select_dtypes(include=[DatetimeTZDtype]):
            df[t_type] = (df[t_type] + tz).apply(
                lambda x: x.strftime('%Y-%m-%d %H:%M:%S')
            )
        return df.T.to_dict()