-
當(dāng)前位置:首頁(yè) > 創(chuàng)意學(xué)院 > 技術(shù) > 專題列表 > 正文
請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程(請(qǐng)簡(jiǎn)述hdfs的讀寫(xiě)流程)
大家好!今天讓創(chuàng)意嶺的小編來(lái)大家介紹下關(guān)于請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程的問(wèn)題,以下是小編對(duì)此問(wèn)題的歸納整理,讓我們一起來(lái)看看吧。
開(kāi)始之前先推薦一個(gè)非常厲害的Ai人工智能工具,一鍵生成原創(chuàng)文章、方案、文案、工作計(jì)劃、工作報(bào)告、論文、代碼、作文、做題和對(duì)話答疑等等
只需要輸入關(guān)鍵詞,就能返回你想要的內(nèi)容,越精準(zhǔn),寫(xiě)出的就越詳細(xì),有微信小程序端、在線網(wǎng)頁(yè)版、PC客戶端
官網(wǎng):https://ai.de1919.com。
創(chuàng)意嶺作為行業(yè)內(nèi)優(yōu)秀的企業(yè),服務(wù)客戶遍布全球各地,如需了解SEO相關(guān)業(yè)務(wù)請(qǐng)撥打電話175-8598-2043,或添加微信:1454722008
本文目錄:
一、Hadoop讀寫(xiě)文件時(shí)內(nèi)部工作機(jī)制是怎樣的
客戶端通過(guò)調(diào)用FileSystem對(duì)象(對(duì)應(yīng)于HDFS文件系統(tǒng),調(diào)用DistributedFileSystem對(duì)象)的open()方法來(lái)打開(kāi)文件(也即圖中的第一步),DistributedFileSystem通過(guò)RPC(Remote Procedure Call)調(diào)用詢問(wèn)NameNode來(lái)得到此文件最開(kāi)始幾個(gè)block的文件位置(第二步)。對(duì)每一個(gè)block來(lái)說(shuō),namenode返回?fù)碛写薭lock備份的所有namenode的地址信息(按集群的拓?fù)渚W(wǎng)絡(luò)中與客戶端距離的遠(yuǎn)近排序,關(guān)于在Hadoop集群中如何進(jìn)行網(wǎng)絡(luò)拓?fù)湔?qǐng)看下面介紹)。如果客戶端本身就是一個(gè)datanode(如客戶端是一個(gè)mapreduce任務(wù))并且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。
以上步驟完成后,DistributedFileSystem會(huì)返回一個(gè)FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數(shù)據(jù)。FSDataInputStream包裝了一個(gè)DFSInputSteam類,用來(lái)處理namenode和datanode的I/O操作。
客戶端然后執(zhí)行read()方法(第三步),DFSInputStream(已經(jīng)存儲(chǔ)了欲讀取文件的開(kāi)始幾個(gè)block的位置信息)連接到第一個(gè)datanode(也即最近的datanode)來(lái)獲取數(shù)據(jù)。通過(guò)重復(fù)調(diào)用read()方法(第四、第五步),文件內(nèi)的數(shù)據(jù)就被流式的送到了客戶端。當(dāng)讀到該block的末尾時(shí),DFSInputStream就會(huì)關(guān)閉指向該block的流,轉(zhuǎn)而找到下一個(gè)block的位置信息然后重復(fù)調(diào)用read()方法繼續(xù)對(duì)該block的流式讀取。這些過(guò)程對(duì)于用戶來(lái)說(shuō)都是透明的,在用戶看來(lái)這就是不間斷的流式讀取整個(gè)文件。
當(dāng)真?zhèn)€文件讀取完畢時(shí),客戶端調(diào)用FSDataInputSteam中的close()方法關(guān)閉文件輸入流(第六步)。
如果在讀某個(gè)block是DFSInputStream檢測(cè)到錯(cuò)誤,DFSInputSteam就會(huì)連接下一個(gè)datanode以獲取此block的其他備份,同時(shí)他會(huì)記錄下以前檢測(cè)到的壞掉的datanode以免以后再無(wú)用的重復(fù)讀取該datanode。DFSInputSteam也會(huì)檢查從datanode讀取來(lái)的數(shù)據(jù)的校驗(yàn)和,如果發(fā)現(xiàn)有數(shù)據(jù)損壞,它會(huì)把壞掉的block報(bào)告給namenode同時(shí)重新讀取其他datanode上的其他block備份。
這種設(shè)計(jì)模式的一個(gè)好處是,文件讀取是遍布這個(gè)集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點(diǎn)瓶頸問(wèn)題從而可以更大的擴(kuò)充集群的規(guī)模。
Hadoop中的網(wǎng)絡(luò)拓?fù)?/p>
在Hadoop集群中如何衡量?jī)蓚€(gè)節(jié)點(diǎn)的遠(yuǎn)近呢?要知道,在高速處理數(shù)據(jù)時(shí),數(shù)據(jù)處理速率的唯一限制因素就是數(shù)據(jù)在不同節(jié)點(diǎn)間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量?jī)蓚€(gè)節(jié)點(diǎn)距離大小的標(biāo)準(zhǔn)。
但是計(jì)算兩個(gè)節(jié)點(diǎn)之間的帶寬是比較復(fù)雜的,而且它需要在一個(gè)靜態(tài)的集群下才能衡量,但Hadoop集群一般是隨著數(shù)據(jù)處理的規(guī)模動(dòng)態(tài)變化的(且兩兩節(jié)點(diǎn)直接相連的連接數(shù)是節(jié)點(diǎn)數(shù)的平方)。于是Hadoop使用了一個(gè)簡(jiǎn)單的方法來(lái)衡量距離,它把集群內(nèi)的網(wǎng)絡(luò)表示成一個(gè)樹(shù)結(jié)構(gòu),兩個(gè)節(jié)點(diǎn)之間的距離就是他們離共同祖先節(jié)點(diǎn)的距離之和。樹(shù)一般按數(shù)據(jù)中心(datacenter),機(jī)架(rack),計(jì)算節(jié)點(diǎn)(datanode)的結(jié)構(gòu)組織。計(jì)算節(jié)點(diǎn)上的本地運(yùn)算速度最快,跨數(shù)據(jù)中心的計(jì)算速度最慢(現(xiàn)在跨數(shù)據(jù)中心的Hadoop集群用的還很少,一般都是在一個(gè)數(shù)據(jù)中心內(nèi)做運(yùn)算的)。
假如有個(gè)計(jì)算節(jié)點(diǎn)n1處在數(shù)據(jù)中心c1的機(jī)架r1上,它可以表示為/c1/r1/n1,下面是不同情況下兩個(gè)節(jié)點(diǎn)的距離:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
如下圖所示:
Hadoop
寫(xiě)文件
現(xiàn)在我們來(lái)看一下Hadoop中的寫(xiě)文件機(jī)制解析,通過(guò)寫(xiě)文件機(jī)制我們可以更好的了解一下Hadoop中的一致性模型。
Hadoop
上圖為我們展示了一個(gè)創(chuàng)建一個(gè)新文件并向其中寫(xiě)數(shù)據(jù)的例子。
首先客戶端通過(guò)DistributedFileSystem上的create()方法指明一個(gè)欲創(chuàng)建的文件的文件名(第一步),DistributedFileSystem再通過(guò)RPC調(diào)用向NameNode申請(qǐng)創(chuàng)建一個(gè)新文件(第二步,這時(shí)該文件還沒(méi)有分配相應(yīng)的block)。namenode檢查是否有同名文件存在以及用戶是否有相應(yīng)的創(chuàng)建權(quán)限,如果檢查通過(guò),namenode會(huì)為該文件創(chuàng)建一個(gè)新的記錄,否則的話文件創(chuàng)建失敗,客戶端得到一個(gè)IOException異常。DistributedFileSystem返回一個(gè)FSDataOutputStream以供客戶端寫(xiě)入數(shù)據(jù),與FSDataInputStream類似,F(xiàn)SDataOutputStream封裝了一個(gè)DFSOutputStream用于處理namenode與datanode之間的通信。
當(dāng)客戶端開(kāi)始寫(xiě)數(shù)據(jù)時(shí)(第三步),DFSOutputStream把寫(xiě)入的數(shù)據(jù)分成包(packet), 放入一個(gè)中間隊(duì)列——數(shù)據(jù)隊(duì)列(data queue)中去。DataStreamer從數(shù)據(jù)隊(duì)列中取數(shù)據(jù),同時(shí)向namenode申請(qǐng)一個(gè)新的block來(lái)存放它已經(jīng)取得的數(shù)據(jù)。namenode選擇一系列合適的datanode(個(gè)數(shù)由文件的replica數(shù)決定)構(gòu)成一個(gè)管道線(pipeline),這里我們假設(shè)replica為3,所以管道線中就有三個(gè)datanode。DataSteamer把數(shù)據(jù)流式的寫(xiě)入到管道線中的第一個(gè)datanode中(第四步),第一個(gè)datanode再把接收到的數(shù)據(jù)轉(zhuǎn)到第二個(gè)datanode中(第四步),以此類推。
DFSOutputStream同時(shí)也維護(hù)著另一個(gè)中間隊(duì)列——確認(rèn)隊(duì)列(ack queue),確認(rèn)隊(duì)列中的包只有在得到管道線中所有的datanode的確認(rèn)以后才會(huì)被移出確認(rèn)隊(duì)列(第五步)。
如果某個(gè)datanode在寫(xiě)數(shù)據(jù)的時(shí)候當(dāng)?shù)袅耍旅孢@些對(duì)用戶透明的步驟會(huì)被執(zhí)行:
1)管道線關(guān)閉,所有確認(rèn)隊(duì)列上的數(shù)據(jù)會(huì)被挪到數(shù)據(jù)隊(duì)列的首部重新發(fā)送,這樣可以確保管道線中當(dāng)?shù)舻膁atanode下流的datanode不會(huì)因?yàn)楫?dāng)?shù)舻膁atanode而丟失數(shù)據(jù)包。
2)在還在正常運(yùn)行的datanode上的當(dāng)前block上做一個(gè)標(biāo)志,這樣當(dāng)當(dāng)?shù)舻膁atanode重新啟動(dòng)以后namenode就會(huì)知道該datanode上哪個(gè)block是剛才當(dāng)機(jī)時(shí)殘留下的局部損壞block,從而可以把它刪掉。
3)已經(jīng)當(dāng)?shù)舻膁atanode從管道線中被移除,未寫(xiě)完的block的其他數(shù)據(jù)繼續(xù)被寫(xiě)入到其他兩個(gè)還在正常運(yùn)行的datanode中去,namenode知道這個(gè)block還處在under-replicated狀態(tài)(也即備份數(shù)不足的狀態(tài))下,然后他會(huì)安排一個(gè)新的replica從而達(dá)到要求的備份數(shù),后續(xù)的block寫(xiě)入方法同前面正常時(shí)候一樣。
有可能管道線中的多個(gè)datanode當(dāng)?shù)簦m然不太經(jīng)常發(fā)生),但只要dfs.replication.min(默認(rèn)為1)個(gè)replica被創(chuàng)建,我們就認(rèn)為該創(chuàng)建成功了。剩余的replica會(huì)在以后異步創(chuàng)建以達(dá)到指定的replica數(shù)。
當(dāng)客戶端完成寫(xiě)數(shù)據(jù)后,它會(huì)調(diào)用close()方法(第六步)。這個(gè)操作會(huì)沖洗(flush)所有剩下的package到pipeline中,等待這些package確認(rèn)成功,然后通知namenode寫(xiě)入文件成功(第七步)。這時(shí)候namenode就知道該文件由哪些block組成(因?yàn)镈ataStreamer向namenode請(qǐng)求分配新block,namenode當(dāng)然會(huì)知道它分配過(guò)哪些blcok給給定文件),它會(huì)等待最少的replica數(shù)被創(chuàng)建,然后成功返回。
replica是如何分布的
Hadoop在創(chuàng)建新文件時(shí)是如何選擇block的位置的呢,綜合來(lái)說(shuō),要考慮以下因素:帶寬(包括寫(xiě)帶寬和讀帶寬)和數(shù)據(jù)安全性。如果我們把三個(gè)備份全部放在一個(gè)datanode上,雖然可以避免了寫(xiě)帶寬的消耗,但幾乎沒(méi)有提供數(shù)據(jù)冗余帶來(lái)的安全性,因?yàn)槿绻@個(gè)datanode當(dāng)機(jī),那么這個(gè)文件的所有數(shù)據(jù)就全部丟失了。另一個(gè)極端情況是,如果把三個(gè)冗余備份全部放在不同的機(jī)架,甚至數(shù)據(jù)中心里面,雖然這樣數(shù)據(jù)會(huì)安全,但寫(xiě)數(shù)據(jù)會(huì)消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個(gè)默認(rèn)replica分配策略(Hadoop 1.X以后允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認(rèn)分配策略是把第一個(gè)備份放在與客戶端相同的datanode上(如果客戶端在集群外運(yùn)行,就隨機(jī)選取一個(gè)datanode來(lái)存放第一個(gè)replica),第二個(gè)replica放在與第一個(gè)replica不同機(jī)架的一個(gè)隨機(jī)datanode上,第三個(gè)replica放在與第二個(gè)replica相同機(jī)架的隨機(jī)datanode上。如果replica數(shù)大于三,則隨后的replica在集群中隨機(jī)存放,Hadoop會(huì)盡量避免過(guò)多的replica存放在同一個(gè)機(jī)架上。選取replica的放置位置后,管道線的網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)如下所示:
Hadoop
總體來(lái)說(shuō),上述默認(rèn)的replica分配策略給了我們很好的可用性(blocks放置在兩個(gè)rack上,較為安全),寫(xiě)帶寬優(yōu)化(寫(xiě)數(shù)據(jù)只需要跨越一個(gè)rack),讀帶寬優(yōu)化(你可以從兩個(gè)機(jī)架中選擇較近的一個(gè)讀?。?/p>
一致性模型
HDFS某些地方為了性能可能會(huì)不符合POSIX(是的,你沒(méi)有看錯(cuò),POSIX不僅僅只適用于linux/unix, Hadoop 使用了POSIX的設(shè)計(jì)來(lái)實(shí)現(xiàn)對(duì)文件系統(tǒng)文件流的讀取 ),所以它看起來(lái)可能與你所期望的不同,要注意。
創(chuàng)建了一個(gè)文件以后,它是可以在命名空間(namespace)中可以看到的:
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
但是任何向此文件中寫(xiě)入的數(shù)據(jù)并不能保證是可見(jiàn)的,即使你flush了已經(jīng)寫(xiě)入的數(shù)據(jù),此文件的長(zhǎng)度可能仍然為零:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
這是因?yàn)?,在Hadoop中,只有滿一個(gè)block數(shù)據(jù)量的數(shù)據(jù)被寫(xiě)入文件后,此文件中的內(nèi)容才是可見(jiàn)的(即這些數(shù)據(jù)會(huì)被寫(xiě)入到硬盤(pán)中去),所以當(dāng)前正在寫(xiě)的block中的內(nèi)容總是不可見(jiàn)的。
Hadoop提供了一種強(qiáng)制使buffer中的內(nèi)容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調(diào)用了sync()方法后,Hadoop保證所有已經(jīng)被寫(xiě)入的數(shù)據(jù)都被沖洗到了管道線中的datanode中,并且對(duì)所有讀者都可見(jiàn)了:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
這個(gè)方法就像POSIX中的fsync系統(tǒng)調(diào)用(它沖洗給定文件描述符中的所有緩沖數(shù)據(jù)到磁盤(pán)中)。例如,使用java API寫(xiě)一個(gè)本地文件,我們可以保證在調(diào)用flush()和同步化后可以看到已寫(xiě)入的內(nèi)容:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk (getFD()返回與該流所對(duì)應(yīng)的文件描述符)
assertThat(localFile.length(), is(((long) "content".length())));
在HDFS中關(guān)閉一個(gè)流隱式的調(diào)用了sync()方法:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
由于Hadoop中的一致性模型限制,如果我們不調(diào)用sync()方法的話,我們很可能會(huì)丟失多大一個(gè)block的數(shù)據(jù)。這是難以接受的,所以我們應(yīng)該使用sync()方法來(lái)確保數(shù)據(jù)已經(jīng)寫(xiě)入磁盤(pán)。但頻繁調(diào)用sync()方法也是不好的,因?yàn)闀?huì)造成很多額外開(kāi)銷。我們可以再寫(xiě)入一定量數(shù)據(jù)后調(diào)用sync()方法一次,至于這個(gè)具體的數(shù)據(jù)量大小就要根據(jù)你的應(yīng)用程序而定了,在不影響你的應(yīng)用程序的性能的情況下,這個(gè)數(shù)據(jù)量應(yīng)越大越好。
二、如何使用Hadoop讀寫(xiě)數(shù)據(jù)庫(kù)
我們的一些應(yīng)用程序中,常常避免不了要與數(shù)據(jù)庫(kù)進(jìn)行交互,而在我們的hadoop中,有時(shí)候也需要和數(shù)據(jù)庫(kù)進(jìn)行交互,比如說(shuō),數(shù)據(jù)分析的結(jié)果存入數(shù)據(jù)庫(kù),
或者是,讀取數(shù)據(jù)庫(kù)的信息寫(xiě)入HDFS上,不過(guò)直接使用MapReduce操作數(shù)據(jù)庫(kù),這種情況在現(xiàn)實(shí)開(kāi)發(fā)還是比較少,一般我們會(huì)采用Sqoop來(lái)進(jìn)行數(shù)
據(jù)的遷入,遷出,使用Hive分析數(shù)據(jù)集,大多數(shù)情況下,直接使用Hadoop訪問(wèn)關(guān)系型數(shù)據(jù)庫(kù),可能產(chǎn)生比較大的數(shù)據(jù)訪問(wèn)壓力,尤其是在數(shù)據(jù)庫(kù)還是單機(jī)
的情況下,情況可能更加糟糕,在集群的模式下壓力會(huì)相對(duì)少一些。
那么,今天散仙就來(lái)看下,如何直接使用Hadoop1.2.0的MR來(lái)讀寫(xiě)操作數(shù)據(jù)庫(kù),hadoop的API提供了DBOutputFormat和
DBInputFormat這兩個(gè)類,來(lái)進(jìn)行與數(shù)據(jù)庫(kù)交互,除此之外,我們還需要定義一個(gè)類似JAVA
Bean的實(shí)體類,來(lái)與數(shù)據(jù)庫(kù)的每行記錄進(jìn)行對(duì)應(yīng),通常這個(gè)類要實(shí)現(xiàn)Writable和DBWritable接口,來(lái)重寫(xiě)里面的4個(gè)方法以對(duì)應(yīng)獲取每行記
三、怎樣將數(shù)據(jù)庫(kù)數(shù)據(jù)寫(xiě)入到hdfs中
如下面這個(gè)shell腳本:
#Oracle的連接字符串,其中包含了Oracle的地址,SID,和端口號(hào)
CONNECTURL=jdbc:oracle:thin:@20.135.60.21:1521:DWRAC2
#使用的用戶名
ORACLENAME=kkaa
#使用的密碼
ORACLEPASSWORD=kkaa123
#需要從Oracle中導(dǎo)入的表名
oralceTableName=tt
#需要從Oracle中導(dǎo)入的表中的字段名
columns=AREA_ID,TEAM_NAME
#將Oracle中的數(shù)據(jù)導(dǎo)入到HDFS后的存放路徑
hdfsPath=apps/as/hive/$oralceTableName
#執(zhí)行導(dǎo)入邏輯。將Oracle中的數(shù)據(jù)導(dǎo)入到HDFS中
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'
執(zhí)行這個(gè)腳本之后,導(dǎo)入程序就完成了。
四、如何使用Java API讀寫(xiě)HDFS
Java API讀寫(xiě)HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 創(chuàng)建文件目錄
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 創(chuàng)建文件夾
System.out.println("make dir :" + result);
// 創(chuàng)建文件,并寫(xiě)入內(nèi)容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 刪除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下載,將hdfs文件下載到本地磁盤(pán)
*
* @param localSrc1
* 本地的文件地址,即文件的路徑
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上傳,將本地文件copy到hdfs系統(tǒng)中
*
* @param localSrc
* 本地的文件地址,即文件的路徑
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置對(duì)象
FileSystem fs; // 文件系統(tǒng)
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 輸出流,創(chuàng)建一個(gè)輸出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重寫(xiě)progress方法
public void progress() {
// System.out.println("上傳完一個(gè)設(shè)定緩存區(qū)大小容量的文件!");
}
});
// 連接兩個(gè)流,形成通道,使輸入流向輸出流傳輸數(shù)據(jù),
IOUtils.copyBytes(in, out, 10240, true); // in為輸入流對(duì)象,out為輸出流對(duì)象,4096為緩沖區(qū)大小,true為上傳后關(guān)閉流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移動(dòng)
*
* @param old_st原來(lái)存放的路徑
* @param new_st移動(dòng)到的路徑
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下載到服務(wù)器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文檔/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 刪除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 從服務(wù)器本地傳到新路徑
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文檔/temp", new_st);
if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 獲取給定目錄下的所有子目錄以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判斷文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//獲取hdfs集群所有主機(jī)結(jié)點(diǎn)數(shù)據(jù)
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
以上就是關(guān)于請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程相關(guān)問(wèn)題的回答。希望能幫到你,如有更多相關(guān)問(wèn)題,您也可以聯(lián)系我們的客服進(jìn)行咨詢,客服也會(huì)為您講解更多精彩的知識(shí)和內(nèi)容。
推薦閱讀:
請(qǐng)簡(jiǎn)述網(wǎng)絡(luò)營(yíng)銷策略有哪幾種(請(qǐng)簡(jiǎn)述網(wǎng)絡(luò)營(yíng)銷策略有哪幾種形式)
請(qǐng)簡(jiǎn)述包裝流程(簡(jiǎn)述包裝工作流程)
簡(jiǎn)述推銷與營(yíng)銷的區(qū)別和聯(lián)系(請(qǐng)簡(jiǎn)述推銷與營(yíng)銷的區(qū)別和聯(lián)系)
廣告高級(jí)背景圖片素材(廣告高級(jí)背景圖片素材高清)
家裝報(bào)價(jià)清單明細(xì)表(全包裝修價(jià)格一覽表)