diff --git a/akka-docs/src/main/paradox/stream/stream-cookbook.md b/akka-docs/src/main/paradox/stream/stream-cookbook.md index e5959a2796..173efd7571 100644 --- a/akka-docs/src/main/paradox/stream/stream-cookbook.md +++ b/akka-docs/src/main/paradox/stream/stream-cookbook.md @@ -206,6 +206,20 @@ Scala Java : @@snip [RecipeMultiGroupByTest.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeMultiGroupByTest.java) { #multi-groupby } +### Adhoc source + +**Situation:** The idea is that you have a source which you don't want to start until you have a demand. +Also, you want to shutdown it down when there is no more demand, and start it up again there is new demand again. + +You can achieve this behavior by combining `lazily`, `backpressureTimeout` and `recoverWithRetries` as follows: + +Scala +: @@snip [RecipeAdhocSource.scala]($code$/scala/docs/stream/cookbook/RecipeAdhocSource.scala) { #adhoc-source } + +Java +: @@snip [RecipeAdhocSourceTest.scala]($code$/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java) { #adhoc-source } + + ## Working with Graphs In this collection we show recipes that use stream graph elements to achieve various goals. @@ -470,4 +484,4 @@ Scala : @@snip [RecipeKeepAlive.scala]($code$/scala/docs/stream/cookbook/RecipeKeepAlive.scala) { #inject-keepalive } Java -: @@snip [RecipeKeepAlive.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java) { #inject-keepalive } \ No newline at end of file +: @@snip [RecipeKeepAlive.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java) { #inject-keepalive } diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java new file mode 100644 index 0000000000..22cf954dfc --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package jdocs.stream.javadsl.cookbook; + +import akka.Done; +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.japi.pf.PFBuilder; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Source; +import akka.stream.testkit.TestSubscriber; +import akka.stream.testkit.javadsl.TestSink; +import akka.testkit.javadsl.TestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.Ignore; +import scala.concurrent.Promise; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +public class RecipeAdhocSourceTest extends RecipeTest { + static ActorSystem system; + static Materializer mat; + FiniteDuration duration200mills = Duration.create(200, "milliseconds"); + + @BeforeClass + public static void setup() { + system = ActorSystem.create("RecipeAdhocSource"); + mat = ActorMaterializer.create(system); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + mat = null; + } + + //#adhoc-source + public Source adhocSource(Source source, FiniteDuration timeout, int maxRetries) { + return Source.lazily( + () -> source.backpressureTimeout(timeout).recoverWithRetries( + maxRetries, + new PFBuilder() + .match(TimeoutException.class, ex -> Source.lazily(() -> source.backpressureTimeout(timeout))) + .build() + ) + ); + } + //#adhoc-source + + @Test + @Ignore + public void noStart() throws Exception { + new TestKit(system) { + { + AtomicBoolean isStarted = new AtomicBoolean(); + adhocSource( + Source.empty().mapMaterializedValue(x -> {isStarted.set(true); return x;}), duration200mills, 3); + Thread.sleep(300); + assertEquals(false, isStarted.get()); + } + }; + } + + @Test + @Ignore + public void startStream() throws Exception { + new TestKit(system) { + { + TestSubscriber.Probe probe = adhocSource(Source.repeat("a"), duration200mills, 3) + .toMat(TestSink.probe(system), Keep.right()) + .run(mat); + probe.requestNext("a"); + } + }; + } + + @Test + @Ignore + public void shutdownStream() throws Exception { + new TestKit(system) { + { + Promise shutdown = Futures.promise(); + TestSubscriber.Probe probe = adhocSource( + Source.repeat("a").watchTermination((a, term) -> + term.thenRun(() -> shutdown.success(Done.getInstance())) + ), duration200mills, 3) + .toMat(TestSink.probe(system), Keep.right()) + .run(mat); + + probe.requestNext("a"); + Thread.sleep(500); + assertEquals(true, shutdown.isCompleted()); + } + }; + } + + @Test + @Ignore + public void notShutDownStream() throws Exception { + new TestKit(system) { + { + Promise shutdown = Futures.promise(); + TestSubscriber.Probe probe = + adhocSource( + Source.repeat("a").watchTermination((a, term) -> + term.thenRun(() -> shutdown.success(Done.getInstance())) + ), duration200mills, 3) + .toMat(TestSink.probe(system), Keep.right()) + .run(mat); + + probe.requestNext("a"); + Thread.sleep(100); + probe.requestNext("a"); + Thread.sleep(100); + probe.requestNext("a"); + Thread.sleep(100); + probe.requestNext("a"); + Thread.sleep(100); + probe.requestNext("a"); + Thread.sleep(100); + + assertEquals(false, shutdown.isCompleted()); + } + }; + } + + @Test + @Ignore + public void restartUponDemand() throws Exception { + new TestKit(system) { + { + Promise shutdown = Futures.promise(); + AtomicInteger startedCount = new AtomicInteger(0); + + Source source = Source + .empty().mapMaterializedValue(x -> startedCount.incrementAndGet()) + .concat(Source.repeat("a")); + + TestSubscriber.Probe probe = + adhocSource(source.watchTermination((a, term) -> + term.thenRun(() -> shutdown.success(Done.getInstance())) + ), duration200mills, 3) + .toMat(TestSink.probe(system), Keep.right()) + .run(mat); + + probe.requestNext("a"); + assertEquals(1, startedCount.get()); + Thread.sleep(500); + assertEquals(true, shutdown.isCompleted()); + } + }; + } + + @Test + @Ignore + public void restartUptoMaxRetries() throws Exception { + new TestKit(system) { + { + Promise shutdown = Futures.promise(); + AtomicInteger startedCount = new AtomicInteger(0); + + Source source = Source + .empty().mapMaterializedValue(x -> startedCount.incrementAndGet()) + .concat(Source.repeat("a")); + + TestSubscriber.Probe probe = + adhocSource(source.watchTermination((a, term) -> + term.thenRun(() -> shutdown.success(Done.getInstance())) + ), duration200mills, 3) + .toMat(TestSink.probe(system), Keep.right()) + .run(mat); + + probe.requestNext("a"); + assertEquals(1, startedCount.get()); + + Thread.sleep(500); + assertEquals(true, shutdown.isCompleted()); + + Thread.sleep(500); + probe.requestNext("a"); + assertEquals(2, startedCount.get()); + + Thread.sleep(500); + probe.requestNext("a"); + assertEquals(3, startedCount.get()); + + Thread.sleep(500); + probe.requestNext("a"); + assertEquals(4, startedCount.get()); //startCount == 4, which means "re"-tried 3 times + + Thread.sleep(500); + assertEquals(TimeoutException.class, probe.expectError().getClass()); + probe.request(1); //send demand + probe.expectNoMessage(Duration.create(200, "milliseconds")); //but no more restart + } + }; + } + +} diff --git a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala new file mode 100644 index 0000000000..33cb39f6f3 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala @@ -0,0 +1,132 @@ +package docs.stream.cookbook + +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } + +import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ Graph, SourceShape } +import akka.testkit.TimingTest +import akka.{ Done, NotUsed } + +import scala.concurrent._ +import scala.concurrent.duration._ + +class RecipeAdhocSource extends RecipeSpec { + + //#adhoc-source + def adhocSource[T](source: Source[T, _], timeout: FiniteDuration, maxRetries: Int): Source[T, _] = + Source.lazily( + () ⇒ source.backpressureTimeout(timeout).recoverWithRetries(maxRetries, { + case t: TimeoutException ⇒ + Source.lazily(() ⇒ source.backpressureTimeout(timeout)).mapMaterializedValue(_ ⇒ NotUsed) + }) + ) + //#adhoc-source + + "Recipe for adhoc source" must { + "not start the source if there is no demand" taggedAs TimingTest in { + val isStarted = new AtomicBoolean() + adhocSource(Source.empty.mapMaterializedValue(_ ⇒ isStarted.set(true)), 200.milliseconds, 3) + .runWith(TestSink.probe[Int]) + Thread.sleep(300) + isStarted.get() should be(false) + } + + "start the source when there is a demand" taggedAs TimingTest in { + val sink = adhocSource(Source.repeat("a"), 200.milliseconds, 3) + .runWith(TestSink.probe[String]) + sink.requestNext("a") + } + + "shut down the source when the next demand times out" taggedAs TimingTest in { + val shutdown = Promise[Done]() + val sink = adhocSource( + Source.repeat("a").watchTermination() { (_, term) ⇒ + shutdown.completeWith(term) + }, 200.milliseconds, 3) + .runWith(TestSink.probe[String]) + + sink.requestNext("a") + Thread.sleep(500) + shutdown.isCompleted should be(true) + } + + "not shut down the source when there are still demands" taggedAs TimingTest in { + val shutdown = Promise[Done]() + val sink = adhocSource( + Source.repeat("a").watchTermination() { (_, term) ⇒ + shutdown.completeWith(term) + }, 200.milliseconds, 3) + .runWith(TestSink.probe[String]) + + sink.requestNext("a") + Thread.sleep(100) + sink.requestNext("a") + Thread.sleep(100) + sink.requestNext("a") + Thread.sleep(100) + sink.requestNext("a") + Thread.sleep(100) + sink.requestNext("a") + Thread.sleep(100) + + shutdown.isCompleted should be(false) + } + + "restart upon demand again after timeout" taggedAs TimingTest in { + val shutdown = Promise[Done]() + val startedCount = new AtomicInteger(0) + + val source = Source + .empty.mapMaterializedValue(_ ⇒ startedCount.incrementAndGet()) + .concat(Source.repeat("a")) + + val sink = adhocSource(source.watchTermination() { (_, term) ⇒ + shutdown.completeWith(term) + }, 200.milliseconds, 3) + .runWith(TestSink.probe[String]) + + sink.requestNext("a") + startedCount.get() should be(1) + Thread.sleep(500) + shutdown.isCompleted should be(true) + } + + "restart up to specified maxRetries" taggedAs TimingTest in { + val shutdown = Promise[Done]() + val startedCount = new AtomicInteger(0) + + val source = Source + .empty.mapMaterializedValue(_ ⇒ startedCount.incrementAndGet()) + .concat(Source.repeat("a")) + + val sink = adhocSource(source.watchTermination() { (_, term) ⇒ + shutdown.completeWith(term) + }, 200.milliseconds, 3) + .runWith(TestSink.probe[String]) + + sink.requestNext("a") + startedCount.get() should be(1) + + Thread.sleep(500) + shutdown.isCompleted should be(true) + + Thread.sleep(500) + sink.requestNext("a") + startedCount.get() should be(2) + + Thread.sleep(500) + sink.requestNext("a") + startedCount.get() should be(3) + + Thread.sleep(500) + sink.requestNext("a") + startedCount.get() should be(4) //startCount == 4, which means "re"-tried 3 times + + Thread.sleep(500) + sink.expectError().getClass should be(classOf[TimeoutException]) + sink.request(1) //send demand + sink.expectNoMessage(200.milliseconds) //but no more restart + } + } +}