=act #3623 unsubscribe when actors terminate, instead of isTerminated

* Moved to removing actors proactively when they are terminated instead of
  checking `isTerminated` during publish.
* Subscribers which have registered before initializing the unsubscriber
  will be aggregated in a Seq until one is registered and then it will
  take responsibility of unregistering them on termination.
* Initialization of the unsubscriber can only be run once - attempting
  to initialize the event stream with another unsubscriber will fail,
  and init will return false.
* Assumed having an init (mutable) method on the `EventBus` is fine, as
  it has such methods already and @patriknw's comment in the task for
  this.
* since we must check if the subscriber has any subscribed channels left
  we had to expose this detail from SubchannelClassification via
  `hasSubscriptions`. Increases cost of ubsubscribe(actor, channel) a bit.
* Evacuated the expensive `hasSubscription` call out of eventstream's `unsubscribe` call, and instead making the Unsubscriber check this before it stops watching an actor. If in the mean time the same actor got subscribed, there will be a new Subscribe message emited - so we're good on that side. Also, if it would terminate before the unsubscriber gets the Register message it will call `watch(actor)` on a dead actor, which results in getting Terminated for it, thus we'll stop watching it from the Unsubscriber as expected.

Final squash and small cleanup. Please review again;
This commit is contained in:
Konrad Malawski 2014-02-13 22:52:01 +00:00 committed by Konrad Malawski
parent 5982aab066
commit f57470926e
5 changed files with 257 additions and 5 deletions

View file

@ -6,7 +6,7 @@ package akka.event
import language.postfixOps import language.postfixOps
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } import akka.actor._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.event.Logging.InitializeLogger import akka.event.Logging.InitializeLogger
@ -28,11 +28,15 @@ object EventStreamSpec {
akka { akka {
actor.serialize-messages = off actor.serialize-messages = off
stdout-loglevel = WARNING stdout-loglevel = WARNING
loglevel = DEBUG loglevel = WARNING
actor.debug.unhandled = on actor.debug.unhandled = on
} }
""") """)
val configUnhandledWithDebug =
ConfigFactory.parseString("akka.actor.debug.event-stream = on")
.withFallback(configUnhandled)
final case class M(i: Int) final case class M(i: Int)
final case class SetTarget(ref: ActorRef) final case class SetTarget(ref: ActorRef)
@ -274,6 +278,130 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
es.unsubscribe(a1.ref, classOf[AT]) should be(true) es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BTT]) should be(true) es.unsubscribe(a2.ref, classOf[BTT]) should be(true)
} }
"unsubscribe an actor on its termination" in {
val sys = ActorSystem("EventStreamSpecUnsubscribeOnTerminated", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
val tm = new A
val target = sys.actorOf(Props(new Actor {
def receive = { case in a1.ref forward in }
}), "to-be-killed")
es.subscribe(a2.ref, classOf[Any])
es.subscribe(target, classOf[A]) should be(true)
es.subscribe(target, classOf[A]) should be(false)
target ! PoisonPill
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
fishForDebugMessage(a2, s"unwatching $target")
es.publish(tm)
a1.expectNoMsg(1 second)
a2.expectMsg(tm)
} finally {
sys.shutdown()
}
}
"unsubscribe the actor, when it subscribes already in terminated state" in {
val sys = ActorSystem("EventStreamSpecUnsubscribeTerminated", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
val target = system.actorOf(Props(new Actor {
def receive = { case in a1.ref forward in }
}), "to-be-killed")
watch(target)
target ! PoisonPill
expectTerminated(target)
es.subscribe(a2.ref, classOf[Any])
// target1 is Terminated; When subscribing, it will be unsubscribed by the Unsubscriber right away
es.subscribe(target, classOf[A]) should be(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
es.subscribe(target, classOf[A]) should be(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
} finally {
sys.shutdown()
}
}
"not allow initializing a TerminatedUnsubscriber twice" in {
val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config)
// initializes an TerminatedUnsubscriber during start
try {
val es = sys.eventStream
val p = TestProbe()
val refWillBeUsedAsUnsubscriber = es.initUnsubscriber(p.ref)
refWillBeUsedAsUnsubscriber should equal(false)
} finally {
sys.shutdown()
}
}
"unwatch an actor from unsubscriber when that actor unsubscribes from the stream" in {
val sys = ActorSystem("MustUnregisterDuringUnsubscribe", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[Logging.Debug])
es.subscribe(a2.ref, classOf[A])
fishForDebugMessage(a1, s"watching ${a2.ref}")
es.unsubscribe(a2.ref)
fishForDebugMessage(a1, s"unwatching ${a2.ref}")
} finally {
sys.shutdown()
}
}
"unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed" in {
val sys = ActorSystem("MustUnregisterWhenNoMoreChannelSubscriptions", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[Logging.Debug])
es.subscribe(a2.ref, classOf[A])
es.subscribe(a2.ref, classOf[T])
fishForDebugMessage(a1, s"watching ${a2.ref}")
es.unsubscribe(a2.ref, classOf[A]) should equal(true)
fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel class akka.event.EventStreamSpec$$A")
a1.expectNoMsg(1 second)
es.unsubscribe(a2.ref, classOf[T]) should equal(true)
fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel interface akka.event.EventStreamSpec$$T")
fishForDebugMessage(a1, s"unwatching ${a2.ref}, since has no subscriptions")
a1.expectNoMsg(1 second)
es.unsubscribe(a2.ref, classOf[T]) should equal(false)
} finally {
sys.shutdown()
}
}
} }
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
@ -284,4 +412,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
msg foreach (expectMsg(_)) msg foreach (expectMsg(_))
} }
private def fishForDebugMessage(a: TestProbe, messagePrefix: String) {
a.fishForMessage(3 seconds, hint = "expected debug message prefix: " + messagePrefix) {
case Logging.Debug(_, _, msg: String) if msg startsWith messagePrefix true
case other false
}
}
} }

View file

@ -618,6 +618,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
provider.init(this) provider.init(this)
if (settings.LogDeadLetters > 0) if (settings.LogDeadLetters > 0)
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
startEventStreamUnsubscriber(eventStream)
registerOnTermination(stopScheduler()) registerOnTermination(stopScheduler())
loadExtensions() loadExtensions()
if (LogConfigOnStart) logConfiguration() if (LogConfigOnStart) logConfiguration()
@ -662,6 +663,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
shutdown() shutdown()
} }
def startEventStreamUnsubscriber(eventStream: EventStream) {
systemActorOf(Props(classOf[EventStreamUnsubscriber], eventStream, DebugEventStream), "eventStreamUnsubscriber")
}
//#create-scheduler //#create-scheduler
/** /**
* Create the scheduler service. This one needs one special behavior: if * Create the scheduler service. This one needs one special behavior: if

View file

@ -175,6 +175,13 @@ trait SubchannelClassification { this: EventBus ⇒
recv foreach (publish(event, _)) recv foreach (publish(event, _))
} }
/**
* INTERNAL API
* Expensive call! Avoid calling directly from event bus subscribe / unsubscribe.
*/
private[akka] def hasSubscriptions(subscriber: Subscriber): Boolean =
cache.values exists { _ contains subscriber }
private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit = private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) { cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs) case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs)
@ -184,6 +191,7 @@ trait SubchannelClassification { this: EventBus ⇒
cache = (cache /: changes) { cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs) case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs)
} }
} }
/** /**

View file

@ -5,9 +5,11 @@ package akka.event
import language.implicitConversions import language.implicitConversions
import akka.actor.{ ActorRef, ActorSystem } import akka.actor._
import akka.event.Logging.simpleName import akka.event.Logging.simpleName
import akka.util.Subclassification import akka.util.Subclassification
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
object EventStream { object EventStream {
//Why is this here and why isn't there a failing test if it is removed? //Why is this here and why isn't there a failing test if it is removed?
@ -28,6 +30,9 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
type Event = AnyRef type Event = AnyRef
type Classifier = Class[_] type Classifier = Class[_]
/** Either the list of subscribed actors, or a ref to an [[akka.event.EventStreamUnsubscriber]] */
private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))
protected implicit val subclassification = new Subclassification[Class[_]] { protected implicit val subclassification = new Subclassification[Class[_]] {
def isEqual(x: Class[_], y: Class[_]) = x == y def isEqual(x: Class[_], y: Class[_]) = x == y
def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x
@ -36,13 +41,13 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
protected def classify(event: AnyRef): Class[_] = event.getClass protected def classify(event: AnyRef): Class[_] = event.getClass
protected def publish(event: AnyRef, subscriber: ActorRef) = { protected def publish(event: AnyRef, subscriber: ActorRef) = {
if (subscriber.isTerminated) unsubscribe(subscriber) subscriber ! event
else subscriber ! event
} }
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
registerWithUnsubscriber(subscriber)
super.subscribe(subscriber, channel) super.subscribe(subscriber, channel)
} }
@ -50,6 +55,7 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
val ret = super.unsubscribe(subscriber, channel) val ret = super.unsubscribe(subscriber, channel)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel)) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
unregisterIfNoMoreSubscribedChannels(subscriber)
ret ret
} }
@ -57,6 +63,61 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
super.unsubscribe(subscriber) super.unsubscribe(subscriber)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
unregisterIfNoMoreSubscribedChannels(subscriber)
}
/**
* INTERNAL API
*/
private[akka] def initUnsubscriber(unsubscriber: ActorRef): Boolean = {
initiallySubscribedOrUnsubscriber.get match {
case value @ Left(subscribers)
if (initiallySubscribedOrUnsubscriber.compareAndSet(value, Right(unsubscriber))) {
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers.size + " initial subscribers with it"))
subscribers foreach registerWithUnsubscriber
true
} else {
// recurse, because either new subscribers have been registered since `get` (retry Left case),
// or another thread has succeeded in setting it's unsubscriber (end on Right case)
initUnsubscriber(unsubscriber)
}
case Right(presentUnsubscriber)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, s"not using unsubscriber $unsubscriber, because already initialized with $presentUnsubscriber"))
false
}
}
/**
* INTERNAL API
*/
private[akka] def registerWithUnsubscriber(subscriber: ActorRef): Unit = {
initiallySubscribedOrUnsubscriber.get match {
case value @ Left(subscribers)
if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber)))
registerWithUnsubscriber(subscriber)
case Right(unsubscriber)
unsubscriber ! EventStreamUnsubscriber.Register(subscriber)
}
}
/**
* INTERNAL API
*
* The actual check if the subscriber still has subscriptions is performed by the `EventStreamUnsubscriber`,
* because it's an expensive operation, and we don want to block client-code for that long, the Actor will eventually
* catch up and perform the apropriate operation.
*/
private[akka] def unregisterIfNoMoreSubscribedChannels(subscriber: ActorRef): Unit = {
initiallySubscribedOrUnsubscriber.get match {
case value @ Left(subscribers)
if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers - subscriber)))
unregisterIfNoMoreSubscribedChannels(subscriber)
case Right(unsubscriber)
unsubscriber ! EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)
}
} }
} }

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor._
import akka.event.Logging.simpleName
/**
* Watches all actors which subscribe on the given event stream, and unsubscribes them from it when they are Terminated.
*/
class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor with ActorLogging {
import EventStreamUnsubscriber._
override def preStart() {
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $eventStream"))
eventStream initUnsubscriber self
}
def receive = {
case Register(actor)
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates"))
context watch actor
case UnregisterIfNoMoreSubscribedChannels(actor) if eventStream.hasSubscriptions(actor)
// do nothing
// hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream
case UnregisterIfNoMoreSubscribedChannels(actor)
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions"))
context unwatch actor
case Terminated(actor)
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $eventStream, because it was terminated"))
eventStream unsubscribe actor
}
}
object EventStreamUnsubscriber {
final case class Register(actor: ActorRef)
final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef)
}