Remove docs for ActorPub/Sub and write persistence query docs as stages (#26844)

* use separate db columns
* Use io dispatcher for sample stage
This commit is contained in:
Christopher Batey 2019-05-03 11:36:50 +01:00 committed by Patrik Nordwall
parent 2bbf13f707
commit c65bf2d276
22 changed files with 272 additions and 2194 deletions

View file

@ -5,7 +5,6 @@
package akka.actor.typed package akka.actor.typed
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import akka.Done import akka.Done
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext

View file

@ -300,13 +300,13 @@ Scala
Java Java
: @@snip [PersistenceQueryDocTest.java](/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java) { #my-read-journal } : @@snip [PersistenceQueryDocTest.java](/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java) { #my-read-journal }
And the `eventsByTag` could be backed by such an Actor for example: And the `eventsByTag` could be backed by a GraphStage for example:
Scala Scala
: @@snip [MyEventsByTagPublisher.scala](/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagPublisher.scala) { #events-by-tag-publisher } : @@snip [MyEventsByTagSource.scala](/akka-docs/src/test/scala/docs/persistence/query/MyEventsByTagSource.scala) { #events-by-tag-publisher }
Java Java
: @@snip [MyEventsByTagJavaPublisher.java](/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java) { #events-by-tag-publisher } : @@snip [MyEventsByTagSource.java](/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagSource.java) { #events-by-tag-publisher }
The `ReadJournalProvider` class must have a constructor with one of these signatures: The `ReadJournalProvider` class must have a constructor with one of these signatures:

View file

@ -557,136 +557,3 @@ Java
Please note that a factory is necessary to achieve reusability of the resulting `Flow`. Please note that a factory is necessary to achieve reusability of the resulting `Flow`.
### Implementing Reactive Streams Publisher or Subscriber
As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and
any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you
implement Reactive Streams integrations with built-in operators or @ref:[custom operators](stream-customize.md).
For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are
provided to support implementing Reactive Streams `Publisher` and `Subscriber` with
an `Actor`.
These can be consumed by other Reactive Stream libraries or used as an Akka Streams `Source` or `Sink`.
@@@ warning
`ActorPublisher` and `ActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. `request`) are lost the
the stream may deadlock.
@@@
#### ActorPublisher
@@@ warning
**Deprecation warning:** `ActorPublisher` is deprecated in favour of the vastly more
type-safe and safe to implement @ref[`GraphStage`](stream-customize.md). It can also
expose a "operator actor ref" is needed to be addressed as-if an Actor.
Custom operators implemented using @ref[`GraphStage`](stream-customize.md) are also automatically fusable.
To learn more about implementing custom operators using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage).
@@@
Extend @scala[`akka.stream.actor.ActorPublisher` in your `Actor` to make it]@java[`akka.stream.actor.AbstractActorPublisher` to implement] a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
Scala
: @@snip [ActorPublisherDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala) { #job-manager }
Java
: @@snip [ActorPublisherDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java) { #job-manager }
You send elements to the stream by calling `onNext`. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
`totalDemand`. It is only allowed to use `onNext` when `isActive` and `totalDemand>0`,
otherwise `onNext` will throw `IllegalStateException`.
When the stream subscriber requests more elements the `ActorPublisherMessage.Request` message
is delivered to this actor, and you can act on that event. The `totalDemand`
is updated automatically.
When the stream subscriber cancels the subscription the `ActorPublisherMessage.Cancel` message
is delivered to this actor. After that subsequent calls to `onNext` will be ignored.
You can complete the stream by calling `onComplete`. After that you are not allowed to
call `onNext`, `onError` and `onComplete`.
You can terminate the stream with failure by calling `onError`. After that you are not allowed to
call `onNext`, `onError` and `onComplete`.
If you suspect that this @scala[`ActorPublisher`]@java[`AbstractActorPublisher`] may never get subscribed to, you can override the `subscriptionTimeout`
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an `ActorPublisherMessage.SubscriptionTimeoutExceeded` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input `Source` to a `Flow`:
Scala
: @@snip [ActorPublisherDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorPublisherDocSpec.scala) { #actor-publisher-usage }
Java
: @@snip [ActorPublisherDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorPublisherDocTest.java) { #actor-publisher-usage }
@scala[A publisher that is created with `Sink.asPublisher` supports a specified number of subscribers. Additional
subscription attempts will be rejected with an `IllegalStateException`.
]@java[You can only attach one subscriber to this publisher. Use a `Broadcast`-element or
attach a `Sink.asPublisher(AsPublisher.WITH_FANOUT)` to enable multiple subscribers.
]
#### ActorSubscriber
@@@ warning
**Deprecation warning:** `ActorSubscriber` is deprecated in favour of the vastly more
type-safe and safe to implement @ref[`GraphStage`](stream-customize.md). It can also
expose a "operator actor ref" is needed to be addressed as-if an Actor.
Custom operators implemented using @ref[`GraphStage`](stream-customize.md) are also automatically fusable.
To learn more about implementing custom operators using it refer to @ref:[Custom processing with GraphStage](stream-customize.md#graphstage).
@@@
Extend @scala[`akka.stream.actor.ActorSubscriber` in your `Actor` to make it]@java[`akka.stream.actor.AbstractActorSubscriber` to make your class] a
stream subscriber with full control of stream back pressure. It will receive
`ActorSubscriberMessage.OnNext`, `ActorSubscriberMessage.OnComplete` and `ActorSubscriberMessage.OnError`
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
Scala
: @@snip [ActorSubscriberDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala) { #worker-pool }
Java
: @@snip [ActorSubscriberDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java) { #worker-pool }
Subclass must define the `RequestStrategy` to control stream back pressure.
After each incoming message the @scala[`ActorSubscriber`]@java[`AbstractActorSubscriber`] will automatically invoke
the `RequestStrategy.requestDemand` and propagate the returned demand to the stream.
* The provided `WatermarkRequestStrategy` is a good strategy if the actor performs work itself.
* The provided `MaxInFlightRequestStrategy` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom `RequestStrategy` or call `request` manually together with
`ZeroRequestStrategy` or some other strategy. In that case
you must also call `request` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output `Sink` to a `Flow`:
Scala
: @@snip [ActorSubscriberDocSpec.scala](/akka-docs/src/test/scala/docs/stream/ActorSubscriberDocSpec.scala) { #actor-subscriber-usage }
Java
: @@snip [ActorSubscriberDocTest.java](/akka-docs/src/test/java/jdocs/stream/ActorSubscriberDocTest.java) { #actor-subscriber-usage }

View file

@ -6,6 +6,7 @@ package jdocs.persistence;
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.ask;
import java.sql.Connection;
import java.time.Duration; import java.time.Duration;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -21,9 +22,8 @@ import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import docs.persistence.query.MyEventsByTagPublisher; import jdocs.persistence.query.MyEventsByTagSource;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -92,12 +92,11 @@ public class PersistenceQueryDocTest {
akka.persistence.query.javadsl.PersistenceIdsQuery, akka.persistence.query.javadsl.PersistenceIdsQuery,
akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
private final FiniteDuration refreshInterval; private final Duration refreshInterval;
private Connection conn;
public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { public MyJavadslReadJournal(ExtendedActorSystem system, Config config) {
refreshInterval = refreshInterval = config.getDuration("refresh-interval");
FiniteDuration.create(
config.getDuration("refresh-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
} }
/** /**
@ -115,10 +114,8 @@ public class PersistenceQueryDocTest {
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) { public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
if (offset instanceof Sequence) { if (offset instanceof Sequence) {
Sequence sequenceOffset = (Sequence) offset; Sequence sequenceOffset = (Sequence) offset;
final Props props = return Source.fromGraph(
MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); new MyEventsByTagSource(conn, tag, sequenceOffset.value(), refreshInterval));
return Source.<EventEnvelope>actorPublisher(props)
.mapMaterializedValue(m -> NotUsed.getInstance());
} else if (offset == NoOffset.getInstance()) } else if (offset == NoOffset.getInstance())
return eventsByTag(tag, Offset.sequence(0L)); // recursive return eventsByTag(tag, Offset.sequence(0L)); // recursive
else else

View file

@ -1,135 +0,0 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.persistence.query;
import akka.actor.Cancellable;
import akka.actor.Scheduler;
import akka.japi.Pair;
import akka.persistence.PersistentRepr;
import akka.persistence.query.Offset;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.stream.actor.AbstractActorPublisher;
import akka.actor.Props;
import akka.persistence.query.EventEnvelope;
import akka.stream.actor.ActorPublisherMessage.Cancel;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.time.Duration;
import static java.util.stream.Collectors.toList;
// #events-by-tag-publisher
class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
private final Serialization serialization = SerializationExtension.get(getContext().getSystem());
private final Connection connection;
private final String tag;
private final String CONTINUE = "CONTINUE";
private final int LIMIT = 1000;
private long currentOffset;
private List<EventEnvelope> buf = new LinkedList<>();
private Cancellable continueTask;
public MyEventsByTagJavaPublisher(
Connection connection, String tag, Long offset, Duration refreshInterval) {
this.connection = connection;
this.tag = tag;
this.currentOffset = offset;
final Scheduler scheduler = getContext().getSystem().scheduler();
this.continueTask =
scheduler.schedule(
refreshInterval,
refreshInterval,
getSelf(),
CONTINUE,
getContext().getDispatcher(),
getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
CONTINUE,
(in) -> {
query();
deliverBuf();
})
.match(
Cancel.class,
(in) -> {
getContext().stop(getSelf());
})
.build();
}
public static Props props(Connection conn, String tag, Long offset, Duration refreshInterval) {
return Props.create(
MyEventsByTagJavaPublisher.class,
() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval));
}
@Override
public void postStop() {
continueTask.cancel();
}
private void query() {
if (buf.isEmpty()) {
final String query =
"SELECT id, persistent_repr "
+ "FROM journal WHERE tag = ? AND id > ? "
+ "ORDER BY id LIMIT ?";
try (PreparedStatement s = connection.prepareStatement(query)) {
s.setString(1, tag);
s.setLong(2, currentOffset);
s.setLong(3, LIMIT);
try (ResultSet rs = s.executeQuery()) {
final List<Pair<Long, byte[]>> res = new ArrayList<>(LIMIT);
while (rs.next()) res.add(Pair.create(rs.getLong(1), rs.getBytes(2)));
if (!res.isEmpty()) {
currentOffset = res.get(res.size() - 1).first();
}
buf =
res.stream()
.map(
in -> {
final Long id = in.first();
final byte[] bytes = in.second();
final PersistentRepr p =
serialization.deserialize(bytes, PersistentRepr.class).get();
return new EventEnvelope(
Offset.sequence(id), p.persistenceId(), p.sequenceNr(), p.payload());
})
.collect(toList());
}
} catch (Exception e) {
onErrorThenStop(e);
}
}
}
private void deliverBuf() {
while (totalDemand() > 0 && !buf.isEmpty()) onNext(buf.remove(0));
}
}
// #events-by-tag-publisher

View file

@ -0,0 +1,136 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.persistence.query;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.persistence.PersistentRepr;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.stream.*;
import akka.stream.stage.*;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static java.util.stream.Collectors.toList;
// #events-by-tag-publisher
public class MyEventsByTagSource extends GraphStage<SourceShape<EventEnvelope>> {
public Outlet<EventEnvelope> out = Outlet.create("MyEventByTagSource.out");
private static final String QUERY =
"SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload "
+ "FROM journal WHERE tag = ? AND id > ? "
+ "ORDER BY id LIMIT ?";
enum Continue {
INSTANCE;
}
private static final int LIMIT = 1000;
private final Connection connection;
private final String tag;
private final long initialOffset;
private final Duration refreshInterval;
// assumes a shared connection, could also be a factory for creating connections/pool
public MyEventsByTagSource(
Connection connection, String tag, long initialOffset, Duration refreshInterval) {
this.connection = connection;
this.tag = tag;
this.initialOffset = initialOffset;
this.refreshInterval = refreshInterval;
}
@Override
public Attributes initialAttributes() {
return Attributes.apply(ActorAttributes.IODispatcher());
}
@Override
public SourceShape<EventEnvelope> shape() {
return SourceShape.of(out);
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new TimerGraphStageLogic(shape()) {
private ActorSystem system = ((ActorMaterializer) materializer()).system();
private long currentOffset = initialOffset;
private List<EventEnvelope> buf = new LinkedList<>();
private final Serialization serialization = SerializationExtension.get(system);
@Override
public void preStart() {
schedulePeriodically(Continue.INSTANCE, refreshInterval);
}
@Override
public void onTimer(Object timerKey) {
query();
deliver();
}
private void deliver() {
if (isAvailable(out) && !buf.isEmpty()) {
push(out, buf.remove(0));
}
}
private void query() {
if (buf.isEmpty()) {
try (PreparedStatement s = connection.prepareStatement(QUERY)) {
s.setString(1, tag);
s.setLong(2, currentOffset);
s.setLong(3, LIMIT);
try (ResultSet rs = s.executeQuery()) {
final List<EventEnvelope> res = new ArrayList<>(LIMIT);
while (rs.next()) {
Object deserialized =
serialization
.deserialize(
rs.getBytes("payload"),
rs.getInt("serializer_id"),
rs.getString("serializer_manifest"))
.get();
currentOffset = rs.getLong("id");
res.add(
new EventEnvelope(
Offset.sequence(currentOffset),
rs.getString("persistence_id"),
rs.getLong("seq_nr"),
deserialized));
}
buf = res;
}
} catch (Exception e) {
failStage(e);
}
}
}
{
setHandler(
out,
new AbstractOutHandler() {
@Override
public void onPull() {
query();
deliver();
}
});
}
};
}
}
// #events-by-tag-publisher

View file

@ -1,161 +0,0 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.actor.AbstractActorPublisher;
import akka.stream.actor.ActorPublisherMessage;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class ActorPublisherDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("ActorPublisherDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
// #job-manager
public static class JobManagerProtocol {
public static final class Job {
public final String payload;
public Job(String payload) {
this.payload = payload;
}
}
public static class JobAcceptedMessage {
@Override
public String toString() {
return "JobAccepted";
}
}
public static final JobAcceptedMessage JobAccepted = new JobAcceptedMessage();
public static class JobDeniedMessage {
@Override
public String toString() {
return "JobDenied";
}
}
public static final JobDeniedMessage JobDenied = new JobDeniedMessage();
}
public static class JobManager extends AbstractActorPublisher<JobManagerProtocol.Job> {
public static Props props() {
return Props.create(JobManager.class);
}
private final int MAX_BUFFER_SIZE = 100;
private final List<JobManagerProtocol.Job> buf = new ArrayList<>();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
JobManagerProtocol.Job.class,
job -> buf.size() == MAX_BUFFER_SIZE,
job -> {
getSender().tell(JobManagerProtocol.JobDenied, getSelf());
})
.match(
JobManagerProtocol.Job.class,
job -> {
getSender().tell(JobManagerProtocol.JobAccepted, getSelf());
if (buf.isEmpty() && totalDemand() > 0) onNext(job);
else {
buf.add(job);
deliverBuf();
}
})
.match(ActorPublisherMessage.Request.class, request -> deliverBuf())
.match(ActorPublisherMessage.Cancel.class, cancel -> getContext().stop(getSelf()))
.build();
}
void deliverBuf() {
while (totalDemand() > 0) {
/*
* totalDemand is a Long and could be larger than
* what buf.splitAt can accept
*/
if (totalDemand() <= Integer.MAX_VALUE) {
final List<JobManagerProtocol.Job> took =
buf.subList(0, Math.min(buf.size(), (int) totalDemand()));
took.forEach(this::onNext);
buf.removeAll(took);
break;
} else {
final List<JobManagerProtocol.Job> took =
buf.subList(0, Math.min(buf.size(), Integer.MAX_VALUE));
took.forEach(this::onNext);
buf.removeAll(took);
}
}
}
}
// #job-manager
@Test
public void demonstrateActorPublisherUsage() {
new TestKit(system) {
private final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());
{
// #actor-publisher-usage
final Source<JobManagerProtocol.Job, ActorRef> jobManagerSource =
Source.actorPublisher(JobManager.props());
final ActorRef ref =
jobManagerSource
.map(job -> job.payload.toUpperCase())
.map(
elem -> {
System.out.println(elem);
return elem;
})
.to(Sink.ignore())
.run(mat);
ref.tell(new JobManagerProtocol.Job("a"), ActorRef.noSender());
ref.tell(new JobManagerProtocol.Job("b"), ActorRef.noSender());
ref.tell(new JobManagerProtocol.Job("c"), ActorRef.noSender());
// #actor-publisher-usage
expectMsgEquals("A");
expectMsgEquals("B");
expectMsgEquals("C");
}
};
}
}

View file

@ -1,274 +0,0 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.actor.AbstractActorSubscriber;
import akka.stream.actor.ActorSubscriberMessage;
import akka.stream.actor.MaxInFlightRequestStrategy;
import akka.stream.actor.RequestStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.*;
import java.time.Duration;
import static org.junit.Assert.assertEquals;
public class ActorSubscriberDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("ActorSubscriberDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
// #worker-pool
public static class WorkerPoolProtocol {
public static class Msg {
public final int id;
public final ActorRef replyTo;
public Msg(int id, ActorRef replyTo) {
this.id = id;
this.replyTo = replyTo;
}
@Override
public String toString() {
return String.format("Msg(%s, %s)", id, replyTo);
}
}
public static Msg msg(int id, ActorRef replyTo) {
return new Msg(id, replyTo);
}
public static class Work {
public final int id;
public Work(int id) {
this.id = id;
}
@Override
public String toString() {
return String.format("Work(%s)", id);
}
}
public static Work work(int id) {
return new Work(id);
}
public static class Reply {
public final int id;
public Reply(int id) {
this.id = id;
}
@Override
public String toString() {
return String.format("Reply(%s)", id);
}
}
public static Reply reply(int id) {
return new Reply(id);
}
public static class Done {
public final int id;
public Done(int id) {
this.id = id;
}
@Override
public String toString() {
return String.format("Done(%s)", id);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Done done = (Done) o;
if (id != done.id) {
return false;
}
return true;
}
@Override
public int hashCode() {
return id;
}
}
public static Done done(int id) {
return new Done(id);
}
}
public static class WorkerPool extends AbstractActorSubscriber {
public static Props props() {
return Props.create(WorkerPool.class);
}
final int MAX_QUEUE_SIZE = 10;
final Map<Integer, ActorRef> queue = new HashMap<>();
final Router router;
@Override
public RequestStrategy requestStrategy() {
return new MaxInFlightRequestStrategy(MAX_QUEUE_SIZE) {
@Override
public int inFlightInternally() {
return queue.size();
}
};
}
public WorkerPool() {
final List<Routee> routees = new ArrayList<>();
for (int i = 0; i < 3; i++)
routees.add(new ActorRefRoutee(getContext().actorOf(Props.create(Worker.class))));
router = new Router(new RoundRobinRoutingLogic(), routees);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
ActorSubscriberMessage.OnNext.class,
on -> on.element() instanceof WorkerPoolProtocol.Msg,
onNext -> {
WorkerPoolProtocol.Msg msg = (WorkerPoolProtocol.Msg) onNext.element();
queue.put(msg.id, msg.replyTo);
if (queue.size() > MAX_QUEUE_SIZE)
throw new RuntimeException("queued too many: " + queue.size());
router.route(WorkerPoolProtocol.work(msg.id), getSelf());
})
.match(
ActorSubscriberMessage.onCompleteInstance().getClass(),
complete -> {
if (queue.isEmpty()) {
getContext().stop(getSelf());
}
})
.match(
WorkerPoolProtocol.Reply.class,
reply -> {
int id = reply.id;
queue.get(id).tell(WorkerPoolProtocol.done(id), getSelf());
queue.remove(id);
if (canceled() && queue.isEmpty()) {
getContext().stop(getSelf());
}
})
.build();
}
}
static class Worker extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
WorkerPoolProtocol.Work.class,
work -> {
// ...
getSender().tell(WorkerPoolProtocol.reply(work.id), getSelf());
})
.build();
}
}
// #worker-pool
@Test
public void demonstrateActorPublisherUsage() {
new TestKit(system) {
{
final ActorRef replyTo = getTestActor();
// #actor-subscriber-usage
final int N = 117;
final List<Integer> data = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
data.add(i);
}
final ActorRef worker =
Source.from(data)
.map(i -> WorkerPoolProtocol.msg(i, replyTo))
.runWith(Sink.<WorkerPoolProtocol.Msg>actorSubscriber(WorkerPool.props()), mat);
// #actor-subscriber-usage
watch(worker);
List<Object> got = new ArrayList<>(receiveN(N));
Collections.sort(
got,
new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
if (o1 instanceof WorkerPoolProtocol.Done
&& o2 instanceof WorkerPoolProtocol.Done) {
return ((WorkerPoolProtocol.Done) o1).id - ((WorkerPoolProtocol.Done) o2).id;
} else return 0;
}
});
int i = 0;
for (; i < N; i++) {
assertEquals(
String.format("Expected %d, but got %s", i, got.get(i)),
WorkerPoolProtocol.done(i),
got.get(i));
}
assertEquals(String.format("Expected 117 messages but got %d", i), i, 117);
expectTerminated(Duration.ofSeconds(10), worker);
}
};
}
}

View file

@ -1,101 +0,0 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.persistence.query
import akka.actor.Props
import akka.persistence.PersistentRepr
import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.serialization.SerializationExtension
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
import scala.concurrent.duration.FiniteDuration
object MyEventsByTagPublisher {
def props(tag: String, offset: Long, refreshInterval: FiniteDuration): Props =
Props(new MyEventsByTagPublisher(tag, offset, refreshInterval))
}
//#events-by-tag-publisher
class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration)
extends ActorPublisher[EventEnvelope] {
private case object Continue
private val connection: java.sql.Connection = ???
private val Limit = 1000
private var currentOffset = offset
var buf = Vector.empty[EventEnvelope]
import context.dispatcher
val continueTask = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)
override def postStop(): Unit = {
continueTask.cancel()
}
def receive = {
case _: Request | Continue =>
query()
deliverBuf()
case Cancel =>
context.stop(self)
}
object Select {
private def statement() = connection.prepareStatement("""
SELECT id, persistent_repr FROM journal
WHERE tag = ? AND id > ?
ORDER BY id LIMIT ?
""")
def run(tag: String, from: Long, limit: Int): Vector[(Long, Array[Byte])] = {
val s = statement()
try {
s.setString(1, tag)
s.setLong(2, from)
s.setLong(3, limit)
val rs = s.executeQuery()
val b = Vector.newBuilder[(Long, Array[Byte])]
while (rs.next()) b += (rs.getLong(1) -> rs.getBytes(2))
b.result()
} finally s.close()
}
}
def query(): Unit =
if (buf.isEmpty) {
try {
val result = Select.run(tag, currentOffset, Limit)
currentOffset = if (result.nonEmpty) result.last._1 else currentOffset
val serialization = SerializationExtension(context.system)
buf = result.map {
case (id, bytes) =>
val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get
EventEnvelope(offset = Sequence(id), p.persistenceId, p.sequenceNr, p.payload)
}
} catch {
case e: Exception =>
onErrorThenStop(e)
}
}
final def deliverBuf(): Unit =
if (totalDemand > 0 && buf.nonEmpty) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use.foreach(onNext)
} else {
buf.foreach(onNext)
buf = Vector.empty
}
}
}
//#events-by-tag-publisher

View file

@ -0,0 +1,110 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.persistence.query
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.serialization.SerializationExtension
import akka.stream.{ ActorAttributes, ActorMaterializer, Attributes, Outlet, SourceShape }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic }
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
//#events-by-tag-publisher
class MyEventsByTagSource(tag: String, offset: Long, refreshInterval: FiniteDuration)
extends GraphStage[SourceShape[EventEnvelope]] {
private case object Continue
val out: Outlet[EventEnvelope] = Outlet("MyEventByTagSource.out")
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler {
lazy val system = materializer match {
case a: ActorMaterializer => a.system
case _ =>
throw new IllegalStateException("EventsByTagStage requires ActorMaterializer")
}
private val Limit = 1000
private val connection: java.sql.Connection = ???
private var currentOffset = offset
private var buf = Vector.empty[EventEnvelope]
private val serialization = SerializationExtension(system)
override def preStart(): Unit = {
schedulePeriodically(Continue, refreshInterval)
}
override def onPull(): Unit = {
query()
tryPush()
}
override def onDownstreamFinish(): Unit = {
// close connection if responsible for doing so
}
private def query(): Unit = {
if (buf.isEmpty) {
try {
buf = Select.run(tag, currentOffset, Limit)
} catch {
case NonFatal(e) =>
failStage(e)
}
}
}
private def tryPush(): Unit = {
if (buf.nonEmpty && isAvailable(out)) {
push(out, buf.head)
buf = buf.tail
}
}
override protected def onTimer(timerKey: Any): Unit = timerKey match {
case Continue =>
query()
tryPush()
}
object Select {
private def statement() =
connection.prepareStatement("""
SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload
FROM journal WHERE tag = ? AND id > ?
ORDER BY id LIMIT ?
""")
def run(tag: String, from: Long, limit: Int): Vector[EventEnvelope] = {
val s = statement()
try {
s.setString(1, tag)
s.setLong(2, from)
s.setLong(3, limit)
val rs = s.executeQuery()
val b = Vector.newBuilder[EventEnvelope]
while (rs.next()) {
val deserialized = serialization
.deserialize(rs.getBytes("payload"), rs.getInt("serializer_id"), rs.getString("serializer_manifest"))
.get
currentOffset = rs.getLong("id")
b += EventEnvelope(
Offset.sequence(currentOffset),
rs.getString("persistence_id"),
rs.getLong("seq_nr"),
deserialized)
}
b.result()
} finally s.close()
}
}
}
}
//#events-by-tag-publisher

View file

@ -66,11 +66,10 @@ object PersistenceQueryDocSpec {
*/ */
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = offset match { override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = offset match {
case Sequence(offsetValue) => case Sequence(offsetValue) =>
val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) Source.fromGraph(new MyEventsByTagSource(tag, offsetValue, refreshInterval))
Source.actorPublisher[EventEnvelope](props).mapMaterializedValue(_ => NotUsed)
case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive
case _ => case _ =>
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") throw new IllegalArgumentException("MyJournal does not support " + offset.getClass.getName + " offsets")
} }
override def eventsByPersistenceId( override def eventsByPersistenceId(

View file

@ -1,99 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream
import scala.annotation.tailrec
import akka.actor.Props
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.testkit.AkkaSpec
object ActorPublisherDocSpec {
//#job-manager
object JobManager {
def props: Props = Props[JobManager]
final case class Job(payload: String)
case object JobAccepted
case object JobDenied
}
class JobManager extends ActorPublisher[JobManager.Job] {
import akka.stream.actor.ActorPublisherMessage._
import JobManager._
val MaxBufferSize = 100
var buf = Vector.empty[Job]
def receive = {
case job: Job if buf.size == MaxBufferSize =>
sender() ! JobDenied
case job: Job =>
sender() ! JobAccepted
if (buf.isEmpty && totalDemand > 0)
onNext(job)
else {
buf :+= job
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
/*
* totalDemand is a Long and could be larger than
* what buf.splitAt can accept
*/
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use.foreach(onNext)
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use.foreach(onNext)
deliverBuf()
}
}
}
//#job-manager
}
class ActorPublisherDocSpec extends AkkaSpec {
import ActorPublisherDocSpec._
implicit val materializer = ActorMaterializer()
"illustrate usage of ActorPublisher" in {
def println(s: String): Unit =
testActor ! s
//#actor-publisher-usage
val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props)
val ref = Flow[JobManager.Job]
.map(_.payload.toUpperCase)
.map { elem =>
println(elem); elem
}
.to(Sink.ignore)
.runWith(jobManagerSource)
ref ! JobManager.Job("a")
ref ! JobManager.Job("b")
ref ! JobManager.Job("c")
//#actor-publisher-usage
expectMsg("A")
expectMsg("B")
expectMsg("C")
}
}

View file

@ -1,99 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.ActorRefRoutee
import akka.routing.RoundRobinRoutingLogic
import akka.routing.Router
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.MaxInFlightRequestStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.testkit.AkkaSpec
import scala.concurrent.duration._
object ActorSubscriberDocSpec {
//#worker-pool
object WorkerPool {
case class Msg(id: Int, replyTo: ActorRef)
case class Work(id: Int)
case class Reply(id: Int)
case class Done(id: Int)
def props: Props = Props(new WorkerPool)
}
class WorkerPool extends ActorSubscriber {
import WorkerPool._
import ActorSubscriberMessage._
val MaxQueueSize = 10
var queue = Map.empty[Int, ActorRef]
val router = {
val routees = Vector.fill(3) {
ActorRefRoutee(context.actorOf(Props[Worker]))
}
Router(RoundRobinRoutingLogic(), routees)
}
override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) {
override def inFlightInternally: Int = queue.size
}
def receive = {
case OnNext(Msg(id, replyTo)) =>
queue += (id -> replyTo)
assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
router.route(Work(id), self)
case Reply(id) =>
queue(id) ! Done(id)
queue -= id
if (canceled && queue.isEmpty) {
context.stop(self)
}
case OnComplete =>
if (queue.isEmpty) {
context.stop(self)
}
}
}
class Worker extends Actor {
import WorkerPool._
def receive = {
case Work(id) =>
// ...
sender() ! Reply(id)
}
}
//#worker-pool
}
class ActorSubscriberDocSpec extends AkkaSpec {
import ActorSubscriberDocSpec._
implicit val materializer = ActorMaterializer()
"illustrate usage of ActorSubscriber" in {
val replyTo = testActor
//#actor-subscriber-usage
val N = 117
val worker = Source(1 to N).map(WorkerPool.Msg(_, replyTo)).runWith(Sink.actorSubscriber(WorkerPool.props))
//#actor-subscriber-usage
watch(worker)
receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet)
expectTerminated(worker, 10.seconds)
}
}

View file

@ -1,52 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.tck
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.tck.ActorPublisherTest.TestPublisher
import org.reactivestreams.Publisher
object ActorPublisherTest {
case object Produce
case object Loop
case object Complete
class TestPublisher(allElements: Long) extends ActorPublisher[Int] {
val source: Iterator[Int] =
(if (allElements == Long.MaxValue) 1 to Int.MaxValue else 0 until allElements.toInt).toIterator
override def receive: Receive = {
case Request(elements) =>
loopDemand()
case Produce if totalDemand > 0 && !isCompleted && source.hasNext => onNext(source.next())
case Produce if !isCompleted && !source.hasNext => onComplete()
case Produce if isCompleted => // no-op
case _ => // no-op
}
def loopDemand(): Unit = {
val loopUntil = math.min(100, totalDemand)
(1 to loopUntil.toInt).foreach { _ =>
self ! Produce
}
if (loopUntil > 100) self ! Loop
}
}
}
class ActorPublisherTest extends AkkaPublisherVerification[Int] {
override def createPublisher(elements: Long): Publisher[Int] = {
val ref = system.actorOf(Props(classOf[TestPublisher], elements).withDispatcher("akka.test.stream-dispatcher"))
ActorPublisher(ref)
}
}

View file

@ -1,29 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.tck
import akka.actor.Props
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.OneByOneRequestStrategy
import akka.stream.actor.RequestStrategy
import org.reactivestreams.Subscriber
object ActorSubscriberOneByOneRequestTest {
class StrategySubscriber(val requestStrategy: RequestStrategy) extends ActorSubscriber {
override def receive: Receive = { case _ => }
}
}
class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerification[Int] {
import ActorSubscriberOneByOneRequestTest._
override def createSubscriber(): Subscriber[Int] = {
val props = Props(classOf[StrategySubscriber], OneByOneRequestStrategy)
ActorSubscriber(system.actorOf(props.withDispatcher("akka.test.stream-dispatcher")))
}
override def createElement(element: Int): Int = element
}

View file

@ -1,64 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.stream.StreamTest;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Source;
import akka.testkit.AkkaSpec;
import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import static akka.stream.actor.ActorPublisherMessage.Request;
public class ActorPublisherTest extends StreamTest {
public ActorPublisherTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf());
public static class TestPublisher extends UntypedActorPublisher<Integer> {
@Override
public void onReceive(Object msg) {
if (msg instanceof Request) {
onNext(1);
onComplete();
} else if (msg == ActorPublisherMessage.cancelInstance()) {
getContext().stop(getSelf());
} else {
unhandled(msg);
}
}
}
@Test
public void mustHaveJavaAPI() {
final TestKit probe = new TestKit(system);
final ActorRef ref =
system.actorOf(
Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher"));
final Publisher<Integer> publisher = UntypedActorPublisher.create(ref);
Source.fromPublisher(publisher)
.runForeach(
new akka.japi.function.Procedure<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(Integer elem) throws Exception {
probe.getRef().tell(elem, ActorRef.noSender());
}
},
materializer);
probe.expectMsgEquals(1);
}
}

View file

@ -1,82 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.stream.StreamTest;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.AkkaSpec;
import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import java.util.Arrays;
import static akka.stream.actor.ActorSubscriberMessage.OnError;
import static akka.stream.actor.ActorSubscriberMessage.OnNext;
public class ActorSubscriberTest extends StreamTest {
public ActorSubscriberTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ActorSubscriberTest", AkkaSpec.testConf());
public static class TestSubscriber extends UntypedActorSubscriber {
final ActorRef probe;
public TestSubscriber(ActorRef probe) {
this.probe = probe;
}
@Override
public RequestStrategy requestStrategy() {
return ZeroRequestStrategy.getInstance();
}
@Override
public void onReceive(Object msg) {
if (msg.equals("run")) {
request(4);
} else if (msg instanceof OnNext) {
probe.tell(((OnNext) msg).element(), getSelf());
} else if (msg == ActorSubscriberMessage.onCompleteInstance()) {
probe.tell("done", getSelf());
getContext().stop(getSelf());
} else if (msg instanceof OnError) {
probe.tell("err", getSelf());
getContext().stop(getSelf());
} else {
unhandled(msg);
}
}
}
@Test
public void mustHaveJavaAPI() {
final TestKit probe = new TestKit(system);
final ActorRef ref =
system.actorOf(
Props.create(TestSubscriber.class, probe.getRef())
.withDispatcher("akka.test.stream-dispatcher"));
final Subscriber<Integer> subscriber = UntypedActorSubscriber.create(ref);
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3);
Source.from(input).runWith(Sink.fromSubscriber(subscriber), materializer);
ref.tell("run", null);
probe.expectMsgEquals(1);
probe.expectMsgEquals(2);
probe.expectMsgEquals(3);
probe.expectMsgEquals("done");
}
}

View file

@ -1,508 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.actor
import akka.actor.{ ActorRef, PoisonPill, Props }
import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, ClosedShape }
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.impl.ReactiveStreamsCompliance
import akka.testkit.TestEvent.Mute
import akka.testkit.{ EventFilter, ImplicitSender, TestProbe }
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.Stash
object ActorPublisherSpec {
val config =
s"""
my-dispatcher1 = $${akka.test.stream-dispatcher}
my-dispatcher2 = $${akka.test.stream-dispatcher}
"""
def testPublisherProps(probe: ActorRef, useTestDispatcher: Boolean = true): Props = {
val p = Props(new TestPublisher(probe))
if (useTestDispatcher) p.withDispatcher("akka.test.stream-dispatcher")
else p
}
def testPublisherWithStashProps(probe: ActorRef, useTestDispatcher: Boolean = true): Props = {
val p = Props(new TestPublisherWithStash(probe))
if (useTestDispatcher) p.withDispatcher("akka.test.stream-dispatcher")
else p
}
case class TotalDemand(elements: Long)
case class Produce(elem: String)
case class Err(reason: String)
case class ErrThenStop(reason: String)
case object Boom
case object Complete
case object CompleteThenStop
case object ThreadName
class TestPublisher(probe: ActorRef) extends ActorPublisher[String] {
import akka.stream.actor.ActorPublisherMessage._
def receive = {
case Request(element) => probe ! TotalDemand(totalDemand)
case Produce(elem) => onNext(elem)
case Err(reason) => onError(new RuntimeException(reason) with NoStackTrace)
case ErrThenStop(reason) => onErrorThenStop(new RuntimeException(reason) with NoStackTrace)
case Complete => onComplete()
case CompleteThenStop => onCompleteThenStop()
case Boom => throw new RuntimeException("boom") with NoStackTrace
case ThreadName => probe ! Thread.currentThread.getName
}
}
class TestPublisherWithStash(probe: ActorRef) extends TestPublisher(probe) with Stash {
override def receive = stashing
def stashing: Receive = {
case "unstash" =>
unstashAll()
context.become(super.receive)
case _ => stash()
}
}
def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher")
class Sender extends ActorPublisher[Int] {
import akka.stream.actor.ActorPublisherMessage._
var buf = Vector.empty[Int]
def receive = {
case i: Int =>
if (buf.isEmpty && totalDemand > 0)
onNext(i)
else {
buf :+= i
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec
final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use.foreach(onNext)
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use.foreach(onNext)
deliverBuf()
}
}
}
def timeoutingProps(probe: ActorRef, timeout: FiniteDuration): Props =
Props(classOf[TimeoutingPublisher], probe, timeout).withDispatcher("akka.test.stream-dispatcher")
class TimeoutingPublisher(probe: ActorRef, timeout: FiniteDuration) extends ActorPublisher[Int] {
import akka.stream.actor.ActorPublisherMessage._
import context.dispatcher
override def subscriptionTimeout = timeout
override def receive: Receive = {
case Request(_) =>
onNext(1)
case SubscriptionTimeoutExceeded =>
probe ! "timed-out"
context.system.scheduler.scheduleOnce(timeout, probe, "cleaned-up")
context.system.scheduler.scheduleOnce(timeout, self, PoisonPill)
}
}
def receiverProps(probe: ActorRef): Props =
Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher")
class Receiver(probe: ActorRef) extends ActorSubscriber {
import akka.stream.actor.ActorSubscriberMessage._
override val requestStrategy = WatermarkRequestStrategy(10)
def receive = {
case OnNext(s: String) =>
probe ! s
}
}
}
class ActorPublisherSpec extends StreamSpec(ActorPublisherSpec.config) with ImplicitSender {
import akka.stream.actor.ActorPublisherSpec._
system.eventStream.publish(Mute(EventFilter[IllegalStateException]()))
"An ActorPublisher" must {
"accumulate demand" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val s = TestSubscriber.probe[String]()
p.subscribe(s)
s.request(2)
probe.expectMsg(TotalDemand(2))
s.request(3)
probe.expectMsg(TotalDemand(5))
s.cancel()
}
"allow onNext up to requested elements, but not more" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val s = TestSubscriber.probe[String]()
p.subscribe(s)
s.request(2)
ref ! Produce("elem-1")
ref ! Produce("elem-2")
ref ! Produce("elem-3")
s.expectNext("elem-1")
s.expectNext("elem-2")
s.expectNoMsg(300.millis)
s.cancel()
}
"signal error" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
ref ! Err("wrong")
s.expectSubscription()
s.expectError().getMessage should be("wrong")
}
"not terminate after signalling onError" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription()
probe.watch(ref)
ref ! Err("wrong")
s.expectError().getMessage should be("wrong")
probe.expectNoMsg(200.millis)
}
"terminate after signalling onErrorThenStop" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription()
probe.watch(ref)
ref ! ErrThenStop("wrong")
s.expectError().getMessage should be("wrong")
probe.expectTerminated(ref, 3.seconds)
}
"signal error before subscribe" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
ref ! Err("early err")
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscriptionAndError().getMessage should be("early err")
}
"drop onNext elements after cancel" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val s = TestSubscriber.probe[String]()
p.subscribe(s)
s.request(2)
ref ! Produce("elem-1")
s.cancel()
ref ! Produce("elem-2")
s.expectNext("elem-1")
s.expectNoMsg(300.millis)
}
"remember requested after restart" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val s = TestSubscriber.probe[String]()
p.subscribe(s)
s.request(3)
probe.expectMsg(TotalDemand(3))
ref ! Produce("elem-1")
ref ! Boom
ref ! Produce("elem-2")
s.expectNext("elem-1")
s.expectNext("elem-2")
s.request(5)
probe.expectMsg(TotalDemand(6))
s.cancel()
}
"signal onComplete" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.probe[String]()
ActorPublisher[String](ref).subscribe(s)
s.request(3)
ref ! Produce("elem-1")
ref ! Complete
s.expectNext("elem-1")
s.expectComplete()
}
"not terminate after signalling onComplete" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
val sub = s.expectSubscription()
sub.request(3)
probe.expectMsg(TotalDemand(3))
probe.watch(ref)
ref ! Produce("elem-1")
ref ! Complete
s.expectNext("elem-1")
s.expectComplete()
probe.expectNoMsg(200.millis)
}
"terminate after signalling onCompleteThenStop" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
val sub = s.expectSubscription()
sub.request(3)
probe.expectMsg(TotalDemand(3))
probe.watch(ref)
ref ! Produce("elem-1")
ref ! CompleteThenStop
s.expectNext("elem-1")
s.expectComplete()
probe.expectTerminated(ref, 3.seconds)
}
"signal immediate onComplete" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
ref ! Complete
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscriptionAndComplete()
}
"only allow one subscriber" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription()
val s2 = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s2)
s2.expectSubscriptionAndError().getMessage should be(
s"ActorPublisher ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")
}
"can not subscribe the same subscriber multiple times" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription()
ActorPublisher[String](ref).subscribe(s)
s.expectError().getMessage should be(ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes)
}
"signal onCompete when actor is stopped" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = TestSubscriber.manualProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription()
ref ! PoisonPill
s.expectComplete()
}
"work together with Flow and ActorSubscriber" in {
implicit val materializer = ActorMaterializer()
assertAllStagesStopped {
val probe = TestProbe()
val source: Source[Int, ActorRef] = Source.actorPublisher(senderProps)
val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref))
val (snd, rcv) = source
.collect {
case n if n % 2 == 0 => "elem-" + n
}
.toMat(sink)(Keep.both)
.run()
(1 to 3).foreach { snd ! _ }
probe.expectMsg("elem-2")
(4 to 500).foreach { n =>
if (n % 19 == 0) Thread.sleep(50) // simulate bursts
snd ! n
}
(4 to 500 by 2).foreach { n =>
probe.expectMsg("elem-" + n)
}
watch(snd)
rcv ! PoisonPill
expectTerminated(snd)
}
}
"work in a GraphDSL" in {
implicit val materializer = ActorMaterializer()
val probe1 = TestProbe()
val probe2 = TestProbe()
val senderRef1 = system.actorOf(senderProps)
val source1 = Source.fromPublisher(ActorPublisher[Int](senderRef1))
val sink1 = Sink.fromSubscriber(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref))))
val sink2: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe2.ref))
val senderRef2 = RunnableGraph
.fromGraph(GraphDSL.create(Source.actorPublisher[Int](senderProps)) { implicit b => source2 =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[String](2))
source1 ~> merge.in(0)
source2.out ~> merge.in(1)
merge.out.map(_.toString) ~> bcast.in
bcast.out(0).map(_ + "mark") ~> sink1
bcast.out(1) ~> sink2
ClosedShape
})
.run()
(0 to 10).foreach {
senderRef1 ! _
senderRef2 ! _
}
(0 to 10).foreach { msg =>
probe1.expectMsg(msg.toString + "mark")
probe2.expectMsg(msg.toString)
}
}
"be able to define a subscription-timeout, after which it should shut down" in {
implicit val materializer = ActorMaterializer()
assertAllStagesStopped {
val timeout = 150.millis
val a = system.actorOf(timeoutingProps(testActor, timeout))
val pub = ActorPublisher(a)
// don't subscribe for `timeout` millis, so it will shut itself down
expectMsg("timed-out")
// now subscribers will already be rejected, while the actor could perform some clean-up
val sub = TestSubscriber.manualProbe()
pub.subscribe(sub)
sub.expectSubscriptionAndError()
expectMsg("cleaned-up")
// termination is tiggered by user code
watch(a)
expectTerminated(a)
}
}
"be able to define a subscription-timeout, which is cancelled by the first incoming Subscriber" in {
implicit val materializer = ActorMaterializer()
val timeout = 500.millis
val sub = TestSubscriber.manualProbe[Int]()
within(2 * timeout) {
val pub = ActorPublisher(system.actorOf(timeoutingProps(testActor, timeout)))
// subscribe right away, should cancel subscription-timeout
pub.subscribe(sub)
sub.expectSubscription()
expectNoMsg()
}
}
"use dispatcher from materializer settings" in {
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDispatcher("my-dispatcher1"))
val s = TestSubscriber.manualProbe[String]()
val ref =
Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false)).to(Sink.fromSubscriber(s)).run()
ref ! ThreadName
expectMsgType[String] should include("my-dispatcher1")
}
"use dispatcher from operation attributes" in {
implicit val materializer = ActorMaterializer()
val s = TestSubscriber.manualProbe[String]()
val ref = Source
.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false))
.withAttributes(ActorAttributes.dispatcher("my-dispatcher1"))
.to(Sink.fromSubscriber(s))
.run()
ref ! ThreadName
expectMsgType[String] should include("my-dispatcher1")
}
"use dispatcher from props" in {
implicit val materializer = ActorMaterializer()
val s = TestSubscriber.manualProbe[String]()
val ref = Source
.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false).withDispatcher("my-dispatcher1"))
.withAttributes(ActorAttributes.dispatcher("my-dispatcher2"))
.to(Sink.fromSubscriber(s))
.run()
ref ! ThreadName
expectMsgType[String] should include("my-dispatcher1")
}
"handle stash" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherWithStashProps(probe.ref))
val p = ActorPublisher[String](ref)
val s = TestSubscriber.probe[String]()
p.subscribe(s)
s.request(2)
s.request(3)
ref ! "unstash"
probe.expectMsg(TotalDemand(5))
probe.expectMsg(TotalDemand(5))
s.request(4)
probe.expectMsg(TotalDemand(9))
s.cancel()
}
}
}

View file

@ -1,300 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.actor
import akka.actor.{ Actor, ActorRef, Props }
import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.StreamSpec
import akka.testkit.ImplicitSender
import org.reactivestreams.Subscription
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
object ActorSubscriberSpec {
def manualSubscriberProps(probe: ActorRef): Props =
Props(new ManualSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher")
class ManualSubscriber(probe: ActorRef) extends ActorSubscriber {
import ActorSubscriberMessage._
override val requestStrategy = ZeroRequestStrategy
def receive = {
case next @ OnNext(elem) => probe ! next
case OnComplete => probe ! OnComplete
case err @ OnError(cause) => probe ! err
case "ready" => request(elements = 2)
case "boom" => throw new RuntimeException("boom") with NoStackTrace
case "requestAndCancel" => { request(1); cancel() }
case "cancel" => cancel()
}
}
def immediatelyCancelledSubscriberProps(probe: ActorRef): Props =
Props(new ImmediatelyCancelledSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher")
class ImmediatelyCancelledSubscriber(probe: ActorRef) extends ManualSubscriber(probe) {
override val requestStrategy = ZeroRequestStrategy
override def preStart() = {
cancel()
super.preStart()
}
}
def requestStrategySubscriberProps(probe: ActorRef, strat: RequestStrategy): Props =
Props(new RequestStrategySubscriber(probe, strat)).withDispatcher("akka.test.stream-dispatcher")
class RequestStrategySubscriber(probe: ActorRef, strat: RequestStrategy) extends ActorSubscriber {
import ActorSubscriberMessage._
override val requestStrategy = strat
def receive = {
case next @ OnNext(elem) => probe ! next
case OnComplete => probe ! OnComplete
}
}
case class Msg(id: Int, replyTo: ActorRef)
case class Work(id: Int)
case class Reply(id: Int)
case class Done(id: Int)
def streamerProps: Props =
Props(new Streamer).withDispatcher("akka.test.stream-dispatcher")
class Streamer extends ActorSubscriber {
import ActorSubscriberMessage._
var queue = Map.empty[Int, ActorRef]
val router = {
val routees = Vector.fill(3) {
ActorRefRoutee(context.actorOf(Props[Worker].withDispatcher(context.props.dispatcher)))
}
Router(RoundRobinRoutingLogic(), routees)
}
override val requestStrategy = new MaxInFlightRequestStrategy(max = 10) {
override def inFlightInternally: Int = queue.size
}
def receive = {
case OnNext(Msg(id, replyTo)) =>
queue += (id -> replyTo)
assert(queue.size <= 10, s"queued too many: ${queue.size}")
router.route(Work(id), self)
case Reply(id) =>
queue(id) ! Done(id)
queue -= id
}
}
class Worker extends Actor {
def receive = {
case Work(id) =>
// ...
sender() ! Reply(id)
}
}
}
class ActorSubscriberSpec extends StreamSpec with ImplicitSender {
import ActorSubscriberMessage._
import ActorSubscriberSpec._
implicit val materializer = ActorMaterializer()
"An ActorSubscriber" must {
"receive requested elements" in {
val ref = Source(List(1, 2, 3)).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
expectNoMsg(200.millis)
ref ! "ready" // requesting 2
expectMsg(OnNext(1))
expectMsg(OnNext(2))
expectNoMsg(200.millis)
ref ! "ready"
expectMsg(OnNext(3))
expectMsg(OnComplete)
}
"signal error" in {
val e = new RuntimeException("simulated") with NoStackTrace
val ref = Source.fromIterator(() => throw e).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
ref ! "ready"
expectMsg(OnError(e))
}
"remember requested after restart" in {
// creating actor with default supervision, because stream supervisor default strategy is to stop
val ref = system.actorOf(manualSubscriberProps(testActor))
Source(1 to 7).runWith(Sink.fromSubscriber(ActorSubscriber[Int](ref)))
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
expectNoMsg(200.millis) // nothing requested
ref ! "boom"
ref ! "ready"
ref ! "ready"
ref ! "boom"
(3 to 6).foreach { n =>
expectMsg(OnNext(n))
}
expectNoMsg(200.millis)
ref ! "ready"
expectMsg(OnNext(7))
expectMsg(OnComplete)
}
"not deliver more after cancel" in {
val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
ref ! "requestAndCancel"
expectNoMsg(200.millis)
}
"terminate after cancel" in {
val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
watch(ref)
ref ! "requestAndCancel"
expectTerminated(ref, 200.millis)
}
"cancel incoming subscription when cancel() was called before it arrived" in {
val ref = system.actorOf(immediatelyCancelledSubscriberProps(testActor))
val sub = ActorSubscriber(ref)
watch(ref)
expectNoMsg(200.millis)
sub.onSubscribe(new Subscription {
override def cancel(): Unit = testActor ! "cancel"
override def request(n: Long): Unit = ()
})
expectMsg("cancel")
expectTerminated(ref, 200.millis)
}
"work with OneByOneRequestStrategy" in {
Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy)))
for (n <- 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"work with WatermarkRequestStrategy" in {
Source(1 to 17).runWith(
Sink.actorSubscriber(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10))))
for (n <- 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"suport custom max in flight request strategy with child workers" in {
val N = 117
Source(1 to N).map(Msg(_, testActor)).runWith(Sink.actorSubscriber(streamerProps))
receiveN(N).toSet should be((1 to N).map(Done).toSet)
}
}
"Provided RequestStragies" must {
"implement OneByOne correctly" in {
val strat = OneByOneRequestStrategy
strat.requestDemand(0) should be(1)
strat.requestDemand(1) should be(0)
strat.requestDemand(2) should be(0)
}
"implement Zero correctly" in {
val strat = ZeroRequestStrategy
strat.requestDemand(0) should be(0)
strat.requestDemand(1) should be(0)
strat.requestDemand(2) should be(0)
}
"implement Watermark correctly" in {
val strat = WatermarkRequestStrategy(highWatermark = 10)
strat.requestDemand(0) should be(10)
strat.requestDemand(9) should be(0)
strat.requestDemand(6) should be(0)
strat.requestDemand(5) should be(0)
strat.requestDemand(4) should be(6)
}
"implement MaxInFlight with batchSize=1 correctly" in {
var queue = Set.empty[String]
val strat = new MaxInFlightRequestStrategy(max = 10) {
override def batchSize: Int = 1
def inFlightInternally: Int = queue.size
}
strat.requestDemand(0) should be(10)
strat.requestDemand(9) should be(1)
queue += "a"
strat.requestDemand(0) should be(9)
strat.requestDemand(8) should be(1)
strat.requestDemand(9) should be(0)
queue += "b"
queue += "c"
strat.requestDemand(5) should be(2)
('d' to 'j').foreach { queue += _.toString }
queue.size should be(10)
strat.requestDemand(0) should be(0)
strat.requestDemand(1) should be(0)
queue += "g"
strat.requestDemand(0) should be(0)
strat.requestDemand(1) should be(0)
}
"implement MaxInFlight with batchSize=3 correctly" in {
var queue = Set.empty[String]
val strat = new MaxInFlightRequestStrategy(max = 10) {
override def batchSize: Int = 3
override def inFlightInternally: Int = queue.size
}
strat.requestDemand(0) should be(10)
queue += "a"
strat.requestDemand(9) should be(0)
queue += "b"
strat.requestDemand(8) should be(0)
queue += "c"
strat.requestDemand(7) should be(0)
queue += "d"
strat.requestDemand(6) should be(0)
queue -= "a" // 3 remaining in queue
strat.requestDemand(6) should be(0)
queue -= "b" // 2 remaining in queue
strat.requestDemand(6) should be(0)
queue -= "c" // 1 remaining in queue
strat.requestDemand(6) should be(3)
}
"implement MaxInFlight with batchSize=max correctly" in {
var queue = Set.empty[String]
val strat = new MaxInFlightRequestStrategy(max = 3) {
override def batchSize: Int = 5 // will be bounded to max
override def inFlightInternally: Int = queue.size
}
strat.requestDemand(0) should be(3)
queue += "a"
strat.requestDemand(2) should be(0)
queue += "b"
strat.requestDemand(1) should be(0)
queue += "c"
strat.requestDemand(0) should be(0)
queue -= "a"
strat.requestDemand(0) should be(0)
queue -= "b"
strat.requestDemand(0) should be(0)
queue -= "c"
strat.requestDemand(0) should be(3)
}
}
}

View file

@ -84,3 +84,17 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedInterval") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedInterval")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StopTimed") ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StopTimed")
#26187 Remove ActorPublisher/Subscriber
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisher")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorSubscriber$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorPublisher")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithUnboundedStash")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorPublisher$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorSubscriber")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.UntypedActorSubscriber")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisher$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithUnrestrictedStash")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorSubscriber$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithStash")

View file

@ -459,91 +459,3 @@ private[akka] class ActorPublisherState extends Extension {
def remove(ref: ActorRef): Unit = state.remove(ref) def remove(ref: ActorRef): Unit = state.remove(ref)
} }
/**
* Java API
*/
object UntypedActorPublisher {
/**
* Java API: Create a [[org.reactivestreams.Publisher]] backed by a [[UntypedActorPublisher]] actor. It can be
* attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a
* [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref)
}
/**
* Java API
* @see [[akka.stream.actor.ActorPublisher]]
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T]
/**
* Java API compatible with lambda expressions
*/
object AbstractActorPublisher {
/**
* Java API compatible with lambda expressions: Create a [[org.reactivestreams.Publisher]]
* backed by a [[AbstractActorPublisher]] actor. It can be attached to a [[org.reactivestreams.Subscriber]]
* or be used as an input source for a [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Publisher[T] = ActorPublisher.apply(ref)
}
/**
* Java API compatible with lambda expressions
* @see [[akka.stream.actor.ActorPublisher]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T]
/**
* Java API compatible with lambda expressions.
* This class adds a Stash to {@link AbstractActorPublisher}.
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithStash]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class AbstractActorPublisherWithStash[T] extends AbstractActor with ActorPublisher[T] with Stash
/**
* Java API compatible with lambda expressions.
* This class adds an unbounded Stash to {@link AbstractActorPublisher}.
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnboundedStash]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class AbstractActorPublisherWithUnboundedStash[T]
extends AbstractActor
with ActorPublisher[T]
with UnboundedStash
/**
* Java API compatible with lambda expressions.
* This class adds an unrestricted Stash to {@link AbstractActorPublisher}.
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnrestrictedStash]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class AbstractActorPublisherWithUnrestrictedStash[T]
extends AbstractActor
with ActorPublisher[T]
with UnrestrictedStash

View file

@ -341,55 +341,3 @@ private[akka] class ActorSubscriberState extends Extension {
def remove(ref: ActorRef): Unit = state.remove(ref) def remove(ref: ActorRef): Unit = state.remove(ref)
} }
/**
* Java API
*/
object UntypedActorSubscriber {
/**
* Java API: Attach a [[UntypedActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]]
* to a [[org.reactivestreams.Publisher]] or [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref)
}
/**
* Java API
* @see [[akka.stream.actor.ActorSubscriber]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber
/**
* Java API compatible with lambda expressions
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
object AbstractActorSubscriber {
/**
* Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor
* as a [[org.reactivestreams.Subscriber]] o a [[org.reactivestreams.Publisher]] or
* [[akka.stream.javadsl.Flow]].
*/
def create[T](ref: ActorRef): Subscriber[T] = ActorSubscriber.apply(ref)
}
/**
* Java API compatible with lambda expressions
* @see [[akka.stream.actor.ActorSubscriber]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber