Streams and Sinks in Dart and Flutter

Streams and sinks are mainstays of asynchronous programming in Dart and Flutter. This article explores what streams are and how they can be used to solve problems.

The code for this article was tested with Dart 2.8.4 and Flutter 1.17.5.
Note: In order to get the most out of this article, it's best to be familiar with the concepts detailed in the Asynchrony Primer for Dart and Flutter.

What are streams?

Simply put, streams are a source of asynchronous events delivered sequentially. There are data events, which are sometimes referred to as elements of the stream due to a stream's similarity to a list, and there are error events, which are notifications of failure. Once all data elements have been emitted, a special done event notifies any listeners that no more events will arrive.

The primary advantage of using streams to communicate is that it keeps code loosely coupled. The owner of a stream can emit values as they become available, and it doesn't need to know anything about who's listening or why. Similarly, consumers of the data need only adhere to the stream interface, and the means by which the stream's data is generated are entirely hidden.

There are four main classes in Dart's async libraries that are used to manage streams:

  • Stream: This class represents an asynchronous stream of data. Listeners can subscribe to be notified of the arrival of new data events.
  • EventSink: A sink is like a stream that flows in the opposite direction. Adding data events to an EventSink funnels that data into a connected stream.
  • StreamController: A StreamController simplifies stream management, automatically creating a stream and sink, and providing methods for controlling a stream's behavior.
  • StreamSubscription: Listeners on a stream can save a reference to their subscription, which will allow them to pause, resume, or cancel the flow of data they receive.

Most of the time you won't directly instantiate the first two, because when you create a StreamController, you get the stream and sink for free. Data subscribers listen for updates on a Stream instance, and an EventSink is used to add new data to the stream. Subscribers to the stream can manage their subscription with a StreamSubscription instance.

Let's take a look at some basic stream code to become familiar with how the various classes can be used. Mastery of these patterns will help when creating your own Flutter widgets that need to communicate with outside code, and they will allow you to facilitate class-to-class communications in a loosely coupled way.

Stream basics

Here's a basic example demonstrating the use of all four classes with a stream of string data:

final controller = StreamController<String>();

final subscription = controller.stream.listen((String data) {
  print(data);
});

controller.sink.add("Data!");

With a StreamController instance, you can access a stream to listen for and react to data events using the Stream instance's listen() method, and you can access a sink to add new data events to the stream using the add() method of EventSink. The stream's listen() method returns an instance of StreamSubscription that you can use to manage your subscription to the stream.

It should be noted that controllers expose a convenience add() method that handles forwarding any data to the sink:

controller.add("Data!");

You don't need to explicitly use the sink reference to add data to the stream, but that's what happens behind the scenes.

If an error occurs and your stream's listeners need to be informed, you can use addError() instead of add():

controller.addError("Error!");

Just as with add(), the error will be sent over the stream via the sink.
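To see the three kinds of events together, here is a minimal sketch that sends one data event and one error event, then closes the controller. The function name runBasicDemo and the log list are illustrative only; they are not part of any API.

```dart
import 'dart:async';

/// Sends one data event and one error event, then closes the stream.
/// Returns a log of everything the listener observed, in order.
Future<List<String>> runBasicDemo() async {
  final controller = StreamController<String>();
  final log = <String>[];

  controller.stream.listen(
    (String data) => log.add('data: $data'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () => log.add('done'),
  );

  controller.add('Data!');
  controller.addError('Error!');

  // close() sends the done event; the returned future completes once the
  // listener has received it.
  await controller.close();
  return log;
}

Future<void> main() async {
  final log = await runBasicDemo();
  log.forEach(print);
}
```

Events are delivered in the order they were added, so the listener sees the data event, then the error, then the done notification.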

Next, we'll explore typical patterns for constructing controllers and exposing streams in a more real-world context.

Using streams

Typically, a controller and its sink are kept private to the data producer, while the stream is exposed to one or more consumers. If you have a class that needs to communicate with code outside itself, perhaps a data service class of some kind, you might use a pattern like this:

import 'dart:async';

class MyDataService {
  final _onNewData = StreamController<String>();
  Stream<String> get onNewData => _onNewData.stream;
}

You need to import the dart:async library to gain access to StreamController. The private _onNewData variable represents the stream controller for providing incoming data to any users of the service, and we use generics to specify that all data is expected to be in string form. The name of the controller variable is deliberately matched to the public getter onNewData so that it's clear which controller belongs to which stream. The getter returns the controller's Stream instance, with which a listener can provide a callback to receive data updates.

To listen for new data events:

final service = MyDataService();

service.onNewData.listen((String data) {
  print(data);
});

After creating a reference to the data service, you can register a callback to receive data as it is added to the stream.
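The service class above never shows how data gets into the stream. As a rough sketch of the producer side, assume the service has a publish() method that forwards to the private controller. That method, the dispose() method used here, and the collectFromService() helper are hypothetical additions for illustration; a real service would more likely add data in response to a network call or similar event.

```dart
import 'dart:async';

class MyDataService {
  final _onNewData = StreamController<String>();
  Stream<String> get onNewData => _onNewData.stream;

  // Hypothetical producer method (an assumption for this sketch): the
  // controller stays private, so only the service can add events.
  void publish(String data) => _onNewData.add(data);

  // Closes the stream so listeners are told no more data is coming.
  Future<void> dispose() => _onNewData.close();
}

/// Listens to the service, publishes two values, and returns what arrived.
Future<List<String>> collectFromService() async {
  final service = MyDataService();
  final received = <String>[];

  service.onNewData.listen(received.add);

  service.publish('first');
  service.publish('second');
  await service.dispose();
  return received;
}

Future<void> main() async {
  print(await collectFromService());
}
```

Consumers only ever see the Stream returned by the getter, which is what keeps the two sides loosely coupled.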

You can optionally provide callbacks for errors and to be notified when the stream is closed by the controller:

service.onNewData.listen(
  (String data) {
    print(data);
  },
  onError: (error) {
    print(error);
  },
  onDone: () {
    print("Stream closed!");
  },
);

Here, we've included anonymous callback functions for the stream's listen() method's onError and onDone parameters.

In the example, we've created a stream that will accommodate only a single listener. What if you need more than that?

Multi-user streams

Sometimes a stream's data is intended for a single recipient, but in other cases, you may want to allow any number of recipients. For instance, it's possible that disparate parts of your app could rely on updates from a single data source, both user interface elements and other logic components. If you want to allow multiple listeners on your stream, you need to create a broadcast stream:

class MyDataService {
  final _onNewData = StreamController<String>.broadcast();
  Stream<String> get onNewData => _onNewData.stream;
}

Using the broadcast() named constructor for the StreamController will provide a multi-user stream. With this, any number of listeners may register a callback to be notified of new elements on the stream.
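The difference between the two kinds of stream can be sketched as follows. The function names are illustrative. The first function shows that a second listen() call on a default (single-subscription) stream throws a StateError, and the second shows two listeners sharing a broadcast stream.

```dart
import 'dart:async';

/// Returns true if a second listen() on a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<String>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {}); // not allowed on this kind of stream
    return false;
  } on StateError {
    return true;
  }
}

/// Registers two listeners on a broadcast stream and returns what they saw.
Future<List<String>> broadcastDemo() async {
  final controller = StreamController<String>.broadcast();
  final log = <String>[];

  controller.stream.listen((data) => log.add('A: $data'));
  controller.stream.listen((data) => log.add('B: $data'));

  controller.add('hello');
  await controller.close();
  return log;
}

Future<void> main() async {
  print(secondListenThrows());
  print(await broadcastDemo());
}
```

Keep in mind that a broadcast stream does not hold events for listeners that subscribe later; each listener receives only the events added while it is subscribed.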

Next, we'll see what to do when a stream is no longer needed.

Closing streams

If you have a data provider that has no more data to offer, you can use the controller to close the stream. All registered onDone callbacks will be called:

class MyDataService {
  final _onNewData = StreamController<String>.broadcast();
  Stream<String> get onNewData => _onNewData.stream;
  
  void dispose() {
    _onNewData.close();
  }
}

This version of the data service class includes a dispose() method that can be used to tie off loose ends. In its body, the controller's close() method closes the stream associated with it: listeners receive a done event, and no further events can be added. Streams should always be closed when they're no longer needed. If the data service instance is discarded and scheduled for garbage collection without having closed its streams, you may get memory leaks in your app.
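What closing looks like from both sides can be sketched with a bare controller. The closeDemo() name and the log entries are illustrative. The sketch relies on two documented behaviors: isClosed reports whether close() has been called, and calling add() on a closed controller throws a StateError.

```dart
import 'dart:async';

/// Closes a controller and records what happens before and after.
Future<List<String>> closeDemo() async {
  final controller = StreamController<String>.broadcast();
  final log = <String>[];

  controller.stream.listen(log.add, onDone: () => log.add('<done>'));

  controller.add('before');
  await controller.close();
  log.add('isClosed: ${controller.isClosed}');

  try {
    controller.add('after'); // adding to a closed controller is an error
  } on StateError {
    log.add('add() after close() threw StateError');
  }
  return log;
}

Future<void> main() async {
  final log = await closeDemo();
  log.forEach(print);
}
```

A producer that might be asked for data after disposal can check isClosed before calling add() rather than catching the error.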

A stream's consumer may also need to manage the flow of data, and that's what subscriptions are for.

Managing a stream subscription

A listener that has saved a reference to a stream subscription can pause, resume, or permanently cancel that subscription. A paused subscription will not deliver any stream data until it has been resumed. Data events that arrive in the meantime are buffered, and they'll all be delivered when the subscription is resumed.

To pause and then resume a stream subscription:

final service = MyDataService();

final subscription = service.onNewData.listen((String data) {
  print(data);
});

subscription.pause();

subscription.resume();

Obviously, you wouldn't normally pause and then resume a subscription immediately, but the code snippet serves to illustrate the correct method calls.
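To make the buffering behavior visible, the following sketch adds events while the subscription is paused and checks that nothing arrives until resume() is called. It uses a bare controller rather than the data service, and the pauseResumeDemo() name is illustrative.

```dart
import 'dart:async';

/// Adds events while paused and returns a log showing when they arrived.
Future<List<String>> pauseResumeDemo() async {
  final controller = StreamController<String>();
  final log = <String>[];
  final subscription = controller.stream.listen(log.add);

  subscription.pause();
  controller.add('one');
  controller.add('two');

  // Give the event loop a turn. The events stay buffered while paused.
  await Future<void>.delayed(Duration.zero);
  log.add('received while paused: ${log.length}');

  subscription.resume();

  // Buffered events are delivered, followed by the done event.
  await controller.close();
  return log;
}

Future<void> main() async {
  final log = await pauseResumeDemo();
  log.forEach(print);
}
```

Because paused events accumulate in memory, long pauses on a busy stream are worth avoiding; canceling is usually the better choice when the data is no longer wanted.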

If a listener no longer needs data from a stream subscription, the subscription can be canceled:

subscription.cancel();

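With a broadcast stream, it is possible to register a new listener callback at any time after canceling a subscription, but a new subscription instance will be generated. A single-subscription stream, by contrast, can be listened to only once, even after its subscription has been canceled. In either case, you cannot reuse a subscription once it's been canceled.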
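Here is a small sketch of canceling and then listening again, assuming a broadcast controller. The cancelAndRelistenDemo() name and the labels in the log are illustrative.

```dart
import 'dart:async';

/// Cancels one subscription, then registers a fresh one on the same stream.
Future<List<String>> cancelAndRelistenDemo() async {
  final controller = StreamController<String>.broadcast();
  final log = <String>[];

  final first = controller.stream.listen((data) => log.add('first: $data'));
  controller.add('a');
  await Future<void>.delayed(Duration.zero); // let 'a' be delivered
  await first.cancel();

  // Nobody is listening now, so a broadcast stream drops this event.
  controller.add('b');

  // listen() returns a brand-new subscription; `first` cannot be revived.
  controller.stream.listen((data) => log.add('second: $data'));
  controller.add('c');

  await controller.close();
  return log;
}

Future<void> main() async {
  final log = await cancelAndRelistenDemo();
  log.forEach(print);
}
```

The canceled listener sees only the event added before cancel(), and the new listener sees only events added after it subscribed.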

Next, we'll look at another way you can provide data to a stream.

Asynchronous generators

We've already seen how Dart's async keyword can be added to a function to make it return a single value asynchronously via a future. It turns out there is a version of that concept for streams in the form of the async* keyword. Marking a function with async* turns it into a data generator function capable of returning a sequence of values asynchronously. This is a pattern put to good use in Flutter's most popular BLoC implementation, flutter_bloc, for managing a Flutter application's state.

Let's look at a simple example:

Stream<int> count(int countTo) async* {
  for (int i = 1; i <= countTo; i++) {
    yield i;
  }
}

// place this code in a function somewhere
count(10).listen((int value) {
  print(value);
});

This code will print out the values 1 through 10. The async* keyword makes count() an asynchronous generator function. When count() is called, a Stream<int> is immediately returned, which is why we can call listen() directly on that invocation. The stream's listen() method expects a callback function, in which we print each value as it arrives.

The generator function uses the yield keyword to inject values into the stream one at a time. In essence, yield is calling a StreamController instance's add() method for you. You could manually produce a generator function like this without the special keywords, but it would involve using patterns discussed earlier, such as creating your own StreamController, which would be much more verbose and require you to keep track of everything more explicitly.
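For comparison, here is a rough sketch of what a hand-written equivalent of count() might look like with a StreamController. The countWithController() name is hypothetical, and the sketch is deliberately simplified: unlike a real async* function, it does not react to the listener pausing or canceling.

```dart
import 'dart:async';

// The generator version, using async* and yield.
Stream<int> count(int countTo) async* {
  for (int i = 1; i <= countTo; i++) {
    yield i;
  }
}

// A simplified manual version built on StreamController.
Stream<int> countWithController(int countTo) {
  final controller = StreamController<int>();

  // Like a generator, do no work until someone starts listening.
  controller.onListen = () {
    for (int i = 1; i <= countTo; i++) {
      controller.add(i); // what yield does for you
    }
    controller.close(); // what reaching the end of the function does for you
  };

  return controller.stream;
}

Future<void> main() async {
  // toList() listens to the stream and collects every value until done.
  print(await count(3).toList());
  print(await countWithController(3).toList());
}
```

Both functions produce the same sequence, but the generator version gets lazy start-up, closing, and pause and cancel handling without any extra code.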

It's important to understand that the key advantage of an asynchronous generator function is its asynchronous nature, which isn't obvious in the previous example. Let's add a small variation to make things clearer:

Stream<int> count(int countTo) async* {
  for (int i = 1; i <= countTo; i++) {
    yield i;
    await Future.delayed(const Duration(seconds: 1));
  }
}

count(10).listen((int value) {
  print(value);
});

If we add a delay of one second after each yield statement, values will be added to the stream every second instead of almost instantaneously. When this code executes, the values from 1 to 10 will appear in the debug console in a staggered fashion instead of all at once. The generator function is free to take all the time it needs to produce values, yielding each only when it's ready.

Conclusion

This article has given you a crash course in the use of streams and sinks in Dart and Flutter for managing asynchronous data and events.