47 lines
1.5 KiB
Python
47 lines
1.5 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import ssl
|
|
|
|
from kafka import KafkaConsumer
|
|
from loguru import logger
|
|
|
|
from handler import handle_factory
|
|
from setting import settings
|
|
|
|
|
|
def main():
|
|
env = os.environ.get('xlegudata_env') or 'debug'
|
|
logger.info(f'当前处于:{env}')
|
|
conf = settings.kafka_setting
|
|
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
|
context.check_hostname = False
|
|
context.load_verify_locations("ca-cert")
|
|
|
|
consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
|
|
group_id=conf['consumer_id'],
|
|
api_version=(0, 10, 2),
|
|
session_timeout_ms=25000,
|
|
max_poll_records=100,
|
|
fetch_max_bytes=1 * 1024 * 1024,
|
|
security_protocol='SASL_SSL',
|
|
sasl_mechanism="PLAIN",
|
|
ssl_context=context,
|
|
sasl_plain_username=conf['sasl_plain_username'],
|
|
sasl_plain_password=conf['sasl_plain_password'],
|
|
value_deserializer=json.loads)
|
|
consumer.subscribe((conf['topic_name'],))
|
|
for msg in consumer:
|
|
st = time.time() * 1000
|
|
data = msg.value
|
|
obj = handle_factory(data)
|
|
if not obj:
|
|
continue
|
|
obj.run()
|
|
logger.debug(time.time() * 1000 - st)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|