HOME 首頁(yè)
SERVICE 服務(wù)產(chǎn)品
XINMEITI 新媒體代運(yùn)營(yíng)
CASE 服務(wù)案例
NEWS 熱點(diǎn)資訊
ABOUT 關(guān)于我們
CONTACT 聯(lián)系我們
創(chuàng)意嶺
讓品牌有溫度、有情感
專注品牌策劃15年

    請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程(請(qǐng)簡(jiǎn)述hdfs的讀寫(xiě)流程)

    發(fā)布時(shí)間:2023-04-19 09:16:50     稿源: 創(chuàng)意嶺    閱讀: 65        

    大家好!今天讓創(chuàng)意嶺的小編來(lái)大家介紹下關(guān)于請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程的問(wèn)題,以下是小編對(duì)此問(wèn)題的歸納整理,讓我們一起來(lái)看看吧。

    開(kāi)始之前先推薦一個(gè)非常厲害的Ai人工智能工具,一鍵生成原創(chuàng)文章、方案、文案、工作計(jì)劃、工作報(bào)告、論文、代碼、作文、做題和對(duì)話答疑等等

    只需要輸入關(guān)鍵詞,就能返回你想要的內(nèi)容,越精準(zhǔn),寫(xiě)出的就越詳細(xì),有微信小程序端、在線網(wǎng)頁(yè)版、PC客戶端

    官網(wǎng):https://ai.de1919.com。

    創(chuàng)意嶺作為行業(yè)內(nèi)優(yōu)秀的企業(yè),服務(wù)客戶遍布全球各地,如需了解SEO相關(guān)業(yè)務(wù)請(qǐng)撥打電話175-8598-2043,或添加微信:1454722008

    本文目錄:

    請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程(請(qǐng)簡(jiǎn)述hdfs的讀寫(xiě)流程)

    一、Hadoop讀寫(xiě)文件時(shí)內(nèi)部工作機(jī)制是怎樣的

    客戶端通過(guò)調(diào)用FileSystem對(duì)象(對(duì)應(yīng)于HDFS文件系統(tǒng),調(diào)用DistributedFileSystem對(duì)象)的open()方法來(lái)打開(kāi)文件(也即圖中的第一步),DistributedFileSystem通過(guò)RPC(Remote Procedure Call)調(diào)用詢問(wèn)NameNode來(lái)得到此文件最開(kāi)始幾個(gè)block的文件位置(第二步)。對(duì)每一個(gè)block來(lái)說(shuō),namenode返回?fù)碛写薭lock備份的所有namenode的地址信息(按集群的拓?fù)渚W(wǎng)絡(luò)中與客戶端距離的遠(yuǎn)近排序,關(guān)于在Hadoop集群中如何進(jìn)行網(wǎng)絡(luò)拓?fù)湔?qǐng)看下面介紹)。如果客戶端本身就是一個(gè)datanode(如客戶端是一個(gè)mapreduce任務(wù))并且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。

    以上步驟完成后,DistributedFileSystem會(huì)返回一個(gè)FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數(shù)據(jù)。FSDataInputStream包裝了一個(gè)DFSInputSteam類,用來(lái)處理namenode和datanode的I/O操作。

    客戶端然后執(zhí)行read()方法(第三步),DFSInputStream(已經(jīng)存儲(chǔ)了欲讀取文件的開(kāi)始幾個(gè)block的位置信息)連接到第一個(gè)datanode(也即最近的datanode)來(lái)獲取數(shù)據(jù)。通過(guò)重復(fù)調(diào)用read()方法(第四、第五步),文件內(nèi)的數(shù)據(jù)就被流式的送到了客戶端。當(dāng)讀到該block的末尾時(shí),DFSInputStream就會(huì)關(guān)閉指向該block的流,轉(zhuǎn)而找到下一個(gè)block的位置信息然后重復(fù)調(diào)用read()方法繼續(xù)對(duì)該block的流式讀取。這些過(guò)程對(duì)于用戶來(lái)說(shuō)都是透明的,在用戶看來(lái)這就是不間斷的流式讀取整個(gè)文件。

    當(dāng)真?zhèn)€文件讀取完畢時(shí),客戶端調(diào)用FSDataInputSteam中的close()方法關(guān)閉文件輸入流(第六步)。

    如果在讀某個(gè)block是DFSInputStream檢測(cè)到錯(cuò)誤,DFSInputSteam就會(huì)連接下一個(gè)datanode以獲取此block的其他備份,同時(shí)他會(huì)記錄下以前檢測(cè)到的壞掉的datanode以免以后再無(wú)用的重復(fù)讀取該datanode。DFSInputSteam也會(huì)檢查從datanode讀取來(lái)的數(shù)據(jù)的校驗(yàn)和,如果發(fā)現(xiàn)有數(shù)據(jù)損壞,它會(huì)把壞掉的block報(bào)告給namenode同時(shí)重新讀取其他datanode上的其他block備份。

    這種設(shè)計(jì)模式的一個(gè)好處是,文件讀取是遍布這個(gè)集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點(diǎn)瓶頸問(wèn)題從而可以更大的擴(kuò)充集群的規(guī)模。

    Hadoop中的網(wǎng)絡(luò)拓?fù)?/p>

    在Hadoop集群中如何衡量?jī)蓚€(gè)節(jié)點(diǎn)的遠(yuǎn)近呢?要知道,在高速處理數(shù)據(jù)時(shí),數(shù)據(jù)處理速率的唯一限制因素就是數(shù)據(jù)在不同節(jié)點(diǎn)間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量?jī)蓚€(gè)節(jié)點(diǎn)距離大小的標(biāo)準(zhǔn)。

    但是計(jì)算兩個(gè)節(jié)點(diǎn)之間的帶寬是比較復(fù)雜的,而且它需要在一個(gè)靜態(tài)的集群下才能衡量,但Hadoop集群一般是隨著數(shù)據(jù)處理的規(guī)模動(dòng)態(tài)變化的(且兩兩節(jié)點(diǎn)直接相連的連接數(shù)是節(jié)點(diǎn)數(shù)的平方)。于是Hadoop使用了一個(gè)簡(jiǎn)單的方法來(lái)衡量距離,它把集群內(nèi)的網(wǎng)絡(luò)表示成一個(gè)樹(shù)結(jié)構(gòu),兩個(gè)節(jié)點(diǎn)之間的距離就是他們離共同祖先節(jié)點(diǎn)的距離之和。樹(shù)一般按數(shù)據(jù)中心(datacenter),機(jī)架(rack),計(jì)算節(jié)點(diǎn)(datanode)的結(jié)構(gòu)組織。計(jì)算節(jié)點(diǎn)上的本地運(yùn)算速度最快,跨數(shù)據(jù)中心的計(jì)算速度最慢(現(xiàn)在跨數(shù)據(jù)中心的Hadoop集群用的還很少,一般都是在一個(gè)數(shù)據(jù)中心內(nèi)做運(yùn)算的)。

    假如有個(gè)計(jì)算節(jié)點(diǎn)n1處在數(shù)據(jù)中心c1的機(jī)架r1上,它可以表示為/c1/r1/n1,下面是不同情況下兩個(gè)節(jié)點(diǎn)的距離:

    • distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)

    • distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)

    • distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)

    • distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

    如下圖所示:

    Hadoop

    寫(xiě)文件

    現(xiàn)在我們來(lái)看一下Hadoop中的寫(xiě)文件機(jī)制解析,通過(guò)寫(xiě)文件機(jī)制我們可以更好的了解一下Hadoop中的一致性模型。

    Hadoop

    上圖為我們展示了一個(gè)創(chuàng)建一個(gè)新文件并向其中寫(xiě)數(shù)據(jù)的例子。

    首先客戶端通過(guò)DistributedFileSystem上的create()方法指明一個(gè)欲創(chuàng)建的文件的文件名(第一步),DistributedFileSystem再通過(guò)RPC調(diào)用向NameNode申請(qǐng)創(chuàng)建一個(gè)新文件(第二步,這時(shí)該文件還沒(méi)有分配相應(yīng)的block)。namenode檢查是否有同名文件存在以及用戶是否有相應(yīng)的創(chuàng)建權(quán)限,如果檢查通過(guò),namenode會(huì)為該文件創(chuàng)建一個(gè)新的記錄,否則的話文件創(chuàng)建失敗,客戶端得到一個(gè)IOException異常。DistributedFileSystem返回一個(gè)FSDataOutputStream以供客戶端寫(xiě)入數(shù)據(jù),與FSDataInputStream類似,F(xiàn)SDataOutputStream封裝了一個(gè)DFSOutputStream用于處理namenode與datanode之間的通信。

    當(dāng)客戶端開(kāi)始寫(xiě)數(shù)據(jù)時(shí)(第三步),DFSOutputStream把寫(xiě)入的數(shù)據(jù)分成包(packet), 放入一個(gè)中間隊(duì)列——數(shù)據(jù)隊(duì)列(data queue)中去。DataStreamer從數(shù)據(jù)隊(duì)列中取數(shù)據(jù),同時(shí)向namenode申請(qǐng)一個(gè)新的block來(lái)存放它已經(jīng)取得的數(shù)據(jù)。namenode選擇一系列合適的datanode(個(gè)數(shù)由文件的replica數(shù)決定)構(gòu)成一個(gè)管道線(pipeline),這里我們假設(shè)replica為3,所以管道線中就有三個(gè)datanode。DataSteamer把數(shù)據(jù)流式的寫(xiě)入到管道線中的第一個(gè)datanode中(第四步),第一個(gè)datanode再把接收到的數(shù)據(jù)轉(zhuǎn)到第二個(gè)datanode中(第四步),以此類推。

    DFSOutputStream同時(shí)也維護(hù)著另一個(gè)中間隊(duì)列——確認(rèn)隊(duì)列(ack queue),確認(rèn)隊(duì)列中的包只有在得到管道線中所有的datanode的確認(rèn)以后才會(huì)被移出確認(rèn)隊(duì)列(第五步)。

    如果某個(gè)datanode在寫(xiě)數(shù)據(jù)的時(shí)候當(dāng)?shù)袅耍旅孢@些對(duì)用戶透明的步驟會(huì)被執(zhí)行:

    1)管道線關(guān)閉,所有確認(rèn)隊(duì)列上的數(shù)據(jù)會(huì)被挪到數(shù)據(jù)隊(duì)列的首部重新發(fā)送,這樣可以確保管道線中當(dāng)?shù)舻膁atanode下流的datanode不會(huì)因?yàn)楫?dāng)?shù)舻膁atanode而丟失數(shù)據(jù)包。

    2)在還在正常運(yùn)行的datanode上的當(dāng)前block上做一個(gè)標(biāo)志,這樣當(dāng)當(dāng)?shù)舻膁atanode重新啟動(dòng)以后namenode就會(huì)知道該datanode上哪個(gè)block是剛才當(dāng)機(jī)時(shí)殘留下的局部損壞block,從而可以把它刪掉。

    3)已經(jīng)當(dāng)?shù)舻膁atanode從管道線中被移除,未寫(xiě)完的block的其他數(shù)據(jù)繼續(xù)被寫(xiě)入到其他兩個(gè)還在正常運(yùn)行的datanode中去,namenode知道這個(gè)block還處在under-replicated狀態(tài)(也即備份數(shù)不足的狀態(tài))下,然后他會(huì)安排一個(gè)新的replica從而達(dá)到要求的備份數(shù),后續(xù)的block寫(xiě)入方法同前面正常時(shí)候一樣。

    有可能管道線中的多個(gè)datanode當(dāng)?shù)簦m然不太經(jīng)常發(fā)生),但只要dfs.replication.min(默認(rèn)為1)個(gè)replica被創(chuàng)建,我們就認(rèn)為該創(chuàng)建成功了。剩余的replica會(huì)在以后異步創(chuàng)建以達(dá)到指定的replica數(shù)。

    當(dāng)客戶端完成寫(xiě)數(shù)據(jù)后,它會(huì)調(diào)用close()方法(第六步)。這個(gè)操作會(huì)沖洗(flush)所有剩下的package到pipeline中,等待這些package確認(rèn)成功,然后通知namenode寫(xiě)入文件成功(第七步)。這時(shí)候namenode就知道該文件由哪些block組成(因?yàn)镈ataStreamer向namenode請(qǐng)求分配新block,namenode當(dāng)然會(huì)知道它分配過(guò)哪些blcok給給定文件),它會(huì)等待最少的replica數(shù)被創(chuàng)建,然后成功返回。

    replica是如何分布的

    Hadoop在創(chuàng)建新文件時(shí)是如何選擇block的位置的呢,綜合來(lái)說(shuō),要考慮以下因素:帶寬(包括寫(xiě)帶寬和讀帶寬)和數(shù)據(jù)安全性。如果我們把三個(gè)備份全部放在一個(gè)datanode上,雖然可以避免了寫(xiě)帶寬的消耗,但幾乎沒(méi)有提供數(shù)據(jù)冗余帶來(lái)的安全性,因?yàn)槿绻@個(gè)datanode當(dāng)機(jī),那么這個(gè)文件的所有數(shù)據(jù)就全部丟失了。另一個(gè)極端情況是,如果把三個(gè)冗余備份全部放在不同的機(jī)架,甚至數(shù)據(jù)中心里面,雖然這樣數(shù)據(jù)會(huì)安全,但寫(xiě)數(shù)據(jù)會(huì)消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個(gè)默認(rèn)replica分配策略(Hadoop 1.X以后允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認(rèn)分配策略是把第一個(gè)備份放在與客戶端相同的datanode上(如果客戶端在集群外運(yùn)行,就隨機(jī)選取一個(gè)datanode來(lái)存放第一個(gè)replica),第二個(gè)replica放在與第一個(gè)replica不同機(jī)架的一個(gè)隨機(jī)datanode上,第三個(gè)replica放在與第二個(gè)replica相同機(jī)架的隨機(jī)datanode上。如果replica數(shù)大于三,則隨后的replica在集群中隨機(jī)存放,Hadoop會(huì)盡量避免過(guò)多的replica存放在同一個(gè)機(jī)架上。選取replica的放置位置后,管道線的網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)如下所示:

    Hadoop

    總體來(lái)說(shuō),上述默認(rèn)的replica分配策略給了我們很好的可用性(blocks放置在兩個(gè)rack上,較為安全),寫(xiě)帶寬優(yōu)化(寫(xiě)數(shù)據(jù)只需要跨越一個(gè)rack),讀帶寬優(yōu)化(你可以從兩個(gè)機(jī)架中選擇較近的一個(gè)讀?。?/p>

    一致性模型

    HDFS某些地方為了性能可能會(huì)不符合POSIX(是的,你沒(méi)有看錯(cuò),POSIX不僅僅只適用于linux/unix, Hadoop 使用了POSIX的設(shè)計(jì)來(lái)實(shí)現(xiàn)對(duì)文件系統(tǒng)文件流的讀取 ),所以它看起來(lái)可能與你所期望的不同,要注意。

    創(chuàng)建了一個(gè)文件以后,它是可以在命名空間(namespace)中可以看到的:

    Path p = new Path("p");

    fs.create(p);

    assertThat(fs.exists(p), is(true));

    但是任何向此文件中寫(xiě)入的數(shù)據(jù)并不能保證是可見(jiàn)的,即使你flush了已經(jīng)寫(xiě)入的數(shù)據(jù),此文件的長(zhǎng)度可能仍然為零:

    Path p = new Path("p");

    OutputStream out = fs.create(p);

    out.write("content".getBytes("UTF-8"));

    out.flush();

    assertThat(fs.getFileStatus(p).getLen(), is(0L));

    這是因?yàn)?,在Hadoop中,只有滿一個(gè)block數(shù)據(jù)量的數(shù)據(jù)被寫(xiě)入文件后,此文件中的內(nèi)容才是可見(jiàn)的(即這些數(shù)據(jù)會(huì)被寫(xiě)入到硬盤(pán)中去),所以當(dāng)前正在寫(xiě)的block中的內(nèi)容總是不可見(jiàn)的。

    Hadoop提供了一種強(qiáng)制使buffer中的內(nèi)容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調(diào)用了sync()方法后,Hadoop保證所有已經(jīng)被寫(xiě)入的數(shù)據(jù)都被沖洗到了管道線中的datanode中,并且對(duì)所有讀者都可見(jiàn)了:

    Path p = new Path("p");

    FSDataOutputStream out = fs.create(p);

    out.write("content".getBytes("UTF-8"));

    out.flush();

    out.sync();

    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

    這個(gè)方法就像POSIX中的fsync系統(tǒng)調(diào)用(它沖洗給定文件描述符中的所有緩沖數(shù)據(jù)到磁盤(pán)中)。例如,使用java API寫(xiě)一個(gè)本地文件,我們可以保證在調(diào)用flush()和同步化后可以看到已寫(xiě)入的內(nèi)容:

    FileOutputStream out = new FileOutputStream(localFile);

    out.write("content".getBytes("UTF-8"));

    out.flush(); // flush to operating system

    out.getFD().sync(); // sync to disk (getFD()返回與該流所對(duì)應(yīng)的文件描述符)

    assertThat(localFile.length(), is(((long) "content".length())));

    在HDFS中關(guān)閉一個(gè)流隱式的調(diào)用了sync()方法:

    Path p = new Path("p");

    OutputStream out = fs.create(p);

    out.write("content".getBytes("UTF-8"));

    out.close();

    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

    由于Hadoop中的一致性模型限制,如果我們不調(diào)用sync()方法的話,我們很可能會(huì)丟失多大一個(gè)block的數(shù)據(jù)。這是難以接受的,所以我們應(yīng)該使用sync()方法來(lái)確保數(shù)據(jù)已經(jīng)寫(xiě)入磁盤(pán)。但頻繁調(diào)用sync()方法也是不好的,因?yàn)闀?huì)造成很多額外開(kāi)銷。我們可以再寫(xiě)入一定量數(shù)據(jù)后調(diào)用sync()方法一次,至于這個(gè)具體的數(shù)據(jù)量大小就要根據(jù)你的應(yīng)用程序而定了,在不影響你的應(yīng)用程序的性能的情況下,這個(gè)數(shù)據(jù)量應(yīng)越大越好。

    請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程(請(qǐng)簡(jiǎn)述hdfs的讀寫(xiě)流程)

    二、如何使用Hadoop讀寫(xiě)數(shù)據(jù)庫(kù)

    我們的一些應(yīng)用程序中,常常避免不了要與數(shù)據(jù)庫(kù)進(jìn)行交互,而在我們的hadoop中,有時(shí)候也需要和數(shù)據(jù)庫(kù)進(jìn)行交互,比如說(shuō),數(shù)據(jù)分析的結(jié)果存入數(shù)據(jù)庫(kù),

    或者是,讀取數(shù)據(jù)庫(kù)的信息寫(xiě)入HDFS上,不過(guò)直接使用MapReduce操作數(shù)據(jù)庫(kù),這種情況在現(xiàn)實(shí)開(kāi)發(fā)還是比較少,一般我們會(huì)采用Sqoop來(lái)進(jìn)行數(shù)

    據(jù)的遷入,遷出,使用Hive分析數(shù)據(jù)集,大多數(shù)情況下,直接使用Hadoop訪問(wèn)關(guān)系型數(shù)據(jù)庫(kù),可能產(chǎn)生比較大的數(shù)據(jù)訪問(wèn)壓力,尤其是在數(shù)據(jù)庫(kù)還是單機(jī)

    的情況下,情況可能更加糟糕,在集群的模式下壓力會(huì)相對(duì)少一些。

    那么,今天散仙就來(lái)看下,如何直接使用Hadoop1.2.0的MR來(lái)讀寫(xiě)操作數(shù)據(jù)庫(kù),hadoop的API提供了DBOutputFormat和

    DBInputFormat這兩個(gè)類,來(lái)進(jìn)行與數(shù)據(jù)庫(kù)交互,除此之外,我們還需要定義一個(gè)類似JAVA

    Bean的實(shí)體類,來(lái)與數(shù)據(jù)庫(kù)的每行記錄進(jìn)行對(duì)應(yīng),通常這個(gè)類要實(shí)現(xiàn)Writable和DBWritable接口,來(lái)重寫(xiě)里面的4個(gè)方法以對(duì)應(yīng)獲取每行記

    三、怎樣將數(shù)據(jù)庫(kù)數(shù)據(jù)寫(xiě)入到hdfs中

    如下面這個(gè)shell腳本:

    #Oracle的連接字符串,其中包含了Oracle的地址,SID,和端口號(hào)

    CONNECTURL=jdbc:oracle:thin:@20.135.60.21:1521:DWRAC2

    #使用的用戶名

    ORACLENAME=kkaa

    #使用的密碼

    ORACLEPASSWORD=kkaa123

    #需要從Oracle中導(dǎo)入的表名

    oralceTableName=tt

    #需要從Oracle中導(dǎo)入的表中的字段名

    columns=AREA_ID,TEAM_NAME

    #將Oracle中的數(shù)據(jù)導(dǎo)入到HDFS后的存放路徑

    hdfsPath=apps/as/hive/$oralceTableName

    #執(zhí)行導(dǎo)入邏輯。將Oracle中的數(shù)據(jù)導(dǎo)入到HDFS中

    sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'

    執(zhí)行這個(gè)腳本之后,導(dǎo)入程序就完成了。

    四、如何使用Java API讀寫(xiě)HDFS

    Java API讀寫(xiě)HDFS

    public class FSOptr {

    /**

    * @param args

    */

    public static void main(String[] args) throws Exception {

    // TODO Auto-generated method stub

    Configuration conf = new Configuration();

    makeDir(conf);

    rename(conf);

    delete(conf);

    }

    // 創(chuàng)建文件目錄

    private static void makeDir(Configuration conf) throws Exception {

    FileSystem fs = FileSystem.get(conf);

    Path dir = new Path("/user/hadoop/data/20140318");

    boolean result = fs.mkdirs(dir);// 創(chuàng)建文件夾

    System.out.println("make dir :" + result);

    // 創(chuàng)建文件,并寫(xiě)入內(nèi)容

    Path dst = new Path("/user/hadoop/data/20140318/tmp");

    byte[] buff = "hello,hadoop!".getBytes();

    FSDataOutputStream outputStream = fs.create(dst);

    outputStream.write(buff, 0, buff.length);

    outputStream.close();

    FileStatus files[] = fs.listStatus(dst);

    for (FileStatus file : files) {

    System.out.println(file.getPath());

    }

    fs.close();

    }

    // 重命名文件

    private static void rename(Configuration conf) throws Exception {

    FileSystem fs = FileSystem.get(conf);

    Path oldName = new Path("/user/hadoop/data/20140318/1.txt");

    Path newName = new Path("/user/hadoop/data/20140318/2.txt");

    fs.rename(oldName, newName);

    FileStatus files[] = fs.listStatus(new Path(

    "/user/hadoop/data/20140318"));

    for (FileStatus file : files) {

    System.out.println(file.getPath());

    }

    fs.close();

    }

    // 刪除文件

    @SuppressWarnings("deprecation")

    private static void delete(Configuration conf) throws Exception {

    FileSystem fs = FileSystem.get(conf);

    Path path = new Path("/user/hadoop/data/20140318");

    if (fs.isDirectory(path)) {

    FileStatus files[] = fs.listStatus(path);

    for (FileStatus file : files) {

    fs.delete(file.getPath());

    }

    } else {

    fs.delete(path);

    }

    // 或者

    fs.delete(path, true);

    fs.close();

    }

    /**

    * 下載,將hdfs文件下載到本地磁盤(pán)

    *

    * @param localSrc1

    * 本地的文件地址,即文件的路徑

    * @param hdfsSrc1

    * 存放在hdfs的文件地址

    */

    public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

    Configuration conf = new Configuration();

    FileSystem fs = null;

    try {

    fs = FileSystem.get(URI.create(hdfsSrc1), conf);

    Path hdfs_path = new Path(hdfsSrc1);

    Path local_path = new Path(localSrc1);

    fs.copyToLocalFile(hdfs_path, local_path);

    return true;

    } catch (IOException e) {

    e.printStackTrace();

    }

    return false;

    }

    /**

    * 上傳,將本地文件copy到hdfs系統(tǒng)中

    *

    * @param localSrc

    * 本地的文件地址,即文件的路徑

    * @param hdfsSrc

    * 存放在hdfs的文件地址

    */

    public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

    InputStream in;

    try {

    in = new BufferedInputStream(new FileInputStream(localSrc));

    Configuration conf = new Configuration();// 得到配置對(duì)象

    FileSystem fs; // 文件系統(tǒng)

    try {

    fs = FileSystem.get(URI.create(hdfsSrc), conf);

    // 輸出流,創(chuàng)建一個(gè)輸出流

    OutputStream out = fs.create(new Path(hdfsSrc),

    new Progressable() {

    // 重寫(xiě)progress方法

    public void progress() {

    // System.out.println("上傳完一個(gè)設(shè)定緩存區(qū)大小容量的文件!");

    }

    });

    // 連接兩個(gè)流,形成通道,使輸入流向輸出流傳輸數(shù)據(jù),

    IOUtils.copyBytes(in, out, 10240, true); // in為輸入流對(duì)象,out為輸出流對(duì)象,4096為緩沖區(qū)大小,true為上傳后關(guān)閉流

    return true;

    } catch (IOException e) {

    e.printStackTrace();

    }

    } catch (FileNotFoundException e) {

    e.printStackTrace();

    }

    return false;

    }

    /**

    * 移動(dòng)

    *

    * @param old_st原來(lái)存放的路徑

    * @param new_st移動(dòng)到的路徑

    */

    public boolean moveFileName(String old_st, String new_st) {

    try {

    // 下載到服務(wù)器本地

    boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文檔/temp");

    Configuration conf = new Configuration();

    FileSystem fs = null;

    // 刪除源文件

    try {

    fs = FileSystem.get(URI.create(old_st), conf);

    Path hdfs_path = new Path(old_st);

    fs.delete(hdfs_path);

    } catch (IOException e) {

    e.printStackTrace();

    }

    // 從服務(wù)器本地傳到新路徑

    new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));

    boolean uplod_flag = sendToHdfs1("/home/hadoop/文檔/temp", new_st);

    if (down_flag && uplod_flag) {

    return true;

    }

    } catch (Exception e) {

    e.printStackTrace();

    }

    return false;

    }

    // copy本地文件到hdfs

    private static void CopyFromLocalFile(Configuration conf) throws Exception {

    FileSystem fs = FileSystem.get(conf);

    Path src = new Path("/home/hadoop/word.txt");

    Path dst = new Path("/user/hadoop/data/");

    fs.copyFromLocalFile(src, dst);

    fs.close();

    }

    // 獲取給定目錄下的所有子目錄以及子文件

    private static void getAllChildFile(Configuration conf) throws Exception {

    FileSystem fs = FileSystem.get(conf);

    Path path = new Path("/user/hadoop");

    getFile(path, fs);

    }

    private static void getFile(Path path, FileSystem fs)throws Exception {

    FileStatus[] fileStatus = fs.listStatus(path);

    for (int i = 0; i < fileStatus.length; i++) {

    if (fileStatus[i].isDir()) {

    Path p = new Path(fileStatus[i].getPath().toString());

    getFile(p, fs);

    } else {

    System.out.println(fileStatus[i].getPath().toString());

    }

    }

    }

    //判斷文件是否存在

    private static boolean isExist(Configuration conf,String path)throws Exception{

    FileSystem fileSystem = FileSystem.get(conf);

    return fileSystem.exists(new Path(path));

    }

    //獲取hdfs集群所有主機(jī)結(jié)點(diǎn)數(shù)據(jù)

    private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

    FileSystem fs = FileSystem.get(conf);

    DistributedFileSystem hdfs = (DistributedFileSystem)fs;

    DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

    String[] names = new String[dataNodeStats.length];

    System.out.println("list of all the nodes in HDFS cluster:"); //print info

    for(int i=0; i < dataNodeStats.length; i++){

    names[i] = dataNodeStats[i].getHostName();

    System.out.println(names[i]); //print info

    }

    }

    //get the locations of a file in HDFS

    private static void getFileLocation(Configuration conf)throws Exception{

    FileSystem fs = FileSystem.get(conf);

    Path f = new Path("/user/cluster/dfs.txt");

    FileStatus filestatus = fs.getFileStatus(f);

    BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());

    int blkCount = blkLocations.length;

    for(int i=0; i < blkCount; i++){

    String[] hosts = blkLocations[i].getHosts();

    //Do sth with the block hosts

    System.out.println(hosts);

    }

    }

    //get HDFS file last modification time

    private static void getModificationTime(Configuration conf)throws Exception{

    FileSystem fs = FileSystem.get(conf);

    Path f = new Path("/user/cluster/dfs.txt");

    FileStatus filestatus = fs.getFileStatus(f);

    long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch

    Date d = new Date(modificationTime);

    System.out.println(d);

    }

    }

    以上就是關(guān)于請(qǐng)簡(jiǎn)述一下HDFS數(shù)據(jù)讀取和寫(xiě)入流程相關(guān)問(wèn)題的回答。希望能幫到你,如有更多相關(guān)問(wèn)題,您也可以聯(lián)系我們的客服進(jìn)行咨詢,客服也會(huì)為您講解更多精彩的知識(shí)和內(nèi)容。


    推薦閱讀:

    請(qǐng)簡(jiǎn)述網(wǎng)絡(luò)營(yíng)銷策略有哪幾種(請(qǐng)簡(jiǎn)述網(wǎng)絡(luò)營(yíng)銷策略有哪幾種形式)

    請(qǐng)簡(jiǎn)述包裝流程(簡(jiǎn)述包裝工作流程)

    簡(jiǎn)述推銷與營(yíng)銷的區(qū)別和聯(lián)系(請(qǐng)簡(jiǎn)述推銷與營(yíng)銷的區(qū)別和聯(lián)系)

    廣告高級(jí)背景圖片素材(廣告高級(jí)背景圖片素材高清)

    家裝報(bào)價(jià)清單明細(xì)表(全包裝修價(jià)格一覽表)