Merge branch 'wip-3281-NullMessage-∂π'
This commit is contained in:
commit
b3db19ee05
7 changed files with 33 additions and 56 deletions
|
|
@ -6,7 +6,6 @@ package akka.actor
|
||||||
|
|
||||||
import akka.actor.dungeon.ChildrenContainer
|
import akka.actor.dungeon.ChildrenContainer
|
||||||
import akka.dispatch.Envelope
|
import akka.dispatch.Envelope
|
||||||
import akka.dispatch.NullMessage
|
|
||||||
import akka.dispatch.sysmsg._
|
import akka.dispatch.sysmsg._
|
||||||
import akka.event.Logging.{ LogEvent, Debug, Error }
|
import akka.event.Logging.{ LogEvent, Debug, Error }
|
||||||
import akka.japi.Procedure
|
import akka.japi.Procedure
|
||||||
|
|
@ -462,39 +461,38 @@ private[akka] class ActorCell(
|
||||||
checkReceiveTimeout // Reschedule receive timeout
|
checkReceiveTimeout // Reschedule receive timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
def autoReceiveMessage(msg: Envelope): Unit =
|
def autoReceiveMessage(msg: Envelope): Unit = {
|
||||||
if (msg.message != NullMessage) {
|
if (system.settings.DebugAutoReceive)
|
||||||
if (system.settings.DebugAutoReceive)
|
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
|
||||||
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
|
|
||||||
|
|
||||||
msg.message match {
|
msg.message match {
|
||||||
case t: Terminated ⇒ receivedTerminated(t)
|
case t: Terminated ⇒ receivedTerminated(t)
|
||||||
case AddressTerminated(address) ⇒ addressTerminated(address)
|
case AddressTerminated(address) ⇒ addressTerminated(address)
|
||||||
case Kill ⇒ throw new ActorKilledException("Kill")
|
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||||
case PoisonPill ⇒ self.stop()
|
case PoisonPill ⇒ self.stop()
|
||||||
case SelectParent(m) ⇒
|
case SelectParent(m) ⇒
|
||||||
if (self == system.provider.rootGuardian) self.tell(m, msg.sender)
|
if (self == system.provider.rootGuardian) self.tell(m, msg.sender)
|
||||||
else parent.tell(m, msg.sender)
|
else parent.tell(m, msg.sender)
|
||||||
case s @ SelectChildName(name, m) ⇒
|
case s @ SelectChildName(name, m) ⇒
|
||||||
def selectChild(): Unit = {
|
def selectChild(): Unit = {
|
||||||
getChildByName(name) match {
|
getChildByName(name) match {
|
||||||
case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender)
|
case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) }
|
s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// need this special case because of extraNames handled by rootGuardian
|
}
|
||||||
if (self == system.provider.rootGuardian) {
|
// need this special case because of extraNames handled by rootGuardian
|
||||||
self.asInstanceOf[LocalActorRef].getSingleChild(name) match {
|
if (self == system.provider.rootGuardian) {
|
||||||
case Nobody ⇒ selectChild()
|
self.asInstanceOf[LocalActorRef].getSingleChild(name) match {
|
||||||
case child ⇒ child.tell(m, msg.sender)
|
case Nobody ⇒ selectChild()
|
||||||
}
|
case child ⇒ child.tell(m, msg.sender)
|
||||||
} else
|
}
|
||||||
selectChild()
|
} else
|
||||||
case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
|
selectChild()
|
||||||
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self))
|
case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
|
||||||
}
|
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(self))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
|
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.actor
|
||||||
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import akka.dispatch.sysmsg._
|
import akka.dispatch.sysmsg._
|
||||||
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue, NullMessage }
|
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
import akka.event._
|
import akka.event._
|
||||||
import akka.util.{ Switch, Helpers }
|
import akka.util.{ Switch, Helpers }
|
||||||
|
|
@ -495,9 +495,8 @@ private[akka] class LocalActorRefProvider private[akka] (
|
||||||
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = stopped.isOn
|
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = stopped.isOn
|
||||||
|
|
||||||
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match {
|
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match {
|
||||||
case null ⇒ throw new InvalidMessageException("Message is null")
|
case null ⇒ throw new InvalidMessageException("Message is null")
|
||||||
case NullMessage ⇒ // do nothing
|
case _ ⇒ log.error(s"$this received unexpected message [$message]")
|
||||||
case _ ⇒ log.error(s"$this received unexpected message [$message]")
|
|
||||||
})
|
})
|
||||||
|
|
||||||
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
|
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ import akka.dispatch.{ MessageDispatcher, Mailbox, Envelope }
|
||||||
import akka.dispatch.sysmsg._
|
import akka.dispatch.sysmsg._
|
||||||
import akka.event.Logging.Error
|
import akka.event.Logging.Error
|
||||||
import akka.util.Unsafe
|
import akka.util.Unsafe
|
||||||
import akka.dispatch.NullMessage
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
@ -68,7 +67,6 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
||||||
if (sendSupervise) {
|
if (sendSupervise) {
|
||||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||||
parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
|
parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
|
||||||
parent ! NullMessage // read ScalaDoc of NullMessage to see why
|
|
||||||
}
|
}
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -202,7 +202,6 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
||||||
catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
|
catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
|
||||||
finally try dispatcher.detach(this)
|
finally try dispatcher.detach(this)
|
||||||
finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
|
finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
|
||||||
finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why
|
|
||||||
finally try tellWatchersWeDied(a)
|
finally try tellWatchersWeDied(a)
|
||||||
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
|
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
|
||||||
finally {
|
finally {
|
||||||
|
|
|
||||||
|
|
@ -27,21 +27,6 @@ object Envelope {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This message is sent directly after the Supervise system message in order
|
|
||||||
* to form a barrier wrt. the first real message sent by the child, so that e.g.
|
|
||||||
* Failed() cannot overtake Supervise(). Processing this does nothing.
|
|
||||||
*
|
|
||||||
* Detailed explanation:
|
|
||||||
*
|
|
||||||
* The race happens because Supervise and Failed may be queued between the
|
|
||||||
* parent's check for system messages and dequeue(). Thus, if the parent
|
|
||||||
* processes the NullMessage first (by way of that tiny race window), it is
|
|
||||||
* guaranteed to then find the Supervise system message in its mailbox prior
|
|
||||||
* to turning its attention to the next real message.
|
|
||||||
*/
|
|
||||||
case object NullMessage extends AutoReceivedMessage
|
|
||||||
|
|
||||||
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable {
|
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable {
|
||||||
final override def isBatchable: Boolean = runnable match {
|
final override def isBatchable: Boolean = runnable match {
|
||||||
case b: Batchable ⇒ b.isBatchable
|
case b: Batchable ⇒ b.isBatchable
|
||||||
|
|
|
||||||
|
|
@ -114,7 +114,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
|
||||||
classOf[InternalClusterAction.Tick],
|
classOf[InternalClusterAction.Tick],
|
||||||
classOf[akka.actor.PoisonPill],
|
classOf[akka.actor.PoisonPill],
|
||||||
classOf[akka.dispatch.sysmsg.DeathWatchNotification],
|
classOf[akka.dispatch.sysmsg.DeathWatchNotification],
|
||||||
akka.dispatch.NullMessage.getClass,
|
|
||||||
akka.remote.transport.AssociationHandle.Disassociated.getClass,
|
akka.remote.transport.AssociationHandle.Disassociated.getClass,
|
||||||
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
|
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
|
||||||
classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)
|
classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)
|
||||||
|
|
|
||||||
|
|
@ -93,8 +93,7 @@ class RemoteWatcherSpec extends AkkaSpec(
|
||||||
|
|
||||||
Seq(system, remoteSystem).foreach(muteDeadLetters(
|
Seq(system, remoteSystem).foreach(muteDeadLetters(
|
||||||
akka.remote.transport.AssociationHandle.Disassociated.getClass,
|
akka.remote.transport.AssociationHandle.Disassociated.getClass,
|
||||||
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
|
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass)(_))
|
||||||
akka.dispatch.NullMessage.getClass)(_))
|
|
||||||
|
|
||||||
override def afterTermination() {
|
override def afterTermination() {
|
||||||
remoteSystem.shutdown()
|
remoteSystem.shutdown()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue