"""Replay events from the ClickHouse ``xiangsu.event`` table through ``PostData``.

The script walks a fixed date range in 30-minute windows, loads every event
of each window, keeps the events whose name is listed in
``settings.legu_to_sm_event`` and whose ``owner_name`` contains ``shoumeng``,
maps their attributes onto a copy of ``settings.template_data`` and hands the
result to ``post_data``.
"""
import copy

import pandas as pd
from clickhouse_driver import Client

from core import settings
from utils import PostData

post_data = PostData()


def handler_os(properties: dict) -> None:
    """Replace ``properties['#os']`` in place with a numeric code.

    The value becomes ``102`` when the current value equals ``'ios'``
    (case-insensitive) and ``101`` otherwise.  A missing or empty value is
    treated as ``'Android'``.
    """
    os_name = properties.get('#os') or 'Android'
    # The two codes are mutually exclusive; previously 101 was written
    # unconditionally and overwrote the iOS code.
    properties['#os'] = 102 if str(os_name).lower() == 'ios' else 101


def handler_money(properties: dict) -> None:
    """Divide ``properties['unitPrice']`` by 100 and store it as an ``int``.

    A missing or ``None`` price is treated as ``0``.
    """
    unit_price = properties.get('unitPrice') or 0
    properties['unitPrice'] = int(unit_price / 100)


def handler_userid(properties: dict) -> None:
    """Copy ``properties['binduid']`` into ``properties['user_id']``.

    Raises:
        KeyError: if ``binduid`` is not present.
    """
    properties['user_id'] = properties['binduid']


# NOTE(review): connection credentials are hard-coded in source; consider
# loading them from configuration or the environment instead.
client = Client(**{'host': '139.159.159.3',
                   'port': 9654,
                   'user': 'legu',
                   'password': 'gncPASUwpYrc'
                   })

# NOTE(review): the last generated stamp is 2021-09-30 23:30:00, so the final
# half hour of the range (23:30:00-23:59:59) is never queried -- confirm that
# this is intended.
dates = pd.date_range('2021-09-29 00:00:00', '2021-09-30 23:59:59', freq='30min')

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

for window_start, next_start in zip(dates[:-1], dates[1:]):
    s = window_start.strftime(TIME_FORMAT)
    # ``between`` is inclusive on both ends, so stop one second before the
    # next window starts to avoid reading boundary events twice.
    e = (next_start - pd.Timedelta(seconds=1)).strftime(TIME_FORMAT)
    sql = f"""select * from xiangsu.event where addHours(`#event_time`,8) between '{s}' and '{e}' """
    print(sql)

    column_values, columns = client.execute(sql, with_column_types=True, columnar=True)
    # ``columns`` holds (name, type) pairs; pair each name with its column data.
    df = pd.DataFrame({col[0]: values for values, col in zip(column_values, columns)})
    if df.empty:
        continue

    # Convert event times to integer epoch seconds.
    df['#event_time'] = df['#event_time'].apply(lambda x: int(x.timestamp()))

    for row in df.to_dict(orient='records'):
        event_name = row.get('#event_name')
        if event_name not in settings.legu_to_sm_event:
            continue

        owner_name = row.get('owner_name')
        # Rows without a textual owner cannot match and are skipped rather
        # than aborting the whole run with a TypeError.
        if not isinstance(owner_name, str) or 'shoumeng' not in owner_name:
            continue

        send_data = copy.deepcopy(settings.template_data)

        handler_os(row)
        if event_name == 'pay':
            handler_money(row)
        handler_userid(row)

        send_data['event']['event_name'] = settings.legu_to_sm_event[event_name]
        send_data['event']['event_time'] = row['#event_time']

        # Attributes shared by every event first, then the event-specific
        # ones (which may override a shared target key).
        for k, v in settings.legu_to_sm_attr_base.items():
            send_data['data'][v] = row.get(k)
        for k, v in settings.legu_to_sm_attr.get(event_name, dict()).items():
            send_data['data'][v] = row.get(k)

        # Post first when ``is_upload()`` reports true, then queue this event.
        if post_data.is_upload():
            post_data.post()
        post_data.add(send_data)

# Final post after all windows -- presumably sends whatever is still queued.
post_data.post()