diff --git a/akka-docs/rst/java/code/docs/stream/ActorSubscriberDocTest.java b/akka-docs/rst/java/code/docs/stream/ActorSubscriberDocTest.java index 37d3f316d8..d7f15f09bb 100644 --- a/akka-docs/rst/java/code/docs/stream/ActorSubscriberDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/ActorSubscriberDocTest.java @@ -26,8 +26,10 @@ import docs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import scala.concurrent.duration.Duration; import java.util.*; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -48,7 +50,7 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { system = null; mat = null; } - + //#worker-pool public static class WorkerPoolProtocol { @@ -70,7 +72,7 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { return new Msg(id, replyTo); } - + public static class Work { public final int id; public Work(int id) { this.id = id; } @@ -83,8 +85,8 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { public static Work work(int id) { return new Work(id); } - - + + public static class Reply { public final int id; public Reply(int id) { this.id = id; } @@ -97,8 +99,8 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { public static Reply reply(int id) { return new Reply(id); } - - + + public static class Done { public final int id; public Done(int id) { this.id = id; } @@ -136,16 +138,16 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { } } - + public static class WorkerPool extends AbstractActorSubscriber { - + public static Props props() { return Props.create(WorkerPool.class); } - + final int MAX_QUEUE_SIZE = 10; final Map queue = new HashMap<>(); - + final Router router; - + @Override public RequestStrategy requestStrategy() { return new MaxInFlightRequestStrategy(MAX_QUEUE_SIZE) { @@ -161,7 +163,7 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { for (int i = 0; i < 3; i++) routees.add(new ActorRefRoutee(context().actorOf(Props.create(Worker.class)))); router = new Router(new RoundRobinRoutingLogic(), routees); - + receive(ReceiveBuilder. match(ActorSubscriberMessage.OnNext.class, on -> on.element() instanceof WorkerPoolProtocol.Msg, onNext -> { @@ -170,18 +172,26 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { if (queue.size() > MAX_QUEUE_SIZE) throw new RuntimeException("queued too many: " + queue.size()); - + router.route(WorkerPoolProtocol.work(msg.id), self()); }). + match(ActorSubscriberMessage.onCompleteInstance().getClass(), complete -> { + if (queue.isEmpty()) { + context().stop(self()); + } + }). match(WorkerPoolProtocol.Reply.class, reply -> { int id = reply.id; queue.get(id).tell(WorkerPoolProtocol.done(id), self()); queue.remove(id); + if (canceled() && queue.isEmpty()) { + context().stop(self()); + } }). build()); } } - + static class Worker extends AbstractActor { public Worker() { receive(ReceiveBuilder. @@ -192,11 +202,11 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { } } //#worker-pool - + @Test public void demonstrateActorPublisherUsage() { new JavaTestKit(system) { - + { final ActorRef replyTo = getTestActor(); @@ -206,12 +216,14 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { for (int i = 0; i < N; i++) { data.add(i); } - - Source.from(data) + + final ActorRef worker = Source.from(data) .map(i -> WorkerPoolProtocol.msg(i, replyTo)) .runWith(Sink.actorSubscriber(WorkerPool.props()), mat); //#actor-subscriber-usage + watch(worker); + List got = Arrays.asList(receiveN(N)); Collections.sort(got, new Comparator() { @Override @@ -226,9 +238,10 @@ public class ActorSubscriberDocTest extends AbstractJavaTest { assertEquals(String.format("Expected %d, but got %s", i, got.get(i)), WorkerPoolProtocol.done(i), got.get(i)); } assertEquals(String.format("Expected 117 messages but got %d", i), i, 117); + expectTerminated(Duration.create(10, TimeUnit.SECONDS), worker); } }; } - + } diff --git a/akka-docs/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala index f8afb97266..6c3b055b88 100644 --- a/akka-docs/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala @@ -16,6 +16,7 @@ import akka.stream.actor.MaxInFlightRequestStrategy import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.testkit.AkkaSpec +import scala.concurrent.duration._ object ActorSubscriberDocSpec { //#worker-pool @@ -54,6 +55,13 @@ object ActorSubscriberDocSpec { case Reply(id) => queue(id) ! Done(id) queue -= id + if (canceled && queue.isEmpty) { + context.stop(self) + } + case OnComplete => + if (queue.isEmpty) { + context.stop(self) + } } } @@ -79,11 +87,13 @@ class ActorSubscriberDocSpec extends AkkaSpec { //#actor-subscriber-usage val N = 117 - Source(1 to N).map(WorkerPool.Msg(_, replyTo)) + val worker = Source(1 to N).map(WorkerPool.Msg(_, replyTo)) .runWith(Sink.actorSubscriber(WorkerPool.props)) //#actor-subscriber-usage + watch(worker) receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet) + expectTerminated(worker, 10.seconds) } }