Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2af12caaaf | ||
![]() |
fc325b7518 | ||
![]() |
8a1c414008 | ||
![]() |
c6b50804d3 | ||
![]() |
16ddfda32f |
3
.gitignore
vendored
3
.gitignore
vendored
@ -129,3 +129,6 @@ dmypy.json
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
.idea
|
||||
# 不同游戏单独配置
|
||||
settings.py
|
||||
clear_up.py
|
||||
|
3
app.py
3
app.py
@ -11,6 +11,7 @@ class XProcess(Process):
|
||||
|
||||
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
||||
super(XProcess, self).__init__()
|
||||
# self.daemon = True
|
||||
self.partition = partition
|
||||
self.lock = lock
|
||||
self.ipsearch = ipsearch
|
||||
@ -25,8 +26,10 @@ class XProcess(Process):
|
||||
handler_user = HandlerUser(db_client, settings.GAME)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
||||
self.partition)
|
||||
transmitter.start_ping()
|
||||
transmitter.add_source(handler_event, 5000, 60)
|
||||
transmitter.add_source(handler_user, 500, 60)
|
||||
|
||||
last_ts = int(time.time())
|
||||
consumer, kafka_client = create_consumer(self.partition)
|
||||
|
||||
|
40
clear_up.py
Normal file
40
clear_up.py
Normal file
@ -0,0 +1,40 @@
|
||||
import json
|
||||
|
||||
from settings import settings
|
||||
|
||||
game = 'zhengba'
|
||||
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
|
||||
|
||||
import pandas as pd
|
||||
from clickhouse_driver import Client
|
||||
|
||||
client = Client(**settings.CK_CONFIG)
|
||||
|
||||
df = pd.read_json(server_list_url)
|
||||
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
|
||||
serverid = tuple((str(i) for i in df['serverid'].to_list()))
|
||||
|
||||
sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
|
||||
from {game}.event
|
||||
where
|
||||
lower(`#os`) = 'windows'
|
||||
or svrindex not in {serverid}
|
||||
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
|
||||
having n = 1 limit 2000"""
|
||||
|
||||
data, columns = client.execute(
|
||||
sql, columnar=True, with_column_types=True
|
||||
)
|
||||
if not data:
|
||||
exit(0)
|
||||
data_df = pd.DataFrame(
|
||||
{col[0]: d for d, col in zip(data, columns)}
|
||||
)
|
||||
|
||||
data_df.drop('n', axis=1, inplace=True)
|
||||
data_df['sign'] = -1
|
||||
data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
|
||||
insert_sql = f'INSERT INTO {game}.event FORMAT JSONEachRow '
|
||||
insert_sql = insert_sql + '\n'.join([json.dumps(item) for item in data_df.T.to_dict().values()])
|
||||
|
||||
client.execute(insert_sql)
|
@ -5,7 +5,7 @@ class Config:
|
||||
# ck数据库连接
|
||||
CK_CONFIG = {'host': '139.159.159.3',
|
||||
'port': 9654,
|
||||
'send_receive_timeout': 30}
|
||||
}
|
||||
|
||||
# 每个游戏不一样 游戏上报 kafka 主题
|
||||
SUBSCRIBE_TOPIC = 'zhengba_test'
|
||||
|
@ -18,6 +18,8 @@ def create_consumer(partition=-1):
|
||||
# print(msg)
|
||||
topic = msg.topic
|
||||
val = msg.value
|
||||
if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'):
|
||||
continue
|
||||
yield topic, val
|
||||
|
||||
return consumer, c
|
||||
|
@ -1,6 +1,7 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
@ -9,6 +10,29 @@ from .valid_data import *
|
||||
__all__ = 'Transmitter',
|
||||
|
||||
|
||||
class Ping(threading.Thread):
|
||||
def __init__(self, db_client, p, log):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.ping_ts = 0
|
||||
self.time_out = 60
|
||||
self.log = log
|
||||
self.db_client = db_client
|
||||
self.p = p
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
ts = int(time.time())
|
||||
if self.ping_ts + self.time_out < ts:
|
||||
# 保持连接
|
||||
try:
|
||||
self.ping_ts = ts
|
||||
self.log.info(f'保持连接{self.p} ping')
|
||||
self.db_client.execute('select 1')
|
||||
except:
|
||||
self.log.error('ping error')
|
||||
|
||||
|
||||
class Transmitter:
|
||||
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
||||
self.db_client = db_client
|
||||
@ -20,12 +44,16 @@ class Transmitter:
|
||||
self.event_attr = event_attr
|
||||
self.p = p
|
||||
|
||||
def start_ping(self):
|
||||
t = Ping(self.db_client, self.p, self.log)
|
||||
t.start()
|
||||
|
||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||
|
||||
def check_send(self):
|
||||
ts = int(time.time())
|
||||
for h, p in self.slots.items():
|
||||
ts = int(time.time())
|
||||
tb, buffer = h.buffer_pool
|
||||
buffer_size = len(buffer)
|
||||
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
||||
|
@ -26,9 +26,10 @@ def is_valid_int(v, **kwargs):
|
||||
|
||||
|
||||
def is_valid_srt(v, **kwargs):
|
||||
if isinstance(v, str):
|
||||
return v
|
||||
return None
|
||||
try:
|
||||
return str(v)
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_float(v, **kwargs):
|
||||
@ -39,15 +40,17 @@ def is_valid_float(v, **kwargs):
|
||||
|
||||
|
||||
def is_valid_bool(v, **kwargs):
|
||||
if isinstance(v, bool):
|
||||
return v
|
||||
return None
|
||||
try:
|
||||
return bool(v)
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_array(v, **kwargs):
|
||||
if isinstance(v, list):
|
||||
try:
|
||||
return [str(i) for i in v]
|
||||
return None
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_ipv4(v, **kwargs):
|
||||
|
Loading…
Reference in New Issue
Block a user