This commit is contained in:
wuaho 2021-05-11 14:35:34 +08:00
parent 46d75639e1
commit cbaac95cca
6 changed files with 62 additions and 4 deletions

View File

@ -6,6 +6,8 @@ name = "pypi"
[packages]
kafka-python = "*"
clickhouse-driver = "*"
pipfile = "*"
pandas = "*"
[dev-packages]

2
app.py
View File

@ -16,7 +16,7 @@ class XProcess(Process):
def run(self):
db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME,ipsearch)
handler_event = HandlerEvent(db_client, settings.GAME,self.ipsearch)
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, self.lock)
transmitter.add_source(handler_event, 10000, 60)

10
main.py
View File

@ -1,10 +1,18 @@
import os
from app import XProcess
from multiprocessing import Lock
from v2.ipregion import IpSearch, Ip2Region
if __name__ == '__main__':
pid = os.getpid()
stop_shell = """#!/bin/bash
echo `pstree -p {pid}`|awk 'BEGIN{{ FS="(" ; RS=")" }} NF>1 {{ print $NF }}'|xargs kill &>/dev/null
"""
with open('stop.sh', 'w', encoding='utf8') as f:
f.write(stop_shell.format(pid=pid))
lock = Lock()
ipsearch = IpSearch(Ip2Region, "ip2region.db")
for i in range(0, 16):
XProcess(i, lock,ipsearch).start()
XProcess(i, lock, ipsearch).start()

48
single_process.py Normal file
View File

@ -0,0 +1,48 @@
import time
from settings import settings
from v2 import *
from v2.ipregion import IpSearch, Ip2Region
from multiprocessing import Lock
ipsearch = IpSearch(Ip2Region, "ip2region.db")
lock = Lock()
def run():
db_client = CK(**settings.CK_CONFIG)
sketch = Sketch(db_client)
handler_event = HandlerEvent(db_client, settings.GAME, ipsearch)
handler_user = HandlerUser(db_client, settings.GAME)
transmitter = Transmitter(db_client, settings.GAME, sketch, lock)
transmitter.add_source(handler_event, 10000, 60)
transmitter.add_source(handler_user, 1000, 60)
last_ts = int(time.time())
consumer = create_consumer(-1)
for topic, msg in consumer():
# print(msg)
type_ = msg['#type']
del msg['#type']
ts = int(time.time())
if 'user' in type_:
# continue
obj = getattr(handler_user, type_)
handler_user.receive_data.append(UserAct(obj, msg))
if len(handler_user.receive_data) >= 1000 or last_ts + 60 < ts:
last_ts = ts
handler_user.execute()
elif 'track' in type_:
# continue
obj = getattr(handler_event, type_)
obj(msg)
else:
continue
transmitter.run()
if __name__ == '__main__':
run()

View File

@ -15,7 +15,7 @@ class HandlerEvent:
def set_region(self, data):
ip = data.get('#ip')
if ip:
data['#country'], data['#province'], data['#city'] = self.ipsearch(ip)
data['#country'], data['#province'], data['#city'] = self.ipsearch.search(ip)
def merge_update(self, a: dict, b: dict):
"""

View File

@ -1,7 +1,7 @@
create table shjy.user
(
`#role_create_time` DateTime('UTC'),
`#reg_time` DateTime('UTC'),
`#account_id` String,
`svrindex` UInt16,