online gambling singapore online gambling singapore online slot malaysia online slot malaysia mega888 malaysia slot gacor live casino malaysia online betting malaysia mega888 mega888 mega888 mega888 mega888 mega888 mega888 mega888 mega888 使用Spark Streaming + Kudu + Impala構建一個預測引擎

摘要: 隨著用戶使用天數的增加,不管你的業務是擴大還是縮減了,為什麼你的大數據中心架構保持線性增長的趨勢?很明顯需要一個穩定的基本架構來保障你的業務線。當你的客戶處在休眠期,或者你的業務處在淡季,你增加的計算資源就處在浪費階段;相對應地,當你的業務在旺季期,或者每週一每個人對上週的數據進行查詢分析,有多少次你忒想擁有額外的計算資源。......

 


▲(來源:InfiQ)

根據需求水平動態分配資源VS 固定的資源分配方式,似乎不太好實現。幸運的是,借助於現今強大的開源技術,可以很輕鬆的實現你所願。在這篇文章中,我將給出一個解決例子,基於流式API數據來演示如何預測資源需求變化來調整資源分配。 我們旨在用流式回歸模型預測接下來十分鐘的海量事件數據,並與傳統批處理的方法預測的結果進行對比。這個預測結果可用來動態規劃計算機資源,或者業務優化。傳統的批處理方法預測採用Impala和Spark兩種方法,動態預測使用Spark Streaming。 我們旨在用流式回歸模型預測接下來十分鐘的海量事件數據,並與傳統批處理的方法預測的結果進行對比。這個預測結果可用來動態規劃計算機資源,或者業務優化。傳統的批處理方法預測採用Impala和Spark兩種方法,動態預測使用Spark Streaming。 任何預測的起點是基於海量歷史數據和實時更新的數據來預測未來的數據業務。流式API提供穩定的流失RSVP數據,用來預測未來一段時間RSVP數據。

動態資源分配預測架構圖

這個例子的數據通過流式API進入Kafka,然後使用Spark Streaming從Kafka加載數據到Kudu。Kafka允許數據同時進入兩個獨立的Spark Streaming作業:一個用來進行特徵工程;一個用來使用MLlib進行流式預測。預測的結果存儲在Kudu中,我們也可以使用Impala或者Spark SQL進行交互式查詢,見下圖。

▲(來源:InfiQ)

你可能急切想知道我的技術選型,下面是一些技術概要: Kafka:Kafka可抽像數據輸入,支持擴展,並耦合Spark Streaming框架。Kafka擁有每秒處理百萬事件的擴展能力,並能和其他各項技術集成,比如,Spark Streaming。 Spark Streaming:Spark Streaming能夠處理複雜的流式事件,並且採用Scala編程僅需簡單的幾行代碼即可,也支持Java、Python或者R語言。Spark Streaming提供和Kafka、MLlib(Spark的機器學習庫)的集成。 Apache Kudu:Kudu支持事件的增量插入,它旨在提供一種基於HDFS(HDFS優勢在於大數據存儲下的快速掃描能力)和HBase(HBase優勢是基於主鍵的快速插入/查詢)之間超存儲層。本項目可以採用HBase或者Cassandra,但Kudu為數據分析提供了快速的掃描能力、列式存儲架構。 Impala:使用Impala可很容易的即席查詢。它提供一個查詢引擎直接查詢加載到Kudu上的數據,並能理解生成模型。作為可選的方案可使用Spark SQL,但這里為了比較使用MADlib庫訓練的回歸模型和使用Saprk MLlib訓練的模型,故用Impala。

構建實例 現在解釋下架構的選擇,詳細細節如下: 首先,粗略瀏覽一下流式數據源。通過Kafka來監測文件,tail文件變化發送到Kafka,部分代碼見Github。下面給出RSVP內容樣例:

{"response":"yes","member":{"member_name":"Richard Williamson","photo":"http:\/\/photos3.meetupstatic.com\/photos\/member\/d\/a\/4\/0\/thu mb_231595872.jpeg","member_id":29193652},"visibility":"public","event": {"time":1424223000000,"event_url":"http:\/\/www.meetup.com\/Big-Data- Science\/events\/217322312\/","event_id":"fbtgdlytdbbc","event_name":"Big Data Science @Strata Conference, 2015"},"guests":0,"mtime":1424020205391,"rsvp_id":1536654666,"group":{"group_name":"Big Data Science","group_state":"CA","group_city":"Fremont","group_lat":37.52,"group_urlname":"Big- Data-Science","group_id":3168962,"group_country":"us","group_topics": [{"urlkey":"data-visualization","topic_name":"Data Visualization"},{"urlkey":"data- mining","topic_name":"Data Mining"},{"urlkey":"businessintell","topic_name":"Business Intelligence"},{"urlkey":"mapreduce","topic_name":"MapReduce"}, {"urlkey":"hadoop","topic_name":"Hadoop"},{"urlkey":"opensource","topic_name":"Open Source"},{"urlkey":"r-project-for-statistical-computing","topic_name":"R Project for Statistical Computing"},{"urlkey":"predictive-analytics","topic_name":"Predictive Analytics"}, {"urlkey":"cloud-computing","topic_name":"Cloud Computing"},{"urlkey":"big- data","topic_name":"Big Data"},{"urlkey":"data-science","topic_name":"Data Science"}, {"urlkey":"data-analytics","topic_name":"Data Analytics"}, {"urlkey":"hbase","topic_name":"HBase"}, {"urlkey":"hive","topic_name":"Hive"}],"group_lon":-121.93},"venue": {"lon":-121.889122,"venue_name":"San Jose Convention Center, Room 210AE","venue_id":21805972,"lat":37.330341}}

流式作業在Kudu上初始化一個表,接著運行Spark Streaming加載數據到數據表。你可以創建一個Impala外部表,並指向Kudu上存儲的數據。

CREATE EXTERNAL TABLE `kudu_meetup_rsvps` ( `event_id` STRING, `member_id` INT, `rsvp_id` INT, `event_name` STRING, `event_url` STRING, `TIME` BIGINT, `guests` INT, `member_name` STRING, `facebook_identifier` STRING, `linkedin_identifier` STRING, `twitter_identifier` STRING, `photo` STRING, `mtime` BIGINT, `response` STRING, `lat` DOUBLE, `lon` DOUBLE, `venue_id` INT, `venue_name` STRING, `visibility` STRING ) TBLPROPERTIES( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', 'kudu.table_name' = 'kudu_meetup_rsvps', 'kudu.master_addresses' = 'quickstart.cloudera:7051', 'kudu.key_columns' = 'event_id, member_id, rsvp_id' );

緊接著用Impala表查詢獲得小時RSVP數據:

create table rsvps_by​​_hour as select from_unixtime(cast(mtime/1000 as bigint), "yyyy-MM-dd") as mdate ,cast(from_unixtime(cast(mtime/1000 as bigint), "HH") as int) as mhour ,count(*) as rsvp_cnt from kudu_meetup_rsvps group by 1,2

有了RSVP數據後可以畫隨時間的變化圖,見下圖:

 

 

接著可以進行特徵工程,為了後續可以直接用Impala建立預測模型:......

轉貼自: InfoQ

 


留下你的回應

以訪客張貼回應

0

在此對話中的人們