Compare commits
14 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
234402527b | ||
![]() |
b14354f32a | ||
![]() |
19801a09eb | ||
![]() |
4c8c74e673 | ||
![]() |
ff79538302 | ||
![]() |
b11dae3ed5 | ||
![]() |
8ad16163a9 | ||
![]() |
f95e5bdb0b | ||
![]() |
e58621a4d2 | ||
![]() |
60eaf813df | ||
![]() |
83790bed72 | ||
![]() |
76c25d994c | ||
![]() |
fab52c7282 | ||
![]() |
1bdb13b620 |
3
.gitignore
vendored
3
.gitignore
vendored
@ -129,3 +129,6 @@ dmypy.json
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
.idea
|
||||
# 不同游戏单独配置
|
||||
settings.py
|
||||
clear_up.py
|
||||
|
23
app.py
23
app.py
@ -1,3 +1,4 @@
|
||||
# coding:utf-8
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
@ -10,6 +11,7 @@ class XProcess(Process):
|
||||
|
||||
def __init__(self, partition, lock, ipsearch, log, rdb=None, event_attr=None):
|
||||
super(XProcess, self).__init__()
|
||||
# self.daemon = True
|
||||
self.partition = partition
|
||||
self.lock = lock
|
||||
self.ipsearch = ipsearch
|
||||
@ -24,10 +26,12 @@ class XProcess(Process):
|
||||
handler_user = HandlerUser(db_client, settings.GAME)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, self.log, self.lock, self.event_attr,
|
||||
self.partition)
|
||||
transmitter.add_source(handler_event, 10000, 60)
|
||||
transmitter.add_source(handler_user, 99, 60)
|
||||
transmitter.start_ping()
|
||||
transmitter.add_source(handler_event, 5000, 60)
|
||||
transmitter.add_source(handler_user, 500, 60)
|
||||
|
||||
last_ts = int(time.time())
|
||||
consumer = create_consumer(self.partition)
|
||||
consumer, kafka_client = create_consumer(self.partition)
|
||||
|
||||
for topic, msg in consumer():
|
||||
# print(msg)
|
||||
@ -47,6 +51,19 @@ class XProcess(Process):
|
||||
# continue
|
||||
obj = getattr(handler_event, type_)
|
||||
obj(msg)
|
||||
elif type_ == settings.STOP_SIGNAL:
|
||||
# 1 小时内有效
|
||||
if msg.get('#time', 0) + 3600 < int(time.time()):
|
||||
continue
|
||||
kafka_client.close()
|
||||
# 停止消费kafka
|
||||
print(f'进程{self.partition} 等待90秒')
|
||||
time.sleep(90)
|
||||
print(f'进程{self.partition} 写入数据')
|
||||
transmitter.run()
|
||||
print(f'进程{self.partition} 结束')
|
||||
|
||||
break
|
||||
else:
|
||||
continue
|
||||
|
||||
|
40
clear_up.py
Normal file
40
clear_up.py
Normal file
@ -0,0 +1,40 @@
|
||||
import json
|
||||
|
||||
from settings import settings
|
||||
|
||||
game = 'shanhai'
|
||||
server_list_url = f'http://gametools.legu.cc/?app=api&act=getServerList&game={game}'
|
||||
|
||||
import pandas as pd
|
||||
from clickhouse_driver import Client
|
||||
|
||||
client = Client(**settings.CK_CONFIG)
|
||||
|
||||
df = pd.read_json(server_list_url)
|
||||
df = df[~df['hostname'].isin(['119.3.89.14', '119.3.105.109'])]
|
||||
serverid = tuple((str(i) for i in df['serverid'].to_list()))
|
||||
|
||||
sql = f"""select `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`, count() as n
|
||||
from {game}.event
|
||||
where
|
||||
lower(`#os`) = 'windows'
|
||||
or svrindex not in {serverid}
|
||||
group by `#account_id`, `#event_time`, `#event_name`,`#os`,`svrindex`
|
||||
having n = 1 limit 2000"""
|
||||
|
||||
data, columns = client.execute(
|
||||
sql, columnar=True, with_column_types=True
|
||||
)
|
||||
if not data:
|
||||
exit(0)
|
||||
data_df = pd.DataFrame(
|
||||
{col[0]: d for d, col in zip(data, columns)}
|
||||
)
|
||||
|
||||
data_df.drop('n', axis=1, inplace=True)
|
||||
data_df['sign'] = -1
|
||||
data_df['#event_time'] = data_df['#event_time'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
|
||||
insert_sql = f'INSERT INTO {game}.event FORMAT JSONEachRow '
|
||||
insert_sql = insert_sql + '\n'.join([json.dumps(item) for item in data_df.T.to_dict().values()])
|
||||
|
||||
client.execute(insert_sql)
|
20
settings.py
20
settings.py
@ -1,23 +1,33 @@
|
||||
import json
|
||||
|
||||
|
||||
class Config:
|
||||
# ck数据库连接
|
||||
CK_CONFIG = {'host': '119.29.176.224',
|
||||
'send_receive_timeout': 30}
|
||||
CK_CONFIG = {'host': '139.159.159.3',
|
||||
'port': 9654,
|
||||
}
|
||||
|
||||
# 每个游戏不一样 游戏上报 kafka 主题
|
||||
SUBSCRIBE_TOPIC = 'test2'
|
||||
SUBSCRIBE_TOPIC = 'shanhai'
|
||||
|
||||
KAFKA_CONSUMER_CONF = {
|
||||
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||
'value_deserializer': json.loads,
|
||||
'auto_offset_reset': 'earliest',
|
||||
'enable_auto_commit': True,
|
||||
'auto_commit_interval_ms': 10000,
|
||||
|
||||
# 每个游戏不一样
|
||||
'group_id': 'legu_group3'
|
||||
'group_id': 'shanhai_group'
|
||||
}
|
||||
KAFKA_PRODUCER_CONF = {
|
||||
'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"],
|
||||
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
|
||||
}
|
||||
# 游戏数据库名
|
||||
GAME = 'shjy'
|
||||
GAME = 'shanhai'
|
||||
|
||||
STOP_SIGNAL = 'stop_MntxuXMc'
|
||||
|
||||
REDIS_CONF = {
|
||||
'host': '192.168.0.161',
|
||||
|
@ -23,8 +23,8 @@ def run():
|
||||
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
|
||||
handler_user = HandlerUser(db_client, settings.GAME)
|
||||
transmitter = Transmitter(db_client, settings.GAME, sketch, logger, lock, event_attr)
|
||||
transmitter.add_source(handler_event, 10000, 60)
|
||||
transmitter.add_source(handler_user, 1000, 60)
|
||||
transmitter.add_source(handler_event, 1, 1)
|
||||
transmitter.add_source(handler_user, 1, 1)
|
||||
last_ts = int(time.time())
|
||||
consumer = create_consumer(-1)
|
||||
|
||||
|
21
stop.py
Normal file
21
stop.py
Normal file
@ -0,0 +1,21 @@
|
||||
# coding:utf-8
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from kafka import KafkaProducer
|
||||
|
||||
from settings import settings
|
||||
|
||||
producer = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
|
||||
|
||||
msg = {
|
||||
'#type': settings.STOP_SIGNAL,
|
||||
'#time': int(time.time())
|
||||
}
|
||||
for i in range(0,16):
|
||||
print(f'发送分区{i}')
|
||||
future1 = producer.send(settings.GAME, msg, partition=i)
|
||||
try:
|
||||
future1.get(timeout=10)
|
||||
except Exception as e: # 发送失败抛出kafka_errors
|
||||
traceback.format_exc()
|
@ -1,4 +1,5 @@
|
||||
create view shjy.user_view as select *
|
||||
drop table if exists shanhai.user_view;
|
||||
create view shanhai.user_view as select *
|
||||
from shjy.user
|
||||
order by `#reg_time` desc
|
||||
LIMIT 1 by `#account_id`
|
@ -7,8 +7,9 @@ __all__ = 'create_consumer',
|
||||
|
||||
|
||||
def create_consumer(partition=-1):
|
||||
def consumer():
|
||||
c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
|
||||
|
||||
def consumer():
|
||||
if partition > 0:
|
||||
c.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)])
|
||||
else:
|
||||
@ -19,4 +20,4 @@ def create_consumer(partition=-1):
|
||||
val = msg.value
|
||||
yield topic, val
|
||||
|
||||
return consumer
|
||||
return consumer, c
|
||||
|
2
v2/db.py
2
v2/db.py
@ -64,6 +64,8 @@ class CK:
|
||||
try:
|
||||
data, columns = self.__client.execute(sql, columnar=True, with_column_types=True)
|
||||
except Exception as e:
|
||||
print('*'*50)
|
||||
print(sql)
|
||||
traceback.print_exc()
|
||||
if e.code == 60:
|
||||
return self.get_all(db, 'user', where, try_cnt - 1)
|
||||
|
@ -11,7 +11,7 @@ class EventAttr:
|
||||
attr = self.event_attr.get(key)
|
||||
if not attr:
|
||||
self.event_attr[key] = self.rdb.smembers(key) or set()
|
||||
return self.event_attr[key]
|
||||
return set(self.event_attr[key])
|
||||
|
||||
def set_event_attr(self, key, *data):
|
||||
self.rdb.sadd(key, *data)
|
||||
|
35
v2/sketch.py
35
v2/sketch.py
@ -1,5 +1,4 @@
|
||||
import copy
|
||||
|
||||
from settings import settings
|
||||
from .valid_data import *
|
||||
|
||||
|
||||
@ -9,6 +8,7 @@ class Sketch:
|
||||
self.struct_cache = struct_cache
|
||||
self.__type_dict = dict()
|
||||
self.__struct_dict = dict()
|
||||
self.init_tb_struct()
|
||||
|
||||
@property
|
||||
def type_dict(self):
|
||||
@ -30,20 +30,30 @@ class Sketch:
|
||||
if self.struct_cache:
|
||||
self.struct_cache.update(db, tb, data)
|
||||
|
||||
def init_tb_struct_cache(self, db, tb):
|
||||
sql = f"select name,type from system.columns where database='{db}' and table='{tb}'"
|
||||
|
||||
data, columns = self.db_client.execute(sql, with_column_types=True, columnar=True)
|
||||
res = {k: v for k, v in zip(data[0], data[1])}
|
||||
self.__struct_dict[f'{db}_{tb}'] = res
|
||||
self.up_tb_struct(db, tb, res)
|
||||
return res
|
||||
|
||||
def init_tb_struct(self):
|
||||
self.init_tb_struct_cache(settings.GAME, 'event')
|
||||
self.init_tb_struct_cache(settings.GAME, 'user')
|
||||
|
||||
def get_tb_struct_cache(self, db, tb):
|
||||
"""
|
||||
查一条记录 取字段 和类型
|
||||
取字段 和类型
|
||||
:param db:
|
||||
:param tb:
|
||||
:return:
|
||||
"""
|
||||
if self.__struct_dict.get(f'{db}_{tb}'):
|
||||
return self.__struct_dict.get(f'{db}_{tb}')
|
||||
sql = f'select * from {db}.{tb} limit 1'
|
||||
_, columns = self.db_client.execute(sql, with_column_types=True)
|
||||
res = {item[0]: item[1] for item in columns}
|
||||
self.__struct_dict[f'{db}_{tb}'] = res
|
||||
self.up_tb_struct(db, tb, res)
|
||||
|
||||
res = self.init_tb_struct_cache(db, tb)
|
||||
return res
|
||||
|
||||
def update_user_view(self, db, tb):
|
||||
@ -64,7 +74,7 @@ class Sketch:
|
||||
LIMIT 1 by `#account_id`"""
|
||||
self.db_client.execute(sql)
|
||||
|
||||
def alter_table(self, db, tb, data):
|
||||
def alter_table(self, db, tb, data, try_cnt=10):
|
||||
"""
|
||||
数据库字段检查
|
||||
添加新字段为第一次出现类型
|
||||
@ -104,8 +114,13 @@ class Sketch:
|
||||
try:
|
||||
self.db_client.execute(sql)
|
||||
except Exception as e:
|
||||
print(f'添加字段 {k} 失败')
|
||||
print(f'添加字段 {k} 失败,同步数据库表结构')
|
||||
# 读取数据库表结构并设置
|
||||
self.init_tb_struct()
|
||||
default_field.pop(k)
|
||||
if try_cnt < 0:
|
||||
raise e
|
||||
return self.alter_table(db, tb, data, try_cnt=try_cnt - 1)
|
||||
|
||||
if set(default_field) - keys:
|
||||
self.up_tb_struct(db, tb, default_field)
|
||||
|
@ -1,6 +1,7 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
@ -9,6 +10,29 @@ from .valid_data import *
|
||||
__all__ = 'Transmitter',
|
||||
|
||||
|
||||
class Ping(threading.Thread):
|
||||
def __init__(self, db_client, p, log):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.ping_ts = 0
|
||||
self.time_out = 60
|
||||
self.log = log
|
||||
self.db_client = db_client
|
||||
self.p = p
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
ts = int(time.time())
|
||||
if self.ping_ts + self.time_out < ts:
|
||||
# 保持连接
|
||||
try:
|
||||
self.ping_ts = ts
|
||||
self.log.info(f'保持连接{self.p} ping')
|
||||
self.db_client.execute('select 1')
|
||||
except:
|
||||
self.log.error('ping error')
|
||||
|
||||
|
||||
class Transmitter:
|
||||
def __init__(self, db_client, db_name, sketch, log, lock, event_attr, p=0):
|
||||
self.db_client = db_client
|
||||
@ -20,12 +44,16 @@ class Transmitter:
|
||||
self.event_attr = event_attr
|
||||
self.p = p
|
||||
|
||||
def start_ping(self):
|
||||
t = Ping(self.db_client, self.p, self.log)
|
||||
t.start()
|
||||
|
||||
def add_source(self, handler, bulk_max=1000, time_out=60):
|
||||
self.slots[handler] = {'bulk_max': bulk_max, 'time_out': time_out, "ts": int(time.time())}
|
||||
|
||||
def check_send(self):
|
||||
for h, p in self.slots.items():
|
||||
ts = int(time.time())
|
||||
for h, p in self.slots.items():
|
||||
tb, buffer = h.buffer_pool
|
||||
buffer_size = len(buffer)
|
||||
if (p['ts'] + p['time_out'] <= ts or buffer_size >= p['bulk_max']) and buffer_size > 0:
|
||||
|
@ -26,8 +26,9 @@ def is_valid_int(v, **kwargs):
|
||||
|
||||
|
||||
def is_valid_srt(v, **kwargs):
|
||||
if isinstance(v, str):
|
||||
return v
|
||||
try:
|
||||
return str(v)
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
@ -39,14 +40,16 @@ def is_valid_float(v, **kwargs):
|
||||
|
||||
|
||||
def is_valid_bool(v, **kwargs):
|
||||
if isinstance(v, bool):
|
||||
return v
|
||||
try:
|
||||
return bool(v)
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_array(v, **kwargs):
|
||||
if isinstance(v, list):
|
||||
try:
|
||||
return [str(i) for i in v]
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
create table shjy.event
|
||||
drop table if exists shanhai.event;
|
||||
create table shanhai.event
|
||||
(
|
||||
`#ip` Nullable(IPv4),
|
||||
`#country` Nullable(String),
|
||||
|
@ -1,4 +1,5 @@
|
||||
create table shjy.user
|
||||
drop table if exists shanhai.user;
|
||||
create table shanhai.user
|
||||
(
|
||||
|
||||
`#reg_time` DateTime('UTC'),
|
||||
|
Loading…
Reference in New Issue
Block a user