這篇文章主要介紹“Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”,在日常操作中,相信很多人在Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設、網(wǎng)站重做改版、坪山網(wǎng)站定制設計、自適應品牌網(wǎng)站建設、HTML5、成都商城網(wǎng)站開發(fā)、集團公司官網(wǎng)建設、成都外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應式網(wǎng)頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為坪山等各大城市提供網(wǎng)站開發(fā)制作服務。
在大數(shù)據(jù)處理領域,有一個非常常見但是很麻煩的問題,即hdfs小文件問題,我們也被這個問題困擾了很久。開始的時候我們是自己寫的一個小文件壓縮工具,定期的去合并,原理就是把待壓縮數(shù)據(jù)寫入一個新的臨時的文件夾,壓縮完,和原來的數(shù)據(jù)進行檢驗,數(shù)據(jù)一致之后,用壓縮的數(shù)據(jù)覆蓋原來的數(shù)據(jù),但是由于無法保證事務,所以出現(xiàn)了很多的問題,比如壓縮的同時又有數(shù)據(jù)寫入了,檢驗就會失敗,導致合并小文件失敗,而且無法實時的合并,只能按照分區(qū)合并一天之前的?;蛘咭粋€小時之前的,最新的數(shù)據(jù)仍然有小文件的問題,導致查詢性能提高不了。
所以基于以上的一些問題,我調(diào)研了數(shù)據(jù)湖技術,由于我們的流式數(shù)據(jù)主要是flink為主,查詢引擎是presto,而hudi強耦合了spark,對flink的支持還不太友好,所以綜合考慮了一下,決定引入iceberg。在對iceberg進行功能測試和簡單代碼review之后,發(fā)現(xiàn)iceberg在flink這塊還有一些需要優(yōu)化和提升,不過我覺得應該能hold的住,不完善的地方和需要優(yōu)化的地方我們自己來補全,所以最終引入了iceberg來解決小文件的問題。
除此之外,對于一些其他的問題,比如cdc數(shù)據(jù)的接入,以及根據(jù)查詢條件刪除數(shù)據(jù)等,后續(xù)也可以通過數(shù)據(jù)湖技術來解決。
我們的主要使用場景是使用flink將kafka的流式數(shù)據(jù)寫入到Iceberg,為了代碼的簡潔以及可維護性,我們盡量將程序使用sql來編寫,示例代碼如下:
// create catalog CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive'," + 'warehouse'='hdfs://localhost/user/hive/warehouse', 'uri'='thrift://localhost:9083')// create table CREATE TABLE iceberg.tmp.iceberg_table ( id BIGINT COMMENT 'unique id', data STRING, d int) PARTITIONED BY (d)WITH ('connector'='iceberg','write.format.default'='orc')// insert into insert into iceberg.tmp.iceberg_table select * from kafka_table
提示:記得開啟checkpoint
目前壓縮小文件是采用的一個額外批任務來進行的,Iceberg提供了一個spark版本的action,我在做功能測試的時候發(fā)現(xiàn)了一些問題,此外我對spark也不是非常熟悉,擔心出了問題不好排查,所以參照spark版本的自己實現(xiàn)了一個flink版本,并修復了一些bug,進行了一些功能的優(yōu)化。
由于我們的iceberg的元數(shù)據(jù)都是存儲在hive中的,所以壓縮程序的邏輯是我把hive中所有的iceberg表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區(qū)表還是非分區(qū)表,都進行全表的壓縮。這樣做是為了處理某些使用eventtime的flink任務,如果有延遲的數(shù)據(jù)的到來。就會把數(shù)據(jù)寫入以前的分區(qū),如果不是全表壓縮只壓縮當天分區(qū)的話,新寫入的其他天的數(shù)據(jù)就不會被壓縮。
代碼示例參考:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles()//.maxParallelism(parallelism)//.filter(Expressions.equal("day", day))//.targetSizeInBytes(targetSizeInBytes).execute();
具體的壓縮小文件相關的信息可以參考這篇文章[Flink集成iceberg數(shù)據(jù)湖之合并小文件]。
我們的快照過期策略,我是和壓縮小文件的批處理任務寫在一起的,壓縮完小文件之后,進行表的快照過期處理,目前保留的時間是一個小時,這是因為對于有一些比較大的表,分區(qū)比較多,而且checkpoint比較短,如果保留的快照過長的話,還是會保留過多小文件,我們暫時沒有查詢歷史快照的需求,所以我將快照的保留時間設置了一個小時。
long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); table.expireSnapshots() // .retainLast(20) .expireOlderThan(olderThanTimestamp) .commit();
寫入了數(shù)據(jù)之后,有時候我想查看一下相應的快照下面有多少數(shù)據(jù)文件,直接查詢hdfs你不知道哪個是有用的,哪個是沒用的。所以需要有對應的管理工具。目前flink這塊還不太成熟,我們可以使用spark3提供的工具來查看。
目前create table 這些操作我們是通過flink sql client來做的。其他相關的ddl的操作可以使用spark來做:
https://iceberg.apache.org/spark/#ddl-commands
一些相關的數(shù)據(jù)的操作,比如刪除數(shù)據(jù)等可以通過spark來實現(xiàn),presto目前只支持分區(qū)級別的刪除功能。
在使用iceberg的過程中,有時候會有這樣的情況,我提交了一個flink任務,由于各種原因,我把它給停了,這個時候iceberg還沒提交相應的快照。還有由于一些異常導致程序失敗,就會產(chǎn)生一些不在iceberg元數(shù)據(jù)里面的孤立的數(shù)據(jù)文件,這些文件對iceberg來說是不可達的,也是沒用的。所以我們需要像jvm的垃圾回收一樣來清理這些文件。
目前iceberg提供了一個spark版本的action來進行處理這些沒用的文件,我們采取的策略和壓縮小文件一樣,獲取hive中的所有的iceberg表。每隔一個小時執(zhí)行一次定時任務來刪除這些沒用的文件。
SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
在程序運行過程中出現(xiàn)了正常的數(shù)據(jù)文件被刪除的問題,經(jīng)過調(diào)研,由于我的快照保留設置是一小時,這個清理程序清理時間也是設置一個小時,通過日志發(fā)現(xiàn)是這個清理程序刪除了正常的數(shù)據(jù)。查了查代碼,覺得應該是他們設置了一樣的時間,在清理孤立文件的時候,有其他程序正在讀寫表,由于這個清理程序是沒有事務的,導致刪除了正常的數(shù)據(jù)。最后把這個清理程序的清理時間改成默認的三天,沒有再出現(xiàn)刪除數(shù)據(jù)文件的問題。當然,為了保險起見,我們可以覆蓋原來的刪除文件的方法,改成將文件到一個備份文件夾,檢查沒有問題之后,手工刪除。
目前我們使用的版本是prestosql 346,這個版本安裝的時候需要jdk11,presto查詢iceberg比較簡單。官方提供了相應的conncter,我們配置一下就行,
//iceberg.propertiesconnector.name=iceberg hive.metastore.uri=thrift://localhost:9083
目前查詢iceberg的批處理任務,使用的flink的客戶端,首先我們啟動一個基于yarn session 的flink集群,然后通過sql客戶端提交任務到集群。
主要的配置就是我們需要根據(jù)數(shù)據(jù)的大小設置sql任務執(zhí)行的并行度,可以通過以下參數(shù)設置。
set table.exec.resource.default-parallelism = 100;
此外我在sql客戶端的配置文件里配置了hive和iceberg相應的catalog,這樣每次客戶端啟動的時候就不需要建catalog了。
catalogs: # empty list - name: iceberg type: iceberg warehouse: hdfs://localhost/user/hive2/warehouse uri: thrift://localhost:9083 catalog-type: hive cache-enabled: false - name: hive type: hive hive-conf-dir: /Users/user/work/hive/conf default-database: default
目前對于定時調(diào)度中的批處理任務,flink的sql客戶端還沒hive那樣做的很完善,比如執(zhí)行hive -f來執(zhí)行一個文件。而且不同的任務需要不同的資源,并行度等。所以我自己封裝了一個flinK程序,通過調(diào)用這個程序來進行處理,讀取一個指定文件里面的sql,來提交批任務。在命令行控制任務的資源和并行度等。
/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
批任務的查詢這塊,做了一些優(yōu)化,比如limit下推,filter下推,查詢并行度優(yōu)化等,可以大大提高查詢的速度,這些優(yōu)化都已經(jīng)推回給社區(qū)。
目前我們的所有數(shù)據(jù)都是存儲在hive表的,在驗證完iceberg之后,我們決定將hive的數(shù)據(jù)遷移到iceberg,所以我寫了一個工具,可以使用hive的數(shù)據(jù),然后新建一個iceberg表,為其建立相應的元數(shù)據(jù),但是測試的時候發(fā)現(xiàn),如果采用這種方式,就需要把寫入hive的程序停止,因為如果iceberg和hive使用同一個數(shù)據(jù)文件,而壓縮程序會不斷地壓縮iceberg表的小文件,壓縮完之后,不會馬上刪除舊數(shù)據(jù),所以hive表就會查到雙份的數(shù)據(jù)。鑒于iceberg測試的時候還有一些不穩(wěn)定,所以我們采用雙寫的策略,原來寫入hive的程序不動,新啟動一套程序?qū)懭雐ceberg,這樣能對iceberg表觀察一段時間。還能和原來hive中的數(shù)據(jù)進行比對,來驗證程序的正確性。
經(jīng)過一段時間觀察,每天將近20億數(shù)據(jù)的hive表和iceberg表,一條數(shù)據(jù)也不差。所以在最終對比數(shù)據(jù)沒有問題之后,把hive表停止寫入,使用新的iceberg表,然后把hive中的舊數(shù)據(jù)導入到iceberg。
到此,關于“Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章題目:Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么
標題URL:http://www.ekvhdxd.cn/article28/jecdjp.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)、自適應網(wǎng)站、App設計、、網(wǎng)站內(nèi)鏈、標簽優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)