From 1aecea0ac20103be2344a7a9b2908958d5800c0f Mon Sep 17 00:00:00 2001 From: wuaho Date: Wed, 21 Apr 2021 18:05:14 +0800 Subject: [PATCH] init --- .gitignore | 1 + Pipfile | 13 +++++++++++++ __init__.py | 0 common.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 17 +++++++++++++++++ settings.py | 28 ++++++++++++++++++++++++++++ 6 files changed, 107 insertions(+) create mode 100644 Pipfile create mode 100644 __init__.py create mode 100644 common.py create mode 100644 main.py create mode 100644 settings.py diff --git a/.gitignore b/.gitignore index 13d1490..de0e8b2 100644 --- a/.gitignore +++ b/.gitignore @@ -129,3 +129,4 @@ dmypy.json # Pyre type checker .pyre/ +.idea \ No newline at end of file diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..8993b7d --- /dev/null +++ b/Pipfile @@ -0,0 +1,13 @@ +[[source]] +url = "https://pypi.douban.com/simple" +verify_ssl = false +name = "pypi" + +[packages] +kafka-python = "*" +clickhouse-driver = "*" + +[dev-packages] + +[requires] +python_version = "3.8" diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/common.py b/common.py new file mode 100644 index 0000000..ebe5025 --- /dev/null +++ b/common.py @@ -0,0 +1,48 @@ +import random + +from kafka import KafkaConsumer +from kafka import KafkaProducer + +from settings import settings + +__all__ = 'flat_data', 'producer', 'consumer' +producer_client = KafkaProducer(**settings.KAFKA_PRODUCER_CONF) + +struct_dict = {} + + + + +def consumer(): + c = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF) + c.subscribe(settings.SUBSCRIBE_TOPIC) + for msg in c: + topic = msg.topic + val = msg.value + yield topic, val + + +def producer(topic, data): + producer_client.send(topic, data, partition=random.randint(0, 15)) + + +def flat_data(data: dict): + """ + 一层kv 去掉# + :param data: + :return: + """ + res = dict() + for k, v in data.items(): + if k.startswith('#'): + k = k[1:] + if isinstance(v, dict): + res.update(flat_data(v)) + else: + res[k] = v + + if 'uuid' in res: + del res['uuid'] + if 'event_time' not in res and 'time' in res: + res['event_time'] = res['time'] + return res diff --git a/main.py b/main.py new file mode 100644 index 0000000..adca1a1 --- /dev/null +++ b/main.py @@ -0,0 +1,17 @@ +import traceback +from common import * +from settings import settings + + +def run(): + for topic, msg in consumer(): + try: + data = flat_data(msg) + legu_topic = settings.TOPIC_TO_LEGU.get(data['app_id']) + producer(legu_topic, data) + except Exception as e: + traceback.print_exc() + + +if __name__ == '__main__': + run() diff --git a/settings.py b/settings.py new file mode 100644 index 0000000..9d0a10e --- /dev/null +++ b/settings.py @@ -0,0 +1,28 @@ +import json + + +class Config: + + SUBSCRIBE_TOPIC = ['test', 'test2'] + KAFKA_CONSUMER_CONF = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_deserializer': json.loads, + 'group_id': 'ta2legu' + } + KAFKA_PRODUCER_CONF = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + } + + TOPIC_TO_LEGU = { + 'a77703e24e6643d08b74a4163a14f74c': 'legu_test', + 'c3e0409ac18341149877b08f087db640': 'legu_test' + } + + + +class Debug(Config): + pass + + +settings = Debug