Add locking in dispatchFuture
This commit is contained in:
parent
4bd00bc92f
commit
92a858eee8
1 changed files with 15 additions and 11 deletions
|
|
@ -78,18 +78,22 @@ trait MessageDispatcher extends Logging {
|
||||||
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
|
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
|
||||||
|
|
||||||
private[akka] final def dispatchFuture(invocation: FutureInvocation): Unit = {
|
private[akka] final def dispatchFuture(invocation: FutureInvocation): Unit = {
|
||||||
futures add invocation.uuid
|
guard withGuard {
|
||||||
if (active.isOff) { active.switchOn { start } }
|
futures add invocation.uuid
|
||||||
|
if (active.isOff) { active.switchOn { start } }
|
||||||
|
}
|
||||||
invocation.future.onComplete { f =>
|
invocation.future.onComplete { f =>
|
||||||
futures remove invocation.uuid
|
guard withGuard {
|
||||||
if (futures.isEmpty && uuids.isEmpty) {
|
futures remove invocation.uuid
|
||||||
shutdownSchedule match {
|
if (futures.isEmpty && uuids.isEmpty) {
|
||||||
case UNSCHEDULED =>
|
shutdownSchedule match {
|
||||||
shutdownSchedule = SCHEDULED
|
case UNSCHEDULED =>
|
||||||
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
shutdownSchedule = SCHEDULED
|
||||||
case SCHEDULED =>
|
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
||||||
shutdownSchedule = RESCHEDULED
|
case SCHEDULED =>
|
||||||
case RESCHEDULED => //Already marked for reschedule
|
shutdownSchedule = RESCHEDULED
|
||||||
|
case RESCHEDULED => //Already marked for reschedule
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue