Node.js 中的流(Stream)是出了名的難用甚至是難以理解?!疽曨l教程推薦:nodejs視頻教程 】
用 Dominic Tarr 的話來說:“流是 Node 中最好的,也是最容易被誤解的想法?!奔词故?Redux 的創(chuàng)建者和 React.js 的核心團隊成員 Dan Abramov 也害怕 Node 流。
本文將幫助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!
什么是流(Stream)?
流(Stream)是為 Node.js 應(yīng)用提供動力的基本概念之一。它們是數(shù)據(jù)處理方法,用于將輸入的數(shù)據(jù)順序讀取或把數(shù)據(jù)寫入輸出。
流是一種以有效方式處理讀寫文件、網(wǎng)絡(luò)通信或任何類型的端到端信息交換的方式。
流的處理方式非常獨特,流不是像傳統(tǒng)方式那樣將文件一次全部讀取到存儲器中,而是逐段讀取數(shù)據(jù)塊并處理數(shù)據(jù)的內(nèi)容,不將其全部保留在內(nèi)存中。
這種方式使流在處理大量數(shù)據(jù)時非常強大,例如,文件的大小可能大于可用的內(nèi)存空間,從而無法將整個文件讀入內(nèi)存進(jìn)行處理。那是流的用武之地!
既能用流來處理較小的數(shù)據(jù)塊,也可以讀取較大的文件。
以 YouTube 或 Netflix 之類的“流媒體”服務(wù)為例:這些服務(wù)不會讓你你立即下載視頻和音頻文件。取而代之的是,你的瀏覽器以連續(xù)的塊流形式接收視頻,從而使接收者幾乎可以立即開始觀看和收聽。
但是,流不僅涉及處理媒體和大數(shù)據(jù)。它們還在代碼中賦予了我們“可組合性”的力量。考慮可組合性的設(shè)計意味著能夠以某種方式組合多個組件以產(chǎn)生相同類型的結(jié)果。在 Node.js 中,可以通過流在其他較小的代碼段中傳遞數(shù)據(jù),從而組成功能強大的代碼段。
為什么使用流?
與其他數(shù)據(jù)處理方法相比,流基本上具有兩個主要優(yōu)點:
內(nèi)存效率:你無需事先把大量數(shù)據(jù)加載到內(nèi)存中即可進(jìn)行處理時間效率:得到數(shù)據(jù)后立即開始處所需的時間大大減少,不必等到整個有效數(shù)據(jù)全部發(fā)送完畢才開始處理Node.js 中有 4 種流:
可寫流:可以向其中寫入數(shù)據(jù)的流。例如,fs.createWriteStream()
使我們可以使用流將數(shù)據(jù)寫入文件??勺x流:可從中讀取數(shù)據(jù)的流。例如:fs.createReadStream()
讓我們讀取文件的內(nèi)容。雙工流(可讀寫的流):可讀和可寫的流。例如,net.Socket
Transform:可在寫入和讀取時修改或轉(zhuǎn)換數(shù)據(jù)。例如在文件壓縮的情況下,你可以在文件中寫入壓縮數(shù)據(jù),也可以從文件中讀取解壓縮的數(shù)據(jù)。如果你已經(jīng)使用過 Node.js,則可能遇到過流。例如在基于 Node.js 的 HTTP 服務(wù)器中,request
是可讀流,而 response
是可寫流。你可能用過 fs
模塊,該模塊可讓你用可讀和可寫文件流。每當(dāng)使用 Express 時,你都在使用流與客戶端進(jìn)行交互,而且由于 TCP 套接字、TLS棧和其他連接都基于 Node.js,所以在每個可以使用的數(shù)據(jù)庫連接驅(qū)動的程序中使用流。
如何創(chuàng)建可讀流?
首先需要可讀性流,然后將其初始化。
const Stream = require('stream') const readableStream = new Stream.Readable()
現(xiàn)在,流已初始化,可以向其發(fā)送數(shù)據(jù)了:
readableStream.push('ping!') readableStream.push('pong!')
異步迭代器
強烈建議在使用流時配合異步迭代器(async iterator)。根據(jù) Axel Rauschmayer 博士的說法,異步迭代是一種用于異步檢索數(shù)據(jù)容器內(nèi)容的協(xié)議(這意味著當(dāng)前“任務(wù)”可以在檢索項目之前被暫停)。另外必須提及的是,流異步迭代器實現(xiàn)使用內(nèi)部的 readable
事件。
從可讀流中讀取時,可以使用異步迭代器:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!\\n'
也可以用字符串收集可讀流的內(nèi)容:
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
注意,在這種情況下必須使用異步函數(shù),因為我們想返回 Promise。
請切記不要將異步功能與 EventEmitter
混合使用,因為當(dāng)前在事件處理程序中發(fā)出拒絕時,無法捕獲拒絕,從而導(dǎo)致難以跟蹤錯誤和內(nèi)存泄漏。目前的實踐是始終將異步函數(shù)的內(nèi)容包裝在 try/catch 塊中并處理錯誤,但這很容易出錯。 這個 pull request 旨在解決一旦其落在 Node 核心上產(chǎn)生的問題。
要了解有關(guān)異步迭代的 Node.js 流的更多信息,請查看這篇很棒的文章。
Readable.from():從可迭代對象創(chuàng)建可讀流
stream.Readable.from(iterable, [options])
這是一種實用方法,用于從迭代器中創(chuàng)建可讀流,該迭代器保存可迭代對象中包含的數(shù)據(jù)??傻鷮ο罂梢允峭娇傻鷮ο蠡虍惒娇傻鷮ο?。參數(shù)選項是可選的,除其他作用外,還可以用于指定文本編碼。
const { Readable } = require('stream'); async function * generate() { yield 'hello'; yield 'streams'; } const readable = Readable.from(generate()); readable.on('data', (chunk) => { console.log(chunk); });
兩種讀取模式
根據(jù) Streams API,可讀流有效地以兩種模式之一運行:flowing和paused??勺x流可以處于對象模式,無論處于 flowing 模式還是 paused 模式。
在流模式下,將自動從底層系統(tǒng)讀取數(shù)據(jù),并通過EventEmitter
接口使用事件將其盡快提供給程序。在 paused 模式下,必須顯式調(diào)用 stream.read()
方法以從流中讀取數(shù)據(jù)塊。在 flowing 模式中,要從流中讀取數(shù)據(jù),可以監(jiān)聽數(shù)據(jù)事件并附加回調(diào)。當(dāng)有大量數(shù)據(jù)可用時,可讀流將發(fā)出一個數(shù)據(jù)事件,并執(zhí)行你的回調(diào)。看下面的代碼片段:
var fs = require("fs"); var data = ''; var readerStream = fs.createReadStream('file.txt'); //Create a readable stream readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. // Handle stream events --> data, end, and error readerStream.on('data', function(chunk) { data += chunk; }); readerStream.on('end',function() { console.log(data); }); readerStream.on('error', function(err) { console.log(err.stack); }); console.log("Program Ended");
函數(shù)調(diào)用 fs.createReadStream()
給你一個可讀流。最初流處于靜態(tài)狀態(tài)。一旦你偵聽數(shù)據(jù)事件并附加了回調(diào),它就會開始流動。之后將讀取大塊數(shù)據(jù)并將其傳遞給你的回調(diào)。流實現(xiàn)者決定發(fā)送數(shù)據(jù)事件的頻率。例如,每當(dāng)有幾 KB 的數(shù)據(jù)被讀取時,HTTP 請求就可能發(fā)出一個數(shù)據(jù)事件。當(dāng)從文件中讀取數(shù)據(jù)時,你可能會決定讀取一行后就發(fā)出數(shù)據(jù)事件。
當(dāng)沒有更多數(shù)據(jù)要讀取(結(jié)束)時,流將發(fā)出結(jié)束事件。在以上代碼段中,我們監(jiān)聽此事件以在結(jié)束時得到通知。
另外,如果有錯誤,流將發(fā)出并通知錯誤。
在 paused 模式下,你只需在流實例上重復(fù)調(diào)用 read()
,直到讀完所有數(shù)據(jù)塊為止,如以下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; var chunk; readableStream.on('readable', function() { while ((chunk=readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data) });
read()
函數(shù)從內(nèi)部緩沖區(qū)讀取一些數(shù)據(jù)并將其返回。當(dāng)沒有內(nèi)容可讀取時返回 null
。所以在 while
循環(huán)中,我們檢查是否為 null
并終止循環(huán)。請注意,當(dāng)可以從流中讀取大量數(shù)據(jù)時,將會發(fā)出可讀事件。
所有 Readable
流均以 paused 模式開始,但可以通過以下方式之一切換為 flowing 模式:
stream.resume()
方法。調(diào)用 stream.pipe()
方法將數(shù)據(jù)發(fā)送到可寫對象。Readable
可以使以下方法之一切換回 paused 模式:
stream.pause()
方法。如果有管道目標(biāo),請刪除所有管道目標(biāo)??梢酝ㄟ^調(diào)用 stream.unpipe()
方法來刪除多個管道目標(biāo)。一個需要記住的重要概念是,除非提供了一種用于消耗或忽略該數(shù)據(jù)的機制,否則 Readable
將不會生成數(shù)據(jù)。如果使用機制被禁用或取消,則 Readable
將會試圖停止生成數(shù)據(jù)。添加 readable
事件處理會自動使流停止 flowing,并通過 read.read()
得到數(shù)據(jù)。如果刪除了 readable
事件處理,那么如果存在 'data' 事件處理,則流將再次開始 flowing。
如何創(chuàng)建可寫流?
要將數(shù)據(jù)寫入可寫流,你需要在流實例上調(diào)用 write()
。如以下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk); });
上面的代碼很簡單。它只是簡單地從輸入流中讀取數(shù)據(jù)塊,并使用 write()
寫入目的地。該函數(shù)返回一個布爾值,指示操作是否成功。如果為 true
,則寫入成功,你可以繼續(xù)寫入更多數(shù)據(jù)。如果返回 false
,則表示出了點問題,你目前無法寫任何內(nèi)容??蓪懥鲗⑼ㄟ^發(fā)出 drain
事件來通知你什么時候可以開始寫入更多數(shù)據(jù)。
調(diào)用 writable.end()
方法表示沒有更多數(shù)據(jù)將被寫入 Writable。如果提供,則可選的回調(diào)函數(shù)將作為 finish
事件的偵聽器附加。
// Write 'hello, ' and then end with 'world!'. const fs = require('fs'); const file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // Writing more now is not allowed!
你可以用可寫流從可讀流中讀取數(shù)據(jù):
const Stream = require('stream') const readableStream = new Stream.Readable() const writableStream = new Stream.Writable() writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() } readableStream.pipe(writableStream) readableStream.push('ping!') readableStream.push('pong!') writableStream.end()
還可以用異步迭代器來寫入可寫流,建議使用
import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text.\\n'], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\\n');
stream.finished()
的默認(rèn)版本是基于回調(diào)的,但是可以通過 util.promisify()
轉(zhuǎn)換為基于 Promise 的版本(A行)。
在此例中,使用以下兩種模式:
Writing to a writable stream while handling backpressure (line B):
在處理 backpressure
時寫入可寫流(B行):
if (!writable.write(chunk)) { await once(writable, 'drain'); }
關(guān)閉可寫流,并等待寫入完成(C行):
writable.end(); await finished(writable);pipeline()
pipeline(管道)是一種機制,可以將一個流的輸出作為另一流的輸入。它通常用于從一個流中獲取數(shù)據(jù)并將該流的輸出傳遞到另一個流。管道操作沒有限制。換句話說,管道可用于分多個步驟處理流數(shù)據(jù)。
在 Node 10.x 中引入了 stream.pipeline()
。這是一種模塊方法,用于在流轉(zhuǎn)發(fā)錯誤和正確清理之間進(jìn)行管道傳輸,并在管道完成后提供回調(diào)。
這是使用管道的例子:
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // 使用 pipeline API 可以輕松將一系列流 // 通過管道傳輸在一起,并在管道完全完成后得到通知。 // 一個有效地用 gzip壓縮巨大視頻文件的管道: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } } );
由于pipe
不安全,應(yīng)使用 pipeline
代替 pipe
。
流模塊
Node.js 流模塊 提供了構(gòu)建所有流 API 的基礎(chǔ)。
Stream 模塊是 Node.js 中默認(rèn)提供的原生模塊。 Stream 是 EventEmitter 類的實例,該類在 Node 中異步處理事件。因此流本質(zhì)上是基于事件的。
要訪問流模塊:
const stream = require('stream');
stream
模塊對于創(chuàng)建新型流實例非常有用。通常不需要使用 stream
模塊來消耗流。
流驅(qū)動的 Node API
由于它們的優(yōu)點,許多 Node.js 核心模塊提供了原生流處理功能,最值得注意的是:
net.Socket
是流所基于的主 API 節(jié)點,它是以下大多數(shù) API 的基礎(chǔ)process.stdin
返回連接到 stdin 的流process.stdout
返回連接到 stdout 的流process.stderr
返回連接到 stderr 的流fs.createReadStream()
創(chuàng)建一個可讀的文件流fs.createWriteStream()
創(chuàng)建可寫的文件流net.connect()
啟動基于流的連接http.request()
返回 http.ClientRequest
類的實例,它是可寫流zlib.createGzip()
使用gzip(一種壓縮算法)將數(shù)據(jù)壓縮到流中zlib.createGunzip()
解壓縮 gzip 流。zlib.createDeflate()
deflate(壓縮算法)將數(shù)據(jù)壓縮到流中zlib.createInflate()
解壓縮一個deflate流流 備忘單:
查看更多:Node.js 流速查表
以下是與可寫流相關(guān)的一些重要事件:
error
–表示在寫或配置管道時發(fā)生了錯誤。pipeline
– 當(dāng)把可讀流傳遞到可寫流中時,該事件由可寫流發(fā)出。unpipe
– 當(dāng)你在可讀流上調(diào)用 unpipe 并停止將其輸送到目標(biāo)流中時發(fā)出。結(jié)論這就是所有關(guān)于流的基礎(chǔ)知識。流、管道和鏈?zhǔn)?Node.js 的核心和最強大的功能。流確實可以幫你編寫簡潔而高效的代碼來執(zhí)行 I/O。
另外,還有一個值得期待的 Node.js 戰(zhàn)略計劃,稱為 BOB,旨在改善 Node.js 的內(nèi)部數(shù)據(jù)流以及希望作為未來 Node.js 流數(shù)據(jù)接口的公共 API 的。
英文原文地址:https://nodesource.com/blog/understanding-streams-in-nodejs
作者:Liz Parody
翻譯:瘋狂的技術(shù)宅
相關(guān)推薦:nodejs 教程
網(wǎng)站標(biāo)題:深入理解Node.js中的流(Stream)
轉(zhuǎn)載源于:http://www.ekvhdxd.cn/article28/cjcpjp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、外貿(mào)建站、手機網(wǎng)站建設(shè)、自適應(yīng)網(wǎng)站、搜索引擎優(yōu)化、企業(yè)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)