Docs: sync and refresh ask stream operators

This commit is contained in:
Enno Runne 2020-03-18 18:23:09 +01:00
parent bcff540dbd
commit de7dc9cdf8
6 changed files with 135 additions and 96 deletions

View file

@ -1,63 +0,0 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.typed.javadsl;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class ActorFlowCompileTest {
interface Protocol {}
class Init implements Protocol {}
class Msg implements Protocol {}
class Complete implements Protocol {}
class Failure implements Protocol {
public Exception ex;
}
{
final ActorSystem<String> system = null;
}
static
// #ask-actor
class AskMe {
final String payload;
final ActorRef<String> replyTo;
AskMe(String payload, ActorRef<String> replyTo) {
this.payload = payload;
this.replyTo = replyTo;
}
}
// #ask-actor
{
final ActorRef<AskMe> ref = null;
// #ask
Duration timeout = Duration.of(1, ChronoUnit.SECONDS);
Source.repeat("hello").via(ActorFlow.ask(ref, timeout, AskMe::new)).to(Sink.ignore());
Source.repeat("hello")
.via(
ActorFlow.<String, AskMe, String>ask(
ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
.to(Sink.ignore());
// #ask
}
}

View file

@ -0,0 +1,67 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.javadsl;
import akka.NotUsed;
// #ask-actor
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorFlow;
// #ask-actor
import java.time.Duration;
public class ActorFlowCompileTest {
final ActorSystem<String> system = null;
static
// #ask-actor
class Asking {
final String payload;
final ActorRef<Reply> replyTo;
public Asking(String payload, ActorRef<Reply> replyTo) {
this.payload = payload;
this.replyTo = replyTo;
}
}
// #ask-actor
static
// #ask-actor
class Reply {
public final String msg;
public Reply(String msg) {
this.msg = msg;
}
}
// #ask-actor
{
// #ask
final ActorRef<Asking> actorRef = // ???
// #ask
null;
// #ask
Duration timeout = Duration.ofSeconds(1);
// method reference notation
Flow<String, Reply, NotUsed> askFlow = ActorFlow.ask(actorRef, timeout, Asking::new);
// explicit creation of the sent message
Flow<String, Reply, NotUsed> askFlowExplicit =
ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo));
Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system);
// #ask
}
}

View file

@ -2,26 +2,30 @@
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.typed.scaladsl
package docs.scaladsl
import akka.NotUsed
//#imports
import akka.stream.scaladsl._
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.typed.scaladsl.ActorFlow
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
//#imports
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.stream.testkit.TestSubscriber
import org.scalatest.wordspec.AnyWordSpecLike
//#imports
import akka.stream.testkit.TestSubscriber
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
object ActorFlowSpec {
//#ask-actor
final case class Asking(s: String, replyTo: ActorRef[Reply])
final case class Reply(s: String)
final case class Reply(msg: String)
//#ask-actor
}
class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
@ -59,11 +63,17 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
//#ask-actor
//#ask
val in: Future[immutable.Seq[Reply]] =
Source(1 to 50)
.map(_.toString)
.via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo)))
.runWith(Sink.seq)
implicit val timeout: akka.util.Timeout = 1.second
val askFlow: Flow[String, Reply, NotUsed] =
ActorFlow.ask(ref)(Asking.apply)
// explicit creation of the sent message
val askFlowExplicit: Flow[String, Reply, NotUsed] =
ActorFlow.ask(ref)(makeMessage = (el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))
val in: Future[immutable.Seq[String]] =
Source(1 to 50).map(_.toString).via(askFlow).map(_.msg).runWith(Sink.seq)
//#ask
in.futureValue shouldEqual List.tabulate(51)(i => Reply(s"$i!!!")).drop(1)