Fail actor creation if mailbox doesn't conform to required type. See #3237
This commit is contained in:
parent
7094daf313
commit
9fd42c7cab
9 changed files with 43 additions and 37 deletions
|
|
@ -7,6 +7,7 @@ package akka.actor
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit._
|
||||
import akka.dispatch._
|
||||
import akka.TestUtils.verifyActorTermination
|
||||
|
||||
object ActorMailboxSpec {
|
||||
val mailboxConf = ConfigFactory.parseString("""
|
||||
|
|
@ -133,10 +134,9 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
|
|||
checkMailboxQueue(Props[QueueReportingActor], "default-unbounded-deque", UnboundedDeqMailboxTypes)
|
||||
}
|
||||
|
||||
"get an unbounded dequeu message queue when it's configured as mailbox overriding RequestMailbox" in {
|
||||
filterEvents(EventFilter[IllegalArgumentException]()) {
|
||||
checkMailboxQueue(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait",
|
||||
UnboundedDeqMailboxTypes)
|
||||
"fail to create actor when an unbounded dequeu message queue is configured as mailbox overriding RequestMailbox" in {
|
||||
filterEvents(EventFilter[ActorInitializationException]()) {
|
||||
verifyActorTermination(system.actorOf(Props[BoundedQueueReportingActor], "default-unbounded-deque-override-trait"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -144,9 +144,9 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
|
|||
checkMailboxQueue(Props[QueueReportingActor], "unbounded-default", UnboundedMailboxTypes)
|
||||
}
|
||||
|
||||
"get an unbounded message queue when defined in dispatcher overriding RequestMailbox" in {
|
||||
filterEvents(EventFilter[IllegalArgumentException]()) {
|
||||
checkMailboxQueue(Props[BoundedQueueReportingActor], "unbounded-default-override-trait", UnboundedMailboxTypes)
|
||||
"fail to create actor when an unbounded message queue is defined in dispatcher overriding RequestMailbox" in {
|
||||
filterEvents(EventFilter[ActorInitializationException]()) {
|
||||
verifyActorTermination(system.actorOf(Props[BoundedQueueReportingActor], "unbounded-default-override-trait"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -422,7 +422,7 @@ private[akka] class ActorCell(
|
|||
case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message)
|
||||
case f: Failed ⇒ handleFailure(f)
|
||||
case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)
|
||||
case Create() ⇒ create()
|
||||
case Create(failure) ⇒ create(failure)
|
||||
case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher)
|
||||
case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher)
|
||||
case Recreate(cause) ⇒ faultRecreate(cause)
|
||||
|
|
@ -547,13 +547,16 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
|
||||
protected def create(): Unit = {
|
||||
protected def create(failure: Option[ActorInitializationException]): Unit = {
|
||||
def clearOutActorIfNonNull(): Unit = {
|
||||
if (actor != null) {
|
||||
clearActorFields(actor)
|
||||
actor = null // ensure that we know that we failed during creation
|
||||
}
|
||||
}
|
||||
|
||||
failure foreach { throw _ }
|
||||
|
||||
try {
|
||||
val created = newActor()
|
||||
actor = created
|
||||
|
|
|
|||
|
|
@ -255,14 +255,7 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain an upper-bound approximation of the actor class which is going to
|
||||
* be created by these Props. In other words, the [[#newActor]] method will
|
||||
* produce an instance of this class or a subclass thereof. This is used by
|
||||
* the actor system to select special dispatchers or mailboxes in case
|
||||
* dependencies are encoded in the actor type.
|
||||
*/
|
||||
def actorClass(): Class[_ <: Actor] = {
|
||||
private lazy val cachedActorClass: Class[_ <: Actor] = {
|
||||
if (classOf[IndirectActorProducer].isAssignableFrom(clazz)) {
|
||||
Reflect.instantiate(clazz, args).asInstanceOf[IndirectActorProducer].actorClass
|
||||
} else if (classOf[Actor].isAssignableFrom(clazz)) {
|
||||
|
|
@ -271,6 +264,15 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) {
|
|||
throw new IllegalArgumentException("unknown actor creator [$clazz]")
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Obtain an upper-bound approximation of the actor class which is going to
|
||||
* be created by these Props. In other words, the [[#newActor]] method will
|
||||
* produce an instance of this class or a subclass thereof. This is used by
|
||||
* the actor system to select special dispatchers or mailboxes in case
|
||||
* dependencies are encoded in the actor type.
|
||||
*/
|
||||
def actorClass(): Class[_ <: Actor] = cachedActorClass
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -10,11 +10,10 @@ import akka.dispatch.sysmsg._
|
|||
import akka.event.Logging.Error
|
||||
import akka.util.Unsafe
|
||||
import akka.dispatch.NullMessage
|
||||
import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell }
|
||||
import akka.actor._
|
||||
import akka.serialization.SerializationExtension
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.control.Exception.Catcher
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
private[akka] trait Dispatch { this: ActorCell ⇒
|
||||
|
||||
|
|
@ -47,23 +46,24 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
|||
* this is processed before anything else.
|
||||
*/
|
||||
val mbox = dispatcher.createMailbox(this)
|
||||
val actorClass = this.props.actorClass
|
||||
if (this.system.mailboxes.hasRequiredType(actorClass)) {
|
||||
this.system.mailboxes.getRequiredType(actorClass).foreach {
|
||||
|
||||
// we need to delay the failure to the point of actor creation so we can handle
|
||||
// it properly in the normal way
|
||||
val actorClass = props.actorClass
|
||||
val createMessage = if (system.mailboxes.hasRequiredType(actorClass)) {
|
||||
Create(system.mailboxes.getRequiredType(actorClass).flatMap {
|
||||
case c if !c.isAssignableFrom(mbox.messageQueue.getClass) ⇒
|
||||
// FIXME 3237 throw an exception here instead of just logging it,
|
||||
// and update the comment on the RequiresMessageQueue trait
|
||||
val e = new IllegalArgumentException(s"Actor [${this.self.path}] requires mailbox type [${c}]" +
|
||||
s" got [${mbox.messageQueue.getClass}]")
|
||||
this.systemImpl.eventStream.publish(Error(e, getClass.getName, getClass, e.getMessage))
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
Some(ActorInitializationException(self, s"Actor [${self}] requires mailbox type [${c}]" +
|
||||
s" got [${mbox.messageQueue.getClass}]"))
|
||||
case _ ⇒ None
|
||||
})
|
||||
} else Create(None)
|
||||
|
||||
swapMailbox(mbox)
|
||||
mailbox.setActor(this)
|
||||
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
mailbox.systemEnqueue(self, Create())
|
||||
mailbox.systemEnqueue(self, createMessage)
|
||||
|
||||
if (sendSupervise) {
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
|
|
|
|||
|
|
@ -132,7 +132,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
private def finishCreate(): Unit = {
|
||||
try resumeNonRecursive()
|
||||
finally clearFailed()
|
||||
try create()
|
||||
try create(None)
|
||||
catch handleNonFatalOrInterruptedException { e ⇒
|
||||
handleInvokeFailure(Nil, e)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -631,6 +631,7 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime
|
|||
* The mailbox type will be looked up by mapping the type T via akka.actor.mailbox.requirements in the config,
|
||||
* to a mailbox configuration. If no mailbox is assigned on Props or in deployment config then this one will be used.
|
||||
*
|
||||
* The queue type of the created mailbox will be checked against the type T and an error will be logged if it doesn't match.
|
||||
* The queue type of the created mailbox will be checked against the type T and actor creation will fail if it doesn't
|
||||
* fulfill the requirements.
|
||||
*/
|
||||
trait RequiresMessageQueue[T]
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.dispatch.sysmsg
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.{ InternalActorRef, ActorRef, PossiblyHarmful }
|
||||
import akka.actor.{ ActorInitializationException, InternalActorRef, ActorRef, PossiblyHarmful }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -201,7 +201,7 @@ trait StashWhenFailed
|
|||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(-4836972106317757555L)
|
||||
private[akka] case class Create() extends SystemMessage // send to self from Dispatcher.register
|
||||
private[akka] case class Create(failure: Option[ActorInitializationException]) extends SystemMessage // sent to self from Dispatcher.register
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ a dispatcher with a specified mailbox type, then that will override this mapping
|
|||
.. note::
|
||||
|
||||
The type of the queue in the mailbox created for an actor will be checked against the required type in the
|
||||
interface and if the queue doesn't implement the required type an error will be logged.
|
||||
interface and if the queue doesn't implement the required type then actor creation will fail.
|
||||
|
||||
|
||||
Mailbox configuration precedence
|
||||
|
|
|
|||
|
|
@ -240,7 +240,7 @@ a dispatcher with a specified mailbox type, then that will override this mapping
|
|||
.. note::
|
||||
|
||||
The type of the queue in the mailbox created for an actor will be checked against the required type in the
|
||||
trait and if the queue doesn't implement the required type an error will be logged.
|
||||
trait and if the queue doesn't implement the required type then actor creation will fail.
|
||||
|
||||
|
||||
Mailbox configuration precedence
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue