=doc missing ACK reply in doc example (#24997)

This commit is contained in:
Konrad `ktoso` Malawski 2018-04-30 21:12:04 +09:00 committed by Arnout Engelen
parent 3e11092388
commit d870d37bb3
2 changed files with 12 additions and 2 deletions

View file

@ -329,6 +329,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
return receiveBuilder() return receiveBuilder()
.match(StreamInitialized.class, init -> { .match(StreamInitialized.class, init -> {
log().info("Stream initialized"); log().info("Stream initialized");
sender().tell(new Ack(), self());
}) })
.match(String.class, element -> { .match(String.class, element -> {
log().info("Received element: {}", element); log().info("Received element: {}", element);

View file

@ -210,8 +210,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val OnCompleteMessage = AckingReceiver.StreamCompleted val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) AckingReceiver.StreamFailure(ex) val onErrorMessage = (ex: Throwable) AckingReceiver.StreamFailure(ex)
val probe = TestProbe()
val receiver = system.actorOf( val receiver = system.actorOf(
Props(new AckingReceiver(ackWith = AckMessage))) Props(new AckingReceiver(probe.ref, ackWith = AckMessage)))
val sink = Sink.actorRefWithAck( val sink = Sink.actorRefWithAck(
receiver, receiver,
onInitMessage = InitMessage, onInitMessage = InitMessage,
@ -224,6 +225,10 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
.map(_.toLowerCase) .map(_.toLowerCase)
.runWith(sink) .runWith(sink)
//#actorRefWithAck //#actorRefWithAck
probe.expectMsg("Stream initialized!")
probe.expectMsg("hello")
probe.expectMsg("hi")
probe.expectMsg("Stream completed!")
} }
//#actorRefWithAck-actor //#actorRefWithAck-actor
@ -235,19 +240,23 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
final case class StreamFailure(ex: Throwable) final case class StreamFailure(ex: Throwable)
} }
class AckingReceiver(ackWith: Any) extends Actor with ActorLogging { class AckingReceiver(probe: ActorRef, ackWith: Any) extends Actor with ActorLogging {
import AckingReceiver._ import AckingReceiver._
def receive: Receive = { def receive: Receive = {
case StreamInitialized case StreamInitialized
log.info("Stream initialized!") log.info("Stream initialized!")
probe ! "Stream initialized!"
sender() ! Ack // ack to allow the stream to proceed sending more elements
case el: String case el: String
log.info("Received element: {}", el) log.info("Received element: {}", el)
probe ! el
sender() ! Ack // ack to allow the stream to proceed sending more elements sender() ! Ack // ack to allow the stream to proceed sending more elements
case StreamCompleted case StreamCompleted
log.info("Stream completed!") log.info("Stream completed!")
probe ! "Stream completed!"
case StreamFailure(ex) case StreamFailure(ex)
log.error(ex, "Stream failed!") log.error(ex, "Stream failed!")
} }