From ac15cb32893941068bf3cfa760d7afbe5762adda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 25 Nov 2019 13:25:07 +0100 Subject: [PATCH] Operator docs for Source.tick (#28225) --- .../paradox/stream/operators/Source/tick.md | 45 ++++++++++- .../jdocs/stream/operators/source/Tick.java | 76 +++++++++++++++++++ .../docs/stream/operators/source/Tick.scala | 59 ++++++++++++++ 3 files changed, 177 insertions(+), 3 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source/tick.md b/akka-docs/src/main/paradox/stream/operators/Source/tick.md index 28cd61a822..af2201b864 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/tick.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/tick.md @@ -15,7 +15,47 @@ A periodical repetition of an arbitrary object. ## Description A periodical repetition of an arbitrary object. Delay of first tick is specified -separately from interval of the following ticks. +separately from interval of the following ticks. + +If downstream is applying backpressure when the time period has passed the tick is dropped. + +The source materializes a @apidoc[Cancellable] that can be used to complete the source. + +@@@note + +The element must be immutable as the source can be materialized several times and may pass it between threads, see the second +example for achieving a periodical element that changes over time. + +@@@ + +## Examples + +This first example prints to standard out periodically: + +Scala +: @@snip [Tick.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala) { #simple } + +Java +: @@snip [Tick.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java) { #simple } + +You can also use the tick to periodically emit a value, in this sample we use the tick to trigger a query to an +actor using @ref:[ask](../../../typed/interaction-patterns.md#outside-ask) and emit the response downstream. For this +usage, what is important is that it was emitted, not the actual tick value. + +Scala +: @@snip [Tick.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala) { #poll-actor } + +Java +: @@snip [Tick.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java) { #poll-actor } + +A neat trick is to combine this with @ref:[zipLatest](../Source-or-Flow/zipLatest.md) to combine a stream of elements +with a value that is updated periodically instead of having to trigger a query for each element: + +Scala +: @@snip [Tick.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala) { #zip-latest } + +Java +: @@snip [Tick.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java) { #zip-latest } ## Reactive Streams semantics @@ -23,7 +63,6 @@ separately from interval of the following ticks. **emits** periodically, if there is downstream backpressure ticks are skipped -**completes** never +**completes** when the materialized `Cancellable` is cancelled @@@ - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java new file mode 100644 index 0000000000..2af1667ae3 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.actor.Cancellable; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.javadsl.AskPattern; +import java.util.concurrent.CompletionStage; +import java.time.Duration; + +import akka.japi.Pair; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; + +public class Tick { + + // not really a runnable example, these are just pretend + private ActorSystem system = null; + private ActorRef myActor = null; + + static class MyActor { + interface Command {} + + static class Query implements Command { + public final ActorRef replyTo; + + public Query(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static class Response { + public final String text; + + public Response(String text) { + this.text = text; + } + } + } + + void simple() { + // #simple + Source.tick( + Duration.ofSeconds(1), // delay of first tick + Duration.ofSeconds(1), // delay of subsequent ticks + "tick" // element emitted each tick + ) + .runForeach(System.out::println, system); + // #simple + } + + void pollSomething() { + // #poll-actor + Source periodicActorResponse = + Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick") + .mapAsync( + 1, + notUsed -> { + CompletionStage response = + AskPattern.ask( + myActor, MyActor.Query::new, Duration.ofSeconds(3), system.scheduler()); + return response; + }) + .map(response -> response.text); + // #poll-actor + + // #zip-latest + Flow, NotUsed> zipWithLatestResponse = + Flow.of(Integer.class).zipLatest(periodicActorResponse); + // #zip-latest + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala new file mode 100644 index 0000000000..f6a4427ec7 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.actor.Cancellable +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.stream.scaladsl.Source +import akka.actor.typed.scaladsl.AskPattern._ +import akka.stream.scaladsl.Flow +import akka.util.Timeout + +import scala.concurrent.Future +import scala.concurrent.duration._ + +object Tick { + // not really a runnable example, these are just pretend + implicit val system: ActorSystem[Nothing] = null + val myActor: ActorRef[MyActor.Command] = null; + + object MyActor { + sealed trait Command {} + case class Query(replyTo: ActorRef[Response]) extends Command + case class Response(text: String) + } + + def simple() { + // #simple + Source + .tick( + 1.second, // delay of first tick + 1.second, // delay of subsequent ticks + "tick" // element emitted each tick + ) + .runForeach(println) + // #simple + } + + def pollSomething() { + // #poll-actor + val periodicActorResponse: Source[String, Cancellable] = Source + .tick(1.second, 1.second, "tick") + .mapAsync(1) { _ => + implicit val timeout: Timeout = 3.seconds + val response: Future[MyActor.Response] = myActor.ask(MyActor.Query) + response + } + .map(_.text); + // #poll-actor + + // #zip-latest + val zipWithLatestResponse: Flow[Int, (Int, String), NotUsed] = + Flow[Int].zipLatest(periodicActorResponse); + // #zip-latest + } +}