#1508 - Reducing closure wrapping in scheduler
This commit is contained in:
parent
cfbfa19e41
commit
ed4bc1cf0b
1 changed files with 54 additions and 53 deletions
|
|
@ -121,70 +121,70 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer,
|
||||||
|
|
||||||
def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
|
def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
|
||||||
val continuousCancellable = new ContinuousCancellable
|
val continuousCancellable = new ContinuousCancellable
|
||||||
val task = new TimerTask with ContinuousScheduling {
|
continuousCancellable.init(
|
||||||
def run(timeout: HWTimeout) {
|
hashedWheelTimer.newTimeout(
|
||||||
receiver ! message
|
new TimerTask with ContinuousScheduling {
|
||||||
// Check if the receiver is still alive and kicking before reschedule the task
|
def run(timeout: HWTimeout) {
|
||||||
if (receiver.isTerminated) {
|
receiver ! message
|
||||||
log.debug("Could not reschedule message to be sent because receiving actor has been terminated.")
|
// Check if the receiver is still alive and kicking before reschedule the task
|
||||||
} else {
|
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor has been terminated.")
|
||||||
scheduleNext(timeout, delay, continuousCancellable)
|
else scheduleNext(timeout, delay, continuousCancellable)
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
}
|
initialDelay))
|
||||||
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
|
|
||||||
continuousCancellable
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = {
|
def schedule(initialDelay: Duration, delay: Duration)(f: ⇒ Unit): Cancellable = {
|
||||||
val continuousCancellable = new ContinuousCancellable
|
val continuousCancellable = new ContinuousCancellable
|
||||||
val task = new TimerTask with ContinuousScheduling with Runnable {
|
continuousCancellable.init(
|
||||||
def run = f
|
hashedWheelTimer.newTimeout(
|
||||||
def run(timeout: HWTimeout) {
|
new TimerTask with ContinuousScheduling with Runnable {
|
||||||
dispatcher execute this
|
def run = f
|
||||||
scheduleNext(timeout, delay, continuousCancellable)
|
def run(timeout: HWTimeout) {
|
||||||
}
|
dispatcher.execute(this)
|
||||||
}
|
scheduleNext(timeout, delay, continuousCancellable)
|
||||||
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
|
}
|
||||||
continuousCancellable
|
},
|
||||||
|
initialDelay))
|
||||||
}
|
}
|
||||||
|
|
||||||
def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = {
|
def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = {
|
||||||
val continuousCancellable = new ContinuousCancellable
|
val continuousCancellable = new ContinuousCancellable
|
||||||
val task = new TimerTask with ContinuousScheduling {
|
continuousCancellable.init(
|
||||||
def run(timeout: HWTimeout) {
|
hashedWheelTimer.newTimeout(
|
||||||
dispatcher.execute(runnable)
|
new TimerTask with ContinuousScheduling {
|
||||||
scheduleNext(timeout, delay, continuousCancellable)
|
def run(timeout: HWTimeout) {
|
||||||
}
|
dispatcher.execute(runnable)
|
||||||
}
|
scheduleNext(timeout, delay, continuousCancellable)
|
||||||
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
|
}
|
||||||
continuousCancellable
|
},
|
||||||
|
initialDelay))
|
||||||
}
|
}
|
||||||
|
|
||||||
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = {
|
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
|
||||||
val task = new TimerTask() {
|
new DefaultCancellable(
|
||||||
def run(timeout: HWTimeout) { dispatcher.execute(runnable) }
|
hashedWheelTimer.newTimeout(
|
||||||
}
|
new TimerTask() {
|
||||||
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
|
def run(timeout: HWTimeout): Unit = dispatcher.execute(runnable)
|
||||||
}
|
},
|
||||||
|
delay))
|
||||||
|
|
||||||
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
|
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
|
||||||
val task = new TimerTask {
|
new DefaultCancellable(
|
||||||
def run(timeout: HWTimeout) {
|
hashedWheelTimer.newTimeout(
|
||||||
receiver ! message
|
new TimerTask {
|
||||||
}
|
def run(timeout: HWTimeout): Unit = receiver ! message
|
||||||
}
|
},
|
||||||
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
|
delay))
|
||||||
}
|
|
||||||
|
|
||||||
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = {
|
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
|
||||||
val task = new TimerTask {
|
new DefaultCancellable(
|
||||||
def run(timeout: HWTimeout) {
|
hashedWheelTimer.newTimeout(
|
||||||
dispatcher.execute(new Runnable { def run = f })
|
new TimerTask with Runnable {
|
||||||
}
|
def run = f
|
||||||
}
|
def run(timeout: HWTimeout): Unit = dispatcher.execute(this)
|
||||||
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
|
},
|
||||||
}
|
delay))
|
||||||
|
|
||||||
private trait ContinuousScheduling { this: TimerTask ⇒
|
private trait ContinuousScheduling { this: TimerTask ⇒
|
||||||
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
|
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
|
||||||
|
|
@ -220,8 +220,9 @@ private[akka] class ContinuousCancellable extends Cancellable {
|
||||||
@volatile
|
@volatile
|
||||||
private var cancelled = false
|
private var cancelled = false
|
||||||
|
|
||||||
private[akka] def init(initialTimeout: HWTimeout): Unit = {
|
private[akka] def init(initialTimeout: HWTimeout): this.type = {
|
||||||
delegate = initialTimeout
|
delegate = initialTimeout
|
||||||
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def swap(newTimeout: HWTimeout): Unit = {
|
private[akka] def swap(newTimeout: HWTimeout): Unit = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue