422 lines
9.8 KiB
JavaScript
422 lines
9.8 KiB
JavaScript
'use strict'
|
|
const MiniPass = require('minipass')
|
|
const EE = require('events').EventEmitter
|
|
const fs = require('fs')
|
|
|
|
let writev = fs.writev
|
|
/* istanbul ignore next */
|
|
if (!writev) {
|
|
// This entire block can be removed if support for earlier than Node.js
|
|
// 12.9.0 is not needed.
|
|
const binding = process.binding('fs')
|
|
const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback
|
|
|
|
writev = (fd, iovec, pos, cb) => {
|
|
const done = (er, bw) => cb(er, bw, iovec)
|
|
const req = new FSReqWrap()
|
|
req.oncomplete = done
|
|
binding.writeBuffers(fd, iovec, pos, req)
|
|
}
|
|
}
|
|
|
|
const _autoClose = Symbol('_autoClose')
|
|
const _close = Symbol('_close')
|
|
const _ended = Symbol('_ended')
|
|
const _fd = Symbol('_fd')
|
|
const _finished = Symbol('_finished')
|
|
const _flags = Symbol('_flags')
|
|
const _flush = Symbol('_flush')
|
|
const _handleChunk = Symbol('_handleChunk')
|
|
const _makeBuf = Symbol('_makeBuf')
|
|
const _mode = Symbol('_mode')
|
|
const _needDrain = Symbol('_needDrain')
|
|
const _onerror = Symbol('_onerror')
|
|
const _onopen = Symbol('_onopen')
|
|
const _onread = Symbol('_onread')
|
|
const _onwrite = Symbol('_onwrite')
|
|
const _open = Symbol('_open')
|
|
const _path = Symbol('_path')
|
|
const _pos = Symbol('_pos')
|
|
const _queue = Symbol('_queue')
|
|
const _read = Symbol('_read')
|
|
const _readSize = Symbol('_readSize')
|
|
const _reading = Symbol('_reading')
|
|
const _remain = Symbol('_remain')
|
|
const _size = Symbol('_size')
|
|
const _write = Symbol('_write')
|
|
const _writing = Symbol('_writing')
|
|
const _defaultFlag = Symbol('_defaultFlag')
|
|
const _errored = Symbol('_errored')
|
|
|
|
class ReadStream extends MiniPass {
|
|
constructor (path, opt) {
|
|
opt = opt || {}
|
|
super(opt)
|
|
|
|
this.readable = true
|
|
this.writable = false
|
|
|
|
if (typeof path !== 'string')
|
|
throw new TypeError('path must be a string')
|
|
|
|
this[_errored] = false
|
|
this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
|
|
this[_path] = path
|
|
this[_readSize] = opt.readSize || 16*1024*1024
|
|
this[_reading] = false
|
|
this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
|
|
this[_remain] = this[_size]
|
|
this[_autoClose] = typeof opt.autoClose === 'boolean' ?
|
|
opt.autoClose : true
|
|
|
|
if (typeof this[_fd] === 'number')
|
|
this[_read]()
|
|
else
|
|
this[_open]()
|
|
}
|
|
|
|
get fd () { return this[_fd] }
|
|
get path () { return this[_path] }
|
|
|
|
write () {
|
|
throw new TypeError('this is a readable stream')
|
|
}
|
|
|
|
end () {
|
|
throw new TypeError('this is a readable stream')
|
|
}
|
|
|
|
[_open] () {
|
|
fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
|
|
}
|
|
|
|
[_onopen] (er, fd) {
|
|
if (er)
|
|
this[_onerror](er)
|
|
else {
|
|
this[_fd] = fd
|
|
this.emit('open', fd)
|
|
this[_read]()
|
|
}
|
|
}
|
|
|
|
[_makeBuf] () {
|
|
return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
|
|
}
|
|
|
|
[_read] () {
|
|
if (!this[_reading]) {
|
|
this[_reading] = true
|
|
const buf = this[_makeBuf]()
|
|
/* istanbul ignore if */
|
|
if (buf.length === 0)
|
|
return process.nextTick(() => this[_onread](null, 0, buf))
|
|
fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) =>
|
|
this[_onread](er, br, buf))
|
|
}
|
|
}
|
|
|
|
[_onread] (er, br, buf) {
|
|
this[_reading] = false
|
|
if (er)
|
|
this[_onerror](er)
|
|
else if (this[_handleChunk](br, buf))
|
|
this[_read]()
|
|
}
|
|
|
|
[_close] () {
|
|
if (this[_autoClose] && typeof this[_fd] === 'number') {
|
|
const fd = this[_fd]
|
|
this[_fd] = null
|
|
fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
|
|
}
|
|
}
|
|
|
|
[_onerror] (er) {
|
|
this[_reading] = true
|
|
this[_close]()
|
|
this.emit('error', er)
|
|
}
|
|
|
|
[_handleChunk] (br, buf) {
|
|
let ret = false
|
|
// no effect if infinite
|
|
this[_remain] -= br
|
|
if (br > 0)
|
|
ret = super.write(br < buf.length ? buf.slice(0, br) : buf)
|
|
|
|
if (br === 0 || this[_remain] <= 0) {
|
|
ret = false
|
|
this[_close]()
|
|
super.end()
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
emit (ev, data) {
|
|
switch (ev) {
|
|
case 'prefinish':
|
|
case 'finish':
|
|
break
|
|
|
|
case 'drain':
|
|
if (typeof this[_fd] === 'number')
|
|
this[_read]()
|
|
break
|
|
|
|
case 'error':
|
|
if (this[_errored])
|
|
return
|
|
this[_errored] = true
|
|
return super.emit(ev, data)
|
|
|
|
default:
|
|
return super.emit(ev, data)
|
|
}
|
|
}
|
|
}
|
|
|
|
class ReadStreamSync extends ReadStream {
|
|
[_open] () {
|
|
let threw = true
|
|
try {
|
|
this[_onopen](null, fs.openSync(this[_path], 'r'))
|
|
threw = false
|
|
} finally {
|
|
if (threw)
|
|
this[_close]()
|
|
}
|
|
}
|
|
|
|
[_read] () {
|
|
let threw = true
|
|
try {
|
|
if (!this[_reading]) {
|
|
this[_reading] = true
|
|
do {
|
|
const buf = this[_makeBuf]()
|
|
/* istanbul ignore next */
|
|
const br = buf.length === 0 ? 0
|
|
: fs.readSync(this[_fd], buf, 0, buf.length, null)
|
|
if (!this[_handleChunk](br, buf))
|
|
break
|
|
} while (true)
|
|
this[_reading] = false
|
|
}
|
|
threw = false
|
|
} finally {
|
|
if (threw)
|
|
this[_close]()
|
|
}
|
|
}
|
|
|
|
[_close] () {
|
|
if (this[_autoClose] && typeof this[_fd] === 'number') {
|
|
const fd = this[_fd]
|
|
this[_fd] = null
|
|
fs.closeSync(fd)
|
|
this.emit('close')
|
|
}
|
|
}
|
|
}
|
|
|
|
class WriteStream extends EE {
|
|
constructor (path, opt) {
|
|
opt = opt || {}
|
|
super(opt)
|
|
this.readable = false
|
|
this.writable = true
|
|
this[_errored] = false
|
|
this[_writing] = false
|
|
this[_ended] = false
|
|
this[_needDrain] = false
|
|
this[_queue] = []
|
|
this[_path] = path
|
|
this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
|
|
this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
|
|
this[_pos] = typeof opt.start === 'number' ? opt.start : null
|
|
this[_autoClose] = typeof opt.autoClose === 'boolean' ?
|
|
opt.autoClose : true
|
|
|
|
// truncating makes no sense when writing into the middle
|
|
const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
|
|
this[_defaultFlag] = opt.flags === undefined
|
|
this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags
|
|
|
|
if (this[_fd] === null)
|
|
this[_open]()
|
|
}
|
|
|
|
emit (ev, data) {
|
|
if (ev === 'error') {
|
|
if (this[_errored])
|
|
return
|
|
this[_errored] = true
|
|
}
|
|
return super.emit(ev, data)
|
|
}
|
|
|
|
|
|
get fd () { return this[_fd] }
|
|
get path () { return this[_path] }
|
|
|
|
[_onerror] (er) {
|
|
this[_close]()
|
|
this[_writing] = true
|
|
this.emit('error', er)
|
|
}
|
|
|
|
[_open] () {
|
|
fs.open(this[_path], this[_flags], this[_mode],
|
|
(er, fd) => this[_onopen](er, fd))
|
|
}
|
|
|
|
[_onopen] (er, fd) {
|
|
if (this[_defaultFlag] &&
|
|
this[_flags] === 'r+' &&
|
|
er && er.code === 'ENOENT') {
|
|
this[_flags] = 'w'
|
|
this[_open]()
|
|
} else if (er)
|
|
this[_onerror](er)
|
|
else {
|
|
this[_fd] = fd
|
|
this.emit('open', fd)
|
|
this[_flush]()
|
|
}
|
|
}
|
|
|
|
end (buf, enc) {
|
|
if (buf)
|
|
this.write(buf, enc)
|
|
|
|
this[_ended] = true
|
|
|
|
// synthetic after-write logic, where drain/finish live
|
|
if (!this[_writing] && !this[_queue].length &&
|
|
typeof this[_fd] === 'number')
|
|
this[_onwrite](null, 0)
|
|
return this
|
|
}
|
|
|
|
write (buf, enc) {
|
|
if (typeof buf === 'string')
|
|
buf = Buffer.from(buf, enc)
|
|
|
|
if (this[_ended]) {
|
|
this.emit('error', new Error('write() after end()'))
|
|
return false
|
|
}
|
|
|
|
if (this[_fd] === null || this[_writing] || this[_queue].length) {
|
|
this[_queue].push(buf)
|
|
this[_needDrain] = true
|
|
return false
|
|
}
|
|
|
|
this[_writing] = true
|
|
this[_write](buf)
|
|
return true
|
|
}
|
|
|
|
[_write] (buf) {
|
|
fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
|
|
this[_onwrite](er, bw))
|
|
}
|
|
|
|
[_onwrite] (er, bw) {
|
|
if (er)
|
|
this[_onerror](er)
|
|
else {
|
|
if (this[_pos] !== null)
|
|
this[_pos] += bw
|
|
if (this[_queue].length)
|
|
this[_flush]()
|
|
else {
|
|
this[_writing] = false
|
|
|
|
if (this[_ended] && !this[_finished]) {
|
|
this[_finished] = true
|
|
this[_close]()
|
|
this.emit('finish')
|
|
} else if (this[_needDrain]) {
|
|
this[_needDrain] = false
|
|
this.emit('drain')
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
[_flush] () {
|
|
if (this[_queue].length === 0) {
|
|
if (this[_ended])
|
|
this[_onwrite](null, 0)
|
|
} else if (this[_queue].length === 1)
|
|
this[_write](this[_queue].pop())
|
|
else {
|
|
const iovec = this[_queue]
|
|
this[_queue] = []
|
|
writev(this[_fd], iovec, this[_pos],
|
|
(er, bw) => this[_onwrite](er, bw))
|
|
}
|
|
}
|
|
|
|
[_close] () {
|
|
if (this[_autoClose] && typeof this[_fd] === 'number') {
|
|
const fd = this[_fd]
|
|
this[_fd] = null
|
|
fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
|
|
}
|
|
}
|
|
}
|
|
|
|
class WriteStreamSync extends WriteStream {
|
|
[_open] () {
|
|
let fd
|
|
// only wrap in a try{} block if we know we'll retry, to avoid
|
|
// the rethrow obscuring the error's source frame in most cases.
|
|
if (this[_defaultFlag] && this[_flags] === 'r+') {
|
|
try {
|
|
fd = fs.openSync(this[_path], this[_flags], this[_mode])
|
|
} catch (er) {
|
|
if (er.code === 'ENOENT') {
|
|
this[_flags] = 'w'
|
|
return this[_open]()
|
|
} else
|
|
throw er
|
|
}
|
|
} else
|
|
fd = fs.openSync(this[_path], this[_flags], this[_mode])
|
|
|
|
this[_onopen](null, fd)
|
|
}
|
|
|
|
[_close] () {
|
|
if (this[_autoClose] && typeof this[_fd] === 'number') {
|
|
const fd = this[_fd]
|
|
this[_fd] = null
|
|
fs.closeSync(fd)
|
|
this.emit('close')
|
|
}
|
|
}
|
|
|
|
[_write] (buf) {
|
|
// throw the original, but try to close if it fails
|
|
let threw = true
|
|
try {
|
|
this[_onwrite](null,
|
|
fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos]))
|
|
threw = false
|
|
} finally {
|
|
if (threw)
|
|
try { this[_close]() } catch (_) {}
|
|
}
|
|
}
|
|
}
|
|
|
|
exports.ReadStream = ReadStream
|
|
exports.ReadStreamSync = ReadStreamSync
|
|
|
|
exports.WriteStream = WriteStream
|
|
exports.WriteStreamSync = WriteStreamSync
|