前言
在之前的文章中,我們提到車(chē)聯(lián)網(wǎng)TSP平臺(tái)擁有很多不同業(yè)務(wù)的主題,并介紹了??如何根據(jù)不同業(yè)務(wù)場(chǎng)景進(jìn)行MQTT主題設(shè)計(jì)??。車(chē)輛會(huì)持續(xù)不斷產(chǎn)生海量的消息,每一條通過(guò)車(chē)聯(lián)網(wǎng)上報(bào)的數(shù)據(jù)都是非常珍貴的,其背后蘊(yùn)藏著巨大的業(yè)務(wù)價(jià)值。因此我們構(gòu)建的車(chē)輛TSP平臺(tái)也通常需要擁有千萬(wàn)級(jí)主題和百萬(wàn)級(jí)消息吞吐能力。
傳統(tǒng)的互聯(lián)網(wǎng)系統(tǒng)很難支撐百萬(wàn)量級(jí)的消息吞吐。在本文中,我們將主要介紹如何針對(duì)百萬(wàn)級(jí)消息吞吐這一需求進(jìn)行新一代車(chē)聯(lián)網(wǎng)平臺(tái)架構(gòu)設(shè)計(jì)。
車(chē)聯(lián)網(wǎng)場(chǎng)景消息吞吐設(shè)計(jì)的關(guān)聯(lián)因素
車(chē)聯(lián)網(wǎng)的消息分為上行和下行。上行消息一般是傳感器及車(chē)輛發(fā)出的告警等消息,把設(shè)備的信息發(fā)送給云端的消息平臺(tái)。下行消息一般有遠(yuǎn)程控制指令集消息和消息推送,是由云端平臺(tái)給車(chē)輛發(fā)送相應(yīng)的指令。
在車(chē)聯(lián)網(wǎng)消息吞吐設(shè)計(jì)中,我們需要重點(diǎn)考慮以下因素:
(1)消息頻率
車(chē)在行駛過(guò)程中,GPS、車(chē)載傳感器等一直不停地在收集消息,為了收到實(shí)時(shí)的反饋信息,其上報(bào)接收的消息也是非常頻繁的。上報(bào)頻率一般在100ms-30s不等,所以當(dāng)車(chē)輛數(shù)量達(dá)到百萬(wàn)量級(jí)時(shí),平臺(tái)就需要支持每秒百萬(wàn)級(jí)的消息吞吐。
(2)消息包大小
消息是通過(guò)各種傳感器來(lái)采集自身環(huán)境和狀態(tài)信息(車(chē)聯(lián)網(wǎng)場(chǎng)景常見(jiàn)的有新能源國(guó)標(biāo)數(shù)據(jù)和企標(biāo)數(shù)據(jù))。整個(gè)消息包大小一般在500B到幾十KB不等。當(dāng)大量消息包同時(shí)上報(bào)時(shí),需要車(chē)聯(lián)網(wǎng)平臺(tái)擁有更強(qiáng)的接收、發(fā)送大消息包的能力。
(3)消息延時(shí)
車(chē)輛在行駛過(guò)程中,消息數(shù)據(jù)只能通過(guò)無(wú)線網(wǎng)絡(luò)來(lái)進(jìn)行傳輸。在大部分車(chē)聯(lián)網(wǎng)場(chǎng)景下,對(duì)車(chē)輛的時(shí)延要求是ms級(jí)別。平臺(tái)在滿足百萬(wàn)級(jí)吞吐條件下,還需要保持低延時(shí)的消息傳輸。
(4)Topic數(shù)量和層級(jí)
在考慮百萬(wàn)級(jí)消息吞吐場(chǎng)景時(shí),還需要針對(duì)消息Topic數(shù)量和Topic樹(shù)層級(jí)進(jìn)行規(guī)范設(shè)計(jì)。
(5)Payload編解碼
當(dāng)消息包比較大的時(shí)候,需要重點(diǎn)考慮消息體的封裝。單純的JSON封裝在消息解析時(shí)不夠高效,可以考慮采用Avro、Protobuf等編碼格式進(jìn)行Payload格式化封裝。
對(duì)于百萬(wàn)級(jí)消息吞吐場(chǎng)景,基于MQTT客戶端共享訂閱消息或通過(guò)規(guī)則引擎實(shí)時(shí)寫(xiě)入關(guān)系型數(shù)據(jù)庫(kù)的傳統(tǒng)架構(gòu)顯然無(wú)法滿足。目前主流的架構(gòu)選型有兩種:一種是消息接入產(chǎn)品/服務(wù)+消息隊(duì)列(Kafka、Pulsar、RabbitMQ、RocketMQ等),另外一種是消息接入產(chǎn)品/服務(wù)+時(shí)序數(shù)據(jù)庫(kù)(InfluxDB、TDengine、Lindorm等)來(lái)實(shí)現(xiàn)。
接下來(lái)我們將基于上述的關(guān)聯(lián)因素和客戶案例的最佳實(shí)踐,以云原生分布式物聯(lián)網(wǎng)消息服務(wù)器EMQX作為消息接入層,分別介紹這兩種架構(gòu)的實(shí)現(xiàn)方式。
EMQX+Kafka構(gòu)建百萬(wàn)級(jí)吞吐車(chē)聯(lián)網(wǎng)平臺(tái)
架構(gòu)設(shè)計(jì)
Kafka作為主流消息隊(duì)列之一,具有持久化數(shù)據(jù)存儲(chǔ)能力,可進(jìn)行持久化操作,同時(shí)可通過(guò)將數(shù)據(jù)持久化到硬盤(pán)以及replication防止數(shù)據(jù)丟失。后端TSP平臺(tái)或者大數(shù)據(jù)平臺(tái)可以批量訂閱想要的消息。
由于Kafka擁有訂閱發(fā)布的能力,既可以從南向接收,把上報(bào)消息緩存起來(lái);又可以通過(guò)北向的連接,把需要發(fā)送的指令通過(guò)接口傳輸給前端,用作指令下發(fā)。
我們以Kafka為例,構(gòu)建EMQX+Kafka百萬(wàn)級(jí)吞吐車(chē)聯(lián)網(wǎng)平臺(tái):
前端車(chē)機(jī)的連接與消息可通過(guò)公有云商提供的負(fù)載均衡產(chǎn)品用作域名轉(zhuǎn)發(fā),如果采用了TLS/DTLS的安全認(rèn)證,可在云上建立四臺(tái)HAProxy/Nginx服務(wù)器作為證書(shū)卸載和負(fù)載均衡使用。
采用10臺(tái)EMQX組成一個(gè)大集群,把一百萬(wàn)的消息吞吐平均分到每個(gè)節(jié)點(diǎn)十萬(wàn)消息吞吐,同時(shí)滿足高可用場(chǎng)景需求。
如有離線離線/消息緩存需求,可選用Redis作為存儲(chǔ)數(shù)據(jù)庫(kù)。
Kafka作為總體消息隊(duì)列,EMQX把全量消息通過(guò)規(guī)則引擎,轉(zhuǎn)發(fā)給后端Kafka集群中。
后端TSP平臺(tái)/OTA等應(yīng)用通過(guò)訂閱Kafka的主題接收相應(yīng)的消息,業(yè)務(wù)平臺(tái)的控制指令和推送消息可通過(guò)Kafka/API的方式下發(fā)到EMQX。
總體架構(gòu)圖
在這一方案架構(gòu)中,EMQX作為消息中間件具有如下優(yōu)勢(shì),可滿足該場(chǎng)景下的需求:
支持千萬(wàn)級(jí)車(chē)輛連接、百萬(wàn)級(jí)消息吞吐能力。
分布式集群架構(gòu),穩(wěn)定可靠,支持動(dòng)態(tài)水平擴(kuò)展。
強(qiáng)大的規(guī)則引擎和數(shù)據(jù)橋接、持久化能力,支持百萬(wàn)級(jí)消息吞吐處理。
擁有豐富API與認(rèn)證等系統(tǒng)能順利對(duì)接。
百萬(wàn)吞吐場(chǎng)景驗(yàn)證
為了驗(yàn)證上述架構(gòu)的吞吐能力,在條件允許的情況下,我們可以通過(guò)以下配置搭建百萬(wàn)級(jí)消息吞吐測(cè)試場(chǎng)景。壓測(cè)工具可以選用BenchmarkTools、JMeter或XMeter測(cè)試平臺(tái)。共模擬100萬(wàn)設(shè)備,每個(gè)設(shè)備分別都有自己的主題,每個(gè)設(shè)備每秒發(fā)送一次消息,持續(xù)壓測(cè)12小時(shí)。
壓測(cè)架構(gòu)圖如下:
性能測(cè)試部分結(jié)果呈現(xiàn):
(1)EMQX集群Dashboard統(tǒng)計(jì)
EMQX規(guī)則引擎中可以看到每個(gè)節(jié)點(diǎn)速度為10萬(wàn)/秒的處理速度,10個(gè)節(jié)點(diǎn)總共100萬(wàn)/秒的速度進(jìn)行。
(2)EMQX規(guī)則引擎統(tǒng)計(jì)
在Kafka中可以看到每秒100萬(wàn)的寫(xiě)入速度,并且一直持續(xù)存儲(chǔ)。
Kafka管理界面統(tǒng)計(jì)
EMQX+InfluxDB構(gòu)建百萬(wàn)級(jí)吞吐車(chē)聯(lián)網(wǎng)平臺(tái)
架構(gòu)設(shè)計(jì)
采用EMQX+時(shí)序數(shù)據(jù)庫(kù)的架構(gòu),同樣可以構(gòu)建百萬(wàn)級(jí)消息吞吐平臺(tái)。在本文我們以InfluxDB時(shí)序數(shù)據(jù)庫(kù)為例。
InfluxDB是一個(gè)高性能的時(shí)序數(shù)據(jù)庫(kù),被廣泛應(yīng)用于存儲(chǔ)系統(tǒng)的監(jiān)控?cái)?shù)據(jù)、IoT行業(yè)的實(shí)時(shí)數(shù)據(jù)等場(chǎng)景。它從時(shí)間維度去記錄消息,具備很強(qiáng)寫(xiě)入和存儲(chǔ)性能,適用于大數(shù)據(jù)和數(shù)據(jù)分析。分析完的數(shù)據(jù)可以提供給后臺(tái)應(yīng)用系統(tǒng)進(jìn)行數(shù)據(jù)支撐。
此架構(gòu)中通過(guò)EMQX規(guī)則引擎進(jìn)行消息轉(zhuǎn)發(fā),InfluxDB進(jìn)行消息存儲(chǔ),對(duì)接后端大數(shù)據(jù)和分析平臺(tái),可以更方便地服務(wù)于時(shí)序分析。
前端設(shè)備的消息通過(guò)云上云廠商的負(fù)載均衡產(chǎn)品用作域名轉(zhuǎn)發(fā)和負(fù)載均衡。
本次采用1臺(tái)EMQX作為測(cè)試,后續(xù)需要時(shí)可以采用多節(jié)點(diǎn)的方式,組成相應(yīng)的集群方案(測(cè)試100萬(wàn)可以部署10臺(tái)EMQX集群)。
如有離線離線/消息緩存需求,可選用Redis作為存儲(chǔ)數(shù)據(jù)庫(kù)。
EMQX把全量消息通過(guò)規(guī)則引擎轉(zhuǎn)發(fā)給后端InfluxDB進(jìn)行數(shù)據(jù)持久化存儲(chǔ)。
后端大數(shù)據(jù)平臺(tái)通過(guò)InfluxDB接收相應(yīng)的消息,對(duì)其進(jìn)行大數(shù)據(jù)分析,分析后再通過(guò)API的方式把想要的信息傳輸?shù)紼MQX。
總體架構(gòu)圖
場(chǎng)景驗(yàn)證
如測(cè)試架構(gòu)圖中所示,XMeter壓力機(jī)模擬10萬(wàn)MQTT客戶端向EMQX發(fā)起連接,新增連接速率為每秒10000,客戶端心跳間隔(KeepAlive)300秒。所有連接成功后每個(gè)客戶端每秒發(fā)送一條QoS為1、Payload為200B的消息,所有消息通過(guò)HTTPInfluxDB規(guī)則引擎橋過(guò)濾篩選并持久化發(fā)至InfluxDB數(shù)據(jù)庫(kù)。
測(cè)試結(jié)果呈現(xiàn)如下:
EMQXDashboard統(tǒng)計(jì):
EMQX規(guī)則引擎統(tǒng)計(jì):
InfluxDB數(shù)據(jù)庫(kù)收到數(shù)據(jù):
EMQXDashboard消息數(shù)統(tǒng)計(jì)
單臺(tái)EMQX服務(wù)器實(shí)現(xiàn)了單臺(tái)服務(wù)器10萬(wàn)TPS的消息吞吐持久化到InfluxDB能力。參考EMQX+Kafka架構(gòu)的測(cè)試場(chǎng)景,將EMQX的集群節(jié)點(diǎn)擴(kuò)展到10臺(tái),就可以支持100萬(wàn)的TPS消息吞吐能力。
結(jié)語(yǔ)
通過(guò)本文,我們介紹了車(chē)聯(lián)網(wǎng)場(chǎng)景消息吞吐設(shè)計(jì)需要考慮的因素,同時(shí)提供了兩種較為主流的百萬(wàn)級(jí)吞吐平臺(tái)架構(gòu)設(shè)計(jì)方案。面對(duì)車(chē)聯(lián)網(wǎng)場(chǎng)景下日益增加的數(shù)據(jù)量,希望本文能夠?yàn)橄嚓P(guān)團(tuán)隊(duì)和開(kāi)發(fā)者在車(chē)聯(lián)網(wǎng)平臺(tái)設(shè)計(jì)與開(kāi)發(fā)過(guò)程中提供參考。?