catch async exceptions and emit error

This commit is contained in:
Luke Page 2015-08-08 06:53:15 +01:00
parent 3ccd332f5a
commit 6cf6d787bc
3 changed files with 82 additions and 72 deletions

View file

@ -89,7 +89,6 @@ function bitRetriever(data, depth) {
},
end: function() {
if (i !== data.length) {
// todo these exceptions should be emitting errors
throw new Error('extra data found');
}
}

View file

@ -117,82 +117,93 @@ ChunkStream.prototype.destroy = function() {
this.emit('close');
};
ChunkStream.prototype._process = function() {
ChunkStream.prototype._processReadAllowingLess = function(read) {
// ok there is any data so that we can satisfy this request
this._reads.shift(); // == read
// as long as there is any data and read requests
while (this._buffered > 0 && this._reads && this._reads.length > 0) {
// first we need to peek into first buffer
var smallerBuf = this._buffers[0];
var read = this._reads[0];
// ok there is more data than we need
if (smallerBuf.length > read.length) {
// read any data (but no more than length)
if (read.allowLess) {
this._buffered -= read.length;
this._buffers[0] = smallerBuf.slice(read.length);
// ok there is any data so that we can satisfy this request
this._reads.shift(); // == read
read.func.call(this, smallerBuf.slice(0, read.length));
// first we need to peek into first buffer
var smallerBuf = this._buffers[0];
// ok there is more data than we need
if (smallerBuf.length > read.length) {
this._buffered -= read.length;
this._buffers[0] = smallerBuf.slice(read.length);
read.func.call(this, smallerBuf.slice(0, read.length));
}
else {
// ok this is less than maximum length so use it all
this._buffered -= smallerBuf.length;
this._buffers.shift(); // == smallerBuf
read.func.call(this, smallerBuf);
}
}
else if (this._buffered >= read.length) {
// ok we can meet some expectations
this._reads.shift(); // == read
var pos = 0;
var count = 0;
var data = new Buffer(read.length);
// create buffer for all data
while (pos < read.length) {
var buf = this._buffers[count++];
var len = Math.min(buf.length, read.length - pos);
buf.copy(data, pos, 0, len);
pos += len;
// last buffer wasn't used all so just slice it and leave
if (len !== buf.length) {
this._buffers[--count] = buf.slice(len);
}
}
// remove all used buffers
if (count > 0) {
this._buffers.splice(0, count);
}
this._buffered -= read.length;
read.func.call(this, data);
}
else {
// not enought data to satisfy first request in queue
// so we need to wait for more
break;
}
}
else {
// ok this is less than maximum length so use it all
this._buffered -= smallerBuf.length;
this._buffers.shift(); // == smallerBuf
if (this._buffers && this._buffers.length > 0 && this._buffers[0] == null) {
this._end();
read.func.call(this, smallerBuf);
}
};
ChunkStream.prototype._processRead = function(read) {
this._reads.shift(); // == read
var pos = 0;
var count = 0;
var data = new Buffer(read.length);
// create buffer for all data
while (pos < read.length) {
var buf = this._buffers[count++];
var len = Math.min(buf.length, read.length - pos);
buf.copy(data, pos, 0, len);
pos += len;
// last buffer wasn't used all so just slice it and leave
if (len !== buf.length) {
this._buffers[--count] = buf.slice(len);
}
}
// remove all used buffers
if (count > 0) {
this._buffers.splice(0, count);
}
this._buffered -= read.length;
read.func.call(this, data);
};
ChunkStream.prototype._process = function() {
try {
// as long as there is any data and read requests
while (this._buffered > 0 && this._reads && this._reads.length > 0) {
var read = this._reads[0];
// read any data (but no more than length)
if (read.allowLess) {
this._processReadAllowingLess(read);
}
else if (this._buffered >= read.length) {
// ok we can meet some expectations
this._processRead(read);
}
else {
// not enought data to satisfy first request in queue
// so we need to wait for more
break;
}
}
if (this._buffers && this._buffers.length > 0 && this._buffers[0] == null) {
this._end();
}
}
catch(ex) {
this.emit('error', ex);
}
};

View file

@ -35,7 +35,7 @@
"coverage": "istanbul -- cover node_modules/tape/bin/tape test/*-spec.js nolarge",
"coverage-report": "npm run coverage && istanbul report html",
"coveralls": "cat ./coverage/lcov.info | coveralls",
"test": "tape test/*-spec.js | tap-dot && node test/run-compare",
"test": "npm run lint && tape test/*-spec.js | tap-dot && node test/run-compare",
"lint": "eslint lib"
},
"repository": {