fix CallingThreadDispatcher’s assumption of mailbox type
- usually it’s a CallingThreadMailbox, but - it is swapped out for deadLetter upon stop() - so use Option[CallingThreadMailbox]
This commit is contained in:
parent
a25452126d
commit
3b62873e2c
1 changed files with 16 additions and 7 deletions
|
|
@ -109,7 +109,10 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
|||
|
||||
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor)
|
||||
|
||||
private def getMailbox(actor: ActorCell) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
|
||||
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒ Some(m)
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
protected[akka] override def start() {}
|
||||
|
||||
|
|
@ -122,11 +125,13 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
|||
protected[akka] override def timeoutMs = 100L
|
||||
|
||||
override def suspend(actor: ActorCell) {
|
||||
getMailbox(actor).suspendSwitch.switchOn
|
||||
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
|
||||
}
|
||||
|
||||
override def resume(actor: ActorCell) {
|
||||
val mbox = getMailbox(actor)
|
||||
val mboxopt = getMailbox(actor)
|
||||
if (mboxopt.isEmpty) return
|
||||
val mbox = mboxopt.get
|
||||
val queue = mbox.queue
|
||||
val wasActive = queue.isActive
|
||||
val switched = mbox.suspendSwitch.switchOff {
|
||||
|
|
@ -137,12 +142,14 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
|||
}
|
||||
}
|
||||
|
||||
override def mailboxSize(actor: ActorCell) = getMailbox(actor).queue.size
|
||||
override def mailboxSize(actor: ActorCell) = getMailbox(actor) map (_.queue.size) getOrElse 0
|
||||
|
||||
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty
|
||||
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true
|
||||
|
||||
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
|
||||
val mbox = getMailbox(receiver)
|
||||
val mboxopt = getMailbox(receiver)
|
||||
if (mboxopt.isEmpty) return
|
||||
val mbox = mboxopt.get
|
||||
mbox.systemEnqueue(message)
|
||||
val queue = mbox.queue
|
||||
if (!queue.isActive) {
|
||||
|
|
@ -152,7 +159,9 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
|||
}
|
||||
|
||||
protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
|
||||
val mbox = getMailbox(receiver)
|
||||
val mboxopt = getMailbox(receiver)
|
||||
if (mboxopt.isEmpty) return
|
||||
val mbox = mboxopt.get
|
||||
val queue = mbox.queue
|
||||
val execute = mbox.suspendSwitch.fold {
|
||||
queue.push(handle)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue