diff --git a/akka-actor/src/main/java/akka/actor/AbstractScheduler.java b/akka-actor/src/main/java/akka/actor/AbstractScheduler.java index 98af2c2863..f42ed146e8 100644 --- a/akka-actor/src/main/java/akka/actor/AbstractScheduler.java +++ b/akka-actor/src/main/java/akka/actor/AbstractScheduler.java @@ -4,6 +4,7 @@ package akka.actor; +import akka.util.JavaDurationConverters; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -33,7 +34,26 @@ public abstract class AbstractScheduler extends AbstractSchedulerBase { */ @Override public abstract Cancellable schedule(FiniteDuration initialDelay, - FiniteDuration interval, Runnable runnable, ExecutionContext executor); + FiniteDuration interval, + Runnable runnable, + ExecutionContext executor); + + /** + * Schedules a function to be run repeatedly with an initial delay and + * a frequency. E.g. if you would like the function to be run after 2 + * seconds and thereafter every 100ms you would set delay = Duration(2, + * TimeUnit.SECONDS) and interval = Duration.ofMillis(100) + */ + public Cancellable schedule(final java.time.Duration initialDelay, + final java.time.Duration interval, + final Runnable runnable, + final ExecutionContext executor) { + return schedule( + JavaDurationConverters.asFiniteDuration(initialDelay), + JavaDurationConverters.asFiniteDuration(interval), + runnable, + executor); + } /** * Schedules a Runnable to be run once with a delay, i.e. a time period that @@ -41,7 +61,17 @@ public abstract class AbstractScheduler extends AbstractSchedulerBase { */ @Override public abstract Cancellable scheduleOnce(FiniteDuration delay, Runnable runnable, - ExecutionContext executor); + ExecutionContext executor); + + /** + * Schedules a Runnable to be run once with a delay, i.e. a time period that + * has to pass before the runnable is executed. + */ + public Cancellable scheduleOnce(final java.time.Duration delay, + final Runnable runnable, + final ExecutionContext executor) { + return scheduleOnce(JavaDurationConverters.asFiniteDuration(delay), runnable, executor); + } /** * The maximum supported task frequency of this scheduler, i.e. the inverse diff --git a/akka-actor/src/main/mima-filters/2.5.11.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.11.backwards.excludes index 512079b488..628eb061fc 100644 --- a/akka-actor/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.11.backwards.excludes @@ -1,3 +1,7 @@ # Internal API changes ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.util.JavaDurationConverters#JavaDurationOps.asScala") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.util.JavaDurationConverters#JavaDurationOps.asScala$extension") + +# For Scheduler +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.Scheduler.schedule") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.Scheduler.scheduleOnce") diff --git a/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala b/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala index 676e4e168f..0a81b737cc 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala @@ -4,7 +4,7 @@ package akka.actor -import akka.annotation.ApiMayChange +import akka.util.JavaDurationConverters import scala.concurrent.duration.FiniteDuration @@ -31,10 +31,11 @@ object AbstractFSM { * */ abstract class AbstractFSM[S, D] extends FSM[S, D] { - import akka.japi.pf._ - import akka.japi.pf.FI._ import java.util.{ List ⇒ JList } + import FSM._ + import akka.japi.pf.FI._ + import akka.japi.pf._ /** * Returns this AbstractActor's ActorContext @@ -80,7 +81,7 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] { * @param stateFunctionBuilder partial function builder describing response to input */ final def when(stateName: S, stateFunctionBuilder: FSMStateFunctionBuilder[S, D]): Unit = - when(stateName, null, stateFunctionBuilder) + when(stateName, null.asInstanceOf[FiniteDuration], stateFunctionBuilder) /** * Insert a new StateFunction at the end of the processing chain for the @@ -98,6 +99,24 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] { stateFunctionBuilder: FSMStateFunctionBuilder[S, D]): Unit = super.when(stateName, stateTimeout)(stateFunctionBuilder.build()) + /** + * Insert a new StateFunction at the end of the processing chain for the + * given state. If the stateTimeout parameter is set, entering this state + * without a differing explicit timeout setting will trigger a StateTimeout + * event; the same is true when using #stay. + * + * @param stateName designator for the state + * @param stateTimeout default state timeout for this state + * @param stateFunctionBuilder partial function builder describing response to input + */ + final def when( + stateName: S, + stateTimeout: java.time.Duration, + stateFunctionBuilder: FSMStateFunctionBuilder[S, D]): Unit = { + import JavaDurationConverters._ + when(stateName, stateTimeout.asScala, stateFunctionBuilder) + } + /** * Set initial state. Call this method from the constructor before the [[#initialize]] method. * If different state is needed after a restart this method, followed by [[#initialize]], can @@ -121,6 +140,20 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] { final def startWith(stateName: S, stateData: D, timeout: FiniteDuration): Unit = super.startWith(stateName, stateData, Option(timeout)) + /** + * Set initial state. Call this method from the constructor before the [[#initialize]] method. + * If different state is needed after a restart this method, followed by [[#initialize]], can + * be used in the actor life cycle hooks [[akka.actor.Actor#preStart]] and [[akka.actor.Actor#postRestart]]. + * + * @param stateName initial state designator + * @param stateData initial state data + * @param timeout state timeout for the initial state, overriding the default timeout for that state + */ + final def startWith(stateName: S, stateData: D, timeout: java.time.Duration): Unit = { + import JavaDurationConverters._ + startWith(stateName, stateData, timeout.asScala) + } + /** * Add a handler which is called upon each state transition, i.e. not when * staying in the same state. @@ -386,7 +419,34 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] { * @param timeout delay of first message delivery and between subsequent messages */ final def setTimer(name: String, msg: Any, timeout: FiniteDuration): Unit = - setTimer(name, msg, timeout, false) + setTimer(name, msg, timeout, repeat = false) + + /** + * Schedule named timer to deliver message after given delay, possibly repeating. + * Any existing timer with the same name will automatically be canceled before + * adding the new timer. + * @param name identifier to be used with cancelTimer() + * @param msg message to be delivered + * @param timeout delay of first message delivery and between subsequent messages + */ + final def setTimer(name: String, msg: Any, timeout: java.time.Duration): Unit = { + import JavaDurationConverters._ + setTimer(name, msg, timeout.asScala) + } + + /** + * Schedule named timer to deliver message after given delay, possibly repeating. + * Any existing timer with the same name will automatically be canceled before + * adding the new timer. + * @param name identifier to be used with cancelTimer() + * @param msg message to be delivered + * @param timeout delay of first message delivery and between subsequent messages + * @param repeat send once if false, scheduleAtFixedRate if true + */ + final def setTimer(name: String, msg: Any, timeout: java.time.Duration, repeat: Boolean): Unit = { + import JavaDurationConverters._ + setTimer(name, msg, timeout.asScala, repeat) + } /** * Default reason if calling `stop()`. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 584059c609..4501dcbc90 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -17,8 +17,7 @@ import java.util.regex.Pattern import akka.pattern.ask import akka.routing.MurmurHash -import akka.util.Helpers -import akka.util.Timeout +import akka.util.{ Helpers, JavaDurationConverters, Timeout } import akka.dispatch.ExecutionContexts import scala.compat.java8.FutureConverters @@ -97,6 +96,21 @@ abstract class ActorSelection extends Serializable { def resolveOneCS(timeout: FiniteDuration): CompletionStage[ActorRef] = FutureConverters.toJava[ActorRef](resolveOne(timeout)) + /** + * Java API for [[#resolveOne]] + * + * Resolve the [[ActorRef]] matching this selection. + * The result is returned as a CompletionStage that is completed with the [[ActorRef]] + * if such an actor exists. It is completed with failure [[ActorNotFound]] if + * no such actor exists or the identification didn't complete within the + * supplied `timeout`. + * + */ + def resolveOneCS(timeout: java.time.Duration): CompletionStage[ActorRef] = { + import JavaDurationConverters._ + resolveOneCS(timeout.asScala) + } + override def toString: String = { val builder = new java.lang.StringBuilder() builder.append("ActorSelection[Anchor(").append(anchor.path) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 02dd87084b..19b66f97a5 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -4,6 +4,8 @@ package akka.actor +import akka.util.JavaDurationConverters + import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -53,13 +55,31 @@ trait Scheduler { executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = schedule(initialDelay, interval, new Runnable { - def run = { + def run(): Unit = { receiver ! message if (receiver.isTerminated) - throw new SchedulerException("timer active for terminated actor") + throw SchedulerException("timer active for terminated actor") } }) + /** + * Schedules a message to be sent repeatedly with an initial delay and + * frequency. E.g. if you would like a message to be sent immediately and + * thereafter every 500ms you would set delay=Duration.Zero and + * interval=Duration(500, TimeUnit.MILLISECONDS) + * + * Java API + */ + final def schedule( + initialDelay: java.time.Duration, + interval: java.time.Duration, + receiver: ActorRef, + message: Any, + executor: ExecutionContext, + sender: ActorRef): Cancellable = { + import JavaDurationConverters._ + schedule(initialDelay.asScala, interval.asScala, receiver, message)(executor, sender) + } /** * Schedules a function to be run repeatedly with an initial delay and a * frequency. E.g. if you would like the function to be run after 2 seconds @@ -80,7 +100,7 @@ trait Scheduler { interval: FiniteDuration)(f: ⇒ Unit)( implicit executor: ExecutionContext): Cancellable = - schedule(initialDelay, interval, new Runnable { override def run = f }) + schedule(initialDelay, interval, new Runnable { override def run(): Unit = f }) /** * Schedules a `Runnable` to be run repeatedly with an initial delay and @@ -106,6 +126,33 @@ trait Scheduler { interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable + /** + * Schedules a `Runnable` to be run repeatedly with an initial delay and + * a frequency. E.g. if you would like the function to be run after 2 + * seconds and thereafter every 100ms you would set delay = Duration(2, + * TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS). If + * the execution of the runnable takes longer than the interval, the + * subsequent execution will start immediately after the prior one completes + * (there will be no overlap of executions of the runnable). In such cases, + * the actual execution interval will differ from the interval passed to this + * method. + * + * If the `Runnable` throws an exception the repeated scheduling is aborted, + * i.e. the function will not be invoked any more. + * + * @throws IllegalArgumentException if the given delays exceed the maximum + * reach (calculated as: `delay / tickNanos > Int.MaxValue`). + * + * Java API + */ + def schedule( + initialDelay: java.time.Duration, + interval: java.time.Duration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { + import JavaDurationConverters._ + schedule(initialDelay.asScala, interval.asScala, runnable) + } + /** * Schedules a message to be sent once with a delay, i.e. a time period that has * to pass before the message is sent. @@ -122,9 +169,28 @@ trait Scheduler { executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = scheduleOnce(delay, new Runnable { - override def run = receiver ! message + override def run(): Unit = receiver ! message }) + /** + * Schedules a message to be sent once with a delay, i.e. a time period that has + * to pass before the message is sent. + * + * @throws IllegalArgumentException if the given delays exceed the maximum + * reach (calculated as: `delay / tickNanos > Int.MaxValue`). + * + * Java API + */ + final def scheduleOnce( + delay: java.time.Duration, + receiver: ActorRef, + message: Any, + executor: ExecutionContext, + sender: ActorRef): Cancellable = { + import JavaDurationConverters._ + scheduleOnce(delay.asScala, receiver, message)(executor, sender) + } + /** * Schedules a function to be run once with a delay, i.e. a time period that has * to pass before the function is run. @@ -137,7 +203,7 @@ trait Scheduler { final def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)( implicit executor: ExecutionContext): Cancellable = - scheduleOnce(delay, new Runnable { override def run = f }) + scheduleOnce(delay, new Runnable { override def run(): Unit = f }) /** * Schedules a Runnable to be run once with a delay, i.e. a time period that @@ -152,6 +218,21 @@ trait Scheduler { delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable + /** + * Schedules a Runnable to be run once with a delay, i.e. a time period that + * has to pass before the runnable is executed. + * + * @throws IllegalArgumentException if the given delays exceed the maximum + * reach (calculated as: `delay / tickNanos > Int.MaxValue`). + * + * Java & Scala API + */ + def scheduleOnce( + delay: java.time.Duration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { + import JavaDurationConverters._ + scheduleOnce(delay.asScala, runnable)(executor) + } /** * The maximum supported task frequency of this scheduler, i.e. the inverse * of the minimum time interval between executions of a recurring task, in Hz. diff --git a/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala b/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala index d873b1b4c7..3c179ebfcd 100644 --- a/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala +++ b/akka-actor/src/main/scala/akka/util/JavaDurationConverters.scala @@ -3,11 +3,14 @@ */ package akka.util import java.time.{ Duration ⇒ JDuration } + import scala.concurrent.duration.{ Duration, FiniteDuration } /** * INTERNAL API */ private[akka] object JavaDurationConverters { + def asFiniteDuration(duration: JDuration): FiniteDuration = duration.asScala + final implicit class JavaDurationOps(val self: JDuration) extends AnyVal { def asScala: FiniteDuration = Duration.fromNanos(self.toNanos) }