Compare commits
6 Commits
12f80d3ef2
...
90762159a7
Author | SHA1 | Date | |
---|---|---|---|
90762159a7 | |||
1a0048ec15 | |||
ab3b196cba | |||
6de17f4722 | |||
d80d922c70 | |||
7aaa97c400 |
@ -1,20 +1,19 @@
|
|||||||
from loguru import logger
|
from setting import logger
|
||||||
|
|
||||||
from .handle_event import HandlerEvent
|
from .handle_event import HandlerEvent
|
||||||
from .handle_paylist import HandlerPaylist
|
from .handle_paylist import HandlerPaylist
|
||||||
from .handle_user import HandlerUser
|
from .handle_user import HandlerUser
|
||||||
|
|
||||||
HANDLER_DICT = {
|
HANDLER_DICT = {
|
||||||
'user': HandlerUser,
|
b'user': HandlerUser,
|
||||||
'paylist': HandlerPaylist,
|
b'paylist': HandlerPaylist,
|
||||||
'event': HandlerEvent,
|
b'event': HandlerEvent,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def handle_factory(data):
|
def handle_factory(key, data):
|
||||||
type_ = data['type']
|
obj = HANDLER_DICT.get(key)
|
||||||
obj = HANDLER_DICT.get(type_)
|
|
||||||
if not obj:
|
if not obj:
|
||||||
logger.warning(f'未知类型{type_}')
|
logger.warning(f'未知类型{key}')
|
||||||
return
|
return
|
||||||
return obj(data)
|
return obj(data)
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
|
import os
|
||||||
from abc import ABCMeta, abstractmethod
|
from abc import ABCMeta, abstractmethod
|
||||||
import traceback
|
import traceback
|
||||||
import arrow
|
import arrow
|
||||||
from loguru import logger
|
from setting import logger
|
||||||
|
|
||||||
from setting import settings
|
from setting import settings
|
||||||
|
|
||||||
@ -13,14 +14,16 @@ class Handler(metaclass=ABCMeta):
|
|||||||
|
|
||||||
def __init__(self, data):
|
def __init__(self, data):
|
||||||
self._data = data
|
self._data = data
|
||||||
self._appid = data['appid']
|
appid = data['appid']
|
||||||
self._event = self._data['event']
|
self._event = self._data['event']
|
||||||
self._game = self._data['project']
|
self._game = self._data['project']
|
||||||
self._type = self._data['type']
|
self._type = self._data['type']
|
||||||
Handler._init_game_config(self._appid)
|
Handler._init_game_config()
|
||||||
Handler._init_mdb(f'{settings.DB_PREFIX}_{Handler.game_config["id_name"]}')
|
Handler._init_mdb(f'{settings.DB_PREFIX}_{Handler.game_config[appid]["id_name"]}')
|
||||||
self.secret_token = Handler.game_config.get('secret_token', '')
|
self.secret_token = Handler.game_config[appid].get('secret_token', '')
|
||||||
self._nt = arrow.now(tz=Handler.game_config.get('timezone', 'local')).timestamp
|
self._nt = arrow.now(tz=Handler.game_config[appid].get('timezone', 'local')).timestamp
|
||||||
|
|
||||||
|
print(Handler.game_config)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _init_mdb(cls, dbname):
|
def _init_mdb(cls, dbname):
|
||||||
@ -44,9 +47,11 @@ class Handler(metaclass=ABCMeta):
|
|||||||
coll.update_one(where, {'$set': update_data}, upsert=True)
|
coll.update_one(where, {'$set': update_data}, upsert=True)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _init_game_config(cls, appid):
|
def _init_game_config(cls):
|
||||||
if not cls.game_config:
|
if not cls.game_config:
|
||||||
cls.game_config = settings.admin_game_coll.find_one({'appid': appid})
|
topic_list = settings.kafka_setting['topic_name']
|
||||||
|
data = settings.admin_game_coll.find({'appid': {'$in': topic_list}}, {'_id': False})
|
||||||
|
cls.game_config = {item['appid']:item for item in data}
|
||||||
return cls.game_config
|
return cls.game_config
|
||||||
|
|
||||||
def handler_event(self):
|
def handler_event(self):
|
||||||
|
7
main.py
7
main.py
@ -4,7 +4,7 @@ import time
|
|||||||
import ssl
|
import ssl
|
||||||
|
|
||||||
from kafka import KafkaConsumer
|
from kafka import KafkaConsumer
|
||||||
from loguru import logger
|
from setting import logger
|
||||||
|
|
||||||
from handler import handle_factory
|
from handler import handle_factory
|
||||||
from setting import settings
|
from setting import settings
|
||||||
@ -31,11 +31,12 @@ def main():
|
|||||||
sasl_plain_username=conf['sasl_plain_username'],
|
sasl_plain_username=conf['sasl_plain_username'],
|
||||||
sasl_plain_password=conf['sasl_plain_password'],
|
sasl_plain_password=conf['sasl_plain_password'],
|
||||||
value_deserializer=json.loads)
|
value_deserializer=json.loads)
|
||||||
consumer.subscribe((conf['topic_name'],))
|
consumer.subscribe(conf['topic_name'])
|
||||||
for msg in consumer:
|
for msg in consumer:
|
||||||
st = time.time() * 1000
|
st = time.time() * 1000
|
||||||
|
key = msg.key
|
||||||
data = msg.value
|
data = msg.value
|
||||||
obj = handle_factory(data)
|
obj = handle_factory(key, data)
|
||||||
if not obj:
|
if not obj:
|
||||||
continue
|
continue
|
||||||
obj.run()
|
obj.run()
|
||||||
|
14
setting.py
14
setting.py
@ -3,6 +3,9 @@ import os
|
|||||||
import pymongo
|
import pymongo
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="INFO",
|
||||||
|
rotation="100 MB", retention='7 days',
|
||||||
|
enqueue=True)
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
@ -10,8 +13,8 @@ class Config:
|
|||||||
'sasl_plain_username': 'legu666666',
|
'sasl_plain_username': 'legu666666',
|
||||||
'sasl_plain_password': '5EINbjyI',
|
'sasl_plain_password': '5EINbjyI',
|
||||||
'bootstrap_servers': ["39.104.71.242:9093", "39.104.71.78:9093", "39.104.67.122:9093"],
|
'bootstrap_servers': ["39.104.71.242:9093", "39.104.71.78:9093", "39.104.67.122:9093"],
|
||||||
'topic_name': 'legu_geshouccs',
|
'topic_name': os.environ.get('topic_name').split(','),
|
||||||
'consumer_id': 'geshouccs_consumer'
|
'consumer_id': os.environ.get('consumer_id')
|
||||||
}
|
}
|
||||||
|
|
||||||
local_mongo_uri = os.environ["local_mongo_uri"]
|
local_mongo_uri = os.environ["local_mongo_uri"]
|
||||||
@ -23,16 +26,9 @@ class Config:
|
|||||||
class Production(Config):
|
class Production(Config):
|
||||||
DB_PREFIX = 'game'
|
DB_PREFIX = 'game'
|
||||||
|
|
||||||
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="INFO",
|
|
||||||
rotation="100 MB", retention='7 days',
|
|
||||||
enqueue=True)
|
|
||||||
|
|
||||||
|
|
||||||
class Debug(Config):
|
class Debug(Config):
|
||||||
DB_PREFIX = 'debug'
|
DB_PREFIX = 'debug'
|
||||||
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="DEBUG",
|
|
||||||
rotation="100 MB", retention='7 days',
|
|
||||||
enqueue=True)
|
|
||||||
|
|
||||||
|
|
||||||
settings = Production if os.environ.get('xlegudata_env') == 'production' else Debug
|
settings = Production if os.environ.get('xlegudata_env') == 'production' else Debug
|
||||||
|
Loading…
Reference in New Issue
Block a user