Compare commits

...

6 Commits

Author SHA1 Message Date
90762159a7 update 2020-11-13 20:37:16 +08:00
1a0048ec15 update 2020-11-13 20:19:35 +08:00
ab3b196cba update 2020-11-13 20:02:49 +08:00
6de17f4722 update 2020-11-13 19:08:51 +08:00
d80d922c70 update 2020-11-13 18:45:38 +08:00
7aaa97c400 update 2020-11-13 18:35:46 +08:00
4 changed files with 29 additions and 28 deletions

View File

@ -1,20 +1,19 @@
from loguru import logger
from setting import logger
from .handle_event import HandlerEvent
from .handle_paylist import HandlerPaylist
from .handle_user import HandlerUser
HANDLER_DICT = {
'user': HandlerUser,
'paylist': HandlerPaylist,
'event': HandlerEvent,
b'user': HandlerUser,
b'paylist': HandlerPaylist,
b'event': HandlerEvent,
}
def handle_factory(data):
type_ = data['type']
obj = HANDLER_DICT.get(type_)
def handle_factory(key, data):
obj = HANDLER_DICT.get(key)
if not obj:
logger.warning(f'未知类型{type_}')
logger.warning(f'未知类型{key}')
return
return obj(data)

View File

@ -1,8 +1,9 @@
import hashlib
import os
from abc import ABCMeta, abstractmethod
import traceback
import arrow
from loguru import logger
from setting import logger
from setting import settings
@ -13,14 +14,16 @@ class Handler(metaclass=ABCMeta):
def __init__(self, data):
self._data = data
self._appid = data['appid']
appid = data['appid']
self._event = self._data['event']
self._game = self._data['project']
self._type = self._data['type']
Handler._init_game_config(self._appid)
Handler._init_mdb(f'{settings.DB_PREFIX}_{Handler.game_config["id_name"]}')
self.secret_token = Handler.game_config.get('secret_token', '')
self._nt = arrow.now(tz=Handler.game_config.get('timezone', 'local')).timestamp
Handler._init_game_config()
Handler._init_mdb(f'{settings.DB_PREFIX}_{Handler.game_config[appid]["id_name"]}')
self.secret_token = Handler.game_config[appid].get('secret_token', '')
self._nt = arrow.now(tz=Handler.game_config[appid].get('timezone', 'local')).timestamp
print(Handler.game_config)
@classmethod
def _init_mdb(cls, dbname):
@ -44,9 +47,11 @@ class Handler(metaclass=ABCMeta):
coll.update_one(where, {'$set': update_data}, upsert=True)
@classmethod
def _init_game_config(cls, appid):
def _init_game_config(cls):
if not cls.game_config:
cls.game_config = settings.admin_game_coll.find_one({'appid': appid})
topic_list = settings.kafka_setting['topic_name']
data = settings.admin_game_coll.find({'appid': {'$in': topic_list}}, {'_id': False})
cls.game_config = {item['appid']:item for item in data}
return cls.game_config
def handler_event(self):

View File

@ -4,7 +4,7 @@ import time
import ssl
from kafka import KafkaConsumer
from loguru import logger
from setting import logger
from handler import handle_factory
from setting import settings
@ -31,11 +31,12 @@ def main():
sasl_plain_username=conf['sasl_plain_username'],
sasl_plain_password=conf['sasl_plain_password'],
value_deserializer=json.loads)
consumer.subscribe((conf['topic_name'],))
consumer.subscribe(conf['topic_name'])
for msg in consumer:
st = time.time() * 1000
key = msg.key
data = msg.value
obj = handle_factory(data)
obj = handle_factory(key, data)
if not obj:
continue
obj.run()

View File

@ -3,6 +3,9 @@ import os
import pymongo
from loguru import logger
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="INFO",
rotation="100 MB", retention='7 days',
enqueue=True)
class Config:
@ -10,8 +13,8 @@ class Config:
'sasl_plain_username': 'legu666666',
'sasl_plain_password': '5EINbjyI',
'bootstrap_servers': ["39.104.71.242:9093", "39.104.71.78:9093", "39.104.67.122:9093"],
'topic_name': 'legu_geshouccs',
'consumer_id': 'geshouccs_consumer'
'topic_name': os.environ.get('topic_name').split(','),
'consumer_id': os.environ.get('consumer_id')
}
local_mongo_uri = os.environ["local_mongo_uri"]
@ -23,16 +26,9 @@ class Config:
class Production(Config):
DB_PREFIX = 'game'
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="INFO",
rotation="100 MB", retention='7 days',
enqueue=True)
class Debug(Config):
DB_PREFIX = 'debug'
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="DEBUG",
rotation="100 MB", retention='7 days',
enqueue=True)
settings = Production if os.environ.get('xlegudata_env') == 'production' else Debug