Making it possible/mandatory to signal which ExecutionContext will actually execute something scheduled
This commit is contained in:
parent
a8f648ef1c
commit
9d097bcf50
22 changed files with 135 additions and 120 deletions
|
|
@ -130,7 +130,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
|||
Thread.sleep(50.millis.dilated.toMillis)
|
||||
callbackWasRun = true
|
||||
}
|
||||
|
||||
import system.dispatcher
|
||||
system2.scheduler.scheduleOnce(200.millis.dilated) { system2.shutdown() }
|
||||
|
||||
system2.awaitTermination(5 seconds)
|
||||
|
|
@ -164,6 +164,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
|||
|
||||
"reliable deny creation of actors while shutting down" in {
|
||||
val system = ActorSystem()
|
||||
import system.dispatcher
|
||||
system.scheduler.scheduleOnce(200 millis) { system.shutdown() }
|
||||
var failing = false
|
||||
var created = Vector.empty[ActorRef]
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
|
||||
private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
|
||||
import system.dispatcher
|
||||
|
||||
def collectCancellable(c: Cancellable): Cancellable = {
|
||||
cancellables.add(c)
|
||||
|
|
|
|||
|
|
@ -307,7 +307,7 @@ object SupervisorHierarchySpec {
|
|||
case Fail(ref, dir) ⇒ ref ! Failure(dir, Vector.empty)
|
||||
}
|
||||
if (idleChildren.nonEmpty) self ! Work(x - 1)
|
||||
else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))
|
||||
else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))(context.dispatcher)
|
||||
stay
|
||||
case Event(Work(_), _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
|
||||
case Event("pong", _) ⇒
|
||||
|
|
|
|||
|
|
@ -610,22 +610,13 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
|||
* cannot schedule a task. Once scheduled, the task MUST be executed. If
|
||||
* executed upon close(), the task may execute before its timeout.
|
||||
*/
|
||||
protected def createScheduler(): Scheduler = {
|
||||
val hwt = new HashedWheelTimer(log,
|
||||
protected def createScheduler(): Scheduler =
|
||||
new DefaultScheduler(
|
||||
new HashedWheelTimer(log,
|
||||
threadFactory.copy(threadFactory.name + "-scheduler"),
|
||||
settings.SchedulerTickDuration,
|
||||
settings.SchedulerTicksPerWheel)
|
||||
// note that dispatcher is by-name parameter in DefaultScheduler constructor,
|
||||
// because dispatcher is not initialized when the scheduler is created
|
||||
def safeDispatcher = dispatcher match {
|
||||
case null ⇒
|
||||
val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized")
|
||||
log.error(exc, exc.getMessage)
|
||||
throw exc
|
||||
case dispatcher ⇒ dispatcher
|
||||
}
|
||||
new DefaultScheduler(hwt, log, safeDispatcher)
|
||||
}
|
||||
settings.SchedulerTicksPerWheel),
|
||||
log)
|
||||
|
||||
/*
|
||||
* This is called after the last actor has signaled its termination, i.e.
|
||||
|
|
|
|||
|
|
@ -87,24 +87,21 @@ object FSM {
|
|||
/**
|
||||
* Internal API
|
||||
*/
|
||||
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) {
|
||||
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) {
|
||||
private var ref: Option[Cancellable] = _
|
||||
private val scheduler = context.system.scheduler
|
||||
private implicit val executionContext = context.dispatcher
|
||||
|
||||
def schedule(actor: ActorRef, timeout: Duration) {
|
||||
if (repeat) {
|
||||
ref = Some(system.scheduler.schedule(timeout, timeout, actor, this))
|
||||
} else {
|
||||
ref = Some(system.scheduler.scheduleOnce(timeout, actor, this))
|
||||
}
|
||||
}
|
||||
def schedule(actor: ActorRef, timeout: Duration): Unit =
|
||||
ref = Some(
|
||||
if (repeat) scheduler.schedule(timeout, timeout, actor, this)
|
||||
else scheduler.scheduleOnce(timeout, actor, this))
|
||||
|
||||
def cancel {
|
||||
if (ref.isDefined) {
|
||||
def cancel(): Unit = if (ref.isDefined) {
|
||||
ref.get.cancel()
|
||||
ref = None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This extractor is just convenience for matching a (S, S) pair, including a
|
||||
|
|
@ -348,7 +345,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
if (timers contains name) {
|
||||
timers(name).cancel
|
||||
}
|
||||
val timer = Timer(name, msg, repeat, timerGen.next)(context.system)
|
||||
val timer = Timer(name, msg, repeat, timerGen.next)(context)
|
||||
timer.schedule(self, timeout)
|
||||
timers(name) = timer
|
||||
stay
|
||||
|
|
@ -601,6 +598,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
if (timeout.isDefined) {
|
||||
val t = timeout.get
|
||||
if (t.finite_? && t.length >= 0) {
|
||||
import context.dispatcher
|
||||
timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation)))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import java.io.Closeable
|
|||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.annotation.tailrec
|
||||
import akka.util.internal._
|
||||
import concurrent.ExecutionContext
|
||||
|
||||
//#scheduler
|
||||
/**
|
||||
|
|
@ -36,7 +37,7 @@ trait Scheduler {
|
|||
initialDelay: Duration,
|
||||
frequency: Duration,
|
||||
receiver: ActorRef,
|
||||
message: Any): Cancellable
|
||||
message: Any)(implicit executor: ExecutionContext): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a function to be run repeatedly with an initial delay and a
|
||||
|
|
@ -47,7 +48,7 @@ trait Scheduler {
|
|||
* Scala API
|
||||
*/
|
||||
def schedule(
|
||||
initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable
|
||||
initialDelay: Duration, frequency: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a function to be run repeatedly with an initial delay and
|
||||
|
|
@ -58,7 +59,7 @@ trait Scheduler {
|
|||
* Java API
|
||||
*/
|
||||
def schedule(
|
||||
initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable
|
||||
initialDelay: Duration, frequency: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a Runnable to be run once with a delay, i.e. a time period that
|
||||
|
|
@ -66,7 +67,7 @@ trait Scheduler {
|
|||
*
|
||||
* Java & Scala API
|
||||
*/
|
||||
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable
|
||||
def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a message to be sent once with a delay, i.e. a time period that has
|
||||
|
|
@ -74,7 +75,7 @@ trait Scheduler {
|
|||
*
|
||||
* Java & Scala API
|
||||
*/
|
||||
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable
|
||||
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable
|
||||
|
||||
/**
|
||||
* Schedules a function to be run once with a delay, i.e. a time period that has
|
||||
|
|
@ -82,7 +83,7 @@ trait Scheduler {
|
|||
*
|
||||
* Scala API
|
||||
*/
|
||||
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable
|
||||
def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable
|
||||
}
|
||||
//#scheduler
|
||||
|
||||
|
|
@ -118,76 +119,61 @@ trait Cancellable {
|
|||
* if it does not enqueue a task. Once a task is queued, it MUST be executed or
|
||||
* returned from stop().
|
||||
*/
|
||||
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer,
|
||||
log: LoggingAdapter,
|
||||
dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable {
|
||||
|
||||
override def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
|
||||
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable {
|
||||
override def schedule(initialDelay: Duration,
|
||||
delay: Duration,
|
||||
receiver: ActorRef,
|
||||
message: Any)(implicit executor: ExecutionContext): Cancellable = {
|
||||
val continuousCancellable = new ContinuousCancellable
|
||||
continuousCancellable.init(
|
||||
hashedWheelTimer.newTimeout(
|
||||
new TimerTask with ContinuousScheduling {
|
||||
def run(timeout: HWTimeout) {
|
||||
executor execute new Runnable {
|
||||
override def run = {
|
||||
receiver ! message
|
||||
// Check if the receiver is still alive and kicking before reschedule the task
|
||||
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor has been terminated.")
|
||||
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver)
|
||||
else scheduleNext(timeout, delay, continuousCancellable)
|
||||
}
|
||||
},
|
||||
initialDelay))
|
||||
}
|
||||
|
||||
override def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = {
|
||||
val continuousCancellable = new ContinuousCancellable
|
||||
continuousCancellable.init(
|
||||
hashedWheelTimer.newTimeout(
|
||||
new TimerTask with ContinuousScheduling with Runnable {
|
||||
def run = f
|
||||
def run(timeout: HWTimeout) {
|
||||
dispatcher.execute(this)
|
||||
scheduleNext(timeout, delay, continuousCancellable)
|
||||
}
|
||||
},
|
||||
initialDelay))
|
||||
}
|
||||
|
||||
override def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = {
|
||||
override def schedule(initialDelay: Duration,
|
||||
delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable =
|
||||
schedule(initialDelay, delay, new Runnable { override def run = f })
|
||||
|
||||
override def schedule(initialDelay: Duration,
|
||||
delay: Duration,
|
||||
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
|
||||
val continuousCancellable = new ContinuousCancellable
|
||||
continuousCancellable.init(
|
||||
hashedWheelTimer.newTimeout(
|
||||
new TimerTask with ContinuousScheduling {
|
||||
def run(timeout: HWTimeout) {
|
||||
dispatcher.execute(runnable)
|
||||
override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable {
|
||||
override def run = {
|
||||
runnable.run()
|
||||
scheduleNext(timeout, delay, continuousCancellable)
|
||||
}
|
||||
})
|
||||
},
|
||||
initialDelay))
|
||||
}
|
||||
|
||||
override def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
|
||||
override def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
|
||||
new DefaultCancellable(
|
||||
hashedWheelTimer.newTimeout(
|
||||
new TimerTask() {
|
||||
def run(timeout: HWTimeout): Unit = dispatcher.execute(runnable)
|
||||
},
|
||||
new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) },
|
||||
delay))
|
||||
|
||||
override def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
|
||||
new DefaultCancellable(
|
||||
hashedWheelTimer.newTimeout(
|
||||
new TimerTask {
|
||||
def run(timeout: HWTimeout): Unit = receiver ! message
|
||||
},
|
||||
delay))
|
||||
override def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
|
||||
scheduleOnce(delay, new Runnable { override def run = receiver ! message })
|
||||
|
||||
override def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
|
||||
new DefaultCancellable(
|
||||
hashedWheelTimer.newTimeout(
|
||||
new TimerTask with Runnable {
|
||||
def run = f
|
||||
def run(timeout: HWTimeout): Unit = dispatcher.execute(this)
|
||||
},
|
||||
delay))
|
||||
override def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable =
|
||||
scheduleOnce(delay, new Runnable { override def run = f })
|
||||
|
||||
private trait ContinuousScheduling { this: TimerTask ⇒
|
||||
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ private[akka] trait ReceiveTimeout { this: ActorCell ⇒
|
|||
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
|
||||
recvtimeout._2.cancel() //Cancel any ongoing future
|
||||
//Only reschedule if desired and there are currently no more messages to be processed
|
||||
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout))
|
||||
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)(this.dispatcher))
|
||||
} else cancelReceiveTimeout()
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -249,7 +249,10 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
|||
|
||||
private def scheduleShutdownAction(): Unit = {
|
||||
// IllegalStateException is thrown if scheduler has been shutdown
|
||||
try scheduler.scheduleOnce(shutdownTimeout, shutdownAction) catch {
|
||||
try scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext {
|
||||
override def execute(runnable: Runnable): Unit = runnable.run()
|
||||
override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t)
|
||||
}) catch {
|
||||
case _: IllegalStateException ⇒ shutdown()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit
|
|||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import akka.dispatch.Dispatchers
|
||||
import scala.annotation.tailrec
|
||||
import concurrent.ExecutionContext
|
||||
|
||||
/**
|
||||
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
|
||||
|
|
@ -1221,7 +1222,7 @@ case class DefaultResizer(
|
|||
} else if (requestedCapacity < 0) {
|
||||
val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length + requestedCapacity)
|
||||
routeeProvider.unregisterRoutees(abandon)
|
||||
delayedStop(routeeProvider.context.system.scheduler, abandon)
|
||||
delayedStop(routeeProvider.context.system.scheduler, abandon)(routeeProvider.context.dispatcher)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1229,7 +1230,8 @@ case class DefaultResizer(
|
|||
* Give concurrent messages a chance to be placed in mailbox before
|
||||
* sending PoisonPill.
|
||||
*/
|
||||
protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]): Unit = {
|
||||
protected def delayedStop(scheduler: Scheduler,
|
||||
abandon: IndexedSeq[ActorRef])(implicit executor: ExecutionContext): Unit = {
|
||||
if (abandon.nonEmpty) {
|
||||
if (stopDelay <= Duration.Zero) {
|
||||
abandon foreach (_ ! PoisonPill)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import akka.pattern._
|
|||
import akka.remote._
|
||||
import akka.routing._
|
||||
import akka.util._
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.util.duration._
|
||||
import scala.concurrent.util.{ Duration, Deadline }
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
|
|
@ -27,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean
|
|||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.util.internal.HashedWheelTimer
|
||||
import concurrent.{ ExecutionContext, Await }
|
||||
|
||||
/**
|
||||
* Cluster Extension Id and factory for creating Cluster extension.
|
||||
|
|
@ -121,31 +121,41 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
|
||||
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
|
||||
system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
|
||||
val threadFactory = system.threadFactory match {
|
||||
new DefaultScheduler(
|
||||
new HashedWheelTimer(log,
|
||||
system.threadFactory match {
|
||||
case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler")
|
||||
case tf ⇒ tf
|
||||
}
|
||||
val hwt = new HashedWheelTimer(log,
|
||||
threadFactory,
|
||||
SchedulerTickDuration, SchedulerTicksPerWheel)
|
||||
new DefaultScheduler(hwt, log, system.dispatcher)
|
||||
},
|
||||
SchedulerTickDuration,
|
||||
SchedulerTicksPerWheel),
|
||||
log)
|
||||
} else {
|
||||
// delegate to system.scheduler, but don't close
|
||||
// delegate to system.scheduler, but don't close over system
|
||||
val systemScheduler = system.scheduler
|
||||
new Scheduler with Closeable {
|
||||
// we are using system.scheduler, which we are not responsible for closing
|
||||
def close(): Unit = ()
|
||||
def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable =
|
||||
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
|
||||
|
||||
override def schedule(initialDelay: Duration, frequency: Duration,
|
||||
receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
|
||||
systemScheduler.schedule(initialDelay, frequency, receiver, message)
|
||||
def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable =
|
||||
|
||||
override def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable =
|
||||
systemScheduler.schedule(initialDelay, frequency)(f)
|
||||
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable =
|
||||
|
||||
override def schedule(initialDelay: Duration, frequency: Duration,
|
||||
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
|
||||
systemScheduler.schedule(initialDelay, frequency, runnable)
|
||||
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
|
||||
|
||||
override def scheduleOnce(delay: Duration,
|
||||
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
|
||||
systemScheduler.scheduleOnce(delay, runnable)
|
||||
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
|
||||
|
||||
override def scheduleOnce(delay: Duration, receiver: ActorRef,
|
||||
message: Any)(implicit executor: ExecutionContext): Cancellable =
|
||||
systemScheduler.scheduleOnce(delay, receiver, message)
|
||||
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
|
||||
|
||||
override def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable =
|
||||
systemScheduler.scheduleOnce(delay)(f)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -178,6 +178,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)).
|
||||
withDispatcher(UseDispatcher), name = "coreSender")
|
||||
|
||||
import context.dispatcher
|
||||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
val gossipTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
|
||||
|
|
|
|||
|
|
@ -8,12 +8,15 @@ import java.util.concurrent.TimeUnit
|
|||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
|
||||
import akka.actor.{ Scheduler, Cancellable }
|
||||
import scala.concurrent.util.Duration
|
||||
import concurrent.ExecutionContext
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object FixedRateTask {
|
||||
def apply(scheduler: Scheduler, initalDelay: Duration, delay: Duration)(f: ⇒ Unit): FixedRateTask =
|
||||
def apply(scheduler: Scheduler,
|
||||
initalDelay: Duration,
|
||||
delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask =
|
||||
new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f })
|
||||
}
|
||||
|
||||
|
|
@ -24,7 +27,10 @@ private[akka] object FixedRateTask {
|
|||
* for inaccuracy in scheduler. It will start when constructed, using the
|
||||
* initialDelay.
|
||||
*/
|
||||
private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable)
|
||||
private[akka] class FixedRateTask(scheduler: Scheduler,
|
||||
initalDelay: Duration,
|
||||
delay: Duration,
|
||||
task: Runnable)(implicit executor: ExecutionContext)
|
||||
extends Runnable with Cancellable {
|
||||
|
||||
private val delayNanos = delay.toNanos
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import scala.concurrent.Await
|
|||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class FixedRateTaskSpec extends AkkaSpec {
|
||||
|
||||
import system.dispatcher
|
||||
"Task scheduled at fixed rate" must {
|
||||
"adjust for scheduler inaccuracy" taggedAs TimingTest in {
|
||||
val startTime = System.nanoTime
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public class SchedulerDocTestBase {
|
|||
public void scheduleOneOffTask() {
|
||||
//#schedule-one-off-message
|
||||
//Schedules to send the "foo"-message to the testActor after 50ms
|
||||
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), testActor, "foo");
|
||||
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), testActor, "foo", system.dispatcher());
|
||||
//#schedule-one-off-message
|
||||
|
||||
//#schedule-one-off-thunk
|
||||
|
|
@ -56,7 +56,7 @@ public class SchedulerDocTestBase {
|
|||
public void run() {
|
||||
testActor.tell(System.currentTimeMillis());
|
||||
}
|
||||
});
|
||||
}, system.dispatcher());
|
||||
//#schedule-one-off-thunk
|
||||
}
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ public class SchedulerDocTestBase {
|
|||
//This will schedule to send the Tick-message
|
||||
//to the tickActor after 0ms repeating every 50ms
|
||||
Cancellable cancellable = system.scheduler().schedule(Duration.Zero(), Duration.create(50, TimeUnit.MILLISECONDS),
|
||||
tickActor, "Tick");
|
||||
tickActor, "Tick", system.dispatcher());
|
||||
|
||||
//This cancels further Ticks to be sent
|
||||
cancellable.cancel();
|
||||
|
|
|
|||
|
|
@ -138,7 +138,9 @@ public class FaultHandlingDocSample {
|
|||
log.debug("received message {}", msg);
|
||||
if (msg.equals(Start) && progressListener == null) {
|
||||
progressListener = getSender();
|
||||
getContext().system().scheduler().schedule(Duration.Zero(), Duration.parse("1 second"), getSelf(), Do);
|
||||
getContext().system().scheduler().schedule(
|
||||
Duration.Zero(), Duration.parse("1 second"), getSelf(), Do, getContext().dispatcher()
|
||||
);
|
||||
} else if (msg.equals(Do)) {
|
||||
counterService.tell(new Increment(1), getSelf());
|
||||
counterService.tell(new Increment(1), getSelf());
|
||||
|
|
@ -296,7 +298,9 @@ public class FaultHandlingDocSample {
|
|||
// Tell the counter that there is no storage for the moment
|
||||
counter.tell(new UseStorage(null), getSelf());
|
||||
// Try to re-establish storage after while
|
||||
getContext().system().scheduler().scheduleOnce(Duration.parse("10 seconds"), getSelf(), Reconnect);
|
||||
getContext().system().scheduler().scheduleOnce(
|
||||
Duration.parse("10 seconds"), getSelf(), Reconnect, getContext().dispatcher()
|
||||
);
|
||||
} else if (msg.equals(Reconnect)) {
|
||||
// Re-establish storage after the scheduled delay
|
||||
initStorage();
|
||||
|
|
|
|||
|
|
@ -187,7 +187,7 @@ public class ZeromqDocTestBase {
|
|||
@Override
|
||||
public void preStart() {
|
||||
getContext().system().scheduler()
|
||||
.schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK);
|
||||
.schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK, getContext().dispatcher());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ class Worker extends Actor with ActorLogging {
|
|||
var progressListener: Option[ActorRef] = None
|
||||
val counterService = context.actorOf(Props[CounterService], name = "counter")
|
||||
val totalCount = 51
|
||||
import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
|
||||
|
||||
def receive = LoggingReceive {
|
||||
case Start if progressListener.isEmpty ⇒
|
||||
|
|
@ -103,7 +104,6 @@ class Worker extends Actor with ActorLogging {
|
|||
counterService ! Increment(1)
|
||||
|
||||
// Send current progress to the initial sender
|
||||
import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
|
||||
counterService ? GetCurrentCount map {
|
||||
case CurrentCount(_, count) ⇒ Progress(100.0 * count / totalCount)
|
||||
} pipeTo progressListener.get
|
||||
|
|
@ -143,6 +143,8 @@ class CounterService extends Actor {
|
|||
var backlog = IndexedSeq.empty[(ActorRef, Any)]
|
||||
val MaxBacklog = 10000
|
||||
|
||||
import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
|
||||
|
||||
override def preStart() {
|
||||
initStorage()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,9 @@ import akka.testkit._
|
|||
class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||
"schedule a one-off task" in {
|
||||
//#schedule-one-off-message
|
||||
//Use the system's dispatcher as ExecutionContext
|
||||
import system.dispatcher
|
||||
|
||||
//Schedules to send the "foo"-message to the testActor after 50ms
|
||||
system.scheduler.scheduleOnce(50 milliseconds, testActor, "foo")
|
||||
//#schedule-one-off-message
|
||||
|
|
@ -42,6 +45,9 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
case Tick ⇒ //Do something
|
||||
}
|
||||
}))
|
||||
//Use system's dispatcher as ExecutionContext
|
||||
import system.dispatcher
|
||||
|
||||
//This will schedule to send the Tick-message
|
||||
//to the tickActor after 0ms repeating every 50ms
|
||||
val cancellable =
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ object ZeromqDocSpec {
|
|||
val memory = ManagementFactory.getMemoryMXBean
|
||||
val os = ManagementFactory.getOperatingSystemMXBean
|
||||
val ser = SerializationExtension(context.system)
|
||||
import context.dispatcher
|
||||
|
||||
override def preStart() {
|
||||
context.system.scheduler.schedule(1 second, 1 second, self, Tick)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import language.postfixOps
|
|||
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler }
|
||||
import RemoteConnection.getAddrString
|
||||
import scala.concurrent.util.{ Duration, Deadline }
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.util.Timeout
|
||||
import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent, WriteCompletionEvent, ExceptionEvent }
|
||||
|
|
@ -16,11 +15,11 @@ import com.typesafe.config.ConfigFactory
|
|||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.pattern.{ ask, pipe, AskTimeoutException }
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.event.{ LoggingAdapter, Logging }
|
||||
import java.net.{ InetSocketAddress, ConnectException }
|
||||
import scala.reflect.classTag
|
||||
import concurrent.{ ExecutionContext, Await, Future }
|
||||
|
||||
/**
|
||||
* The Player is the client component of the
|
||||
|
|
@ -143,7 +142,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
val settings = TestConductor().Settings
|
||||
|
||||
val handler = new PlayerHandler(controllerAddr, settings.ClientReconnects, settings.ReconnectBackoff,
|
||||
self, Logging(context.system, "PlayerHandler"), context.system.scheduler)
|
||||
self, Logging(context.system, "PlayerHandler"), context.system.scheduler)(context.dispatcher)
|
||||
|
||||
startWith(Connecting, Data(None, None))
|
||||
|
||||
|
|
@ -256,7 +255,7 @@ private[akka] class PlayerHandler(
|
|||
backoff: Duration,
|
||||
fsm: ActorRef,
|
||||
log: LoggingAdapter,
|
||||
scheduler: Scheduler)
|
||||
scheduler: Scheduler)(implicit executor: ExecutionContext)
|
||||
extends SimpleChannelUpstreamHandler {
|
||||
|
||||
import ClientFSM._
|
||||
|
|
|
|||
|
|
@ -187,6 +187,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
|
||||
{ (msg: PollMsg) ⇒
|
||||
// for negative timeout values, schedule Poll token -duration into the future
|
||||
import context.dispatcher
|
||||
context.system.scheduler.scheduleOnce(d, self, msg)
|
||||
()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
|
|||
val context = Context()
|
||||
val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint))
|
||||
val subscriber = zmq.newSocket(SocketType.Sub, context, Listener(subscriberProbe.ref), Connect(endpoint), SubscribeAll)
|
||||
import system.dispatcher
|
||||
val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable {
|
||||
var number = 0
|
||||
def run() {
|
||||
|
|
@ -130,6 +131,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
|
|||
var genMessages: Cancellable = null
|
||||
|
||||
override def preStart() = {
|
||||
import system.dispatcher
|
||||
genMessages = system.scheduler.schedule(100 millis, 10 millis, self, "genMessage")
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue