add websocket server to this dir. fix stuff for client

This commit is contained in:
2024-10-30 15:12:52 -05:00
parent f8152c6db8
commit 4886bc5b1f
3058 changed files with 1201180 additions and 2 deletions

View File

@ -0,0 +1,3 @@
node_modules
node_modules/*
npm-debug.log

View File

@ -0,0 +1,3 @@
language: node_js
node_js:
- "0.10"

24
websocket_server/node_modules/event-stream/LICENCE generated vendored Normal file
View File

@ -0,0 +1,24 @@
The MIT License (MIT)
Copyright (c) 2011 Dominic Tarr
Permission is hereby granted, free of charge,
to any person obtaining a copy of this software and
associated documentation files (the "Software"), to
deal in the Software without restriction, including
without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom
the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,25 @@
var inspect = require('util').inspect

// Example: pretty-print newline-separated JSON from stdin.
// Only runs when executed directly, not when require()d.
if (!module.parent) {
  var es = require('..') // load event-stream

  es.pipe( // pipe joins streams together
    process.openStdin(), // open stdin
    es.split(), // split stream to break on newlines
    es.map(function (line, next) { // turn this async function into a stream
      var parsed
      try {
        parsed = JSON.parse(line) // try to parse input into json
      } catch (err) {
        return next(null, line) // if it fails just pass it anyway
      }
      next(null, inspect(parsed)) // render it nicely
    }),
    process.stdout // pipe it to stdout !
  )
}
// run this
//
// curl -sS registry.npmjs.org/event-stream | node pretty.js
//
324
websocket_server/node_modules/event-stream/index.js generated vendored Normal file
View File

@ -0,0 +1,324 @@
//filter will reemit the data if cb(err,pass) pass is truthy
// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
var Stream = require('stream').Stream
, es = exports
, through = require('through')
, from = require('from')
, duplex = require('duplexer')
, map = require('map-stream')
, pause = require('pause-stream')
, split = require('split')
, pipeline = require('stream-combiner')
, immediately = global.setImmediate || process.nextTick;
es.Stream = Stream //re-export Stream from core
es.through = through
es.from = from
es.duplex = duplex
es.map = map
es.pause = pause
es.split = split
es.pipeline = es.connect = es.pipe = pipeline
//
// merge / concat
//
// combine multiple source streams into a single stream:
// every chunk is re-emitted on one stream, and 'end' fires
// only after every source has ended.
//
es.concat = //actually this should be called concat
es.merge = function (/*streams...*/) {
  // accept either merge(a, b, c) or merge([a, b, c])
  var sources = [].slice.call(arguments)
  if (sources.length === 1 && (sources[0] instanceof Array)) {
    sources = sources[0]
  }

  var merged = new Stream()
  merged.setMaxListeners(0) // allow adding more than 11 streams
  merged.writable = merged.readable = true

  var endedCount = 0
  if (!sources.length) {
    // nothing to merge: end on the next tick so listeners can attach first
    process.nextTick(function () {
      merged.emit('end')
    })
  } else {
    sources.forEach(function (source) {
      source.pipe(merged, {end: false})
      var sawEnd = false
      source.on('end', function () {
        if (sawEnd) return
        sawEnd = true
        endedCount += 1
        // forward 'end' exactly once, when the last source ends
        if (endedCount === sources.length)
          merged.emit('end')
      })
    })
  }

  merged.write = function (data) {
    this.emit('data', data)
  }
  merged.destroy = function () {
    // destroy every source that supports it
    sources.forEach(function (source) {
      if (source.destroy) source.destroy()
    })
  }
  return merged
}
// writable stream, collects all events into an array
// and calls back when 'end' occurs
// mainly I'm using this to test the other functions
es.writeArray = function (done) {
  if ('function' !== typeof done)
    throw new Error('function writeArray (done): done must be function')

  var sink = new Stream()
  var collected = []
  var finished = false

  sink.write = function (chunk) {
    collected.push(chunk)
  }
  sink.end = function () {
    finished = true
    done(null, collected)
  }
  sink.writable = true
  sink.readable = false
  sink.destroy = function () {
    sink.writable = sink.readable = false
    // destroying before 'end' is an error: report what was collected so far
    if (finished) return
    done(new Error('destroyed before end'), collected)
  }
  return sink
}
//return a Stream that emits each element of an array as a 'data' event,
//respecting pause() and resume()
es.readArray = function (array) {
var stream = new Stream()
, i = 0 // index of the next element to emit
, paused = false
, ended = false
stream.readable = true
stream.writable = false
if(!Array.isArray(array))
throw new Error('event-stream.read expects an array')
// resume drives the whole stream: it emits items synchronously until the
// array is exhausted or a 'data' listener calls pause() re-entrantly.
stream.resume = function () {
if(ended) return
paused = false
var l = array.length
while(i < l && !paused && !ended) {
stream.emit('data', array[i++])
}
// emit 'end' exactly once, after the final element was emitted
if(i == l && !ended)
ended = true, stream.readable = false, stream.emit('end')
}
process.nextTick(stream.resume) // start flowing on the next tick
stream.pause = function () {
paused = true
}
stream.destroy = function () {
// stop emitting and signal close (no 'end' is emitted on destroy)
ended = true
stream.emit('close')
}
return stream
}
//
// readable (asyncFunction)
// return a stream that calls an async function while the stream is not paused.
//
// the function must take: (count, callback) {...
// with continueOnError truthy, an error does not end the stream.
//
es.readable =
function (func, continueOnError) {
var stream = new Stream()
, i = 0 // call count, passed to func as its first argument
, paused = false
, ended = false
, reading = false // true while waiting for func's callback
stream.readable = true
stream.writable = false
if('function' !== typeof func)
throw new Error('event-stream.readable expects async function')
stream.on('end', function () { ended = true })
// get is both the scheduler and the callback handed to func:
// invoked as (err, data) it emits, then schedules the next read.
function get (err, data) {
if(err) {
stream.emit('error', err)
// unless continueOnError is set, a single error ends the stream
if(!continueOnError) stream.emit('end')
} else if (arguments.length > 1)
stream.emit('data', data)
immediately(function () {
// never start another read while ended, paused, or mid-read:
// this is what guarantees func is not called concurrently
if(ended || paused || reading) return
try {
reading = true
func.call(stream, i++, function () {
reading = false
get.apply(null, arguments)
})
} catch (err) {
// a synchronous throw from func becomes a stream error
stream.emit('error', err)
}
})
}
stream.resume = function () {
paused = false
get()
}
process.nextTick(get) // start reading on the next tick
stream.pause = function () {
paused = true
}
stream.destroy = function () {
stream.emit('end')
stream.emit('close')
ended = true
}
return stream
}
//
// mapSync (syncFunction)
// synchronous version of es.map: apply `sync` to each chunk and re-emit
// the result; a thrown exception becomes an 'error' event.
//
es.mapSync = function (sync) {
  return es.through(function (data) {
    var mapped
    try {
      mapped = sync(data)
    } catch (err) {
      // surface the exception as a stream error instead of throwing
      return this.emit('error', err)
    }
    // returning undefined drops the chunk (acts like a filter)
    if (mapped !== undefined)
      this.emit('data', mapped)
  })
}
//
// log: print everything coming through the stream with console.error
// (prefixed with `name` when given) and pass it along unchanged.
// useful for debugging pipelines.
//
es.log = function (name) {
  return es.through(function (data) {
    // note: an unused copy of `arguments` was previously collected here; removed.
    if (name) console.error(name, data)
    else console.error(data)
    this.emit('data', data)
  })
}
//
// child -- pipe through a child process
//
// returns a duplex stream: writes go to the child's stdin,
// reads come from the child's stdout.
es.child = function (child) {
return es.duplex(child.stdin, child.stdout)
}
//
// parse
//
// must be used after es.split() to ensure that each chunk represents a line
// source.pipe(es.split()).pipe(es.parse())
// with {error: truthy}, parse failures become 'error' events; otherwise
// they are only reported via console.error.
es.parse = function (options) {
  var emitError = !!(options ? options.error : false)
  return es.through(function (data) {
    var parsed
    try {
      if (data) //ignore empty lines
        parsed = JSON.parse(data.toString())
    } catch (err) {
      if (emitError)
        return this.emit('error', err)
      return console.error(err, 'attempting to parse:', data)
    }
    //ignore lines that were only whitespace
    if (parsed !== undefined)
      this.emit('data', parsed)
  })
}
//
// stringify: JSON-encode each chunk and append a newline, so the output
// is compatible with es.parse after es.split.
//
es.stringify = function () {
  var Buffer = require('buffer').Buffer
  return es.mapSync(function (value) {
    // Buffers are stringified as their text content, not as JSON objects
    var plain = Buffer.isBuffer(value) ? value.toString() : value
    return JSON.stringify(plain) + '\n'
  })
}
//
// replace a string within a stream.
//
// warn: just concatenates the string and then does str.split().join().
// probably not optimal.
// for smallish responses, who cares?
// I need this for shadow-npm so it's only relatively small json files.
es.replace = function (from, to) {
// split on `from` (a String or RegExp) and re-join with `to`
return es.pipeline(es.split(from), es.join(to))
}
//
// join chunks with a joiner, just like Array#join.
// for legacy reasons a function argument is treated as a callback
// and delegates to es.wait.
//
es.join = function (str) {
  //legacy api
  if ('function' === typeof str)
    return es.wait(str)

  var emittedAny = false
  return es.through(function (data) {
    // emit the separator before every chunk except the first
    if (emittedAny)
      this.emit('data', str)
    emittedAny = true
    this.emit('data', data)
    return true
  })
}
//
// wait: buffer the whole stream, then emit a single 'data' event (and 'end')
// with everything appended; optionally also call `callback(null, body)`.
//
es.wait = function (callback) {
  var chunks = []
  return es.through(
    function (data) { chunks.push(data) },
    function () {
      // join as a Buffer when the first chunk is one, else as a string
      var body = Buffer.isBuffer(chunks[0])
        ? Buffer.concat(chunks)
        : chunks.join('')
      this.emit('data', body)
      this.emit('end')
      if (callback) callback(null, body)
    })
}
// removed API; kept only so old callers fail with a clear message
es.pipeable = function () {
throw new Error('[EVENT-STREAM] es.pipeable is deprecated')
}

View File

@ -0,0 +1,54 @@
{
"name": "event-stream",
"version": "3.3.4",
"description": "construct pipes of streams of events",
"homepage": "http://github.com/dominictarr/event-stream",
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/event-stream.git"
},
"dependencies": {
"through": "~2.3.1",
"duplexer": "~0.1.1",
"from": "~0",
"map-stream": "~0.1.0",
"pause-stream": "0.0.11",
"split": "0.3",
"stream-combiner": "~0.0.4"
},
"devDependencies": {
"asynct": "*",
"it-is": "1",
"ubelt": "~3.2.2",
"stream-spec": "~0.3.5",
"tape": "~2.3.0"
},
"scripts": {
"prepublish": "npm ls && npm test",
"test": "asynct test/",
"test_tap": "set -e; for t in test/*.js; do node $t; done"
},
"testling": {
"files": "test/*.js",
"browsers": {
"ie": [
8,
9
],
"firefox": [
13
],
"chrome": [
20
],
"safari": [
5.1
],
"opera": [
12
]
}
},
"license": "MIT",
"author": "Dominic Tarr <dominic.tarr@gmail.com> (http://bit.ly/dominictarr)"
}

View File

@ -0,0 +1,314 @@
# EventStream
<img src=https://secure.travis-ci.org/dominictarr/event-stream.png?branch=master>
[![browser status](http://ci.testling.com/dominictarr/event-stream.png)]
(http://ci.testling.com/dominictarr/event-stream)
[Streams](http://nodejs.org/api/stream.html "Stream") are node's best and most misunderstood idea, and
_<em>EventStream</em>_ is a toolkit to make creating and working with streams <em>easy</em>.
Normally, streams are only used for IO,
but in event stream we send all kinds of objects down the pipe.
If your application's <em>input</em> and <em>output</em> are streams,
shouldn't the <em>throughput</em> be a stream too?
The *EventStream* functions resemble the array functions,
because Streams are like Arrays, but laid out in time, rather than in memory.
<em>All the `event-stream` functions return instances of `Stream`</em>.
`event-stream` creates
[0.8 streams](https://github.com/joyent/node/blob/v0.8/doc/api/stream.markdown)
, which are compatible with [0.10 streams](http://nodejs.org/api/stream.html "Stream").
>NOTE: I shall use the term <em>"through stream"</em> to refer to a stream that is writable <em>and</em> readable.
### [simple example](https://github.com/dominictarr/event-stream/blob/master/examples/pretty.js):
``` js
//pretty.js
if(!module.parent) {
var es = require('event-stream')
var inspect = require('util').inspect
process.stdin //connect streams together with `pipe`
.pipe(es.split()) //split stream to break on newlines
.pipe(es.map(function (data, cb) { //turn this async function into a stream
cb(null
, inspect(JSON.parse(data))) //render it nicely
}))
.pipe(process.stdout) // pipe it to stdout !
}
```
run it ...
``` bash
curl -sS registry.npmjs.org/event-stream | node pretty.js
```
[node Stream documentation](http://nodejs.org/api/stream.html)
## through (write?, end?)
Re-emits data synchronously. Easy way to create synchronous through streams.
Pass in optional `write` and `end` methods. They will be called in the
context of the stream. Use `this.pause()` and `this.resume()` to manage flow.
Check `this.paused` to see current flow state. (write always returns `!this.paused`)
this function is the basis for most of the synchronous streams in `event-stream`.
``` js
es.through(function write(data) {
this.emit('data', data)
//this.pause()
},
function end () { //optional
this.emit('end')
})
```
## map (asyncFunction)
Create a through stream from an asynchronous function.
``` js
var es = require('event-stream')
es.map(function (data, callback) {
//transform data
// ...
callback(null, data)
})
```
Each map MUST call the callback. It may callback with data, with an error or with no arguments,
* `callback()` drop this data.
this makes the map work like `filter`,
note:`callback(null,null)` is not the same, and will emit `null`
* `callback(null, newData)` turn data into newData
* `callback(error)` emit an error for this item.
>Note: if a callback is not called, `map` will think that it is still being processed,
>every call must be answered or the stream will not know when to end.
>
>Also, if the callback is called more than once, every call but the first will be ignored.
## mapSync (syncFunction)
Same as `map`, but the callback is called synchronously. Based on `es.through`
## split (matcher)
Break up a stream and reassemble it so that each line is a chunk. matcher may be a `String`, or a `RegExp`
Example, read every line in a file ...
``` js
fs.createReadStream(file, {flags: 'r'})
.pipe(es.split())
.pipe(es.map(function (line, cb) {
//do something with the line
cb(null, line)
}))
```
`split` takes the same arguments as `string.split` except it defaults to '\n' instead of ',', and the optional `limit` parameter is ignored.
[String#split](https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/String/split)
## join (separator)
Create a through stream that emits `separator` between each chunk, just like Array#join.
(for legacy reasons, if you pass a callback instead of a string, join is a synonym for `es.wait`)
## merge (stream1,...,streamN) or merge (streamArray)
> concat → merge
Merges streams into one and returns it.
Incoming data will be emitted as soon as it arrives — no ordering is applied (for example: `data1 data1 data2 data1 data2`, where `data1` and `data2` are data from two streams).
Counts how many streams were passed to it and emits end only when all streams emitted end.
```js
es.merge(
process.stdout,
process.stderr
).pipe(fs.createWriteStream('output.log'));
```
It can also take an Array of streams as input like this:
```js
es.merge([
fs.createReadStream('input1.txt'),
fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));
```
## replace (from, to)
Replace all occurrences of `from` with `to`. `from` may be a `String` or a `RegExp`.
Works just like `string.split(from).join(to)`, but streaming.
## parse
Convenience function for parsing JSON chunks. For newline separated JSON,
use with `es.split`. By default it logs parsing errors by `console.error`;
for another behaviour, transforms created by `es.parse({error: true})` will
emit error events for exceptions thrown from `JSON.parse`, unmodified.
``` js
fs.createReadStream(filename)
.pipe(es.split()) //defaults to lines.
.pipe(es.parse())
```
## stringify
convert javascript objects into lines of text. The text will have whitespace escaped and have a `\n` appended, so it will be compatible with `es.parse`
``` js
objectStream
.pipe(es.stringify())
.pipe(fs.createWriteStream(filename))
```
## readable (asyncFunction)
create a readable stream (that respects pause) from an async function.
while the stream is not paused,
the function will be polled with `(count, callback)`,
and `this` will be the readable stream.
``` js
es.readable(function (count, callback) {
if(streamHasEnded)
return this.emit('end')
//...
this.emit('data', data) //use this way to emit multiple chunks per call.
callback() // you MUST always call the callback eventually.
// the function will not be called again until you do this.
})
```
you can also pass the data and the error to the callback.
you may only call the callback once.
calling the same callback more than once will have no effect.
## readArray (array)
Create a readable stream from an Array.
Just emit each item as a data event, respecting `pause` and `resume`.
``` js
var es = require('event-stream')
, reader = es.readArray([1,2,3])
reader.pipe(...)
```
If you want the stream behave like a 0.10 stream you will need to wrap it using [`Readable.wrap()`](http://nodejs.org/api/stream.html#stream_readable_wrap_stream) function. Example:
``` js
var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1,2,3]));
```
## writeArray (callback)
create a writeable stream from a callback,
all `data` events are stored in an array, which is passed to the callback when the stream ends.
``` js
var es = require('event-stream')
, reader = es.readArray([1, 2, 3])
, writer = es.writeArray(function (err, array){
//array deepEqual [1, 2, 3]
})
reader.pipe(writer)
```
## pause ()
A stream that buffers all chunks when paused.
``` js
var ps = es.pause()
ps.pause() //buffer the stream, also do not allow 'end'
ps.resume() //allow chunks through
```
## duplex (writeStream, readStream)
Takes a writable stream and a readable stream and makes them appear as a readable writable stream.
It is assumed that the two streams are connected to each other in some way.
(This is used by `pipeline` and `child`.)
``` js
var grep = cp.exec('grep Stream')
es.duplex(grep.stdin, grep.stdout)
```
## child (child_process)
Create a through stream from a child process ...
``` js
var cp = require('child_process')
es.child(cp.exec('grep Stream')) // a through stream
```
## wait (callback)
waits for stream to emit 'end'.
joins chunks of a stream into a single string or buffer.
takes an optional callback, which will be passed the
complete string/buffer when it receives the 'end' event.
also, emits a single 'data' event.
``` js
readStream.pipe(es.wait(function (err, body) {
// have complete text here.
}))
```
# Other Stream Modules
These modules are not included as a part of *EventStream* but may be
useful when working with streams.
## [reduce (syncFunction, initial)](https://github.com/parshap/node-stream-reduce)
Like `Array.prototype.reduce` but for streams. Given a sync reduce
function and an initial value it will return a through stream that emits
a single data event with the reduced value once the input stream ends.
``` js
var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
return acc + data.length;
}, 0)).on("data", function(length) {
console.log("stdin size:", length);
});
```

View File

@ -0,0 +1,86 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
// build a 3-stage pipeline: double, async identity (delayed), add 2
function makeExamplePipe() {
return es.connect(
es.map(function (data, callback) {
callback(null, data * 2)
}),
es.map(function (data, callback) {
d.delay(callback)(null, data)
}),
es.map(function (data, callback) {
callback(null, data + 2)
}))
}
// 8 -> *2 = 16 -> +2 = 18
exports['simple pipe'] = function (test) {
var pipe = makeExamplePipe()
pipe.on('data', function (data) {
it(data).equal(18)
test.done()
})
pipe.write(8)
}
// pipe a readArray source through two wrapping maps into writeArray
exports['read array then map'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
, first = es.readArray(readThis)
, read = []
, pipe =
es.connect(
first,
es.map(function (data, callback) {
callback(null, {data: data})
}),
es.map(function (data, callback) {
callback(null, {data: data})
}),
es.writeArray(function (err, array) {
// each element is wrapped twice: {data: {data: n}}
it(array).deepEqual(d.map(readThis, function (data) {
return {data: {data: data}}
}))
test.done()
})
)
}
// connect() with only through-streams must itself be readable + writable
exports ['connect returns a stream'] = function (test) {
var rw =
es.connect(
es.map(function (data, callback) {
callback(null, data * 2)
}),
es.map(function (data, callback) {
callback(null, data * 5)
})
)
it(rw).has({readable: true, writable: true})
var array = [190, 24, 6, 7, 40, 57, 4, 6]
, _array = []
, c =
es.connect(
es.readArray(array),
rw,
es.log('after rw:'),
es.writeArray(function (err, _array) {
// rw multiplies by 2 then 5, i.e. by 10 overall
it(_array).deepEqual(array.map(function (e) { return e * 10 }))
test.done()
})
)
}
require('./helper')(module)

View File

@ -0,0 +1,12 @@
var tape = require('tape')
module.exports = function (m) {
if(m.parent) return
for(var name in m.exports) {
tape(name, function (t) {
console.log('start', name)
t.done = t.end
m.exports[name](t)
})
}
}

View File

@ -0,0 +1,29 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
// run the same pair of sources through merge() twice: once passed as
// separate arguments, once as a single array argument.
// NOTE(review): r1/r2 are piped into BOTH merged streams, so the two
// writers share the sources' 'data' events -- verify this is intended.
exports.merge = function (t) {
var odd = d.map(1, 3, 100, d.id) //array of multiples of 3 < 100
var even = d.map(2, 4, 100, d.id) //array of multiples of 3 < 100
var r1 = es.readArray(even)
var r2 = es.readArray(odd)
var endCount = 0
var writer = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array.sort()).deepEqual(even.concat(odd).sort())
// finish only after BOTH merged streams have ended
if (++endCount === 2) t.done()
})
var writer2 = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array.sort()).deepEqual(even.concat(odd).sort())
if (++endCount === 2) t.done()
})
es.merge(r1, r2).pipe(writer)
es.merge([r1, r2]).pipe(writer2)
}
require('./helper')(module)

View File

@ -0,0 +1,32 @@
var es = require('../')
, it = require('it-is').style('colour')
// default es.parse() reports bad JSON via console.error (no 'error' event)
exports ['es.parse() writes parsing errors with console.error'] = function (test) {
var parseStream = es.parse()
var oldConsoleError = console.error
// temporarily hijack console.error to observe the report, then restore it
console.error = function () {
console.error = oldConsoleError
it(arguments.length > 0).ok()
test.done()
}
// bare word is not valid JSON
parseStream.write('A')
}
// with {error: truthy}, the JSON.parse exception is emitted as 'error'
exports ['es.parse({error: true(thy)}) emits error events from parsing'] = function (test) {
var parseStream = es.parse({error: 1})
var expectedError
// capture the exact error JSON.parse throws for the same input
try {
JSON.parse('A')
} catch(e) {
expectedError = e
}
parseStream.on('error', function (e) {
it(e).deepEqual(expectedError)
process.nextTick(function () {
test.done()
})
}).write('A')
}

View File

@ -0,0 +1,39 @@
var es = require('../')
, it = require('it-is')
, d = require('ubelt')
// verify that es.pause() buffers chunks while shut and never emits
// downstream while paused.
exports ['gate buffers when shut'] = function (test) {
var hundy = d.map(1,100, d.id)
, gate = es.pause()
, ten = 10
es.connect(
es.readArray(hundy),
es.log('after readArray'),
gate,
//es.log('after gate'),
es.map(function (num, next) {
//stick a map in here to check that gate never emits when open
it(gate.paused).equal(false)
console.log('data', num)
// every 10 items, shut the gate and reopen it 10ms later
if(!--ten) {
console.log('PAUSE')
gate.pause()//.resume()
d.delay(gate.resume.bind(gate), 10)()
ten = 10
}
next(null, num)
}),
// all 100 items must arrive, in order, despite the pauses
es.writeArray(function (err, array) { //just realized that I should remove the error param. errors will be emitted
console.log('eonuhoenuoecbulc')
it(array).deepEqual(hundy)
test.done()
})
)
gate.resume()
}
require('./helper')(module)

View File

@ -0,0 +1,52 @@
var es = require('..')
// a pipeline must forward a mid-stream 'error' to the combined stream
// exactly once, not once per downstream stage.
exports['do not duplicate errors'] = function (test) {
var errors = 0;
var pipe = es.pipeline(
es.through(function(data) {
return this.emit('data', data);
}),
es.through(function(data) {
return this.emit('error', new Error(data));
})
)
pipe.on('error', function(err) {
errors++
console.log('error count', errors)
// wait a tick: a duplicated error would fire before done is called
process.nextTick(function () {
return test.done();
})
})
return pipe.write('meh');
}
// same check with a third (pass-through) stage after the error source
exports['3 pipe do not duplicate errors'] = function (test) {
var errors = 0;
var pipe = es.pipeline(
es.through(function(data) {
return this.emit('data', data);
}),
es.through(function(data) {
return this.emit('error', new Error(data));
}),
es.through()
)
pipe.on('error', function(err) {
errors++
console.log('error count', errors)
process.nextTick(function () {
return test.done();
})
})
return pipe.write('meh');
}
require('./helper')(module)

View File

@ -0,0 +1,89 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
// collect `stream` into an array; with pauseAt given, pause the stream
// after that many chunks and call done early with what was collected.
function readStream(stream, pauseAt, done) {
if(!done) done = pauseAt, pauseAt = -1 // pauseAt is optional
var array = []
stream.on('data', function (data) {
array.push(data)
if(!--pauseAt )
stream.pause(), done(null, array)
})
stream.on('error', done)
stream.on('end', function (data) {
done(null, array)
})
}
// a readArray source piped to writeArray reproduces the array
exports ['read an array'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var reader = es.readArray(readThis)
var writer = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
// pausing after 10 items and resuming continues from where it stopped
exports ['read an array and pause it.'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var reader = es.readArray(readThis)
readStream(reader, 10, function (err, data) {
if(err) throw err
it(data).deepEqual([3, 6, 9, 12, 15, 18, 21, 24, 27, 30])
readStream(reader, 10, function (err, data) {
it(data).deepEqual([33, 36, 39, 42, 45, 48, 51, 54, 57, 60])
test.done()
})
reader.resume()
})
}
exports ['reader is readable, but not writeable'] = function (test) {
var reader = es.readArray([1])
it(reader).has({
readable: true,
writable: false
})
test.done()
}
// a downstream that returns false from write must see one 'drain' per item
exports ['read one item per tick'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var drains = 0
var reader = es.readArray(readThis)
var tickMapper = es.map(function (data,callback) {
process.nextTick(function () {
callback(null, data)
})
//since tickMapper is returning false
//pipe should pause the writer until a drain occurs
return false
})
reader.pipe(tickMapper)
readStream(tickMapper, function (err, array) {
it(array).deepEqual(readThis)
it(array.length).deepEqual(readThis.length)
it(drains).equal(readThis.length)
test.done()
})
tickMapper.on('drain', function () {
drains ++
})
}
require('./helper')(module)

View File

@ -0,0 +1,197 @@
var es = require('../')
, it = require('it-is').style('colour')
, u = require('ubelt')
// NOTE(review): this test is disabled -- it returns test.end() on its
// second line, leaving the rest unreachable. Confirm this is intentional.
exports ['read an array'] = function (test) {
console.log('readable')
return test.end()
var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
console.log('readable')
var reader =
es.readable(function (i, callback) {
if(i >= readThis.length)
return this.emit('end')
console.log('readable')
callback(null, readThis[i])
})
var writer = es.writeArray(function (err, array){
if(err) throw err
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
// same as above but the callback is invoked asynchronously
exports ['read an array - async'] = function (test) {
var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
var reader =
es.readable(function (i, callback) {
if(i >= readThis.length)
return this.emit('end')
u.delay(callback)(null, readThis[i])
})
var writer = es.writeArray(function (err, array){
if(err) throw err
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
// emitting 'data' directly and then calling next() with no arguments
exports ['emit data then call next() also works'] = function (test) {
var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
var reader =
es.readable(function (i, next) {
if(i >= readThis.length)
return this.emit('end')
this.emit('data', readThis[i])
next()
})
var writer = es.writeArray(function (err, array){
if(err) throw err
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
// an error passed to the callback ends the stream: func runs only once
exports ['callback emits error, then stops'] = function (test) {
var err = new Error('INTENSIONAL ERROR')
, called = 0
var reader =
es.readable(function (i, callback) {
if(called++)
return
callback(err)
})
reader.on('error', function (_err){
it(_err).deepEqual(err)
u.delay(function() {
it(called).equal(1)
test.done()
}, 50)()
})
}
// `current` must never exceed 1: reads are strictly sequential
exports['readable does not call read concurrently'] = function (test) {
var current = 0
var source = es.readable(function(count, cb){
current ++
if(count > 100)
return this.emit('end')
u.delay(function(){
current --
it(current).equal(0)
cb(null, {ok: true, n: count});
})();
});
var destination = es.map(function(data, cb){
//console.info(data);
cb();
});
var all = es.connect(source, destination);
destination.on('end', test.done)
}
exports ['does not raise a warning: Recursive process.nextTick detected'] = function (test) {
var readThisDelayed;
u.delay(function () {
readThisDelayed = [1, 3, 5];
})();
es.readable(function (count, callback) {
if (readThisDelayed) {
var that = this;
readThisDelayed.forEach(function (item) {
that.emit('data', item);
});
this.emit('end');
test.done();
}
callback();
});
};
//
// emitting multiple errors is not supported by stream.
//
// I do not think that this is a good idea, at least, there should be an option to pipe to
// continue on error. it makes a lot of sense, if you are using Stream like I am, to be able to emit multiple errors.
// an error might not necessarily mean the end of the stream. it depends on the error, at least.
//
// I will start a thread on the mailing list. I'd rather that than use a custom `pipe` implementation.
//
// basically, I want to be able use pipe to transform objects, and if one object is invalid,
// the next might still be good, so I should get to choose if it's gonna stop.
// re-enstate this test when this issue progresses.
//
// hmm. I could add this to es.connect by deregistering the error listener,
// but I would rather it be an option in core.
/*
exports ['emit multiple errors, with 2nd parameter (continueOnError)'] = function (test) {
var readThis = d.map(1, 100, d.id)
, errors = 0
var reader =
es.readable(function (i, callback) {
console.log(i, readThis.length)
if(i >= readThis.length)
return this.emit('end')
if(!(readThis[i] % 7))
return callback(readThis[i])
callback(null, readThis[i])
}, true)
var writer = es.writeArray(function (err, array) {
if(err) throw err
it(array).every(function (u){
it(u % 7).notEqual(0)
}).property('length', 80)
it(errors).equal(14)
test.done()
})
reader.on('error', function (u) {
errors ++
console.log(u)
if('number' !== typeof u)
throw u
it(u % 7).equal(0)
})
reader.pipe(writer)
}
*/
require('./helper')(module)

View File

@ -0,0 +1,76 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
, spec = require('stream-spec')
var next = process.nextTick
// expected fizzbuzz output for 1..100, its '7'->'seven' replacement,
// and a whitespace-padded variant for the trim test below
var fizzbuzz = '12F4BF78FB11F1314FB1617F19BF2223FB26F2829FB3132F34BF3738FB41F4344FB4647F49BF5253FB56F5859FB6162F64BF6768FB71F7374FB7677F79BF8283FB86F8889FB9192F94BF9798FB'
, fizz7buzz = '12F4BFseven8FB11F1314FB161sevenF19BF2223FB26F2829FB3132F34BF3seven38FB41F4344FB464sevenF49BF5253FB56F5859FB6162F64BF6seven68FBseven1Fseven3seven4FBseven6sevensevenFseven9BF8283FB86F8889FB9192F94BF9seven98FB'
, fizzbuzzwhitespce = ' 12F4BF78FB11F1314FB1617F19BF2223FB26F2829FB3132F34BF3738FB41F4344FB4647F49BF5253FB56F5859FB6162F64BF6768FB71F7374FB7677F79BF8283FB86F8889FB9192F94BF9798FB '
// generate fizzbuzz via readArray and join it back with es.wait
exports ['fizz buzz'] = function (test) {
var readThis = d.map(1, 100, function (i) {
return (
! (i % 3 || i % 5) ? "FB" :
!(i % 3) ? "F" :
!(i % 5) ? "B" :
''+i
)
}) //array of multiples of 3 < 100
var reader = es.readArray(readThis)
var join = es.wait(function (err, string){
it(string).equal(fizzbuzz)
test.done()
})
reader.pipe(join)
}
// es.replace('7', 'seven') over a stream split on a capturing regex
exports ['fizz buzz replace'] = function (test) {
var split = es.split(/(1)/)
var replace = es.replace('7', 'seven')
// var x = spec(replace).through()
split
.pipe(replace)
.pipe(es.join(function (err, string) {
it(string).equal(fizz7buzz)
}))
replace.on('close', function () {
// x.validate()
test.done()
})
split.write(fizzbuzz)
split.end()
}
// regex-based replace: strip leading and trailing whitespace
exports ['fizz buzz replace whitespace using regexp'] = function (test) {
var split = es.split(/(1)/)
var replaceLeading = es.replace(/^[\s]*/, '')
var replaceTrailing = es.replace(/[\s]*$/, '')
// var x = spec(replace).through()
split
.pipe(replaceLeading)
.pipe(replaceTrailing)
.pipe(es.join(function (err, string) {
it(string).equal(fizzbuzz)
}))
replaceTrailing.on('close', function () {
// x.validate()
test.done()
})
split.write(fizzbuzz)
split.end()
}
require('./helper')(module)

View File

@ -0,0 +1,343 @@
'use strict';
var es = require('../')
, it = require('it-is')
, u = require('ubelt')
, spec = require('stream-spec')
, Stream = require('stream')
, from = require('from')
, through = require('through')
//REFACTOR THIS TEST TO USE es.readArray and es.writeArray
// push every element of `array` into `stream` in order, then end it
function writeArray(array, stream) {
  array.forEach(function (item) {
    stream.write(item)
  })
  stream.end()
}
// collect every 'data' chunk from `stream`; call done(null, chunks) on
// 'end', or done(err) on 'error'.
function readStream(stream, done) {
  var chunks = []
  stream.on('data', function (chunk) {
    chunks.push(chunk)
  })
  stream.on('error', done)
  stream.on('end', function () {
    done(null, chunks)
  })
}
//build a through stream that randomly pauses itself and resumes after a
//delay, to exercise back-pressure handling in the streams under test.
//  prob  - probability (0..1) of pausing on any given write, or a custom
//          predicate function; anything else falls back to a 0.1 chance
//  delay - ms to wait before resuming, a custom scheduler function, or
//          falsy for process.nextTick
function pauseStream (prob, delay) {
  var pauseIf = (
    'number' == typeof prob
    ? function () {
        return Math.random() < prob
      }
    : 'function' == typeof prob
    ? prob
    //fix: the fallback used to be the bare number 0.1, so pauseIf()
    //would throw ("0.1 is not a function") if this branch was ever
    //taken; make the default an actual predicate with probability 0.1.
    : function () {
        return Math.random() < 0.1
      }
  )
  var delayer = (
    !delay
    ? process.nextTick
    : 'number' == typeof delay
    ? function (next) { setTimeout(next, delay) }
    : delay
  )
  //pause at most once at a time: schedule a resume, then re-emit the data.
  return es.through(function (data) {
    if(!this.paused && pauseIf()) {
      console.log('PAUSE STREAM PAUSING')
      this.pause()
      var self = this
      delayer(function () {
        console.log('PAUSE STREAM RESUMING')
        self.resume()
      })
    }
    console.log("emit ('data', " + data + ')')
    this.emit('data', data)
  })
}
exports ['simple map'] = function (test) {
  //double 1000 random numbers through an es.through stream, with a
  //randomly-pausing stream in the middle to exercise back-pressure.
  var values = u.map(1, 1000, function () {
    return Math.random()
  })
  var doubled = values.map(function (v) {
    return v * 2
  })
  var flaky = pauseStream(0.1)
  var source = from(values)
  var sink = es.writeArray(function (err, collected) {
    it(collected).deepEqual(doubled)
    test.done()
  })
  var doubler = es.through(function (data) {
    this.emit('data', data * 2)
  })
  spec(doubler).through().validateOnExit()
  spec(flaky).through().validateOnExit()
  source.pipe(doubler).pipe(flaky).pipe(sink)
}
exports ['simple map applied to a stream'] = function (test) {
  //es.map with an async callback doubles every element; the resulting
  //stream is a middle-man, so it must be both readable and writable.
  var numbers = [1,2,3,7,5,3,1,9,0,2,4,6]
  var doubler = es.map(function (data, cb) {
    cb(null, data * 2)
  })
  spec(doubler).through().validateOnExit()
  it(doubler).has({
    readable: true,
    writable: true,
  })
  readStream(doubler, function (err, output) {
    var expected = numbers.map(function (n) { return n * 2 })
    it(output).deepEqual(expected)
    test.done()
  })
  writeArray(numbers, doubler)
}
exports['pipe two maps together'] = function (test) {
  //two chained es.map doublers should quadruple every element.
  var numbers = [1,2,3,7,5,3,1,9,0,2,4,6]
  function twice (data, cb) {
    cb(null, data * 2)
  }
  var first = es.map(twice)
  var second = es.map(twice)
  first.pipe(second)
  spec(first).through().validateOnExit()
  spec(second).through().validateOnExit()
  readStream(second, function (err, output) {
    var expected = numbers.map(function (n) { return n * 4 })
    it(output).deepEqual(expected)
    test.done()
  })
  writeArray(numbers, first)
}
//next:
//
// test pause, resume and drain.
//
// then make a pipe joiner:
//
// plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
//
// will return a single stream that write goes to the first
exports ['map will not call end until the callback'] = function (test) {
  //'end' must be deferred until the in-flight async map callback fires.
  var deferred = es.map(function (data, cb) {
    process.nextTick(function () {
      cb(null, data * 2)
    })
  })
  spec(deferred).through().validateOnExit()
  deferred.write('x')
  deferred.end()
  deferred.on('end', function () {
    test.done()
  })
}
exports ['emit error thrown'] = function (test) {
  //an exception thrown inside the map function must surface as 'error'.
  var expectedErr = new Error('INTENSIONAL ERROR')
  var thrower = es.map(function () {
    throw expectedErr
  })
  thrower.on('error', function (actual) {
    it(actual).equal(expectedErr)
    test.done()
  })
  //no stream-spec here: it has no rule yet for streams that may error.
  thrower.write('hello')
}
exports ['emit error calledback'] = function (test) {
  //an error passed to the map callback must surface as an 'error' event.
  var expectedErr = new Error('INTENSIONAL ERROR')
  var failer = es.map(function (data, callback) {
    callback(expectedErr)
  })
  failer.on('error', function (actual) {
    it(actual).equal(expectedErr)
    test.done()
  })
  failer.write('hello')
}
exports ['do not emit drain if not paused'] = function (test) {
  //while write() keeps returning true the stream must never emit
  //'drain', even with async map callbacks still outstanding.
  var mapper = es.map(function (data, callback) {
    u.delay(callback)(null, 1)
    return true
  })
  spec(mapper).through().pausable().validateOnExit()
  mapper.on('drain', function () {
    it(false).ok('should not emit drain unless the stream is paused')
  })
  it(mapper.write('hello')).equal(true)
  it(mapper.write('hello')).equal(true)
  it(mapper.write('hello')).equal(true)
  setTimeout(function () { mapper.end() }, 10)
  mapper.on('end', test.done)
}
exports ['emits drain if paused, when all '] = function (test) {
  //once write() has returned false, 'drain' must be emitted after every
  //outstanding async map callback has completed, and before 'end'.
  var active = 0
  var drained = false
  var map = es.map(function (data, callback) {
    active ++
    u.delay(function () {
      active --
      callback(null, 1)
    })()
    console.log('WRITE', false)
    return false //signal back-pressure: a 'drain' is now expected
  })
  spec(map).through().validateOnExit()
  map.on('drain', function () {
    drained = true
    it(active).equal(0, 'should emit drain when all maps are done')
  })
  it(map.write('hello')).equal(false)
  it(map.write('hello')).equal(false)
  it(map.write('hello')).equal(false)
  //fix: process.nextTick takes no delay argument -- the stray ",10" was
  //a leftover from a setTimeout version and was silently ignored.
  process.nextTick(function () {map.end()})
  map.on('end', function () {
    console.log('end')
    it(drained).ok('should have emitted drain before end') //fix: 'shoud' typo
    test.done()
  })
}
exports ['map applied to a stream with filtering'] = function (test) {
  //invoking the map callback with no value drops the element entirely:
  //only odd inputs survive, doubled.
  var numbers = [1,2,3,7,5,3,1,9,0,2,4,6]
  var oddDoubler = es.map(function (data, callback) {
    if (data % 2)
      callback(null, data * 2)
    else
      callback()
  })
  readStream(oddDoubler, function (err, output) {
    var expected = numbers
      .filter(function (n) { return n % 2 })
      .map(function (n) { return n * 2 })
    it(output).deepEqual(expected)
    test.done()
  })
  spec(oddDoubler).through().validateOnExit()
  writeArray(numbers, oddDoubler)
}
exports ['simple mapSync applied to a stream'] = function (test) {
  //es.mapSync doubles each element using a plain (non-callback) function.
  var numbers = [1,2,3,7,5,3,1,9,0,2,4,6]
  var doubler = es.mapSync(function (n) {
    return n * 2
  })
  readStream(doubler, function (err, output) {
    var expected = numbers.map(function (n) { return n * 2 })
    it(output).deepEqual(expected)
    test.done()
  })
  spec(doubler).through().validateOnExit()
  writeArray(numbers, doubler)
}
exports ['mapSync applied to a stream with filtering'] = function (test) {
  //returning undefined from a mapSync function drops the element:
  //only odd inputs survive, doubled.
  var numbers = [1,2,3,7,5,3,1,9,0,2,4,6]
  var oddDoubler = es.mapSync(function (n) {
    if (n % 2)
      return n * 2
  })
  readStream(oddDoubler, function (err, output) {
    var expected = numbers
      .filter(function (n) { return n % 2 })
      .map(function (n) { return n * 2 })
    it(output).deepEqual(expected)
    test.done()
  })
  spec(oddDoubler).through().validateOnExit()
  writeArray(numbers, oddDoubler)
}
require('./helper')(module)

View File

@ -0,0 +1,86 @@
/*
assert that data is called many times
assert that end is called eventually
assert that when stream enters pause state,
on drain is emitted eventually.
*/
var es = require('..')
var it = require('it-is').style('colour')
var spec = require('stream-spec')
exports['simple stream'] = function (test) {
  //a bare through stream must satisfy the basic + pausable specs across
  //plain writes, a pause/resume cycle, and end-with-data.
  var thru = es.through()
  var contract = spec(thru).basic().pausable()
  thru.write(1)
  thru.write(1)
  thru.pause()
  thru.write(1)
  thru.resume()
  thru.write(1)
  thru.end(2) //end(data) performs a final write before ending
  process.nextTick(function (){
    contract.validate()
    test.done()
  })
}
exports['throw on write when !writable'] = function (test) {
  //writing after end() violates the stream contract; stream-spec is
  //expected to observe the illegal write and still validate cleanly.
  var thru = es.through()
  var contract = spec(thru).basic().pausable()
  thru.write(1)
  thru.write(1)
  thru.end(2) //end(data) performs a final write before ending
  thru.write(1) //illegal write after end -- caught by the spec
  process.nextTick(function () {
    contract.validate()
    test.done()
  })
}
exports['end fast'] = function (test) {
  //ending immediately, with no writes at all, must still be legal.
  var thru = es.through()
  var contract = spec(thru).basic().pausable()
  thru.end()
  process.nextTick(function () {
    contract.validate()
    test.done()
  })
}
/*
okay, that was easy enough, what's next?
say, after you call pause(), write should return false
until resume is called.
simple way to implement this:
write must return !paused
after pause() paused = true
after resume() paused = false
on resume, if !paused drain is emitted again.
after drain, !paused
there are lots of subtle ordering bugs in streams.
example, set !paused before emitting drain.
the stream api is stateful.
*/
require('./helper')(module)

View File

@ -0,0 +1,47 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
, join = require('path').join
, fs = require('fs')
, Stream = require('stream').Stream
, spec = require('stream-spec')
exports ['es.split() works like String#split'] = function (test) {
  //pipe this very file through es.split() and check each emitted line
  //against String#split('\n') applied to the same contents.
  var thisFile = join(__filename)
  var expected = fs.readFileSync(thisFile, 'utf-8').split('\n')
  var splitter = es.split()
  var lines = []
  var ended = false
  var contract = spec(splitter).through()
  var sink = new Stream ()
  sink.writable = true
  sink.write = function (line) {
    lines.push(line.trim())
  }
  sink.end = function () {
    ended = true
    expected.forEach(function (want, i) {
      //String#split appends a trailing '' when the string ends in the
      //separator; es.split does not, so skip the empty entries here.
      //(clearly appending the empty string would be correct, but the
      //current job is just to keep these tests passing.)
      if(want)
        it(lines[i]).like(want)
    })
    //give the stream time to close
    process.nextTick(function () {
      test.done()
      contract.validate()
    })
  }
  fs.createReadStream(thisFile, {flags: 'r'}).pipe(splitter)
  splitter.pipe(sink)
}
require('./helper')(module)

View File

@ -0,0 +1,15 @@
var es = require('../')
exports['handle buffer'] = function (t) {
  //es.stringify() must JSON-encode Buffer contents as the decoded string.
  //fix: Buffer.from() replaces the deprecated (and unsafe) new Buffer()
  //constructor; requires Node >= 4.5, where Buffer.from was introduced.
  es.stringify().on('data', function (d) {
    t.equal(d.trim(), JSON.stringify('HELLO'))
    t.end()
  }).write(Buffer.from('HELLO'))
}
require('./helper')(module)

View File

@ -0,0 +1,31 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
exports ['write an array'] = function (test) {
  //everything written to es.writeArray must come back, in order, as a
  //single array in the final callback.
  var expected = d.map(3, 6, 100, d.id) //fixture array built by ubelt
  var collector = es.writeArray(function (err, array){
    if(err) throw err //writeArray never errors here
    it(array).deepEqual(expected)
    test.done()
  })
  d.each(expected, collector.write.bind(collector))
  collector.end()
}
exports ['writer is writable, but not readable'] = function (test) {
  //a writeArray stream is a pure sink: writable only, never readable.
  var writer = es.writeArray(function () {})
  it(writer).has({
    readable: false,
    writable: true
  })
  test.done()
}
require('./helper')(module)