complete rewrite, new thread module

This commit is contained in:
Morhaus 2014-01-06 19:19:44 +01:00
commit 9f1006fd6e
4 changed files with 403 additions and 191 deletions

View file

@ -1,3 +1,6 @@
http = require 'http'
Proxy = require './proxy' Proxy = require './proxy'
new Proxy 3000 http.globalAgent.maxSockets = Infinity
new Proxy 8080

View file

@ -1,19 +1,50 @@
http = require 'http' http = require 'http'
url = require 'url' url = require 'url'
_ = require 'lodash'
Request = require './request' Request = require './request'
module.exports = class Proxy module.exports = class Proxy
constructor: (listenPort, listenHost) -> constructor: (listenPort, listenHost, opts = {}) ->
@agent = new http.Agent maxSockets: Infinity if typeof listenHost is 'object'
opts = listenHost
listenHost = undefined
@opts = _.clone opts
_.defaults @opts,
maxConcurrent: 12
partSize: 1024 * 1024 / 4
minContentLength: 1024 * 1024
http.createServer() http.createServer()
.on('request', @request) .on('request', @request)
.listen(listenPort, listenHost) .listen(listenPort, listenHost)
request: (req, res) => _isDownloadable: (res) ->
opts = url.parse req.url res.statusCode in [200, 206] and (parseInt res.headers['content-length']) >= @opts.minContentLength
opts.method = req.method
opts.headers = req.headers
opts.agent = @agent
request = new Request opts, req, res request: (clientReq, clientRes) =>
reqOpts = url.parse clientReq.url
reqOpts.method = clientReq.method
reqOpts.headers = clientReq.headers
reqOpts.agent = new http.Agent maxSockets: @opts.maxConcurrent
# Create new request
request = new Request reqOpts, @opts
# Listen to connect and determine whether to stream download or not
request.on 'connect', (res, callback) =>
clientRes.writeHead res.statusCode, res.headers
callback (@_isDownloadable res)
# Pipe clientRequest to request, request to clientResponse
clientReq.pipe request
request.pipe clientRes
# Start the request
request.start()
# Stop the request whether or not it has completed
clientRes.on 'close', ->
request.stop()

View file

@ -1,196 +1,253 @@
http = require 'http' http = require 'http'
url = require 'url' url = require 'url'
{ Duplex } = require 'stream'
_ = require 'lodash' _ = require 'lodash'
async = require 'async' Multisource = require '../../multisource-stream'
Multisource = require 'multisource-stream'
isDownloadable = (res, opts) -> Thread = require './thread'
res.statusCode in [200, 206] and (parseInt res.headers['content-length']) > opts.contentLength
parseRange = (str = null) ->
return null if str is null
match = str.match /bytes=([0-9]+)?-([0-9]+)?/
return false unless match[1]
range = [(parseInt match[1]), (parseInt match[2]) or null]
return range
module.exports = class Request extends Duplex
constructor: (reqOpts, opts) ->
super()
@reqOpts = _.clone reqOpts
@reqOpts.headers = _.clone reqOpts.headers
module.exports = class Request
constructor: (opts, @_req, @_res) ->
@opts = _.clone opts @opts = _.clone opts
@opts.headers = _.clone opts.headers
_.defaults @opts, # Parse and setup initial range
concurrent: 4 range = parseRange @reqOpts.headers['range']
partSize: 1024 * 1024 * 4 console.log 'parsed', range, 'from', @reqOpts.headers['range']
contentLength: 1024 * 1024 * 4 if range is false
throw new Error "unsupported range header: #{@reqOpts.headers['range']}"
req = http.request @opts else if range
# Dummy error listener to avoid uncaught exception @range = [range[0], if range[1] then (range[1] + 1) else null]
req.on 'error', ->
if opts.method is 'GET'
req.on 'response', (res) =>
if isDownloadable res, @opts
@_part req, res
else
@_through res
else else
req.on 'response', (res) => @_through res @range = [0, null]
@_req.pipe req
_through: (res) -> @offset = @range[0]
@_res.writeHeader res.statusCode, res.headers
res.pipe @_res
_setRange: (opts, start, end = '') -> @connecting = false
opts.headers['range'] = "bytes=#{start}-#{end}" @connected = false
_getRange: (res) -> @pool = []
return null if not res.headers['content-range'] @offsets = []
raw = res.headers['content-range'].match /bytes ([0-9]+)-([0-9]+)\/([0-9]+)/ @runningPool = []
range = [(parseInt raw[1]), (parseInt raw[2])]
return range
_part: (req, res) -> # Create a multisource stream and set its start offset at the range's start
@_res.writeHeader res.statusCode, res.headers @stream = new Multisource
@stream.offset = @offset
@source = new Multisource # Push data when available
@source.pipe @_res @stream.on 'readable', =>
@_readable = true
@_readStream() if @_reading
# Retrieve content length # When the stream ends, end the request as well
@_length = parseInt res.headers['content-length'] @stream.on 'end', =>
# Retrieve initial range @push null
@_offset = if res.statusCode is 206 then (@_getRange res)[0] else 0
console.log "PART - #{@_length} at offset #{@_offset}" _createThread: (offset, length) ->
console.log "CREATE #{@pool.length} from #{offset} til #{offset + length}"
thread = { offset: 0, length: @opts.partSize, dlength: 0, running: true, req } thread = new Thread offset, length, @reqOpts, endOffset: @range[1]
thread.opts = _.clone @opts
thread.opts.headers = _.clone @opts.headers
res.headers['content-range'] = "bytes 0-#{@_length - 1}/#{@_length}"
# Setup initial thread # Insert the thread at the correct place in the pool
@_gaps = [[@opts.partSize + @_offset, @_length]] idx = _.sortedIndex @offsets, offset
@_pool = [res] @offsets.splice idx, 0, offset
@_running = 1 @pool.splice idx, 0, thread
@_bind thread, res, (e) => # Pipe the thread to the stream from its offset
# Start filling thread pool thread.pipe (@stream.from offset)
@_fill()
_thread: (offset, length, callback) -> return thread
thread = { offset, length, dlength: 0, running: true }
console.log "THREAD CREATE - #{thread.offset} to #{thread.offset + thread.length}"
# Add range header and create a request start: ->
thread.opts = _.clone @opts @connecting = true
thread.opts.headers = _.clone @opts.headers
@_setRange thread.opts, offset # Open range, in case the next thread doesn't start before that one ends
thread.req = http.request thread.opts
thread.req.on 'response', (res) => @_bind thread, res, callback # Calculate correct thread length
thread.req.on 'error', (e) => length = if @range[1] then (@range[1] - @offset) else Infinity
console.log 'req error', e length = @opts.partSize if length > @opts.partSize
@_stop thread
thread.req.on 'end', =>
console.log 'req error', e
@_stop thread
@_pool.push thread
@_running++ # Create the thread at current offset
thread.running = true thread = @_createThread @offset, length
thread.req.end()
_bind: (thread, res, callback) -> thread.once 'connect', (res) =>
console.log "THREAD BIND - #{thread.offset} to #{thread.offset + thread.length}" @connecting = false
@connected = true
thread.res = res # Retrieve the length from the thread
@length = thread.length
# Set correct end range
@range[1] = @range[0] + @length
range = @_getRange res # Emit connect and let the user decide of the downloading mode
unless range called = false
@_stop thread @emit 'connect', res, (streaming) =>
callback 'RANGE' if called
return throw new Error 'connect callback has already been called'
console.log "THREAD RANGE - #{range[0]} to #{range[1]}, expected #{thread.offset} to #{@_length - 1}"
console.log "THREAD RANGE - sent #{thread.opts.headers['range']}"
console.log "THREAD RANGE - received #{thread.res.headers['content-range']}"
# In case the server returned a wrong range
if range[0] isnt thread.offset
@_gaps.push [thread.offset, range[0] - thread.offset]
@_fill()
thread.offset = range[0]
# Create a new source from the thread offset (should support negative offset)
thread.source = @source.from thread.offset - @_offset
res.on 'data', (chunk) =>
thread.source.write chunk
thread.dlength += chunk.length
if thread.dlength > thread.length
console.log "THREAD CROSS - #{thread.offset} to #{thread.offset + thread.length}"
# The thread has passed its assigned length
next = @_pool[(@_pool.indexOf thread) + 1]
if not next
@_stop thread
@_fill()
else if next.dlength is 0
# If the next thread hasn't started, we abort it and continue with the current thread instead
@_stop next
@_pool.splice (@_pool.indexOf next), 1
thread.length += next.length
console.log "THREAD EXTEND - #{thread.offset} to #{thread.offset + thread.length}"
else else
console.log "THREAD STOP - #{thread.offset} to #{thread.offset + thread.length}" called = true
# Else stop the thread
@_stop thread
res.on 'error', (e) => if streaming
console.log "THREAD ERROR - #{thread.offset} to #{thread.offset + thread.length}, #{thread.dlength}/#{thread.length}" console.log 'STREAM', @length, @range
@_stop thread @_streamingMode()
res.on 'end', => else
console.log "THREAD END - #{thread.offset} to #{thread.offset + thread.length}, #{thread.dlength}/#{thread.length}" thread.on 'end', => @stream.end()
@_stop thread thread.on 'disconnect', => @stream.end()
callback() thread.on 'error', (e) =>
console.log 'ERROR, cancelling thread'
@stream.end()
_stop: (thread) -> # Start the thread
return unless thread.running thread.start()
thread.running = false @runningPool.push thread
@_running--
# If the thread hasn't completed, create a gap # Increment offset
if thread.dlength < thread.length @offset += length
@_gaps.push [thread.offset + thread.dlength, thread.offset + thread.length]
if thread.req # Ends the first thread's request
thread.req.removeAllListeners() end: ->
# Dummy error listener to avoid uncaught exception @pool[0].end()
thread.req.on 'error', ->
thread.req.abort()
if thread.res # Stops the request
thread.res.removeAllListeners() stop: ->
# Dummy error listener to avoid uncaught exception @connecting = false
thread.res.on 'error', -> @connected = false
thread.res.socket.destroy()
thread.stop() for thread in @runningPool
@pool = []
@offsets = []
@runningPool = []
@offset = @stream.offset
@range[0] = @stream.offset
_readStream: ->
data = @stream.read()
@_reading = @push data if data
_read: ->
@_reading = true
@_readStream() if @_readable
_write: (chunk, encoding, callback) ->
@pool[0].write chunk, encoding, callback
# Switches to threaded streaming downloading
_streamingMode: ->
# Retrieve initial thread
thread = @pool[0]
# Setup correct thread range
thread.range[1] = thread.range[0] + @opts.partSize
# Set the offset at the end of the thread range
@offset = thread.range[1]
@_bindThread @pool[0]
@_fill() @_fill()
_fill: -> # Returns the next thread in the thread pool
async.whilst (=> @_running < @opts.concurrent and @_gaps.length > 0), (next) => _getNextThread: (thread) ->
gap = @_gaps[@_gaps.length - 1] @pool[(@pool.indexOf thread) + 1] or null
unless gap
next 'END'
return
offset = gap[0] # Binds event listeners to a thread
length = gap[1] - gap[0] _bindThread: (thread) ->
thread.on 'cross', =>
console.log 'CROSS', (@pool.indexOf thread)
next = @_getNextThread thread
if length > @opts.partSize if not next
# Adjust to the wanted thread size console.log @stream.offset, @range[1], @runningPool.length
length = @opts.partSize if @stream.offset is @range[1]
gap[0] += length @_onEnd()
else
thread.range[1] += @opts.partSize
thread.crossed = false
else if not next.connected
# Replacing the next thread
thread.range[1] = next.range[1]
thread.crossed = false
@_removeThread next, true
@_fill()
else else
# The gap is filled @_removeThread thread
@_gaps.splice @_gaps.length - 1, 1 @_fill()
@_thread offset, length, (e) -> thread.on 'disconnect', =>
next e # Clean up
, => thread.stop()
console.log "FILLED - #{@_running}/#{@opts.concurrent}, #{@_gaps.length} gaps left" # Restart the thread
thread.start()
# End the thread's request
thread.end()
console.log 'trying to reconnect'
thread.once 'connect', =>
console.log 'successfully reconnected'
thread.on 'error', =>
# Clean up
thread.stop()
# Restart the thread
thread.start()
# End the thread's request
thread.end()
console.log 'trying to reconnect'
thread.once 'connect', =>
console.log 'successfully reconnected'
# Stops and removes a thread from the running pool
_removeThread: (thread, hard = false) ->
threadIdx = @pool.indexOf thread
runningIdx = @runningPool.indexOf thread
console.log "REMOVE #{threadIdx}"
if runningIdx isnt -1
thread.stop()
@runningPool.splice runningIdx, 1
if hard
# Also remove from the thread pool
threadIdx = @pool.indexOf thread
@pool.splice threadIdx, 1
# Fills the running thread pool
_fill: ->
console.log "FILL #{@runningPool.length}/#{@opts.maxConcurrent}, #{@offset}/#{@range[1]}, #{@pool.indexOf @runningPool[0]}"
if @runningPool.length < @opts.maxConcurrent and @offset isnt @range[1]
# Calculate correct thread length
length = @range[1] - @offset
length = @opts.partSize if length > @opts.partSize
thread = @_createThread @offset, length
# Continue filling on thread connect
thread.once 'connect', => @_fill()
# Bind thread events
@_bindThread thread
# Start the thread
thread.start()
@runningPool.push thread
# Call end on the thread to end its request
thread.end()
# Increment offset
@offset += length
_onEnd: ->
@connected = false
console.log "END #{@offset}/#{@range[1]}"
@stream.end()

121
src/thread.coffee Normal file
View file

@ -0,0 +1,121 @@
http = require 'http'
{ Duplex } = require 'stream'
_ = require 'lodash'
# Builds the value of an HTTP `Range` request header.
#
# start - first byte to request (defaults to 0).
# end   - last byte to request; when null the range is left open-ended.
#
# Returns "bytes=<start>-" for an open range, "bytes=<start>-<end>" otherwise.
createRange = (start = 0, end = null) ->
  upperBound = if end is null then '' else "#{end}"
  "bytes=#{start}-#{upperBound}"
# Parses an HTTP `Content-Range` response header of the form
# `bytes <start>-<end>/<total>`.
#
# str - raw header value; undefined/null when the header is absent.
#
# Returns `[start, end, total]` as integers, or null when the header is absent
# or does not have the form above (for instance `bytes */1234`).
parseContentRange = (str = null) ->
  return null if str is null
  match = str.match /bytes ([0-9]+)-([0-9]+)\/([0-9]+)/
  # `String::match` yields null when nothing matches; indexing it would throw
  return null unless match
  return (parseInt(part, 10) for part in match[1..3])
# A single HTTP request wrapped in a Duplex stream.
#
# The writable side forwards chunks to the outgoing request body, the readable
# side pushes the bytes of the response. `@offset` tracks the absolute position
# reached so far and `@range` is the `[start, end)` window assigned to the
# thread; `@range[1]` may be rewritten from outside while the thread runs.
#
# Events emitted by this class:
#   'connect' (res) - a response with an acceptable range has been received.
#   'cross'         - `@offset` reached or passed `@range[1]` (emitted once
#                     until `@crossed` is reset to false).
#   'disconnect'    - the response ended before `@offset` reached `@range[1]`.
#   'error' (e)     - the underlying request emitted an error.
module.exports = class Thread extends Duplex
  # offset  - absolute byte offset at which the thread starts.
  # length  - number of bytes initially assigned to the thread.
  # reqOpts - options handed to `http.request`; shallow-cloned, headers included.
  # opts    - thread options; only `endOffset` is read by this class.
  constructor: (offset, length, reqOpts, opts) ->
    super()

    @_reading = false
    @_readable = false
    @_tries = 0
    @connecting = false
    @connected = false
    # Explicit initial value: `_readRes` tests this flag before emitting 'cross'
    @crossed = false

    @range = [offset, offset + length]
    @offset = offset

    @reqOpts = _.clone reqOpts
    # Fall back to empty objects so `_connect` can index them safely
    @reqOpts.headers = (_.clone reqOpts.headers) or {}
    @opts = (_.clone opts) or {}

  # Tries to connect
  _connect: ->
    @connecting = true

    # Set range header. The decision is based on the CURRENT offset rather than
    # the initial range start, so a thread that began at 0 and is restarted
    # after a disconnect resumes where it stopped instead of requesting the
    # whole resource again.
    if @offset isnt 0
      end = if @opts.endOffset then (@opts.endOffset - 1) else null
      @reqOpts.headers['range'] = createRange @offset, end
    else if @reqOpts.headers['range']
      delete @reqOpts.headers['range']

    # Create request
    @_req = http.request @reqOpts

    # Listen to response
    @_req.on 'response', (@_res) =>
      @connected = true
      @connecting = false

      range = parseContentRange @_res.headers['content-range']
      @length = (parseInt @_res.headers['content-length'], 10) or null

      # Parse and validate range
      # NOTE(review): on a mismatch nothing is emitted and the response is left
      # unread while `@connected` stays true — confirm this is intended.
      if (range and (range[0] isnt @offset)) or (not range and @offset isnt 0)
        console.log "received invalid range: #{range}, expected #{@offset},#{@offset + @length - 1}"
        return

      # Push data when available
      @_res.on 'readable', =>
        @_readable = true
        @_readRes() if @_reading

      @_res.on 'end', =>
        if @offset is @range[1]
          # The whole assigned window has been delivered
          @push null
        else
          @connected = false
          # Notify the user
          @emit 'disconnect'

      @emit 'connect', @_res

    @_req.on 'error', (e) =>
      @emit 'error', e

  # Reads one chunk from the response, pushes it downstream and advances
  # `@offset`; emits 'cross' the first time the offset reaches `@range[1]`.
  _readRes: ->
    data = @_res.read()
    if data
      # `push` returns false when the consumer wants us to pause
      @_reading = @push data
      @offset += data.length

      if not @crossed and @offset >= @range[1]
        @crossed = true
        @emit 'cross'

  # Readable side: the consumer is asking for data.
  _read: ->
    @_reading = true
    @_readRes() if @_readable

  # Writable side: forwards the chunk to the outgoing request body.
  _write: (chunk, encoding, callback) ->
    @_req.write chunk, encoding, callback

  # Starts the thread
  start: ->
    @_connect()

  # Stops the thread
  stop: ->
    @_readable = false
    @connecting = false
    @connected = false

    # `@_req` only exists once `start` has been called
    if @_req
      @_req.removeAllListeners()
      @_req.on 'error', (e) -> console.log 'req error', e # Dummy error listener
      if @_req.connection
        @_req.connection.destroy()
      else
        # No socket assigned yet (request still queued by the agent)
        @_req.abort()

    if @_res
      @_res.removeAllListeners()
      @_res.on 'error', (e) -> console.log 'res error', e # Dummy error listener
      @_res.connection?.destroy()

  # Ends the request
  end: ->
    @_req.end()