This commit is contained in:
wuhao 2020-11-13 15:27:13 +08:00
parent 5eee525206
commit a13782e532
9 changed files with 271 additions and 0 deletions

2
.gitignore vendored
View File

@ -129,3 +129,5 @@ dmypy.json
# Pyre type checker
.pyre/
.idea

20
ca-cert Normal file
View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDPDCCAqWgAwIBAgIJAMRsb0DLM1fsMA0GCSqGSIb3DQEBBQUAMHIxCzAJBgNV
BAYTAkNOMQswCQYDVQQIEwJIWjELMAkGA1UEBxMCSFoxCzAJBgNVBAoTAkFCMRAw
DgYDVQQDEwdLYWZrYUNBMSowKAYJKoZIhvcNAQkBFht6aGVuZG9uZ2xpdS5semRA
YWxpYmFiYS5jb20wIBcNMTcwMzA5MTI1MDUyWhgPMjEwMTAyMTcxMjUwNTJaMHIx
CzAJBgNVBAYTAkNOMQswCQYDVQQIEwJIWjELMAkGA1UEBxMCSFoxCzAJBgNVBAoT
AkFCMRAwDgYDVQQDEwdLYWZrYUNBMSowKAYJKoZIhvcNAQkBFht6aGVuZG9uZ2xp
dS5semRAYWxpYmFiYS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALZV
bbIO1ULQQN853BTBgRfPiRJaAOWf38u8GC0TNp/E9qtI88A+79ywAP17k5WYJ7XS
wXMOJ3h1qkQT2TYJVetZ6E69CUJq4BsOvNlNRvmnW6eFymh5QZsEz2MTooxJjVjC
JQPlI2XRDjIrTVYEQWUDxj2JhB8VVPEed+6u4KQVAgMBAAGjgdcwgdQwHQYDVR0O
BBYEFHFlOoiqQxXanVi2GUoDiKDD33ujMIGkBgNVHSMEgZwwgZmAFHFlOoiqQxXa
nVi2GUoDiKDD33ujoXakdDByMQswCQYDVQQGEwJDTjELMAkGA1UECBMCSFoxCzAJ
BgNVBAcTAkhaMQswCQYDVQQKEwJBQjEQMA4GA1UEAxMHS2Fma2FDQTEqMCgGCSqG
SIb3DQEJARYbemhlbmRvbmdsaXUubHpkQGFsaWJhYmEuY29tggkAxGxvQMszV+ww
DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQBTSz04p0AJXKl30sHw+UM/
/k1jGFJzI5p0Z6l2JzKQYPP3PfE/biE8/rmiGYEenNqWNy1ZSniEHwa8L/Ux98ci
4H0ZSpUrMo2+6bfuNW9X35CFPp5vYYJqftilJBKIJX3C3J1ruOuBR28UxE42xx4K
pQ70wChNi914c4B+SxkGUg==
-----END CERTIFICATE-----

15
handler/__init__.py Normal file
View File

@ -0,0 +1,15 @@
from .handle_event import HandlerEvent
from .handle_paylist import HandlerPaylist
from .handle_user import HandlerUser
HANDLER_DICT = {
'user': HandlerUser,
'paylist': HandlerPaylist,
'event': HandlerEvent,
}
def handle_factory(data):
type_ = data['type']
obj = HANDLER_DICT[type_](data)
return obj

30
handler/handle_event.py Normal file
View File

@ -0,0 +1,30 @@
import arrow
from .handler import Handler
class HandlerEvent(Handler):
@classmethod
def get_coll(cls, *args, **kwargs):
event_date = kwargs.get('event_date')
coll_name = f'event_{event_date}'
return cls.g_mdb[coll_name]
def handler_event(self):
update_data = self._data.get('props', {})
if not update_data:
return
event_time = update_data.get('_event_time', None)
if not event_time:
return
event_date = arrow.get(event_time) \
.to(Handler.game_config.get('timezone', 'local')) \
.strftime('%Y-%m-%d')
update_data.update({
'_ut': self._nt,
'_ct': self._nt,
})
coll = self.get_coll(event_date=event_date)
coll.insert(update_data)

15
handler/handle_paylist.py Normal file
View File

@ -0,0 +1,15 @@
from .handler import Handler
class HandlerPaylist(Handler):
@classmethod
def get_coll(cls, *args, **kwargs):
return cls.g_mdb['paylist']
def get_where(self):
props = self._data['props']
where = {
'_game_role_id': props.get('_game_role_id'), # 游戏角色id
'orderid': props.get('orderid'), # 订单
}
return where

15
handler/handle_user.py Normal file
View File

@ -0,0 +1,15 @@
from .handler import Handler
class HandlerUser(Handler):
@classmethod
def get_coll(cls, *args, **kwargs):
return cls.g_mdb['user']
def get_where(self):
props = self._data['props']
where = {
'_game_role_id': props.get('_game_role_id'), # 游戏角色id
}
return where

95
handler/handler.py Normal file
View File

@ -0,0 +1,95 @@
import hashlib
from abc import ABCMeta, abstractmethod
import traceback
import arrow
from loguru import logger
from setting import settings
class Handler(metaclass=ABCMeta):
game_config = None
g_mdb = None
def __init__(self, data):
self._data = data
self._appid = data['appid']
self._event = self._data['event']
self._game = self._data['project']
self._type = self._data['type']
Handler._init_game_config(self._appid)
Handler._init_mdb(f'{settings.DB_PREFIX}_{Handler.game_config["id_name"]}')
self.secret_token = Handler.game_config.get('secret_token', '')
self._nt = arrow.now(tz=Handler.game_config.get('timezone', 'local')).timestamp
@classmethod
def _init_mdb(cls, dbname):
if not cls.g_mdb:
cls.g_mdb = settings.mdb_clint[dbname]
return cls.g_mdb
def set_handle(self):
"""
设置数据 更新数据 没有的话创建数据
:return:
"""
where = self.get_where()
coll = self.get_coll()
update_data = self._data.get('props', {})
if update_data:
update_data.update({
'_ut': self._nt,
})
coll.update_one(where, {'$set': update_data}, upsert=True)
@classmethod
def _init_game_config(cls, appid):
if not cls.game_config:
cls.game_config = settings.admin_game_coll.find_one({'appid': appid})
return cls.game_config
def handler_event(self):
fun_name = f'{self._event}_handle'
if not hasattr(self, fun_name):
return
fun_name = getattr(self, fun_name)
fun_name()
def get_where(self):
pass
@classmethod
@abstractmethod
def get_coll(cls, *args, **kwargs):
pass
def run(self):
try:
if not self._check_data():
logger.warning('签名错误')
return
self.handler_event()
except Exception as e:
traceback.print_exc()
logger.error(e)
def _check_data(self):
if not isinstance(self._data, dict):
return False
if 'sign' not in self._data or 'timestamp' not in self._data:
return False
sign = self._data.get('sign', '')
appid = self._data.get('appid', '')
project = self._data.get('project', '')
type_ = self._data.get('type', '')
event_act = self._data.get('event_act', '')
event = self._data.get('event', '')
timestamp = self._data.get('timestamp', '')
m = hashlib.md5()
m.update((str(appid) + project + type_ + event + event_act + str(timestamp) + self.secret_token).encode('utf8'))
local_sign = m.hexdigest()
if sign != local_sign:
return False
return True

44
main.py Normal file
View File

@ -0,0 +1,44 @@
import json
import os
import time
import ssl
from kafka import KafkaConsumer
from loguru import logger
from handler import handle_factory
from setting import settings
def main():
env = os.environ.get('xlegudata_env') or 'debug'
logger.info(f'当前处于:{env}')
conf = settings.kafka_setting
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = False
context.load_verify_locations("ca-cert")
consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'],
api_version=(0, 10, 2),
session_timeout_ms=25000,
max_poll_records=100,
fetch_max_bytes=1 * 1024 * 1024,
security_protocol='SASL_SSL',
sasl_mechanism="PLAIN",
ssl_context=context,
sasl_plain_username=conf['sasl_plain_username'],
sasl_plain_password=conf['sasl_plain_password'],
value_deserializer=json.loads)
consumer.subscribe((conf['topic_name'],))
for msg in consumer:
st = time.time() * 1000
data = msg.value
obj = handle_factory(data)
obj.run()
logger.debug(time.time() * 1000 - st)
if __name__ == '__main__':
main()

35
setting.py Normal file
View File

@ -0,0 +1,35 @@
import os
import pymongo
from loguru import logger
logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="INFO",
rotation="100 MB", retention='7 days',
enqueue=True)
class Config:
kafka_setting = {
'sasl_plain_username': 'legu666666',
'sasl_plain_password': '5EINbjyI',
'bootstrap_servers': ["39.104.71.242:9093", "39.104.71.78:9093", "39.104.67.122:9093"],
'topic_name': 'legu_geshouccs',
'consumer_id': 'geshouccs_consumer'
}
# local_mongo_uri = os.environ["local_mongo_uri"]
local_mongo_uri = 'mongodb://root:iamciniao@127.0.0.1:27017/?authSource=admin&readPreference=primary&ssl=false'
mdb_clint = pymongo.MongoClient(local_mongo_uri)
mydb = mdb_clint["admin_game"]
admin_game_coll = mydb['game']
class Production(Config):
DB_PREFIX = 'game'
class Debug(Config):
DB_PREFIX = 'debug'
settings = Production if os.environ.get('xlegudata_env') == 'production' else Debug