20890 Added MergeHub and BroadcastHub
This commit is contained in:
parent
caa98c0110
commit
bc358f3188
8 changed files with 1562 additions and 27 deletions
140
akka-docs/rst/java/code/docs/stream/HubDocTest.java
Normal file
140
akka-docs/rst/java/code/docs/stream/HubDocTest.java
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.stream;
|
||||
|
||||
import akka.Done;
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.KillSwitches;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.UniqueKillSwitch;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import docs.AbstractJavaTest;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class HubDocTest extends AbstractJavaTest {
|
||||
|
||||
static ActorSystem system;
|
||||
static Materializer materializer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("GraphDSLDocTest");
|
||||
materializer = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
materializer = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dynamicMerge() {
|
||||
//#merge-hub
|
||||
// A simple consumer that will print to the console for now
|
||||
Sink<String, CompletionStage<Done>> consumer = Sink.foreach(System.out::println);
|
||||
|
||||
// Attach a MergeHub Source to the consumer. This will materialize to a
|
||||
// corresponding Sink.
|
||||
RunnableGraph<Sink<String, NotUsed>> runnableGraph =
|
||||
MergeHub.of(String.class, 16).to(consumer);
|
||||
|
||||
// By running/materializing the consumer we get back a Sink, and hence
|
||||
// now have access to feed elements into it. This Sink can be materialized
|
||||
// any number of times, and every element that enters the Sink will
|
||||
// be consumed by our consumer.
|
||||
Sink<String, NotUsed> toConsumer = runnableGraph.run(materializer);
|
||||
|
||||
Source.single("Hello!").runWith(toConsumer, materializer);
|
||||
Source.single("Hub!").runWith(toConsumer, materializer);
|
||||
//#merge-hub
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dynamicBroadcast() {
|
||||
// Used to be able to clean up the running stream
|
||||
ActorMaterializer materializer = ActorMaterializer.create(system);
|
||||
|
||||
//#broadcast-hub
|
||||
// A simple producer that publishes a new "message" every second
|
||||
Source<String, Cancellable> producer = Source.tick(
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
"New message"
|
||||
);
|
||||
|
||||
// Attach a BroadcastHub Sink to the producer. This will materialize to a
|
||||
// corresponding Source.
|
||||
// (We need to use toMat and Keep.right since by default the materialized
|
||||
// value to the left is used)
|
||||
RunnableGraph<Source<String, NotUsed>> runnableGraph =
|
||||
producer.toMat(BroadcastHub.of(String.class, 256), Keep.right());
|
||||
|
||||
// By running/materializing the producer, we get back a Source, which
|
||||
// gives us access to the elements published by the producer.
|
||||
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);
|
||||
|
||||
// Print out messages from the producer in two independent consumers
|
||||
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
|
||||
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
|
||||
//#broadcast-hub
|
||||
|
||||
// Cleanup
|
||||
materializer.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeBroadcastCombination() {
|
||||
//#pub-sub-1
|
||||
// Obtain a Sink and Source which will publish and receive from the "bus" respectively.
|
||||
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkAndSource =
|
||||
MergeHub.of(String.class, 16)
|
||||
.toMat(BroadcastHub.of(String.class, 256), Keep.both())
|
||||
.run(materializer);
|
||||
|
||||
Sink<String, NotUsed> sink = sinkAndSource.first();
|
||||
Source<String, NotUsed> source = sinkAndSource.second();
|
||||
//#pub-sub-1
|
||||
|
||||
//#pub-sub-2
|
||||
// Ensure that the Broadcast output is dropped if there are no listening parties.
|
||||
// If this dropping Sink is not attached, then the broadcast hub will not drop any
|
||||
// elements itself when there are no subscribers, backpressuring the producer instead.
|
||||
source.runWith(Sink.ignore(), materializer);
|
||||
//#pub-sub-2
|
||||
|
||||
//#pub-sub-3
|
||||
// We create now a Flow that represents a publish-subscribe channel using the above
|
||||
// started stream as its "topic". We add two more features, external cancellation of
|
||||
// the registration and automatic cleanup for very slow subscribers.
|
||||
Flow<String, String, UniqueKillSwitch> busFlow =
|
||||
Flow.fromSinkAndSource(sink, source)
|
||||
.joinMat(KillSwitches.singleBidi(), Keep.right())
|
||||
.backpressureTimeout(FiniteDuration.create(1, TimeUnit.SECONDS));
|
||||
//#pub-sub-3
|
||||
|
||||
//#pub-sub-4
|
||||
UniqueKillSwitch killSwitch =
|
||||
Source.repeat("Hello World!")
|
||||
.viaMat(busFlow, Keep.right())
|
||||
.to(Sink.foreach(System.out::println))
|
||||
.run(materializer);
|
||||
|
||||
// Shut down externally
|
||||
killSwitch.shutdown();
|
||||
//#pub-sub-4
|
||||
}
|
||||
}
|
||||
|
|
@ -46,13 +46,14 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
|
||||
public void uniqueKillSwitchShutdownExample() throws Exception {
|
||||
//#unique-shutdown
|
||||
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
|
||||
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
|
||||
.viaMat(KillSwitches.single(), Keep.right())
|
||||
.toMat(lastSnk, Keep.both()).run(mat);
|
||||
.viaMat(KillSwitches.single(), Keep.right())
|
||||
.toMat(lastSnk, Keep.both()).run(mat);
|
||||
|
||||
final UniqueKillSwitch killSwitch = stream.first();
|
||||
final CompletionStage<Integer> completionStage = stream.second();
|
||||
|
|
@ -60,20 +61,22 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
doSomethingElse();
|
||||
killSwitch.shutdown();
|
||||
|
||||
final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final int finalCount =
|
||||
completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
assertEquals(2, finalCount);
|
||||
//#unique-shutdown
|
||||
}
|
||||
|
||||
public static void uniqueKillSwitchAbortExample() throws Exception {
|
||||
//#unique-abort
|
||||
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
|
||||
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
|
||||
.viaMat(KillSwitches.single(), Keep.right())
|
||||
.toMat(lastSnk, Keep.both()).run(mat);
|
||||
.viaMat(KillSwitches.single(), Keep.right())
|
||||
.toMat(lastSnk, Keep.both()).run(mat);
|
||||
|
||||
final UniqueKillSwitch killSwitch = stream.first();
|
||||
final CompletionStage<Integer> completionStage = stream.second();
|
||||
|
|
@ -81,31 +84,36 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
final Exception error = new Exception("boom!");
|
||||
killSwitch.abort(error);
|
||||
|
||||
final int result = completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
|
||||
final int result =
|
||||
completionStage.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
|
||||
assertEquals(-1, result);
|
||||
//#unique-abort
|
||||
}
|
||||
|
||||
public void sharedKillSwitchShutdownExample() throws Exception {
|
||||
//#shared-shutdown
|
||||
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
|
||||
|
||||
final CompletionStage<Integer> completionStage = countingSrc
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
final CompletionStage<Integer> completionStageDelayed = countingSrc
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
|
||||
doSomethingElse();
|
||||
killSwitch.shutdown();
|
||||
|
||||
final int finalCount = completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final int finalCountDelayed = completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final int finalCount =
|
||||
completionStage.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final int finalCountDelayed =
|
||||
completionStageDelayed.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(2, finalCount);
|
||||
assertEquals(1, finalCountDelayed);
|
||||
//#shared-shutdown
|
||||
|
|
@ -113,23 +121,27 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
|
||||
public static void sharedKillSwitchAbortExample() throws Exception {
|
||||
//#shared-abort
|
||||
final Source<Integer, NotUsed> countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
|
||||
|
||||
final CompletionStage<Integer> completionStage1 = countingSrc
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
final CompletionStage<Integer> completionStage2 = countingSrc
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
|
||||
final Exception error = new Exception("boom!");
|
||||
killSwitch.abort(error);
|
||||
|
||||
final int result1 = completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
|
||||
final int result2 = completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
|
||||
final int result1 =
|
||||
completionStage1.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
|
||||
final int result2 =
|
||||
completionStage2.toCompletableFuture().exceptionally(e -> -1).get(1, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(-1, result1);
|
||||
assertEquals(-1, result2);
|
||||
//#shared-abort
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue