I have fallen in love with the flexibility of
io.Writer when dealing with any stream of data in Go. And while I am more or less smitten at this point, the reader interface challenged me with something you might think simple: splitting it in two.
I’m not even certain “split” is the right word. I would like to receive an
io.Reader and read over it multiple times, possibly in parallel. But because readers don’t necessarily expose the
Seek method to reset them, I need a way to duplicate it. Or would that be clone it? Fork?!
Suppose you have a web service that allows a user to upload a file. The service will store the file on “the cloud”, but first it needs a bit of processing. All you have to work with is the
io.Reader from the incoming request.
There is not one way to go about solving this problem, of course. Depending on the types of files, throughput of the service and the kinds of processing required, some options are more practical than others. Below, I lay out five different methods of varying complexity and flexibility. I imagine there are many more, but these are a good starting point.
Solution #1: The Simple
If the source reader doesn’t have a
Seek method, then why not make one? You can pump the input into a
bytes.Reader and rewind it as many times as you like:
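A minimal sketch of that idea might look like the following. The helper names `readTwice` and `demo` are made up for illustration, a `strings.Reader` stands in for the request body, and `io.ReadAll` assumes Go 1.16 or newer (older code would use `ioutil.ReadAll`).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// readTwice (hypothetical helper) buffers all of r in memory, then reads the
// buffered copy twice, rewinding with Seek in between.
func readTwice(r io.Reader) (first, second string, err error) {
	data, err := io.ReadAll(r) // the entire input now lives in RAM
	if err != nil {
		return "", "", err
	}
	br := bytes.NewReader(data)

	a, err := io.ReadAll(br) // first pass
	if err != nil {
		return "", "", err
	}
	// Rewind to the start before the second pass.
	if _, err := br.Seek(0, io.SeekStart); err != nil {
		return "", "", err
	}
	b, err := io.ReadAll(br) // second pass
	if err != nil {
		return "", "", err
	}
	return string(a), string(b), nil
}

// demo uses a strings.Reader as a stand-in for an uploaded file.
func demo() (string, string, error) {
	return readTwice(strings.NewReader("uploaded bytes"))
}

func main() {
	first, second, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(first, "|", second)
}
```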
If the data is small enough, this might be the most convenient option; you could forgo the
bytes.Reader altogether and work off the byte slice instead. But suppose the file is large, such as a video or RAW photo. These behemoths will chew through memory, especially if the service is high-traffic. Not to mention, you cannot perform these actions in parallel.
Pros: Probably the simplest solution.
Cons: Synchronous and not prudent if you expect many or large files.
Solution #2: The Reliable File System
OK, then how about you drop the data into a file on disk (à la
ioutil.TempFile) and skip the penalties of storing it in RAM?
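Here is one way this could be sketched. It assumes Go 1.16 or newer, where `os.CreateTemp` replaces the deprecated `ioutil.TempFile`; `spool` and `demo` are hypothetical names, and the cleanup function is there to reduce (not eliminate) the risk of orphaned files.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// spool (hypothetical helper) copies r into a temporary file and returns the
// file rewound to the start, plus a function that closes and deletes it.
func spool(r io.Reader) (*os.File, func(), error) {
	f, err := os.CreateTemp("", "upload-*")
	if err != nil {
		return nil, nil, err
	}
	cleanup := func() {
		f.Close()
		os.Remove(f.Name())
	}
	if _, err := io.Copy(f, r); err != nil {
		cleanup()
		return nil, nil, err
	}
	// Rewind so the caller starts reading from the first byte.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		cleanup()
		return nil, nil, err
	}
	return f, cleanup, nil
}

// demo reads the spooled file twice, seeking back to the start in between.
func demo() (string, string, error) {
	f, cleanup, err := spool(strings.NewReader("uploaded bytes"))
	if err != nil {
		return "", "", err
	}
	defer cleanup()

	first, err := io.ReadAll(f)
	if err != nil {
		return "", "", err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return "", "", err
	}
	second, err := io.ReadAll(f)
	if err != nil {
		return "", "", err
	}
	return string(first), string(second), nil
}

func main() {
	first, second, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(first, "|", second)
}
```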
If the final destination is on the service’s file system, then this is probably your best choice (albeit with a real file), but let’s assume it will end up on the cloud. Again, if the files are large, the IO costs here could be noticeable and unnecessary. You run the risk of bugs or crashes orphaning files on the machine, and I also wouldn’t recommend this if the data is sensitive in any way.
Pros: Keeps the whole file out of RAM.
Cons: Still synchronous, potential for lots of IO, disk space, and orphaned data.
Solution #3: The Duct-Tape
In some cases, the metadata you need exists in the first handful of bytes of the file. Identifying a file as a JPEG, for instance, only requires checking that the first two bytes are
0xFF 0xD8. This can be handled synchronously using an
io.MultiReader, which glues together a set of readers as if they were one. Here’s our JPEG example:
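A rough sketch of the gate, assuming the two-byte check described above is all that is needed. The names `gateJPEG`, `jpegMagic`, and `demo` are illustrative, and the sample byte slices are made-up stand-ins for real uploads.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// jpegMagic holds the two bytes that open every JPEG file.
var jpegMagic = []byte{0xFF, 0xD8}

// gateJPEG (hypothetical helper) consumes the first two bytes of r and checks
// them against the JPEG marker. On success it returns a reader that replays
// those two bytes followed by the rest of r, so nothing is lost.
func gateJPEG(r io.Reader) (io.Reader, error) {
	head := make([]byte, len(jpegMagic))
	if _, err := io.ReadFull(r, head); err != nil {
		return nil, err
	}
	if !bytes.Equal(head, jpegMagic) {
		return nil, errors.New("not a JPEG")
	}
	return io.MultiReader(bytes.NewReader(head), r), nil
}

// demo returns how many bytes came through for a JPEG-like input and whether
// a non-JPEG input was rejected.
func demo() (accepted int, rejected bool, err error) {
	jpeg := []byte{0xFF, 0xD8, 0xFF, 0xE0, 0x00}
	r, err := gateJPEG(bytes.NewReader(jpeg))
	if err != nil {
		return 0, false, err
	}
	all, err := io.ReadAll(r)
	if err != nil {
		return 0, false, err
	}
	_, gateErr := gateJPEG(bytes.NewReader([]byte("GIF89a")))
	return len(all), gateErr != nil, nil
}

func main() {
	n, rejected, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(n, rejected)
}
```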
This is a great technique if you intend to gate the upload to only JPEG files. With only two bytes, you can cancel the transfer without entirely reading it into memory or writing it to disk. As you might expect, this method falters in situations where you need to read in more than a little bit of the file to gather the data, such as calculating a word count across it. Having this process block the upload may not be ideal for intensive tasks. And finally, most 3rd-party packages (and the majority of the standard library) entirely consume a reader, preventing you from using an
io.MultiReader in this way.
Another solution would be to use
bufio.Reader.Peek. It essentially performs the same operation but you can eschew the MultiReader. That, and it gives you access to some other useful methods on the reader.
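The same check with `Peek` could be sketched like this; `peekJPEG` and `demo` are hypothetical names. The one thing to keep in mind is that the caller must keep reading from the returned `*bufio.Reader`, not the original reader, since the peeked bytes now sit in the buffer.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// peekJPEG (hypothetical helper) wraps r in a bufio.Reader and inspects the
// first two bytes without consuming them.
func peekJPEG(r io.Reader) (*bufio.Reader, bool, error) {
	br := bufio.NewReader(r)
	head, err := br.Peek(2) // does not advance the reader
	if err != nil {
		return br, false, err
	}
	return br, bytes.Equal(head, []byte{0xFF, 0xD8}), nil
}

// demo reports whether the sample looks like a JPEG and how many bytes are
// still readable after the peek.
func demo() (bool, int, error) {
	data := []byte{0xFF, 0xD8, 0xFF, 0xE0, 0x00}
	br, ok, err := peekJPEG(bytes.NewReader(data))
	if err != nil {
		return false, 0, err
	}
	rest, err := io.ReadAll(br) // still includes the two peeked bytes
	return ok, len(rest), err
}

func main() {
	ok, n, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(ok, n)
}
```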
Pros: Quick and dirty reads off the top of a file, can act as a gate.
Cons: Doesn’t work for unknown-length reads, processing the whole file, intensive tasks, or with most 3rd-party packages.
Solution #4: The Single-Split
Back to our scenario of a large video file, let’s change the story a bit. Your users will upload the video in a single format, but you want your service to be able to display those videos in a couple of different formats. You have a 3rd-party transcoder that can take in an
io.Reader of (say) MP4 encoded data and return another reader of WebM data. The service will upload the original MP4 and WebM versions to the cloud. The previous solutions must perform these steps synchronously and with overhead; now, you want to do them in parallel.
Take a look at
io.TeeReader, which has the following signature:
func TeeReader(r Reader, w Writer) Reader. The docs say “TeeReader returns a Reader that writes to w what it reads from r.” This is exactly what you want! Now how do you get the data written into w to be readable? This is where
io.Pipe comes into play, yielding a connected
io.PipeReader and io.PipeWriter (i.e., writes to the latter are immediately available in the former). Let’s see it in action:
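What follows is only a sketch under some loud assumptions: the "uploader" and "transcoder" are both played by `io.ReadAll`, a `strings.Reader` stands in for the request body, and `demo` is a made-up name. Error handling is kept to the bare minimum of forwarding the read error through `CloseWithError`, which behaves like a plain `Close` (EOF on the read side) when the error is nil.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// demo tees one source into two independent consumers and returns what each
// of them saw.
func demo() (string, string) {
	src := strings.NewReader("mp4 bytes") // stand-in for the upload
	pr, pw := io.Pipe()
	tr := io.TeeReader(src, pw) // everything read from tr is also written to pw

	done := make(chan bool)
	var uploaded, transcoded []byte

	// Pathway 1: the "uploader" drains the TeeReader.
	go func() {
		data, err := io.ReadAll(tr)
		// The source is exhausted, so close the writer to send EOF down the pipe.
		pw.CloseWithError(err)
		uploaded = data
		done <- true
	}()

	// Pathway 2: the "transcoder" drains the read half of the pipe.
	go func() {
		data, _ := io.ReadAll(pr)
		transcoded = data
		done <- true
	}()

	// Wait for both pathways before touching their results.
	<-done
	<-done
	return string(uploaded), string(transcoded)
}

func main() {
	up, tc := demo()
	fmt.Println(up, "|", tc)
}
```

If the pipe's reader stops early, writes through the tee will block, so a real service would also want the consuming side to close `pr` (for example with `pr.CloseWithError`) on failure.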
As the uploader consumes
tr, the transcoder receives and processes the same bytes before sending them off to storage. All without a buffer and in parallel! Be aware of the use of goroutines for both pathways, though.
io.Pipe is unbuffered: each write blocks until a reader has consumed the data, and each read blocks until something is written. Attempting both on the same goroutine will crash the program with a
fatal error: all goroutines are asleep - deadlock! message. Another point of caution: when using pipes, you will need to explicitly trigger an EOF by closing the
io.PipeWriter at the appropriate time. In this case, you would close it after the TeeReader has been exhausted.
This method also employs channels to communicate “doneness”. If you expect a value back from these processes, you could replace the
chan bool with a channel of a more appropriate type.
Pros: Completely independent, parallelized streams of the same data!
Cons: Requires the added complexity of goroutines and channels to work.
Solution #5: The Multi-Split
The io.TeeReader solution works great when only one other consumer of the stream exists. As the service parallelizes more tasks (e.g., more transcoding), teeing off of tees becomes gross. Enter the
io.MultiWriter: “a writer that duplicates its writes to all provided writers.” This method utilizes pipes like in the previous solution to propagate the data, but instead of a TeeReader, you can use
io.Copy to split the data across all the pipes:
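As a hedged sketch with two forks: both consumers are again played by `io.ReadAll`, the source is a `strings.Reader`, and `demo` is a hypothetical name. Here the copy runs on the calling goroutine while each consumer gets its own, and every pipe writer is closed once the copy finishes.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// demo fans one source out to two pipe-backed readers and returns what each
// consumer received.
func demo() (string, string) {
	src := strings.NewReader("mp4 bytes") // stand-in for the upload

	// One pipe per consumer.
	pr1, pw1 := io.Pipe()
	pr2, pw2 := io.Pipe()

	done := make(chan bool)
	var out1, out2 []byte

	// Each consumer drains its own pipe in its own goroutine.
	go func() {
		out1, _ = io.ReadAll(pr1)
		done <- true
	}()
	go func() {
		out2, _ = io.ReadAll(pr2)
		done <- true
	}()

	// MultiWriter duplicates every write from the copy to both pipe writers.
	mw := io.MultiWriter(pw1, pw2)
	_, err := io.Copy(mw, src)

	// Copy is complete: close every writer so the consumers see EOF
	// (or the copy error, if there was one).
	pw1.CloseWithError(err)
	pw2.CloseWithError(err)

	<-done
	<-done
	return string(out1), string(out2)
}

func main() {
	a, b := demo()
	fmt.Println(a, "|", b)
}
```

One design consequence worth knowing: `io.MultiWriter` writes to its targets one after another, so the slowest consumer sets the pace for all of them.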
This is more or less analogous to the previous method, but noticeably cleaner when the stream needs multiple clones. Because of the pipes, you’ll again require goroutines and synchronizing channels to avoid the deadlock. We defer closing all the pipes until the copy is complete.
Pros: Can make as many forks of the original reader as desired.
Cons: Even more use of goroutines and channels to coordinate.
What About Channels?
Channels are one of the most distinctive and powerful concurrency tools Go has to offer. Serving as a bridge between goroutines, they combine communication and synchronization in one. You can allocate a channel with or without a buffer, allowing for many creative ways to share data. So why did I not provide a solution that leverages them for more than sync?
Looking through the top-level packages of the standard library, channels rarely appear in function signatures:
time: useful for a
reflect: … ‘cause reflection
fmt: for formatting it as a pointer
builtin: exposes the
The implementation of
io.Pipe forgoes a channel in favor of
sync.Mutex to move data safely between the reader and writer. My suspicion is that channels are just not as performant, and presumably mutexes prevail for this reason.
When developing a reusable package, I’d avoid channels in my public API to be consistent with the standard library but maybe use them internally for synchronization. If the complexity is low enough, replacing them with mutexes may even be ideal. That said, within an application, channels are wonderful abstractions, easier to grok than locks and more flexible.
I’ve only broached a handful of ways to go about processing the data coming from an
io.Reader, and without a doubt there are plenty more. Go’s implicit interface model plus the standard library’s heavy use of them permits many creative ways of gluing together various components without having to worry about the source of the data. I hope some of the exploration I’ve done here will prove as useful for you as it did for me!
Errata (Updated: 2017-05-22)
Jamie Talbot kindly pointed out in the comments that solutions #4 and #5 would panic if one of the concurrent goroutines produced an error. That’s certainly not the intended effect here, especially considering the primary focus is on tee-ing an
io.Reader. I’ve since removed the error handling from those examples and will perhaps write up an article at a later date regarding handling errors from concurrent tasks. Thanks again, Jamie!