Compare commits

...

14 Commits

Author SHA1 Message Date
wuaho
234402527b 1 2021-08-14 14:53:54 +08:00
wuaho
b14354f32a 1 2021-08-14 14:48:45 +08:00
wuaho
19801a09eb 1 2021-08-14 11:31:07 +08:00
wuaho
4c8c74e673 数据清理 2021-08-11 19:57:35 +08:00
wuaho
ff79538302 停止入库信号 2021-08-02 16:48:11 +08:00
wuaho
b11dae3ed5 优化表结构读取 2021-07-28 20:31:06 +08:00
wuaho
8ad16163a9 清空表 2021-07-28 17:48:23 +08:00
wuaho
f95e5bdb0b 测试 2021-07-26 23:38:27 +08:00
wuaho
e58621a4d2 初始化结构 2021-07-26 23:21:21 +08:00
wuaho
60eaf813df shanhai 2021-07-19 20:31:29 +08:00
wuaho
83790bed72 shanhai 2021-07-19 20:17:23 +08:00
wuaho
76c25d994c 迁移数据库 2021-07-10 02:10:52 +08:00
wuaho
fab52c7282 类型转换 2021-07-09 21:10:44 +08:00
wuaho
1bdb13b620 类型转换 2021-07-06 10:57:28 +08:00
16 changed files with 179 additions and 35 deletions

3
.gitignore vendored
View File

@ -129,3 +129,6 @@ dmypy.json
# Pyre type checker # Pyre type checker
.pyre/ .pyre/
.idea .idea
# 不同游戏单独配置
settings.py
clear_up.py

23
app.py
View File

@ -1,3 +1,4 @@
# coding:utf-8
import time import time
from multiprocessing import Process from multiprocessing import Process
@ -10,6 +11,7 @@ class XProcess(Process):
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None): def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
super(XProcess, self).__init__() super(XProcess, self).__init__()
# self.daemon = True
self.partition = partition self.partition = partition
self.lock = lock self.lock = lock
self.ipsearch = ipsearch self.ipsearch = ipsearch
@ -24,10 +26,12 @@ class XProcess(Process):
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr, transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
self.partition) self.partition)
transmitter.add_source(handler_event, 10000, 60) transmitter.start_ping()
transmitter.add_source(handler_user, 99, 60) transmitter.add_source(handler_event, 5000, 60)
transmitter.add_source(handler_user, 500, 60)
last_ts = int(time.time()) last_ts = int(time.time())
consumer = create_consumer(self.partition) consumer, kafka_client = create_consumer(self.partition)
for topic, msg in consumer(): for topic, msg in consumer():
# print(msg) # print(msg)
@ -47,6 +51,19 @@ class XProcess(Process):
# continue # continue
obj = getattr(handler_event, type_) obj = getattr(handler_event, type_)
obj(msg) obj(msg)
elif type_ == settings.STOP_SIGNAL:
# 1 小时内有效
if msg.get('#time', 0) + 3600 < int(time.time()):
continue
kafka_client.close()
# 停止消费kafka
print(f'进程{self.partition} 等待90秒')
time.sleep(90)
print(f'进程{self.partition} 写入数据')
transmitter.run()
print(f'进程{self.partition} 结束')
break
else: else:
continue continue

40
clear_up.py Normal file
View File

@ -0,0 +1,40 @@
# coding:utf-8
"""Mark mis-reported rows in the game's event table as deleted (sign = -1).

Fetches the live server list, selects event rows that either came from a
Windows client or carry a server id not in that list, and re-inserts each
matching row with ``sign = -1`` so the collapsing engine cancels it out.
Handles at most 2000 rows per run — rerun until the query returns nothing.
"""
import json

import pandas as pd
from clickhouse_driver import Client

from settings import settings

game = 'shanhai'
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'

client = Client(**settings.CK_CONFIG)

# Live server list; two hosts are excluded by IP (presumably internal/test
# machines — TODO confirm with ops).
df = pd.read_json(server_list_url)
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
serverid = tuple(str(i) for i in df['serverid'].to_list())

# Rows to invalidate: Windows clients, or server ids unknown to the list.
# ``having n = 1`` keeps only rows that have not been collapsed yet.
sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
from {game}.event
where
lower(`#os`) = 'windows'
or svrindex not in {serverid}
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
having n = 1 limit 2000"""

data, columns = client.execute(sql, columnar=True, with_column_types=True)
if not data:
    exit(0)

data_df = pd.DataFrame({col[0]: d for d, col in zip(data, columns)})
data_df.drop('n', axis=1, inplace=True)
data_df['sign'] = -1  # delete marker for the collapsing merge
data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))

# One JSON object per line, as JSONEachRow expects.
insert_sql = f'INSERT INTO {game}.event FORMAT JSONEachRow '
insert_sql = insert_sql + '\n'.join(json.dumps(item) for item in data_df.T.to_dict().values())
client.execute(insert_sql)

View File

@ -1,3 +1,4 @@
#coding:utf-8
import os import os
import redis import redis

View File

@ -1,23 +1,33 @@
import json import json
class Config: class Config:
# ck数据库连接 # ck数据库连接
CK_CONFIG = {'host': '119.29.176.224', CK_CONFIG = {'host': '139.159.159.3',
'send_receive_timeout': 30} 'port': 9654,
}
# 每个游戏不一样 游戏上报 kafka 主题 # 每个游戏不一样 游戏上报 kafka 主题
SUBSCRIBE_TOPIC = 'test2' SUBSCRIBE_TOPIC = 'shanhai'
KAFKA_CONSUMER_CONF = { KAFKA_CONSUMER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_deserializer': json.loads, 'value_deserializer': json.loads,
'auto_offset_reset': 'earliest', 'auto_offset_reset': 'earliest',
'enable_auto_commit': True,
'auto_commit_interval_ms': 10000,
# 每个游戏不一样 # 每个游戏不一样
'group_id': 'legu_group3' 'group_id': 'shanhai_group'
}
KAFKA_PRODUCER_CONF = {
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
} }
# 游戏数据库名 # 游戏数据库名
GAME = 'shjy' GAME = 'shanhai'
STOP_SIGNAL = 'stop_MntxuXMc'
REDIS_CONF = { REDIS_CONF = {
'host': '192.168.0.161', 'host': '192.168.0.161',

View File

@ -23,8 +23,8 @@ def run():
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
handler_user = HandlerUser(db_client, settings.GAME) handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr) transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr)
transmitter.add_source(handler_event, 10000, 60) transmitter.add_source(handler_event, 1, 1)
transmitter.add_source(handler_user, 1000, 60) transmitter.add_source(handler_user, 1, 1)
last_ts = int(time.time()) last_ts = int(time.time())
consumer = create_consumer(-1) consumer = create_consumer(-1)

21
stop.py Normal file
View File

@ -0,0 +1,21 @@
# coding:utf-8
"""Broadcast the stop signal to every partition of the game topic.

Each consumer process watches for messages whose ``#type`` equals
``settings.STOP_SIGNAL``; on receipt it stops consuming, flushes its
buffers and exits. One copy of the message is sent to each partition so
every per-partition process receives it.
"""
import time
import traceback

from kafka import KafkaProducer

from settings import settings

producer = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
msg = {
    '#type': settings.STOP_SIGNAL,
    # Consumers treat signals older than one hour as stale and ignore them.
    '#time': int(time.time())
}
for i in range(16):  # assumes the topic has 16 partitions — TODO confirm
    print(f'发送分区{i}')
    future1 = producer.send(settings.GAME, msg, partition=i)
    try:
        # Block until the broker acknowledges so failures surface per partition.
        future1.get(timeout=10)
    except Exception:
        # Bug fix: format_exc() returns a string; the original discarded it,
        # silently swallowing send failures.
        print(traceback.format_exc())
# Make sure everything buffered is actually on the wire before exiting.
producer.flush()

View File

@ -1,4 +1,5 @@
create view shjy.user_view as select * drop table if exists shanhai.user_view;
create view shanhai.user_view as select *
from shjy.user from shjy.user
order by `#reg_time` desc order by `#reg_time` desc
LIMIT 1 by `#account_id` LIMIT 1 by `#account_id`

View File

@ -7,8 +7,9 @@ __all__ = 'create_consumer',
def create_consumer(partition=-1): def create_consumer(partition=-1):
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
def consumer(): def consumer():
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
if partition > 0: if partition > 0:
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
else: else:
@ -19,4 +20,4 @@ def create_consumer(partition=-1):
val = msg.value val = msg.value
yield topic, val yield topic, val
return consumer return consumer, c

View File

@ -64,6 +64,8 @@ class CK:
try: try:
data, columns = self.__client.execute(sql, columnar=True, with_column_types=True) data, columns = self.__client.execute(sql, columnar=True, with_column_types=True)
except Exception as e: except Exception as e:
print('*'*50)
print(sql)
traceback.print_exc() traceback.print_exc()
if e.code == 60: if e.code == 60:
return self.get_all(db, 'user', where, try_cnt - 1) return self.get_all(db, 'user', where, try_cnt - 1)

View File

@ -11,7 +11,7 @@ class EventAttr:
attr = self.event_attr.get(key) attr = self.event_attr.get(key)
if not attr: if not attr:
self.event_attr[key] = self.rdb.smembers(key) or set() self.event_attr[key] = self.rdb.smembers(key) or set()
return self.event_attr[key] return set(self.event_attr[key])
def set_event_attr(self, key, *data): def set_event_attr(self, key, *data):
self.rdb.sadd(key, *data) self.rdb.sadd(key, *data)

View File

@ -1,5 +1,4 @@
import copy from settings import settings
from .valid_data import * from .valid_data import *
@ -9,6 +8,7 @@ class Sketch:
self.struct_cache = struct_cache self.struct_cache = struct_cache
self.__type_dict = dict() self.__type_dict = dict()
self.__struct_dict = dict() self.__struct_dict = dict()
self.init_tb_struct()
@property @property
def type_dict(self): def type_dict(self):
@ -30,20 +30,30 @@ class Sketch:
if self.struct_cache: if self.struct_cache:
self.struct_cache.update(db, tb, data) self.struct_cache.update(db, tb, data)
def init_tb_struct_cache(self, db, tb):
sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
data, columns = self.db_client.execute(sql, with_column_types=True, columnar=True)
res = {k: v for k, v in zip(data[0], data[1])}
self.__struct_dict[f'{db}_{tb}'] = res
self.up_tb_struct(db, tb, res)
return res
def init_tb_struct(self):
self.init_tb_struct_cache(settings.GAME, 'event')
self.init_tb_struct_cache(settings.GAME, 'user')
def get_tb_struct_cache(self, db, tb): def get_tb_struct_cache(self, db, tb):
""" """
查一条记录 取字段 和类型 取字段 和类型
:param db: :param db:
:param tb: :param tb:
:return: :return:
""" """
if self.__struct_dict.get(f'{db}_{tb}'): if self.__struct_dict.get(f'{db}_{tb}'):
return self.__struct_dict.get(f'{db}_{tb}') return self.__struct_dict.get(f'{db}_{tb}')
sql = f'select * from {db}.{tb} limit 1'
_, columns = self.db_client.execute(sql, with_column_types=True) res = self.init_tb_struct_cache(db, tb)
res = {item[0]: item[1] for item in columns}
self.__struct_dict[f'{db}_{tb}'] = res
self.up_tb_struct(db, tb, res)
return res return res
def update_user_view(self, db, tb): def update_user_view(self, db, tb):
@ -64,7 +74,7 @@ class Sketch:
LIMIT 1 by `#account_id`""" LIMIT 1 by `#account_id`"""
self.db_client.execute(sql) self.db_client.execute(sql)
def alter_table(self, db, tb, data): def alter_table(self, db, tb, data, try_cnt=10):
""" """
数据库字段检查 数据库字段检查
添加新字段为第一次出现类型 添加新字段为第一次出现类型
@ -104,8 +114,13 @@ class Sketch:
try: try:
self.db_client.execute(sql) self.db_client.execute(sql)
except Exception as e: except Exception as e:
print(f'添加字段 {k} 失败') print(f'添加字段 {k} 失败,同步数据库表结构')
# 读取数据库表结构并设置
self.init_tb_struct()
default_field.pop(k) default_field.pop(k)
if try_cnt < 0:
raise e
return self.alter_table(db, tb, data, try_cnt=try_cnt - 1)
if set(default_field) - keys: if set(default_field) - keys:
self.up_tb_struct(db, tb, default_field) self.up_tb_struct(db, tb, default_field)

View File

@ -1,6 +1,7 @@
import json import json
import os import os
import re import re
import threading
import time import time
import traceback import traceback
@ -9,6 +10,29 @@ from .valid_data import *
__all__ = 'Transmitter', __all__ = 'Transmitter',
class Ping(threading.Thread):
    """Daemon thread that keeps the ClickHouse connection alive.

    Issues a trivial ``select 1`` through *db_client* at least every
    ``time_out`` seconds so the server does not drop the connection while
    the writer is idle between flushes.
    """

    def __init__(self, db_client, p, log):
        threading.Thread.__init__(self)
        self.daemon = True   # die together with the main process
        self.ping_ts = 0     # unix timestamp of the last ping attempt
        self.time_out = 60   # seconds between pings
        self.log = log
        self.db_client = db_client
        self.p = p           # partition id, used only in log messages

    def run(self):
        while True:
            ts = int(time.time())
            if self.ping_ts + self.time_out < ts:
                # 保持连接 — keep the connection alive
                try:
                    self.ping_ts = ts
                    self.log.info(f'保持连接{self.p} ping')
                    self.db_client.execute('select 1')
                except Exception:
                    # Was a bare ``except:``, which also caught SystemExit/
                    # KeyboardInterrupt; narrow to Exception.
                    self.log.error('ping error')
            # Bug fix: the original loop had no sleep and busy-waited,
            # pegging a CPU core between pings.
            time.sleep(1)
class Transmitter: class Transmitter:
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0): def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
self.db_client = db_client self.db_client = db_client
@ -20,12 +44,16 @@ class Transmitter:
self.event_attr = event_attr self.event_attr = event_attr
self.p = p self.p = p
def start_ping(self):
t = Ping(self.db_client, self.p, self.log)
t.start()
def add_source(self, handler, bulk_max=1000, time_out=60): def add_source(self, handler, bulk_max=1000, time_out=60):
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())} self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
def check_send(self): def check_send(self):
ts = int(time.time())
for h, p in self.slots.items(): for h, p in self.slots.items():
ts = int(time.time())
tb, buffer = h.buffer_pool tb, buffer = h.buffer_pool
buffer_size = len(buffer) buffer_size = len(buffer)
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0: if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:

View File

@ -26,9 +26,10 @@ def is_valid_int(v, **kwargs):
def is_valid_srt(v, **kwargs): def is_valid_srt(v, **kwargs):
if isinstance(v, str): try:
return v return str(v)
return None except:
return None
def is_valid_float(v, **kwargs): def is_valid_float(v, **kwargs):
@ -39,15 +40,17 @@ def is_valid_float(v, **kwargs):
def is_valid_bool(v, **kwargs): def is_valid_bool(v, **kwargs):
if isinstance(v, bool): try:
return v return bool(v)
return None except:
return None
def is_valid_array(v, **kwargs): def is_valid_array(v, **kwargs):
if isinstance(v, list): try:
return [str(i) for i in v] return [str(i) for i in v]
return None except:
return None
def is_valid_ipv4(v, **kwargs): def is_valid_ipv4(v, **kwargs):

View File

@ -1,4 +1,5 @@
create table shjy.event drop table if exists shanhai.event;
create table shanhai.event
( (
`#ip` Nullable(IPv4), `#ip` Nullable(IPv4),
`#country` Nullable(String), `#country` Nullable(String),

View File

@ -1,4 +1,5 @@
create table shjy.user drop table if exists shanhai.user;
create table shanhai.user
( (
`#reg_time` DateTime('UTC'), `#reg_time` DateTime('UTC'),