From b6c1d1290d789169a1cc0a11a210738284fa0df7 Mon Sep 17 00:00:00 2001 From: Morhaus Date: Wed, 8 Jan 2014 21:11:07 +0100 Subject: [PATCH] v0.2.0: bunch of bugfixes (requests now correctly end), add proxy CLI option, preliminary redirect support --- package.json | 2 +- src/cli.coffee | 9 ++-- src/proxy.coffee | 29 +++++++++++-- src/request.coffee | 104 +++++++++++++++++---------------------------- src/thread.coffee | 18 ++++---- 5 files changed, 79 insertions(+), 83 deletions(-) diff --git a/package.json b/package.json index f761d5a..c6708ed 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "catalyst-proxy", "description": "A proxy that accelerates requests via multipart downloading", "author": "Alexandre Kirszenberg ", - "version": "0.1.2", + "version": "0.2.0", "license": "MIT", "keywords": [ "multipart", diff --git a/src/cli.coffee b/src/cli.coffee index 0787e5f..0a98860 100644 --- a/src/cli.coffee +++ b/src/cli.coffee @@ -4,11 +4,11 @@ program = require 'commander' Logger = require 'tmpl-log' Proxy = require './proxy' +pckg = require '../package.json' logr = new Logger -program - .version('0.1.2') +program.version pckg.version program .command('start') @@ -17,8 +17,9 @@ program .option('-t, --threads ', 'max concurrent threads', Number, 12) .option('-s, --partSize

', 'thread part size', Number, 1024 * 1024 * 2) .option('-l, --contentLength ', 'min content length for threaded downloading', Number, 1024 * 1024 * 4) - .action (args..., { port, host, threads, partSize, contentLength }) -> - proxy = new Proxy { threads, partSize, contentLength } + .option('-P, --proxy

', 'proxy to forward requests to', String) + .action (args..., { port, host, threads, partSize, contentLength, proxy }) -> + proxy = new Proxy { threads, partSize, contentLength, proxy } proxy.on 'error', (e) -> if e.syscall is 'listen' switch e.code diff --git a/src/proxy.coffee b/src/proxy.coffee index 804c0a1..9b68e7f 100644 --- a/src/proxy.coffee +++ b/src/proxy.coffee @@ -16,27 +16,48 @@ module.exports = class Proxy extends EventEmitter partSize: 1024 * 1024 / 4 contentLength: 1024 * 1024 + if typeof @opts.proxy is 'string' + protocol = @opts.proxy.match /^([A-Za-z]+)\:\/\// + @opts.proxy = "http://#{@opts.proxy}" unless protocol + @opts.proxy = url.parse @opts.proxy + @server = http.createServer() @server.on 'request', @request @server.on 'error', (e) => @emit 'error', e _isDownloadable: (res) -> - res.statusCode in [200, 206] and (parseInt res.headers['content-length']) >= @opts.contentLength + res.statusCode in [200, 206] and + res.headers['content-range']? and + (parseInt res.headers['content-length']) >= @opts.contentLength request: (clientReq, clientRes) => reqOpts = url.parse clientReq.url reqOpts.method = clientReq.method reqOpts.headers = clientReq.headers - reqOpts.agent = new http.Agent maxSockets: @opts.threads + reqOpts.agent = new http.Agent maxSockets: Infinity + + if @opts.proxy + { hostname, protocol } = reqOpts + reqOpts.port = @opts.proxy.port + # hostname is preferred over host, override the two + reqOpts.host = reqOpts.hostname = @opts.proxy.hostname + # path now needs to contain the complete url + reqOpts.path = reqOpts.href # Create new request request = new Request reqOpts, @opts # Listen to connect and determine whether to stream download or not request.on 'connect', (res, callback) => - clientRes.writeHead res.statusCode, res.headers - callback (@_isDownloadable res) + opts = @_isDownloadable res + + headers = _.clone res.headers + # Remove response's content-range header if the client didn't send a range header + delete headers['content-range'] unless clientReq.headers['range'] + clientRes.writeHead res.statusCode, headers + + callback opts # Pipe clientRequest to request, request to clientResponse clientReq.pipe request diff --git a/src/request.coffee b/src/request.coffee index db9400d..48ee23d 100644 --- a/src/request.coffee +++ b/src/request.coffee @@ -25,7 +25,6 @@ module.exports = class Request extends Duplex # Parse and setup initial range range = parseRange @reqOpts.headers['range'] - console.log 'parsed', range, 'from', @reqOpts.headers['range'] if range is false throw new Error "unsupported range header: #{@reqOpts.headers['range']}" else if range @@ -33,17 +32,15 @@ module.exports = class Request extends Duplex else @range = [0, null] + @readOffset = 0 @offset = @range[0] - @connecting = false - @connected = false - @pool = [] @offsets = [] @runningPool = [] # Create a multisource stream and set its start offset at the range's start - @stream = new Multisource + @stream = new Multisource highWaterMark: @opts.partSize @stream.offset = @offset # Push data when available @@ -51,13 +48,7 @@ module.exports = class Request extends Duplex @_readable = true @_readStream() if @_reading - # When the stream ends, end the request as well - @stream.on 'end', => - @push null - _createThread: (offset, length) -> - console.log "CREATE #{@pool.length} from #{offset} til #{offset + length}" - thread = new Thread offset, length, @reqOpts, endOffset: @range[1] # Insert the thread at the correct place in the pool @@ -66,13 +57,11 @@ module.exports = class Request extends Duplex @pool.splice idx, 0, thread # Pipe the thread to the stream from its offset - thread.pipe (@stream.from offset) + thread.pipe (@stream.from offset, highWaterMark: @opts.partSize) return thread start: -> - @connecting = true - # Calculate correct thread length length = if @range[1] then (@range[1] - @offset) else Infinity length = @opts.partSize if length > @opts.partSize @@ -81,9 +70,6 @@ module.exports = class Request extends Duplex thread = @_createThread @offset, length thread.once 'connect', (res) => - @connecting = false - @connected = true - # Retrieve the length from the thread @length = thread.length # Set correct end range @@ -98,16 +84,13 @@ module.exports = class Request extends Duplex called = true if streaming - console.log 'STREAM', (url.format @reqOpts), @length, @range @_streamingMode() else - console.log 'THROUGH', (url.format @reqOpts), @length, @range - thread.on 'end', => @stream.end() - thread.on 'disconnect', => @stream.end() + thread.on 'end', => @push null + thread.on 'disconnect', => @push null - thread.on 'error', (e) => - console.log 'ERROR, cancelling thread' - @stream.end() + thread.on 'error', -> + @push null # Start the thread thread.start() @@ -122,27 +105,29 @@ module.exports = class Request extends Duplex # Stops the request stop: -> - @connecting = false - @connected = false - thread.stop() for thread in @runningPool @pool = [] @offsets = [] @runningPool = [] - @offset = @stream.offset - @range[0] = @stream.offset + @offset = @readOffset + @range[0] = @readOffset + @readOffset = 0 _readStream: -> data = @stream.read() - @_reading = @push data if data + if data + @readOffset += data.length + @_reading = @push data + @push null if @readOffset is @range[1] _read: -> @_reading = true @_readStream() if @_readable _write: (chunk, encoding, callback) -> + # Write data to the first thread (this means the request has a body, not GET) @pool[0].write chunk, encoding, callback # Switches to threaded streaming downloading @@ -164,55 +149,42 @@ module.exports = class Request extends Duplex # Binds event listeners to a thread _bindThread: (thread) -> thread.on 'cross', => - console.log 'CROSS', (@pool.indexOf thread) next = @_getNextThread thread - if not next - console.log @stream.offset, @range[1], @runningPool.length - if @stream.offset is @range[1] - @_onEnd() - else - thread.range[1] += @opts.partSize - thread.crossed = false - else if not next.connected - # Replacing the next thread + if not next and thread.range[1] isnt @range[1] + # The most recent running thread has ended, extend it + thread.range[1] += @opts.partSize + thread.crossed = false + @offset += @opts.partSize + else if next and not next.connected and not next.crossed + # The next thread isn't running and hasn't completed, replace it thread.range[1] = next.range[1] thread.crossed = false @_removeThread next, true @_fill() else + # The next thread is running or has completed, remove this thread @_removeThread thread @_fill() thread.on 'disconnect', => - # Clean up - thread.stop() - # Restart the thread - thread.start() - # End the thread's request - thread.end() - - console.log 'trying to reconnect' - thread.once 'connect', => - console.log 'successfully reconnected' + @_restartThread() thread.on 'error', => - # Clean up - thread.stop() - # Restart the thread - thread.start() - # End the thread's request - thread.end() + @_restartThread() - console.log 'trying to reconnect' - thread.once 'connect', => - console.log 'successfully reconnected' + _restartThread: (thread) -> + # Clean up + thread.stop() + # Restart the thread + thread.start() + # End the thread's request + thread.end() # Stops and removes a thread from the running pool _removeThread: (thread, hard = false) -> threadIdx = @pool.indexOf thread runningIdx = @runningPool.indexOf thread - console.log "REMOVE #{threadIdx}" if runningIdx isnt -1 thread.stop() @@ -225,7 +197,6 @@ module.exports = class Request extends Duplex # Fills the running thread pool _fill: -> - console.log "FILL #{@runningPool.length}/#{@opts.threads}, #{@offset}/#{@range[1]}, #{@pool.indexOf @runningPool[0]}" if @runningPool.length < @opts.threads and @offset isnt @range[1] # Calculate correct thread length length = @range[1] - @offset @@ -235,6 +206,12 @@ module.exports = class Request extends Duplex # Continue filling on thread connect thread.once 'connect', => @_fill() + # If the thread is redirected, change request options and restart it + thread.on 'redirect', (res) => + location = url.parse res.headers['location'] + _.extend @reqOpts, location + _.extend thread.reqOpts, location + @_restartThread thread # Bind thread events @_bindThread thread @@ -247,8 +224,3 @@ module.exports = class Request extends Duplex # Increment offset @offset += length - - _onEnd: -> - @connected = false - console.log "END #{@offset}/#{@range[1]}" - @stream.end() diff --git a/src/thread.coffee b/src/thread.coffee index 3daa80c..b37be1a 100644 --- a/src/thread.coffee +++ b/src/thread.coffee @@ -1,4 +1,5 @@ http = require 'http' +url = require 'url' { Duplex } = require 'stream' _ = require 'lodash' @@ -39,17 +40,19 @@ module.exports = class Thread extends Duplex @connecting = true # Set range header - if @range[0] isnt 0 + if @reqOpts.method is 'GET' end = if @opts.endOffset then (@opts.endOffset - 1) else null @reqOpts.headers['range'] = createRange @offset, end - else if @reqOpts.headers['range'] - delete @reqOpts.headers['range'] # Create request @_req = http.request @reqOpts # Listen to response @_req.on 'response', (@_res) => + if @_res.statusCode is 302 + @emit 'redirect', @_res + return + @connected = true @connecting = false @@ -68,7 +71,7 @@ module.exports = class Thread extends Duplex @_readRes() if @_reading @_res.on 'end', => - if @offset is @range[1] + if @offset >= @range[1] @push null else @connected = false @@ -77,8 +80,7 @@ module.exports = class Thread extends Duplex @emit 'connect', @_res - @_req.on 'error', (e) => - @emit 'error', e + @_req.on 'error', (e) => @emit 'error', e _readRes: -> data = @_res.read() @@ -108,12 +110,12 @@ module.exports = class Thread extends Duplex @connected = false @_req.removeAllListeners() - @_req.on 'error', (e) -> console.log 'req error', e # Dummy error listener + @_req.on 'error', (e) -> # Dummy error listener @_req.abort() if @_res @_res.removeAllListeners() - @_res.on 'error', (e) -> console.log 'res error', e # Dummy error listener + @_res.on 'error', (e) -> # Dummy error listener @_res.connection.destroy() # Ends the request