Conversation

@stephenplusplus
Contributor Author

@mafintosh, as promised, here's my ping :)

A little intro-- gcloud-node is the Node gateway to Google's cloud services, such as Datastore, PubSub, and Cloud Storage. Cloud Storage lets you store files in "buckets," so we wanted to allow users a way to stream files in and out of their buckets.

Before a user can do that, we have to set up a valid connection, which includes making an API request to retrieve a token. Duplexify was a great fit here, since it lets us set the readable and writable portions of the stream asynchronously.

This is how our API looks currently (using Duplexify behind the scenes):

var fs = require('fs');
var gcloud = require('gcloud')({ /* connection details. */ });
var bucket = gcloud.storage.bucket({ bucketName: 'my-bucket' });

// Uploading a file.
fs.createReadStream('/local/file.jpg')
  .pipe(bucket.createWriteStream('file-to-create-in-the-bucket.jpg'));

// Downloading a file.
bucket.createReadStream('file-from-the-bucket.jpg')
  .pipe(fs.createWriteStream('/new/local/file.jpg'));

This PR uses Duplexify a little differently, so that our API would look like:

var fs = require('fs');
var gcloud = require('gcloud')({ /* connection details. */ });
var bucket = gcloud.storage.bucket('my-bucket');

// Uploading a file.
fs.createReadStream('/local/file.jpg')
  .pipe(bucket.file('file-to-create-in-the-bucket.jpg'));

// Downloading a file.
bucket.file('file-from-the-bucket.jpg')
  .pipe(fs.createWriteStream('/new/local/file.jpg'));

bucket.file(/*...*/) is a wrapper for new File() -- File inherits Duplexify.

Truthfully, I have no idea if I'm doing anything shameful with this implementation, which is why I would love a review! Here is the File class: https://github.com/stephenplusplus/gcloud-node/blob/spp--storage-refactor/lib/storage/file.js (please forgive the massive amounts of comments :))

My points of concern:

The problem:

A user can ask for a File object without wanting to pipe data to/from it. A File object has other functionality beyond being used as a stream. Because we need to make an API request to fetch a token before the stream can be used, we need to wait until we know they're going to be using this as a stream.

The solution: bindEvents_

This is called at the instantiation of a File object. It caches _write and _read, then overwrites them so that the first time they are called, the code creates an authorized connection and eventually calls setReadable/setWritable. After that first call, the overriding functions restore _write and _read to the cached originals.
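
If it helps picture it, here's a minimal sketch of that trigger pattern (not the exact file.js code; getWritableStream_ stands in for the real token/connection logic):

var duplexify = require('duplexify');
var util = require('util');

function File(name) {
  duplexify.call(this);
  this.name = name;
  this.bindEvents_();
}
util.inherits(File, duplexify);

File.prototype.bindEvents_ = function() {
  var self = this;
  var originalWrite = this._write;

  // Overwrite _write; the first call triggers the async connection setup.
  this._write = function(chunk, encoding, callback) {
    self._write = originalWrite; // restore the cached original
    self.getWritableStream_(function(writable) {
      self.setWritable(writable);
    });
    // Duplexify buffers writes until setWritable is called, so it's safe
    // to hand this chunk straight to the original _write.
    originalWrite.call(self, chunk, encoding, callback);
  };
};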

The problem:

When piping has completed, we need to tear down the stream, so that the next time it's used as a stream, we go and fetch a valid token again.

The solution: bindEvents_->this.on('end')

When the end event is emitted (along with complete and error -- see the lines above), this handler removes any bound listeners, calls Duplexify#destroy, re-runs the Duplexify constructor, then re-calls bindEvents_ to re-bind all of these events.
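
The teardown wiring, roughly (again a sketch, not the exact file.js code):

// Inside bindEvents_: after a pipe completes, reset everything so the
// next use of the stream fetches a fresh token.
var self = this;

function reset() {
  self.removeAllListeners();  // drop any bound listeners
  self.destroy();             // Duplexify#destroy
  duplexify.call(self);       // re-run the Duplexify constructor
  self.bindEvents_();         // re-bind the lazy triggers for next time
}

self.on('end', reset);
self.on('complete', reset);
self.on('error', reset);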

The problem:

I'm a stream fool. Put in terms of an error message:

 Error: write after end
  at writeAfterEnd (/Users/stephen/dev/gcloud-node/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:144:12)
  at File.Writable.write (/Users/stephen/dev/gcloud-node/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:192:5)
  at write (_stream_readable.js:601:24)
  at flow (_stream_readable.js:610:7)
  at ReadStream.pipeOnReadable (_stream_readable.js:642:5)
  at ReadStream.emit (events.js:92:17)
  at emitReadable_ (_stream_readable.js:426:10)
  at emitReadable (_stream_readable.js:422:5)
  at readableAddChunk (_stream_readable.js:165:9)
  at ReadStream.Readable.push (_stream_readable.js:127:10)
  at onread (fs.js:1574:12)
  at callback (/Users/stephen/dev/gcloud-node/node_modules/mocha/node_modules/glob/node_modules/graceful-fs/polyfills.js:207:17)
  at Object.wrapper [as oncomplete] (fs.js:465:17)

The solution: through().pipe(writable)

setWritableStream_ is called on the first _write. It calls out to getWritableStream_, which forms an HTTP request through the request module, writes some header information to it, then returns it. Once I get the stream back, I pass it to setWritable.
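
Roughly, the shape of it (a sketch; the URL and getToken_ are placeholders, not real gcloud-node internals):

var request = require('request');

File.prototype.getWritableStream_ = function(callback) {
  var self = this;
  this.getToken_(function(err, token) { // getToken_ is a hypothetical name
    if (err) {
      self.emit('error', err);
      return;
    }
    // request() returns a duplex stream we can hand to setWritable.
    var writable = request({
      method: 'POST',
      uri: 'https://www.googleapis.com/upload/storage/...', // placeholder
      headers: { Authorization: 'Bearer ' + token }
    });
    callback(writable);
  });
};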

Here's my use case from my Mocha test that triggers the above error:

var file = bucket.file('LargeFile'); // (returns a File object)
fs.createReadStream(files.big.path)
  .pipe(file)
  .on('error', done)
  .on('complete', function(fileObject) {
    assert.equal(fileObject.md5Hash, files.big.hash);
    file.delete(done);
  });

This error goes away after calling setWritable(writable) if I also do: through().pipe(writable). I have no idea why this works. I would guess it's giving the writable stream some kind of "kick-start", and if that's the case, there's likely a better/more direct way to do it.
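
For reference, the workaround in context (sketch):

var through = require('through');

File.prototype.setWritableStream_ = function() {
  var self = this;
  this.getWritableStream_(function(writable) {
    self.setWritable(writable);
    // Without this seemingly pointless pipe, the write-after-end error
    // above fires. No idea why.
    through().pipe(writable);
  });
};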

Sorry for throwing all of this at you. I appreciate any help you can give, no matter how small or big. Feel free to send me a bill for any headache relief medication purchases I may be inducing :)

@ryanseys
Contributor

It caches _write and _read, then overwrites them so that the first time they are called, the code creates an authorized connection and eventually calls setReadable/setWritable.

Hmm, I'm a stream & duplexify noob too, but I thought Duplexify lets you delay setting the readable/writable stream, allowing you to complete stuff before it actually pipes. I don't see why we need to cache the methods. That seems hacky.

@stephenplusplus
Contributor Author

We need a trigger to know when we're being asked for data or sent it. Creating a file object isn't that trigger, as a file object isn't only usable as a stream.

Edit for clarity:

What's great with Duplexify is that it returns a stream instance that will hold off on data until we set the readable/writable stream. Normal usage probably looks like:

var dup = duplexify();

asyncGetWritableStream(function(stream) {
  dup.setWritable(stream);
});

fs.createReadStream('/local/file.jpg').pipe(dup);

In our case, we want to give the user a stream instance even if they don't intend to use it as one. I don't believe this is the intended use case for Duplexify, which is why I had to do the rigging noted above.

The unique thing for us is since it doesn't have to be used as a stream, we don't want to fire any API requests to get the connection & stream until we know that it is.

@ryanseys
Contributor

Is it bad to create a stream and not use it as one right away? :( I'm sorry, I still don't really understand why _write and _read can't remain on the object until it's used as a stream (or not) and instead have to be cached. Are _write and _read being dual-purposed here?

@stephenplusplus
Contributor Author

No problem, it's definitely my failure to explain properly.

The difference between this:

var dup = duplexify();
asyncGetWritableStream(function(stream) {
  dup.setWritable(stream);
});
fs.createReadStream('/local/file.jpg').pipe(dup);

and this:

var file = bucket.file('my-file.jpg');

is that in the first example, we are creating a stream object with the intention of using it in a streaming pipeline. In the second example, we are creating a file object that only might be used as a stream.

We want to wait until we know it's being used as a stream before making any API calls to form a connection and writable stream. If we did it on instantiation of File(), we could very likely be making unnecessary API calls, and forming requests that will never be written to.

An easy fix for this could look something like this:

var file = bucket.file('my-file.jpg');

fs.createReadStream('/local/file.png')
  .pipe(file.createWriteStream());

This is similar to how we are using Duplexify currently. Only when createWriteStream is called on the File object would we go and fetch the API token and prepare a writable request stream.
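
In code, that trigger would be explicit; a sketch:

var duplexify = require('duplexify');

File.prototype.createWriteStream = function() {
  var dup = duplexify();
  // Calling this method is the signal that a stream is actually wanted,
  // so only now do we fetch a token and open the request stream.
  this.getWritableStream_(function(writable) {
    dup.setWritable(writable);
  });
  return dup;
};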

I personally like the API of this PR better than that way (treating File objects as Duplex streams). Everything [seems] to be working properly, but that's why I'm excited for a review to catch any silliness.

@ryanseys
Contributor

Likely the recent README changes made this unmergeable. :(

@stephenplusplus
Contributor Author

Oops, thanks for catching that. No biggie, I'll fix that up.

@stephenplusplus
Contributor Author

Fixed up 👍

@ryanseys
Contributor

👍

@mafintosh

The problem:

A user can ask for a File object without wanting to pipe data to/from it. A File object has other functionality beyond being used as a stream. Because we need to make an API request to fetch a token before the stream can be used, we need to wait until we know they're going to be using this as a stream.

The solution: bindEvents_

This is called at the instantiation of a File object. It caches _write and _read, then overwrites them so that the first time they are called, the code creates an authorized connection and eventually calls setReadable/setWritable. After that first call, the overriding functions restore _write and _read to the cached originals.

I've had this problem before. It would be cool to create/find a lazy-stream module that solves the problem in the general case. Your solution seems fine to me though :)

The problem:

When piping has completed, we need to tear down the stream, so that the next time it's used as a stream, we go and fetch a valid token again.

Does that mean you can do stuff like this?

var file = bucket.file('my-file')

file.on('data', function(data) {
  // ... read data ..
})
file.on('end', function() {
  file.on('data', function(data) {
    // .. read the data again?
  })
})

As in you can reuse the same file object as multiple streams implicitly?

If that is the case, I would prefer file.createReadStream() / file.createWriteStream() methods, since the above code seems a bit magic to me. I'm also not sure if we run into weird stream territory when we reinitialize the stream constructors.

Another way to solve this would be to simply reinitialize the file if you want a new stream

var file1 = bucket.file('my-file')
file1.pipe(...)

var file2 = bucket.file('my-file') // just create a new file object if you want to pipe it again
file2.pipe(...)

That way we can keep the duplex API without it becoming too hackish (the cost of recreating the file object seems negligible anyway).

The problem:

I'm a stream fool. Put in terms of an error message:

You are not a fool :)
The reason this works is some crazy logic in the request module.
Basically, request detects whether or not it is being piped to, and if not, it ends the writable end of the stream right away. Your empty pipe makes the writable end stay open.

See:
https://github.com/mikeal/request/blob/master/request.js#L628-L632
https://github.com/mikeal/request/blob/master/request.js#L679-L683
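
Paraphrased, the logic amounts to something like this (an illustration, not request's actual source):

// Inside request's init, effectively:
var self = this;

self.on('pipe', function(src) {
  self.src = src; // remember that something is piping in
});

process.nextTick(function() {
  // Nothing piped in and no body given? End the writable side right away.
  if (!self.src && !self.body) {
    self.end();
  }
});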

Sorry for throwing all of this at you. I appreciate any help you can give, no matter how small or big. Feel free to send me a bill for any headache relief medication purchases I may be inducing :)

No problem at all! Glad to see more people getting into streams :)

General issue:

https://github.com/stephenplusplus/gcloud-node/blob/spp--storage-refactor/lib/storage/file.js#L410
You should not emit end yourself, as that is a core stream event that is emitted when the readable part ends.
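
In other words, keep 'end' for the streams machinery and emit a custom event for your own signaling, e.g. (a minimal sketch):

var PassThrough = require('stream').PassThrough;

var file = new PassThrough();

file.on('complete', function(metadata) {
  console.log('upload finished', metadata);
});

// Don't: file.emit('end') -- 'end' is core and fires on its own when the
// readable side finishes.

// Do: emit a custom event (like this PR's 'complete') for your own signal.
file.emit('complete', { md5Hash: 'placeholder' });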

@stephenplusplus
Contributor Author

If that is the case, I would prefer file.createReadStream() / file.createWriteStream() methods, since the above code seems a bit magic to me.

I think that's a good idea. I like file being the stream for the easiest possible use, but it's probably better to force a user to be more direct with their intentions. Plus, doing this saves us the step of overwriting _read and _write -- instead we can use createReadStream and createWriteStream as the trigger to fetch a token.

Your empty pipe makes the writable end stay open.

Wow, interesting. I definitely wouldn't have figured that out. Is there a better solution available or a typical way to handle this?

Thank you very much for the review! I'll make these changes and re-ping if you want to take another look, but don't feel obligated. You've been a huge help already!

@stephenplusplus
Contributor Author

Pushed an update that removes the magic: d17d6c5

A file stream must be created on demand now:

var file = bucket.file('my-file');

file.createReadStream().pipe(/*...*/)

Docs are updated: http://stephenplusplus.github.io/gcloud-node/#/docs/master/storage/file

Also, I made https://github.com/stephenplusplus/stream-events to attempt to handle broadcasting 'writing' and 'reading' when a stream invokes _read and _write 💃 (not used in our app)
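
Usage looks roughly like this (a minimal sketch based on its README):

var streamEvents = require('stream-events');
var duplexify = require('duplexify');

var stream = streamEvents(duplexify());

stream.once('reading', function() {
  // first _read call: fetch a token, then setReadable()
});

stream.once('writing', function() {
  // first _write call: fetch a token, then setWritable()
});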

@stephenplusplus
Contributor Author

Re: my earlier question: "Is there a better solution available or a typical way to handle this?"

Here's what I did:

- through().pipe(writable);
+ dup().pipe(writable);

I ended up ditching through as a dependency, instead just using what we already have, Duplexify, to create a simple stream.

@ryanseys
Contributor

Aww, sad to see my magic file idea isn't very ideal. It's a little wonky that you have to force request's writable end to stay open; would that be considered a bug in request? Are there any guidelines for what to do with a dual-purposed object (stream and general object)? We're effectively trying to do the weird stuff request does here: request can be piped, be piped to, or neither, kind of like file would have been. But I agree that it should be more ephemeral, so you can't pipe after piping. That'd be weird.

That all being said, I don't want our codebase to end up like request's :P At this point I'm just a bystander trying to learn streams.
