From cbaac95cca7c64199b7e7c5a21122f747ee197c2 Mon Sep 17 00:00:00 2001 From: wuaho Date: Tue, 11 May 2021 14:35:34 +0800 Subject: [PATCH] 1 --- Pipfile | 2 ++ app.py | 2 +- main.py | 10 +++++++++- single_process.py | 48 +++++++++++++++++++++++++++++++++++++++++++++ v2/handler_event.py | 2 +- 初始化用户表.sql | 2 +- 6 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 single_process.py diff --git a/Pipfile b/Pipfile index 8993b7d..1143fb3 100644 --- a/Pipfile +++ b/Pipfile @@ -6,6 +6,8 @@ name = "pypi" [packages] kafka-python = "*" clickhouse-driver = "*" +pipfile = "*" +pandas = "*" [dev-packages] diff --git a/app.py b/app.py index dda5e6b..0611712 100644 --- a/app.py +++ b/app.py @@ -16,7 +16,7 @@ class XProcess(Process): def run(self): db_client = CK(**settings.CK_CONFIG) sketch = Sketch(db_client) - handler_event = HandlerEvent(db_client, settings.GAME,ipsearch) + handler_event = HandlerEvent(db_client, settings.GAME,self.ipsearch) handler_user = HandlerUser(db_client, settings.GAME) transmitter = Transmitter(db_client, settings.GAME, sketch, self.lock) transmitter.add_source(handler_event, 10000, 60) diff --git a/main.py b/main.py index 5109952..dbcd5fd 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,18 @@ +import os from app import XProcess from multiprocessing import Lock from v2.ipregion import IpSearch, Ip2Region if __name__ == '__main__': + pid = os.getpid() + stop_shell = """#!/bin/bash +echo `pstree -p {pid}`|awk 'BEGIN{{ FS="(" ; RS=")" }} NF>1 {{ print $NF }}'|xargs kill &>/dev/null +""" + with open('stop.sh', 'w', encoding='utf8') as f: + f.write(stop_shell.format(pid=pid)) + lock = Lock() ipsearch = IpSearch(Ip2Region, "ip2region.db") for i in range(0, 16): - XProcess(i, lock,ipsearch).start() + XProcess(i, lock, ipsearch).start() diff --git a/single_process.py b/single_process.py new file mode 100644 index 0000000..2042b6c --- /dev/null +++ b/single_process.py @@ -0,0 +1,48 @@ +import time + +from settings import settings +from v2 import * +from v2.ipregion import IpSearch, Ip2Region +from multiprocessing import Lock + +ipsearch = IpSearch(Ip2Region, "ip2region.db") +lock = Lock() + + +def run(): + db_client = CK(**settings.CK_CONFIG) + sketch = Sketch(db_client) + handler_event = HandlerEvent(db_client, settings.GAME, ipsearch) + handler_user = HandlerUser(db_client, settings.GAME) + transmitter = Transmitter(db_client, settings.GAME, sketch, lock) + transmitter.add_source(handler_event, 10000, 60) + transmitter.add_source(handler_user, 1000, 60) + last_ts = int(time.time()) + consumer = create_consumer(-1) + + for topic, msg in consumer(): + # print(msg) + type_ = msg['#type'] + del msg['#type'] + ts = int(time.time()) + + if 'user' in type_: + # continue + obj = getattr(handler_user, type_) + handler_user.receive_data.append(UserAct(obj, msg)) + if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts: + last_ts = ts + handler_user.execute() + + elif 'track' in type_: + # continue + obj = getattr(handler_event, type_) + obj(msg) + else: + continue + + transmitter.run() + + +if __name__ == '__main__': + run() diff --git a/v2/handler_event.py b/v2/handler_event.py index 8204759..425f996 100644 --- a/v2/handler_event.py +++ b/v2/handler_event.py @@ -15,7 +15,7 @@ class HandlerEvent: def set_region(self, data): ip = data.get('#ip') if ip: - data['#country'], data['#province'], data['#city'] = self.ipsearch(ip) + data['#country'], data['#province'], data['#city'] = self.ipsearch.search(ip) def merge_update(self, a: dict, b: dict): """ diff --git a/初始化用户表.sql b/初始化用户表.sql index 4feb96a..fb58433 100644 --- a/初始化用户表.sql +++ b/初始化用户表.sql @@ -1,7 +1,7 @@ create table shjy.user ( - `#role_create_time` DateTime('UTC'), + `#reg_time` DateTime('UTC'), `#account_id` String, `svrindex` UInt16,