Fixed another shutdown of dispatcher issue. See #1454
This commit is contained in:
parent
9a677e528e
commit
f28a1f3834
2 changed files with 15 additions and 9 deletions
|
|
@ -482,10 +482,10 @@ public class HashedWheelTimer implements Timer {
|
||||||
buf.append("deadline: ");
|
buf.append("deadline: ");
|
||||||
if (remaining > 0) {
|
if (remaining > 0) {
|
||||||
buf.append(remaining);
|
buf.append(remaining);
|
||||||
buf.append(" ms later, ");
|
buf.append(" ns later, ");
|
||||||
} else if (remaining < 0) {
|
} else if (remaining < 0) {
|
||||||
buf.append(-remaining);
|
buf.append(-remaining);
|
||||||
buf.append(" ms ago, ");
|
buf.append(" ns ago, ");
|
||||||
} else {
|
} else {
|
||||||
buf.append("now, ");
|
buf.append("now, ");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -137,7 +137,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
shutdownScheduleUpdater.get(this) match {
|
shutdownScheduleUpdater.get(this) match {
|
||||||
case UNSCHEDULED ⇒
|
case UNSCHEDULED ⇒
|
||||||
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
|
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
|
||||||
scheduler.scheduleOnce(shutdownTimeout, shutdownAction)
|
scheduleShutdownAction()
|
||||||
()
|
()
|
||||||
} else ifSensibleToDoSoThenScheduleShutdown()
|
} else ifSensibleToDoSoThenScheduleShutdown()
|
||||||
case SCHEDULED ⇒
|
case SCHEDULED ⇒
|
||||||
|
|
@ -148,6 +148,13 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
case _ ⇒ ()
|
case _ ⇒ ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def scheduleShutdownAction(): Unit = {
|
||||||
|
// IllegalStateException is thrown if scheduler has been shutdown
|
||||||
|
try scheduler.scheduleOnce(shutdownTimeout, shutdownAction) catch {
|
||||||
|
case _: IllegalStateException ⇒ shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final val taskCleanup: () ⇒ Unit =
|
private final val taskCleanup: () ⇒ Unit =
|
||||||
() ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
|
() ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
|
||||||
|
|
||||||
|
|
@ -185,9 +192,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
}
|
}
|
||||||
case RESCHEDULED ⇒
|
case RESCHEDULED ⇒
|
||||||
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
|
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
|
||||||
try scheduler.scheduleOnce(shutdownTimeout, this) catch {
|
scheduleShutdownAction()
|
||||||
case _: IllegalStateException ⇒ shutdown()
|
|
||||||
}
|
|
||||||
else run()
|
else run()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -279,9 +284,10 @@ abstract class MessageDispatcherConfigurator() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def configureThreadPool(config: Config,
|
def configureThreadPool(
|
||||||
settings: Settings,
|
config: Config,
|
||||||
createDispatcher: ⇒ (ThreadPoolConfig) ⇒ MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
settings: Settings,
|
||||||
|
createDispatcher: ⇒ (ThreadPoolConfig) ⇒ MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||||
import ThreadPoolConfigDispatcherBuilder.conf_?
|
import ThreadPoolConfigDispatcherBuilder.conf_?
|
||||||
|
|
||||||
//Apply the following options to the config if they are present in the config
|
//Apply the following options to the config if they are present in the config
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue