diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 68485fc57f..ad3f5e2870 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -130,7 +130,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt Thread.sleep(50.millis.dilated.toMillis) callbackWasRun = true } - + import system.dispatcher system2.scheduler.scheduleOnce(200.millis.dilated) { system2.shutdown() } system2.awaitTermination(5 seconds) @@ -164,6 +164,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt "reliable deny creation of actors while shutting down" in { val system = ActorSystem() + import system.dispatcher system.scheduler.scheduleOnce(200 millis) { system.shutdown() } var failing = false var created = Vector.empty[ActorRef] diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 7cbbeb4164..8d1d2fa965 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { private val cancellables = new ConcurrentLinkedQueue[Cancellable]() + import system.dispatcher def collectCancellable(c: Cancellable): Cancellable = { cancellables.add(c) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index eee5049d3f..488606f9bc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -307,7 +307,7 @@ object SupervisorHierarchySpec { case Fail(ref, dir) ⇒ ref ! Failure(dir, Vector.empty) } if (idleChildren.nonEmpty) self ! Work(x - 1) - else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1)) + else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))(context.dispatcher) stay case Event(Work(_), _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing) case Event("pong", _) ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index ef23305b83..d10f7ba29c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -610,22 +610,13 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, * cannot schedule a task. Once scheduled, the task MUST be executed. If * executed upon close(), the task may execute before its timeout. */ - protected def createScheduler(): Scheduler = { - val hwt = new HashedWheelTimer(log, - threadFactory.copy(threadFactory.name + "-scheduler"), - settings.SchedulerTickDuration, - settings.SchedulerTicksPerWheel) - // note that dispatcher is by-name parameter in DefaultScheduler constructor, - // because dispatcher is not initialized when the scheduler is created - def safeDispatcher = dispatcher match { - case null ⇒ - val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized") - log.error(exc, exc.getMessage) - throw exc - case dispatcher ⇒ dispatcher - } - new DefaultScheduler(hwt, log, safeDispatcher) - } + protected def createScheduler(): Scheduler = + new DefaultScheduler( + new HashedWheelTimer(log, + threadFactory.copy(threadFactory.name + "-scheduler"), + settings.SchedulerTickDuration, + settings.SchedulerTicksPerWheel), + log) /* * This is called after the last actor has signaled its termination, i.e. diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index bc27b497b3..a42066d0d2 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -87,22 +87,19 @@ object FSM { /** * Internal API */ - private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) { + private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) { private var ref: Option[Cancellable] = _ + private val scheduler = context.system.scheduler + private implicit val executionContext = context.dispatcher - def schedule(actor: ActorRef, timeout: Duration) { - if (repeat) { - ref = Some(system.scheduler.schedule(timeout, timeout, actor, this)) - } else { - ref = Some(system.scheduler.scheduleOnce(timeout, actor, this)) - } - } + def schedule(actor: ActorRef, timeout: Duration): Unit = + ref = Some( + if (repeat) scheduler.schedule(timeout, timeout, actor, this) + else scheduler.scheduleOnce(timeout, actor, this)) - def cancel { - if (ref.isDefined) { - ref.get.cancel() - ref = None - } + def cancel(): Unit = if (ref.isDefined) { + ref.get.cancel() + ref = None } } @@ -348,7 +345,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { if (timers contains name) { timers(name).cancel } - val timer = Timer(name, msg, repeat, timerGen.next)(context.system) + val timer = Timer(name, msg, repeat, timerGen.next)(context) timer.schedule(self, timeout) timers(name) = timer stay @@ -601,6 +598,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { + import context.dispatcher timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index c088fbce51..42f2a10604 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -12,6 +12,7 @@ import java.io.Closeable import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import akka.util.internal._ +import concurrent.ExecutionContext //#scheduler /** @@ -36,7 +37,7 @@ trait Scheduler { initialDelay: Duration, frequency: Duration, receiver: ActorRef, - message: Any): Cancellable + message: Any)(implicit executor: ExecutionContext): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and a @@ -47,7 +48,7 @@ trait Scheduler { * Scala API */ def schedule( - initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable + initialDelay: Duration, frequency: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and @@ -58,7 +59,7 @@ trait Scheduler { * Java API */ def schedule( - initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable + initialDelay: Duration, frequency: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable /** * Schedules a Runnable to be run once with a delay, i.e. a time period that @@ -66,7 +67,7 @@ trait Scheduler { * * Java & Scala API */ - def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable + def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable /** * Schedules a message to be sent once with a delay, i.e. a time period that has @@ -74,7 +75,7 @@ trait Scheduler { * * Java & Scala API */ - def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable /** * Schedules a function to be run once with a delay, i.e. a time period that has @@ -82,7 +83,7 @@ trait Scheduler { * * Scala API */ - def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable + def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable } //#scheduler @@ -118,76 +119,61 @@ trait Cancellable { * if it does not enqueue a task. Once a task is queued, it MUST be executed or * returned from stop(). */ -class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, - log: LoggingAdapter, - dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable { - - override def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = { +class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable { + override def schedule(initialDelay: Duration, + delay: Duration, + receiver: ActorRef, + message: Any)(implicit executor: ExecutionContext): Cancellable = { val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( new TimerTask with ContinuousScheduling { def run(timeout: HWTimeout) { - receiver ! message - // Check if the receiver is still alive and kicking before reschedule the task - if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor has been terminated.") - else scheduleNext(timeout, delay, continuousCancellable) + executor execute new Runnable { + override def run = { + receiver ! message + // Check if the receiver is still alive and kicking before reschedule the task + if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver) + else scheduleNext(timeout, delay, continuousCancellable) + } + } } }, initialDelay)) } - override def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = { - val continuousCancellable = new ContinuousCancellable - continuousCancellable.init( - hashedWheelTimer.newTimeout( - new TimerTask with ContinuousScheduling with Runnable { - def run = f - def run(timeout: HWTimeout) { - dispatcher.execute(this) - scheduleNext(timeout, delay, continuousCancellable) - } - }, - initialDelay)) - } + override def schedule(initialDelay: Duration, + delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = + schedule(initialDelay, delay, new Runnable { override def run = f }) - override def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = { + override def schedule(initialDelay: Duration, + delay: Duration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( new TimerTask with ContinuousScheduling { - def run(timeout: HWTimeout) { - dispatcher.execute(runnable) - scheduleNext(timeout, delay, continuousCancellable) - } + override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable { + override def run = { + runnable.run() + scheduleNext(timeout, delay, continuousCancellable) + } + }) }, initialDelay)) } - override def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = + override def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = new DefaultCancellable( hashedWheelTimer.newTimeout( - new TimerTask() { - def run(timeout: HWTimeout): Unit = dispatcher.execute(runnable) - }, + new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) }, delay)) - override def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = - new DefaultCancellable( - hashedWheelTimer.newTimeout( - new TimerTask { - def run(timeout: HWTimeout): Unit = receiver ! message - }, - delay)) + override def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = + scheduleOnce(delay, new Runnable { override def run = receiver ! message }) - override def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = - new DefaultCancellable( - hashedWheelTimer.newTimeout( - new TimerTask with Runnable { - def run = f - def run(timeout: HWTimeout): Unit = dispatcher.execute(this) - }, - delay)) + override def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = + scheduleOnce(delay, new Runnable { override def run = f }) private trait ContinuousScheduling { this: TimerTask ⇒ def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) { diff --git a/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala index 4fd46413ce..c04d485262 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala @@ -40,7 +40,7 @@ private[akka] trait ReceiveTimeout { this: ActorCell ⇒ if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)) + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)(this.dispatcher)) } else cancelReceiveTimeout() } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index a8c8d91d57..b16b91a9fd 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -249,7 +249,10 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext private def scheduleShutdownAction(): Unit = { // IllegalStateException is thrown if scheduler has been shutdown - try scheduler.scheduleOnce(shutdownTimeout, shutdownAction) catch { + try scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { + override def execute(runnable: Runnable): Unit = runnable.run() + override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t) + }) catch { case _: IllegalStateException ⇒ shutdown() } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 6ffbc98f00..42498c43ca 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.forkjoin.ThreadLocalRandom import akka.dispatch.Dispatchers import scala.annotation.tailrec +import concurrent.ExecutionContext /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -1221,7 +1222,7 @@ case class DefaultResizer( } else if (requestedCapacity < 0) { val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + requestedCapacity) routeeProvider.unregisterRoutees(abandon) - delayedStop(routeeProvider.context.system.scheduler, abandon) + delayedStop(routeeProvider.context.system.scheduler, abandon)(routeeProvider.context.dispatcher) } } @@ -1229,7 +1230,8 @@ case class DefaultResizer( * Give concurrent messages a chance to be placed in mailbox before * sending PoisonPill. */ - protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]): Unit = { + protected def delayedStop(scheduler: Scheduler, + abandon: IndexedSeq[ActorRef])(implicit executor: ExecutionContext): Unit = { if (abandon.nonEmpty) { if (stopDelay <= Duration.Zero) { abandon foreach (_ ! PoisonPill) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 796b39af52..bf9c2945cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -15,7 +15,6 @@ import akka.pattern._ import akka.remote._ import akka.routing._ import akka.util._ -import scala.concurrent.Await import scala.concurrent.util.duration._ import scala.concurrent.util.{ Duration, Deadline } import scala.concurrent.forkjoin.ThreadLocalRandom @@ -27,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import akka.util.internal.HashedWheelTimer +import concurrent.{ ExecutionContext, Await } /** * Cluster Extension Id and factory for creating Cluster extension. @@ -121,31 +121,41 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis) - val threadFactory = system.threadFactory match { - case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") - case tf ⇒ tf - } - val hwt = new HashedWheelTimer(log, - threadFactory, - SchedulerTickDuration, SchedulerTicksPerWheel) - new DefaultScheduler(hwt, log, system.dispatcher) + new DefaultScheduler( + new HashedWheelTimer(log, + system.threadFactory match { + case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") + case tf ⇒ tf + }, + SchedulerTickDuration, + SchedulerTicksPerWheel), + log) } else { - // delegate to system.scheduler, but don't close + // delegate to system.scheduler, but don't close over system val systemScheduler = system.scheduler new Scheduler with Closeable { - // we are using system.scheduler, which we are not responsible for closing - def close(): Unit = () - def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable = + override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing + + override def schedule(initialDelay: Duration, frequency: Duration, + receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = systemScheduler.schedule(initialDelay, frequency, receiver, message) - def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable = + + override def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = systemScheduler.schedule(initialDelay, frequency)(f) - def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable = + + override def schedule(initialDelay: Duration, frequency: Duration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = systemScheduler.schedule(initialDelay, frequency, runnable) - def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = + + override def scheduleOnce(delay: Duration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay, runnable) - def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = + + override def scheduleOnce(delay: Duration, receiver: ActorRef, + message: Any)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay, receiver, message) - def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = + + override def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay)(f) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 5e3e54561e..77fecff161 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -178,6 +178,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). withDispatcher(UseDispatcher), name = "coreSender") + import context.dispatcher + // start periodic gossip to random nodes in cluster val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala index 118785ef18..396f5127ad 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala @@ -8,12 +8,15 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import akka.actor.{ Scheduler, Cancellable } import scala.concurrent.util.Duration +import concurrent.ExecutionContext /** * INTERNAL API */ private[akka] object FixedRateTask { - def apply(scheduler: Scheduler, initalDelay: Duration, delay: Duration)(f: ⇒ Unit): FixedRateTask = + def apply(scheduler: Scheduler, + initalDelay: Duration, + delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask = new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f }) } @@ -24,7 +27,10 @@ private[akka] object FixedRateTask { * for inaccuracy in scheduler. It will start when constructed, using the * initialDelay. */ -private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable) +private[akka] class FixedRateTask(scheduler: Scheduler, + initalDelay: Duration, + delay: Duration, + task: Runnable)(implicit executor: ExecutionContext) extends Runnable with Cancellable { private val delayNanos = delay.toNanos diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala index 98634b0787..e6590cf9c3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala @@ -12,7 +12,7 @@ import scala.concurrent.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FixedRateTaskSpec extends AkkaSpec { - + import system.dispatcher "Task scheduled at fixed rate" must { "adjust for scheduler inaccuracy" taggedAs TimingTest in { val startTime = System.nanoTime diff --git a/akka-docs/java/code/docs/actor/SchedulerDocTestBase.java b/akka-docs/java/code/docs/actor/SchedulerDocTestBase.java index 4a98d66338..a5837ac85c 100644 --- a/akka-docs/java/code/docs/actor/SchedulerDocTestBase.java +++ b/akka-docs/java/code/docs/actor/SchedulerDocTestBase.java @@ -46,7 +46,7 @@ public class SchedulerDocTestBase { public void scheduleOneOffTask() { //#schedule-one-off-message //Schedules to send the "foo"-message to the testActor after 50ms - system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), testActor, "foo"); + system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), testActor, "foo", system.dispatcher()); //#schedule-one-off-message //#schedule-one-off-thunk @@ -56,7 +56,7 @@ public class SchedulerDocTestBase { public void run() { testActor.tell(System.currentTimeMillis()); } - }); + }, system.dispatcher()); //#schedule-one-off-thunk } @@ -80,7 +80,7 @@ public class SchedulerDocTestBase { //This will schedule to send the Tick-message //to the tickActor after 0ms repeating every 50ms Cancellable cancellable = system.scheduler().schedule(Duration.Zero(), Duration.create(50, TimeUnit.MILLISECONDS), - tickActor, "Tick"); + tickActor, "Tick", system.dispatcher()); //This cancels further Ticks to be sent cancellable.cancel(); diff --git a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java index b7338830e4..ff1e243aaf 100644 --- a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -138,7 +138,9 @@ public class FaultHandlingDocSample { log.debug("received message {}", msg); if (msg.equals(Start) && progressListener == null) { progressListener = getSender(); - getContext().system().scheduler().schedule(Duration.Zero(), Duration.parse("1 second"), getSelf(), Do); + getContext().system().scheduler().schedule( + Duration.Zero(), Duration.parse("1 second"), getSelf(), Do, getContext().dispatcher() + ); } else if (msg.equals(Do)) { counterService.tell(new Increment(1), getSelf()); counterService.tell(new Increment(1), getSelf()); @@ -296,7 +298,9 @@ public class FaultHandlingDocSample { // Tell the counter that there is no storage for the moment counter.tell(new UseStorage(null), getSelf()); // Try to re-establish storage after while - getContext().system().scheduler().scheduleOnce(Duration.parse("10 seconds"), getSelf(), Reconnect); + getContext().system().scheduler().scheduleOnce( + Duration.parse("10 seconds"), getSelf(), Reconnect, getContext().dispatcher() + ); } else if (msg.equals(Reconnect)) { // Re-establish storage after the scheduled delay initStorage(); diff --git a/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java b/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java index a24e1680cd..f7b246bc57 100644 --- a/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java +++ b/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java @@ -187,7 +187,7 @@ public class ZeromqDocTestBase { @Override public void preStart() { getContext().system().scheduler() - .schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK); + .schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK, getContext().dispatcher()); } @Override diff --git a/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala b/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala index 1e4dc4f6ab..1e7f8afdb0 100644 --- a/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala +++ b/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala @@ -91,6 +91,7 @@ class Worker extends Actor with ActorLogging { var progressListener: Option[ActorRef] = None val counterService = context.actorOf(Props[CounterService], name = "counter") val totalCount = 51 + import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext def receive = LoggingReceive { case Start if progressListener.isEmpty ⇒ @@ -103,7 +104,6 @@ class Worker extends Actor with ActorLogging { counterService ! Increment(1) // Send current progress to the initial sender - import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext counterService ? GetCurrentCount map { case CurrentCount(_, count) ⇒ Progress(100.0 * count / totalCount) } pipeTo progressListener.get @@ -143,6 +143,8 @@ class CounterService extends Actor { var backlog = IndexedSeq.empty[(ActorRef, Any)] val MaxBacklog = 10000 + import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext + override def preStart() { initStorage() } diff --git a/akka-docs/scala/code/docs/actor/SchedulerDocSpec.scala b/akka-docs/scala/code/docs/actor/SchedulerDocSpec.scala index 3d3755bd5d..2d76628089 100644 --- a/akka-docs/scala/code/docs/actor/SchedulerDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/SchedulerDocSpec.scala @@ -19,6 +19,9 @@ import akka.testkit._ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "schedule a one-off task" in { //#schedule-one-off-message + //Use the system's dispatcher as ExecutionContext + import system.dispatcher + //Schedules to send the "foo"-message to the testActor after 50ms system.scheduler.scheduleOnce(50 milliseconds, testActor, "foo") //#schedule-one-off-message @@ -42,6 +45,9 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { case Tick ⇒ //Do something } })) + //Use system's dispatcher as ExecutionContext + import system.dispatcher + //This will schedule to send the Tick-message //to the tickActor after 0ms repeating every 50ms val cancellable = diff --git a/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala index 1702a8e89a..8d8865dd44 100644 --- a/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ b/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala @@ -33,6 +33,7 @@ object ZeromqDocSpec { val memory = ManagementFactory.getMemoryMXBean val os = ManagementFactory.getOperatingSystemMXBean val ser = SerializationExtension(context.system) + import context.dispatcher override def preStart() { context.system.scheduler.schedule(1 second, 1 second, self, Tick) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 749f6d040c..41db33d629 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -8,7 +8,6 @@ import language.postfixOps import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler } import RemoteConnection.getAddrString import scala.concurrent.util.{ Duration, Deadline } -import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.util.Timeout import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent, WriteCompletionEvent, ExceptionEvent } @@ -16,11 +15,11 @@ import com.typesafe.config.ConfigFactory import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeoutException import akka.pattern.{ ask, pipe, AskTimeoutException } -import scala.concurrent.Future import scala.util.control.NoStackTrace import akka.event.{ LoggingAdapter, Logging } import java.net.{ InetSocketAddress, ConnectException } import scala.reflect.classTag +import concurrent.{ ExecutionContext, Await, Future } /** * The Player is the client component of the @@ -143,7 +142,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) val settings = TestConductor().Settings val handler = new PlayerHandler(controllerAddr, settings.ClientReconnects, settings.ReconnectBackoff, - self, Logging(context.system, "PlayerHandler"), context.system.scheduler) + self, Logging(context.system, "PlayerHandler"), context.system.scheduler)(context.dispatcher) startWith(Connecting, Data(None, None)) @@ -256,7 +255,7 @@ private[akka] class PlayerHandler( backoff: Duration, fsm: ActorRef, log: LoggingAdapter, - scheduler: Scheduler) + scheduler: Scheduler)(implicit executor: ExecutionContext) extends SimpleChannelUpstreamHandler { import ClientFSM._ diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 945f0b6df3..fbef659a7b 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -187,6 +187,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A { (msg: PollMsg) ⇒ // for negative timeout values, schedule Poll token -duration into the future + import context.dispatcher context.system.scheduler.scheduleOnce(d, self, msg) () } diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index 4a0864ea1b..d5d9370a2f 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -47,6 +47,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { val context = Context() val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint)) val subscriber = zmq.newSocket(SocketType.Sub, context, Listener(subscriberProbe.ref), Connect(endpoint), SubscribeAll) + import system.dispatcher val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable { var number = 0 def run() { @@ -130,6 +131,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { var genMessages: Cancellable = null override def preStart() = { + import system.dispatcher genMessages = system.scheduler.schedule(100 millis, 10 millis, self, "genMessage") }