diff --git a/src/index.coffee b/src/index.coffee index f8b0c07..bf225f1 100644 --- a/src/index.coffee +++ b/src/index.coffee @@ -1,3 +1,6 @@ +http = require 'http' Proxy = require './proxy' -new Proxy 3000 +http.globalAgent.maxSockets = Infinity + +new Proxy 8080 diff --git a/src/proxy.coffee b/src/proxy.coffee index 0724443..2bddd76 100644 --- a/src/proxy.coffee +++ b/src/proxy.coffee @@ -1,19 +1,50 @@ http = require 'http' url = require 'url' + +_ = require 'lodash' + Request = require './request' module.exports = class Proxy - constructor: (listenPort, listenHost) -> - @agent = new http.Agent maxSockets: Infinity + constructor: (listenPort, listenHost, opts = {}) -> + if typeof listenHost is 'object' + opts = listenHost + listenHost = undefined + + @opts = _.clone opts + _.defaults @opts, + maxConcurrent: 12 + partSize: 1024 * 1024 / 4 + minContentLength: 1024 * 1024 http.createServer() .on('request', @request) .listen(listenPort, listenHost) - request: (req, res) => - opts = url.parse req.url - opts.method = req.method - opts.headers = req.headers - opts.agent = @agent + _isDownloadable: (res) -> + res.statusCode in [200, 206] and (parseInt res.headers['content-length']) >= @opts.minContentLength - request = new Request opts, req, res + request: (clientReq, clientRes) => + reqOpts = url.parse clientReq.url + reqOpts.method = clientReq.method + reqOpts.headers = clientReq.headers + reqOpts.agent = new http.Agent maxSockets: @opts.maxConcurrent + + # Create new request + request = new Request reqOpts, @opts + + # Listen to connect and determine whether to stream download or not + request.on 'connect', (res, callback) => + clientRes.writeHead res.statusCode, res.headers + callback (@_isDownloadable res) + + # Pipe clientRequest to request, request to clientResponse + clientReq.pipe request + request.pipe clientRes + + # Start the request + request.start() + + # Stop the request whether or not it has completed + clientRes.on 'close', -> + request.stop() diff --git a/src/request.coffee b/src/request.coffee index e35eae8..044a16d 100644 --- a/src/request.coffee +++ b/src/request.coffee @@ -1,196 +1,253 @@ http = require 'http' url = require 'url' +{ Duplex } = require 'stream' _ = require 'lodash' -async = require 'async' -Multisource = require 'multisource-stream' +Multisource = require '../../multisource-stream' -isDownloadable = (res, opts) -> - res.statusCode in [200, 206] and (parseInt res.headers['content-length']) > opts.contentLength +Thread = require './thread' + +parseRange = (str = null) -> + return null if str is null + match = str.match /bytes=([0-9]+)?-([0-9]+)?/ + return false unless match[1] + range = [(parseInt match[1]), (parseInt match[2]) or null] + return range + +module.exports = class Request extends Duplex + constructor: (reqOpts, opts) -> + super() + + @reqOpts = _.clone reqOpts + @reqOpts.headers = _.clone reqOpts.headers -module.exports = class Request - constructor: (opts, @_req, @_res) -> @opts = _.clone opts - @opts.headers = _.clone opts.headers - _.defaults @opts, - concurrent: 4 - partSize: 1024 * 1024 * 4 - contentLength: 1024 * 1024 * 4 - - req = http.request @opts - # Dummy error listener to avoid uncaught exception - req.on 'error', -> - if opts.method is 'GET' - req.on 'response', (res) => - if isDownloadable res, @opts - @_part req, res - else - @_through res + # Parse and setup initial range + range = parseRange @reqOpts.headers['range'] + console.log 'parsed', range, 'from', @reqOpts.headers['range'] + if range is false + throw new Error "unsupported range header: #{@reqOpts.headers['range']}" + else if range + @range = [range[0], if range[1] then (range[1] + 1) else null] else - req.on 'response', (res) => @_through res - @_req.pipe req + @range = [0, null] - _through: (res) -> - @_res.writeHeader res.statusCode, res.headers - res.pipe @_res + @offset = @range[0] - _setRange: (opts, start, end = '') -> - opts.headers['range'] = "bytes=#{start}-#{end}" + @connecting = false + @connected = false - _getRange: (res) -> - return null if not res.headers['content-range'] - raw = res.headers['content-range'].match /bytes ([0-9]+)-([0-9]+)\/([0-9]+)/ - range = [(parseInt raw[1]), (parseInt raw[2])] - return range + @pool = [] + @offsets = [] + @runningPool = [] - _part: (req, res) -> - @_res.writeHeader res.statusCode, res.headers + # Create a multisource stream and set its start offset at the range's start + @stream = new Multisource + @stream.offset = @offset - @source = new Multisource - @source.pipe @_res + # Push data when available + @stream.on 'readable', => + @_readable = true + @_readStream() if @_reading - # Retrieve content length - @_length = parseInt res.headers['content-length'] - # Retrieve initial range - @_offset = if res.statusCode is 206 then (@_getRange res)[0] else 0 + # When the stream ends, end the request as well + @stream.on 'end', => + @push null - console.log "PART - #{@_length} at offset #{@_offset}" + _createThread: (offset, length) -> + console.log "CREATE #{@pool.length} from #{offset} til #{offset + length}" - thread = { offset: 0, length: @opts.partSize, dlength: 0, running: true, req } - thread.opts = _.clone @opts - thread.opts.headers = _.clone @opts.headers - res.headers['content-range'] = "bytes 0-#{@_length - 1}/#{@_length}" + thread = new Thread offset, length, @reqOpts, endOffset: @range[1] - # Setup initial thread - @_gaps = [[@opts.partSize + @_offset, @_length]] - @_pool = [res] - @_running = 1 + # Insert the thread at the correct place in the pool + idx = _.sortedIndex @offsets, offset + @offsets.splice idx, 0, offset + @pool.splice idx, 0, thread - @_bind thread, res, (e) => - # Start filling thread pool - @_fill() + # Pipe the thread to the stream from its offset + thread.pipe (@stream.from offset) - _thread: (offset, length, callback) -> - thread = { offset, length, dlength: 0, running: true } - console.log "THREAD CREATE - #{thread.offset} to #{thread.offset + thread.length}" + return thread - # Add range header and create a request - thread.opts = _.clone @opts - thread.opts.headers = _.clone @opts.headers - @_setRange thread.opts, offset # Open range, in case the next thread doesn't start before that one ends - thread.req = http.request thread.opts + start: -> + @connecting = true - thread.req.on 'response', (res) => @_bind thread, res, callback - thread.req.on 'error', (e) => - console.log 'req error', e - @_stop thread - thread.req.on 'end', => - console.log 'req error', e - @_stop thread - @_pool.push thread + # Calculate correct thread length + length = if @range[1] then (@range[1] - @offset) else Infinity + length = @opts.partSize if length > @opts.partSize - @_running++ - thread.running = true - thread.req.end() + # Create the thread at current offset + thread = @_createThread @offset, length - _bind: (thread, res, callback) -> - console.log "THREAD BIND - #{thread.offset} to #{thread.offset + thread.length}" + thread.once 'connect', (res) => + @connecting = false + @connected = true - thread.res = res + # Retrieve the length from the thread + @length = thread.length + # Set correct end range + @range[1] = @range[0] + @length - range = @_getRange res - unless range - @_stop thread - callback 'RANGE' - return - - console.log "THREAD RANGE - #{range[0]} to #{range[1]}, expected #{thread.offset} to #{@_length - 1}" - console.log "THREAD RANGE - sent #{thread.opts.headers['range']}" - console.log "THREAD RANGE - received #{thread.res.headers['content-range']}" - - # In case the server returned a wrong range - if range[0] isnt thread.offset - @_gaps.push [thread.offset, range[0] - thread.offset] - @_fill() - thread.offset = range[0] - - # Create a new source from the thread offset (should support negative offset) - thread.source = @source.from thread.offset - @_offset - - res.on 'data', (chunk) => - thread.source.write chunk - thread.dlength += chunk.length - - if thread.dlength > thread.length - console.log "THREAD CROSS - #{thread.offset} to #{thread.offset + thread.length}" - # The thread has passed its assigned length - next = @_pool[(@_pool.indexOf thread) + 1] - if not next - @_stop thread - @_fill() - else if next.dlength is 0 - # If the next thread hasn't started, we abort it and continue with the current thread instead - @_stop next - @_pool.splice (@_pool.indexOf next), 1 - thread.length += next.length - console.log "THREAD EXTEND - #{thread.offset} to #{thread.offset + thread.length}" + # Emit connect and let the user decide of the downloading mode + called = false + @emit 'connect', res, (streaming) => + if called + throw new Error 'connect callback has already been called' else - console.log "THREAD STOP - #{thread.offset} to #{thread.offset + thread.length}" - # Else stop the thread - @_stop thread + called = true - res.on 'error', (e) => - console.log "THREAD ERROR - #{thread.offset} to #{thread.offset + thread.length}, #{thread.dlength}/#{thread.length}" - @_stop thread - res.on 'end', => - console.log "THREAD END - #{thread.offset} to #{thread.offset + thread.length}, #{thread.dlength}/#{thread.length}" - @_stop thread + if streaming + console.log 'STREAM', @length, @range + @_streamingMode() + else + thread.on 'end', => @stream.end() + thread.on 'disconnect', => @stream.end() - callback() + thread.on 'error', (e) => + console.log 'ERROR, cancelling thread' + @stream.end() - _stop: (thread) -> - return unless thread.running - thread.running = false - @_running-- + # Start the thread + thread.start() + @runningPool.push thread - # If the thread hasn't completed, create a gap - if thread.dlength < thread.length - @_gaps.push [thread.offset + thread.dlength, thread.offset + thread.length] + # Increment offset + @offset += length - if thread.req - thread.req.removeAllListeners() - # Dummy error listener to avoid uncaught exception - thread.req.on 'error', -> - thread.req.abort() + # Ends the first thread's request + end: -> + @pool[0].end() - if thread.res - thread.res.removeAllListeners() - # Dummy error listener to avoid uncaught exception - thread.res.on 'error', -> - thread.res.socket.destroy() + # Stops the request + stop: -> + @connecting = false + @connected = false + thread.stop() for thread in @runningPool + + @pool = [] + @offsets = [] + @runningPool = [] + + @offset = @stream.offset + @range[0] = @stream.offset + + _readStream: -> + data = @stream.read() + @_reading = @push data if data + + _read: -> + @_reading = true + @_readStream() if @_readable + + _write: (chunk, encoding, callback) -> + @pool[0].write chunk, encoding, callback + + # Switches to threaded streaming downloading + _streamingMode: -> + # Retrieve initial thread + thread = @pool[0] + # Setup correct thread range + thread.range[1] = thread.range[0] + @opts.partSize + # Set the offset at the end of the thread range + @offset = thread.range[1] + + @_bindThread @pool[0] @_fill() - _fill: -> - async.whilst (=> @_running < @opts.concurrent and @_gaps.length > 0), (next) => - gap = @_gaps[@_gaps.length - 1] - unless gap - next 'END' - return + # Returns the next thread in the thread pool + _getNextThread: (thread) -> + @pool[(@pool.indexOf thread) + 1] or null - offset = gap[0] - length = gap[1] - gap[0] + # Binds event listeners to a thread + _bindThread: (thread) -> + thread.on 'cross', => + console.log 'CROSS', (@pool.indexOf thread) + next = @_getNextThread thread - if length > @opts.partSize - # Adjust to the wanted thread size - length = @opts.partSize - gap[0] += length + if not next + console.log @stream.offset, @range[1], @runningPool.length + if @stream.offset is @range[1] + @_onEnd() + else + thread.range[1] += @opts.partSize + thread.crossed = false + else if not next.connected + # Replacing the next thread + thread.range[1] = next.range[1] + thread.crossed = false + @_removeThread next, true + @_fill() else - # The gap is filled - @_gaps.splice @_gaps.length - 1, 1 + @_removeThread thread + @_fill() - @_thread offset, length, (e) -> - next e - , => - console.log "FILLED - #{@_running}/#{@opts.concurrent}, #{@_gaps.length} gaps left" + thread.on 'disconnect', => + # Clean up + thread.stop() + # Restart the thread + thread.start() + # End the thread's request + thread.end() + + console.log 'trying to reconnect' + thread.once 'connect', => + console.log 'successfully reconnected' + + thread.on 'error', => + # Clean up + thread.stop() + # Restart the thread + thread.start() + # End the thread's request + thread.end() + + console.log 'trying to reconnect' + thread.once 'connect', => + console.log 'successfully reconnected' + + # Stops and removes a thread from the running pool + _removeThread: (thread, hard = false) -> + threadIdx = @pool.indexOf thread + runningIdx = @runningPool.indexOf thread + console.log "REMOVE #{threadIdx}" + + if runningIdx isnt -1 + thread.stop() + @runningPool.splice runningIdx, 1 + + if hard + # Also remove from the thread pool + threadIdx = @pool.indexOf thread + @pool.splice threadIdx, 1 + + # Fills the running thread pool + _fill: -> + console.log "FILL #{@runningPool.length}/#{@opts.maxConcurrent}, #{@offset}/#{@range[1]}, #{@pool.indexOf @runningPool[0]}" + if @runningPool.length < @opts.maxConcurrent and @offset isnt @range[1] + # Calculate correct thread length + length = @range[1] - @offset + length = @opts.partSize if length > @opts.partSize + + thread = @_createThread @offset, length + + # Continue filling on thread connect + thread.once 'connect', => @_fill() + # Bind thread events + @_bindThread thread + + # Start the thread + thread.start() + @runningPool.push thread + + # Call end on the thread to end its request + thread.end() + + # Increment offset + @offset += length + + _onEnd: -> + @connected = false + console.log "END #{@offset}/#{@range[1]}" + @stream.end() diff --git a/src/thread.coffee b/src/thread.coffee new file mode 100644 index 0000000..f480ff7 --- /dev/null +++ b/src/thread.coffee @@ -0,0 +1,121 @@ +http = require 'http' +{ Duplex } = require 'stream' + +_ = require 'lodash' + +createRange = (start = 0, end = null) -> + if end is null + return "bytes=#{start}-" + else + return "bytes=#{start}-#{end}" + +parseContentRange = (str = null) -> + return null if str is null + match = str.match /bytes ([0-9]+)-([0-9]+)\/([0-9]+)/ + range = [(parseInt match[1]), (parseInt match[2]), (parseInt match[3])] + return range + +module.exports = class Thread extends Duplex + constructor: (offset, length, reqOpts, opts) -> + super() + + @_reading = false + @_readable = false + @_tries = 0 + + @connecting = false + @connected = false + + @range = [offset, offset + length] + @offset = offset + + @reqOpts = _.clone reqOpts + @reqOpts.headers = _.clone reqOpts.headers + + @opts = _.clone opts + + # Tries to connect + _connect: -> + @connecting = true + + # Set range header + if @range[0] isnt 0 + end = if @opts.endOffset then (@opts.endOffset - 1) else null + @reqOpts.headers['range'] = createRange @offset, end + else if @reqOpts.headers['range'] + delete @reqOpts.headers['range'] + + # Create request + @_req = http.request @reqOpts + + # Listen to response + @_req.on 'response', (@_res) => + @connected = true + @connecting = false + + range = parseContentRange @_res.headers['content-range'] + + @length = (parseInt @_res.headers['content-length']) or null + + # Parse and validate range + if (range and (range[0] isnt @offset)) or (not range and @offset isnt 0) + console.log "received invalid range: #{range}, expected #{@offset},#{@offset + @length - 1}" + return + + # Push data when available + @_res.on 'readable', => + @_readable = true + @_readRes() if @_reading + + @_res.on 'end', => + if @offset is @range[1] + @push null + else + @connected = false + # Notify the user + @emit 'disconnect' + + @emit 'connect', @_res + + @_req.on 'error', (e) => + @emit 'error', e + + _readRes: -> + data = @_res.read() + if data + @_reading = @push data + @offset += data.length + + if not @crossed and @offset >= @range[1] + @crossed = true + @emit 'cross' + + _read: -> + @_reading = true + @_readRes() if @_readable + + _write: (chunk, encoding, callback) -> + @_req.write chunk, encoding, callback + + # Starts the thread + start: -> + @_connect() + + # Stops the thread + stop: -> + @_readable = false + @connecting = false + @connected = false + + @_req.removeAllListeners() + @_req.on 'error', (e) -> console.log 'req error', e # Dummy error listener + @_req.connection.destroy() + + if @_res + @_res.removeAllListeners() + @_res.on 'error', (e) -> console.log 'res error', e # Dummy error listener + @_res.connection.destroy() + + # Ends the request + end: -> + @_req.end()