Merge pull request #15024 from ktoso/wip-removing-isTerminated-from-buses-ktoso

!act,doc Removing isTerminated from EventStream and ActorClassification
This commit is contained in:
Konrad Malawski 2014-04-25 10:16:53 +02:00
commit 403afada41
12 changed files with 666 additions and 100 deletions

View file

@ -10,9 +10,11 @@ import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import scala.concurrent.duration._
import java.util.concurrent.atomic._
import akka.actor.{ Props, Actor, ActorRef, ActorSystem }
import java.util.Comparator
import akka.actor.{ Props, Actor, ActorRef, ActorSystem, PoisonPill, RootActorPath }
import akka.japi.{ Procedure, Function }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
object EventBusSpec {
class TestActorWrapperActor(testActor: ActorRef) extends Actor {
@ -23,7 +25,7 @@ object EventBusSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfterEach {
abstract class EventBusSpec(busName: String, conf: Config = ConfigFactory.empty()) extends AkkaSpec(conf) with BeforeAndAfterEach {
import EventBusSpec._
type BusType <: EventBus
@ -37,13 +39,13 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit
busName must {
lazy val bus = createNewEventBus()
busName must {
def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber]
def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier]
def createNewEvents(numberOfEvents: Int): Iterable[bus.Event] = createEvents(numberOfEvents).asInstanceOf[Iterable[bus.Event]]
val bus = createNewEventBus()
val events = createNewEvents(100)
val event = events.head
val classifier = getClassifierFor(event)
@ -144,30 +146,137 @@ abstract class EventBusSpec(busName: String) extends AkkaSpec with BeforeAndAfte
}
object ActorEventBusSpec {
class ComposedActorEventBus extends ActorEventBus with LookupClassification {
type Event = Int
type Classifier = String
class MyActorEventBus(protected val system: ActorSystem) extends ActorEventBus
with ActorClassification with ActorClassifier {
def classify(event: Event) = event.toString
type Event = Notification
def classify(event: Event) = event.ref
protected def mapSize = 32
def publish(event: Event, subscriber: Subscriber) = subscriber ! event
}
case class Notification(ref: ActorRef, payload: Int)
}
class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
class ActorEventBusSpec(conf: Config) extends EventBusSpec("ActorEventBus", conf) {
import akka.event.ActorEventBusSpec._
import EventBusSpec.TestActorWrapperActor
type BusType = ComposedActorEventBus
def createNewEventBus(): BusType = new ComposedActorEventBus
def this() {
this(ConfigFactory.parseString("akka.actor.debug.event-stream = on").withFallback(AkkaSpec.testConf))
}
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
type BusType = MyActorEventBus
def createNewEventBus(): BusType = new MyActorEventBus(system)
// different actor in each event because we want each event to have a different classifier (see EventBusSpec tests)
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents).map(Notification(TestProbe().ref, _)).toSeq
def createSubscriber(pipeTo: ActorRef) = system.actorOf(Props(new TestActorWrapperActor(pipeTo)))
def classifierFor(event: BusType#Event) = event.toString
def classifierFor(event: BusType#Event) = event.ref
def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber)
// ActorClassification specific tests
"must unsubscribe subscriber when it terminates" in {
val a1 = createSubscriber(system.deadLetters)
val subs = createSubscriber(testActor)
def m(i: Int) = Notification(a1, i)
val p = TestProbe()
system.eventStream.subscribe(p.ref, classOf[Logging.Debug])
bus.subscribe(subs, a1)
bus.publish(m(1))
expectMsg(m(1))
watch(subs)
subs ! PoisonPill // when a1 dies, subs has nothing subscribed
expectTerminated(subs)
expectUnsubscribedByUnsubscriber(p, subs)
bus.publish(m(2))
expectNoMsg(1 second)
disposeSubscriber(system, subs)
disposeSubscriber(system, a1)
}
"must keep subscriber even if its subscription-actors have died" in {
// Deaths of monitored actors should not influence the subscription.
// For example: one might still want to monitor messages classified to A
// even though it died, and handle these in some way.
val a1 = createSubscriber(system.deadLetters)
val subs = createSubscriber(testActor)
def m(i: Int) = Notification(a1, i)
bus.subscribe(subs, a1) should equal(true)
bus.publish(m(1))
expectMsg(m(1))
watch(a1)
a1 ! PoisonPill
expectTerminated(a1)
bus.publish(m(2)) // even though a1 has terminated, classification still applies
expectMsg(m(2))
disposeSubscriber(system, subs)
disposeSubscriber(system, a1)
}
"must unregister subscriber only after it unsubscribes from all of it's subscriptions" in {
val a1, a2 = createSubscriber(system.deadLetters)
val subs = createSubscriber(testActor)
def m1(i: Int) = Notification(a1, i)
def m2(i: Int) = Notification(a2, i)
val p = TestProbe()
system.eventStream.subscribe(p.ref, classOf[Logging.Debug])
bus.subscribe(subs, a1) should equal(true)
bus.subscribe(subs, a2) should equal(true)
bus.publish(m1(1))
bus.publish(m2(1))
expectMsg(m1(1))
expectMsg(m2(1))
bus.unsubscribe(subs, a1)
bus.publish(m1(2))
expectNoMsg(1 second)
bus.publish(m2(2))
expectMsg(m2(2))
bus.unsubscribe(subs, a2)
expectUnregisterFromUnsubscriber(p, subs)
bus.publish(m1(3))
bus.publish(m2(3))
expectNoMsg(1 second)
disposeSubscriber(system, subs)
disposeSubscriber(system, a1)
disposeSubscriber(system, a2)
}
private def expectUnsubscribedByUnsubscriber(p: TestProbe, a: ActorRef) {
val expectedMsg = s"actor $a has terminated, unsubscribing it from $bus"
p.fishForMessage(1 second, hint = expectedMsg) {
case Logging.Debug(_, _, msg) if msg equals expectedMsg true
case other false
}
}
private def expectUnregisterFromUnsubscriber(p: TestProbe, a: ActorRef) {
val expectedMsg = s"unregistered watch of $a in $bus"
p.fishForMessage(1 second, hint = expectedMsg) {
case Logging.Debug(_, _, msg) if msg equals expectedMsg true
case other false
}
}
}
object ScanningEventBusSpec {

View file

@ -6,7 +6,7 @@ package akka.event
import language.postfixOps
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage }
import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
import akka.event.Logging.InitializeLogger
@ -28,11 +28,15 @@ object EventStreamSpec {
akka {
actor.serialize-messages = off
stdout-loglevel = WARNING
loglevel = DEBUG
loglevel = WARNING
actor.debug.unhandled = on
}
""")
val configUnhandledWithDebug =
ConfigFactory.parseString("akka.actor.debug.event-stream = on")
.withFallback(configUnhandled)
final case class M(i: Int)
final case class SetTarget(ref: ActorRef)
@ -75,7 +79,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
"An EventStream" must {
"manage subscriptions" in {
val bus = new EventStream(true)
//#event-bus-start-unsubscriber-scala
val bus = new EventStream(system, true)
bus.startUnsubscriber()
//#event-bus-start-unsubscriber-scala
bus.subscribe(testActor, classOf[M])
bus.publish(M(42))
within(1 second) {
@ -87,12 +95,12 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"not allow null as subscriber" in {
val bus = new EventStream(true)
val bus = new EventStream(system, true)
intercept[IllegalArgumentException] { bus.subscribe(null, classOf[M]) }.getMessage should be("subscriber is null")
}
"not allow null as unsubscriber" in {
val bus = new EventStream(true)
val bus = new EventStream(system, true)
intercept[IllegalArgumentException] { bus.unsubscribe(null, classOf[M]) }.getMessage should be("subscriber is null")
intercept[IllegalArgumentException] { bus.unsubscribe(null) }.getMessage should be("subscriber is null")
}
@ -111,7 +119,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage log levels" in {
val bus = new EventStream(false)
val bus = new EventStream(system, false)
bus.startDefaultLoggers(impl)
bus.publish(SetTarget(testActor))
expectMsg("OK")
@ -132,7 +140,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
val b1 = new B1
val b2 = new B2
val c = new C
val bus = new EventStream(false)
val bus = new EventStream(system, false)
within(2 seconds) {
bus.subscribe(testActor, classOf[B2]) should be(true)
bus.publish(c)
@ -154,7 +162,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage sub-channels using classes and traits (update on subscribe)" in {
val es = new EventStream(false)
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe()
@ -177,7 +185,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage sub-channels using classes and traits (update on unsubscribe)" in {
val es = new EventStream(false)
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe()
@ -199,7 +207,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage sub-channels using classes and traits (update on unsubscribe all)" in {
val es = new EventStream(false)
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe()
@ -221,7 +229,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage sub-channels using classes and traits (update on publish)" in {
val es = new EventStream(false)
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2 = TestProbe()
@ -237,7 +245,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage sub-channels using classes and traits (unsubscribe classes used with trait)" in {
val es = new EventStream(false)
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3 = TestProbe()
@ -259,7 +267,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
}
"manage sub-channels using classes and traits (subscribe after publish)" in {
val es = new EventStream(false)
val es = new EventStream(system, false)
val tm1 = new CCATBT
val a1, a2 = TestProbe()
@ -274,6 +282,130 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BTT]) should be(true)
}
"unsubscribe an actor on its termination" in {
val sys = ActorSystem("EventStreamSpecUnsubscribeOnTerminated", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
val tm = new A
val target = sys.actorOf(Props(new Actor {
def receive = { case in a1.ref forward in }
}), "to-be-killed")
es.subscribe(a2.ref, classOf[Any])
es.subscribe(target, classOf[A]) should be(true)
es.subscribe(target, classOf[A]) should be(false)
target ! PoisonPill
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
fishForDebugMessage(a2, s"unwatching $target")
es.publish(tm)
a1.expectNoMsg(1 second)
a2.expectMsg(tm)
} finally {
shutdown(sys)
}
}
"unsubscribe the actor, when it subscribes already in terminated state" in {
val sys = ActorSystem("EventStreamSpecUnsubscribeTerminated", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
val target = system.actorOf(Props(new Actor {
def receive = { case in a1.ref forward in }
}), "to-be-killed")
watch(target)
target ! PoisonPill
expectTerminated(target)
es.subscribe(a2.ref, classOf[Any])
// target1 is Terminated; When subscribing, it will be unsubscribed by the Unsubscriber right away
es.subscribe(target, classOf[A]) should be(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
es.subscribe(target, classOf[A]) should be(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
} finally {
shutdown(sys)
}
}
"not allow initializing a TerminatedUnsubscriber twice" in {
val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config)
// initializes an TerminatedUnsubscriber during start
try {
val es = sys.eventStream
val p = TestProbe()
val refWillBeUsedAsUnsubscriber = es.initUnsubscriber(p.ref)
refWillBeUsedAsUnsubscriber should equal(false)
} finally {
shutdown(sys)
}
}
"unwatch an actor from unsubscriber when that actor unsubscribes from the stream" in {
val sys = ActorSystem("MustUnregisterDuringUnsubscribe", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[Logging.Debug])
es.subscribe(a2.ref, classOf[A])
fishForDebugMessage(a1, s"watching ${a2.ref}")
es.unsubscribe(a2.ref)
fishForDebugMessage(a1, s"unwatching ${a2.ref}")
} finally {
shutdown(sys)
}
}
"unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed" in {
val sys = ActorSystem("MustUnregisterWhenNoMoreChannelSubscriptions", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[Logging.Debug])
es.subscribe(a2.ref, classOf[A])
es.subscribe(a2.ref, classOf[T])
fishForDebugMessage(a1, s"watching ${a2.ref}")
es.unsubscribe(a2.ref, classOf[A]) should equal(true)
fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel class akka.event.EventStreamSpec$$A")
a1.expectNoMsg(1 second)
es.unsubscribe(a2.ref, classOf[T]) should equal(true)
fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel interface akka.event.EventStreamSpec$$T")
fishForDebugMessage(a1, s"unwatching ${a2.ref}, since has no subscriptions")
a1.expectNoMsg(1 second)
es.unsubscribe(a2.ref, classOf[T]) should equal(false)
} finally {
shutdown(sys)
}
}
}
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
@ -284,4 +416,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
msg foreach (expectMsg(_))
}
private def fishForDebugMessage(a: TestProbe, messagePrefix: String) {
a.fishForMessage(3 seconds, hint = "expected debug message prefix: " + messagePrefix) {
case Logging.Debug(_, _, msg: String) if msg startsWith messagePrefix true
case other false
}
}
}

View file

@ -567,7 +567,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
import settings._
// this provides basic logging (to stdout) until .start() is called below
val eventStream: EventStream = new EventStream(DebugEventStream)
val eventStream = new EventStream(this, DebugEventStream)
eventStream.startStdoutLogger(settings)
val log: LoggingAdapter = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass)
@ -618,6 +618,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
provider.init(this)
if (settings.LogDeadLetters > 0)
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
eventStream.startUnsubscriber()
registerOnTermination(stopScheduler())
loadExtensions()
if (LogConfigOnStart) logConfiguration()

View file

@ -4,13 +4,13 @@
package akka.event
import akka.actor.ActorRef
import akka.actor.{ ActorSystem, ActorRef }
import akka.util.Index
import java.util.concurrent.ConcurrentSkipListSet
import java.util.Comparator
import akka.util.{ Subclassification, SubclassifiedIndex }
import scala.collection.immutable.TreeSet
import scala.collection.immutable
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
/**
* Represents the base type for EventBuses
@ -175,6 +175,13 @@ trait SubchannelClassification { this: EventBus ⇒
recv foreach (publish(event, _))
}
/**
* INTERNAL API
* Expensive call! Avoid calling directly from event bus subscribe / unsubscribe.
*/
private[akka] def hasSubscriptions(subscriber: Subscriber): Boolean =
cache.values exists { _ contains subscriber }
private def removeFromCache(changes: immutable.Seq[(Classifier, Set[Subscriber])]): Unit =
cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) -- cs)
@ -184,6 +191,7 @@ trait SubchannelClassification { this: EventBus ⇒
cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[Subscriber]) ++ cs)
}
}
/**
@ -243,80 +251,113 @@ trait ScanningClassification { self: EventBus ⇒
}
/**
* Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs
* Maps ActorRefs to ActorRefs to form an EventBus where ActorRefs can listen to other ActorRefs.
*
* All subscribers will be watched by an [[akka.event.ActorClassificationUnsubscriber]] and unsubscribed when they terminate.
* The unsubscriber actor will not be stopped automatically, and if you want to stop using the bus you should stop it yourself.
*/
trait ActorClassification { this: ActorEventBus with ActorClassifier
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
private val empty = TreeSet.empty[ActorRef]
private val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize)
protected def system: ActorSystem
private class ActorClassificationMappings(val seqNr: Int, val backing: Map[ActorRef, immutable.TreeSet[ActorRef]]) {
def get(monitored: ActorRef): immutable.TreeSet[ActorRef] = backing.getOrElse(monitored, empty)
def add(monitored: ActorRef, monitor: ActorRef) = {
val watchers = backing.get(monitored).getOrElse(empty) + monitor
new ActorClassificationMappings(seqNr + 1, backing.updated(monitored, watchers))
}
def remove(monitored: ActorRef, monitor: ActorRef) = {
val monitors = backing.get(monitored).getOrElse(empty) - monitor
new ActorClassificationMappings(seqNr + 1, backing.updated(monitored, monitors))
}
def remove(monitored: ActorRef) = {
val v = backing - monitored
new ActorClassificationMappings(seqNr + 1, v)
}
}
private val mappings = new AtomicReference[ActorClassificationMappings](
new ActorClassificationMappings(0, Map.empty[ActorRef, immutable.TreeSet[ActorRef]]))
private val empty = immutable.TreeSet.empty[ActorRef]
/** The unsubscriber takes care of unsubscribing actors, which have terminated. */
protected lazy val unsubscriber = ActorClassificationUnsubscriber.start(system, this)
@tailrec
protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = {
val current = mappings get monitored
current match {
case null
if (monitored.isTerminated) false
val current = mappings.get
current.backing.get(monitored) match {
case None
val added = current.add(monitored, monitor)
if (mappings.compareAndSet(current, added)) registerWithUnsubscriber(monitor, added.seqNr)
else associate(monitored, monitor)
case Some(monitors)
if (monitors.contains(monitored)) false
else {
if (mappings.putIfAbsent(monitored, empty + monitor) ne null) associate(monitored, monitor)
else if (monitored.isTerminated) !dissociate(monitored, monitor) else true
}
case raw: TreeSet[_]
val v = raw.asInstanceOf[TreeSet[ActorRef]]
if (monitored.isTerminated) false
if (v.contains(monitor)) true
else {
val added = v + monitor
if (!mappings.replace(monitored, v, added)) associate(monitored, monitor)
else if (monitored.isTerminated) !dissociate(monitored, monitor) else true
val added = current.add(monitored, monitor)
val noChange = current.backing == added.backing
if (noChange) false
else if (mappings.compareAndSet(current, added)) registerWithUnsubscriber(monitor, added.seqNr)
else associate(monitored, monitor)
}
}
}
protected final def dissociate(monitored: ActorRef): immutable.Iterable[ActorRef] = {
protected final def dissociate(actor: ActorRef): Unit = {
@tailrec
def dissociateAsMonitored(monitored: ActorRef): immutable.Iterable[ActorRef] = {
val current = mappings get monitored
current match {
case null empty
case raw: TreeSet[_]
val v = raw.asInstanceOf[TreeSet[ActorRef]]
if (!mappings.remove(monitored, v)) dissociateAsMonitored(monitored)
else v
def dissociateAsMonitored(monitored: ActorRef): Unit = {
val current = mappings.get
if (current.backing.contains(monitored)) {
val removed = current.remove(monitored)
if (!mappings.compareAndSet(current, removed))
dissociateAsMonitored(monitored)
}
}
def dissociateAsMonitor(monitor: ActorRef): Unit = {
val i = mappings.entrySet.iterator
while (i.hasNext()) {
val entry = i.next()
val v = entry.getValue
v match {
case raw: TreeSet[_]
val monitors = raw.asInstanceOf[TreeSet[ActorRef]]
val current = mappings.get
val i = current.backing.iterator
while (i.hasNext) {
val (key, value) = i.next()
value match {
case null
// do nothing
case monitors
if (monitors.contains(monitor))
dissociate(entry.getKey, monitor)
case _ //Dun care
dissociate(key, monitor)
}
}
}
try { dissociateAsMonitored(monitored) } finally { dissociateAsMonitor(monitored) }
try { dissociateAsMonitored(actor) } finally { dissociateAsMonitor(actor) }
}
@tailrec
protected final def dissociate(monitored: ActorRef, monitor: ActorRef): Boolean = {
val current = mappings get monitored
current match {
case null false
case raw: TreeSet[_]
val v = raw.asInstanceOf[TreeSet[ActorRef]]
val removed = v - monitor
if (removed eq raw) false
else if (removed.isEmpty) {
if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) else true
val current = mappings.get
current.backing.get(monitored) match {
case None false
case Some(monitors)
val removed = current.remove(monitored, monitor)
val removedMonitors = removed.get(monitored)
if (monitors.isEmpty || monitors == removedMonitors) {
false
} else {
if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) else true
if (mappings.compareAndSet(current, removed)) unregisterFromUnsubscriber(monitor, removed.seqNr)
else dissociate(monitored, monitor)
}
}
}
@ -331,9 +372,11 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒
*/
protected def mapSize: Int
def publish(event: Event): Unit = mappings.get(classify(event)) match {
case null ()
case some some foreach { _ ! event }
def publish(event: Event): Unit = {
mappings.get.backing.get(classify(event)) match {
case None ()
case Some(refs) refs.foreach { _ ! event }
}
}
def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
@ -349,4 +392,20 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒
def unsubscribe(subscriber: Subscriber): Unit =
if (subscriber eq null) throw new IllegalArgumentException("Subscriber is null")
else dissociate(subscriber)
/**
* INTERNAL API
*/
private[akka] def registerWithUnsubscriber(subscriber: ActorRef, seqNr: Int): Boolean = {
unsubscriber ! ActorClassificationUnsubscriber.Register(subscriber, seqNr)
true
}
/**
* INTERNAL API
*/
private[akka] def unregisterFromUnsubscriber(subscriber: ActorRef, seqNr: Int): Boolean = {
unsubscriber ! ActorClassificationUnsubscriber.Unregister(subscriber, seqNr)
true
}
}

View file

@ -0,0 +1,142 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor._
import akka.event.Logging.simpleName
import java.util.concurrent.atomic.AtomicInteger
/**
* INTERNAL API
*
* Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated.
*
* Assumptions note:
* We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor,
* thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to
* needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down
* subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes
* watching a few actors too much - we opt for the 2nd choice here.
*/
private[akka] class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor {
import EventStreamUnsubscriber._
override def preStart() {
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $eventStream"))
eventStream initUnsubscriber self
}
def receive = {
case Register(actor)
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates"))
context watch actor
case UnregisterIfNoMoreSubscribedChannels(actor) if eventStream.hasSubscriptions(actor)
// do nothing
// hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream
case UnregisterIfNoMoreSubscribedChannels(actor)
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions"))
context unwatch actor
case Terminated(actor)
if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $eventStream, because it was terminated"))
eventStream unsubscribe actor
}
}
/**
* INTERNAL API
*
* Provides factory for [[akka.event.EventStreamUnsubscriber]] actors with **unique names**.
* This is needed if someone spins up more [[EventStream]]s using the same [[ActorSystem]],
* each stream gets it's own unsubscriber.
*/
private[akka] object EventStreamUnsubscriber {
private val unsubscribersCount = new AtomicInteger(0)
final case class Register(actor: ActorRef)
final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef)
private def props(eventStream: EventStream, debug: Boolean) =
Props(classOf[EventStreamUnsubscriber], eventStream, debug)
def start(system: ActorSystem, stream: EventStream) = {
val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream")
system.asInstanceOf[ExtendedActorSystem]
.systemActorOf(props(stream, debug), "eventStreamUnsubscriber-" + unsubscribersCount.incrementAndGet())
}
}
/**
* INTERNAL API
*
* Watches all actors which subscribe on the given event stream, and unsubscribes them from it when they are Terminated.
*/
private[akka] class ActorClassificationUnsubscriber(bus: ActorClassification, debug: Boolean) extends Actor with Stash {
import ActorClassificationUnsubscriber._
private var atSeq = 0
private def nextSeq = atSeq + 1
override def preStart() {
super.preStart()
if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"will monitor $bus"))
}
def receive = {
case Register(actor, seq) if seq == nextSeq
if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"registered watch for $actor in $bus"))
context watch actor
atSeq = nextSeq
unstashAll()
case reg: Register
stash()
case Unregister(actor, seq) if seq == nextSeq
if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"unregistered watch of $actor in $bus"))
context unwatch actor
atSeq = nextSeq
unstashAll()
case unreg: Unregister
stash()
case Terminated(actor)
if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, s"actor $actor has terminated, unsubscribing it from $bus"))
// the `unsubscribe` will trigger another `Unregister(actor, _)` message to this unsubscriber;
// but since that actor is terminated, there cannot be any harm in processing an Unregister for it.
bus unsubscribe actor
}
}
/**
* INTERNAL API
*
* Provides factory for [[akka.event.ActorClassificationUnsubscriber]] actors with **unique names**.
*/
private[akka] object ActorClassificationUnsubscriber {
private val unsubscribersCount = new AtomicInteger(0)
final case class Register(actor: ActorRef, seq: Int)
final case class Unregister(actor: ActorRef, seq: Int)
def start(system: ActorSystem, bus: ActorClassification, debug: Boolean = false) = {
val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream")
system.asInstanceOf[ExtendedActorSystem]
.systemActorOf(props(bus, debug), "actorClassificationUnsubscriber-" + unsubscribersCount.incrementAndGet())
}
private def props(eventBus: ActorClassification, debug: Boolean) = Props(classOf[ActorClassificationUnsubscriber], eventBus, debug)
}

View file

@ -8,11 +8,8 @@ import language.implicitConversions
import akka.actor.{ ActorRef, ActorSystem }
import akka.event.Logging.simpleName
import akka.util.Subclassification
object EventStream {
//Why is this here and why isn't there a failing test if it is removed?
implicit def fromActorSystem(system: ActorSystem) = system.eventStream
}
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
/**
* An Akka EventStream is a pub-sub stream of events both system and user generated,
@ -23,11 +20,14 @@ object EventStream {
* The debug flag in the constructor toggles if operations on this EventStream should also be published
* as Debug-Events
*/
class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
class EventStream(sys: ActorSystem, private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
type Event = AnyRef
type Classifier = Class[_]
/** Either the list of subscribed actors, or a ref to an [[akka.event.EventStreamUnsubscriber]] */
private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))
protected implicit val subclassification = new Subclassification[Class[_]] {
def isEqual(x: Class[_], y: Class[_]) = x == y
def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x
@ -36,13 +36,13 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
protected def classify(event: AnyRef): Class[_] = event.getClass
protected def publish(event: AnyRef, subscriber: ActorRef) = {
if (subscriber.isTerminated) unsubscribe(subscriber)
else subscriber ! event
subscriber ! event
}
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
registerWithUnsubscriber(subscriber)
super.subscribe(subscriber, channel)
}
@ -50,6 +50,7 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
val ret = super.unsubscribe(subscriber, channel)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
unregisterIfNoMoreSubscribedChannels(subscriber)
ret
}
@ -57,6 +58,70 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
super.unsubscribe(subscriber)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
unregisterIfNoMoreSubscribedChannels(subscriber)
}
/**
* ''Must'' be called after actor system is "ready".
* Starts system actor that takes care of unsubscribing subscribers that have terminated.
*/
def startUnsubscriber() = EventStreamUnsubscriber.start(sys, this)
/**
* INTERNAL API
*/
@tailrec
final private[akka] def initUnsubscriber(unsubscriber: ActorRef): Boolean = {
initiallySubscribedOrUnsubscriber.get match {
case value @ Left(subscribers)
if (initiallySubscribedOrUnsubscriber.compareAndSet(value, Right(unsubscriber))) {
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers.size + " initial subscribers with it"))
subscribers foreach registerWithUnsubscriber
true
} else {
// recurse, because either new subscribers have been registered since `get` (retry Left case),
// or another thread has succeeded in setting it's unsubscriber (end on Right case)
initUnsubscriber(unsubscriber)
}
case Right(presentUnsubscriber)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, s"not using unsubscriber $unsubscriber, because already initialized with $presentUnsubscriber"))
false
}
}
/**
* INTERNAL API
*/
@tailrec
private def registerWithUnsubscriber(subscriber: ActorRef): Unit = {
initiallySubscribedOrUnsubscriber.get match {
case value @ Left(subscribers)
if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber)))
registerWithUnsubscriber(subscriber)
case Right(unsubscriber)
unsubscriber ! EventStreamUnsubscriber.Register(subscriber)
}
}
/**
* INTERNAL API
*
* The actual check if the subscriber still has subscriptions is performed by the `EventStreamUnsubscriber`,
* because it's an expensive operation, and we don want to block client-code for that long, the Actor will eventually
* catch up and perform the apropriate operation.
*/
@tailrec
private def unregisterIfNoMoreSubscribedChannels(subscriber: ActorRef): Unit = {
initiallySubscribedOrUnsubscriber.get match {
case value @ Left(subscribers)
if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers - subscriber)))
unregisterIfNoMoreSubscribedChannels(subscriber)
case Right(unsubscriber)
unsubscriber ! EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)
}
}
}

View file

@ -4,7 +4,7 @@
package akka.event.japi
import akka.util.Subclassification
import akka.actor.ActorRef
import akka.actor.{ ActorSystem, ActorRef }
/**
* Java API: See documentation for [[akka.event.EventBus]]
@ -89,12 +89,11 @@ abstract class LookupEventBus[E, S, C] extends EventBus[E, S, C] {
}
/**
* See documentation for [[akka.event.SubchannelClassification]]
* Java API: See documentation for [[akka.event.SubchannelClassification]]
* E is the Event type
* S is the Subscriber type
* C is the Classifier type
*/
abstract class SubchannelEventBus[E, S, C] extends EventBus[E, S, C] {
private val bus = new akka.event.EventBus with akka.event.SubchannelClassification {
type Event = E
@ -134,7 +133,7 @@ abstract class SubchannelEventBus[E, S, C] extends EventBus[E, S, C] {
}
/**
* See documentation for [[akka.event.ScanningClassification]]
* Java API: See documentation for [[akka.event.ScanningClassification]]
* E is the Event type
* S is the Subscriber type
* C is the Classifier type
@ -185,15 +184,17 @@ abstract class ScanningEventBus[E, S, C] extends EventBus[E, S, C] {
}
/**
* See documentation for [[akka.event.ActorClassification]]
* Java API: See documentation for [[akka.event.ActorClassification]]
* An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef
* Means that ActorRefs "listen" to other ActorRefs
* E is the Event type
*/
abstract class ActorEventBus[E] extends EventBus[E, ActorRef, ActorRef] {
abstract class ActorEventBus[E](system: ActorSystem) extends EventBus[E, ActorRef, ActorRef] {
private val bus = new akka.event.ActorEventBus with akka.event.ActorClassification with akka.event.ActorClassifier {
type Event = E
override val system = ActorEventBus.this.system
override protected def mapSize: Int = ActorEventBus.this.mapSize
override protected def classify(event: E): ActorRef =

View file

@ -3,20 +3,27 @@
*/
package docs.event;
import akka.event.japi.EventBus;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.event.japi.*;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.JavaTestKit;
import akka.event.japi.EventBus;
import akka.util.Subclassification;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
//#lookup-bus
import akka.event.japi.LookupEventBus;
import java.util.concurrent.TimeUnit;
//#lookup-bus
@ -226,6 +233,12 @@ public class EventBusDocTest {
static
//#actor-bus
public class ActorBusImpl extends ActorEventBus<Notification> {
// the ActorSystem will be used for book-keeping operations, such as subscribers terminating
public ActorBusImpl(ActorSystem system) {
super(system);
}
// is used for extracting the classifier from the incoming events
@Override public ActorRef classify(Notification event) {
return event.ref;
@ -299,7 +312,7 @@ public class EventBusDocTest {
JavaTestKit probe2 = new JavaTestKit(system);
ActorRef subscriber1 = probe1.getRef();
ActorRef subscriber2 = probe2.getRef();
ActorBusImpl actorBus = new ActorBusImpl();
ActorBusImpl actorBus = new ActorBusImpl(system);
actorBus.subscribe(subscriber1, observer1);
actorBus.subscribe(subscriber2, observer1);
actorBus.subscribe(subscriber2, observer2);

View file

@ -104,6 +104,8 @@ A test for this implementation may look like this:
This classifier takes always a time which is proportional to the number of
subscriptions, independent of how many actually match.
.. _actor-classification-java:
Actor Classification
--------------------
@ -111,6 +113,11 @@ This classification was originally developed specifically for implementing
:ref:`DeathWatch <deathwatch-java>`: subscribers as well as classifiers are of
type :class:`ActorRef`.
This classification requires an :class:`ActorSystem` in order to perform book-keeping
operations related to the subscribers being Actors, which can terminate without first
unsubscribing from the EventBus. ActorClassification maitains a system Actor which
takes care of unsubscribing terminated actors automatically.
The necessary methods to be implemented are illustrated with the following example:
.. includecode:: code/docs/event/EventBusDocTest.java#actor-bus
@ -141,6 +148,8 @@ it can be subscribed like this:
.. includecode:: code/docs/event/LoggingDocTest.java#deadletters
Similarily to `Actor Classification`_, :class:`EventStream` will automatically remove subscibers when they terminate.
.. note::
The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly).
If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub`.

View file

@ -19,6 +19,25 @@ In earlier versions of Akka `TestKit.remaining` returned the default timeout con
AssertionError if called outside of within. The old behavior however can still be achieved by
calling `TestKit.remainingOrDefault` instead.
EventStream and ActorClassification EventBus now require an ActorSystem
=======================================================================
Both the ``EventStream`` (:ref:`Scala <event-stream-scala>`, :ref:`Java <event-stream-java>`) and the
``ActorClassification`` Event Bus (:ref:`Scala <actor-classification-scala>`, :ref:`Java <actor-classification-java>`) now
require an ``ActorSystem`` to properly operate. The reason for that is moving away from stateful internal lifecycle checks
to a fully reactive model for unsubscribing actors that have ``Terminated``.
If you have implemented a custom event bus, you will need to pass in the actor system through the constructor now:
.. includecode:: ../scala/code/docs/event/EventBusDocSpec.scala#actor-bus
If you have been creating EventStreams manually, you now have to provide an actor system and *start the unsubscriber*:
.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala#event-bus-start-unsubscriber-scala
Please note that this change affects you only if you have implemented your own busses, Akka's own ``context.eventStream``
is still there and does not require any attention from you concerning this change.
Removed Deprecated Features
===========================

View file

@ -5,7 +5,7 @@ package docs.event
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.actor.{ ActorSystem, ActorRef }
import akka.testkit.TestProbe
object EventBusDocSpec {
@ -126,7 +126,7 @@ object EventBusDocSpec {
final case class Notification(ref: ActorRef, id: Int)
class ActorBusImpl extends ActorEventBus with ActorClassifier with ActorClassification {
class ActorBusImpl(val system: ActorSystem) extends ActorEventBus with ActorClassifier with ActorClassification {
type Event = Notification
// is used for extracting the classifier from the incoming events
@ -187,7 +187,7 @@ class EventBusDocSpec extends AkkaSpec {
val probe2 = TestProbe()
val subscriber1 = probe1.ref
val subscriber2 = probe2.ref
val actorBus = new ActorBusImpl
val actorBus = new ActorBusImpl(system)
actorBus.subscribe(subscriber1, observer1)
actorBus.subscribe(subscriber2, observer1)
actorBus.subscribe(subscriber2, observer2)

View file

@ -104,6 +104,8 @@ A test for this implementation may look like this:
This classifier takes always a time which is proportional to the number of
subscriptions, independent of how many actually match.
.. _actor-classification-scala:
Actor Classification
--------------------
@ -111,6 +113,11 @@ This classification was originally developed specifically for implementing
:ref:`DeathWatch <deathwatch-scala>`: subscribers as well as classifiers are of
type :class:`ActorRef`.
This classification requires an :class:`ActorSystem` in order to perform book-keeping
operations related to the subscribers being Actors, which can terminate without first
unsubscribing from the EventBus. ActorClassification maitains a system Actor which
takes care of unsubscribing terminated actors automatically.
The necessary methods to be implemented are illustrated with the following example:
.. includecode:: code/docs/event/EventBusDocSpec.scala#actor-bus
@ -136,6 +143,8 @@ how a simple subscription works:
.. includecode:: code/docs/event/LoggingDocSpec.scala#deadletters
Similarily to `Actor Classification`_, :class:`EventStream` will automatically remove subscibers when they terminate.
.. note::
The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly).
If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub`.