diff --git a/akka-actor/src/main/mima-filters/2.5.20.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.20.backwards.excludes new file mode 100644 index 0000000000..450fe2f326 --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.5.20.backwards.excludes @@ -0,0 +1,7 @@ +# Add `getDispatcher` to `akka.actor.AbstractActor.ActorContext` #26161 +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.getSelf") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.getProps") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.getDispatcher") +# Below is for Scala 2.11.x +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.getSender") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.getReceiveTimeout") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala index b2f6566804..4783d9fd65 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala @@ -12,6 +12,7 @@ import java.util.Optional import akka.util.JavaDurationConverters +import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.Duration /** @@ -50,6 +51,30 @@ object AbstractActor { @DoNotInherit trait ActorContext extends akka.actor.ActorContext { + /** + * The ActorRef representing this actor + * + * This method is thread-safe and can be called from other threads than the ordinary + * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks. + */ + def getSelf(): ActorRef + + /** + * Retrieve the Props which were used to create this actor. + * + * This method is thread-safe and can be called from other threads than the ordinary + * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks. + */ + def getProps(): Props + + /** + * Returns the sender 'ActorRef' of the current message. + * + * *Warning*: This method is not thread-safe and must not be accessed from threads other + * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks. + */ + def getSender(): ActorRef = sender() + /** * Returns an unmodifiable Java Collection containing the linked actors * @@ -93,6 +118,14 @@ object AbstractActor { */ def getSystem(): ActorSystem + /** + * Returns the dispatcher (MessageDispatcher) that is used for this Actor. + * + * This method is thread-safe and can be called from other threads than the ordinary + * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks. + */ + def getDispatcher(): ExecutionContextExecutor + /** * Changes the Actor's behavior to become the new 'Receive' handler. * Replaces the current behavior on the top of the behavior stack. @@ -120,6 +153,18 @@ object AbstractActor { def become(behavior: Receive, discardOld: Boolean): Unit = become(behavior.onMessage.asInstanceOf[PartialFunction[Any, Unit]], discardOld) + /** + * Gets the current receive timeout. + * When specified, the receive method should be able to handle a [[akka.actor.ReceiveTimeout]] message. + * + * *Warning*: This method is not thread-safe and must not be accessed from threads other + * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks. + */ + def getReceiveTimeout(): java.time.Duration = { + import JavaDurationConverters._ + receiveTimeout.asJava + } + /** * Defines the inactivity timeout after which the sending of a [[akka.actor.ReceiveTimeout]] message is triggered. * When specified, the receive function should be able to handle a [[akka.actor.ReceiveTimeout]] message. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index daf395ba7b..673cba7a59 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,11 +13,10 @@ import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.Duration import scala.util.control.NonFatal import akka.actor.dungeon.ChildrenContainer -import akka.dispatch.Envelope +import akka.dispatch.{ Envelope, MessageDispatcher } import akka.dispatch.sysmsg._ import akka.event.Logging.{ Debug, Error, LogEvent } import akka.japi.Procedure -import akka.dispatch.MessageDispatcher import akka.util.{ Reflect, unused } import akka.annotation.InternalApi @@ -461,6 +460,12 @@ private[akka] class ActorCell( final def getParent() = parent // Java API final def getSystem() = system + // Java API + final override def getDispatcher(): ExecutionContextExecutor = dispatcher + // Java API + final override def getSelf(): ActorRef = self + // Java API + final override def getProps(): Props = props protected def stash(msg: SystemMessage): Unit = { assert(msg.unlinked) diff --git a/akka-docs/src/main/paradox/dispatchers.md b/akka-docs/src/main/paradox/dispatchers.md index 392ee5f009..fe2166e18f 100644 --- a/akka-docs/src/main/paradox/dispatchers.md +++ b/akka-docs/src/main/paradox/dispatchers.md @@ -227,12 +227,12 @@ implicit val executionContext: ExecutionContext = context.dispatcher @@@ div { .group-java } ```java -ExecutionContext ec = getContext().dispatcher(); +ExecutionContext ec = getContext().getDispatcher(); ``` @@@ -Using @scala[`context.dispatcher`] @java[`getContext().dispatcher()`] as the dispatcher on which the blocking `Future` +Using @scala[`context.dispatcher`] @java[`getContext().getDispatcher()`] as the dispatcher on which the blocking `Future` executes can be a problem, since this dispatcher is by default used for all other actor processing unless you @ref:[set up a separate dispatcher for the actor](dispatchers.md#setting-the-dispatcher-for-an-actor). diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md index 0788292644..d6bea3bcf8 100644 --- a/akka-docs/src/main/paradox/futures.md +++ b/akka-docs/src/main/paradox/futures.md @@ -45,7 +45,7 @@ dispatcher doubles as an `ExecutionContext`. If the nature of the Future calls invoked by the actor matches or is compatible with the activities of that actor (e.g. all CPU bound and no latency requirements), then it may be easiest to reuse the dispatcher for running the Futures by importing -@scala[`context.dispatcher`]@java[`getContext().dispatcher()`]. +@scala[`context.dispatcher`]@java[`getContext().getDispatcher()`]. Scala : @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #context-dispatcher } diff --git a/akka-docs/src/main/paradox/guide/tutorial_5.md b/akka-docs/src/main/paradox/guide/tutorial_5.md index e08b77f3d7..0bd34b680a 100644 --- a/akka-docs/src/main/paradox/guide/tutorial_5.md +++ b/akka-docs/src/main/paradox/guide/tutorial_5.md @@ -76,7 +76,7 @@ not used yet, the built-in scheduler facility. Using the scheduler is simple: * We get the scheduler from the `ActorSystem`, which, in turn, is accessible from the actor's context: @scala[`context.system.scheduler`]@java[`getContext().getSystem().scheduler()`]. This needs an @scala[implicit] `ExecutionContext` which is the thread-pool that will execute the timer task itself. In our case, we use the same dispatcher -as the actor by @scala[importing `import context.dispatcher`] @java[passing in `getContext().dispatcher()`]. +as the actor by @scala[importing `import context.dispatcher`] @java[passing in `getContext().getDispatcher()`]. * The @scala[`scheduler.scheduleOnce(time, actorRef, message)`] @java[`scheduler.scheduleOnce(time, actorRef, message, executor, sender)`] method will schedule the message `message` into the future by the specified `time` and send it to the actor `actorRef`. diff --git a/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java b/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java index 07630a1812..ddaf5ff83d 100644 --- a/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java +++ b/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java @@ -11,7 +11,7 @@ import scala.concurrent.Future; // #blocking-in-future class BlockingFutureActor extends AbstractActor { - ExecutionContext ec = getContext().dispatcher(); + ExecutionContext ec = getContext().getDispatcher(); @Override public Receive createReceive() { diff --git a/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java b/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java index 79db301c5b..b4c4b2dbf8 100644 --- a/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java +++ b/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java @@ -141,7 +141,7 @@ public class FaultHandlingDocSample { progressListener = getSender(); getContext().getSystem().scheduler().schedule( Duration.ZERO, Duration.ofSeconds(1L), getSelf(), Do, - getContext().dispatcher(), null + getContext().getDispatcher(), null ); }). matchEquals(Do, x -> { @@ -293,7 +293,7 @@ public class FaultHandlingDocSample { // Try to re-establish storage after while getContext().getSystem().scheduler().scheduleOnce( Duration.ofSeconds(10), getSelf(), Reconnect, - getContext().dispatcher(), null); + getContext().getDispatcher(), null); }). matchEquals(Reconnect, o -> { // Re-establish storage after the scheduled delay @@ -346,7 +346,7 @@ public class FaultHandlingDocSample { this.key = key; this.count = initialValue; } - + @Override public Receive createReceive() { return LoggingReceive.create(receiveBuilder(). diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java b/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java index 8ed131723a..a8740d4a81 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java @@ -26,7 +26,7 @@ public class DangerousJavaActor extends AbstractActor { public DangerousJavaActor() { this.breaker = new CircuitBreaker( - getContext().dispatcher(), getContext().system().scheduler(), + getContext().getDispatcher(), getContext().getSystem().getScheduler(), 5, Duration.ofSeconds(10), Duration.ofMinutes(1)) .addOnOpenListener(this::notifyMeOnOpen); } @@ -47,7 +47,7 @@ public class DangerousJavaActor extends AbstractActor { match(String.class, "is my middle name"::equals, m -> pipe( breaker.callWithCircuitBreakerCS(() -> CompletableFuture.supplyAsync(this::dangerousCall) - ), getContext().dispatcher() + ), getContext().getDispatcher() ).to(sender())) .match(String.class, "block for me"::equals, m -> { sender().tell(breaker diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java b/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java index 9187883b18..2c6912777f 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java @@ -17,7 +17,7 @@ public class EvenNoFailureJavaExample extends AbstractActor { public EvenNoFailureJavaExample() { this.breaker = new CircuitBreaker( - getContext().dispatcher(), getContext().system().scheduler(), + getContext().getDispatcher(), getContext().getSystem().getScheduler(), 5, Duration.ofSeconds(10), Duration.ofMinutes(1)); } diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java b/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java index ae05700e84..2f51fb39c0 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java @@ -21,7 +21,7 @@ public class TellPatternJavaActor extends AbstractActor { public TellPatternJavaActor(ActorRef targetActor) { this.target = targetActor; this.breaker = new CircuitBreaker( - getContext().dispatcher(), getContext().system().scheduler(), + getContext().getDispatcher(), getContext().getSystem().getScheduler(), 5, Duration.ofSeconds(10), Duration.ofMinutes(1)) .addOnOpenListener(this::notifyMeOnOpen); } @@ -34,16 +34,16 @@ public class TellPatternJavaActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() - .match(String.class, payload -> "call".equals(payload) && breaker.isClosed(), payload -> + .match(String.class, payload -> "call".equals(payload) && breaker.isClosed(), payload -> target.tell("message", self()) ) - .matchEquals("response", payload -> + .matchEquals("response", payload -> breaker.succeed() ) .match(Throwable.class, t -> breaker.fail() ) - .match(ReceiveTimeout.class, t -> + .match(ReceiveTimeout.class, t -> breaker.fail() ) .build(); diff --git a/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java b/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java index 6f23beb95d..b31f8f3d42 100644 --- a/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java +++ b/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java @@ -43,7 +43,7 @@ public class StatsSampleClient extends AbstractActor { .getSystem() .scheduler() .schedule(interval, interval, getSelf(), "tick", - getContext().dispatcher(), null); + getContext().getDispatcher(), null); } //subscribe to cluster changes, MemberEvent diff --git a/akka-docs/src/test/java/jdocs/ddata/DataBot.java b/akka-docs/src/test/java/jdocs/ddata/DataBot.java index c5f9e31792..9c1d03f34f 100644 --- a/akka-docs/src/test/java/jdocs/ddata/DataBot.java +++ b/akka-docs/src/test/java/jdocs/ddata/DataBot.java @@ -25,21 +25,21 @@ import akka.event.Logging; import akka.event.LoggingAdapter; public class DataBot extends AbstractActor { - + private static final String TICK = "tick"; - + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); - private final ActorRef replicator = + private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator(); private final Cluster node = Cluster.get(getContext().getSystem()); private final Cancellable tickTask = getContext().getSystem().scheduler().schedule( Duration.ofSeconds(5), Duration.ofSeconds(5), getSelf(), TICK, - getContext().dispatcher(), getSelf()); + getContext().getDispatcher(), getSelf()); private final Key> dataKey = ORSetKey.create("key"); - + @SuppressWarnings("unchecked") @Override public Receive createReceive() { @@ -57,18 +57,18 @@ public class DataBot extends AbstractActor { // add log.info("Adding: {}", s); Update> update = new Update<>( - dataKey, - ORSet.create(), - Replicator.writeLocal(), + dataKey, + ORSet.create(), + Replicator.writeLocal(), curr -> curr.add(node, s)); replicator.tell(update, getSelf()); } else { // remove log.info("Removing: {}", s); Update> update = new Update<>( - dataKey, - ORSet.create(), - Replicator.writeLocal(), + dataKey, + ORSet.create(), + Replicator.writeLocal(), curr -> curr.remove(node, s)); replicator.tell(update, getSelf()); } @@ -79,19 +79,19 @@ public class DataBot extends AbstractActor { ORSet data = c.dataValue(); log.info("Current elements: {}", data.getElements()); } - + private void receiveUpdateResponse() { // ignore } - + @Override public void preStart() { Subscribe> subscribe = new Subscribe<>(dataKey, getSelf()); replicator.tell(subscribe, ActorRef.noSender()); } - @Override + @Override public void postStop(){ tickTask.cancel(); } diff --git a/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java b/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java index fbdfadc60d..89f035f909 100644 --- a/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java +++ b/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java @@ -77,7 +77,7 @@ public class SupervisedAsk { targetActor.forward(askParam.message, getContext()); Scheduler scheduler = getContext().getSystem().scheduler(); timeoutMessage = scheduler.scheduleOnce(askParam.timeout, - getSelf(), new AskTimeout(), getContext().dispatcher(), null); + getSelf(), new AskTimeout(), getContext().getDispatcher(), null); }) .match(Terminated.class, message -> { Throwable ex = new ActorKilledException("Target actor terminated."); diff --git a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java index aa49703621..e6fbf128b5 100644 --- a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java @@ -54,9 +54,9 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { final Scheduler scheduler = getContext().getSystem().scheduler(); this.continueTask = scheduler .schedule(refreshInterval, refreshInterval, getSelf(), CONTINUE, - getContext().dispatcher(), getSelf()); + getContext().getDispatcher(), getSelf()); } - + @Override public Receive createReceive() { return receiveBuilder() @@ -70,7 +70,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { .build(); } - public static Props props(Connection conn, String tag, Long offset, + public static Props props(Connection conn, String tag, Long offset, Duration refreshInterval) { return Props.create(MyEventsByTagJavaPublisher.class, () -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); @@ -105,7 +105,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { final Long id = in.first(); final byte[] bytes = in.second(); - final PersistentRepr p = + final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); return new EventEnvelope(Offset.sequence(id), p.persistenceId(), p.sequenceNr(), p.payload()); diff --git a/akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java b/akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java index d0fde256cd..c31d78dbd4 100644 --- a/akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java +++ b/akka-docs/src/test/java/jdocs/tutorial_5/DeviceGroupQuery.java @@ -40,7 +40,7 @@ public class DeviceGroupQuery extends AbstractActor { this.requester = requester; queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( - timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() + timeout, getSelf(), new CollectionTimeout(), getContext().getDispatcher(), getSelf() ); }