Merge branch 'master' into wip-sync-artery-dev-patriknw

Patrik Nordwall 2016-08-31 08:59:49 +02:00
commit 90cce8579a
78 changed files with 3577 additions and 1303 deletions


@@ -0,0 +1,140 @@
/**
 * Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
 */
package docs.stream;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class HubDocTest extends AbstractJavaTest {

  static ActorSystem system;
  static Materializer materializer;

  @BeforeClass
  public static void setup() {
    system = ActorSystem.create("GraphDSLDocTest");
    materializer = ActorMaterializer.create(system);
  }

  @AfterClass
  public static void tearDown() {
    JavaTestKit.shutdownActorSystem(system);
    system = null;
    materializer = null;
  }

  @Test
  public void dynamicMerge() {
    //#merge-hub
    // A simple consumer that will print to the console for now
    Sink<String, CompletionStage<Done>> consumer = Sink.foreach(System.out::println);

    // Attach a MergeHub Source to the consumer. This will materialize to a
    // corresponding Sink.
    RunnableGraph<Sink<String, NotUsed>> runnableGraph =
      MergeHub.of(String.class, 16).to(consumer);

    // By running/materializing the consumer we get back a Sink, and can
    // now feed elements into it. This Sink can be materialized
    // any number of times, and every element that enters the Sink will
    // be consumed by our consumer.
    Sink<String, NotUsed> toConsumer = runnableGraph.run(materializer);

    Source.single("Hello!").runWith(toConsumer, materializer);
    Source.single("Hub!").runWith(toConsumer, materializer);
    //#merge-hub
  }

  @Test
  public void dynamicBroadcast() {
    // Used to be able to clean up the running stream
    ActorMaterializer materializer = ActorMaterializer.create(system);

    //#broadcast-hub
    // A simple producer that publishes a new "message" every second
    Source<String, Cancellable> producer = Source.tick(
      FiniteDuration.create(1, TimeUnit.SECONDS),
      FiniteDuration.create(1, TimeUnit.SECONDS),
      "New message"
    );

    // Attach a BroadcastHub Sink to the producer. This will materialize to a
    // corresponding Source.
    // (We need to use toMat and Keep.right since by default the materialized
    // value to the left is used)
    RunnableGraph<Source<String, NotUsed>> runnableGraph =
      producer.toMat(BroadcastHub.of(String.class, 256), Keep.right());

    // By running/materializing the producer, we get back a Source, which
    // gives us access to the elements published by the producer.
    Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

    // Print out messages from the producer in two independent consumers
    fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
    fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
    //#broadcast-hub

    // Cleanup
    materializer.shutdown();
  }

  @Test
  public void mergeBroadcastCombination() {
    //#pub-sub-1
    // Obtain a Sink and Source which will publish and receive from the "bus" respectively.
    Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkAndSource =
      MergeHub.of(String.class, 16)
        .toMat(BroadcastHub.of(String.class, 256), Keep.both())
        .run(materializer);

    Sink<String, NotUsed> sink = sinkAndSource.first();
    Source<String, NotUsed> source = sinkAndSource.second();
    //#pub-sub-1

    //#pub-sub-2
    // Ensure that the Broadcast output is dropped if there are no listening parties.
    // If this dropping Sink is not attached, then the broadcast hub will not drop any
    // elements itself when there are no subscribers, backpressuring the producer instead.
    source.runWith(Sink.ignore(), materializer);
    //#pub-sub-2

    //#pub-sub-3
    // We now create a Flow that represents a publish-subscribe channel using the above
    // started stream as its "topic". We add two more features, external cancellation of
    // the registration and automatic cleanup for very slow subscribers.
    Flow<String, String, UniqueKillSwitch> busFlow =
      Flow.fromSinkAndSource(sink, source)
        .joinMat(KillSwitches.singleBidi(), Keep.right())
        .backpressureTimeout(FiniteDuration.create(1, TimeUnit.SECONDS));
    //#pub-sub-3

    //#pub-sub-4
    UniqueKillSwitch killSwitch =
      Source.repeat("Hello World!")
        .viaMat(busFlow, Keep.right())
        .to(Sink.foreach(System.out::println))
        .run(materializer);

    // Shut down externally
    killSwitch.shutdown();
    //#pub-sub-4
  }
}


@@ -46,13 +46,14 @@ class KillSwitchDocTest extends AbstractJavaTest {
public void uniqueKillSwitchShutdownExample() throws Exception {
  //#unique-shutdown
-  final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
-      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
+  final Source<Integer, NotUsed> countingSrc =
+    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
+      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
  final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
  final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
      .viaMat(KillSwitches.single(), Keep.right())
      .toMat(lastSnk, Keep.both()).run(mat);

  final UniqueKillSwitch killSwitch = stream.first();
  final CompletionStage<Integer> completionStage = stream.second();
@@ -60,20 +61,22 @@ class KillSwitchDocTest extends AbstractJavaTest {
  doSomethingElse();
  killSwitch.shutdown();

-  final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
+  final int finalCount =
+    completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
  assertEquals(2, finalCount);
  //#unique-shutdown
}

public static void uniqueKillSwitchAbortExample() throws Exception {
  //#unique-abort
-  final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
-      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
+  final Source<Integer, NotUsed> countingSrc =
+    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
+      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
  final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
  final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
      .viaMat(KillSwitches.single(), Keep.right())
      .toMat(lastSnk, Keep.both()).run(mat);

  final UniqueKillSwitch killSwitch = stream.first();
  final CompletionStage<Integer> completionStage = stream.second();
@@ -81,31 +84,36 @@ class KillSwitchDocTest extends AbstractJavaTest {
  final Exception error = new Exception("boom!");
  killSwitch.abort(error);

-  final int result = completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
+  final int result =
+    completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
  assertEquals(-1, result);
  //#unique-abort
}

public void sharedKillSwitchShutdownExample() throws Exception {
  //#shared-shutdown
-  final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
-      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
+  final Source<Integer, NotUsed> countingSrc =
+    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
+      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
  final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
  final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");

  final CompletionStage<Integer> completionStage = countingSrc
      .viaMat(killSwitch.flow(), Keep.right())
      .toMat(lastSnk, Keep.right()).run(mat);
  final CompletionStage<Integer> completionStageDelayed = countingSrc
      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
      .viaMat(killSwitch.flow(), Keep.right())
      .toMat(lastSnk, Keep.right()).run(mat);

  doSomethingElse();
  killSwitch.shutdown();

-  final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
-  final int finalCountDelayed = completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS);
+  final int finalCount =
+    completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
+  final int finalCountDelayed =
+    completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS);
  assertEquals(2, finalCount);
  assertEquals(1, finalCountDelayed);
  //#shared-shutdown
@@ -113,23 +121,27 @@ class KillSwitchDocTest extends AbstractJavaTest {
public static void sharedKillSwitchAbortExample() throws Exception {
  //#shared-abort
-  final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
-      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
+  final Source<Integer, NotUsed> countingSrc =
+    Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
+      .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
  final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
  final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");

  final CompletionStage<Integer> completionStage1 = countingSrc
      .viaMat(killSwitch.flow(), Keep.right())
      .toMat(lastSnk, Keep.right()).run(mat);
  final CompletionStage<Integer> completionStage2 = countingSrc
      .viaMat(killSwitch.flow(), Keep.right())
      .toMat(lastSnk, Keep.right()).run(mat);

  final Exception error = new Exception("boom!");
  killSwitch.abort(error);

-  final int result1 = completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
-  final int result2 = completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
+  final int result1 =
+    completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
+  final int result2 =
+    completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
  assertEquals(-1, result1);
  assertEquals(-1, result2);
  //#shared-abort


@@ -240,6 +240,14 @@ Subscribers will receive ``Replicator.DataDeleted``.
.. includecode:: code/docs/ddata/DistributedDataDocTest.java#delete

.. warning::
   As deleted keys continue to be included in the stored data on each node as well as in gossip
   messages, a continuous series of updates and deletes of top-level entities will result in
   growing memory usage until an ActorSystem runs out of memory. To use Akka Distributed Data
   where frequent adds and removes are required, you should use a fixed number of top-level data
   types that support both updates and removals, for example ``ORMap`` or ``ORSet``.
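
To make that recommendation concrete, here is a hedged sketch (not part of this commit) of reusing a
single long-lived top-level ``ORSet`` entry for frequent adds and removes; the ``node`` handle and the
element values are illustrative assumptions:

.. code-block:: java

   import akka.cluster.Cluster;
   import akka.cluster.ddata.ORSet;

   // Assumed in scope: the ActorSystem `system` of this cluster node
   Cluster node = Cluster.get(system);

   // One long-lived top-level ORSet instead of many short-lived top-level keys
   ORSet<String> set = ORSet.create();
   set = set.add(node, "a").add(node, "b"); // additions
   set = set.remove(node, "a");             // removals reuse the same top-level entry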
Data Types
==========


@@ -58,13 +58,13 @@ to the Actor as a message:
.. warning::
   Be sure to consume the response entities ``dataBytes:Source[ByteString,Unit]`` by for example connecting it
-  to a ``Sink`` (for example ``response.entity.dataBytes.runWith(Sink.ignore)`` if you don't care about the
+  to a ``Sink`` (for example ``response.discardEntityBytes(Materializer)`` if you don't care about the
   response entity), since otherwise Akka HTTP (and the underlying Streams infrastructure) will understand the
   lack of entity consumption as a back-pressure signal and stop reading from the underlying TCP connection!

   This is a feature of Akka HTTP that allows consuming entities (and pulling them through the network) in
   a streaming fashion, and only *on demand* when the client is ready to consume the bytes -
-  it may be a bit suprising at first though.
+  it may be a bit surprising at first though.

   There are tickets open about automatically dropping entities if not consumed (`#18716`_ and `#18540`_),
   so these may be implemented in the near future.
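
As a concrete illustration, draining the entity could look like the following hedged sketch; it assumes
an already-received ``HttpResponse`` named ``response`` and a ``Materializer`` named ``materializer``
are in scope:

.. code-block:: java

   // Assumed in scope: HttpResponse `response`, Materializer `materializer`
   // Run the entity's data bytes into Sink.ignore so the connection is not back-pressured:
   response.entity().getDataBytes().runWith(Sink.ignore(), materializer);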


@@ -4,18 +4,19 @@ Implications of the streaming nature of Request/Response Entities
-----------------------------------------------------------------
Akka HTTP is streaming *all the way through*, which means that the back-pressure mechanisms enabled by Akka Streams
are exposed through all layers, from the TCP layer, through the HTTP server, all the way up to the user-facing ``HttpRequest``
and ``HttpResponse`` and their ``HttpEntity`` APIs.
- This has suprising implications if you are used to non-streaming / not-reactive HTTP clients.
+ This has surprising implications if you are used to non-streaming / not-reactive HTTP clients.
Specifically it means that: "*lack of consumption of the HTTP Entity, is signaled as back-pressure to the other
side of the connection*". This is a feature, as it allows one to consume the entity only on demand, and to
back-pressure servers/clients, preventing them from overwhelming our application and causing unnecessary
buffering of the entity in memory.
.. warning::
   Consuming (or discarding) the Entity of a request is mandatory!
   If *accidentally* left neither consumed nor discarded, Akka HTTP will
-  asume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms.
+  assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms.

   A client should consume the Entity regardless of the status of the ``HttpResponse``.
Client-Side handling of streaming HTTP Entities
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -25,7 +26,7 @@ Consuming the HTTP Response Entity (Client)
The most common use-case of course is consuming the response entity, which can be done by
running the underlying ``dataBytes`` Source. This is as simple as running the dataBytes source
- (or on the server-side using directives such as
+ (or on the server-side using directives such as ``BasicDirectives.extractDataBytes``).
It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest,
for example by framing the incoming chunks, parsing them line-by-line and then connecting the flow into another
@@ -34,16 +35,16 @@ destination Sink, such as a File or other Akka Streams connector:
.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-consume-example-1
However, sometimes the need may arise to consume the entire entity as a ``Strict`` entity (which means that it is
completely loaded into memory). Akka HTTP provides a special ``toStrict(timeout, materializer)`` method which can be used to
eagerly consume the entity and make it available in memory:
.. includecode:: ../code/docs/http/javadsl/HttpClientExampleDocTest.java#manual-entity-consume-example-2
Discarding the HTTP Response Entity (Client)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code),
yet as explained above the entity still has to be consumed in some way, otherwise we'll be exerting back-pressure on the
underlying TCP connection.

The ``discardEntityBytes`` convenience method makes it easy to discard the entity when it is of no use to us.
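
A minimal sketch of its use (assuming ``response`` and ``materializer`` are in scope; the completion
callback is illustrative):

.. code-block:: java

   // Assumed in scope: HttpResponse `response`, Materializer `materializer`
   HttpMessage.DiscardedEntity discarded = response.discardEntityBytes(materializer);
   discarded.completionStage().thenAccept(done ->
     System.out.println("Entity discarded completely!"));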
@@ -83,22 +84,22 @@ Discarding the HTTP Request Entity (Server)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sometimes, depending on some validation (e.g. checking if a given user is allowed to perform uploads or not)
you may want to decide to discard the uploaded entity.

Please note that discarding means that the entire upload will proceed, even though you are not interested in the data
being streamed to the server - this may be useful if you are simply not interested in the given entity, yet
you don't want to abort the entire connection (which we'll demonstrate as well), since there may still be more requests
pending on the same connection.
In order to discard the data bytes explicitly you can invoke the ``discardEntityBytes`` method of the incoming ``HttpRequest``:
.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#discard-discardEntityBytes
A related concept is *cancelling* the incoming ``entity.getDataBytes()`` stream, which results in Akka HTTP
*abruptly closing the connection from the Client*. This may be useful when you detect that the given user should not be allowed to make any
uploads at all, and you want to drop the connection (instead of reading and ignoring the incoming data).
This can be done by attaching the incoming ``entity.getDataBytes()`` to a ``Sink.cancelled`` which will cancel
the entity stream, which in turn will cause the underlying connection to be shut down by the server,
effectively hard-aborting the incoming request:
.. includecode:: ../code/docs/http/javadsl/server/HttpServerExampleDocTest.java#discard-close-connections
@@ -112,10 +113,10 @@ Under certain conditions it is possible to detect an entity is very unlikely to
and issue warnings or discard the entity automatically. This advanced feature has not been implemented yet, see the below
note and issues for further discussion and ideas.
.. note::
   An advanced feature code-named "auto draining" has been discussed and proposed for Akka HTTP, and we're hoping
   to implement or help the community implement it.

   You can read more about it in `issue #18716 <https://github.com/akka/akka/issues/18716>`_
   as well as `issue #18540 <https://github.com/akka/akka/issues/18540>`_; as always, contributions are very welcome!


@@ -1252,6 +1252,24 @@ If materialized values need to be collected ``prependMat`` is available.
**completes** when all upstreams complete
orElse
^^^^^^
If the primary source completes without emitting any elements, the elements from the secondary source
are emitted. If the primary source emits any elements, the secondary source is cancelled.

Note that both sources are materialized directly and the secondary source is backpressured until it becomes
the source of elements or is cancelled.

Signals errors downstream, regardless of which of the two sources emitted the error. A short usage sketch
follows the semantics below.
**emits** when an element is available from the first stream, or when the first stream closed without emitting
any elements and an element is available from the second stream
**backpressures** when downstream backpressures
**completes** when the primary stream completes after emitting at least one element, when the primary stream
completes without emitting and the secondary stream has already completed, or when the secondary stream completes
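
A minimal sketch of these semantics (assuming an in-scope ``materializer``; the names and values are
illustrative):

.. code-block:: java

   Source<String, NotUsed> primary = Source.empty();        // completes without emitting
   Source<String, NotUsed> secondary = Source.single("fallback");

   // Prints "fallback", since the primary source completed empty:
   primary.orElse(secondary).runForeach(System.out::println, materializer);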
interleave
^^^^^^^^^^
Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one


@@ -61,3 +61,80 @@ Refer to the below for usage examples.
A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed
before any materialization takes place.
Dynamic fan-in and fan-out with MergeHub and BroadcastHub
---------------------------------------------------------
There are many cases when consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow)
are dynamic and not known in advance. The Graph DSL cannot represent this: all connections of the graph
must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs
should be used. They provide means to construct :class:`Sink` and :class:`Source` pairs that are "attached" to each
other, but one of them can be materialized multiple times to implement dynamic fan-in or fan-out.
Using the MergeHub
^^^^^^^^^^^^^^^^^^
A :class:`MergeHub` makes it possible to implement a dynamic fan-in junction point in a graph where elements coming from
different producers are emitted in a first-comes-first-served fashion. If the consumer cannot keep up then *all* of the
producers are backpressured. The hub itself comes as a :class:`Source` to which the single consumer can be attached.
It is not possible to attach any producers until this :class:`Source` has been materialized (started). This is ensured
by the fact that we only get the corresponding :class:`Sink` as a materialized value. Usage might look like this:
.. includecode:: ../code/docs/stream/HubDocTest.java#merge-hub
This sequence, while it might look odd at first, ensures proper startup order. Once we get the :class:`Sink`,
we can use it as many times as we want. Everything that is fed to it will be delivered to the consumer we attached
previously until it cancels.
Using the BroadcastHub
^^^^^^^^^^^^^^^^^^^^^^
A :class:`BroadcastHub` can be used to consume elements from a common producer by a dynamic set of consumers. The
rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a :class:`Sink`
to which the single producer must be attached first. Consumers can only be attached once the :class:`Sink` has
been materialized (i.e. the producer has been started). One example of using the :class:`BroadcastHub`:
.. includecode:: ../code/docs/stream/HubDocTest.java#broadcast-hub
The resulting :class:`Source` can be materialized any number of times, each materialization effectively attaching
a new subscriber. If there are no subscribers attached to this hub then it will not drop any elements but instead
backpressure the upstream producer until subscribers arrive. This behavior can be tweaked by using combinators
such as ``.buffer`` with a drop strategy, or by attaching a subscriber that drops all messages (both shown in the
sketch below). If there are no other subscribers, this will ensure that the producer is kept drained (dropping all
elements), and once a new subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
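
A hedged sketch of both options (assuming the ``fromProducer`` source and ``materializer`` from the
example above):

.. code-block:: java

   // Option 1: keep the hub drained while there are no real subscribers
   fromProducer.runWith(Sink.ignore(), materializer);

   // Option 2: bound a consumer's memory use with a dropping buffer
   fromProducer
     .buffer(16, OverflowStrategy.dropHead())
     .runForeach(msg -> System.out.println("consumer: " + msg), materializer);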
Combining dynamic stages to build a simple Publish-Subscribe service
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The features provided by the Hub implementations are limited by default. This is by design, as various combinations
can be used to express additional features like unsubscribing producers or consumers externally. We show here
an example that builds a :class:`Flow` representing a publish-subscribe channel. The input of the :class:`Flow` is
published to all subscribers while the output streams all the elements published.
First, we connect a :class:`MergeHub` and a :class:`BroadcastHub` together to form a publish-subscribe channel. Once
we materialize this small stream, we get back a pair of :class:`Sink` and :class:`Source` that together define
the publish and subscribe sides of our channel.
.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-1
We now use a few tricks to add more features. First of all, we attach a ``Sink.ignore``
at the broadcast side of the channel to keep it drained when there are no subscribers. If this is not the
desired behavior, this line can simply be dropped.
.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-2
We now wrap the :class:`Sink` and :class:`Source` in a :class:`Flow` using ``Flow.fromSinkAndSource``. This bundles
up the two sides of the channel into one and forces users of it to always define a publisher and subscriber side
(even if the subscriber side is just dropping). It also allows us to very simply attach a :class:`KillSwitch` as
a :class:`BidiStage` which in turn makes it possible to close both the original :class:`Sink` and :class:`Source` at the
same time.
Finally, we add ``backpressureTimeout`` on the consumer side to ensure that subscribers that block the channel for more
than 1 second are forcefully removed (and their stream failed).
.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-3
The resulting Flow now has a type of ``Flow<String, String, UniqueKillSwitch>`` representing a publish-subscribe
channel which can be used any number of times to attach new producers or consumers. In addition, it materializes
to a :class:`UniqueKillSwitch` (see :ref:`unique-kill-switch-java`) that can be used to deregister a single user externally:
.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-4