From a13782e5320cdf956c7e32df661f21a45f83bac6 Mon Sep 17 00:00:00 2001 From: wuhao <15392746632@qq.com> Date: Fri, 13 Nov 2020 15:27:13 +0800 Subject: [PATCH] init --- .gitignore | 2 + ca-cert | 20 +++++++++ handler/__init__.py | 15 +++++++ handler/handle_event.py | 30 +++++++++++++ handler/handle_paylist.py | 15 +++++++ handler/handle_user.py | 15 +++++++ handler/handler.py | 95 +++++++++++++++++++++++++++++++++++++++ main.py | 44 ++++++++++++++++++ setting.py | 35 +++++++++++++++ 9 files changed, 271 insertions(+) create mode 100644 ca-cert create mode 100644 handler/__init__.py create mode 100644 handler/handle_event.py create mode 100644 handler/handle_paylist.py create mode 100644 handler/handle_user.py create mode 100644 handler/handler.py create mode 100644 main.py create mode 100644 setting.py diff --git a/.gitignore b/.gitignore index 13d1490..ff4821b 100644 --- a/.gitignore +++ b/.gitignore @@ -129,3 +129,5 @@ dmypy.json # Pyre type checker .pyre/ +.idea + diff --git a/ca-cert b/ca-cert new file mode 100644 index 0000000..e2c11f0 --- /dev/null +++ b/ca-cert @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDPDCCAqWgAwIBAgIJAMRsb0DLM1fsMA0GCSqGSIb3DQEBBQUAMHIxCzAJBgNV +BAYTAkNOMQswCQYDVQQIEwJIWjELMAkGA1UEBxMCSFoxCzAJBgNVBAoTAkFCMRAw +DgYDVQQDEwdLYWZrYUNBMSowKAYJKoZIhvcNAQkBFht6aGVuZG9uZ2xpdS5semRA +YWxpYmFiYS5jb20wIBcNMTcwMzA5MTI1MDUyWhgPMjEwMTAyMTcxMjUwNTJaMHIx +CzAJBgNVBAYTAkNOMQswCQYDVQQIEwJIWjELMAkGA1UEBxMCSFoxCzAJBgNVBAoT +AkFCMRAwDgYDVQQDEwdLYWZrYUNBMSowKAYJKoZIhvcNAQkBFht6aGVuZG9uZ2xp +dS5semRAYWxpYmFiYS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALZV +bbIO1ULQQN853BTBgRfPiRJaAOWf38u8GC0TNp/E9qtI88A+79ywAP17k5WYJ7XS +wXMOJ3h1qkQT2TYJVetZ6E69CUJq4BsOvNlNRvmnW6eFymh5QZsEz2MTooxJjVjC +JQPlI2XRDjIrTVYEQWUDxj2JhB8VVPEed+6u4KQVAgMBAAGjgdcwgdQwHQYDVR0O +BBYEFHFlOoiqQxXanVi2GUoDiKDD33ujMIGkBgNVHSMEgZwwgZmAFHFlOoiqQxXa +nVi2GUoDiKDD33ujoXakdDByMQswCQYDVQQGEwJDTjELMAkGA1UECBMCSFoxCzAJ +BgNVBAcTAkhaMQswCQYDVQQKEwJBQjEQMA4GA1UEAxMHS2Fma2FDQTEqMCgGCSqG +SIb3DQEJARYbemhlbmRvbmdsaXUubHpkQGFsaWJhYmEuY29tggkAxGxvQMszV+ww +DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQBTSz04p0AJXKl30sHw+UM/ +/k1jGFJzI5p0Z6l2JzKQYPP3PfE/biE8/rmiGYEenNqWNy1ZSniEHwa8L/Ux98ci +4H0ZSpUrMo2+6bfuNW9X35CFPp5vYYJqftilJBKIJX3C3J1ruOuBR28UxE42xx4K +pQ70wChNi914c4B+SxkGUg== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/handler/__init__.py b/handler/__init__.py new file mode 100644 index 0000000..2cae881 --- /dev/null +++ b/handler/__init__.py @@ -0,0 +1,15 @@ +from .handle_event import HandlerEvent +from .handle_paylist import HandlerPaylist +from .handle_user import HandlerUser + +HANDLER_DICT = { + 'user': HandlerUser, + 'paylist': HandlerPaylist, + 'event': HandlerEvent, +} + + +def handle_factory(data): + type_ = data['type'] + obj = HANDLER_DICT[type_](data) + return obj diff --git a/handler/handle_event.py b/handler/handle_event.py new file mode 100644 index 0000000..a4d8bb2 --- /dev/null +++ b/handler/handle_event.py @@ -0,0 +1,30 @@ +import arrow + +from .handler import Handler + + +class HandlerEvent(Handler): + @classmethod + def get_coll(cls, *args, **kwargs): + event_date = kwargs.get('event_date') + coll_name = f'event_{event_date}' + return cls.g_mdb[coll_name] + + def handler_event(self): + update_data = self._data.get('props', {}) + if not update_data: + return + event_time = update_data.get('_event_time', None) + if not event_time: + return + event_date = arrow.get(event_time) \ + .to(Handler.game_config.get('timezone', 'local')) \ + .strftime('%Y-%m-%d') + + update_data.update({ + '_ut': self._nt, + '_ct': self._nt, + }) + coll = self.get_coll(event_date=event_date) + + coll.insert(update_data) diff --git a/handler/handle_paylist.py b/handler/handle_paylist.py new file mode 100644 index 0000000..400f606 --- /dev/null +++ b/handler/handle_paylist.py @@ -0,0 +1,15 @@ +from .handler import Handler + + +class HandlerPaylist(Handler): + @classmethod + def get_coll(cls, *args, **kwargs): + return cls.g_mdb['paylist'] + + def get_where(self): + props = self._data['props'] + where = { + '_game_role_id': props.get('_game_role_id'), # 游戏角色id + 'orderid': props.get('orderid'), # 订单 + } + return where diff --git a/handler/handle_user.py b/handler/handle_user.py new file mode 100644 index 0000000..fcaa89a --- /dev/null +++ b/handler/handle_user.py @@ -0,0 +1,15 @@ +from .handler import Handler + + +class HandlerUser(Handler): + @classmethod + def get_coll(cls, *args, **kwargs): + return cls.g_mdb['user'] + + def get_where(self): + props = self._data['props'] + where = { + '_game_role_id': props.get('_game_role_id'), # 游戏角色id + } + return where + diff --git a/handler/handler.py b/handler/handler.py new file mode 100644 index 0000000..93a889f --- /dev/null +++ b/handler/handler.py @@ -0,0 +1,95 @@ +import hashlib +from abc import ABCMeta, abstractmethod +import traceback +import arrow +from loguru import logger + +from setting import settings + + +class Handler(metaclass=ABCMeta): + game_config = None + g_mdb = None + + def __init__(self, data): + self._data = data + self._appid = data['appid'] + self._event = self._data['event'] + self._game = self._data['project'] + self._type = self._data['type'] + Handler._init_game_config(self._appid) + Handler._init_mdb(f'{settings.DB_PREFIX}_{Handler.game_config["id_name"]}') + self.secret_token = Handler.game_config.get('secret_token', '') + self._nt = arrow.now(tz=Handler.game_config.get('timezone', 'local')).timestamp + + @classmethod + def _init_mdb(cls, dbname): + if not cls.g_mdb: + cls.g_mdb = settings.mdb_clint[dbname] + return cls.g_mdb + + def set_handle(self): + """ + 设置数据 更新数据 没有的话创建数据 + + :return: + """ + where = self.get_where() + coll = self.get_coll() + update_data = self._data.get('props', {}) + if update_data: + update_data.update({ + '_ut': self._nt, + }) + coll.update_one(where, {'$set': update_data}, upsert=True) + + @classmethod + def _init_game_config(cls, appid): + if not cls.game_config: + cls.game_config = settings.admin_game_coll.find_one({'appid': appid}) + return cls.game_config + + def handler_event(self): + fun_name = f'{self._event}_handle' + if not hasattr(self, fun_name): + return + fun_name = getattr(self, fun_name) + fun_name() + + def get_where(self): + pass + + @classmethod + @abstractmethod + def get_coll(cls, *args, **kwargs): + pass + + def run(self): + try: + if not self._check_data(): + logger.warning('签名错误') + return + self.handler_event() + except Exception as e: + traceback.print_exc() + logger.error(e) + + def _check_data(self): + if not isinstance(self._data, dict): + return False + if 'sign' not in self._data or 'timestamp' not in self._data: + return False + sign = self._data.get('sign', '') + appid = self._data.get('appid', '') + project = self._data.get('project', '') + + type_ = self._data.get('type', '') + event_act = self._data.get('event_act', '') + event = self._data.get('event', '') + timestamp = self._data.get('timestamp', '') + m = hashlib.md5() + m.update((str(appid) + project + type_ + event + event_act + str(timestamp) + self.secret_token).encode('utf8')) + local_sign = m.hexdigest() + if sign != local_sign: + return False + return True diff --git a/main.py b/main.py new file mode 100644 index 0000000..debcf75 --- /dev/null +++ b/main.py @@ -0,0 +1,44 @@ +import json +import os +import time +import ssl + +from kafka import KafkaConsumer +from loguru import logger + +from handler import handle_factory +from setting import settings + + +def main(): + env = os.environ.get('xlegudata_env') or 'debug' + logger.info(f'当前处于:{env}') + conf = settings.kafka_setting + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.check_hostname = False + context.load_verify_locations("ca-cert") + + consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'], + group_id=conf['consumer_id'], + api_version=(0, 10, 2), + session_timeout_ms=25000, + max_poll_records=100, + fetch_max_bytes=1 * 1024 * 1024, + security_protocol='SASL_SSL', + sasl_mechanism="PLAIN", + ssl_context=context, + sasl_plain_username=conf['sasl_plain_username'], + sasl_plain_password=conf['sasl_plain_password'], + value_deserializer=json.loads) + consumer.subscribe((conf['topic_name'],)) + for msg in consumer: + st = time.time() * 1000 + data = msg.value + obj = handle_factory(data) + obj.run() + logger.debug(time.time() * 1000 - st) + + +if __name__ == '__main__': + main() diff --git a/setting.py b/setting.py new file mode 100644 index 0000000..97ee2f9 --- /dev/null +++ b/setting.py @@ -0,0 +1,35 @@ +import os + +import pymongo +from loguru import logger + +logger.add('log.log', format="{time} {level} {name}:{line} {message}", level="INFO", + rotation="100 MB", retention='7 days', + enqueue=True) + + +class Config: + kafka_setting = { + 'sasl_plain_username': 'legu666666', + 'sasl_plain_password': '5EINbjyI', + 'bootstrap_servers': ["39.104.71.242:9093", "39.104.71.78:9093", "39.104.67.122:9093"], + 'topic_name': 'legu_geshouccs', + 'consumer_id': 'geshouccs_consumer' + } + + # local_mongo_uri = os.environ["local_mongo_uri"] + local_mongo_uri = 'mongodb://root:iamciniao@127.0.0.1:27017/?authSource=admin&readPreference=primary&ssl=false' + mdb_clint = pymongo.MongoClient(local_mongo_uri) + mydb = mdb_clint["admin_game"] + admin_game_coll = mydb['game'] + + +class Production(Config): + DB_PREFIX = 'game' + + +class Debug(Config): + DB_PREFIX = 'debug' + + +settings = Production if os.environ.get('xlegudata_env') == 'production' else Debug