mirror of
https://github.com/danbulant/koa-plugins
synced 2026-05-19 12:18:46 +00:00
164 lines
4 KiB
JavaScript
164 lines
4 KiB
JavaScript
'use strict';
|
|
|
|
var net = require('net');
|
|
var EventEmitter = require('events').EventEmitter;
|
|
|
|
var MSG_TYPE = require('./consts').MSG_TYPE;
|
|
var PROTOCOL_STATUS = require('./consts').PROTOCOL_STATUS;
|
|
var stringifykv = require('./stringifykv');
|
|
|
|
var PADDING_BUFS = [
|
|
new Buffer(0),
|
|
new Buffer('\0'),
|
|
new Buffer('\0\0'),
|
|
new Buffer('\0\0\0'),
|
|
new Buffer('\0\0\0\0'),
|
|
new Buffer('\0\0\0\0\0'),
|
|
new Buffer('\0\0\0\0\0\0'),
|
|
new Buffer('\0\0\0\0\0\0\0'),
|
|
];
|
|
|
|
module.exports = function(client){
|
|
// init connect
|
|
var conn = new EventEmitter();
|
|
var connected = false;
|
|
|
|
var _options = client._sockFile ? { path: client._sockFile } : {port: client._port, host: client._host};
|
|
// socket basis
|
|
var socket = net.createConnection(_options, function(){
|
|
socket.setKeepAlive(true);
|
|
connected = true;
|
|
conn.emit('connect');
|
|
});
|
|
socket.on('error', function(e){
|
|
connected = false;
|
|
conn.emit('error', e);
|
|
});
|
|
socket.on('end', function(){
|
|
conn.emit('end');
|
|
});
|
|
socket.on('close', function(){
|
|
conn.emit('close');
|
|
});
|
|
conn.end = function(){
|
|
socket.end();
|
|
};
|
|
|
|
// send message
|
|
conn.send = function(msgType, reqId, obj, cb){
|
|
if(!connected) return;
|
|
cb = cb || function(){};
|
|
var data = obj;
|
|
if(data === null) {
|
|
// null
|
|
data = PADDING_BUFS[0];
|
|
} else if(! (obj instanceof Buffer)) {
|
|
// kv pairs
|
|
data = stringifykv(obj);
|
|
}
|
|
var len = data.length;
|
|
var sendMessage = function(data, start, end){
|
|
var contentLen = end - start;
|
|
var paddingLen = (8 - (contentLen % 8)) % 8;
|
|
if(start || end !== len) data = data.slice(start, end);
|
|
var buf = new Buffer(8);
|
|
buf.writeUInt8(1, 0, true);
|
|
buf.writeUInt8(msgType, 1, true);
|
|
buf.writeUInt16BE(reqId, 2, true);
|
|
buf.writeUInt16BE(contentLen, 4, true);
|
|
buf.writeUInt8(paddingLen, 6, true);
|
|
buf.writeUInt8(0, 7, true);
|
|
socket.write(buf);
|
|
if(paddingLen) {
|
|
socket.write(data);
|
|
socket.write(PADDING_BUFS[paddingLen], cb);
|
|
} else {
|
|
socket.write(data, cb);
|
|
}
|
|
};
|
|
for(var start=0; start < len - 0xffff; start += 0xffff) {
|
|
sendMessage(data, start, start + 0xffff);
|
|
}
|
|
sendMessage(data, start, len);
|
|
};
|
|
|
|
// receive message
|
|
var expectLen = 8;
|
|
var restDataBufs = [];
|
|
var restDataLen = 0;
|
|
var msgType = 0;
|
|
var reqId = 0;
|
|
var restBodyLen = 0;
|
|
var restPaddingLen = 0;
|
|
var processDataBuffer = function(buf, start, len){
|
|
if(restBodyLen) {
|
|
// in body
|
|
if(len < restBodyLen) {
|
|
restBodyLen -= len;
|
|
conn.emit('message-' + msgType, reqId, buf.slice(start, start + len));
|
|
return len;
|
|
}
|
|
var rest = restBodyLen;
|
|
restBodyLen = 0;
|
|
conn.emit('message-' + msgType, reqId, buf.slice(start, start + rest));
|
|
if(!restPaddingLen) expectLen = 8;
|
|
return rest;
|
|
}
|
|
if(restPaddingLen) {
|
|
// in padding
|
|
if(len < restPaddingLen) {
|
|
restPaddingLen -= len;
|
|
return len;
|
|
}
|
|
var rest = restPaddingLen;
|
|
restPaddingLen = 0;
|
|
expectLen = 8;
|
|
return rest;
|
|
}
|
|
// head
|
|
var headData = buf.slice(start, start + 8);
|
|
if(headData.readUInt8(0, true) !== 1) {
|
|
conn.emit('error', new Error('The server does not speak a compatible FastCGI protocol.'));
|
|
connected = false;
|
|
socket.end();
|
|
return;
|
|
}
|
|
msgType = headData.readUInt8(1, true);
|
|
reqId = headData.readUInt16BE(2, true);
|
|
restBodyLen = headData.readUInt16BE(4, true);
|
|
restPaddingLen = headData.readUInt8(6, true);
|
|
if(msgType === MSG_TYPE.FCGI_GET_VALUES_RESULT || msgType === MSG_TYPE.FCGI_END_REQUEST) {
|
|
expectLen = restBodyLen + restPaddingLen;
|
|
} else {
|
|
expectLen = 0;
|
|
}
|
|
return 8;
|
|
};
|
|
socket.on('data', function(data){
|
|
// check expect length
|
|
if(data.length + restDataLen < expectLen) {
|
|
restDataBufs.push(data);
|
|
restDataLen += data.length;
|
|
return;
|
|
}
|
|
var buf = data;
|
|
var len = buf.length;
|
|
if(restDataBufs.length) {
|
|
restDataBufs.push(data);
|
|
len = restDataLen + data.length;
|
|
buf = Buffer.concat(restDataBufs, len);
|
|
restDataBufs = [];
|
|
restDataLen = 0;
|
|
}
|
|
// process segment by segment
|
|
var start = 0;
|
|
var len = buf.length;
|
|
while(len > 0) {
|
|
var offset = processDataBuffer(buf, start, len);
|
|
start += offset;
|
|
len -= offset;
|
|
}
|
|
});
|
|
|
|
return conn;
|
|
};
|