Compare commits

...

5 Commits

Author SHA1 Message Date
wuaho
2af12caaaf 1 2021-08-14 14:55:04 +08:00
wuaho
fc325b7518 1 2021-08-14 14:48:08 +08:00
wuaho
8a1c414008 1 2021-08-14 11:30:10 +08:00
wuaho
c6b50804d3 数据清理 2021-08-11 20:06:35 +08:00
wuaho
16ddfda32f 1 2021-08-03 17:53:00 +08:00
7 changed files with 89 additions and 10 deletions

3
.gitignore vendored
View File

@ -129,3 +129,6 @@ dmypy.json
# Pyre type checker
.pyre/
.idea
# 不同游戏单独配置
settings.py
clear_up.py

3
app.py
View File

@ -11,6 +11,7 @@ class XProcess(Process):
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
super(XProcess, self).__init__()
# self.daemon = True
self.partition = partition
self.lock = lock
self.ipsearch = ipsearch
@ -25,8 +26,10 @@ class XProcess(Process):
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
self.partition)
transmitter.start_ping()
transmitter.add_source(handler_event, 5000, 60)
transmitter.add_source(handler_user, 500, 60)
last_ts = int(time.time())
consumer, kafka_client = create_consumer(self.partition)

40
clear_up.py Normal file
View File

@ -0,0 +1,40 @@
"""One-off clean-up script for the ``event`` table of a single game.

It selects up to 2000 grouped event rows that occur exactly once and that
either report ``windows`` as their OS or carry a ``svrindex`` missing from the
published server list, then re-inserts those rows with ``sign = -1``.
NOTE(review): presumably the target table collapses rows by ``sign`` so the
re-inserted rows cancel the originals -- confirm against the table engine.
"""
import json

import pandas as pd
from clickhouse_driver import Client

from settings import settings

game = 'zhengba'
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'

client = Client(**settings.CK_CONFIG)

# Fetch the server list and drop the entries hosted on the excluded machines.
excluded_hosts = ['119.3.89.14', '119.3.105.109']
df = pd.read_json(server_list_url)
df = df[~df['hostname'].isin(excluded_hosts)]

# The tuple's repr is interpolated straight into the SQL ``not in`` clause.
serverid = tuple(str(i) for i in df['serverid'].to_list())

sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
from {game}.event
where
lower(`#os`) = 'windows'
or svrindex not in {serverid}
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
having n = 1 limit 2000"""

# columnar=True yields one sequence per column; with_column_types=True adds
# the column descriptors as the second element of the result.
data, columns = client.execute(sql, columnar=True, with_column_types=True)

if not data:
    # Nothing matched: stop before building an empty INSERT.
    exit(0)

# Pair each column's values with the first field of its descriptor (its name).
frame_source = {}
for column_values, column_meta in zip(data, columns):
    frame_source[column_meta[0]] = column_values
data_df = pd.DataFrame(frame_source)

# The helper count column is not part of the table; every row is tagged sign = -1.
data_df = data_df.drop('n', axis=1)
data_df['sign'] = -1


def _format_event_time(value):
    """Render an event timestamp as ``YYYY-MM-DD HH:MM:SS`` text."""
    return value.strftime('%Y-%m-%d %H:%M:%S')


data_df['#event_time'] = data_df['#event_time'].apply(_format_event_time)

# One JSON document per row, newline separated, appended to the INSERT header.
row_dicts = data_df.T.to_dict().values()
json_rows = '\n'.join(json.dumps(row) for row in row_dicts)
insert_sql = f'INSERT INTO {game}.event FORMAT JSONEachRow ' + json_rows
client.execute(insert_sql)

View File

@ -5,7 +5,7 @@ class Config:
# ck数据库连接
CK_CONFIG = {'host': '139.159.159.3',
'port': 9654,
'send_receive_timeout': 30}
}
# 每个游戏不一样 游戏上报 kafka 主题
SUBSCRIBE_TOPIC = 'zhengba_test'

View File

@ -18,6 +18,8 @@ def create_consumer(partition=-1):
# print(msg)
topic = msg.topic
val = msg.value
if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'):
continue
yield topic, val
return consumer, c

View File

@ -1,6 +1,7 @@
import json
import os
import re
import threading
import time
import traceback
@ -9,6 +10,29 @@ from .valid_data import *
__all__ = 'Transmitter',
class Ping(threading.Thread):
    """Daemon thread that periodically runs ``select 1`` on a database client.

    A ping is issued whenever more than ``time_out`` seconds have passed since
    the previous attempt. Between checks the thread sleeps for
    ``poll_interval`` seconds so that it does not spin a CPU core while idle.
    """

    def __init__(self, db_client, p, log):
        """Prepare the keep-alive thread; call ``start()`` to launch it.

        :param db_client: object exposing ``execute(sql)``.
        :param p: value interpolated into the keep-alive log line.
        :param log: logger exposing ``info(msg)`` and ``error(msg)``.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        # Timestamp (whole seconds) of the last ping attempt; 0 forces an
        # immediate ping on the first loop iteration.
        self.ping_ts = 0
        # Minimum number of seconds between two ping attempts.
        self.time_out = 60
        # Seconds slept between two checks of the ping deadline.
        self.poll_interval = 1
        self.log = log
        self.db_client = db_client
        self.p = p

    def run(self):
        """Loop forever, pinging the database once the deadline has passed."""
        while True:
            ts = int(time.time())
            if self.ping_ts + self.time_out < ts:
                # Keep the connection alive. The attempt time is recorded
                # first, so a failing ping is retried only after ``time_out``.
                self.ping_ts = ts
                try:
                    self.log.info(f'保持连接{self.p} ping')
                    self.db_client.execute('select 1')
                except Exception as exc:
                    # Best effort: report the failure and keep the thread alive.
                    self.log.error(f'ping error: {exc!r}')
            # Yield the CPU instead of busy-waiting until the next deadline.
            time.sleep(self.poll_interval)
class Transmitter:
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
self.db_client = db_client
@ -20,12 +44,16 @@ class Transmitter:
self.event_attr = event_attr
self.p = p
def start_ping(self):
    """Create a ``Ping`` thread for this transmitter's client and start it."""
    keepalive = Ping(db_client=self.db_client, p=self.p, log=self.log)
    keepalive.start()
def add_source(self, handler, bulk_max=1000, time_out=60):
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
def check_send(self):
for h, p in self.slots.items():
ts = int(time.time())
for h, p in self.slots.items():
tb, buffer = h.buffer_pool
buffer_size = len(buffer)
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:

View File

@ -26,8 +26,9 @@ def is_valid_int(v, **kwargs):
def is_valid_srt(v, **kwargs):
    """Return ``v`` as a string, or ``None`` when it cannot be converted.

    :param v: value to validate; ``str`` instances are returned unchanged.
    :param kwargs: accepted but not used by this function.
    :return: ``v`` itself if it is a ``str``, otherwise ``str(v)``; ``None``
        if the conversion raises an ordinary exception.
    """
    if isinstance(v, str):
        return v
    try:
        return str(v)
    except Exception:
        # Only ordinary errors count as "invalid"; KeyboardInterrupt and
        # SystemExit must keep propagating instead of becoming None.
        return None
@ -39,14 +40,16 @@ def is_valid_float(v, **kwargs):
def is_valid_bool(v, **kwargs):
    """Return ``v`` as a bool, or ``None`` when it cannot be converted.

    Non-bool values are coerced by truthiness via ``bool(v)``, so any
    non-empty string (including ``'false'``) becomes ``True``.

    :param v: value to validate; ``bool`` instances are returned unchanged.
    :param kwargs: accepted but not used by this function.
    :return: the boolean value; ``None`` if ``bool(v)`` raises an ordinary
        exception.
    """
    if isinstance(v, bool):
        return v
    try:
        return bool(v)
    except Exception:
        # Only ordinary errors count as "invalid"; KeyboardInterrupt and
        # SystemExit must keep propagating instead of becoming None.
        return None
def is_valid_array(v, **kwargs):
if isinstance(v, list):
try:
return [str(i) for i in v]
except:
return None