From c65bf2d276b495db2dd173bfa03794143d00d897 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 3 May 2019 11:36:50 +0100 Subject: [PATCH] Remove docs for ActorPub/Sub and write persistence query docs as stages (#26844) * use separate db columns * Use io dispatcher for sample stage --- .../actor/typed/TypedBenchmarkActors.scala | 1 - .../src/main/paradox/persistence-query.md | 6 +- .../paradox/stream/stream-integrations.md | 133 ----- .../persistence/PersistenceQueryDocTest.java | 17 +- .../query/MyEventsByTagJavaPublisher.java | 135 ----- .../query/MyEventsByTagSource.java | 136 +++++ .../jdocs/stream/ActorPublisherDocTest.java | 161 ------ .../jdocs/stream/ActorSubscriberDocTest.java | 274 ---------- .../query/MyEventsByTagPublisher.scala | 101 ---- .../query/MyEventsByTagSource.scala | 110 ++++ .../query/PersistenceQueryDocSpec.scala | 5 +- .../docs/stream/ActorPublisherDocSpec.scala | 99 ---- .../docs/stream/ActorSubscriberDocSpec.scala | 99 ---- .../akka/stream/tck/ActorPublisherTest.scala | 52 -- .../akka/stream/tck/ActorSubscriberTest.scala | 29 - .../akka/stream/actor/ActorPublisherTest.java | 64 --- .../stream/actor/ActorSubscriberTest.java | 82 --- .../stream/actor/ActorPublisherSpec.scala | 508 ------------------ .../stream/actor/ActorSubscriberSpec.scala | 300 ----------- .../mima-filters/2.5.x.backwards.excludes | 14 + .../akka/stream/actor/ActorPublisher.scala | 88 --- .../akka/stream/actor/ActorSubscriber.scala | 52 -- 22 files changed, 272 insertions(+), 2194 deletions(-) delete mode 100644 akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java create mode 100644 akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagSource.java delete mode 100644 akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java delete mode 100644 akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java delete mode 100644 akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagPublisher.scala create mode 100644 akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagSource.scala delete mode 100644 akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala delete mode 100644 akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala delete mode 100644 akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala delete mode 100644 akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala delete mode 100644 akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java delete mode 100644 akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala diff --git a/akka-bench-jmh-typed/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala b/akka-bench-jmh-typed/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala index 0b47b1ad0c..aac5eddc59 100644 --- a/akka-bench-jmh-typed/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala +++ b/akka-bench-jmh-typed/src/main/scala/akka/actor/typed/TypedBenchmarkActors.scala @@ -5,7 +5,6 @@ package akka.actor.typed import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit import akka.Done import akka.actor.typed.scaladsl.ActorContext diff --git a/akka-docs/src/main/paradox/persistence-query.md b/akka-docs/src/main/paradox/persistence-query.md index 3ec590c236..07e734a6e8 100644 --- a/akka-docs/src/main/paradox/persistence-query.md +++ b/akka-docs/src/main/paradox/persistence-query.md @@ -300,13 +300,13 @@ Scala Java : @@snip [PersistenceQueryDocTest.java](/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java) { #my-read-journal } -And the `eventsByTag` could be backed by such an Actor for example: +And the `eventsByTag` could be backed by a GraphStage for example: Scala -: @@snip [MyEventsByTagPublisher.scala](/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagPublisher.scala) { #events-by-tag-publisher } +: @@snip [MyEventsByTagSource.scala](/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagSource.scala) { #events-by-tag-publisher } Java -: @@snip [MyEventsByTagJavaPublisher.java](/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java) { #events-by-tag-publisher } +: @@snip [MyEventsByTagSource.java](/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagSource.java) { #events-by-tag-publisher } The `ReadJournalProvider` class must have a constructor with one of these signatures: diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index dc0e7d3c5c..5c7c1c9762 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -557,136 +557,3 @@ Java Please note that a factory is necessary to achieve reusability of the resulting `Flow`. -### Implementing Reactive Streams Publisher or Subscriber - -As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and -any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you -implement Reactive Streams integrations with built-in operators or @ref:[custom operators](stream-customize.md). - -For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are -provided to support implementing Reactive Streams `Publisher` and `Subscriber` with -an `Actor`. - -These can be consumed by other Reactive Stream libraries or used as an Akka Streams `Source` or `Sink`. - -@@@ warning - -`ActorPublisher` and `ActorSubscriber` cannot be used with remote actors, -because if signals of the Reactive Streams protocol (e.g. `request`) are lost the -the stream may deadlock. - -@@@ - -#### ActorPublisher - -@@@ warning - -**Deprecation warning:** `ActorPublisher` is deprecated in favour of the vastly more -type-safe and safe to implement @ref[`GraphStage`](stream-customize.md). It can also -expose a "operator actor ref" is needed to be addressed as-if an Actor. -Custom operators implemented using @ref[`GraphStage`](stream-customize.md) are also automatically fusable. - -To learn more about implementing custom operators using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage). - -@@@ - -Extend @scala[`akka.stream.actor.ActorPublisher` in your `Actor` to make it]@java[`akka.stream.actor.AbstractActorPublisher` to implement] a -stream publisher that keeps track of the subscription life cycle and requested elements. - -Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: - -Scala -: @@snip [ActorPublisherDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala) { #job-manager } - -Java -: @@snip [ActorPublisherDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java) { #job-manager } - -You send elements to the stream by calling `onNext`. You are allowed to send as many -elements as have been requested by the stream subscriber. This amount can be inquired with -`totalDemand`. It is only allowed to use `onNext` when `isActive` and `totalDemand>0`, -otherwise `onNext` will throw `IllegalStateException`. - -When the stream subscriber requests more elements the `ActorPublisherMessage.Request` message -is delivered to this actor, and you can act on that event. The `totalDemand` -is updated automatically. - -When the stream subscriber cancels the subscription the `ActorPublisherMessage.Cancel` message -is delivered to this actor. After that subsequent calls to `onNext` will be ignored. - -You can complete the stream by calling `onComplete`. After that you are not allowed to -call `onNext`, `onError` and `onComplete`. - -You can terminate the stream with failure by calling `onError`. After that you are not allowed to -call `onNext`, `onError` and `onComplete`. - -If you suspect that this @scala[`ActorPublisher`]@java[`AbstractActorPublisher`] may never get subscribed to, you can override the `subscriptionTimeout` -method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when -the timeout triggers via an `ActorPublisherMessage.SubscriptionTimeoutExceeded` message and MUST then perform -cleanup and stop itself. - -If the actor is stopped the stream will be completed, unless it was not already terminated with -failure, completed or canceled. - -More detailed information can be found in the API documentation. - -This is how it can be used as input `Source` to a `Flow`: - -Scala -: @@snip [ActorPublisherDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala) { #actor-publisher-usage } - -Java -: @@snip [ActorPublisherDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java) { #actor-publisher-usage } - -@scala[A publisher that is created with `Sink.asPublisher` supports a specified number of subscribers. Additional - subscription attempts will be rejected with an `IllegalStateException`. -]@java[You can only attach one subscriber to this publisher. Use a `Broadcast`-element or - attach a `Sink.asPublisher(AsPublisher.WITH_FANOUT)` to enable multiple subscribers. -] - -#### ActorSubscriber - -@@@ warning - -**Deprecation warning:** `ActorSubscriber` is deprecated in favour of the vastly more -type-safe and safe to implement @ref[`GraphStage`](stream-customize.md). It can also -expose a "operator actor ref" is needed to be addressed as-if an Actor. -Custom operators implemented using @ref[`GraphStage`](stream-customize.md) are also automatically fusable. - -To learn more about implementing custom operators using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage). - -@@@ - -Extend @scala[`akka.stream.actor.ActorSubscriber` in your `Actor` to make it]@java[`akka.stream.actor.AbstractActorSubscriber` to make your class] a -stream subscriber with full control of stream back pressure. It will receive -`ActorSubscriberMessage.OnNext`, `ActorSubscriberMessage.OnComplete` and `ActorSubscriberMessage.OnError` -messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. - -Here is an example of such an actor. It dispatches incoming jobs to child worker actors: - -Scala -: @@snip [ActorSubscriberDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala) { #worker-pool } - -Java -: @@snip [ActorSubscriberDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java) { #worker-pool } - -Subclass must define the `RequestStrategy` to control stream back pressure. -After each incoming message the @scala[`ActorSubscriber`]@java[`AbstractActorSubscriber`] will automatically invoke -the `RequestStrategy.requestDemand` and propagate the returned demand to the stream. - - * The provided `WatermarkRequestStrategy` is a good strategy if the actor performs work itself. - * The provided `MaxInFlightRequestStrategy` is useful if messages are queued internally or -delegated to other actors. - * You can also implement a custom `RequestStrategy` or call `request` manually together with -`ZeroRequestStrategy` or some other strategy. In that case -you must also call `request` when the actor is started or when it is ready, otherwise -it will not receive any elements. - -More detailed information can be found in the API documentation. - -This is how it can be used as output `Sink` to a `Flow`: - -Scala -: @@snip [ActorSubscriberDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala) { #actor-subscriber-usage } - -Java -: @@snip [ActorSubscriberDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java) { #actor-subscriber-usage } diff --git a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java index de7ee04357..6a5089edf3 100644 --- a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java @@ -6,6 +6,7 @@ package jdocs.persistence; import static akka.pattern.Patterns.ask; +import java.sql.Connection; import java.time.Duration; import java.util.HashSet; import java.util.Set; @@ -21,9 +22,8 @@ import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import docs.persistence.query.MyEventsByTagPublisher; +import jdocs.persistence.query.MyEventsByTagSource; import org.reactivestreams.Subscriber; -import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.List; @@ -92,12 +92,11 @@ public class PersistenceQueryDocTest { akka.persistence.query.javadsl.PersistenceIdsQuery, akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { - private final FiniteDuration refreshInterval; + private final Duration refreshInterval; + private Connection conn; public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { - refreshInterval = - FiniteDuration.create( - config.getDuration("refresh-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + refreshInterval = config.getDuration("refresh-interval"); } /** @@ -115,10 +114,8 @@ public class PersistenceQueryDocTest { public Source eventsByTag(String tag, Offset offset) { if (offset instanceof Sequence) { Sequence sequenceOffset = (Sequence) offset; - final Props props = - MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); - return Source.actorPublisher(props) - .mapMaterializedValue(m -> NotUsed.getInstance()); + return Source.fromGraph( + new MyEventsByTagSource(conn, tag, sequenceOffset.value(), refreshInterval)); } else if (offset == NoOffset.getInstance()) return eventsByTag(tag, Offset.sequence(0L)); // recursive else diff --git a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java deleted file mode 100644 index 80ce7f558c..0000000000 --- a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package jdocs.persistence.query; - -import akka.actor.Cancellable; -import akka.actor.Scheduler; -import akka.japi.Pair; -import akka.persistence.PersistentRepr; -import akka.persistence.query.Offset; -import akka.serialization.Serialization; -import akka.serialization.SerializationExtension; -import akka.stream.actor.AbstractActorPublisher; - -import akka.actor.Props; -import akka.persistence.query.EventEnvelope; -import akka.stream.actor.ActorPublisherMessage.Cancel; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.time.Duration; - -import static java.util.stream.Collectors.toList; - -// #events-by-tag-publisher -class MyEventsByTagJavaPublisher extends AbstractActorPublisher { - private final Serialization serialization = SerializationExtension.get(getContext().getSystem()); - - private final Connection connection; - - private final String tag; - - private final String CONTINUE = "CONTINUE"; - private final int LIMIT = 1000; - private long currentOffset; - private List buf = new LinkedList<>(); - - private Cancellable continueTask; - - public MyEventsByTagJavaPublisher( - Connection connection, String tag, Long offset, Duration refreshInterval) { - this.connection = connection; - this.tag = tag; - this.currentOffset = offset; - - final Scheduler scheduler = getContext().getSystem().scheduler(); - this.continueTask = - scheduler.schedule( - refreshInterval, - refreshInterval, - getSelf(), - CONTINUE, - getContext().getDispatcher(), - getSelf()); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .matchEquals( - CONTINUE, - (in) -> { - query(); - deliverBuf(); - }) - .match( - Cancel.class, - (in) -> { - getContext().stop(getSelf()); - }) - .build(); - } - - public static Props props(Connection conn, String tag, Long offset, Duration refreshInterval) { - return Props.create( - MyEventsByTagJavaPublisher.class, - () -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); - } - - @Override - public void postStop() { - continueTask.cancel(); - } - - private void query() { - if (buf.isEmpty()) { - final String query = - "SELECT id, persistent_repr " - + "FROM journal WHERE tag = ? AND id > ? " - + "ORDER BY id LIMIT ?"; - - try (PreparedStatement s = connection.prepareStatement(query)) { - s.setString(1, tag); - s.setLong(2, currentOffset); - s.setLong(3, LIMIT); - try (ResultSet rs = s.executeQuery()) { - - final List> res = new ArrayList<>(LIMIT); - while (rs.next()) res.add(Pair.create(rs.getLong(1), rs.getBytes(2))); - - if (!res.isEmpty()) { - currentOffset = res.get(res.size() - 1).first(); - } - - buf = - res.stream() - .map( - in -> { - final Long id = in.first(); - final byte[] bytes = in.second(); - - final PersistentRepr p = - serialization.deserialize(bytes, PersistentRepr.class).get(); - - return new EventEnvelope( - Offset.sequence(id), p.persistenceId(), p.sequenceNr(), p.payload()); - }) - .collect(toList()); - } - } catch (Exception e) { - onErrorThenStop(e); - } - } - } - - private void deliverBuf() { - while (totalDemand() > 0 && !buf.isEmpty()) onNext(buf.remove(0)); - } -} -// #events-by-tag-publisher diff --git a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagSource.java b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagSource.java new file mode 100644 index 0000000000..143a8e2af6 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagSource.java @@ -0,0 +1,136 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.persistence.query; + +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.persistence.PersistentRepr; +import akka.persistence.query.EventEnvelope; +import akka.persistence.query.Offset; +import akka.serialization.Serialization; +import akka.serialization.SerializationExtension; +import akka.stream.*; +import akka.stream.stage.*; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.time.Duration; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +// #events-by-tag-publisher +public class MyEventsByTagSource extends GraphStage> { + public Outlet out = Outlet.create("MyEventByTagSource.out"); + private static final String QUERY = + "SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload " + + "FROM journal WHERE tag = ? AND id > ? " + + "ORDER BY id LIMIT ?"; + + enum Continue { + INSTANCE; + } + + private static final int LIMIT = 1000; + private final Connection connection; + private final String tag; + private final long initialOffset; + private final Duration refreshInterval; + + // assumes a shared connection, could also be a factory for creating connections/pool + public MyEventsByTagSource( + Connection connection, String tag, long initialOffset, Duration refreshInterval) { + this.connection = connection; + this.tag = tag; + this.initialOffset = initialOffset; + this.refreshInterval = refreshInterval; + } + + @Override + public Attributes initialAttributes() { + return Attributes.apply(ActorAttributes.IODispatcher()); + } + + @Override + public SourceShape shape() { + return SourceShape.of(out); + } + + @Override + public GraphStageLogic createLogic(Attributes inheritedAttributes) { + return new TimerGraphStageLogic(shape()) { + private ActorSystem system = ((ActorMaterializer) materializer()).system(); + private long currentOffset = initialOffset; + private List buf = new LinkedList<>(); + private final Serialization serialization = SerializationExtension.get(system); + + @Override + public void preStart() { + schedulePeriodically(Continue.INSTANCE, refreshInterval); + } + + @Override + public void onTimer(Object timerKey) { + query(); + deliver(); + } + + private void deliver() { + if (isAvailable(out) && !buf.isEmpty()) { + push(out, buf.remove(0)); + } + } + + private void query() { + if (buf.isEmpty()) { + + try (PreparedStatement s = connection.prepareStatement(QUERY)) { + s.setString(1, tag); + s.setLong(2, currentOffset); + s.setLong(3, LIMIT); + try (ResultSet rs = s.executeQuery()) { + final List res = new ArrayList<>(LIMIT); + while (rs.next()) { + Object deserialized = + serialization + .deserialize( + rs.getBytes("payload"), + rs.getInt("serializer_id"), + rs.getString("serializer_manifest")) + .get(); + currentOffset = rs.getLong("id"); + res.add( + new EventEnvelope( + Offset.sequence(currentOffset), + rs.getString("persistence_id"), + rs.getLong("seq_nr"), + deserialized)); + } + buf = res; + } + } catch (Exception e) { + failStage(e); + } + } + } + + { + setHandler( + out, + new AbstractOutHandler() { + @Override + public void onPull() { + query(); + deliver(); + } + }); + } + }; + } +} +// #events-by-tag-publisher diff --git a/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java b/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java deleted file mode 100644 index ba9366850c..0000000000 --- a/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package jdocs.stream; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.stream.ActorMaterializer; -import akka.stream.Materializer; -import akka.stream.actor.AbstractActorPublisher; -import akka.stream.actor.ActorPublisherMessage; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -public class ActorPublisherDocTest extends AbstractJavaTest { - - static ActorSystem system; - static Materializer mat; - - @BeforeClass - public static void setup() { - system = ActorSystem.create("ActorPublisherDocTest"); - mat = ActorMaterializer.create(system); - } - - @AfterClass - public static void tearDown() { - TestKit.shutdownActorSystem(system); - system = null; - mat = null; - } - - // #job-manager - public static class JobManagerProtocol { - public static final class Job { - public final String payload; - - public Job(String payload) { - this.payload = payload; - } - } - - public static class JobAcceptedMessage { - @Override - public String toString() { - return "JobAccepted"; - } - } - - public static final JobAcceptedMessage JobAccepted = new JobAcceptedMessage(); - - public static class JobDeniedMessage { - @Override - public String toString() { - return "JobDenied"; - } - } - - public static final JobDeniedMessage JobDenied = new JobDeniedMessage(); - } - - public static class JobManager extends AbstractActorPublisher { - - public static Props props() { - return Props.create(JobManager.class); - } - - private final int MAX_BUFFER_SIZE = 100; - private final List buf = new ArrayList<>(); - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - JobManagerProtocol.Job.class, - job -> buf.size() == MAX_BUFFER_SIZE, - job -> { - getSender().tell(JobManagerProtocol.JobDenied, getSelf()); - }) - .match( - JobManagerProtocol.Job.class, - job -> { - getSender().tell(JobManagerProtocol.JobAccepted, getSelf()); - - if (buf.isEmpty() && totalDemand() > 0) onNext(job); - else { - buf.add(job); - deliverBuf(); - } - }) - .match(ActorPublisherMessage.Request.class, request -> deliverBuf()) - .match(ActorPublisherMessage.Cancel.class, cancel -> getContext().stop(getSelf())) - .build(); - } - - void deliverBuf() { - while (totalDemand() > 0) { - /* - * totalDemand is a Long and could be larger than - * what buf.splitAt can accept - */ - if (totalDemand() <= Integer.MAX_VALUE) { - final List took = - buf.subList(0, Math.min(buf.size(), (int) totalDemand())); - took.forEach(this::onNext); - buf.removeAll(took); - break; - } else { - final List took = - buf.subList(0, Math.min(buf.size(), Integer.MAX_VALUE)); - took.forEach(this::onNext); - buf.removeAll(took); - } - } - } - } - // #job-manager - - @Test - public void demonstrateActorPublisherUsage() { - new TestKit(system) { - private final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor()); - - { - // #actor-publisher-usage - final Source jobManagerSource = - Source.actorPublisher(JobManager.props()); - - final ActorRef ref = - jobManagerSource - .map(job -> job.payload.toUpperCase()) - .map( - elem -> { - System.out.println(elem); - return elem; - }) - .to(Sink.ignore()) - .run(mat); - - ref.tell(new JobManagerProtocol.Job("a"), ActorRef.noSender()); - ref.tell(new JobManagerProtocol.Job("b"), ActorRef.noSender()); - ref.tell(new JobManagerProtocol.Job("c"), ActorRef.noSender()); - // #actor-publisher-usage - - expectMsgEquals("A"); - expectMsgEquals("B"); - expectMsgEquals("C"); - } - }; - } -} diff --git a/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java b/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java deleted file mode 100644 index d75c95e803..0000000000 --- a/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package jdocs.stream; - -import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.routing.ActorRefRoutee; -import akka.routing.RoundRobinRoutingLogic; -import akka.routing.Routee; -import akka.routing.Router; -import akka.stream.ActorMaterializer; -import akka.stream.Materializer; -import akka.stream.actor.AbstractActorSubscriber; -import akka.stream.actor.ActorSubscriberMessage; -import akka.stream.actor.MaxInFlightRequestStrategy; -import akka.stream.actor.RequestStrategy; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.*; -import java.time.Duration; - -import static org.junit.Assert.assertEquals; - -public class ActorSubscriberDocTest extends AbstractJavaTest { - - static ActorSystem system; - static Materializer mat; - - @BeforeClass - public static void setup() { - system = ActorSystem.create("ActorSubscriberDocTest"); - mat = ActorMaterializer.create(system); - } - - @AfterClass - public static void tearDown() { - TestKit.shutdownActorSystem(system); - system = null; - mat = null; - } - - // #worker-pool - public static class WorkerPoolProtocol { - - public static class Msg { - public final int id; - public final ActorRef replyTo; - - public Msg(int id, ActorRef replyTo) { - this.id = id; - this.replyTo = replyTo; - } - - @Override - public String toString() { - return String.format("Msg(%s, %s)", id, replyTo); - } - } - - public static Msg msg(int id, ActorRef replyTo) { - return new Msg(id, replyTo); - } - - public static class Work { - public final int id; - - public Work(int id) { - this.id = id; - } - - @Override - public String toString() { - return String.format("Work(%s)", id); - } - } - - public static Work work(int id) { - return new Work(id); - } - - public static class Reply { - public final int id; - - public Reply(int id) { - this.id = id; - } - - @Override - public String toString() { - return String.format("Reply(%s)", id); - } - } - - public static Reply reply(int id) { - return new Reply(id); - } - - public static class Done { - public final int id; - - public Done(int id) { - this.id = id; - } - - @Override - public String toString() { - return String.format("Done(%s)", id); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Done done = (Done) o; - - if (id != done.id) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return id; - } - } - - public static Done done(int id) { - return new Done(id); - } - } - - public static class WorkerPool extends AbstractActorSubscriber { - - public static Props props() { - return Props.create(WorkerPool.class); - } - - final int MAX_QUEUE_SIZE = 10; - final Map queue = new HashMap<>(); - - final Router router; - - @Override - public RequestStrategy requestStrategy() { - return new MaxInFlightRequestStrategy(MAX_QUEUE_SIZE) { - @Override - public int inFlightInternally() { - return queue.size(); - } - }; - } - - public WorkerPool() { - final List routees = new ArrayList<>(); - for (int i = 0; i < 3; i++) - routees.add(new ActorRefRoutee(getContext().actorOf(Props.create(Worker.class)))); - router = new Router(new RoundRobinRoutingLogic(), routees); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - ActorSubscriberMessage.OnNext.class, - on -> on.element() instanceof WorkerPoolProtocol.Msg, - onNext -> { - WorkerPoolProtocol.Msg msg = (WorkerPoolProtocol.Msg) onNext.element(); - queue.put(msg.id, msg.replyTo); - - if (queue.size() > MAX_QUEUE_SIZE) - throw new RuntimeException("queued too many: " + queue.size()); - - router.route(WorkerPoolProtocol.work(msg.id), getSelf()); - }) - .match( - ActorSubscriberMessage.onCompleteInstance().getClass(), - complete -> { - if (queue.isEmpty()) { - getContext().stop(getSelf()); - } - }) - .match( - WorkerPoolProtocol.Reply.class, - reply -> { - int id = reply.id; - queue.get(id).tell(WorkerPoolProtocol.done(id), getSelf()); - queue.remove(id); - if (canceled() && queue.isEmpty()) { - getContext().stop(getSelf()); - } - }) - .build(); - } - } - - static class Worker extends AbstractActor { - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - WorkerPoolProtocol.Work.class, - work -> { - // ... - getSender().tell(WorkerPoolProtocol.reply(work.id), getSelf()); - }) - .build(); - } - } - // #worker-pool - - @Test - public void demonstrateActorPublisherUsage() { - new TestKit(system) { - - { - final ActorRef replyTo = getTestActor(); - - // #actor-subscriber-usage - final int N = 117; - final List data = new ArrayList<>(N); - for (int i = 0; i < N; i++) { - data.add(i); - } - - final ActorRef worker = - Source.from(data) - .map(i -> WorkerPoolProtocol.msg(i, replyTo)) - .runWith(Sink.actorSubscriber(WorkerPool.props()), mat); - // #actor-subscriber-usage - - watch(worker); - - List got = new ArrayList<>(receiveN(N)); - Collections.sort( - got, - new Comparator() { - @Override - public int compare(Object o1, Object o2) { - if (o1 instanceof WorkerPoolProtocol.Done - && o2 instanceof WorkerPoolProtocol.Done) { - return ((WorkerPoolProtocol.Done) o1).id - ((WorkerPoolProtocol.Done) o2).id; - } else return 0; - } - }); - int i = 0; - for (; i < N; i++) { - assertEquals( - String.format("Expected %d, but got %s", i, got.get(i)), - WorkerPoolProtocol.done(i), - got.get(i)); - } - assertEquals(String.format("Expected 117 messages but got %d", i), i, 117); - expectTerminated(Duration.ofSeconds(10), worker); - } - }; - } -} diff --git a/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagPublisher.scala deleted file mode 100644 index 9df0f04658..0000000000 --- a/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagPublisher.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package docs.persistence.query - -import akka.actor.Props -import akka.persistence.PersistentRepr -import akka.persistence.query.{ EventEnvelope, Sequence } -import akka.serialization.SerializationExtension -import akka.stream.actor.ActorPublisher -import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } - -import scala.concurrent.duration.FiniteDuration - -object MyEventsByTagPublisher { - def props(tag: String, offset: Long, refreshInterval: FiniteDuration): Props = - Props(new MyEventsByTagPublisher(tag, offset, refreshInterval)) -} - -//#events-by-tag-publisher -class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) - extends ActorPublisher[EventEnvelope] { - - private case object Continue - - private val connection: java.sql.Connection = ??? - - private val Limit = 1000 - private var currentOffset = offset - var buf = Vector.empty[EventEnvelope] - - import context.dispatcher - val continueTask = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue) - - override def postStop(): Unit = { - continueTask.cancel() - } - - def receive = { - case _: Request | Continue => - query() - deliverBuf() - - case Cancel => - context.stop(self) - } - - object Select { - private def statement() = connection.prepareStatement(""" - SELECT id, persistent_repr FROM journal - WHERE tag = ? AND id > ? - ORDER BY id LIMIT ? - """) - - def run(tag: String, from: Long, limit: Int): Vector[(Long, Array[Byte])] = { - val s = statement() - try { - s.setString(1, tag) - s.setLong(2, from) - s.setLong(3, limit) - val rs = s.executeQuery() - - val b = Vector.newBuilder[(Long, Array[Byte])] - while (rs.next()) b += (rs.getLong(1) -> rs.getBytes(2)) - b.result() - } finally s.close() - } - } - - def query(): Unit = - if (buf.isEmpty) { - try { - val result = Select.run(tag, currentOffset, Limit) - currentOffset = if (result.nonEmpty) result.last._1 else currentOffset - val serialization = SerializationExtension(context.system) - - buf = result.map { - case (id, bytes) => - val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get - EventEnvelope(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload) - } - } catch { - case e: Exception => - onErrorThenStop(e) - } - } - - final def deliverBuf(): Unit = - if (totalDemand > 0 && buf.nonEmpty) { - if (totalDemand <= Int.MaxValue) { - val (use, keep) = buf.splitAt(totalDemand.toInt) - buf = keep - use.foreach(onNext) - } else { - buf.foreach(onNext) - buf = Vector.empty - } - } -} -//#events-by-tag-publisher diff --git a/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagSource.scala b/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagSource.scala new file mode 100644 index 0000000000..7eb4deb8b9 --- /dev/null +++ b/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagSource.scala @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2015-2019 Lightbend Inc. + */ + +package docs.persistence.query + +import akka.persistence.query.{ EventEnvelope, Offset } +import akka.serialization.SerializationExtension +import akka.stream.{ ActorAttributes, ActorMaterializer, Attributes, Outlet, SourceShape } +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic } + +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +//#events-by-tag-publisher +class MyEventsByTagSource(tag: String, offset: Long, refreshInterval: FiniteDuration) + extends GraphStage[SourceShape[EventEnvelope]] { + + private case object Continue + val out: Outlet[EventEnvelope] = Outlet("MyEventByTagSource.out") + override def shape: SourceShape[EventEnvelope] = SourceShape(out) + + override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with OutHandler { + lazy val system = materializer match { + case a: ActorMaterializer => a.system + case _ => + throw new IllegalStateException("EventsByTagStage requires ActorMaterializer") + } + private val Limit = 1000 + private val connection: java.sql.Connection = ??? + private var currentOffset = offset + private var buf = Vector.empty[EventEnvelope] + private val serialization = SerializationExtension(system) + + override def preStart(): Unit = { + schedulePeriodically(Continue, refreshInterval) + } + + override def onPull(): Unit = { + query() + tryPush() + } + + override def onDownstreamFinish(): Unit = { + // close connection if responsible for doing so + } + + private def query(): Unit = { + if (buf.isEmpty) { + try { + buf = Select.run(tag, currentOffset, Limit) + } catch { + case NonFatal(e) => + failStage(e) + } + } + } + + private def tryPush(): Unit = { + if (buf.nonEmpty && isAvailable(out)) { + push(out, buf.head) + buf = buf.tail + } + } + + override protected def onTimer(timerKey: Any): Unit = timerKey match { + case Continue => + query() + tryPush() + } + + object Select { + private def statement() = + connection.prepareStatement(""" + SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload + FROM journal WHERE tag = ? AND id > ? + ORDER BY id LIMIT ? + """) + + def run(tag: String, from: Long, limit: Int): Vector[EventEnvelope] = { + val s = statement() + try { + s.setString(1, tag) + s.setLong(2, from) + s.setLong(3, limit) + val rs = s.executeQuery() + + val b = Vector.newBuilder[EventEnvelope] + while (rs.next()) { + val deserialized = serialization + .deserialize(rs.getBytes("payload"), rs.getInt("serializer_id"), rs.getString("serializer_manifest")) + .get + currentOffset = rs.getLong("id") + b += EventEnvelope( + Offset.sequence(currentOffset), + rs.getString("persistence_id"), + rs.getLong("seq_nr"), + deserialized) + } + b.result() + } finally s.close() + } + } + } + +} +//#events-by-tag-publisher diff --git a/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala index 68cb1dd369..accd50bab2 100644 --- a/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -66,11 +66,10 @@ object PersistenceQueryDocSpec { */ override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = offset match { case Sequence(offsetValue) => - val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) - Source.actorPublisher[EventEnvelope](props).mapMaterializedValue(_ => NotUsed) + Source.fromGraph(new MyEventsByTagSource(tag, offsetValue, refreshInterval)) case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive case _ => - throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") + throw new IllegalArgumentException("MyJournal does not support " + offset.getClass.getName + " offsets") } override def eventsByPersistenceId( diff --git a/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala b/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala deleted file mode 100644 index ea123b9ce1..0000000000 --- a/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package docs.stream - -import scala.annotation.tailrec -import akka.actor.Props -import akka.stream.ActorMaterializer -import akka.stream.actor.ActorPublisher -import akka.stream.scaladsl.{ Flow, Sink, Source } -import akka.testkit.AkkaSpec - -object ActorPublisherDocSpec { - - //#job-manager - object JobManager { - def props: Props = Props[JobManager] - - final case class Job(payload: String) - case object JobAccepted - case object JobDenied - } - - class JobManager extends ActorPublisher[JobManager.Job] { - import akka.stream.actor.ActorPublisherMessage._ - import JobManager._ - - val MaxBufferSize = 100 - var buf = Vector.empty[Job] - - def receive = { - case job: Job if buf.size == MaxBufferSize => - sender() ! JobDenied - case job: Job => - sender() ! JobAccepted - if (buf.isEmpty && totalDemand > 0) - onNext(job) - else { - buf :+= job - deliverBuf() - } - case Request(_) => - deliverBuf() - case Cancel => - context.stop(self) - } - - @tailrec final def deliverBuf(): Unit = - if (totalDemand > 0) { - /* - * totalDemand is a Long and could be larger than - * what buf.splitAt can accept - */ - if (totalDemand <= Int.MaxValue) { - val (use, keep) = buf.splitAt(totalDemand.toInt) - buf = keep - use.foreach(onNext) - } else { - val (use, keep) = buf.splitAt(Int.MaxValue) - buf = keep - use.foreach(onNext) - deliverBuf() - } - } - } - //#job-manager -} - -class ActorPublisherDocSpec extends AkkaSpec { - import ActorPublisherDocSpec._ - - implicit val materializer = ActorMaterializer() - - "illustrate usage of ActorPublisher" in { - def println(s: String): Unit = - testActor ! s - - //#actor-publisher-usage - val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props) - val ref = Flow[JobManager.Job] - .map(_.payload.toUpperCase) - .map { elem => - println(elem); elem - } - .to(Sink.ignore) - .runWith(jobManagerSource) - - ref ! JobManager.Job("a") - ref ! JobManager.Job("b") - ref ! JobManager.Job("c") - //#actor-publisher-usage - - expectMsg("A") - expectMsg("B") - expectMsg("C") - } - -} diff --git a/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala b/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala deleted file mode 100644 index c90b8f646c..0000000000 --- a/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package docs.stream - -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.routing.ActorRefRoutee -import akka.routing.RoundRobinRoutingLogic -import akka.routing.Router -import akka.stream.ActorMaterializer -import akka.stream.actor.ActorSubscriber -import akka.stream.actor.ActorSubscriberMessage -import akka.stream.actor.MaxInFlightRequestStrategy -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.testkit.AkkaSpec -import scala.concurrent.duration._ - -object ActorSubscriberDocSpec { - //#worker-pool - object WorkerPool { - case class Msg(id: Int, replyTo: ActorRef) - case class Work(id: Int) - case class Reply(id: Int) - case class Done(id: Int) - - def props: Props = Props(new WorkerPool) - } - - class WorkerPool extends ActorSubscriber { - import WorkerPool._ - import ActorSubscriberMessage._ - - val MaxQueueSize = 10 - var queue = Map.empty[Int, ActorRef] - - val router = { - val routees = Vector.fill(3) { - ActorRefRoutee(context.actorOf(Props[Worker])) - } - Router(RoundRobinRoutingLogic(), routees) - } - - override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) { - override def inFlightInternally: Int = queue.size - } - - def receive = { - case OnNext(Msg(id, replyTo)) => - queue += (id -> replyTo) - assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}") - router.route(Work(id), self) - case Reply(id) => - queue(id) ! Done(id) - queue -= id - if (canceled && queue.isEmpty) { - context.stop(self) - } - case OnComplete => - if (queue.isEmpty) { - context.stop(self) - } - } - } - - class Worker extends Actor { - import WorkerPool._ - def receive = { - case Work(id) => - // ... - sender() ! Reply(id) - } - } - //#worker-pool - -} - -class ActorSubscriberDocSpec extends AkkaSpec { - import ActorSubscriberDocSpec._ - - implicit val materializer = ActorMaterializer() - - "illustrate usage of ActorSubscriber" in { - val replyTo = testActor - - //#actor-subscriber-usage - val N = 117 - val worker = Source(1 to N).map(WorkerPool.Msg(_, replyTo)).runWith(Sink.actorSubscriber(WorkerPool.props)) - //#actor-subscriber-usage - - watch(worker) - receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet) - expectTerminated(worker, 10.seconds) - } - -} diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala deleted file mode 100644 index 04b8f69c98..0000000000 --- a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream.tck - -import akka.actor.Props -import akka.stream.actor.ActorPublisher -import akka.stream.actor.ActorPublisherMessage.Request -import akka.stream.tck.ActorPublisherTest.TestPublisher -import org.reactivestreams.Publisher - -object ActorPublisherTest { - - case object Produce - case object Loop - case object Complete - - class TestPublisher(allElements: Long) extends ActorPublisher[Int] { - - val source: Iterator[Int] = - (if (allElements == Long.MaxValue) 1 to Int.MaxValue else 0 until allElements.toInt).toIterator - - override def receive: Receive = { - case Request(elements) => - loopDemand() - - case Produce if totalDemand > 0 && !isCompleted && source.hasNext => onNext(source.next()) - case Produce if !isCompleted && !source.hasNext => onComplete() - case Produce if isCompleted => // no-op - case _ => // no-op - } - - def loopDemand(): Unit = { - val loopUntil = math.min(100, totalDemand) - (1 to loopUntil.toInt).foreach { _ => - self ! Produce - } - if (loopUntil > 100) self ! Loop - } - } - -} - -class ActorPublisherTest extends AkkaPublisherVerification[Int] { - - override def createPublisher(elements: Long): Publisher[Int] = { - val ref = system.actorOf(Props(classOf[TestPublisher], elements).withDispatcher("akka.test.stream-dispatcher")) - - ActorPublisher(ref) - } -} diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala deleted file mode 100644 index d575c59987..0000000000 --- a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream.tck - -import akka.actor.Props -import akka.stream.actor.ActorSubscriber -import akka.stream.actor.OneByOneRequestStrategy -import akka.stream.actor.RequestStrategy -import org.reactivestreams.Subscriber - -object ActorSubscriberOneByOneRequestTest { - class StrategySubscriber(val requestStrategy: RequestStrategy) extends ActorSubscriber { - - override def receive: Receive = { case _ => } - } -} - -class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerification[Int] { - import ActorSubscriberOneByOneRequestTest._ - - override def createSubscriber(): Subscriber[Int] = { - val props = Props(classOf[StrategySubscriber], OneByOneRequestStrategy) - ActorSubscriber(system.actorOf(props.withDispatcher("akka.test.stream-dispatcher"))) - } - - override def createElement(element: Int): Int = element -} diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java deleted file mode 100644 index 76edc7baad..0000000000 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.stream.actor; - -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.stream.StreamTest; -import akka.testkit.AkkaJUnitActorSystemResource; -import akka.stream.javadsl.Source; -import akka.testkit.AkkaSpec; -import akka.testkit.javadsl.TestKit; -import org.junit.ClassRule; -import org.junit.Test; -import org.reactivestreams.Publisher; - -import static akka.stream.actor.ActorPublisherMessage.Request; - -public class ActorPublisherTest extends StreamTest { - public ActorPublisherTest() { - super(actorSystemResource); - } - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf()); - - public static class TestPublisher extends UntypedActorPublisher { - - @Override - public void onReceive(Object msg) { - if (msg instanceof Request) { - onNext(1); - onComplete(); - } else if (msg == ActorPublisherMessage.cancelInstance()) { - getContext().stop(getSelf()); - } else { - unhandled(msg); - } - } - } - - @Test - public void mustHaveJavaAPI() { - final TestKit probe = new TestKit(system); - final ActorRef ref = - system.actorOf( - Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher")); - final Publisher publisher = UntypedActorPublisher.create(ref); - Source.fromPublisher(publisher) - .runForeach( - new akka.japi.function.Procedure() { - private static final long serialVersionUID = 1L; - - @Override - public void apply(Integer elem) throws Exception { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }, - materializer); - probe.expectMsgEquals(1); - } -} diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java deleted file mode 100644 index 5787bb28a1..0000000000 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.stream.actor; - -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.stream.StreamTest; -import akka.testkit.AkkaJUnitActorSystemResource; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.testkit.AkkaSpec; -import akka.testkit.javadsl.TestKit; -import org.junit.ClassRule; -import org.junit.Test; -import org.reactivestreams.Subscriber; - -import java.util.Arrays; - -import static akka.stream.actor.ActorSubscriberMessage.OnError; -import static akka.stream.actor.ActorSubscriberMessage.OnNext; - -public class ActorSubscriberTest extends StreamTest { - public ActorSubscriberTest() { - super(actorSystemResource); - } - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("ActorSubscriberTest", AkkaSpec.testConf()); - - public static class TestSubscriber extends UntypedActorSubscriber { - - final ActorRef probe; - - public TestSubscriber(ActorRef probe) { - this.probe = probe; - } - - @Override - public RequestStrategy requestStrategy() { - return ZeroRequestStrategy.getInstance(); - } - - @Override - public void onReceive(Object msg) { - if (msg.equals("run")) { - request(4); - } else if (msg instanceof OnNext) { - probe.tell(((OnNext) msg).element(), getSelf()); - } else if (msg == ActorSubscriberMessage.onCompleteInstance()) { - probe.tell("done", getSelf()); - getContext().stop(getSelf()); - } else if (msg instanceof OnError) { - probe.tell("err", getSelf()); - getContext().stop(getSelf()); - } else { - unhandled(msg); - } - } - } - - @Test - public void mustHaveJavaAPI() { - final TestKit probe = new TestKit(system); - final ActorRef ref = - system.actorOf( - Props.create(TestSubscriber.class, probe.getRef()) - .withDispatcher("akka.test.stream-dispatcher")); - final Subscriber subscriber = UntypedActorSubscriber.create(ref); - final java.lang.Iterable input = Arrays.asList(1, 2, 3); - - Source.from(input).runWith(Sink.fromSubscriber(subscriber), materializer); - - ref.tell("run", null); - probe.expectMsgEquals(1); - probe.expectMsgEquals(2); - probe.expectMsgEquals(3); - probe.expectMsgEquals("done"); - } -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala deleted file mode 100644 index d3c05f80fa..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ /dev/null @@ -1,508 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream.actor - -import akka.actor.{ ActorRef, PoisonPill, Props } -import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, ClosedShape } -import akka.stream.scaladsl._ -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl.StreamTestKit._ -import akka.stream.impl.ReactiveStreamsCompliance -import akka.testkit.TestEvent.Mute -import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } -import scala.annotation.tailrec -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.actor.Stash - -object ActorPublisherSpec { - - val config = - s""" - my-dispatcher1 = $${akka.test.stream-dispatcher} - my-dispatcher2 = $${akka.test.stream-dispatcher} - """ - - def testPublisherProps(probe: ActorRef, useTestDispatcher: Boolean = true): Props = { - val p = Props(new TestPublisher(probe)) - if (useTestDispatcher) p.withDispatcher("akka.test.stream-dispatcher") - else p - } - - def testPublisherWithStashProps(probe: ActorRef, useTestDispatcher: Boolean = true): Props = { - val p = Props(new TestPublisherWithStash(probe)) - if (useTestDispatcher) p.withDispatcher("akka.test.stream-dispatcher") - else p - } - - case class TotalDemand(elements: Long) - case class Produce(elem: String) - case class Err(reason: String) - case class ErrThenStop(reason: String) - case object Boom - case object Complete - case object CompleteThenStop - case object ThreadName - - class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { - import akka.stream.actor.ActorPublisherMessage._ - - def receive = { - case Request(element) => probe ! TotalDemand(totalDemand) - case Produce(elem) => onNext(elem) - case Err(reason) => onError(new RuntimeException(reason) with NoStackTrace) - case ErrThenStop(reason) => onErrorThenStop(new RuntimeException(reason) with NoStackTrace) - case Complete => onComplete() - case CompleteThenStop => onCompleteThenStop() - case Boom => throw new RuntimeException("boom") with NoStackTrace - case ThreadName => probe ! Thread.currentThread.getName - } - } - - class TestPublisherWithStash(probe: ActorRef) extends TestPublisher(probe) with Stash { - - override def receive = stashing - - def stashing: Receive = { - case "unstash" => - unstashAll() - context.become(super.receive) - case _ => stash() - } - - } - - def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher") - - class Sender extends ActorPublisher[Int] { - import akka.stream.actor.ActorPublisherMessage._ - - var buf = Vector.empty[Int] - - def receive = { - case i: Int => - if (buf.isEmpty && totalDemand > 0) - onNext(i) - else { - buf :+= i - deliverBuf() - } - case Request(_) => - deliverBuf() - case Cancel => - context.stop(self) - } - - @tailrec - final def deliverBuf(): Unit = - if (totalDemand > 0) { - if (totalDemand <= Int.MaxValue) { - val (use, keep) = buf.splitAt(totalDemand.toInt) - buf = keep - use.foreach(onNext) - } else { - val (use, keep) = buf.splitAt(Int.MaxValue) - buf = keep - use.foreach(onNext) - deliverBuf() - } - } - } - - def timeoutingProps(probe: ActorRef, timeout: FiniteDuration): Props = - Props(classOf[TimeoutingPublisher], probe, timeout).withDispatcher("akka.test.stream-dispatcher") - - class TimeoutingPublisher(probe: ActorRef, timeout: FiniteDuration) extends ActorPublisher[Int] { - import akka.stream.actor.ActorPublisherMessage._ - import context.dispatcher - - override def subscriptionTimeout = timeout - - override def receive: Receive = { - case Request(_) => - onNext(1) - case SubscriptionTimeoutExceeded => - probe ! "timed-out" - context.system.scheduler.scheduleOnce(timeout, probe, "cleaned-up") - context.system.scheduler.scheduleOnce(timeout, self, PoisonPill) - } - } - - def receiverProps(probe: ActorRef): Props = - Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher") - - class Receiver(probe: ActorRef) extends ActorSubscriber { - import akka.stream.actor.ActorSubscriberMessage._ - - override val requestStrategy = WatermarkRequestStrategy(10) - - def receive = { - case OnNext(s: String) => - probe ! s - } - } - -} - -class ActorPublisherSpec extends StreamSpec(ActorPublisherSpec.config) with ImplicitSender { - - import akka.stream.actor.ActorPublisherSpec._ - - system.eventStream.publish(Mute(EventFilter[IllegalStateException]())) - - "An ActorPublisher" must { - - "accumulate demand" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val p = ActorPublisher[String](ref) - val s = TestSubscriber.probe[String]() - p.subscribe(s) - s.request(2) - probe.expectMsg(TotalDemand(2)) - s.request(3) - probe.expectMsg(TotalDemand(5)) - s.cancel() - } - - "allow onNext up to requested elements, but not more" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val p = ActorPublisher[String](ref) - val s = TestSubscriber.probe[String]() - p.subscribe(s) - s.request(2) - ref ! Produce("elem-1") - ref ! Produce("elem-2") - ref ! Produce("elem-3") - s.expectNext("elem-1") - s.expectNext("elem-2") - s.expectNoMsg(300.millis) - s.cancel() - } - - "signal error" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - ref ! Err("wrong") - s.expectSubscription() - s.expectError().getMessage should be("wrong") - } - - "not terminate after signalling onError" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscription() - probe.watch(ref) - ref ! Err("wrong") - s.expectError().getMessage should be("wrong") - probe.expectNoMsg(200.millis) - } - - "terminate after signalling onErrorThenStop" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscription() - probe.watch(ref) - ref ! ErrThenStop("wrong") - s.expectError().getMessage should be("wrong") - probe.expectTerminated(ref, 3.seconds) - } - - "signal error before subscribe" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - ref ! Err("early err") - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscriptionAndError().getMessage should be("early err") - } - - "drop onNext elements after cancel" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val p = ActorPublisher[String](ref) - val s = TestSubscriber.probe[String]() - p.subscribe(s) - s.request(2) - ref ! Produce("elem-1") - s.cancel() - ref ! Produce("elem-2") - s.expectNext("elem-1") - s.expectNoMsg(300.millis) - } - - "remember requested after restart" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val p = ActorPublisher[String](ref) - val s = TestSubscriber.probe[String]() - p.subscribe(s) - s.request(3) - probe.expectMsg(TotalDemand(3)) - ref ! Produce("elem-1") - ref ! Boom - ref ! Produce("elem-2") - s.expectNext("elem-1") - s.expectNext("elem-2") - s.request(5) - probe.expectMsg(TotalDemand(6)) - s.cancel() - } - - "signal onComplete" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.probe[String]() - ActorPublisher[String](ref).subscribe(s) - s.request(3) - ref ! Produce("elem-1") - ref ! Complete - s.expectNext("elem-1") - s.expectComplete() - } - - "not terminate after signalling onComplete" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - val sub = s.expectSubscription() - sub.request(3) - probe.expectMsg(TotalDemand(3)) - probe.watch(ref) - ref ! Produce("elem-1") - ref ! Complete - s.expectNext("elem-1") - s.expectComplete() - probe.expectNoMsg(200.millis) - } - - "terminate after signalling onCompleteThenStop" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - val sub = s.expectSubscription() - sub.request(3) - probe.expectMsg(TotalDemand(3)) - probe.watch(ref) - ref ! Produce("elem-1") - ref ! CompleteThenStop - s.expectNext("elem-1") - s.expectComplete() - probe.expectTerminated(ref, 3.seconds) - } - - "signal immediate onComplete" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - ref ! Complete - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscriptionAndComplete() - } - - "only allow one subscriber" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscription() - val s2 = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s2) - s2.expectSubscriptionAndError().getMessage should be( - s"ActorPublisher ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}") - } - - "can not subscribe the same subscriber multiple times" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscription() - ActorPublisher[String](ref).subscribe(s) - s.expectError().getMessage should be(ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes) - } - - "signal onCompete when actor is stopped" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherProps(probe.ref)) - val s = TestSubscriber.manualProbe[String]() - ActorPublisher[String](ref).subscribe(s) - s.expectSubscription() - ref ! PoisonPill - s.expectComplete() - } - - "work together with Flow and ActorSubscriber" in { - implicit val materializer = ActorMaterializer() - assertAllStagesStopped { - val probe = TestProbe() - - val source: Source[Int, ActorRef] = Source.actorPublisher(senderProps) - val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref)) - - val (snd, rcv) = source - .collect { - case n if n % 2 == 0 => "elem-" + n - } - .toMat(sink)(Keep.both) - .run() - - (1 to 3).foreach { snd ! _ } - probe.expectMsg("elem-2") - - (4 to 500).foreach { n => - if (n % 19 == 0) Thread.sleep(50) // simulate bursts - snd ! n - } - - (4 to 500 by 2).foreach { n => - probe.expectMsg("elem-" + n) - } - - watch(snd) - rcv ! PoisonPill - expectTerminated(snd) - } - } - - "work in a GraphDSL" in { - implicit val materializer = ActorMaterializer() - val probe1 = TestProbe() - val probe2 = TestProbe() - - val senderRef1 = system.actorOf(senderProps) - val source1 = Source.fromPublisher(ActorPublisher[Int](senderRef1)) - - val sink1 = Sink.fromSubscriber(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref)))) - val sink2: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe2.ref)) - - val senderRef2 = RunnableGraph - .fromGraph(GraphDSL.create(Source.actorPublisher[Int](senderProps)) { implicit b => source2 => - import GraphDSL.Implicits._ - - val merge = b.add(Merge[Int](2)) - val bcast = b.add(Broadcast[String](2)) - - source1 ~> merge.in(0) - source2.out ~> merge.in(1) - - merge.out.map(_.toString) ~> bcast.in - - bcast.out(0).map(_ + "mark") ~> sink1 - bcast.out(1) ~> sink2 - ClosedShape - }) - .run() - - (0 to 10).foreach { - senderRef1 ! _ - senderRef2 ! _ - } - - (0 to 10).foreach { msg => - probe1.expectMsg(msg.toString + "mark") - probe2.expectMsg(msg.toString) - } - } - - "be able to define a subscription-timeout, after which it should shut down" in { - implicit val materializer = ActorMaterializer() - assertAllStagesStopped { - val timeout = 150.millis - val a = system.actorOf(timeoutingProps(testActor, timeout)) - val pub = ActorPublisher(a) - - // don't subscribe for `timeout` millis, so it will shut itself down - expectMsg("timed-out") - - // now subscribers will already be rejected, while the actor could perform some clean-up - val sub = TestSubscriber.manualProbe() - pub.subscribe(sub) - sub.expectSubscriptionAndError() - - expectMsg("cleaned-up") - // termination is tiggered by user code - watch(a) - expectTerminated(a) - } - } - - "be able to define a subscription-timeout, which is cancelled by the first incoming Subscriber" in { - implicit val materializer = ActorMaterializer() - val timeout = 500.millis - val sub = TestSubscriber.manualProbe[Int]() - - within(2 * timeout) { - val pub = ActorPublisher(system.actorOf(timeoutingProps(testActor, timeout))) - - // subscribe right away, should cancel subscription-timeout - pub.subscribe(sub) - sub.expectSubscription() - - expectNoMsg() - } - } - - "use dispatcher from materializer settings" in { - implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDispatcher("my-dispatcher1")) - val s = TestSubscriber.manualProbe[String]() - val ref = - Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false)).to(Sink.fromSubscriber(s)).run() - ref ! ThreadName - expectMsgType[String] should include("my-dispatcher1") - } - - "use dispatcher from operation attributes" in { - implicit val materializer = ActorMaterializer() - val s = TestSubscriber.manualProbe[String]() - val ref = Source - .actorPublisher(testPublisherProps(testActor, useTestDispatcher = false)) - .withAttributes(ActorAttributes.dispatcher("my-dispatcher1")) - .to(Sink.fromSubscriber(s)) - .run() - ref ! ThreadName - expectMsgType[String] should include("my-dispatcher1") - } - - "use dispatcher from props" in { - implicit val materializer = ActorMaterializer() - val s = TestSubscriber.manualProbe[String]() - val ref = Source - .actorPublisher(testPublisherProps(testActor, useTestDispatcher = false).withDispatcher("my-dispatcher1")) - .withAttributes(ActorAttributes.dispatcher("my-dispatcher2")) - .to(Sink.fromSubscriber(s)) - .run() - ref ! ThreadName - expectMsgType[String] should include("my-dispatcher1") - } - - "handle stash" in { - val probe = TestProbe() - val ref = system.actorOf(testPublisherWithStashProps(probe.ref)) - val p = ActorPublisher[String](ref) - val s = TestSubscriber.probe[String]() - p.subscribe(s) - s.request(2) - s.request(3) - ref ! "unstash" - probe.expectMsg(TotalDemand(5)) - probe.expectMsg(TotalDemand(5)) - s.request(4) - probe.expectMsg(TotalDemand(9)) - s.cancel() - } - - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala deleted file mode 100644 index 74ebb82656..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream.actor - -import akka.actor.{ Actor, ActorRef, Props } -import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router } -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Sink, Source } -import akka.stream.testkit.StreamSpec -import akka.testkit.ImplicitSender -import org.reactivestreams.Subscription - -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace - -object ActorSubscriberSpec { - - def manualSubscriberProps(probe: ActorRef): Props = - Props(new ManualSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher") - - class ManualSubscriber(probe: ActorRef) extends ActorSubscriber { - import ActorSubscriberMessage._ - - override val requestStrategy = ZeroRequestStrategy - - def receive = { - case next @ OnNext(elem) => probe ! next - case OnComplete => probe ! OnComplete - case err @ OnError(cause) => probe ! err - case "ready" => request(elements = 2) - case "boom" => throw new RuntimeException("boom") with NoStackTrace - case "requestAndCancel" => { request(1); cancel() } - case "cancel" => cancel() - } - } - - def immediatelyCancelledSubscriberProps(probe: ActorRef): Props = - Props(new ImmediatelyCancelledSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher") - - class ImmediatelyCancelledSubscriber(probe: ActorRef) extends ManualSubscriber(probe) { - override val requestStrategy = ZeroRequestStrategy - override def preStart() = { - cancel() - super.preStart() - } - } - - def requestStrategySubscriberProps(probe: ActorRef, strat: RequestStrategy): Props = - Props(new RequestStrategySubscriber(probe, strat)).withDispatcher("akka.test.stream-dispatcher") - - class RequestStrategySubscriber(probe: ActorRef, strat: RequestStrategy) extends ActorSubscriber { - import ActorSubscriberMessage._ - - override val requestStrategy = strat - - def receive = { - case next @ OnNext(elem) => probe ! next - case OnComplete => probe ! OnComplete - } - } - - case class Msg(id: Int, replyTo: ActorRef) - case class Work(id: Int) - case class Reply(id: Int) - case class Done(id: Int) - - def streamerProps: Props = - Props(new Streamer).withDispatcher("akka.test.stream-dispatcher") - - class Streamer extends ActorSubscriber { - import ActorSubscriberMessage._ - var queue = Map.empty[Int, ActorRef] - - val router = { - val routees = Vector.fill(3) { - ActorRefRoutee(context.actorOf(Props[Worker].withDispatcher(context.props.dispatcher))) - } - Router(RoundRobinRoutingLogic(), routees) - } - - override val requestStrategy = new MaxInFlightRequestStrategy(max = 10) { - override def inFlightInternally: Int = queue.size - } - - def receive = { - case OnNext(Msg(id, replyTo)) => - queue += (id -> replyTo) - assert(queue.size <= 10, s"queued too many: ${queue.size}") - router.route(Work(id), self) - case Reply(id) => - queue(id) ! Done(id) - queue -= id - } - } - - class Worker extends Actor { - def receive = { - case Work(id) => - // ... - sender() ! Reply(id) - } - } -} - -class ActorSubscriberSpec extends StreamSpec with ImplicitSender { - import ActorSubscriberMessage._ - import ActorSubscriberSpec._ - - implicit val materializer = ActorMaterializer() - - "An ActorSubscriber" must { - - "receive requested elements" in { - val ref = Source(List(1, 2, 3)).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) - expectNoMsg(200.millis) - ref ! "ready" // requesting 2 - expectMsg(OnNext(1)) - expectMsg(OnNext(2)) - expectNoMsg(200.millis) - ref ! "ready" - expectMsg(OnNext(3)) - expectMsg(OnComplete) - } - - "signal error" in { - val e = new RuntimeException("simulated") with NoStackTrace - val ref = Source.fromIterator(() => throw e).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) - ref ! "ready" - expectMsg(OnError(e)) - } - - "remember requested after restart" in { - // creating actor with default supervision, because stream supervisor default strategy is to stop - val ref = system.actorOf(manualSubscriberProps(testActor)) - Source(1 to 7).runWith(Sink.fromSubscriber(ActorSubscriber[Int](ref))) - ref ! "ready" - expectMsg(OnNext(1)) - expectMsg(OnNext(2)) - expectNoMsg(200.millis) // nothing requested - ref ! "boom" - ref ! "ready" - ref ! "ready" - ref ! "boom" - (3 to 6).foreach { n => - expectMsg(OnNext(n)) - } - expectNoMsg(200.millis) - ref ! "ready" - expectMsg(OnNext(7)) - expectMsg(OnComplete) - } - - "not deliver more after cancel" in { - val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) - ref ! "ready" - expectMsg(OnNext(1)) - expectMsg(OnNext(2)) - ref ! "requestAndCancel" - expectNoMsg(200.millis) - } - - "terminate after cancel" in { - val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) - watch(ref) - ref ! "requestAndCancel" - expectTerminated(ref, 200.millis) - } - - "cancel incoming subscription when cancel() was called before it arrived" in { - val ref = system.actorOf(immediatelyCancelledSubscriberProps(testActor)) - val sub = ActorSubscriber(ref) - watch(ref) - expectNoMsg(200.millis) - - sub.onSubscribe(new Subscription { - override def cancel(): Unit = testActor ! "cancel" - override def request(n: Long): Unit = () - }) - expectMsg("cancel") - expectTerminated(ref, 200.millis) - } - - "work with OneByOneRequestStrategy" in { - Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))) - for (n <- 1 to 17) expectMsg(OnNext(n)) - expectMsg(OnComplete) - } - - "work with WatermarkRequestStrategy" in { - Source(1 to 17).runWith( - Sink.actorSubscriber(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10)))) - for (n <- 1 to 17) expectMsg(OnNext(n)) - expectMsg(OnComplete) - } - - "suport custom max in flight request strategy with child workers" in { - val N = 117 - Source(1 to N).map(Msg(_, testActor)).runWith(Sink.actorSubscriber(streamerProps)) - receiveN(N).toSet should be((1 to N).map(Done).toSet) - } - - } - - "Provided RequestStragies" must { - "implement OneByOne correctly" in { - val strat = OneByOneRequestStrategy - strat.requestDemand(0) should be(1) - strat.requestDemand(1) should be(0) - strat.requestDemand(2) should be(0) - } - - "implement Zero correctly" in { - val strat = ZeroRequestStrategy - strat.requestDemand(0) should be(0) - strat.requestDemand(1) should be(0) - strat.requestDemand(2) should be(0) - } - - "implement Watermark correctly" in { - val strat = WatermarkRequestStrategy(highWatermark = 10) - strat.requestDemand(0) should be(10) - strat.requestDemand(9) should be(0) - strat.requestDemand(6) should be(0) - strat.requestDemand(5) should be(0) - strat.requestDemand(4) should be(6) - } - - "implement MaxInFlight with batchSize=1 correctly" in { - var queue = Set.empty[String] - val strat = new MaxInFlightRequestStrategy(max = 10) { - override def batchSize: Int = 1 - def inFlightInternally: Int = queue.size - } - strat.requestDemand(0) should be(10) - strat.requestDemand(9) should be(1) - queue += "a" - strat.requestDemand(0) should be(9) - strat.requestDemand(8) should be(1) - strat.requestDemand(9) should be(0) - queue += "b" - queue += "c" - strat.requestDemand(5) should be(2) - ('d' to 'j').foreach { queue += _.toString } - queue.size should be(10) - strat.requestDemand(0) should be(0) - strat.requestDemand(1) should be(0) - queue += "g" - strat.requestDemand(0) should be(0) - strat.requestDemand(1) should be(0) - } - - "implement MaxInFlight with batchSize=3 correctly" in { - var queue = Set.empty[String] - val strat = new MaxInFlightRequestStrategy(max = 10) { - override def batchSize: Int = 3 - override def inFlightInternally: Int = queue.size - } - strat.requestDemand(0) should be(10) - queue += "a" - strat.requestDemand(9) should be(0) - queue += "b" - strat.requestDemand(8) should be(0) - queue += "c" - strat.requestDemand(7) should be(0) - queue += "d" - strat.requestDemand(6) should be(0) - queue -= "a" // 3 remaining in queue - strat.requestDemand(6) should be(0) - queue -= "b" // 2 remaining in queue - strat.requestDemand(6) should be(0) - queue -= "c" // 1 remaining in queue - strat.requestDemand(6) should be(3) - } - - "implement MaxInFlight with batchSize=max correctly" in { - var queue = Set.empty[String] - val strat = new MaxInFlightRequestStrategy(max = 3) { - override def batchSize: Int = 5 // will be bounded to max - override def inFlightInternally: Int = queue.size - } - strat.requestDemand(0) should be(3) - queue += "a" - strat.requestDemand(2) should be(0) - queue += "b" - strat.requestDemand(1) should be(0) - queue += "c" - strat.requestDemand(0) should be(0) - queue -= "a" - strat.requestDemand(0) should be(0) - queue -= "b" - strat.requestDemand(0) should be(0) - queue -= "c" - strat.requestDemand(0) should be(3) - } - - } - -} diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index b112218bee..4855fd6e51 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -84,3 +84,17 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedInterval") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StopTimed") + +#26187 Remove ActorPublisher/Subscriber +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisher") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorSubscriber$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorPublisher") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithUnboundedStash") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorPublisher$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorSubscriber") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorSubscriber") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisher$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithUnrestrictedStash") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorSubscriber$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithStash") + diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index e0ef3c09f5..8f35d8b15f 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -459,91 +459,3 @@ private[akka] class ActorPublisherState extends Extension { def remove(ref: ActorRef): Unit = state.remove(ref) } - -/** - * Java API - */ -object UntypedActorPublisher { - - /** - * Java API: Create a [[org.reactivestreams.Publisher]] backed by a [[UntypedActorPublisher]] actor. It can be - * attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a - * [[akka.stream.javadsl.Flow]]. - */ - def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref) -} - -/** - * Java API - * @see [[akka.stream.actor.ActorPublisher]] - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T] - -/** - * Java API compatible with lambda expressions - */ -object AbstractActorPublisher { - - /** - * Java API compatible with lambda expressions: Create a [[org.reactivestreams.Publisher]] - * backed by a [[AbstractActorPublisher]] actor. It can be attached to a [[org.reactivestreams.Subscriber]] - * or be used as an input source for a [[akka.stream.javadsl.Flow]]. - */ - def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref) -} - -/** - * Java API compatible with lambda expressions - * @see [[akka.stream.actor.ActorPublisher]] - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T] - -/** - * Java API compatible with lambda expressions. - * This class adds a Stash to {@link AbstractActorPublisher}. - * @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithStash]] - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class AbstractActorPublisherWithStash[T] extends AbstractActor with ActorPublisher[T] with Stash - -/** - * Java API compatible with lambda expressions. - * This class adds an unbounded Stash to {@link AbstractActorPublisher}. - * @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnboundedStash]] - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class AbstractActorPublisherWithUnboundedStash[T] - extends AbstractActor - with ActorPublisher[T] - with UnboundedStash - -/** - * Java API compatible with lambda expressions. - * This class adds an unrestricted Stash to {@link AbstractActorPublisher}. - * @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnrestrictedStash]] - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class AbstractActorPublisherWithUnrestrictedStash[T] - extends AbstractActor - with ActorPublisher[T] - with UnrestrictedStash diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index 64449af296..a18d421245 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -341,55 +341,3 @@ private[akka] class ActorSubscriberState extends Extension { def remove(ref: ActorRef): Unit = state.remove(ref) } - -/** - * Java API - */ -object UntypedActorSubscriber { - - /** - * Java API: Attach a [[UntypedActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] - * to a [[org.reactivestreams.Publisher]] or [[akka.stream.javadsl.Flow]]. - */ - def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref) -} - -/** - * Java API - * @see [[akka.stream.actor.ActorSubscriber]] - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber - -/** - * Java API compatible with lambda expressions - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -object AbstractActorSubscriber { - - /** - * Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor - * as a [[org.reactivestreams.Subscriber]] o a [[org.reactivestreams.Publisher]] or - * [[akka.stream.javadsl.Flow]]. - */ - def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref) -} - -/** - * Java API compatible with lambda expressions - * @see [[akka.stream.actor.ActorSubscriber]] - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber