閱讀980 返回首頁    go Python


如何在kafka-python和confluent-kafka之間做出選擇?

作者:Russell Jurney

在Data Syndrome,我們使用並喜愛Kafka。它使我們能夠以最少的努力和複雜性將批處理變為實時處理。然而,在最近的一個項目中,我們學到了有關kafka-python軟件包的慘痛教訓,該軟件包促使我思考該如何選擇開源工具。本文將反思我們的開源決策過程,介紹兩個用於Python的Kafka客戶端、我們遇到的問題及我們將采用的解決方案。

kafka-python:蠻荒的西部

kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控製台Consumer的分析驗證了這一點。

需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下麵這樣:

kafka_consumer = KafkaConsumer( topic, enable_auto_commit=True, group_id=group_id, bootstrap_servers=config.kafka.host, api_version=(0, 10), security_protocol='SSL', ssl_check_hostname=True, ssl_cafile=config.kafka.ca_pem, ssl_certfile=config.kafka.service_cert, ssl_keyfile=config.kafka.service_key ) for message in kafka_consumer: application_message = json.loads(message.value.decode()) ...

當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:

while True: raw_messages = consumer.poll(timeout_ms=1000, max_records=5000) for topic_partition, messages in raw_messages.items(): application_message = json.loads(message.value.decode()) ...

雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。

confluent-kafka:企業支持

發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當“由非正式社區擁有或支持”這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟件質量,而且可以選擇買或不買商業支持,這一點真是太棒了。

用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上麵提到的訪問kafka-python的變通方案。

kafka_consumer = Consumer( { "api.version.request": True, "enable.auto.commit": True, "group.id": group_id, "bootstrap.servers": config.kafka.host, "security.protocol": "ssl", "ssl.ca.location": config.kafka.ca_pem, "ssl.certificate.location": config.kafka.service_cert, "ssl.key.location": config.kafka.service_key, "default.topic.config": {"auto.offset.reset": "smallest"} } ) consumer.subscribe([topic]) # Now loop on the consumer to read messages running = True while running: message = kafka_consumer.poll() application_message = json.load(message.value.decode()) kafka_consumer.close()

現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。

開源治理

開源是強大的,但是涉及到複雜的“大數據”和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控製權。你的意見可能無關緊要,除非你是付費客戶。

理想情況是采取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟件來說根本不可能。限製自己隻使用那些公司蓋章批準後的工具將非常限製你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且隻專心做一件事,我就會使用它。

信任開源

對於更大型的工具,以上決策評估過程更為複雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。

但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟件。社區在目標、經驗和開源項目的投入時間方麵各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。

End.

最後更新:2017-10-08 20:10:55

  上一篇:go Python成為高收入國家增長最快的語言
  下一篇:go 代碼實現!教學視頻!Python學習者最易上手的機器學習漫遊指南