A Guided Tour of Streams in Rust
When collecting information on how to write gRPC or WebSocket servers for our Qovery infrastructure, I came across a lot of resources. But while many guides provided in-depth insight into futures, they sorely lacked information on how the Stream API works in Rust and, more importantly, on how to use it properly.
Sadly, you can't turn a blind eye to streams. As soon as you go beyond the simple request/response protocol of our beloved REST APIs, the notions of flow, async generator, and so on inevitably arise.
This is especially true when it comes to Rust. When you decide to use tonic for gRPC or tokio-tungstenite for WebSockets, the only usable interfaces of those libraries revolve around streams.
This is why this article focuses on introducing streams in the context of Rust.
Romain Gérard
May 13, 2022 · 12 min read
#What is a stream?
A stream is an iterator when seen from the asynchronous world. If you wander around the synchronous world and observe an iterator, it will look something like this:
Iterator<MyItem>
This represents a sequence of 0..N objects of type MyItem that can be retrieved by asking the iterator nicely.
Chances are, you already saw one in the flesh. They have been around in Java since v1.2, and you can also find them in the C++ standard library, as well as in Python.
An iterator usually comes up when you want to iterate over a collection (such as a list, a vector, a tree, etc.). It is a common abstraction that makes it possible to decouple from the collection implementation, and represent the intent of a sequence of items that can be retrieved in a linear fashion.
In Rust, an Iterator only has 2 requirements:
pub trait Iterator {
    type Item;
    fn next(&mut self) -> Option<Self::Item>;
}
- an associated type Item, which represents the type of objects that our iterator is going to yield back,
- and a next() method that returns an Option of the associated Item.
Why an Option? Because receiving a None tells you that the iterator does not have any elements left and is now exhausted. Take the Java API, for example. It has 2 methods: one called next(), same as in Rust, but which returns an item directly, and another one called hasNext(). It is up to you, as a developer, to call hasNext() before calling next(). Forgetting to do so constitutes a logical error that can cause your program to crash. By merging hasNext() and next() and returning an Option, Rust prevents this kind of error from arising and provides a safer API for developers.
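To make the Option contract concrete, here is a minimal sketch of a hand-written iterator. The Countdown type and the drain helper are made up for illustration; only the Iterator trait comes from the standard library.

```rust
/// Hypothetical iterator that counts down from `remaining` to 1.
struct Countdown {
    remaining: u32,
}

impl Iterator for Countdown {
    type Item = u32;

    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            // Exhausted: `None` tells the caller there is nothing left.
            return None;
        }
        let current = self.remaining;
        self.remaining -= 1;
        Some(current)
    }
}

/// Drains a `Countdown` by calling `next()` until it returns `None`.
fn drain(start: u32) -> Vec<u32> {
    let mut iter = Countdown { remaining: start };
    let mut seen = Vec::new();
    while let Some(value) = iter.next() {
        seen.push(value);
    }
    seen
}
```

There is no separate hasNext() to forget here: the only way to get a value is to go through the Option.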
#Stream: An Asynchronous Iterator
Now that we’ve been over what an iterator is, let’s go back to streams. As we’ve seen, a stream is an asynchronous version of an iterator.
Ok, let’s check its definition:
pub trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}
Hold on, what’s with the poll/pin stuff? Let’s put them aside for now, and alter the definition to gain a better understanding:
pub trait Stream {
    type Item;
    fn next(self: &mut Self) -> impl Future<Output = Option<Self::Item>>;
    // Warning: the real trait does not return a Future. A Stream is polled directly
    // through poll_next, much like a Future is polled through poll.
    // For the sake of the explanation, we pretend that next returns a future.
}
Functions that return a Future can be written more concisely in Rust thanks to the special async keyword. So if we alter the definition again, it turns out the stream definition is mentally equivalent to:
pub trait Stream {
    type Item;
    async fn next(self: &mut Self) -> Option<Self::Item>;
}
Here, we can see that the stream trait is equivalent to the Iterator trait, only with an async keyword in front of the function.
At first, the definition of the stream seems more complex than that of the Iterator trait, but this is only due to the inherent complexity/machinery necessary for async to work in Rust.
#A simplified explanation of Async in Rust
To allow you to understand what is going on in the original trait, let’s take a quick look at the following type definitions:
Future is a type that promises to yield a value in the future. It can be seen as a task that is going to perform some action in order to fulfill this promise and return the value.
Poll is the type that builds a bridge between the asynchronous world and the synchronous world. It can only have 2 variants:
pub enum Poll<T> {
    Ready(T),
    Pending,
}
If a Future can be seen as a task that returns a value at some point, Poll is the return type of polling that future, indicating whether the value is ready or not. This type keeps the runtime that is responsible for making the task/future progress (usually an executor, e.g. Tokio: https://tokio.rs/) informed of whether the value is ready to be consumed, or whether the task/future should be re-polled at a later time.
Context is a struct that only contains a Waker. Its goal is to prevent the async runtime from polling your future again and again even though it is not ready yet (which consumes CPU time and starves other futures of polling time). The Waker type allows the future to notify the runtime when it is ready to make some progress.
This is a common scenario: you don’t know when your value will be ready because it depends on an external factor (e.g. a network socket with data ready to be read, or acquiring the lock of a mutex). Therefore, instead of wasting CPU time, the future registers the Waker to be called later by passing it to a reactor (kqueue, epoll, IO completion) or to something else that gets notified when a value is potentially ready.
As for Pin, it is a type that prevents your object from being moved in memory. I won’t go into detail, and frankly you can mostly ignore it when you see it. But should you want to dig deeper into this complex topic, you can always take a look at this fasterthanli article, or the shorter Cloudflare article.
#Simplified flow of value retrieval from a future
In this simplified flow, we have a future that is responsible for retrieving the value from a TCP socket.
At first, there aren’t any bytes ready, so the Waker is forwarded to the reactor. When the reactor receives data, it calls the Waker.wake() function, which in turn notifies the async runtime that the task is ready to be polled again.
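The flow described above can be sketched with nothing but the standard library. Everything named here (Shared, SocketRead, ThreadWaker, block_on, read_from_fake_socket) is invented for illustration: the "reactor" is just a thread that sleeps and then delivers a value, and the "runtime" is a loop that parks the current thread. Real runtimes such as Tokio are far more elaborate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the future and the fake "reactor" thread.
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// Hypothetical future that resolves once the reactor has stored a value.
struct SocketRead {
    shared: Arc<Mutex<Shared>>,
}

impl Future for SocketRead {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.shared.lock().unwrap();
        if let Some(value) = shared.value.take() {
            return Poll::Ready(value);
        }
        // Not ready: hand the waker over so the reactor can notify us later.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Wakes the executor by unparking the thread that runs it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor: poll, then sleep until woken.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Spawns the fake reactor, then waits for the value it delivers.
fn read_from_fake_socket() -> u32 {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let reactor_side = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20)); // pretend to wait for bytes
        let mut shared = reactor_side.lock().unwrap();
        shared.value = Some(42);
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    });
    block_on(SocketRead { shared })
}
```

The executor never spins: between the Pending result and the call to wake(), the thread is parked and consumes no CPU time.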
#How to create a stream
Now that we’ve been through all the theory, let’s dive into practical examples, and figure out how we can create streams.
Let’s start with the stream::iter function (aka the easiest one), which allows you to create a stream from an iterator. As a stream is an async iterator, this function creates a stream that, whenever polled, returns a Poll::Ready(next value from the iterator). Basically, the stream never yields/awaits: it is always ready when polled:
use futures::stream::{self, StreamExt};
let stream = stream::iter(vec![17, 19]);
assert_eq!(vec![17, 19], stream.collect::<Vec<i32>>().await);
stream::iter is useful for tests, when you don’t care about the async/await stuff, and are only interested in the stream of values.
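To illustrate why such a stream is always ready, here is a rough sketch of how an iterator-backed stream could be written by hand. This is not the futures crate's code: the Stream trait below is a local stand-in with the same shape as the real one (so the example compiles with only the standard library), and IterStream, NoopWaker and poll_all are hypothetical names.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as the `Stream` trait from `futures`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical wrapper that turns any iterator into a stream.
struct IterStream<I> {
    iter: I,
}

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        // The next value is computed synchronously, so the stream never
        // returns `Poll::Pending` and never needs the waker.
        Poll::Ready(self.get_mut().iter.next())
    }
}

/// A waker that does nothing, good enough to poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream by hand until it reports exhaustion.
fn poll_all<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Calling poll_all(IterStream { iter: vec![17, 19].into_iter() }) gives back the two values in order, without any runtime involved.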
Another interesting one is repeat_with, where you can pass a lambda/closure in order to generate the stream of values on demand/lazily:
use futures::stream::{self, StreamExt};
// From the zeroth to the third power of two:
let mut curr = 1;
let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });
assert_eq!(Some(1), pow2.next().await);
assert_eq!(Some(2), pow2.next().await);
assert_eq!(Some(4), pow2.next().await);
assert_eq!(Some(8), pow2.next().await);
#Gotchas
If you take a look at the import, you will see that we use StreamExt, which is short for StreamExtension. It is a common practice in Rust to put only the minimal definition of a trait in one place, and the additional/helper/nicer API in a separate extension trait. In our case, all the goodies and easy-to-use functions live in the StreamExt trait, and if you don’t import it, you will end up only with poll_next from the Stream trait, which is not really friendly.
Be aware, also, that the stream traits are not (yet) part of the Rust standard library (std/core), unlike futures. They live in the futures-util crate, and the StreamExt extensions aren’t standard either. This often means that you can get confusing/conflicting imports because different libraries provide different ones. For example, tokio provides a different StreamExt from the futures-util one. If you can, try to stick to futures-util, as it is the most commonly used crate for everything async/await.
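The extension-trait pattern can be sketched in a few lines. Again, this is an illustration under assumptions, not the actual futures code: Stream and StreamExt are local stand-ins that only mimic the shape of the real traits, and Next, Counter, NoopWaker, block_on and first_two are hypothetical helpers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for the minimal `Stream` trait.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Future returned by `next()`: it borrows the stream and forwards each poll to it.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

/// Extension trait: friendlier helpers layered on top of `poll_next`.
trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Sized + Unpin,
    {
        Next { stream: self }
    }
}

// Blanket impl: every `Stream` gets the helpers once `StreamExt` is in scope.
impl<S: Stream> StreamExt for S {}

/// Hypothetical stream yielding 0, 1, 2 and then `None`.
struct Counter(u32);

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.0 >= 3 {
            return Poll::Ready(None);
        }
        self.0 += 1;
        Poll::Ready(Some(self.0 - 1))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Naive executor that re-polls in a loop; acceptable here because
/// `Counter` is always ready and never returns `Pending`.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Awaits the first two items of a fresh `Counter`.
fn first_two() -> (Option<u32>, Option<u32>) {
    block_on(async {
        let mut counter = Counter(0);
        (counter.next().await, counter.next().await)
    })
}
```

Counter itself only implements poll_next; the awaitable next() comes entirely from the extension trait, which is why forgetting the import leaves you with the low-level API only.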
Ok, so far we’ve only seen how to create a stream from normal synchronous code, but how can we create a stream that awaits something for real?
A handy function for that is stream::unfold. It takes a state/seed as a parameter and invokes an async closure with it. That closure is responsible for generating the next element of the stream, along with the new state that is going to be passed to the next invocation.
For example, if we want to generate a stream that iterates over the paginated response of an API endpoint, we will do something like this:
use futures::stream::{self, StreamExt};
let stream = stream::unfold(0, |page_nb| async move {
    if page_nb > 50 {
        return None;
    }
    let events = get_events_from_page(page_nb).await;
    Some((events, page_nb + 1))
});
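To see what unfold has to keep track of on your behalf, here is a simplified, standard-library-only sketch of the idea. It is not the futures crate's implementation: the Stream trait is a local stand-in, the in-flight future is boxed to keep pinning simple, and fetch_page, NoopWaker and collect_pages are hypothetical helpers standing in for a real paginated API and a real runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as the `Stream` trait from `futures`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Simplified `unfold`: holds either the next state or the in-flight future.
struct Unfold<T, F, Fut> {
    state: Option<T>,
    step: F,
    pending: Option<Pin<Box<Fut>>>,
}

fn unfold<T, F, Fut, Item>(init: T, step: F) -> Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    Unfold { state: Some(init), step, pending: None }
}

impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where
    T: Unpin,
    F: FnMut(T) -> Fut + Unpin,
    Fut: Future<Output = Option<(Item, T)>>,
{
    type Item = Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Item>> {
        let this = self.get_mut();
        // Start a new step if none is in flight and a state is available.
        if this.pending.is_none() {
            match this.state.take() {
                Some(state) => this.pending = Some(Box::pin((this.step)(state))),
                None => return Poll::Ready(None), // the closure already returned None
            }
        }
        let fut = this.pending.as_mut().expect("a step is in flight");
        let polled = fut.as_mut().poll(cx);
        match polled {
            Poll::Pending => Poll::Pending,
            Poll::Ready(result) => {
                this.pending = None;
                match result {
                    Some((item, next_state)) => {
                        this.state = Some(next_state);
                        Poll::Ready(Some(item))
                    }
                    None => Poll::Ready(None),
                }
            }
        }
    }
}

/// Hypothetical paginated API: three pages, then nothing.
async fn fetch_page(page: u32) -> Option<Vec<u32>> {
    if page < 3 { Some(vec![page * 10, page * 10 + 1]) } else { None }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream by hand until it ends and gathers every page.
fn collect_pages() -> Vec<Vec<u32>> {
    let mut stream = unfold(0u32, |page| async move {
        let events = fetch_page(page).await?;
        Some((events, page + 1))
    });
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut pages = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(page)) => pages.push(page),
            Poll::Ready(None) => return pages,
            Poll::Pending => continue, // not reached here: fetch_page never waits
        }
    }
}
```

The state is moved into the closure for each step and handed back together with the item, which is exactly the (item, new state) tuple the closure returns.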
You can even create a state machine or a sequence of different actions with unfold:
use futures::{pin_mut, stream, StreamExt};

let state = (true, true, true);
let stream = stream::unfold(state, |state| async move {
    match state {
        (true, phase2, phase3) => {
            // do some stuff for phase 1
            let item = async { 1 }.await;
            Some((item, (false, phase2, phase3)))
        },
        (_, true, phase3) => {
            // do some stuff for phase 2
            let item = async { 2 }.await;
            Some((item, (false, false, phase3)))
        },
        (_, _, true) => {
            // do some stuff for phase 3
            let item = async { 3 }.await;
            Some((item, (false, false, false)))
        },
        _ => None,
    }
});
// next() needs a pinned stream, so pin it on the stack first
pin_mut!(stream);
assert_eq!(Some(1), stream.next().await);
assert_eq!(Some(2), stream.next().await);
assert_eq!(Some(3), stream.next().await);
assert_eq!(None, stream.next().await);
Here, the state only shows the phase we are in, and helps to unroll the seed/state in order to generate the correct computation for each step.
Manually unrolling our stream from a state can seem fun at first, but it can quickly become a hassle if we have to do it by hand every time. Wouldn’t it be nice to have something more user-friendly, such as a generator in Python, where a special keyword yield does all the magic for you? Something like this:
# A generator in Python with the special keyword yield
def firstn(n):
    num = 0
    while num < n:
        yield num
        num += 1
And just like with everything in Rust, if it is not in the language, there is a macro! for that.
#A higher-level API
The async-stream crate provides 2 macros, stream! and try_stream!, that allow you to create a stream as if it were normal Rust code:
let s = stream! {
    for i in 0..3 {
        yield i;
    }
};
If we go back to our earlier example with 3 phases, it simply becomes:
use async_stream::stream;
use futures::{pin_mut, StreamExt};

let stream = stream! {
    yield async { 1 }.await;
    yield async { 2 }.await;
    yield async { 3 }.await;
};
// the stream must be pinned before calling next()
pin_mut!(stream);
assert_eq!(Some(1), stream.next().await);
assert_eq!(Some(2), stream.next().await);
assert_eq!(Some(3), stream.next().await);
assert_eq!(None, stream.next().await);
Much easier to read, right? Internally, the macro is going to create a tiny channel in thread-local storage, between the future provided and the stream object. The macro replaces every yield keyword with a send to this tiny channel. Finally, when the stream is polled for the next item, it, in turn, polls the provided future and then checks if there is something in the tiny channel for the stream to yield back.
It basically works the same as the stream::unfold function, but with a bit more machinery in order to offer a higher-level API, where the responsibility of propagating the state of the stream is handled on your behalf.
#Gotchas
If you take a close look at the traits provided for Stream, you will see that there are not 1, not 2, but 3 traits that look like a stream. Namely Stream, TryStream and FusedStream. What are they?
- Stream is very similar to its Iterator counterpart, except that you should not poll the stream again once it has returned a None indicating that the stream is exhausted. If you do, the behavior is unspecified: the implementation is allowed to panic, block forever, or do whatever suits it best (although, since poll_next is a safe function, it must never cause memory unsafety). See the Panics section of the documentation for more details.
- FusedStream is the same thing as a Stream, except that it lets the user know, through its is_terminated() method, whether the stream is really exhausted after the first None or whether it is safe to poll it again. For example, let’s say you want to create a stream that is backed by a circular buffer. After the first iteration, the FusedStream is going to return None, but it is safe to poll the FusedStream again after that, in order to start a new iteration over this buffer.
- TryStream is a special trait tailored to streams that produce Result<value, error>. It provides functions to easily match and transform the inner result. You can see it as a more convenient API for streams that yield Result items.
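As an illustration of the difference between the first two traits, here is a hedged sketch of a "fuse" adapter that makes polling after None harmless. The Stream and FusedStream traits below are local stand-ins mirroring the shape of the real ones, and Fuse, OneShot, NoopWaker and poll_three_times are hypothetical names written only for this example.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as the `Stream` trait from `futures`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Local stand-in mirroring the shape of `FusedStream`.
trait FusedStream: Stream {
    fn is_terminated(&self) -> bool;
}

/// Hypothetical adapter that remembers when the inner stream has ended.
struct Fuse<S> {
    inner: S,
    done: bool,
}

impl<S: Stream + Unpin> Stream for Fuse<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.get_mut();
        if this.done {
            // Never touch the inner stream again once it has ended.
            return Poll::Ready(None);
        }
        let polled = Pin::new(&mut this.inner).poll_next(cx);
        if matches!(polled, Poll::Ready(None)) {
            this.done = true;
        }
        polled
    }
}

impl<S: Stream + Unpin> FusedStream for Fuse<S> {
    fn is_terminated(&self) -> bool {
        self.done
    }
}

/// Hypothetical stream that panics if polled after it returned `None`.
struct OneShot {
    value: Option<u8>,
    ended: bool,
}

impl Stream for OneShot {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u8>> {
        assert!(!self.ended, "polled after completion");
        let value = self.value.take();
        if value.is_none() {
            self.ended = true;
        }
        Poll::Ready(value)
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fused `OneShot` three times: one more time than the inner stream tolerates.
fn poll_three_times() -> (Vec<Option<u8>>, bool) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fused = Fuse { inner: OneShot { value: Some(7), ended: false }, done: false };
    let mut seen = Vec::new();
    for _ in 0..3 {
        if let Poll::Ready(item) = Pin::new(&mut fused).poll_next(&mut cx) {
            seen.push(item);
        }
    }
    (seen, fused.is_terminated())
}
```

Without the adapter, the third poll would hit the assertion inside OneShot; with it, the caller simply keeps getting None and can check is_terminated() before polling.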
#How to consume a stream?
The only function you will ever need is next(), which can be found in the StreamExt trait.
From there, you are able to consume the stream with a regular while let loop inside any async block.
Here is a simple async function that takes a stream emitting usize values, and returns the sum of those values.
use futures_util::{pin_mut, Stream, StreamExt};
async fn sum(stream: impl Stream<Item=usize>) -> usize {
    // pin the stream on the stack so that next() can be called on it
    pin_mut!(stream);
    let mut sum: usize = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}
In the same way, inside the loop you could send each item into a channel, pass it on to another function that handles single elements, consume only part of the stream, etc.
#Gotchas
Don't forget to pin your stream before iterating on it. The compiler is going to complain if you forget, but it will recommend using Box::pin, which is not necessary if you can pin the stream on the stack. For this, you can use the pin_mut! macro from futures-util.
Streams can also be consumed thanks to combinators. There are many of them, such as the well-known map, filter, for_each, skip, etc. I’ll let you check them all out directly in the documentation.
A useful one for debugging or simply logging is the inspect combinator. It allows you to pass a lambda that receives a reference to each item emitted by the stream, without consuming the item.
let stream = stream::iter(vec![1, 2, 3]);
let mut stream = stream.inspect(|val| println!("{}", val));
assert_eq!(stream.next().await, Some(1));
// also prints "1" to the console
#Gotchas
If you use a combinator from TryStream(Ext), make sure you read the documentation, especially for methods that start with try_xxx. Some methods will tell you that the stream is exhausted at the first Err(), while others won’t. The purpose of the TryStream trait is to allow you to easily work with Result items, therefore all those methods treat Err() as a special case, each with its own specific behavior. To save you some unexpected trouble, I strongly recommend you read the documentation first!
This article is the first part of a series regarding Streams. We saw how streams in Rust compare to iterators, as their asynchronous counterpart, and the main ways to create, use and combine them.
The next article will focus on the practical usage of Stream with Websockets, so stay tuned :)