From bc358f318862b90e216486b7214ac8bca6b8a833 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 11 Aug 2016 12:21:37 +0200 Subject: [PATCH] 20890 Added MergeHub and BroadcastHub --- .../rst/java/code/docs/stream/HubDocTest.java | 140 ++++ .../code/docs/stream/KillSwitchDocTest.java | 66 +- akka-docs/rst/java/stream/stream-dynamic.rst | 77 ++ .../scala/code/docs/stream/HubsDocSpec.scala | 109 +++ akka-docs/rst/scala/stream/stream-dynamic.rst | 77 ++ .../scala/akka/stream/scaladsl/HubSpec.scala | 351 +++++++++ .../main/scala/akka/stream/javadsl/Hub.scala | 93 +++ .../main/scala/akka/stream/scaladsl/Hub.scala | 676 ++++++++++++++++++ 8 files changed, 1562 insertions(+), 27 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/stream/HubDocTest.java create mode 100644 akka-docs/rst/scala/code/docs/stream/HubsDocSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala diff --git a/akka-docs/rst/java/code/docs/stream/HubDocTest.java b/akka-docs/rst/java/code/docs/stream/HubDocTest.java new file mode 100644 index 0000000000..5e3828c9cc --- /dev/null +++ b/akka-docs/rst/java/code/docs/stream/HubDocTest.java @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package docs.stream; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.actor.Cancellable; +import akka.japi.Pair; +import akka.stream.ActorMaterializer; +import akka.stream.KillSwitches; +import akka.stream.Materializer; +import akka.stream.UniqueKillSwitch; +import akka.stream.javadsl.*; +import akka.testkit.JavaTestKit; +import docs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +public class HubDocTest extends AbstractJavaTest { + + static ActorSystem system; + static Materializer materializer; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("GraphDSLDocTest"); + materializer = ActorMaterializer.create(system); + } + + @AfterClass + public static void tearDown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + materializer = null; + } + + @Test + public void dynamicMerge() { + //#merge-hub + // A simple consumer that will print to the console for now + Sink> consumer = Sink.foreach(System.out::println); + + // Attach a MergeHub Source to the consumer. This will materialize to a + // corresponding Sink. + RunnableGraph> runnableGraph = + MergeHub.of(String.class, 16).to(consumer); + + // By running/materializing the consumer we get back a Sink, and hence + // now have access to feed elements into it. This Sink can be materialized + // any number of times, and every element that enters the Sink will + // be consumed by our consumer. 
+ Sink toConsumer = runnableGraph.run(materializer); + + Source.single("Hello!").runWith(toConsumer, materializer); + Source.single("Hub!").runWith(toConsumer, materializer); + //#merge-hub + } + + @Test + public void dynamicBroadcast() { + // Used to be able to clean up the running stream + ActorMaterializer materializer = ActorMaterializer.create(system); + + //#broadcast-hub + // A simple producer that publishes a new "message" every second + Source producer = Source.tick( + FiniteDuration.create(1, TimeUnit.SECONDS), + FiniteDuration.create(1, TimeUnit.SECONDS), + "New message" + ); + + // Attach a BroadcastHub Sink to the producer. This will materialize to a + // corresponding Source. + // (We need to use toMat and Keep.right since by default the materialized + // value to the left is used) + RunnableGraph> runnableGraph = + producer.toMat(BroadcastHub.of(String.class, 256), Keep.right()); + + // By running/materializing the producer, we get back a Source, which + // gives us access to the elements published by the producer. + Source fromProducer = runnableGraph.run(materializer); + + // Print out messages from the producer in two independent consumers + fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer); + fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer); + //#broadcast-hub + + // Cleanup + materializer.shutdown(); + } + + @Test + public void mergeBroadcastCombination() { + //#pub-sub-1 + // Obtain a Sink and Source which will publish and receive from the "bus" respectively. + Pair, Source> sinkAndSource = + MergeHub.of(String.class, 16) + .toMat(BroadcastHub.of(String.class, 256), Keep.both()) + .run(materializer); + + Sink sink = sinkAndSource.first(); + Source source = sinkAndSource.second(); + //#pub-sub-1 + + //#pub-sub-2 + // Ensure that the Broadcast output is dropped if there are no listening parties. + // If this dropping Sink is not attached, then the broadcast hub will not drop any + // elements itself when there are no subscribers, backpressuring the producer instead. + source.runWith(Sink.ignore(), materializer); + //#pub-sub-2 + + //#pub-sub-3 + // We create now a Flow that represents a publish-subscribe channel using the above + // started stream as its "topic". We add two more features, external cancellation of + // the registration and automatic cleanup for very slow subscribers. 
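+      // (Flow.fromSinkAndSource pairs the two endpoints into one Flow; joining
+      // KillSwitches.singleBidi() via joinMat yields a kill switch that closes both
+      // sides at once, and backpressureTimeout fails and removes any subscriber that
+      // stalls the channel longer than the given timeout.)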
+ Flow busFlow = + Flow.fromSinkAndSource(sink, source) + .joinMat(KillSwitches.singleBidi(), Keep.right()) + .backpressureTimeout(FiniteDuration.create(1, TimeUnit.SECONDS)); + //#pub-sub-3 + + //#pub-sub-4 + UniqueKillSwitch killSwitch = + Source.repeat("Hello World!") + .viaMat(busFlow, Keep.right()) + .to(Sink.foreach(System.out::println)) + .run(materializer); + + // Shut down externally + killSwitch.shutdown(); + //#pub-sub-4 + } +} diff --git a/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java b/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java index e70e7e48d3..c3613dacdb 100644 --- a/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/KillSwitchDocTest.java @@ -46,13 +46,14 @@ class KillSwitchDocTest extends AbstractJavaTest { public void uniqueKillSwitchShutdownExample() throws Exception { //#unique-shutdown - final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Source countingSrc = + Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final Pair> stream = countingSrc - .viaMat(KillSwitches.single(), Keep.right()) - .toMat(lastSnk, Keep.both()).run(mat); + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(lastSnk, Keep.both()).run(mat); final UniqueKillSwitch killSwitch = stream.first(); final CompletionStage completionStage = stream.second(); @@ -60,20 +61,22 @@ class KillSwitchDocTest extends AbstractJavaTest { doSomethingElse(); killSwitch.shutdown(); - final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + final int finalCount = + completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals(2, finalCount); //#unique-shutdown } public static void uniqueKillSwitchAbortExample() throws Exception { //#unique-abort - final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Source countingSrc = + Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final Pair> stream = countingSrc - .viaMat(KillSwitches.single(), Keep.right()) - .toMat(lastSnk, Keep.both()).run(mat); + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(lastSnk, Keep.both()).run(mat); final UniqueKillSwitch killSwitch = stream.first(); final CompletionStage completionStage = stream.second(); @@ -81,31 +84,36 @@ class KillSwitchDocTest extends AbstractJavaTest { final Exception error = new Exception("boom!"); killSwitch.abort(error); - final int result = completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + final int result = + completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); assertEquals(-1, result); //#unique-abort } public void sharedKillSwitchShutdownExample() throws Exception { //#shared-shutdown - final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Source countingSrc = + Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), 
DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); final CompletionStage completionStage = countingSrc - .viaMat(killSwitch.flow(), Keep.right()) - .toMat(lastSnk, Keep.right()).run(mat); + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); final CompletionStage completionStageDelayed = countingSrc - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()) - .viaMat(killSwitch.flow(), Keep.right()) - .toMat(lastSnk, Keep.right()).run(mat); + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()) + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); doSomethingElse(); killSwitch.shutdown(); - final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); - final int finalCountDelayed = completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS); + final int finalCount = + completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS); + final int finalCountDelayed = + completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertEquals(2, finalCount); assertEquals(1, finalCountDelayed); //#shared-shutdown @@ -113,23 +121,27 @@ class KillSwitchDocTest extends AbstractJavaTest { public static void sharedKillSwitchAbortExample() throws Exception { //#shared-abort - final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + final Source countingSrc = + Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) + .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); final CompletionStage completionStage1 = countingSrc - .viaMat(killSwitch.flow(), Keep.right()) - .toMat(lastSnk, Keep.right()).run(mat); + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); final CompletionStage completionStage2 = countingSrc - .viaMat(killSwitch.flow(), Keep.right()) - .toMat(lastSnk, Keep.right()).run(mat); + .viaMat(killSwitch.flow(), Keep.right()) + .toMat(lastSnk, Keep.right()).run(mat); final Exception error = new Exception("boom!"); killSwitch.abort(error); - final int result1 = completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); - final int result2 = completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + final int result1 = + completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + final int result2 = + completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS); + assertEquals(-1, result1); assertEquals(-1, result2); //#shared-abort diff --git a/akka-docs/rst/java/stream/stream-dynamic.rst b/akka-docs/rst/java/stream/stream-dynamic.rst index 2e0d37c368..df4ac69e4c 100644 --- a/akka-docs/rst/java/stream/stream-dynamic.rst +++ b/akka-docs/rst/java/stream/stream-dynamic.rst @@ -61,3 +61,80 @@ Refer to the below for usage examples. A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed before any materialization takes place. 
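+For a quick contrast of the two (a minimal sketch, assuming a ``source``, ``sink``, ``otherSource``,
+``otherSink`` and a materializer ``mat`` like in the examples above)::
+
+  // A UniqueKillSwitch is obtained as the materialized value of the single stream it controls
+  UniqueKillSwitch unique =
+    source.viaMat(KillSwitches.single(), Keep.right()).to(sink).run(mat);
+  unique.shutdown(); // completes this one stream only
+
+  // A SharedKillSwitch is created up front and can be woven into any number of streams
+  SharedKillSwitch shared = KillSwitches.shared("my-kill-switch");
+  source.via(shared.flow()).to(sink).run(mat);
+  otherSource.via(shared.flow()).to(otherSink).run(mat);
+  shared.shutdown(); // completes every stream that used shared.flow()
+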
+Dynamic fan-in and fan-out with MergeHub and BroadcastHub
+---------------------------------------------------------
+
+There are many cases where the consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow)
+are dynamic and not known in advance. The Graph DSL cannot express this: all connections of the graph
+must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs
+can be used. They provide the means to construct :class:`Sink` and :class:`Source` pairs that are "attached" to each
+other, but where one of them can be materialized multiple times to implement dynamic fan-in or fan-out.
+
+Using the MergeHub
+^^^^^^^^^^^^^^^^^^
+
+A :class:`MergeHub` allows implementing a dynamic fan-in junction point in a graph where elements coming from
+different producers are emitted in a first-come, first-served fashion. If the consumer cannot keep up, then *all* of the
+producers are backpressured. The hub itself comes as a :class:`Source` to which the single consumer can be attached.
+It is not possible to attach any producers until this :class:`Source` has been materialized (started). This is ensured
+by the fact that we only get the corresponding :class:`Sink` as a materialized value. Usage might look like this:
+
+.. includecode:: ../code/docs/stream/HubDocTest.java#merge-hub
+
+This sequence, while it might look odd at first, ensures proper startup order. Once we get the :class:`Sink`,
+we can use it as many times as we want. Everything that is fed to it will be delivered to the consumer we attached
+previously, until it cancels.
+
+Using the BroadcastHub
+^^^^^^^^^^^^^^^^^^^^^^
+
+A :class:`BroadcastHub` can be used by a dynamic set of consumers to consume elements from a common producer. The
+rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a :class:`Sink`
+to which the single producer must be attached first. Consumers can only be attached once the :class:`Sink` has
+been materialized (i.e. the producer has been started). One example of using the :class:`BroadcastHub`:
+
+.. includecode:: ../code/docs/stream/HubDocTest.java#broadcast-hub
+
+The resulting :class:`Source` can be materialized any number of times, each materialization effectively attaching
+a new subscriber. If there are no subscribers attached to this hub then it will not drop any elements, but instead
+backpressure the upstream producer until subscribers arrive. This behavior can be tweaked, for example by using the
+``.buffer`` combinator with a drop strategy, or by simply attaching a subscriber that drops all messages. If there
+are no other subscribers, this will ensure that the producer is kept drained (dropping all elements), and once a new
+subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
+
+Combining dynamic stages to build a simple Publish-Subscribe service
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The features provided by the Hub implementations are limited by default. This is by design, since various combinations
+can be used to express additional features, like unsubscribing producers or consumers externally. We show here
+an example that builds a :class:`Flow` representing a publish-subscribe channel. The input of the :class:`Flow` is
+published to all subscribers, while the output streams all the elements published.
+ +First, we connect a :class:`MergeHub` and a :class:`BroadcastHub` together to form a publish-subscribe channel. Once +we materialize this small stream, we get back a pair of :class:`Source` and :class:`Sink` that together define +the publish and subscribe sides of our channel. + +.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-1 + +We now use a few tricks to add more features. First of all, we attach a ``Sink.ignore`` +at the broadcast side of the channel to keep it drained when there are no subscribers. If this behavior is not the +desired one this line can be simply dropped. + +.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-2 + +We now wrap the :class:`Sink` and :class:`Source` in a :class:`Flow` using ``Flow.fromSinkAndSource``. This bundles +up the two sides of the channel into one and forces users of it to always define a publisher and subscriber side +(even if the subscriber side is just dropping). It also allows us to very simply attach a :class:`KillSwitch` as +a :class:`BidiStage` which in turn makes it possible to close both the original :class:`Sink` and :class:`Source` at the +same time. +Finally, we add ``backpressureTimeout`` on the consumer side to ensure that subscribers that block the channel for more +than 3 seconds are forcefully removed (and their stream failed). + +.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-3 + +The resulting Flow now has a type of ``Flow[String, String, UniqueKillSwitch]`` representing a publish-subscribe +channel which can be used any number of times to attach new producers or consumers. In addition, it materializes +to a :class:`UniqueKillSwitch` (see :ref:`unique-kill-switch-java`) that can be used to deregister a single user externally: + + +.. includecode:: ../code/docs/stream/HubDocTest.java#pub-sub-4 diff --git a/akka-docs/rst/scala/code/docs/stream/HubsDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/HubsDocSpec.scala new file mode 100644 index 0000000000..471e992b7f --- /dev/null +++ b/akka-docs/rst/scala/code/docs/stream/HubsDocSpec.scala @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package docs.stream + +import akka.NotUsed +import akka.stream.{ ActorMaterializer, KillSwitches, UniqueKillSwitch } +import akka.stream.scaladsl._ +import akka.testkit.AkkaSpec +import docs.CompileOnlySpec + +import scala.concurrent.duration._ + +class HubsDocSpec extends AkkaSpec with CompileOnlySpec { + implicit val materializer = ActorMaterializer() + + "Hubs" must { + + "demonstrate creating a dynamic merge" in { + def println(s: String) = testActor ! s + + //#merge-hub + // A simple consumer that will print to the console for now + val consumer = Sink.foreach(println) + + // Attach a MergeHub Source to the consumer. This will materialize to a + // corresponding Sink. + val runnableGraph: RunnableGraph[Sink[String, NotUsed]] = + MergeHub.source[String](perProducerBufferSize = 16).to(consumer) + + // By running/materializing the consumer we get back a Sink, and hence + // now have access to feed elements into it. This Sink can be materialized + // any number of times, and every element that enters the Sink will + // be consumed by our consumer. + val toConsumer: Sink[String, NotUsed] = runnableGraph.run() + + // Feeding two independent sources into the hub. 
+ Source.single("Hello!").runWith(toConsumer) + Source.single("Hub!").runWith(toConsumer) + //#merge-hub + + expectMsgAllOf("Hello!", "Hub!") + } + + "demonstrate creating a dynamic broadcast" in compileOnlySpec { + //#broadcast-hub + // A simple producer that publishes a new "message" every second + val producer = Source.tick(1.second, 1.second, "New message") + + // Attach a BroadcastHub Sink to the producer. This will materialize to a + // corresponding Source. + // (We need to use toMat and Keep.right since by default the materialized + // value to the left is used) + val runnableGraph: RunnableGraph[Source[String, NotUsed]] = + producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right) + + // By running/materializing the producer, we get back a Source, which + // gives us access to the elements published by the producer. + val fromProducer: Source[String, NotUsed] = runnableGraph.run() + + // Print out messages from the producer in two independent consumers + fromProducer.runForeach(msg => println("consumer1: " + msg)) + fromProducer.runForeach(msg => println("consumer2: " + msg)) + //#broadcast-hub + } + + "demonstrate combination" in { + def println(s: String) = testActor ! s + + //#pub-sub-1 + // Obtain a Sink and Source which will publish and receive from the "bus" respectively. + val (sink, source) = + MergeHub.source[String](perProducerBufferSize = 16) + .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both) + .run() + //#pub-sub-1 + + //#pub-sub-2 + // Ensure that the Broadcast output is dropped if there are no listening parties. + // If this dropping Sink is not attached, then the broadcast hub will not drop any + // elements itself when there are no subscribers, backpressuring the producer instead. + source.runWith(Sink.ignore) + //#pub-sub-2 + + //#pub-sub-3 + // We create now a Flow that represents a publish-subscribe channel using the above + // started stream as its "topic". We add two more features, external cancellation of + // the registration and automatic cleanup for very slow subscribers. + val busFlow: Flow[String, String, UniqueKillSwitch] = + Flow.fromSinkAndSource(sink, source) + .joinMat(KillSwitches.singleBidi[String, String])(Keep.right) + .backpressureTimeout(3.seconds) + //#pub-sub-3 + + //#pub-sub-4 + val switch: UniqueKillSwitch = + Source.repeat("Hello world!") + .viaMat(busFlow)(Keep.right) + .to(Sink.foreach(println)) + .run() + + // Shut down externally + switch.shutdown() + //#pub-sub-4 + } + + } + +} diff --git a/akka-docs/rst/scala/stream/stream-dynamic.rst b/akka-docs/rst/scala/stream/stream-dynamic.rst index cd4f5d6690..357d0e4c0b 100644 --- a/akka-docs/rst/scala/stream/stream-dynamic.rst +++ b/akka-docs/rst/scala/stream/stream-dynamic.rst @@ -61,3 +61,80 @@ Refer to the below for usage examples. A ``UniqueKillSwitch`` is always a result of a materialization, whilst ``SharedKillSwitch`` needs to be constructed before any materialization takes place. +Dynamic fan-in and fan-out with MergeHub and BroadcastHub +--------------------------------------------------------- + +There are many cases when consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow) +are dynamic and not known in advance. The Graph DSL does not allow to represent this, all connections of the graph +must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs +should be used. 
They provide the means to construct :class:`Sink` and :class:`Source` pairs that are "attached" to each
+other, but where one of them can be materialized multiple times to implement dynamic fan-in or fan-out.
+
+Using the MergeHub
+^^^^^^^^^^^^^^^^^^
+
+A :class:`MergeHub` allows implementing a dynamic fan-in junction point in a graph where elements coming from
+different producers are emitted in a first-come, first-served fashion. If the consumer cannot keep up, then *all* of the
+producers are backpressured. The hub itself comes as a :class:`Source` to which the single consumer can be attached.
+It is not possible to attach any producers until this :class:`Source` has been materialized (started). This is ensured
+by the fact that we only get the corresponding :class:`Sink` as a materialized value. Usage might look like this:
+
+.. includecode:: ../code/docs/stream/HubsDocSpec.scala#merge-hub
+
+This sequence, while it might look odd at first, ensures proper startup order. Once we get the :class:`Sink`,
+we can use it as many times as we want. Everything that is fed to it will be delivered to the consumer we attached
+previously, until it cancels.
+
+Using the BroadcastHub
+^^^^^^^^^^^^^^^^^^^^^^
+
+A :class:`BroadcastHub` can be used by a dynamic set of consumers to consume elements from a common producer. The
+rate of the producer will be automatically adapted to the slowest consumer. In this case, the hub is a :class:`Sink`
+to which the single producer must be attached first. Consumers can only be attached once the :class:`Sink` has
+been materialized (i.e. the producer has been started). One example of using the :class:`BroadcastHub`:
+
+.. includecode:: ../code/docs/stream/HubsDocSpec.scala#broadcast-hub
+
+The resulting :class:`Source` can be materialized any number of times, each materialization effectively attaching
+a new subscriber. If there are no subscribers attached to this hub then it will not drop any elements, but instead
+backpressure the upstream producer until subscribers arrive. This behavior can be tweaked, for example by using the
+``.buffer`` combinator with a drop strategy, or by simply attaching a subscriber that drops all messages. If there
+are no other subscribers, this will ensure that the producer is kept drained (dropping all elements), and once a new
+subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
+
+Combining dynamic stages to build a simple Publish-Subscribe service
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The features provided by the Hub implementations are limited by default. This is by design, since various combinations
+can be used to express additional features, like unsubscribing producers or consumers externally. We show here
+an example that builds a :class:`Flow` representing a publish-subscribe channel. The input of the :class:`Flow` is
+published to all subscribers, while the output streams all the elements published.
+
+First, we connect a :class:`MergeHub` and a :class:`BroadcastHub` together to form a publish-subscribe channel. Once
+we materialize this small stream, we get back a pair of :class:`Source` and :class:`Sink` that together define
+the publish and subscribe sides of our channel.
+
+.. includecode:: ../code/docs/stream/HubsDocSpec.scala#pub-sub-1
+
+We now use a few tricks to add more features. First of all, we attach a ``Sink.ignore``
+at the broadcast side of the channel to keep it drained when there are no subscribers.
If this behavior is not the +desired one this line can be simply dropped. + +.. includecode:: ../code/docs/stream/HubsDocSpec.scala#pub-sub-2 + +We now wrap the :class:`Sink` and :class:`Source` in a :class:`Flow` using ``Flow.fromSinkAndSource``. This bundles +up the two sides of the channel into one and forces users of it to always define a publisher and subscriber side +(even if the subscriber side is just dropping). It also allows us to very simply attach a :class:`KillSwitch` as +a :class:`BidiStage` which in turn makes it possible to close both the original :class:`Sink` and :class:`Source` at the +same time. +Finally, we add ``backpressureTimeout`` on the consumer side to ensure that subscribers that block the channel for more +than 3 seconds are forcefully removed (and their stream failed). + +.. includecode:: ../code/docs/stream/HubsDocSpec.scala#pub-sub-3 + +The resulting Flow now has a type of ``Flow[String, String, UniqueKillSwitch]`` representing a publish-subscribe +channel which can be used any number of times to attach new producers or consumers. In addition, it materializes +to a :class:`UniqueKillSwitch` (see :ref:`unique-kill-switch-scala`) that can be used to deregister a single user externally: + + +.. includecode:: ../code/docs/stream/HubsDocSpec.scala#pub-sub-4 diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala new file mode 100644 index 0000000000..a5ac5e15c1 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala @@ -0,0 +1,351 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ ActorMaterializer, KillSwitches, ThrottleMode } +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.testkit.EventFilter + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +class HubSpec extends StreamSpec { + + implicit val mat = ActorMaterializer() + + "MergeHub" must { + + "work in the happy case" in assertAllStagesStopped { + val (sink, result) = MergeHub.source[Int](16).take(20).toMat(Sink.seq)(Keep.both).run() + Source(1 to 10).runWith(sink) + Source(11 to 20).runWith(sink) + + result.futureValue.sorted should ===(1 to 20) + } + + "notify new producers if consumer cancels before first producer" in assertAllStagesStopped { + val sink = Sink.cancelled[Int].runWith(MergeHub.source[Int](16)) + val upstream = TestPublisher.probe[Int]() + + Source.fromPublisher(upstream).runWith(sink) + + upstream.expectCancellation() + } + + "notify existing producers if consumer cancels after a few elements" in assertAllStagesStopped { + val (sink, result) = MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run() + val upstream = TestPublisher.probe[Int]() + + Source.fromPublisher(upstream).runWith(sink) + for (i ← 1 to 5) upstream.sendNext(i) + + upstream.expectCancellation() + result.futureValue.sorted should ===(1 to 5) + } + + "notify new producers if consumer cancels after a few elements" in assertAllStagesStopped { + val (sink, result) = MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run() + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[Int]() + + Source.fromPublisher(upstream1).runWith(sink) + for (i ← 1 to 5) upstream1.sendNext(i) + + upstream1.expectCancellation() + result.futureValue.sorted should 
===(1 to 5) + + Source.fromPublisher(upstream2).runWith(sink) + + upstream2.expectCancellation() + } + + "respect buffer size" in assertAllStagesStopped { + val downstream = TestSubscriber.manualProbe[Int]() + val sink = Sink.fromSubscriber(downstream).runWith(MergeHub.source[Int](3)) + + Source(1 to 10).map { i ⇒ testActor ! i; i }.runWith(sink) + + val sub = downstream.expectSubscription() + sub.request(1) + + // Demand starts from 3 + expectMsg(1) + expectMsg(2) + expectMsg(3) + expectNoMsg(100.millis) + + // One element consumed (it was requested), demand 0 remains at producer + downstream.expectNext(1) + + // Requesting next element, results in next element to be consumed. + sub.request(1) + downstream.expectNext(2) + + // Two elements have been consumed, so threshold of 2 is reached, additional 2 demand is dispatched. + // There is 2 demand at the producer now + + expectMsg(4) + expectMsg(5) + expectNoMsg(100.millis) + + // Two additional elements have been sent: + // - 3, 4, 5 are pending + // - demand is 0 at the producer + // - next demand batch is after two elements have been consumed again + + // Requesting next gives the next element + // Demand is not yet refreshed for the producer as there is one more element until threshold is met + sub.request(1) + downstream.expectNext(3) + + expectNoMsg(100.millis) + + sub.request(1) + downstream.expectNext(4) + expectMsg(6) + expectMsg(7) + + sub.cancel() + } + + "work with long streams" in assertAllStagesStopped { + val (sink, result) = MergeHub.source[Int](16).take(20000).toMat(Sink.seq)(Keep.both).run() + Source(1 to 10000).runWith(sink) + Source(10001 to 20000).runWith(sink) + + result.futureValue.sorted should ===(1 to 20000) + } + + "work with long streams when buffer size is 1" in assertAllStagesStopped { + val (sink, result) = MergeHub.source[Int](1).take(20000).toMat(Sink.seq)(Keep.both).run() + Source(1 to 10000).runWith(sink) + Source(10001 to 20000).runWith(sink) + + result.futureValue.sorted should ===(1 to 20000) + } + + "work with long streams when consumer is slower" in assertAllStagesStopped { + val (sink, result) = + MergeHub.source[Int](16) + .take(2000) + .throttle(10, 1.millisecond, 200, ThrottleMode.shaping) + .toMat(Sink.seq)(Keep.both) + .run() + + Source(1 to 1000).runWith(sink) + Source(1001 to 2000).runWith(sink) + + result.futureValue.sorted should ===(1 to 2000) + } + + "work with long streams if one of the producers is slower" in assertAllStagesStopped { + val (sink, result) = + MergeHub.source[Int](16) + .take(2000) + .toMat(Sink.seq)(Keep.both) + .run() + + Source(1 to 1000).throttle(10, 1.millisecond, 100, ThrottleMode.shaping).runWith(sink) + Source(1001 to 2000).runWith(sink) + + result.futureValue.sorted should ===(1 to 2000) + } + + "work with different producers separated over time" in assertAllStagesStopped { + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + val sink = MergeHub.source[Int](16).grouped(100).toMat(Sink.fromSubscriber(downstream))(Keep.left).run() + + Source(1 to 100).runWith(sink) + downstream.requestNext() should ===(1 to 100) + + Source(101 to 200).runWith(sink) + downstream.requestNext() should ===(101 to 200) + + downstream.cancel() + } + + "keep working even if one of the producers fail" in assertAllStagesStopped { + val (sink, result) = MergeHub.source[Int](16).take(10).toMat(Sink.seq)(Keep.both).run() + EventFilter.error("Upstream producer failed with exception").intercept { + Source.failed(TE("faling")).runWith(sink) + Source(1 to 10).runWith(sink) + } + + 
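+      // The hub outlives the failed producer: it is logged and removed, while the
+      // elements from the healthy producer are still delivered to the consumer.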
result.futureValue.sorted should ===(1 to 10) + + } + + } + + "BroadcastHub" must { + + "work in the happy case" in assertAllStagesStopped { + val source = Source(1 to 10).runWith(BroadcastHub.sink(8)) + source.runWith(Sink.seq).futureValue should ===(1 to 10) + } + + "send the same elements to consumers attaching around the same time" in assertAllStagesStopped { + val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 10)).toMat(BroadcastHub.sink(8))(Keep.both).run() + + val f1 = source.runWith(Sink.seq) + val f2 = source.runWith(Sink.seq) + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. + Thread.sleep(100) + firstElem.success(Some(1)) + f1.futureValue should ===(1 to 10) + f2.futureValue should ===(1 to 10) + } + + "send the same prefix to consumers attaching around the same time if one cancels earlier" in assertAllStagesStopped { + val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 20)).toMat(BroadcastHub.sink(8))(Keep.both).run() + + val f1 = source.runWith(Sink.seq) + val f2 = source.take(10).runWith(Sink.seq) + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. + Thread.sleep(100) + firstElem.success(Some(1)) + f1.futureValue should ===(1 to 20) + f2.futureValue should ===(1 to 10) + } + + "ensure that subsequent consumers see subsequent elements without gap" in assertAllStagesStopped { + val source = Source(1 to 20).runWith(BroadcastHub.sink(8)) + source.take(10).runWith(Sink.seq).futureValue should ===(1 to 10) + source.take(10).runWith(Sink.seq).futureValue should ===(11 to 20) + } + + "send the same elements to consumers of different speed attaching around the same time" in assertAllStagesStopped { + val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 10)).toMat(BroadcastHub.sink(8))(Keep.both).run() + + val f1 = source.throttle(1, 10.millis, 3, ThrottleMode.shaping).runWith(Sink.seq) + val f2 = source.runWith(Sink.seq) + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. + Thread.sleep(100) + firstElem.success(Some(1)) + f1.futureValue should ===(1 to 10) + f2.futureValue should ===(1 to 10) + } + + "send the same elements to consumers of attaching around the same time if the producer is slow" in assertAllStagesStopped { + val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 10)) + .throttle(1, 10.millis, 3, ThrottleMode.shaping) + .toMat(BroadcastHub.sink(8))(Keep.both).run() + + val f1 = source.runWith(Sink.seq) + val f2 = source.runWith(Sink.seq) + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. + Thread.sleep(100) + firstElem.success(Some(1)) + f1.futureValue should ===(1 to 10) + f2.futureValue should ===(1 to 10) + } + + "ensure that from two different speed consumers the slower controls the rate" in assertAllStagesStopped { + val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 20)).toMat(BroadcastHub.sink(1))(Keep.both).run() + + val f1 = source.throttle(1, 10.millis, 1, ThrottleMode.shaping).runWith(Sink.seq) + // Second cannot be overwhelmed since the first one throttles the overall rate, and second allows a higher rate + val f2 = source.throttle(10, 10.millis, 8, ThrottleMode.enforcing).runWith(Sink.seq) + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. 
+ Thread.sleep(100) + firstElem.success(Some(1)) + f1.futureValue should ===(1 to 20) + f2.futureValue should ===(1 to 20) + + } + + "send the same elements to consumers attaching around the same time with a buffer size of one" in assertAllStagesStopped { + val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 10)).toMat(BroadcastHub.sink(1))(Keep.both).run() + + val f1 = source.runWith(Sink.seq) + val f2 = source.runWith(Sink.seq) + + // Ensure subscription of Sinks. This is racy but there is no event we can hook into here. + Thread.sleep(100) + firstElem.success(Some(1)) + f1.futureValue should ===(1 to 10) + f2.futureValue should ===(1 to 10) + } + + "be able to implement a keep-dropping-if-unsubscribed policy with a simple Sink.ignore" in assertAllStagesStopped { + val killSwitch = KillSwitches.shared("test-switch") + val source = Source.fromIterator(() ⇒ Iterator.from(0)).via(killSwitch.flow).runWith(BroadcastHub.sink(8)) + + // Now the Hub "drops" elements until we attach a new consumer (Source.ignore consumes as fast as possible) + source.runWith(Sink.ignore) + + // Now we attached a subscriber which will block the Sink.ignore to "take away" and drop elements anymore, + // turning the BroadcastHub to a normal non-dropping mode + val downstream = TestSubscriber.probe[Int]() + source.runWith(Sink.fromSubscriber(downstream)) + + downstream.request(1) + val first = downstream.expectNext() + + for (i ← (first + 1) to (first + 10)) { + downstream.request(1) + downstream.expectNext(i) + } + + downstream.cancel() + + killSwitch.shutdown() + } + + "properly signal error to consumers" in assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val source = Source.fromPublisher(upstream).runWith(BroadcastHub.sink(8)) + + val downstream1 = TestSubscriber.probe[Int]() + val downstream2 = TestSubscriber.probe[Int]() + source.runWith(Sink.fromSubscriber(downstream1)) + source.runWith(Sink.fromSubscriber(downstream2)) + + downstream1.request(4) + downstream2.request(8) + + (1 to 8) foreach (upstream.sendNext(_)) + + downstream1.expectNext(1, 2, 3, 4) + downstream2.expectNext(1, 2, 3, 4, 5, 6, 7, 8) + + downstream1.expectNoMsg(100.millis) + downstream2.expectNoMsg(100.millis) + + upstream.sendError(TE("Failed")) + + downstream1.expectError(TE("Failed")) + downstream2.expectError(TE("Failed")) + } + + "properly singal completion to consumers arriving after producer finished" in assertAllStagesStopped { + val source = Source.empty[Int].runWith(BroadcastHub.sink(8)) + // Wait enough so the Hub gets the completion. This is racy, but this is fine because both + // cases should work in the end + Thread.sleep(10) + + source.runWith(Sink.seq).futureValue should ===(Nil) + } + + "properly singal error to consumers arriving after producer finished" in assertAllStagesStopped { + val source = Source.failed(TE("Fail!")).runWith(BroadcastHub.sink(8)) + // Wait enough so the Hub gets the completion. This is racy, but this is fine because both + // cases should work in the end + Thread.sleep(10) + + a[TE] shouldBe thrownBy { + Await.result(source.runWith(Sink.seq), 3.seconds) + } + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala new file mode 100644 index 0000000000..f34f417871 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. 
+ */ +package akka.stream.javadsl + +import akka.NotUsed + +/** + * A MergeHub is a special streaming hub that is able to collect streamed elements from a dynamic set of + * producers. It consists of two parts, a [[Source]] and a [[Sink]]. The [[Source]] streams the element to a consumer from + * its merged inputs. Once the consumer has been materialized, the [[Source]] returns a materialized value which is + * the corresponding [[Sink]]. This [[Sink]] can then be materialized arbitrary many times, where each of the new + * materializations will feed its consumed elements to the original [[Source]]. + */ +object MergeHub { + + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered + * elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * @param clazz Type of elements this hub emits and consumes + * @param perProducerBufferSize Buffer space used per producer. + */ + def of[T](clazz: Class[T], perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = { + akka.stream.scaladsl.MergeHub.source[T](perProducerBufferSize) + .mapMaterializedValue(_.asJava) + .asJava + } + + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered + * elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * @param clazz Type of elements this hub emits and consumes + */ + def of[T](clazz: Class[T]): Source[T, Sink[T, NotUsed]] = of(clazz, 16) + +} + +/** + * A BroadcastHub is a special streaming hub that is able to broadcast streamed elements to a dynamic set of consumers. + * It consissts of two parts, a [[Sink]] and a [[Source]]. The [[Sink]] broadcasts elements from a producer to the + * actually live consumers it has. Once the producer has been materialized, the [[Sink]] it feeds into returns a + * materialized value which is the corresponding [[Source]]. This [[Source]] can be materialized arbitrary many times, + * where weach of the new materializations will receive their elements from the original [[Sink]]. + */ +object BroadcastHub { + + /** + * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set + * of consumers. 
After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized + * value. This [[Source]] can be materialized arbitrary many times and each materialization will receive the + * broadcast elements form the ofiginal [[Sink]]. + * + * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own + * [[Source]] for consuming the [[Sink]] of that materialization. + * + * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized + * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then + * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later + * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are + * cancelled are simply removed from the dynamic set of consumers. + * + * @param clazz Type of elements this hub emits and consumes + * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two + * concurrent consumers can be in terms of element. If the buffer is full, the producer + * is backpressured. Must be a power of two and less than 4096. + */ + def of[T](clazz: Class[T], bufferSize: Int): Sink[T, Source[T, NotUsed]] = { + akka.stream.scaladsl.BroadcastHub.sink[T](bufferSize) + .mapMaterializedValue(_.asJava) + .asJava + } + + def of[T](clazz: Class[T]): Sink[T, Source[T, NotUsed]] = of(clazz, 256) + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala new file mode 100644 index 0000000000..f94dca794c --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -0,0 +1,676 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } + +import akka.NotUsed +import akka.dispatch.AbstractNodeQueue +import akka.stream._ +import akka.stream.stage._ + +import scala.annotation.tailrec +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success, Try } + +/** + * A MergeHub is a special streaming hub that is able to collect streamed elements from a dynamic set of + * producers. It consists of two parts, a [[Source]] and a [[Sink]]. The [[Source]] streams the element to a consumer from + * its merged inputs. Once the consumer has been materialized, the [[Source]] returns a materialized value which is + * the corresponding [[Sink]]. This [[Sink]] can then be materialized arbitrary many times, where each of the new + * materializations will feed its consumed elements to the original [[Source]]. + */ +object MergeHub { + private val Cancel = -1 + + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered + * elements). Completed [[Sink]]s are simply removed. 
Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * @param perProducerBufferSize Buffer space used per producer. Default value is 16. + */ + def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = + Source.fromGraph(new MergeHub[T](perProducerBufferSize)) + + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered + * elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + */ + def source[T]: Source[T, Sink[T, NotUsed]] = source(perProducerBufferSize = 16) + + final class ProducerFailed(msg: String, cause: Throwable) extends RuntimeException(msg, cause) +} + +/** + * INTERNAL API + */ +private[akka] class MergeHub[T](perProducerBufferSize: Int) extends GraphStageWithMaterializedValue[SourceShape[T], Sink[T, NotUsed]] { + require(perProducerBufferSize > 0, "Buffer size must be positive") + + val out: Outlet[T] = Outlet("MergeHub.out") + override val shape: SourceShape[T] = SourceShape(out) + + // Half of buffer size, rounded up + private[this] val DemandThreshold = (perProducerBufferSize / 2) + (perProducerBufferSize % 2) + + private sealed trait Event { + def id: Long + } + + private final case class Element(id: Long, elem: T) extends Event + private final case class Register(id: Long, demandCallback: AsyncCallback[Long]) extends Event + private final case class Deregister(id: Long) extends Event + + final class InputState(signalDemand: AsyncCallback[Long]) { + private var untilNextDemandSignal = DemandThreshold + + def onElement(): Unit = { + untilNextDemandSignal -= 1 + if (untilNextDemandSignal == 0) { + untilNextDemandSignal = DemandThreshold + signalDemand.invoke(DemandThreshold) + } + } + + def close(): Unit = signalDemand.invoke(MergeHub.Cancel) + + } + + final class MergedSourceLogic(_shape: Shape, producerCount: AtomicLong) extends GraphStageLogic(_shape) with OutHandler { + /* + * Basically all merged messages are shared in this queue. Individual buffer sizes are enforced by tracking + * demand per producer in the 'demands' Map. One twist here is that the same queue contains control messages, + * too. Since the queue is read only if the output port has been pulled, downstream backpressure can delay + * processing of control messages. This causes no issues though, see the explanation in 'tryProcessNext'. + */ + private val queue = new AbstractNodeQueue[Event] {} + @volatile private[this] var needWakeup = false + @volatile private[this] var shuttingDown = false + + private[this] val demands = scala.collection.mutable.LongMap.empty[InputState] + private[this] val wakeupCallback = getAsyncCallback[NotUsed]((_) ⇒ + // We are only allowed to dequeue if we are not backpressured. See comment in tryProcessNext() for details. 
+ if (isAvailable(out)) tryProcessNext(firstAttempt = true) + ) + + setHandler(out, this) + + // Returns true when we have not consumed demand, false otherwise + private def onEvent(ev: Event): Boolean = ev match { + case Element(id, elem) ⇒ + demands(id).onElement() + push(out, elem) + false + case Register(id, callback) ⇒ + demands.put(id, new InputState(callback)) + true + case Deregister(id) ⇒ + demands.remove(id) + true + } + + override def onPull(): Unit = tryProcessNext(firstAttempt = true) + + @tailrec private def tryProcessNext(firstAttempt: Boolean): Unit = { + val nextElem = queue.poll() + + // That we dequeue elements from the queue when there is demand means that Register and Deregister messages + // might be delayed for arbitrary long. This is not a problem as Register is only interesting if it is followed + // by actual elements, which would be delayed anyway by the backpressure. + // Unregister is only used to keep the map growing too large, but otherwise it is not critical to process it + // timely. In fact, the only way the map could keep growing would mean that we dequeue Registers from the + // queue, but then we will eventually reach the Deregister message, too. + if (nextElem ne null) { + needWakeup = false + if (onEvent(nextElem)) tryProcessNext(firstAttempt = true) + } else { + needWakeup = true + // additional poll() to grab any elements that might missed the needWakeup + // and have been enqueued just after it + if (firstAttempt) + tryProcessNext(firstAttempt = false) + } + } + + def isShuttingDown: Boolean = shuttingDown + + // External API + def enqueue(ev: Event): Unit = { + queue.add(ev) + /* + * Simple volatile var is enough, there is no need for a CAS here. The first important thing to note + * that we don't care about double-wakeups. Since the "wakeup" is actually handled by an actor message + * (AsyncCallback) we don't need to handle this case, a double-wakeup will be idempotent (only wasting some cycles). + * + * The only case that we care about is a missed wakeup. The characteristics of a missed wakeup are the following: + * (1) there is at least one message in the queue + * (2) the consumer is not running right now + * (3) no wakeupCallbacks are pending + * (4) all producers exited this method + * + * From the above we can deduce that + * (5) needWakeup = true at some point in time. This is implied by (1) and (2) and the + * 'tryProcessNext' method + * (6) There must have been one producer that observed needWakeup = false. This follows from (4) and (3) + * and the implementation of this method. In addition, this producer arrived after needWakeup = true, + * since before that, every queued elements have been consumed. + * (7) There have been at least one producer that observed needWakeup = true and enqueued an element and + * a wakeup signal. This follows from (5) and (6), and the fact that either this method sets + * needWakeup = false, or the 'tryProcessNext' method, i.e. a wakeup must happened since (5) + * (8) If there were multiple producers satisfying (6) take the last one. Due to (6), (3) and (4) we know + * there cannot be a wakeup pending, and we just enqueued an element, so (1) holds. Since we are the last + * one, (2) must be true or there is no lost wakeup. However, due to (7) we know there was at least one + * wakeup (otherwise needWakeup = true). Now, if the consumer is still running (2) is violated, + * if not running then needWakeup = false is violated (which comes from (6)). No matter what, + * contradiction. QED. 
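+       * In short: producers enqueue first and only then wake the consumer if they observed
+       * needWakeup == true. A double-wakeup merely wastes cycles, and the argument above
+       * shows that a wakeup can never be lost.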
+ * + */ + if (needWakeup) { + needWakeup = false + wakeupCallback.invoke(NotUsed) + } + } + + override def postStop(): Unit = { + // First announce that we are shutting down. This will notify late-comers to not even put anything in the queue + shuttingDown = true + // Anybody that missed the announcement needs to be notified. + var event = queue.poll() + while (event ne null) { + event match { + case Register(_, demandCallback) ⇒ demandCallback.invoke(MergeHub.Cancel) + case _ ⇒ + } + event = queue.poll() + } + + // Kill everyone else + val states = demands.valuesIterator + while (states.hasNext) { + states.next().close() + } + } + } + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Sink[T, NotUsed]) = { + val idCounter = new AtomicLong() + + val logic: MergedSourceLogic = new MergedSourceLogic(shape, idCounter) + + val sink = new GraphStage[SinkShape[T]] { + val in: Inlet[T] = Inlet("MergeHub.in") + override val shape: SinkShape[T] = SinkShape(in) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { + // Start from non-zero demand to avoid initial delays. + // The HUB will expect this behavior. + private[this] var demand: Long = perProducerBufferSize + private[this] val id = idCounter.getAndIncrement() + + override def preStart(): Unit = { + if (!logic.isShuttingDown) { + logic.enqueue(Register(id, getAsyncCallback(onDemand))) + + // At this point, we could be in the unfortunate situation that: + // - we missed the shutdown announcement and entered this arm of the if statement + // - *before* we enqueued our Register event, the Hub already finished looking at the queue + // and is now dead, so we are never notified again. + // To safeguard against this, we MUST check the announcement again. This is enough: + // if the Hub is no longer looking at the queue, then it must be that isShuttingDown must be already true. + if (!logic.isShuttingDown) pullWithDemand() + else completeStage() + } else { + completeStage() + } + } + override def postStop(): Unit = { + // Unlike in the case of preStart, we don't care about the Hub no longer looking at the queue. + if (!logic.isShuttingDown) logic.enqueue(Deregister(id)) + } + + override def onPush(): Unit = { + logic.enqueue(Element(id, grab(in))) + if (demand > 0) pullWithDemand() + } + + private def pullWithDemand(): Unit = { + demand -= 1 + pull(in) + } + + // Make some noise + override def onUpstreamFailure(ex: Throwable): Unit = { + throw new MergeHub.ProducerFailed("Upstream producer failed with exception, " + + "removing from MergeHub now", ex) + } + + private def onDemand(moreDemand: Long): Unit = { + if (moreDemand == MergeHub.Cancel) completeStage() + else { + demand += moreDemand + if (!hasBeenPulled(in)) pullWithDemand() + } + } + + setHandler(in, this) + } + + } + + (logic, Sink.fromGraph(sink)) + } +} + +/** + * A BroadcastHub is a special streaming hub that is able to broadcast streamed elements to a dynamic set of consumers. + * It consissts of two parts, a [[Sink]] and a [[Source]]. The [[Sink]] broadcasts elements from a producer to the + * actually live consumers it has. Once the producer has been materialized, the [[Sink]] it feeds into returns a + * materialized value which is the corresponding [[Source]]. This [[Source]] can be materialized arbitrary many times, + * where weach of the new materializations will receive their elements from the original [[Sink]]. 
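+ * Note that the hub does not replay: a [[Source]] materialized at a later point in time only receives
+ * elements that the hub has not yet passed on to its existing consumers.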
+ */
+object BroadcastHub {
+
+  /**
+   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
+   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as its
+   * materialized value. This [[Source]] can be materialized an arbitrary number of times, and each
+   * materialization will receive the broadcast elements from the original [[Sink]].
+   *
+   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
+   * [[Source]] for consuming the [[Sink]] of that materialization.
+   *
+   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
+   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
+   * all corresponding [[Source]]s are completed. Both failure and normal completion are "remembered" and later
+   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
+   * cancelled are simply removed from the dynamic set of consumers.
+   *
+   * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
+   *                   concurrent consumers can be in terms of elements. If this buffer is full, the producer
+   *                   is backpressured. Must be a power of two and less than 4096.
+   */
+  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))
+
+  /**
+   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
+   * of consumers, using a default buffer size of 256. After the [[Sink]] returned by this method is
+   * materialized, it returns a [[Source]] as its materialized value. This [[Source]] can be materialized an
+   * arbitrary number of times, and each materialization will receive the broadcast elements from the original
+   * [[Sink]].
+   *
+   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
+   * [[Source]] for consuming the [[Sink]] of that materialization.
+   *
+   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
+   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
+   * all corresponding [[Source]]s are completed. Both failure and normal completion are "remembered" and later
+   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
+   * cancelled are simply removed from the dynamic set of consumers.
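+   *
+   * A sketch of attaching the hub to an existing producer while keeping the hub's materialized
+   * [[Source]] (illustrative; `producer` is an assumed value, an implicit materializer is assumed
+   * in scope, and Keep.right is needed because by default the materialized value of the producer
+   * side would be kept):
+   *
+   * {{{
+   *   val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
+   *     producer.toMat(BroadcastHub.sink[String])(Keep.right)
+   *   val fromProducer: Source[String, NotUsed] = runnableGraph.run()
+   * }}}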
+   */
+  def sink[T]: Sink[T, Source[T, NotUsed]] = sink(bufferSize = 256)
+
+}
+
+/**
+ * INTERNAL API
+ */
+private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
+  require(bufferSize > 0, "Buffer size must be positive")
+  require(bufferSize < 4096, "Buffer size larger than 4095 is not allowed")
+  require((bufferSize & bufferSize - 1) == 0, "Buffer size must be a power of two")
+
+  private val Mask = bufferSize - 1
+  private val WheelMask = (bufferSize * 2) - 1
+
+  val in: Inlet[T] = Inlet("BroadcastHub.in")
+  override val shape: SinkShape[T] = SinkShape(in)
+
+  // Half of buffer size, rounded up
+  private[this] val DemandThreshold = (bufferSize / 2) + (bufferSize % 2)
+
+  private sealed trait HubEvent
+
+  private object RegistrationPending extends HubEvent
+  private final case class UnRegister(id: Long, previousOffset: Int, finalOffset: Int) extends HubEvent
+  private final case class Advance(id: Long, previousOffset: Int) extends HubEvent
+  private final case class NeedWakeup(id: Long, previousOffset: Int, currentOffset: Int) extends HubEvent
+
+  private final case class Consumer(id: Long, callback: AsyncCallback[ConsumerEvent])
+
+  private object Completed
+
+  private sealed trait HubState
+  private case class Open(callbackFuture: Future[AsyncCallback[HubEvent]], registrations: List[Consumer]) extends HubState
+  private case class Closed(failure: Option[Throwable]) extends HubState
+
+  private class BroadcastSinkLogic(_shape: Shape)
+    extends GraphStageLogic(_shape) with InHandler {
+
+    private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise()
+    private[this] val noRegistrationsState = Open(callbackPromise.future, Nil)
+    val state = new AtomicReference[HubState](noRegistrationsState)
+
+    // Start from values that will almost immediately overflow. This has no effect on performance; any
+    // starting number will do. However, this protects from regressions, as these values *almost surely*
+    // overflow and fail tests if someone makes a mistake. Only differences of these offsets are ever
+    // used, so the wrap-around itself is harmless.
+    @volatile private[this] var tail = Int.MaxValue
+    private[this] var head = Int.MaxValue
+    /*
+     * An Array with a published tail ("latest message") and a privately maintained head ("earliest
+     * buffered message"). Elements are published by simply putting them into the array and bumping the
+     * tail. If necessary, certain consumers are sent a wakeup message through an AsyncCallback.
+     */
+    private[this] val queue = Array.ofDim[AnyRef](bufferSize)
+    /* This is basically a classic Bucket Queue: https://en.wikipedia.org/wiki/Bucket_queue
+     * (in fact, this is the variant described in the Optimizations section, where the set of
+     * priorities in use always falls into a fixed range).
+     *
+     * This wheel tracks the position of Consumers relative to the slowest one. Every slot contains
+     * the list of Consumers known to be at that location (this might be out of date!). Consumers
+     * from time to time send Advance messages to indicate that they have progressed by reading from
+     * the broadcast queue. Consumers that are blocked (due to reaching tail) request a wakeup and
+     * update their position at the same time.
+     */
+    private[this] val consumerWheel = Array.fill[List[Consumer]](bufferSize * 2)(Nil)
+    private[this] var activeConsumers = 0
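+
+    // Worked example of the indexing (illustrative): with bufferSize = 8 we get Mask = 7 and
+    // WheelMask = 15. An element at logical offset 13 lives in queue slot 13 & 7 = 5, while a
+    // consumer at offset 13 is filed in wheel slot 13 & 15 = 13. The wheel being twice the buffer
+    // size keeps a consumer at `head` and one at `head + bufferSize` in distinct slots, which a
+    // bufferSize-sized wheel could not tell apart.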
+
+    override def preStart(): Unit = {
+      setKeepGoing(true)
+      callbackPromise.success(getAsyncCallback[HubEvent](onEvent))
+      pull(in)
+    }
+
+    // Cannot complete immediately if there is no space in the queue to put the completion marker
+    override def onUpstreamFinish(): Unit = if (!isFull) complete()
+
+    override def onPush(): Unit = {
+      publish(grab(in))
+      if (!isFull) pull(in)
+    }
+
+    private def onEvent(ev: HubEvent): Unit = {
+      ev match {
+        case RegistrationPending ⇒
+          state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations foreach { consumer ⇒
+            val startFrom = head
+            activeConsumers += 1
+            addConsumer(consumer, startFrom)
+            consumer.callback.invoke(Initialize(startFrom))
+          }
+
+        case UnRegister(id, previousOffset, finalOffset) ⇒
+          activeConsumers -= 1
+          val consumer = findAndRemoveConsumer(id, previousOffset)
+          if (activeConsumers == 0) {
+            if (isClosed(in)) completeStage()
+            else if (head != finalOffset) {
+              // If our final consumer goes away, we roll forward the buffer so a subsequent consumer
+              // does not see the already consumed elements.
+              while (head != finalOffset) {
+                queue(head & Mask) = null
+                head += 1
+              }
+              head = finalOffset
+              if (!hasBeenPulled(in)) pull(in)
+            }
+          } else checkUnblock(previousOffset)
+
+        case Advance(id, previousOffset) ⇒
+          val newOffset = previousOffset + DemandThreshold
+          // Move the consumer from its last known offset to its new one. Check if we are unblocked.
+          val consumer = findAndRemoveConsumer(id, previousOffset)
+          addConsumer(consumer, newOffset)
+          checkUnblock(previousOffset)
+
+        case NeedWakeup(id, previousOffset, currentOffset) ⇒
+          // Move the consumer from its last known offset to its new one. Check if we are unblocked.
+          val consumer = findAndRemoveConsumer(id, previousOffset)
+          addConsumer(consumer, currentOffset)
+
+          // Also check if the consumer is now unblocked, i.e. we have published an element since it
+          // went to sleep.
+          if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+          checkUnblock(previousOffset)
+      }
+    }
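+
+    // Illustrative trace of the protocol above: with bufferSize = 8 (so DemandThreshold = 4), a
+    // consumer initialized at offset 100 reads elements 100..103 and then sends Advance(id, 100);
+    // the hub re-files it under offset 104 and checks whether the removed entry at offset 100 was
+    // the only thing holding `head` back. A consumer that instead catches up with `tail` sends
+    // NeedWakeup with its current offset and is woken as soon as a newer element has been published.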
+
+    // Producer API
+    // We are full if the distance between the slowest (known) consumer and the fastest (known)
+    // consumer is the buffer size. We must wait until the slowest one either advances, or cancels.
+    private def isFull: Boolean = tail - head == bufferSize
+
+    override def onUpstreamFailure(ex: Throwable): Unit = {
+      val failMessage = HubCompleted(Some(ex))
+
+      // Notify pending consumers and set tombstone
+      state.getAndSet(Closed(Some(ex))).asInstanceOf[Open].registrations foreach { consumer ⇒
+        consumer.callback.invoke(failMessage)
+      }
+
+      // Notify registered consumers
+      consumerWheel.iterator.flatMap(_.iterator) foreach { consumer ⇒
+        consumer.callback.invoke(failMessage)
+      }
+      failStage(ex)
+    }
+
+    /*
+     * This method removes a consumer with a given ID from the known offset and returns it.
+     *
+     * NB: You cannot remove a consumer without knowing its last offset! Consumers on the Source side
+     * must always track this, so that this lookup remains a fast operation.
+     */
+    private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
+      // TODO: Try to eliminate modulo division somehow...
+      val wheelSlot = offset & WheelMask
+      var consumersInSlot = consumerWheel(wheelSlot)
+      var remainingConsumersInSlot: List[Consumer] = Nil
+      var removedConsumer: Consumer = null
+
+      while (consumersInSlot.nonEmpty) {
+        val consumer = consumersInSlot.head
+        if (consumer.id != id) remainingConsumersInSlot = consumer :: remainingConsumersInSlot
+        else removedConsumer = consumer
+        consumersInSlot = consumersInSlot.tail
+      }
+      consumerWheel(wheelSlot) = remainingConsumersInSlot
+      removedConsumer
+    }
+
+    /*
+     * After removing a Consumer from a wheel slot (because it cancelled, or we moved it because it
+     * advanced) we need to check if it was blocking us from advancing (i.e. it was the slowest).
+     */
+    private def checkUnblock(offsetOfConsumerRemoved: Int): Unit = {
+      if (unblockIfPossible(offsetOfConsumerRemoved)) {
+        if (isClosed(in)) complete()
+        else if (!hasBeenPulled(in)) pull(in)
+      }
+    }
+
+    private def unblockIfPossible(offsetOfConsumerRemoved: Int): Boolean = {
+      var unblocked = false
+      if (offsetOfConsumerRemoved == head) {
+        // Try to advance along the wheel. We can skip any wheel slots which have no waiting
+        // Consumers, until we either find a nonempty one, or we reach the end of the buffer.
+        while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
+          queue(head & Mask) = null
+          head += 1
+          unblocked = true
+        }
+      }
+      unblocked
+    }
+
+    private def addConsumer(consumer: Consumer, offset: Int): Unit = {
+      val slot = offset & WheelMask
+      consumerWheel(slot) = consumer :: consumerWheel(slot)
+    }
+
+    /*
+     * Send a wakeup signal to all the Consumers at a certain wheel slot. Note, this needs the actual
+     * wheel index (offset modulo bufferSize * 2, i.e. offset & WheelMask), not the raw offset.
+     */
+    private def wakeupIdx(idx: Int): Unit = {
+      val itr = consumerWheel(idx).iterator
+      while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+    }
+
+    private def complete(): Unit = {
+      val idx = tail & Mask
+      val wheelSlot = tail & WheelMask
+      queue(idx) = Completed
+      wakeupIdx(wheelSlot)
+      tail = tail + 1
+      if (activeConsumers == 0) {
+        val completedMessage = HubCompleted(None)
+        // Notify pending consumers and set tombstone
+        state.getAndSet(Closed(None)).asInstanceOf[Open].registrations foreach { consumer ⇒
+          consumer.callback.invoke(completedMessage)
+        }
+
+        // Existing consumers have already consumed all elements and will see the completion status
+        // in the queue
+        completeStage()
+      }
+    }
+
+    private def publish(elem: T): Unit = {
+      val idx = tail & Mask
+      val wheelSlot = tail & WheelMask
+      queue(idx) = elem.asInstanceOf[AnyRef]
+      // Publish the new tail before calling the wakeup
+      tail = tail + 1
+      wakeupIdx(wheelSlot)
+    }
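+
+    // Note on memory ordering (illustrative): publish() writes the element into the plain array
+    // *before* bumping the volatile `tail`, while poll() below reads `tail` before reading the
+    // slot. The volatile write/read pair therefore guarantees that a consumer observing the new
+    // tail value also sees the element written to queue(idx).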
+
+    // Consumer API
+    def poll(offset: Int): AnyRef = {
+      if (offset == tail) null
+      else queue(offset & Mask)
+    }
+
+    setHandler(in, this)
+
+  }
+
+  private sealed trait ConsumerEvent
+  private object Wakeup extends ConsumerEvent
+  private final case class HubCompleted(failure: Option[Throwable]) extends ConsumerEvent
+  private final case class Initialize(offset: Int) extends ConsumerEvent
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Source[T, NotUsed]) = {
+    val idCounter = new AtomicLong()
+
+    val logic = new BroadcastSinkLogic(shape)
+
+    val source = new GraphStage[SourceShape[T]] {
+      val out: Outlet[T] = Outlet("BroadcastHub.out")
+      override val shape: SourceShape[T] = SourceShape(out)
+
+      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
+        private[this] var untilNextAdvanceSignal = DemandThreshold
+        private[this] val id = idCounter.getAndIncrement()
+        private[this] var initialized = false
+        private[this] var hubCallback: AsyncCallback[HubEvent] = _
+
+        /*
+         * We need to track the last offset that we published to the Hub, because, for efficiency,
+         * the Hub can only look up and move/remove Consumers in known wheel slots. This means that
+         * no extra hash-map is needed, but it also means that we need to keep track of both our
+         * current offset and the last one that we published.
+         */
+        private[this] var previousPublishedOffset = 0
+        private[this] var offset = 0
+
+        override def preStart(): Unit = {
+          val callback = getAsyncCallback(onCommand)
+
+          val onHubReady: Try[AsyncCallback[HubEvent]] ⇒ Unit = {
+            case Success(hubCb) ⇒
+              hubCallback = hubCb
+              hubCb.invoke(RegistrationPending)
+            case Failure(ex) ⇒
+              failStage(ex)
+          }
+
+          @tailrec def register(): Unit = {
+            logic.state.get() match {
+              case Closed(Some(ex)) ⇒ failStage(ex)
+              case Closed(None)     ⇒ completeStage()
+              case previousState @ Open(callbackFuture, registrations) ⇒
+                val newRegistrations = Consumer(id, callback) :: registrations
+                if (logic.state.compareAndSet(previousState, Open(callbackFuture, newRegistrations))) {
+                  callbackFuture.onComplete(getAsyncCallback(onHubReady).invoke)(materializer.executionContext)
+                } else register()
+            }
+          }
+
+          register()
+        }
+
+        override def onPull(): Unit = {
+          if (initialized) {
+            val elem = logic.poll(offset)
+
+            elem match {
+              case null ⇒
+                hubCallback.invoke(NeedWakeup(id, previousPublishedOffset, offset))
+                previousPublishedOffset = offset
+                untilNextAdvanceSignal = DemandThreshold
+              case Completed ⇒
+                completeStage()
+              case _ ⇒
+                push(out, elem.asInstanceOf[T])
+                offset += 1
+                untilNextAdvanceSignal -= 1
+                if (untilNextAdvanceSignal == 0) {
+                  untilNextAdvanceSignal = DemandThreshold
+                  val previousOffset = previousPublishedOffset
+                  previousPublishedOffset += DemandThreshold
+                  hubCallback.invoke(Advance(id, previousOffset))
+                }
+            }
+          }
+        }
+
+        override def postStop(): Unit = {
+          if (hubCallback ne null)
+            hubCallback.invoke(UnRegister(id, previousPublishedOffset, offset))
+        }
+
+        private def onCommand(cmd: ConsumerEvent): Unit = cmd match {
+          case HubCompleted(Some(ex)) ⇒ failStage(ex)
+          case HubCompleted(None)     ⇒ completeStage()
+          case Wakeup ⇒
+            if (isAvailable(out)) onPull()
+          case Initialize(initialOffset) ⇒
+            initialized = true
+            previousPublishedOffset = initialOffset
+            offset = initialOffset
+            if (isAvailable(out)) onPull()
+        }
+
+        setHandler(out, this)
+      }
+    }
+
+    (logic, Source.fromGraph(source))
+  }
+}
\ No newline at end of file