import json import os import time import ssl from kafka import KafkaConsumer from setting import logger from handler import handle_factory from setting import settings def main(): env = os.environ.get('xlegudata_env') or 'debug' logger.info(f'当前处于:{env}') conf = settings.kafka_setting context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = False context.load_verify_locations("ca-cert") consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], api_version=(0, 10, 2), session_timeout_ms=25000, max_poll_records=100, fetch_max_bytes=1 * 1024 * 1024, security_protocol='SASL_SSL', sasl_mechanism="PLAIN", ssl_context=context, sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password'], value_deserializer=json.loads) consumer.subscribe((conf['topic_name'],)) for msg in consumer: st = time.time() * 1000 key = msg.key data = msg.value obj = handle_factory(key, data) if not obj: continue obj.run() logger.debug(time.time() * 1000 - st) if __name__ == '__main__': main()