Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2af12caaaf | ||
![]() |
fc325b7518 | ||
![]() |
8a1c414008 | ||
![]() |
c6b50804d3 | ||
![]() |
16ddfda32f |
3
.gitignore
vendored
3
.gitignore
vendored
@ -129,3 +129,6 @@ dmypy.json
|
|||||||
# Pyre type checker
|
# Pyre type checker
|
||||||
.pyre/
|
.pyre/
|
||||||
.idea
|
.idea
|
||||||
|
# 不同游戏单独配置
|
||||||
|
settings.py
|
||||||
|
clear_up.py
|
||||||
|
3
app.py
3
app.py
@ -11,6 +11,7 @@ class XProcess(Process):
|
|||||||
|
|
||||||
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
||||||
super(XProcess, self).__init__()
|
super(XProcess, self).__init__()
|
||||||
|
# self.daemon = True
|
||||||
self.partition = partition
|
self.partition = partition
|
||||||
self.lock = lock
|
self.lock = lock
|
||||||
self.ipsearch = ipsearch
|
self.ipsearch = ipsearch
|
||||||
@ -25,8 +26,10 @@ class XProcess(Process):
|
|||||||
handler_user = HandlerUser(db_client, settings.GAME)
|
handler_user = HandlerUser(db_client, settings.GAME)
|
||||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
||||||
self.partition)
|
self.partition)
|
||||||
|
transmitter.start_ping()
|
||||||
transmitter.add_source(handler_event, 5000, 60)
|
transmitter.add_source(handler_event, 5000, 60)
|
||||||
transmitter.add_source(handler_user, 500, 60)
|
transmitter.add_source(handler_user, 500, 60)
|
||||||
|
|
||||||
last_ts = int(time.time())
|
last_ts = int(time.time())
|
||||||
consumer, kafka_client = create_consumer(self.partition)
|
consumer, kafka_client = create_consumer(self.partition)
|
||||||
|
|
||||||
|
40
clear_up.py
Normal file
40
clear_up.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
game = 'zhengba'
|
||||||
|
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from clickhouse_driver import Client
|
||||||
|
|
||||||
|
client = Client(**settings.CK_CONFIG)
|
||||||
|
|
||||||
|
df = pd.read_json(server_list_url)
|
||||||
|
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
|
||||||
|
serverid = tuple((str(i) for i in df['serverid'].to_list()))
|
||||||
|
|
||||||
|
sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
|
||||||
|
from {game}.event
|
||||||
|
where
|
||||||
|
lower(`#os`) = 'windows'
|
||||||
|
or svrindex not in {serverid}
|
||||||
|
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
|
||||||
|
having n = 1 limit 2000"""
|
||||||
|
|
||||||
|
data, columns = client.execute(
|
||||||
|
sql, columnar=True, with_column_types=True
|
||||||
|
)
|
||||||
|
if not data:
|
||||||
|
exit(0)
|
||||||
|
data_df = pd.DataFrame(
|
||||||
|
{col[0]: d for d, col in zip(data, columns)}
|
||||||
|
)
|
||||||
|
|
||||||
|
data_df.drop('n', axis=1, inplace=True)
|
||||||
|
data_df['sign'] = -1
|
||||||
|
data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
|
||||||
|
insert_sql = f'INSERT INTO {game}.event FORMAT JSONEachRow '
|
||||||
|
insert_sql = insert_sql + '\n'.join([json.dumps(item) for item in data_df.T.to_dict().values()])
|
||||||
|
|
||||||
|
client.execute(insert_sql)
|
@ -5,7 +5,7 @@ class Config:
|
|||||||
# ck数据库连接
|
# ck数据库连接
|
||||||
CK_CONFIG = {'host': '139.159.159.3',
|
CK_CONFIG = {'host': '139.159.159.3',
|
||||||
'port': 9654,
|
'port': 9654,
|
||||||
'send_receive_timeout': 30}
|
}
|
||||||
|
|
||||||
# 每个游戏不一样 游戏上报 kafka 主题
|
# 每个游戏不一样 游戏上报 kafka 主题
|
||||||
SUBSCRIBE_TOPIC = 'zhengba_test'
|
SUBSCRIBE_TOPIC = 'zhengba_test'
|
||||||
|
@ -18,6 +18,8 @@ def create_consumer(partition=-1):
|
|||||||
# print(msg)
|
# print(msg)
|
||||||
topic = msg.topic
|
topic = msg.topic
|
||||||
val = msg.value
|
val = msg.value
|
||||||
|
if val.get('properties',{}).get('owner_name') not in ('gmhdgdt', 'gmhdtt'):
|
||||||
|
continue
|
||||||
yield topic, val
|
yield topic, val
|
||||||
|
|
||||||
return consumer, c
|
return consumer, c
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
@ -9,6 +10,29 @@ from .valid_data import *
|
|||||||
__all__ = 'Transmitter',
|
__all__ = 'Transmitter',
|
||||||
|
|
||||||
|
|
||||||
|
class Ping(threading.Thread):
|
||||||
|
def __init__(self, db_client, p, log):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.daemon = True
|
||||||
|
self.ping_ts = 0
|
||||||
|
self.time_out = 60
|
||||||
|
self.log = log
|
||||||
|
self.db_client = db_client
|
||||||
|
self.p = p
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
ts = int(time.time())
|
||||||
|
if self.ping_ts + self.time_out < ts:
|
||||||
|
# 保持连接
|
||||||
|
try:
|
||||||
|
self.ping_ts = ts
|
||||||
|
self.log.info(f'保持连接{self.p} ping')
|
||||||
|
self.db_client.execute('select 1')
|
||||||
|
except:
|
||||||
|
self.log.error('ping error')
|
||||||
|
|
||||||
|
|
||||||
class Transmitter:
|
class Transmitter:
|
||||||
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
||||||
self.db_client = db_client
|
self.db_client = db_client
|
||||||
@ -20,12 +44,16 @@ class Transmitter:
|
|||||||
self.event_attr = event_attr
|
self.event_attr = event_attr
|
||||||
self.p = p
|
self.p = p
|
||||||
|
|
||||||
|
def start_ping(self):
|
||||||
|
t = Ping(self.db_client, self.p, self.log)
|
||||||
|
t.start()
|
||||||
|
|
||||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||||
|
|
||||||
def check_send(self):
|
def check_send(self):
|
||||||
for h, p in self.slots.items():
|
|
||||||
ts = int(time.time())
|
ts = int(time.time())
|
||||||
|
for h, p in self.slots.items():
|
||||||
tb, buffer = h.buffer_pool
|
tb, buffer = h.buffer_pool
|
||||||
buffer_size = len(buffer)
|
buffer_size = len(buffer)
|
||||||
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
||||||
|
@ -26,8 +26,9 @@ def is_valid_int(v, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def is_valid_srt(v, **kwargs):
|
def is_valid_srt(v, **kwargs):
|
||||||
if isinstance(v, str):
|
try:
|
||||||
return v
|
return str(v)
|
||||||
|
except:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@ -39,14 +40,16 @@ def is_valid_float(v, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def is_valid_bool(v, **kwargs):
|
def is_valid_bool(v, **kwargs):
|
||||||
if isinstance(v, bool):
|
try:
|
||||||
return v
|
return bool(v)
|
||||||
|
except:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def is_valid_array(v, **kwargs):
|
def is_valid_array(v, **kwargs):
|
||||||
if isinstance(v, list):
|
try:
|
||||||
return [str(i) for i in v]
|
return [str(i) for i in v]
|
||||||
|
except:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user