This commit is contained in:
2019-09-17 13:20:42 -04:00
parent d211d1dc34
commit bef10ce4c9
8352 changed files with 568242 additions and 51 deletions
+160
View File
@@ -0,0 +1,160 @@
'use strict';
var util = require('util');
var Duplex = require('readable-stream').Duplex;
function FirstChunkStream(options, cb) {
var _this = this;
var _state = {
sent: false,
chunks: [],
size: 0
};
if (!(this instanceof FirstChunkStream)) {
return new FirstChunkStream(options, cb);
}
options = options || {};
if (!(cb instanceof Function)) {
throw new Error('FirstChunkStream constructor requires a callback as its second argument.');
}
if (typeof options.chunkLength !== 'number') {
throw new Error('FirstChunkStream constructor requires `options.chunkLength` to be a number.');
}
if (options.objectMode) {
throw new Error('FirstChunkStream doesn\'t support `objectMode` yet.');
}
Duplex.call(this, options);
// Initialize the internal state
_state.manager = createReadStreamBackpressureManager(this);
// Errors management
// We need to execute the callback or emit en error dependending on the fact
// the firstChunk is sent or not
_state.errorHandler = function firstChunkStreamErrorHandler(err) {
processCallback(err, Buffer.concat(_state.chunks, _state.size), _state.encoding, function () {});
};
this.on('error', _state.errorHandler);
// Callback management
function processCallback(err, buf, encoding, done) {
// When doing sync writes + emiting an errror it can happen that
// Remove the error listener on the next tick if an error where fired
// to avoid unwanted error throwing
if (err) {
setImmediate(function () {
_this.removeListener('error', _state.errorHandler);
});
} else {
_this.removeListener('error', _state.errorHandler);
}
_state.sent = true;
cb(err, buf, encoding, function (err, buf, encoding) {
if (err) {
setImmediate(function () {
_this.emit('error', err);
});
return;
}
if (!buf) {
done();
return;
}
_state.manager.programPush(buf, encoding, done);
});
}
// Writes management
this._write = function firstChunkStreamWrite(chunk, encoding, done) {
_state.encoding = encoding;
if (_state.sent) {
_state.manager.programPush(chunk, _state.encoding, done);
} else if (chunk.length < options.chunkLength - _state.size) {
_state.chunks.push(chunk);
_state.size += chunk.length;
done();
} else {
_state.chunks.push(chunk.slice(0, options.chunkLength - _state.size));
chunk = chunk.slice(options.chunkLength - _state.size);
_state.size += _state.chunks[_state.chunks.length - 1].length;
processCallback(null, Buffer.concat(_state.chunks, _state.size), _state.encoding, function () {
if (!chunk.length) {
done();
return;
}
_state.manager.programPush(chunk, _state.encoding, done);
});
}
};
this.on('finish', function firstChunkStreamFinish() {
if (!_state.sent) {
return processCallback(null, Buffer.concat(_state.chunks, _state.size), _state.encoding, function () {
_state.manager.programPush(null, _state.encoding);
});
}
_state.manager.programPush(null, _state.encoding);
});
}
util.inherits(FirstChunkStream, Duplex);
// Utils to manage readable stream backpressure
function createReadStreamBackpressureManager(readableStream) {
var manager = {
waitPush: true,
programmedPushs: [],
programPush: function programPush(chunk, encoding, done) {
done = done || function () {};
// Store the current write
manager.programmedPushs.push([chunk, encoding, done]);
// Need to be async to avoid nested push attempts
// Programm a push attempt
setImmediate(manager.attemptPush);
// Let's say we're ready for a read
readableStream.emit('readable');
readableStream.emit('drain');
},
attemptPush: function () {
var nextPush;
if (manager.waitPush) {
if (manager.programmedPushs.length) {
nextPush = manager.programmedPushs.shift();
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]);
(nextPush[2])();
}
} else {
setImmediate(function () {
// Need to be async to avoid nested push attempts
readableStream.emit('readable');
});
}
}
};
// Patch the readable stream to manage reads
readableStream._read = function streamFilterRestoreRead() {
manager.waitPush = true;
// Need to be async to avoid nested push attempts
setImmediate(manager.attemptPush);
};
return manager;
}
module.exports = FirstChunkStream;
+21
View File
@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (sindresorhus.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
+43
View File
@@ -0,0 +1,43 @@
{
"name": "first-chunk-stream",
"version": "2.0.0",
"description": "Transform the first chunk in a stream",
"license": "MIT",
"repository": "sindresorhus/first-chunk-stream",
"author": {
"name": "Sindre Sorhus",
"email": "sindresorhus@gmail.com",
"url": "sindresorhus.com"
},
"engines": {
"node": ">=0.10.0"
},
"scripts": {
"test": "xo && mocha",
"cover": "istanbul cover --report html _mocha -- test.js -R spec -t 5000"
},
"files": [
"index.js"
],
"keywords": [
"buffer",
"stream",
"streams",
"transform",
"first",
"chunk",
"size",
"min",
"minimum"
],
"dependencies": {
"readable-stream": "^2.0.2"
},
"devDependencies": {
"istanbul": "^0.3.19",
"mocha": "*",
"mocha-lcov-reporter": "0.0.2",
"streamtest": "^1.2.1",
"xo": "*"
}
}
+64
View File
@@ -0,0 +1,64 @@
# first-chunk-stream [![Build Status](https://travis-ci.org/sindresorhus/first-chunk-stream.svg?branch=master)](https://travis-ci.org/sindresorhus/first-chunk-stream)
> Buffer and transform the n first bytes of a stream
## Install
```
$ npm install --save first-chunk-stream
```
## Usage
```js
const fs = require('fs');
const concatStream = require('concat-stream');
const firstChunkStream = require('first-chunk-stream');
// unicorn.txt => unicorn rainbow
fs.createReadStream('unicorn.txt')
.pipe(firstChunkStream({chunkLength: 7}, function (chunk, enc, cb) {
this.push(chunk.toUpperCase());
cb();
}))
.pipe(concatStream(function (data) {
if (data.length < 7) {
throw new Error('Couldn\'t get the minimum required first chunk length');
}
console.log(data);
//=> 'UNICORN rainbow'
}));
```
## API
### firstChunkStream([options], transform)
Returns a `FirstChunkStream` instance.
#### options
The options object is passed to the [`Duplex` stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex) constructor allowing you to customize your stream behavior. In addition you can specify the following option:
##### chunkLength
Type: `number`
How many bytes you want to buffer.
#### transform(chunk, encoding, callback)
Type: `function`
The function that gets the required `options.chunkLength` bytes.
Note that the buffer can have a smaller length than the required one. In that case, it will be due to the fact that the complete stream contents has a length less than the `òptions.chunkLength` value. You should check for this yourself if you strictly depend on the length.
## License
MIT © [Sindre Sorhus](http://sindresorhus.com)