commit 3693843dc17dac8f3a409cd7d771f6c6ca27ad21
Author: Morhaus
Date:   Fri Jan 3 19:33:10 2014 +0100

    first commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..acd0605
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+.DS_Store
+npm-debug.log
+node_modules/*
+lib/
diff --git a/.npmignore b/.npmignore
new file mode 100644
index 0000000..e2a3065
--- /dev/null
+++ b/.npmignore
@@ -0,0 +1,2 @@
+node_modules/*
+src/
diff --git a/Gruntfile.coffee b/Gruntfile.coffee
new file mode 100644
index 0000000..2298838
--- /dev/null
+++ b/Gruntfile.coffee
@@ -0,0 +1,34 @@
+# Build script: `grunt` wipes lib/ and compiles src/**/*.coffee into it;
+# `grunt demon` does the same, then recompiles whenever a source changes.
+module.exports = (grunt) ->
+  grunt.initConfig({
+    pkg: grunt.file.readJSON('package.json')
+
+    watch: {
+      src: {
+        files: 'src/**/*.coffee'
+        tasks: ['default']
+        options: { spawn: false }
+      }
+    }
+
+    coffee: {
+      compile: {
+        expand: true
+        cwd: 'src/'
+        src: '**/*.coffee'
+        dest: 'lib/'
+        ext: '.js'
+        options: { bare: true }
+      }
+    }
+
+    clean: ['lib/']
+  })
+
+  grunt.loadNpmTasks('grunt-contrib-watch')
+  grunt.loadNpmTasks('grunt-contrib-coffee')
+  grunt.loadNpmTasks('grunt-contrib-clean')
+
+  grunt.registerTask('default', ['clean', 'coffee:compile'])
+  grunt.registerTask('demon', ['default', 'watch:src'])
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f06869c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Alexandre Kirszenberg
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0353b52
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+multipart-proxy
+==============
+A proxy that accelerates requests via multipart downloading. Very much WIP.
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..1b41754
--- /dev/null
+++ b/package.json
@@ -0,0 +1,43 @@
+{
+  "name": "multipart-proxy",
+  "description": "A proxy that accelerates requests via multipart downloading",
+  "author": "Alexandre Kirszenberg ",
+  "version": "0.0.1",
+  "license": "MIT",
+  "keywords": [
+    "multipart",
+    "proxy",
+    "http"
+  ],
+
+  "repository": {
+    "type": "git",
+    "url": "git@github.com:morhaus/multipart-proxy.git"
+  },
+
+  "scripts": {
+    "prepublish": "grunt"
+  },
+
+  "engines": {
+    "node": ">=0.10.0"
+  },
+
+  "engineStrict": true,
+
+  "dependencies": {
+    "commander": "2.0.0",
+    "lodash": "*",
+    "async": "*",
+    "multisource-stream": "0.0.*",
+    "tmpl-log": "0.0.*"
+  },
+
+  "devDependencies": {
+    "grunt": "*",
+    "grunt-cli": "*",
+    "grunt-contrib-watch": "*",
+    "grunt-contrib-coffee": "*",
+    "grunt-contrib-clean": "*"
+  }
+}
diff --git a/src/index.coffee b/src/index.coffee
new file mode 100644
index 0000000..f8b0c07
--- /dev/null
+++ b/src/index.coffee
@@ -0,0 +1,3 @@
+Proxy = require './proxy'
+
+new Proxy 3000
diff --git a/src/proxy.coffee b/src/proxy.coffee
new file mode 100644
index 0000000..0724443
--- /dev/null
+++ b/src/proxy.coffee
@@ -0,0 +1,23 @@
+http = require 'http'
+url = require 'url'
+Request = require './request'
+
+# Proxy server: every incoming request is handed to a Request, which fetches
+# it upstream through one shared agent with no socket limit.
+module.exports = class Proxy
+  # listenPort and listenHost are passed straight to the server's listen()
+  constructor: (listenPort, listenHost) ->
+    @agent = new http.Agent maxSockets: Infinity
+
+    http.createServer()
+      .on('request', @request)
+      .listen(listenPort, listenHost)
+
+  # Bound (fat arrow) because it is registered directly as an event listener
+  request: (req, res) =>
+    opts = url.parse req.url
+    opts.method = req.method
+    opts.headers = req.headers
+    opts.agent = @agent
+
+    new Request opts, req, res
diff --git a/src/request.coffee b/src/request.coffee
new file mode 100644
index 0000000..e35eae8
--- /dev/null
+++ b/src/request.coffee
@@ -0,0 +1,260 @@
+http = require 'http'
+url = require 'url'
+
+_ = require 'lodash'
+async = require 'async'
+Multisource = require 'multisource-stream'
+
+# Matches a satisfied Content-Range header such as "bytes 0-499/1234"
+CONTENT_RANGE = /bytes ([0-9]+)-([0-9]+)\/([0-9]+)/
+
+# True when a response is worth fetching in parallel parts: a 200/206 whose
+# Content-Length exceeds opts.contentLength and whose server has not
+# explicitly refused range requests with "Accept-Ranges: none"
+isDownloadable = (res, opts) ->
+  return false unless res.statusCode in [200, 206]
+  return false if res.headers['accept-ranges'] is 'none'
+  # A missing Content-Length parses to NaN, which never compares greater
+  (parseInt res.headers['content-length'], 10) > opts.contentLength
+
+# Forwards one client request upstream. A GET whose response is large enough
+# is fetched as several concurrent range requests ("threads") whose data is
+# written into one Multisource stream piped to the client; any other
+# response is piped through untouched.
+#
+# Bookkeeping used while splitting:
+#   @_gaps    - [start, end) byte ranges not yet assigned to a thread
+#   @_pool    - threads, in creation order
+#   @_running - number of threads currently running
+module.exports = class Request
+  # opts  - http.request options; `concurrent` (max parallel threads),
+  #         `partSize` (bytes per thread) and `contentLength` (body size
+  #         above which a response is split) are defaulted when absent
+  # @_req - incoming client request, piped into the upstream request
+  # @_res - response sent back to the client
+  constructor: (opts, @_req, @_res) ->
+    @opts = _.clone opts
+    @opts.headers = _.clone opts.headers
+
+    _.defaults @opts,
+      concurrent: 4
+      partSize: 1024 * 1024 * 4
+      contentLength: 1024 * 1024 * 4
+
+    req = http.request @opts
+    # An unhandled 'error' event would throw; answer with a 502 rather than
+    # leaving the client hanging when nothing has been sent yet
+    req.on 'error', (e) =>
+      console.log 'req error', e
+      unless @_res.headersSent
+        @_res.writeHead 502
+        @_res.end()
+
+    if opts.method is 'GET'
+      req.on 'response', (res) =>
+        if isDownloadable res, @opts
+          @_part req, res
+        else
+          @_through res
+    else
+      req.on 'response', (res) => @_through res
+    @_req.pipe req
+
+  # Relay the upstream response to the client as is
+  _through: (res) ->
+    @_res.writeHead res.statusCode, res.headers
+    res.pipe @_res
+
+  # Set the Range header on `opts`; leaving `end` out asks for an open range
+  _setRange: (opts, start, end = '') ->
+    opts.headers['range'] = "bytes=#{start}-#{end}"
+
+  # Parse the Content-Range header of `res` into [first, last] byte
+  # positions; null when the header is missing or not "bytes a-b/total"
+  _getRange: (res) ->
+    header = res.headers['content-range']
+    return null unless header
+    raw = header.match CONTENT_RANGE
+    return null unless raw
+    [(parseInt raw[1], 10), (parseInt raw[2], 10)]
+
+  # Start a multipart download: `res` answers the client's own request and
+  # feeds the first thread; the rest of the body is queued as a gap
+  _part: (req, res) ->
+    @_res.writeHead res.statusCode, res.headers
+
+    @source = new Multisource
+    @source.pipe @_res
+
+    # Retrieve content length (bytes in this response)
+    @_length = parseInt res.headers['content-length'], 10
+    # Retrieve initial range: absolute position of the first byte served
+    initial = if res.statusCode is 206 then @_getRange res else null
+    @_offset = if initial then initial[0] else 0
+    # Absolute position just past the last byte served
+    last = @_offset + @_length
+
+    console.log "PART - #{@_length} at offset #{@_offset}"
+
+    # Thread offsets are absolute, so the first thread starts at @_offset
+    length = Math.min @opts.partSize, @_length
+    thread = { offset: @_offset, length, dlength: 0, running: true, req }
+    thread.opts = _.clone @opts
+    thread.opts.headers = _.clone @opts.headers
+    # The client already got the real headers above; this rewrite only lets
+    # _bind treat the first response like any other ranged thread
+    res.headers['content-range'] = "bytes #{@_offset}-#{last - 1}/#{last}"
+
+    # Setup initial thread
+    @_gaps = if length < @_length then [[@_offset + length, last]] else []
+    @_pool = [thread]
+    @_running = 1
+
+    @_bind thread, res, (e) =>
+      # Start filling thread pool
+      @_fill()
+
+  # Request the bytes from `offset` on a new thread responsible for `length`
+  # bytes; `callback` fires at most once, when the response has been bound
+  # or the request has failed
+  _thread: (offset, length, callback) ->
+    thread = { offset, length, dlength: 0, running: true }
+    console.log "THREAD CREATE - #{thread.offset} to #{thread.offset + thread.length}"
+    # _bind and the error handler both report back; only the first counts
+    done = _.once callback
+
+    # Add range header and create a request
+    thread.opts = _.clone @opts
+    thread.opts.headers = _.clone @opts.headers
+    @_setRange thread.opts, offset # Open range, in case the next thread doesn't start before that one ends
+    thread.req = http.request thread.opts
+
+    thread.req.on 'response', (res) => @_bind thread, res, done
+    thread.req.on 'error', (e) =>
+      console.log 'req error', e
+      @_stop thread
+      done e
+    thread.req.on 'end', =>
+      console.log 'req end'
+      @_stop thread
+    @_pool.push thread
+
+    @_running++
+    thread.req.end()
+
+  # Attach response `res` to `thread` and stream its data into the shared
+  # source until the thread has delivered the bytes assigned to it
+  _bind: (thread, res, callback) ->
+    console.log "THREAD BIND - #{thread.offset} to #{thread.offset + thread.length}"
+
+    thread.res = res
+
+    range = @_getRange res
+    unless range
+      # NOTE(review): _stop queues this range again, so a server that never
+      # answers with Content-Range looks like it is retried forever - confirm
+      @_stop thread
+      callback 'RANGE'
+      return
+
+    console.log "THREAD RANGE - #{range[0]} to #{range[1]}, expected #{thread.offset} to #{@_length - 1}"
+    console.log "THREAD RANGE - sent #{thread.opts.headers['range']}"
+    console.log "THREAD RANGE - received #{thread.res.headers['content-range']}"
+
+    # In case the server returned a wrong range
+    if range[0] isnt thread.offset
+      shift = range[0] - thread.offset
+      if shift > 0
+        # Gaps are [start, end) pairs: queue the bytes this thread skipped
+        @_gaps.push [thread.offset, range[0]]
+        @_fill()
+      # Keep the end of the thread where it was assigned
+      thread.length -= shift
+      thread.offset = range[0]
+
+    # Create a new source from the thread offset (should support negative offset)
+    thread.source = @source.from thread.offset - @_offset
+
+    res.on 'data', (chunk) =>
+      thread.source.write chunk
+      thread.dlength += chunk.length
+
+      if thread.dlength > thread.length
+        console.log "THREAD CROSS - #{thread.offset} to #{thread.offset + thread.length}"
+        # The thread has passed its assigned length
+        next = @_pool[(@_pool.indexOf thread) + 1]
+        if not next
+          @_stop thread
+          @_fill()
+        else if next.dlength is 0
+          # If the next thread hasn't started, we abort it and continue with the current thread instead
+          # NOTE(review): _stop also queues the aborted range as a gap, so it
+          # looks like those bytes get requested a second time - confirm
+          @_stop next
+          @_pool.splice (@_pool.indexOf next), 1
+          thread.length += next.length
+          console.log "THREAD EXTEND - #{thread.offset} to #{thread.offset + thread.length}"
+        else
+          console.log "THREAD STOP - #{thread.offset} to #{thread.offset + thread.length}"
+          # Else stop the thread
+          @_stop thread
+
+    res.on 'error', (e) =>
+      console.log "THREAD ERROR - #{thread.offset} to #{thread.offset + thread.length}, #{thread.dlength}/#{thread.length}"
+      @_stop thread
+    res.on 'end', =>
+      console.log "THREAD END - #{thread.offset} to #{thread.offset + thread.length}, #{thread.dlength}/#{thread.length}"
+      @_stop thread
+
+    callback()
+
+  # Stop `thread` (no-op when already stopped): queue the bytes it still
+  # owed as a gap, tear down its request and response, then refill the pool
+  _stop: (thread) ->
+    return unless thread.running
+    thread.running = false
+    @_running--
+
+    # If the thread hasn't completed, create a gap
+    if thread.dlength < thread.length
+      @_gaps.push [thread.offset + thread.dlength, thread.offset + thread.length]
+
+    if thread.req
+      thread.req.removeAllListeners()
+      # Dummy error listener to avoid uncaught exception
+      thread.req.on 'error', ->
+      thread.req.abort()
+
+    if thread.res
+      thread.res.removeAllListeners()
+      # Dummy error listener to avoid uncaught exception
+      thread.res.on 'error', ->
+      # Tolerate a response that no longer has a socket attached
+      thread.res.socket?.destroy()
+
+    @_fill()
+
+  # Hand queued gaps to new threads, one at a time, for as long as fewer
+  # than @opts.concurrent threads are running
+  _fill: ->
+    async.whilst (=> @_running < @opts.concurrent and @_gaps.length > 0), (next) =>
+      gap = @_gaps[@_gaps.length - 1]
+      unless gap
+        next 'END'
+        return
+
+      offset = gap[0]
+      length = gap[1] - gap[0]
+
+      if length > @opts.partSize
+        # Adjust to the wanted thread size
+        length = @opts.partSize
+        gap[0] += length
+      else
+        # The gap is filled
+        @_gaps.pop()
+
+      @_thread offset, length, (e) ->
+        next e
+    , =>
+      console.log "FILLED - #{@_running}/#{@opts.concurrent}, #{@_gaps.length} gaps left"