Operator docs for Source.tick (#28225)

This commit is contained in:
Johan Andrén 2019-11-25 13:25:07 +01:00 committed by GitHub
parent 5e2435a424
commit ac15cb3289
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 177 additions and 3 deletions

View file

@ -15,7 +15,47 @@ A periodical repetition of an arbitrary object.
## Description
A periodical repetition of an arbitrary object. Delay of first tick is specified
separately from interval of the following ticks.
separately from interval of the following ticks.
If downstream is applying backpressure when the time period has passed the tick is dropped.
The source materializes a @apidoc[Cancellable] that can be used to complete the source.
@@@note
The element must be immutable as the source can be materialized several times and may pass it between threads, see the second
example for achieving a periodical element that changes over time.
@@@
## Examples
This first example prints to standard out periodically:
Scala
: @@snip [Tick.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala) { #simple }
Java
: @@snip [Tick.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java) { #simple }
You can also use the tick to periodically emit a value, in this sample we use the tick to trigger a query to an
actor using @ref:[ask](../../../typed/interaction-patterns.md#outside-ask) and emit the response downstream. For this
usage, what is important is that it was emitted, not the actual tick value.
Scala
: @@snip [Tick.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala) { #poll-actor }
Java
: @@snip [Tick.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java) { #poll-actor }
A neat trick is to combine this with @ref:[zipLatest](../Source-or-Flow/zipLatest.md) to combine a stream of elements
with a value that is updated periodically instead of having to trigger a query for each element:
Scala
: @@snip [Tick.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Tick.scala) { #zip-latest }
Java
: @@snip [Tick.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Tick.java) { #zip-latest }
## Reactive Streams semantics
@ -23,7 +63,6 @@ separately from interval of the following ticks.
**emits** periodically, if there is downstream backpressure ticks are skipped
**completes** never
**completes** when the materialized `Cancellable` is cancelled
@@@

View file

@ -0,0 +1,76 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
import akka.NotUsed;
import akka.actor.Cancellable;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.AskPattern;
import java.util.concurrent.CompletionStage;
import java.time.Duration;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
public class Tick {
// not really a runnable example, these are just pretend
private ActorSystem<Void> system = null;
private ActorRef<MyActor.Command> myActor = null;
static class MyActor {
interface Command {}
static class Query implements Command {
public final ActorRef<Response> replyTo;
public Query(ActorRef<Response> replyTo) {
this.replyTo = replyTo;
}
}
static class Response {
public final String text;
public Response(String text) {
this.text = text;
}
}
}
void simple() {
// #simple
Source.tick(
Duration.ofSeconds(1), // delay of first tick
Duration.ofSeconds(1), // delay of subsequent ticks
"tick" // element emitted each tick
)
.runForeach(System.out::println, system);
// #simple
}
void pollSomething() {
// #poll-actor
Source<String, Cancellable> periodicActorResponse =
Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick")
.mapAsync(
1,
notUsed -> {
CompletionStage<MyActor.Response> response =
AskPattern.ask(
myActor, MyActor.Query::new, Duration.ofSeconds(3), system.scheduler());
return response;
})
.map(response -> response.text);
// #poll-actor
// #zip-latest
Flow<Integer, Pair<Integer, String>, NotUsed> zipWithLatestResponse =
Flow.of(Integer.class).zipLatest(periodicActorResponse);
// #zip-latest
}
}

View file

@ -0,0 +1,59 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.source
import akka.NotUsed
import akka.actor.Cancellable
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.stream.scaladsl.Source
import akka.actor.typed.scaladsl.AskPattern._
import akka.stream.scaladsl.Flow
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._
object Tick {
// not really a runnable example, these are just pretend
implicit val system: ActorSystem[Nothing] = null
val myActor: ActorRef[MyActor.Command] = null;
object MyActor {
sealed trait Command {}
case class Query(replyTo: ActorRef[Response]) extends Command
case class Response(text: String)
}
def simple() {
// #simple
Source
.tick(
1.second, // delay of first tick
1.second, // delay of subsequent ticks
"tick" // element emitted each tick
)
.runForeach(println)
// #simple
}
def pollSomething() {
// #poll-actor
val periodicActorResponse: Source[String, Cancellable] = Source
.tick(1.second, 1.second, "tick")
.mapAsync(1) { _ =>
implicit val timeout: Timeout = 3.seconds
val response: Future[MyActor.Response] = myActor.ask(MyActor.Query)
response
}
.map(_.text);
// #poll-actor
// #zip-latest
val zipWithLatestResponse: Flow[Int, (Int, String), NotUsed] =
Flow[Int].zipLatest(periodicActorResponse);
// #zip-latest
}
}