這篇文章主要介紹Flink SQL如何連接Hive并寫入/讀取數(shù)據(jù),文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!
成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供廣德企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、HTML5建站、小程序制作等業(yè)務(wù)。10年已為廣德眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站設(shè)計公司優(yōu)惠進(jìn)行中。
<properties> <flink.version>1.11.2</flink.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 添加flink table api 集成Hive的依賴--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.6.5-7.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.4.0</version> </dependency> </dependencies>
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
經(jīng)過實際測試,目前HiveTableSink 不支持流式寫入(未實現(xiàn) AppendStreamTableSink),必須是批處理環(huán)境才可以往hive里面寫入數(shù)據(jù),而不能將流式數(shù)據(jù)寫入hive。例如將kafka創(chuàng)建一張臨時表,然后將表中的數(shù)據(jù)流持續(xù)插入hive,這是不可以的,官網(wǎng)上1.11版本通過flink sql-client可以實現(xiàn)hive的流式寫入,還有待驗證。
// 連接外部文件 bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt")) .withFormat(new Csv().fieldDelimiter(',')) .withSchema(new Schema().field("id", DataTypes.STRING())) .createTemporaryTable("output"); // 設(shè)置 hive 方言 bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // 獲取hive-site.xml目錄 String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1); HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir); bbTableEnv.registerCatalog("hive", hive); bbTableEnv.useCatalog("hive"); bbTableEnv.useDatabase("warningplatform"); bbTableEnv.executeSql("insert into test select id from default_catalog.default_database.output");
通過bbTableEnv.connect()去創(chuàng)建臨時表的方式已經(jīng)過時了,建議使用bbTableEnv.executeSql()的方式,通過DDL去創(chuàng)建臨時表,臨時表到底是屬于哪一個catalog目前還不太確定,到底是什么規(guī)則目前還不清楚。 查資料得知,臨時表與單個Flink會話的生命周期相關(guān),臨時表始終存儲在內(nèi)存中。 永久表需要一個catalog來管理表對應(yīng)的元數(shù)據(jù),比如hive metastore,該表將一直存在,直到明確刪除該表為止。 因此猜測:default_catalog是存儲在內(nèi)存中,如果在切換成hive catalog之前創(chuàng)建臨時表,那我們就可以使用default_catalog.default_database.tableName來獲取這個臨時表。 如果切換了catalog再去創(chuàng)建臨時表,那我們就無法獲取到臨時表了,因為它不在default_catalog中,而且保存在內(nèi)存里面,直接查詢臨時表會去當(dāng)前的catalog里面去查找臨時表,因此一定要在default_catalog 里面創(chuàng)建臨時表。 而臨時視圖好像是存儲在當(dāng)前的catalog里面
通過bbTableEnv.createTemporaryView()創(chuàng)建的視圖則是屬于當(dāng)前的database的
bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));
注意1.11版本的執(zhí)行sql的方法發(fā)生了改變,通過執(zhí)行環(huán)境的executeSql(),executeInsert()等來進(jìn)行插入或者執(zhí)行sql語句
以上是“Flink SQL如何連接Hive并寫入/讀取數(shù)據(jù)”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
分享題目:FlinkSQL如何連接Hive并寫入/讀取數(shù)據(jù)
本文網(wǎng)址:http://www.ekvhdxd.cn/article16/iecpdg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、網(wǎng)站制作、小程序開發(fā)、網(wǎng)站維護(hù)、網(wǎng)站營銷、手機(jī)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)