+doc Add examples to Sink.actorRefWithAck (#24854)
This commit is contained in:
parent
aaea027d0e
commit
046b88ce90
3 changed files with 141 additions and 3 deletions
|
|
@ -68,6 +68,27 @@ If the target actor terminates the stream will be cancelled. When the stream is
|
||||||
given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with
|
given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with
|
||||||
failure a `akka.actor.Status.Failure` message will be sent to the destination actor.
|
failure a `akka.actor.Status.Failure` message will be sent to the destination actor.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck }
|
||||||
|
|
||||||
|
The receiving actor would then need to be implemented similar to the following:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #actorRefWithAck-actor }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #actorRefWithAck-actor }
|
||||||
|
|
||||||
|
Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted
|
||||||
|
as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements.
|
||||||
|
Handling the other signals while is not required, however is a good practice, to see the state of the streams lifecycle
|
||||||
|
in the connected actor as well. Technically it is also possible to use multiple sinks targeting the same actor,
|
||||||
|
however it is not common practice to do so, and one should rather investigate using a `Merge` stage for this purpose.
|
||||||
|
|
||||||
|
|
||||||
@@@ note
|
@@@ note
|
||||||
|
|
||||||
Using `Sink.actorRef` or ordinary `tell` from a `map` or `foreach` stage means that there is
|
Using `Sink.actorRef` or ordinary `tell` from a `map` or `foreach` stage means that there is
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
|
||||||
import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.tweets;
|
import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.tweets;
|
||||||
import static junit.framework.TestCase.assertTrue;
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
|
||||||
|
@SuppressWarnings("ALL")
|
||||||
public class IntegrationDocTest extends AbstractJavaTest {
|
public class IntegrationDocTest extends AbstractJavaTest {
|
||||||
|
|
||||||
private static final SilenceSystemOut.System System = SilenceSystemOut.get();
|
private static final SilenceSystemOut.System System = SilenceSystemOut.get();
|
||||||
|
|
@ -309,6 +310,41 @@ public class IntegrationDocTest extends AbstractJavaTest {
|
||||||
}
|
}
|
||||||
//#ask-actor
|
//#ask-actor
|
||||||
|
|
||||||
|
//#actorRefWithAck-actor
|
||||||
|
static class Ack {}
|
||||||
|
|
||||||
|
static class StreamInitialized {}
|
||||||
|
static class StreamCompleted {}
|
||||||
|
static class StreamFailure {
|
||||||
|
private final Throwable cause;
|
||||||
|
public StreamFailure(Throwable cause) { this.cause = cause; }
|
||||||
|
|
||||||
|
public Throwable getCause() { return cause; }
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AckingReceiver extends AbstractLoggingActor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder()
|
||||||
|
.match(StreamInitialized.class, init -> {
|
||||||
|
log().info("Stream initialized");
|
||||||
|
})
|
||||||
|
.match(String.class, element -> {
|
||||||
|
log().info("Received element: {}", element);
|
||||||
|
sender().tell(new Ack(), self());
|
||||||
|
})
|
||||||
|
.match(StreamCompleted.class, completed -> {
|
||||||
|
log().info("Stream completed");
|
||||||
|
})
|
||||||
|
.match(StreamFailure.class, failed -> {
|
||||||
|
log().error(failed.getCause(),"Stream failed!");
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#actorRefWithAck-actor
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public void askStage() throws Exception {
|
public void askStage() throws Exception {
|
||||||
|
|
@ -325,6 +361,28 @@ public class IntegrationDocTest extends AbstractJavaTest {
|
||||||
//#ask
|
//#ask
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void actorRefWithAckExample() throws Exception {
|
||||||
|
//#actorRefWithAck
|
||||||
|
Source<String, NotUsed> words =
|
||||||
|
Source.from(Arrays.asList("hello", "hi"));
|
||||||
|
|
||||||
|
ActorRef receiver =
|
||||||
|
system.actorOf(Props.create(AckingReceiver.class));
|
||||||
|
|
||||||
|
Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(receiver,
|
||||||
|
new StreamInitialized(),
|
||||||
|
new Ack(),
|
||||||
|
new StreamCompleted(),
|
||||||
|
ex -> new StreamFailure(ex)
|
||||||
|
);
|
||||||
|
|
||||||
|
words
|
||||||
|
.map(el -> el.toLowerCase())
|
||||||
|
.runWith(sink, mat);
|
||||||
|
//#actorRefWithAck
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void callingExternalServiceWithMapAsync() throws Exception {
|
public void callingExternalServiceWithMapAsync() throws Exception {
|
||||||
|
|
|
||||||
|
|
@ -10,21 +10,23 @@ import scala.concurrent.duration._
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import akka.testkit.TestProbe
|
import akka.testkit.TestProbe
|
||||||
import akka.actor.ActorRef
|
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Status }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.Actor
|
|
||||||
import akka.actor.Props
|
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.stream.Attributes
|
import akka.stream.Attributes
|
||||||
import akka.stream.ActorAttributes
|
import akka.stream.ActorAttributes
|
||||||
|
|
||||||
import scala.concurrent.ExecutionContext
|
import scala.concurrent.ExecutionContext
|
||||||
import akka.stream.ActorMaterializerSettings
|
import akka.stream.ActorMaterializerSettings
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
import akka.stream.Supervision
|
import akka.stream.Supervision
|
||||||
import akka.stream.scaladsl.Flow
|
import akka.stream.scaladsl.Flow
|
||||||
import akka.Done
|
import akka.Done
|
||||||
|
import akka.actor.Status.Status
|
||||||
|
|
||||||
object IntegrationDocSpec {
|
object IntegrationDocSpec {
|
||||||
import TwitterStreamQuickstartDocSpec._
|
import TwitterStreamQuickstartDocSpec._
|
||||||
|
|
@ -195,6 +197,63 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
||||||
probe.expectMsg("akkateam@somewhere.com")
|
probe.expectMsg("akkateam@somewhere.com")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"actorRefWithAck" in {
|
||||||
|
//#actorRefWithAck
|
||||||
|
val words: Source[String, NotUsed] =
|
||||||
|
Source(List("hello", "hi"))
|
||||||
|
|
||||||
|
// sent from actor to stream to "ack" processing of given element
|
||||||
|
val AckMessage = AckingReceiver.Ack
|
||||||
|
|
||||||
|
// sent from stream to actor to indicate start, end or failure of stream:
|
||||||
|
val InitMessage = AckingReceiver.StreamInitialized
|
||||||
|
val OnCompleteMessage = AckingReceiver.StreamCompleted
|
||||||
|
val onErrorMessage = (ex: Throwable) ⇒ AckingReceiver.StreamFailure(ex)
|
||||||
|
|
||||||
|
val receiver = system.actorOf(
|
||||||
|
Props(new AckingReceiver(ackWith = AckMessage)))
|
||||||
|
val sink = Sink.actorRefWithAck(
|
||||||
|
receiver,
|
||||||
|
onInitMessage = InitMessage,
|
||||||
|
ackMessage = AckMessage,
|
||||||
|
onCompleteMessage = OnCompleteMessage,
|
||||||
|
onFailureMessage = onErrorMessage
|
||||||
|
)
|
||||||
|
|
||||||
|
words
|
||||||
|
.map(_.toLowerCase)
|
||||||
|
.runWith(sink)
|
||||||
|
//#actorRefWithAck
|
||||||
|
}
|
||||||
|
|
||||||
|
//#actorRefWithAck-actor
|
||||||
|
object AckingReceiver {
|
||||||
|
case object Ack
|
||||||
|
|
||||||
|
case object StreamInitialized
|
||||||
|
case object StreamCompleted
|
||||||
|
final case class StreamFailure(ex: Throwable)
|
||||||
|
}
|
||||||
|
|
||||||
|
class AckingReceiver(ackWith: Any) extends Actor with ActorLogging {
|
||||||
|
import AckingReceiver._
|
||||||
|
|
||||||
|
def receive: Receive = {
|
||||||
|
case StreamInitialized ⇒
|
||||||
|
log.info("Stream initialized!")
|
||||||
|
|
||||||
|
case el: String ⇒
|
||||||
|
log.info("Received element: {}", el)
|
||||||
|
sender() ! Ack // ack to allow the stream to proceed sending more elements
|
||||||
|
|
||||||
|
case StreamCompleted ⇒
|
||||||
|
log.info("Stream completed!")
|
||||||
|
case StreamFailure(ex) ⇒
|
||||||
|
log.error(ex, "Stream failed!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#actorRefWithAck-actor
|
||||||
|
|
||||||
"lookup email with mapAsync and supervision" in {
|
"lookup email with mapAsync and supervision" in {
|
||||||
val addressSystem = new AddressSystem2
|
val addressSystem = new AddressSystem2
|
||||||
val authors: Source[Author, NotUsed] =
|
val authors: Source[Author, NotUsed] =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue