From cfb3948c5cd234657bcf554a0a48a920a588de91 Mon Sep 17 00:00:00 2001 From: wuaho Date: Sat, 9 Oct 2021 10:36:52 +0800 Subject: [PATCH] 'init' --- .gitignore | 2 + app.py | 74 +++++++++++++ core/__init__.py | 1 + core/config.py | 246 ++++++++++++++++++++++++++++++++++++++++++++ handler/__init__.py | 0 test/__init__.py | 0 test/check_api.py | 64 ++++++++++++ test/to_shoumeng.py | 97 +++++++++++++++++ timer_defmo.py | 19 ++++ utils/__init__.py | 2 + utils/consumer.py | 20 ++++ utils/post_data.py | 86 ++++++++++++++++ 12 files changed, 611 insertions(+) create mode 100644 app.py create mode 100644 core/__init__.py create mode 100644 core/config.py create mode 100644 handler/__init__.py create mode 100644 test/__init__.py create mode 100644 test/check_api.py create mode 100644 test/to_shoumeng.py create mode 100644 timer_defmo.py create mode 100644 utils/__init__.py create mode 100644 utils/consumer.py create mode 100644 utils/post_data.py diff --git a/.gitignore b/.gitignore index 13d1490..6520e2a 100644 --- a/.gitignore +++ b/.gitignore @@ -129,3 +129,5 @@ dmypy.json # Pyre type checker .pyre/ +.idea/ + diff --git a/app.py b/app.py new file mode 100644 index 0000000..89180ba --- /dev/null +++ b/app.py @@ -0,0 +1,74 @@ +import copy +import datetime + +import schedule + +from utils import create_consumer, PostData +from core import settings + +consumer, client = create_consumer() + + +def handler_os(properties): + os_: str = properties.get('#os', 'Android') + if os_.lower() == 'ios': + properties['#os'] = 102 + properties['#os'] = 101 + + +def handler_money(properties): + properties['unitPrice'] = int(properties.get('unitPrice', 0) / 100) + + +def handler_userid(properties): + properties['user_id'] = properties['binduid'] + + +def run(): + post_data = PostData() + schedule.every(60).seconds.do(post_data.post) + for msg in consumer(): + data: dict = msg.value + if data.get('#type') != 'track': + continue + data.update(data.get('properties', {})) + data.pop('properties') + + if data.get('game') != 'xiangsu': + continue + + event_name = data.get('#event_name') + # if event_name == 'ping': + # continue + + if event_name not in settings.legu_to_sm_event: + continue + + # if data.get('owner_name') != 'shoumeng': + # continue + + send_data = copy.deepcopy(settings.template_data) + + handler_os(data) + handler_money(data) + handler_userid(data) + + send_data['event']['event_name'] = settings.legu_to_sm_event[event_name] + send_data['event']['event_time'] = str(int( + datetime.datetime.strptime(data['#event_time'], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)) + + for k, v in settings.legu_to_sm_attr_base.items(): + send_data['data'][v] = data.get(k) + + for k, v in settings.legu_to_sm_attr.get(event_name, dict()).items(): + send_data['data'][v] = data.get(k) + + # print(send_data) + if post_data.is_upload(): + post_data.post() + post_data.add(send_data) + schedule.run_pending() + + +if __name__ == '__main__': + run() diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..156144f --- /dev/null +++ b/core/__init__.py @@ -0,0 +1 @@ +from .config import settings \ No newline at end of file diff --git a/core/config.py b/core/config.py new file mode 100644 index 0000000..df66e9b --- /dev/null +++ b/core/config.py @@ -0,0 +1,246 @@ +# coding:utf-8 +from __future__ import annotations + +import json +from typing import Optional, Union + +from pydantic import BaseSettings, Field + + +class GlobalConfig(BaseSettings): + """Global configurations.""" + + # This variable will be loaded from the .env file. However, if there is a + # shell environment variable having the same name, that will take precedence. + + # the class Field is necessary while defining the global variables + ENV_STATE: Optional[str] = Field(..., env="ENV_STATE") + + class Config: + """Loads the dotenv file.""" + + env_file: str = ".env" + + # 事件映射 legu->shoumeng + legu_to_sm_event = {'pay': 'role_pay', + 'create_account': 'role_create', + 'login': 'role_login', + 'animation': 'animation', + 'guide': 'guide_flow', + 'ping': 'role_logout', + 'level_up': 'role_rank', + 'vip_level': 'vip_rank', + 'fight': 'role_power', + 'mission_completed': 'task_flow', + 'enter_stage': 'dungeon_flow', + 'resource_change': 'item_flow', + 'summon': 'recruit', + 'ask_for_join_guild': 'guild_apply', + 'leave_guild': 'guild_quit', + 'create_guild': 'guild_set', + 'get_hero': 'hero_get', + 'hero_up': 'hero_rank', + 'hero_star_up': 'hero_advanced', + 'hero_skill_get': 'hero_skill_get', + 'hero_equip': 'equip_get', + 'equip_level_up': 'hero_equip_rank', + 'rune_forge': 'rune_flow', + 'event_participate': 'activity_flow', + 'home_rank': 'home_rank', + 'fishing_flow': 'fishing_flow'} + # 公共属性映射 legu->shoumeng + legu_to_sm_attr_base = { + # 基础属性 + 'channel': 'channel_name', + 'svrindex': 'game_server', + '#os': 'platform_id', + 'binduid': 'sm_user_id', + 'user_id': 'user_id', # 避免重复key,使用的是binduid + '#account_id': 'role_id', + 'role_name': 'role_name', + 'ghid': 'guild_id', + 'ghname': 'guild', + 'zhanli': 'power', + 'lv': 'role_rank', + 'vip': 'role_vip', + # '':'castle_rank', # 城堡等级 + 'exp': 'exp' + + } + # 事件属性映射 + legu_to_sm_attr = {'pay': {'unitPrice': 'pay_amount', + 'orderid': 'pay_product_id', + 'proid': 'pay_product', + 'pay_product_json': 'pay_product_json'}, + 'create_account': {}, + 'login': {}, + 'ta_app_end': {}, + 'animation': {'animation_seconds': 'animation_seconds', 'result': 'result'}, + 'guide': {'step_id': 'guide_id', 'step_name': 'guide_name'}, + 'ping': {'online_ts': 'online_time'}, + 'level_up': {'role_level': 'rank', + 'before_level': 'rank_before', + 'after_level': 'rank_change'}, + 'vip_level': {'before_vip': 'vip_rank_before', + 'after_vip': 'vip_rank_before'}, + 'fight': {'fighting_capacity': 'power', + 'fighting_before': 'power_before', + 'after_fighting': 'power_change'}, + 'mission_completed': {'mission_type': 'task_type', + 'mission_id': 'task_id', + 'mission_name': 'task_name'}, + 'enter_stage': {'stage_id': 'dungeon_id', + 'stage_name': 'dungeon_name', + 'stage_type': 'dungeon_type'}, + 'end_stage': {'reward_list': 'dungeon_result', 'chapter': 'chapter'}, + 'shopping': {}, + 'idle_reward': {}, + 'online_reward': {}, + 'sign': {}, + 'resource_change': {'resource_id': 'item_id', + 'resource_name': 'item_name', + 'change_type': 'item_type', + 'change_num': 'count_change', + 'change_before': 'count_before', + 'change_after': 'count', + 'change_reason': 'sub_season'}, + 'summon': {'summon_type': 'recruit_type', + 'item_list': 'item_id', + 'item_name': 'reward_name', + 'summon_cost_type': 'item_name', + 'summon_cost_num': 'cost_count'}, + 'ask_for_join_guild': {'guild_level': 'guiild_rank', + 'guild_id': 'guild_id', + 'guild_name': 'guild_name', + 'guild_position': 'guild_position'}, + 'leave_guild': {'guild_level': 'guiild_rank', + 'guild_id': 'guild_id', + 'guild_name': 'guild_name', + 'guild_position': 'guild_position'}, + 'create_guild': {'guild_level': 'guiild_rank', + 'guild_id': 'guild_id', + 'guild_name': 'guild_name', + 'guild_fighting_capacity': 'guild_position'}, + 'guild_donate': {}, + 'get_hero': {'hero_name': 'hero_name', + 'hero_job': 'hero_type', + 'get_type': 'reason'}, + 'hero_up': {'hero_name': 'hero_name', + 'hero_job': 'hero_type', + 'original_level': 'hero_rank_before', + 'new_level': 'hero_rank_before', + 'item_name': 'item_name', + 'cost_count': 'cost_count'}, + 'hero_star_up': {'hero_name': 'hero_name', + 'hero_job': 'hero_type', + 'before_awake': 'hero_rank_before', + 'after_awake': 'hero_rank_before', + 'item_name': 'item_name', + 'cost_count': 'cost_count'}, + 'hero_skill_get': {'hero_name': 'hero_name', + 'hero_job': 'hero_job', + 'skill_id': 'skill_id', + 'skill_name': 'skill_name', + 'reason': 'reason', + 'sub_season': 'sub_season'}, + 'hero_equip': {'equip_id': 'equip_id', + 'after_fight': 'equip_name', + 'equip_quality': 'equip_quality'}, + 'equip_level_up': {'equip_name': 'equip_name', + 'equip_quality': 'equip_id', + 'strength_level': 'equip_rank_before', + 'equip_rank': 'equip_rank'}, + 'rune_forge': {'rune_id': 'rune_id', + 'rune_type': 'rune_type', + 'rune_name': 'rune_name'}, + 'event_participate': {'activity_id': 'activity_id', + 'activity_name': 'activity_name', + 'activity_type': 'activity_type'}, + 'event_centered': {}, + 'event_finish': {'mission_start_time': 'activity_reward_content'}, + 'home_rank': {'home_rank_before': 'home_rank_before', + 'home_rank': 'home_rank_before'}, + 'fishing_flow': {'result': 'result', + 'reward_content': 'reward_content', + 'fish_time_rest': 'fish_time_rest'}, + 'training_room': {}, + 'plant': {}, + 'loot': {}, + 'harvest': {}, + 'banquet': {}, + 'home_hero_list': {}, + 'mine_layer': {}, + 'decoration_recycle': {}} + # 手盟上报模板 + template_data = { + "cp_game_id": 682, + "category": "cp_api", + "event": { + 'event_access': 'realtime', + "event_time": 0, + "event_name": "" + }, + "data": { + + } + } + + +class DevConfig(GlobalConfig): + """Development configurations.""" + SM_API = 'https://data.910app.com/cp_test' + SM_CHECK_API = 'http://datacheck.910app.com:8099/cp/check' + + KAFKA_CONSUMER_CONF: dict = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_deserializer': json.loads, + 'auto_offset_reset': 'earliest', + # 'enable_auto_commit': True, + # 'auto_commit_interval_ms': 10000, + + # 每个游戏不一样 + 'group_id': 'shoumeng_xiangsu_debug' # 'shou_meng_xiangsu' + } + SUBSCRIBE_TOPIC: str = 'debug' + + class Config: + env_prefix: str = "DEV_" + + +class ProdConfig(GlobalConfig): + """Production configurations.""" + + SM_API = 'https://data.910app.com/cp' + + KAFKA_CONSUMER_CONF = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_deserializer': json.loads, + 'auto_offset_reset': 'earliest', + # 'enable_auto_commit': True, + # 'auto_commit_interval_ms': 10000, + + # 每个游戏不一样 + 'group_id': 'shou_meng_xiangsu2' # 'shou_meng_xiangsu' + } + SUBSCRIBE_TOPIC = 'xiangsu' + + class Config: + env_prefix: str = "PROD_" + + +class FactoryConfig: + """Returns a config instance dependending on the ENV_STATE variable.""" + + def __init__(self, env_state: Optional[str]) -> None: + self.env_state = env_state + + def __call__(self) -> Union[DevConfig, ProdConfig]: + if self.env_state == "dev": + return DevConfig() + + elif self.env_state == "prod": + return ProdConfig() + + +settings = FactoryConfig(GlobalConfig().ENV_STATE)() +print(11) diff --git a/handler/__init__.py b/handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/check_api.py b/test/check_api.py new file mode 100644 index 0000000..56258d2 --- /dev/null +++ b/test/check_api.py @@ -0,0 +1,64 @@ +# coding:utf-8 + +import gzip +import json +import base64 +import time + +import requests + +from core import settings + +data = { + "cp_game_id": 682, + "category": "cp_api", + "event": { + "event_time": f"{int(time.time() * 1000)}", + "event_name": "role_rank" + }, + "data": { + "utc_time": 1506054735456, + "game_server": 1, + "platform_id": 101, + # "sm_user_id": "test_123456", + # "user_id": "14444444", + # "role_id": "14362455", + # "role_name": "无名", + # "role": "狂暴战士", + # "school": "玄月宗", + # "combat": 156784, + # "role_vip": 2, + # "before_rank": 2, + # "role_rank": 3 + + } +} + + +def run(): + print('-' * 30) + + post_data = [] + for i in range(2): + post_data.append(data) + print(json.dumps(post_data)) + gzip_data = gzip.compress(json.dumps(post_data).encode()) + base64_data = base64.b64encode(gzip_data) + + # resp = requests.post(settings.SM_CHECK_API, base64_data, verify=False) + + proxies = { + 'http': '127.0.0.1:8899', + 'https': '127.0.0.1:8899' + } + resp = requests.post(settings.SM_API, base64_data, verify=False, proxies=proxies) + try: + resp_json = resp.json() + print(resp_json) + + except: + print(resp.text) + + +if __name__ == '__main__': + run() diff --git a/test/to_shoumeng.py b/test/to_shoumeng.py new file mode 100644 index 0000000..86f8223 --- /dev/null +++ b/test/to_shoumeng.py @@ -0,0 +1,97 @@ +import copy +import datetime + +from utils import create_consumer + +consumer, client = create_consumer() + +attr_default = dict() + +# ¼ӳ legu->shoumeng +legu_to_sm_event = { + 'create_account': 'role_create', + # '':'role_logout', # ɫdz־ + 'login': 'role_login', + 'level_up': 'role_rank', + 'pay': 'role_pay', +} +# ӳ legu->shoumeng +legu_to_sm_attr_base = { + # + 'channel': 'channel_name', + 'svrindex': 'game_server', + '#os': 'platform_id', + # '':'sm_user_id', # ˺ID + 'binduid': 'user_id', + '#account_id': 'role_id', + 'role_name': 'role_name', + 'ghid': 'guild_id', + 'ghname': 'guild', + 'zhanli': 'power', + 'lv': 'role_rank', + 'vip': 'role_vip', + # '':'castle_rank', # DZȼ + 'exp': 'exp' + +} +legu_to_sm_attr = { + # ȼ + # '':'rank_before', + 'lv': 'rank', + # '':'rank_change', + +} + +template_data = { + "cp_game_id": 682, + "category": "cp_api", + 'event_access': 'realtime', + "event": { + "event_time": 0, + "event_name": "" + }, + "data": { + + } +} + + +def handler_os(properties): + os_: str = properties.get('#os', 'Android') + if os_.lower() == 'ios': + properties['#os'] = 102 + properties['#os'] = 101 + + +for msg in consumer(): + data: dict = msg.value + if data.get('#type') != 'track': + continue + data.update(data.get('properties', {})) + data.pop('properties') + + event_name = data.get('#event_name') + + if event_name not in legu_to_sm_event: + continue + + data.update(attr_default) + + # if data.get('owner_name') != 'shoumeng': + # continue + + send_data = copy.deepcopy(template_data) + + handler_os(data) + + send_data['event']['event_name'] = legu_to_sm_event[event_name] + send_data['event']['event_time'] = int( + datetime.datetime.strptime(data['#event_time'], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) + + for k, v in legu_to_sm_attr_base.items(): + send_data['data'][v] = data.get(k) + + for k, v in legu_to_sm_attr.items(): + send_data['data'][v] = data.get(k) + + print(send_data) diff --git a/timer_defmo.py b/timer_defmo.py new file mode 100644 index 0000000..ac8b208 --- /dev/null +++ b/timer_defmo.py @@ -0,0 +1,19 @@ +import schedule +import time + + +def job(): + print("I'm working...") + print(int(time.time())) + + +schedule.every(3).seconds.do(job) +schedule.every(1).minutes.do(job) +schedule.every().hour.do(job) +schedule.every().day.at("10:30").do(job) +schedule.every().monday.do(job) +schedule.every().wednesday.at("13:15").do(job) + +while True: + schedule.run_pending() + time.sleep(1) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..3eb4741 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,2 @@ +from .consumer import * +from .post_data import PostData \ No newline at end of file diff --git a/utils/consumer.py b/utils/consumer.py new file mode 100644 index 0000000..3f9985a --- /dev/null +++ b/utils/consumer.py @@ -0,0 +1,20 @@ +from kafka import KafkaConsumer +from kafka import TopicPartition + +__all__ = 'create_consumer', + +from core import settings + + +def create_consumer(partition=-1): + client = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) + + def consumer(): + if partition > 0: + client.assign([TopicPartition(settings.SUBSCRIBE_TOPIC, partition)]) + else: + client.subscribe([settings.SUBSCRIBE_TOPIC]) + for msg in client: + yield msg + + return consumer, client diff --git a/utils/post_data.py b/utils/post_data.py new file mode 100644 index 0000000..03b2fb3 --- /dev/null +++ b/utils/post_data.py @@ -0,0 +1,86 @@ +# coding:utf-8 + +import gzip +import json +import base64 +import time + +import requests + +from core import settings + +requests.packages.urllib3.disable_warnings() + + +class PostData: + def __init__(self): + self.data_list = [] + + def add(self, data): + self.data_list.append(data) + + def get_data_length(self): + return len(self.data_list) + + def is_upload(self): + return len(self.data_list) > 900 + + def post(self): + # print(int(time.time())) + # print(len(self.data_list)) + # print(json.dumps(post_data)) + if not self.data_list: + return + gzip_data = gzip.compress(json.dumps(self.data_list).encode()) + base64_data = base64.b64encode(gzip_data) + # resp = requests.post(settings.SM_CHECK_API, base64_data, verify=False) + + # proxies = { + # 'http': '127.0.0.1:8899', + # 'https': '127.0.0.1:8899' + # } + + try: + print(json.dumps(self.data_list)) + print('*' * 50) + resp = requests.post(settings.SM_API, base64_data, verify=False) + # resp = requests.post(settings.SM_CHECK_API, base64_data, verify=False) + resp_text = resp.text + print(resp_text) + + + except Exception as e: + print(e) + finally: + self.clear_data() + + def clear_data(self): + self.data_list.clear() + + +if __name__ == '__main__': + pass +# data = { +# "cp_game_id": 682, +# "category": "cp_api", +# "event": { +# "event_time": f"{int(time.time() * 1000)}", +# "event_name": "role_rank" +# }, +# "data": { +# "utc_time": 1506054735456, +# "game_server": 1, +# "platform_id": 101, +# # "sm_user_id": "test_123456", +# # "user_id": "14444444", +# # "role_id": "14362455", +# # "role_name": "无名", +# # "role": "狂暴战士", +# # "school": "玄月宗", +# # "combat": 156784, +# # "role_vip": 2, +# # "before_rank": 2, +# # "role_rank": 3 +# +# } +# }