This commit is contained in:
wuaho 2021-04-21 18:05:14 +08:00
parent 091e9d673c
commit 1aecea0ac2
6 changed files with 107 additions and 0 deletions

1
.gitignore vendored
View File

@ -129,3 +129,4 @@ dmypy.json
# Pyre type checker
.pyre/
.idea

13
Pipfile Normal file
View File

@ -0,0 +1,13 @@
[[source]]
url = "https://pypi.douban.com/simple"
verify_ssl = false
name = "pypi"
[packages]
kafka-python = "*"
clickhouse-driver = "*"
[dev-packages]
[requires]
python_version = "3.8"

0
__init__.py Normal file
View File

48
common.py Normal file
View File

@ -0,0 +1,48 @@
import random
from kafka import KafkaConsumer
from kafka import KafkaProducer
from settings import settings
# Names exported by ``from common import *``.
__all__ = ('flat_data', 'producer', 'consumer')

# NOTE(review): not referenced anywhere in the visible code -- confirm it is
# still needed before relying on it.
struct_dict = dict()

# Shared producer instance, created once at import time from the project
# settings and used by ``producer()``.
producer_client = KafkaProducer(**settings.KAFKA_PRODUCER_CONF)
def consumer():
    """
    Yield ``(topic, value)`` for every message received on the subscribed topics.

    A ``KafkaConsumer`` is built from ``settings.KAFKA_CONSUMER_CONF`` and
    subscribed to ``settings.SUBSCRIBE_TOPIC``. The consumer is closed when
    iteration stops for any reason (exhaustion, an error, or the generator
    being closed by the caller).

    :return: generator of ``(msg.topic, msg.value)`` tuples
    """
    client = KafkaConsumer(**settings.KAFKA_CONSUMER_CONF)
    try:
        client.subscribe(settings.SUBSCRIBE_TOPIC)
        for msg in client:
            yield msg.topic, msg.value
    finally:
        # Release the broker connections instead of leaking them when the
        # generator is abandoned or an exception propagates.
        client.close()
def producer(topic, data, partition_count=16):
    """
    Send ``data`` to ``topic`` on a uniformly random partition.

    :param topic: name of the destination Kafka topic
    :param data: payload handed to the shared ``producer_client``
    :param partition_count: number of partitions to spread messages over;
        the partition is drawn from ``0 .. partition_count - 1``. Defaults to
        16, the value that used to be hard-coded.
    :raises ValueError: if ``partition_count`` is not positive
    """
    # NOTE(review): the topic presumably needs at least ``partition_count``
    # partitions for every draw to be accepted -- confirm against the cluster.
    target = random.randrange(partition_count)
    producer_client.send(topic, data, partition=target)
def flat_data(data: dict):
    """
    Flatten a nested dict into a single level of key/value pairs.

    * A single leading ``#`` is stripped from every key.
    * Values that are dicts are flattened recursively into the same level;
      when the same key occurs more than once, the one visited last wins.
    * ``uuid`` is removed from the result.
    * If the result has ``time`` but no ``event_time``, ``event_time`` is
      filled in from ``time``.

    The ``uuid`` / ``event_time`` post-processing is applied once, to the
    fully flattened result, so a fallback derived inside a nested dict can
    never overwrite an explicit ``event_time`` found elsewhere in the input.

    :param data: possibly nested mapping with string keys
    :return: a new flat dict; ``data`` itself is not modified
    """
    res = {}
    _flatten_into(data, res)
    res.pop('uuid', None)
    if 'event_time' not in res and 'time' in res:
        res['event_time'] = res['time']
    return res


def _flatten_into(data, out):
    """Copy the leaf values of ``data`` into ``out``, recursing into dict values."""
    for k, v in data.items():
        if k.startswith('#'):
            k = k[1:]
        if isinstance(v, dict):
            _flatten_into(v, out)
        else:
            out[k] = v

17
main.py Normal file
View File

@ -0,0 +1,17 @@
import traceback
from common import *
from settings import settings
def run():
    """
    Forward every consumed message to the LEGU topic mapped to its app id.

    Each message is flattened with ``flat_data``; its ``app_id`` is looked up
    in ``settings.TOPIC_TO_LEGU`` and the flattened payload is sent to the
    resulting topic. Messages whose ``app_id`` has no mapping are skipped
    with a notice on stderr instead of being handed to the producer with a
    ``None`` topic. Any other per-message failure is printed as a traceback
    and the loop moves on to the next message.
    """
    import sys  # local import: only needed for the skip notice below

    for topic, msg in consumer():
        try:
            data = flat_data(msg)
            app_id = data['app_id']
            legu_topic = settings.TOPIC_TO_LEGU.get(app_id)
            if legu_topic is None:
                print(
                    f'skipping message from {topic!r}: '
                    f'no LEGU topic mapped for app_id {app_id!r}',
                    file=sys.stderr,
                )
                continue
            producer(legu_topic, data)
        except Exception:
            # Best effort: one bad message must not stop the forwarding loop.
            traceback.print_exc()


if __name__ == '__main__':
    run()

28
settings.py Normal file
View File

@ -0,0 +1,28 @@
import json
# Kafka brokers shared by the consumer and the producer configuration.
_BOOTSTRAP_SERVERS = ("192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092")


def _json_to_bytes(v):
    """Serialize ``v`` to JSON and encode the text as UTF-8 bytes."""
    return json.dumps(v).encode('utf-8')


class Config:
    """Base settings: subscribed topics, Kafka client options and topic routing."""

    # Topics the consumer subscribes to.
    SUBSCRIBE_TOPIC = ['test', 'test2']

    # Keyword arguments for ``KafkaConsumer``; message values are parsed as JSON.
    KAFKA_CONSUMER_CONF = {
        'bootstrap_servers': list(_BOOTSTRAP_SERVERS),
        'value_deserializer': json.loads,
        'group_id': 'ta2legu',
    }

    # Keyword arguments for ``KafkaProducer``; values are sent as UTF-8 JSON.
    KAFKA_PRODUCER_CONF = {
        'bootstrap_servers': list(_BOOTSTRAP_SERVERS),
        'value_serializer': _json_to_bytes,
    }

    # Maps a message's ``app_id`` to the topic it is forwarded to.
    TOPIC_TO_LEGU = {
        'a77703e24e6643d08b74a4163a14f74c': 'legu_test',
        'c3e0409ac18341149877b08f087db640': 'legu_test',
    }
class Debug(Config):
    """Debug profile; inherits every setting from ``Config`` unchanged."""


# Active settings profile, imported elsewhere as ``from settings import settings``.
settings = Debug