diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala
new file mode 100644
index 0000000000..908e27a16c
--- /dev/null
+++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.event
+
+import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
+import org.scalatest.matchers.MustMatchers
+
+import akka.actor.Actor._
+import akka.testkit._
+import akka.util.duration._
+import java.util.concurrent.atomic._
+import akka.actor.ActorRef
+
+object EventBusSpec {
+
+}
+
+abstract class EventBusSpec(busName: String) extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
+  import EventBusSpec._
+  type BusType <: EventBus
+
+  def createNewEventBus(): BusType
+
+  def createEvents(numberOfEvents: Int): Iterable[BusType#Event]
+
+  def createSubscriber(pipeTo: ActorRef): BusType#Subscriber
+
+  def classifierFor(event: BusType#Event): BusType#Classifier
+
+  def disposeSubscriber(subscriber: BusType#Subscriber): Unit
+
+  busName must {
+
+    def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber]
+    def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier]
+
+    val bus = createNewEventBus()
+    val events = createEvents(100)
+    val event = events.head
+    val classifier = getClassifierFor(event)
+    val subscriber = createNewSubscriber()
+
+    "allow subscribers" in {
+      bus.subscribe(subscriber, classifier) must be === true
+    }
+
+    "allow to unsubscribe already existing subscriber" in {
+      bus.unsubscribe(subscriber, classifier) must be === true
+    }
+
+    "not allow to unsubscribe non-existing subscriber" in {
+      val sub = createNewSubscriber()
+      bus.unsubscribe(sub, classifier) must be === false
+      disposeSubscriber(sub)
+    }
+
+    "not allow for the same subscriber to subscribe to the same channel twice" in {
+      bus.subscribe(subscriber, classifier) must be === true
+      bus.subscribe(subscriber, classifier) must be === false
+      bus.unsubscribe(subscriber, classifier) must be === true
+    }
+
+    "not allow for the same subscriber to unsubscribe to the same channel twice" in {
+      bus.subscribe(subscriber, classifier) must be === true
+      bus.unsubscribe(subscriber, classifier) must be === true
+      bus.unsubscribe(subscriber, classifier) must be === false
+    }
+
+    "allow to add multiple subscribers" in {
+      val subscribers = (1 to 10) map { _ ⇒ createNewSubscriber() }
+      val events = createEvents(10)
+      val classifiers = events map getClassifierFor
+      subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } must be === true
+      subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } must be === true
+
+      subscribers foreach disposeSubscriber
+    }
+
+    "cleanup subscriber" in {
+      disposeSubscriber(subscriber)
+    }
+  }
+}
+
+object ActorEventBusSpec {
+  class ComposedActorEventBus extends ActorEventBus with LookupClassification with EventType[String] with ClassifierType[String] {
+    def classify(event: String) = event.charAt(0).toString
+    def publish(event: String, subscriber: ActorRef) = subscriber ! event
+  }
+}
+
+class ActorEventBusSpec extends EventBusSpec("ActorEventBus") {
+  import akka.event.ActorEventBusSpec.ComposedActorEventBus
+
+  type BusType = ComposedActorEventBus
+  def createNewEventBus(): BusType = new ComposedActorEventBus
+
+  def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) map { _.toString }
+
+  def createSubscriber(pipeTo: ActorRef) = pipeTo
+
+  def classifierFor(event: BusType#Event) = event.charAt(0).toString
+
+  def disposeSubscriber(subscriber: BusType#Subscriber): Unit = ()
+}
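The abstract spec above only exercises the subscribe/unsubscribe bookkeeping; delivery itself is left to the concrete bus. A minimal delivery round-trip for the composed bus could look like the sketch below. It is illustrative only, not part of the patch; it assumes TestKit's testActor and expectMsg, which the spec already mixes in, and DeliverySketch is a hypothetical name.

import akka.testkit.TestKit
import akka.event.ActorEventBusSpec.ComposedActorEventBus

class DeliverySketch extends TestKit {
  val bus = new ComposedActorEventBus
  bus.subscribe(testActor, "4")    // the classifier is the event's first character
  bus.publish("42 was published")  // classified as "4", so it is routed to testActor
  expectMsg("42 was published")
}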
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index cdf43c8560..ba5a07b2c0 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -397,7 +397,7 @@ private[akka] class ActorCell(

     if (supervisor.isDefined) supervisor.get ! ChildTerminated(self, cause)

-    InVMMonitoring.signal(Terminated(self, cause))
+    InVMMonitoring.publish(Terminated(self, cause))

     currentMessage = null
     clearActorContext()
@@ -419,10 +419,10 @@ private[akka] class ActorCell(
       case Create ⇒ create()
       case Recreate(cause) ⇒ recreate(cause)
       case Link(subject) ⇒
-        akka.event.InVMMonitoring.link(self, subject)
+        akka.event.InVMMonitoring.subscribe(self, subject)
         if (Actor.debugLifecycle) EventHandler.debug(actor, "now monitoring " + subject)
       case Unlink(subject) ⇒
-        akka.event.InVMMonitoring.unlink(self, subject)
+        akka.event.InVMMonitoring.unsubscribe(self, subject)
         if (Actor.debugLifecycle) EventHandler.debug(actor, "stopped monitoring " + subject)
       case Suspend ⇒ suspend()
       case Resume ⇒ resume()
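For readers tracing the ActorCell change: the old Monitoring calls map one-to-one onto the new EventBus-style DeathWatch API. The sketch below is illustrative, not part of the patch; `self`, `subject` and `cause` stand in for the values ActorCell already has in scope, and Terminated is the akka.actor message this patch constructs elsewhere.

import akka.actor.{ ActorRef, Terminated }
import akka.event.InVMMonitoring

object MonitoringCallSiteSketch {
  def startMonitoring(self: ActorRef, subject: ActorRef): Unit =
    InVMMonitoring.subscribe(self, subject)         // was InVMMonitoring.link(self, subject)

  def stopMonitoring(self: ActorRef, subject: ActorRef): Unit =
    InVMMonitoring.unsubscribe(self, subject)       // was InVMMonitoring.unlink(self, subject)

  def notifyDeath(self: ActorRef, cause: Throwable): Unit =
    InVMMonitoring.publish(Terminated(self, cause)) // was InVMMonitoring.signal(Terminated(self, cause))
}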
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index d97e3551d0..2884a8e4b3 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -12,6 +12,7 @@ import ClusterModule._
 import java.net.InetSocketAddress
 import scala.collection.immutable.Stack
 import java.lang.{ UnsupportedOperationException, IllegalStateException }
+import akka.event.{ EventHandler, InVMMonitoring }

 /**
  * ActorRef is an immutable and serializable handle to an Actor.
@@ -448,3 +449,23 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef {

   private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
 }
+
+object DeadLetterActorRef extends UnsupportedActorRef {
+  val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))
+  val address: String = "akka:internal:DeadLetterActorRef"
+
+  override def link(actorRef: ActorRef): ActorRef = actorRef
+
+  override def unlink(actorRef: ActorRef): ActorRef = actorRef
+
+  def isShutdown(): Boolean = true
+
+  def stop(): Unit = ()
+
+  protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = EventHandler.debug(this, message)
+
+  protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
+    message: Any,
+    timeout: Timeout,
+    channel: UntypedChannel): Future[Any] = { EventHandler.debug(this, message); brokenPromise }
+}
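DeadLetterActorRef gives undeliverable messages a terminal recipient: one-way sends are only logged through EventHandler.debug, and ask-style sends complete immediately with the already-broken promise. A hedged usage sketch, not part of the patch; the helper name is hypothetical, and `!` is assumed to dispatch through postMessageToMailbox as on other ActorRefs.

import akka.actor.{ ActorRef, DeadLetterActorRef }

object DeadLetterSketch {
  def recipientOrDeadLetters(ref: Option[ActorRef]): ActorRef =
    ref getOrElse DeadLetterActorRef

  recipientOrDeadLetters(None) ! "dropped"  // only logged via EventHandler.debug, never throws
  // ask-style sends complete at once with `brokenPromise`, a Future already failed with
  // ActorKilledException("In DeadLetterActorRef, promises are always broken.")
}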
diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala
index d91c52e733..31aabc076b 100644
--- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala
+++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala
@@ -6,116 +6,31 @@ package akka.event

 import akka.actor._

-trait DeathWatch {
-  def signal(terminated: Terminated): Unit
+/**
+ * The contract of DeathWatch is not properly expressed using the type system
+ * Whenever there is a publish, all listeners to the Terminated Actor should be atomically removed
+ * A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down
+ * See InVMMonitoring for semantics
+ */
+trait DeathWatch extends ActorEventBus with ActorClassifier {
+  type Event = Terminated
+
+  protected final def classify(event: Event): Classifier = event.actor
 }

-trait Monitoring {
+object InVMMonitoring extends DeathWatch with ActorClassification {

-  def link(monitor: ActorRef, monitored: ActorRef): Unit
+  def mapSize = 1024

-  def unlink(monitor: ActorRef, monitored: ActorRef): Unit
-}
-
-object InVMMonitoring extends DeathWatch with Monitoring {
-
-  class MonitoringBook(mapSize: Int = 1024) {
-    import java.util.concurrent.ConcurrentHashMap
-    import scala.annotation.tailrec
-
-    val mappings = new ConcurrentHashMap[ActorRef, Vector[ActorRef]](mapSize)
-
-    @tailrec
-    final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = {
-      val current = mappings get monitored
-      current match {
-        case null ⇒
-          if (monitored.isShutdown) false
-          else {
-            if (mappings.putIfAbsent(monitored, Vector(monitor)) ne null) associate(monitored, monitor)
-            else {
-              if (monitored.isShutdown) !dissociate(monitored, monitor)
-              else true
-            }
-          }
-        case raw: Vector[_] ⇒
-          val v = raw.asInstanceOf[Vector[ActorRef]]
-          if (monitored.isShutdown) false
-          if (v.contains(monitor)) true
-          else {
-            val added = v :+ monitor
-            if (!mappings.replace(monitored, v, added)) associate(monitored, monitor)
-            else {
-              if (monitored.isShutdown) !dissociate(monitored, monitor)
-              else true
-            }
-          }
-      }
-    }
-
-    final def dissociate(monitored: ActorRef): Iterable[ActorRef] = {
-      @tailrec
-      def dissociateAsMonitored(monitored: ActorRef): Iterable[ActorRef] = {
-        val current = mappings get monitored
-        current match {
-          case null ⇒ Vector.empty[ActorRef]
-          case raw: Vector[_] ⇒
-            val v = raw.asInstanceOf[Vector[ActorRef]]
-            if (!mappings.remove(monitored, v)) dissociateAsMonitored(monitored)
-            else v
-        }
-      }
-
-      def dissociateAsMonitor(monitor: ActorRef): Unit = {
-        val i = mappings.entrySet.iterator
-        while (i.hasNext()) {
-          val entry = i.next()
-          val v = entry.getValue
-          v match {
-            case raw: Vector[_] ⇒
-              val monitors = raw.asInstanceOf[Vector[ActorRef]]
-              if (monitors.contains(monitor))
-                dissociate(entry.getKey, monitor)
-            case _ ⇒ //Dun care
-          }
-        }
-      }
-
-      try { dissociateAsMonitored(monitored) } finally { dissociateAsMonitor(monitored) }
-    }
-
-    @tailrec
-    final def dissociate(monitored: ActorRef, monitor: ActorRef): Boolean = {
-      val current = mappings get monitored
-      current match {
-        case null ⇒ false
-        case raw: Vector[_] ⇒
-          val v = raw.asInstanceOf[Vector[ActorRef]]
-          val removed = v.filterNot(monitor ==)
-          if (removed eq v) false
-          else if (removed.isEmpty) {
-            if (!mappings.remove(monitored, v)) dissociate(monitored, monitor)
-            else true
-          } else {
-            if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor)
-            else true
-          }
-      }
-    }
+  override def publish(event: Event): Unit = {
+    val monitors = dissociate(classify(event))
+    if (monitors.nonEmpty) monitors.foreach(_ ! event)
   }

-  val monitoring = new MonitoringBook(1024) //Key == monitored, Values == monitors
-
-  def signal(terminated: Terminated): Unit = {
-    val monitors = monitoring.dissociate(terminated.actor)
-    if (monitors.nonEmpty) monitors.foreach(_ ! terminated)
+  override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
+    if (!super.subscribe(subscriber, to)) {
+      subscriber ! Terminated(subscriber, new ActorKilledException("Already terminated when linking"))
+      false
+    } else true
   }
-
-  def link(monitor: ActorRef, monitored: ActorRef): Unit = {
-    if (!monitoring.associate(monitored, monitor))
-      monitor ! Terminated(monitored, new ActorKilledException("Already terminated when linking"))
-  }
-
-  def unlink(monitor: ActorRef, monitored: ActorRef): Unit =
-    monitoring.dissociate(monitored, monitor)
 }
\ No newline at end of file
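With DeathWatch expressed as an event bus, the watch/unwatch contract reads as below. This is an illustrative sketch, not part of the patch; `watcher` and `watched` stand in for live ActorRefs obtained elsewhere, and DeathWatchSketch is a hypothetical name.

import akka.actor.ActorRef
import akka.event.InVMMonitoring

object DeathWatchSketch {
  def watch(watcher: ActorRef, watched: ActorRef): Unit =
    if (!InVMMonitoring.subscribe(watcher, watched)) {
      // `watched` was already shut down: InVMMonitoring has already sent the watcher a
      // Terminated message carrying ActorKilledException("Already terminated when linking"),
      // so callers do not need to special-case this branch.
    }

  def unwatch(watcher: ActorRef, watched: ActorRef): Unit =
    InVMMonitoring.unsubscribe(watcher, watched)

  // When `watched` later dies, ActorCell publishes Terminated(watched, cause); publish
  // atomically dissociates all watchers of `watched` and only then sends them that event.
}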
+ */ + +package akka.event + +import akka.actor.ActorRef +import akka.util.Index +import java.util.concurrent.ConcurrentSkipListSet +import java.util.Comparator + +trait EventBus { + type Event + type Classifier + type Subscriber + + def subscribe(subscriber: Subscriber, to: Classifier): Boolean + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean + def unsubscribe(subscriber: Subscriber): Unit + + def publish(event: Event): Unit +} + +trait ActorEventBus extends EventBus { + type Subscriber = ActorRef +} + +trait ActorClassifier { self: EventBus ⇒ + type Classifier = ActorRef +} + +trait PredicateClassifier { self: EventBus ⇒ + type Classifier = Event ⇒ Boolean +} + +trait EventType[T] { self: EventBus ⇒ + type Event = T +} + +trait ClassifierType[T] { self: EventBus ⇒ + type Classifier = T +} + +trait LookupClassification { self: EventBus ⇒ + protected final val subscribers = new Index[Classifier, Subscriber] + + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber) + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber) + def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber) + + protected def classify(event: Event): Classifier + + protected def publish(event: Event, subscriber: Subscriber): Unit + + def publish(event: Event): Unit = + subscribers.valueIterator(classify(event)).foreach(publish(event, _)) +} + +trait ScanningClassification { self: EventBus ⇒ + protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](ordering) + + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.add((to, subscriber)) + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove((from, subscriber)) + def unsubscribe(subscriber: Subscriber): Unit = { + val i = subscribers.iterator() + while (i.hasNext) { + val e = i.next() + if (subscriber == e._2) i.remove() + } + } + + protected def ordering: Comparator[(Classifier, Subscriber)] + + protected def matches(classifier: Classifier, event: Event): Boolean + + protected def publish(event: Event, subscriber: Subscriber): Unit + + def publish(event: Event): Unit = { + val currentSubscribers = subscribers.iterator() + while (currentSubscribers.hasNext) { + val (classifier, subscriber) = currentSubscribers.next() + if (matches(classifier, event)) publish(event, subscriber) + } + } +} + +trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ + import java.util.concurrent.ConcurrentHashMap + import scala.annotation.tailrec + + def mapSize: Int + + protected val mappings = new ConcurrentHashMap[ActorRef, Vector[ActorRef]](mapSize) + + @tailrec + protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = { + val current = mappings get monitored + current match { + case null ⇒ + if (monitored.isShutdown) false + else { + if (mappings.putIfAbsent(monitored, Vector(monitor)) ne null) associate(monitored, monitor) + else { + if (monitored.isShutdown) !dissociate(monitored, monitor) + else true + } + } + case raw: Vector[_] ⇒ + val v = raw.asInstanceOf[Vector[ActorRef]] + if (monitored.isShutdown) false + if (v.contains(monitor)) true + else { + val added = v :+ monitor + if (!mappings.replace(monitored, v, added)) associate(monitored, monitor) + else { + if (monitored.isShutdown) !dissociate(monitored, monitor) + else true + } + } + } + } + + protected final def dissociate(monitored: ActorRef): Iterable[ActorRef] = 
{ + @tailrec + def dissociateAsMonitored(monitored: ActorRef): Iterable[ActorRef] = { + val current = mappings get monitored + current match { + case null ⇒ Vector.empty[ActorRef] + case raw: Vector[_] ⇒ + val v = raw.asInstanceOf[Vector[ActorRef]] + if (!mappings.remove(monitored, v)) dissociateAsMonitored(monitored) + else v + } + } + + def dissociateAsMonitor(monitor: ActorRef): Unit = { + val i = mappings.entrySet.iterator + while (i.hasNext()) { + val entry = i.next() + val v = entry.getValue + v match { + case raw: Vector[_] ⇒ + val monitors = raw.asInstanceOf[Vector[ActorRef]] + if (monitors.contains(monitor)) + dissociate(entry.getKey, monitor) + case _ ⇒ //Dun care + } + } + } + + try { dissociateAsMonitored(monitored) } finally { dissociateAsMonitor(monitored) } + } + + @tailrec + protected final def dissociate(monitored: ActorRef, monitor: ActorRef): Boolean = { + val current = mappings get monitored + current match { + case null ⇒ false + case raw: Vector[_] ⇒ + val v = raw.asInstanceOf[Vector[ActorRef]] + val removed = v.filterNot(monitor ==) + if (removed eq v) false + else if (removed.isEmpty) { + if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) + else true + } else { + if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) + else true + } + } + } + + protected def classify(event: Event): Classifier + + def publish(event: Event): Unit = mappings.get(classify(event)) match { + case null ⇒ + case raw: Vector[_] ⇒ + val v = raw.asInstanceOf[Vector[ActorRef]] + v foreach { _ ! event } + } + + def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber) + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = dissociate(from, subscriber) + def unsubscribe(subscriber: Subscriber): Unit = dissociate(subscriber) +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index cc710a831a..b8d776e1a2 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -16,8 +16,7 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } * * @author Viktor Klang */ -class Index[K <: AnyRef, V <: AnyRef: Manifest] { - private val Naught = Array[V]() //Nil for Arrays +class Index[K, V] { private val container = new ConcurrentHashMap[K, JSet[V]] private val emptySet = new ConcurrentSkipListSet[V] @@ -65,15 +64,6 @@ class Index[K <: AnyRef, V <: AnyRef: Manifest] { spinPut(key, value) } - /** - * @return a _new_ array of all existing values for the given key at the time of the call - */ - def values(key: K): Array[V] = { - val set: JSet[V] = container get key - val result = if (set ne null) set toArray Naught else Naught - result.asInstanceOf[Array[V]] - } - /** * @return Some(value) for the first matching value where the supplied function returns true for the given key, * if no matches it returns None @@ -85,6 +75,16 @@ class Index[K <: AnyRef, V <: AnyRef: Manifest] { else None } + /** + * Returns an Iterator of V containing the values for the supplied key, or an empty iterator if the key doesn't exist + */ + def valueIterator(key: K): scala.Iterator[V] = { + container.get(key) match { + case null ⇒ Iterator.empty + case some ⇒ scala.collection.JavaConversions.asScalaIterator(some.iterator()) + } + } + /** * Applies the supplied function to all keys and their values */ @@ -112,6 +112,10 @@ class Index[K <: AnyRef, V <: AnyRef: Manifest] { } else false 
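LookupClassification is exercised by ComposedActorEventBus in the spec above, but the patch ships no example of ScanningClassification, where every subscriber whose classifier matches receives the event. Below is a hedged sketch under that reading; PrefixEventBus is a hypothetical name, and the ordering falls back to ActorRef#address, which this patch also relies on for DeadLetterActorRef.

import java.util.Comparator
import akka.actor.ActorRef
import akka.event.{ ActorEventBus, ScanningClassification, EventType, ClassifierType }

class PrefixEventBus extends ActorEventBus
  with ScanningClassification
  with EventType[String]
  with ClassifierType[String] {

  // ConcurrentSkipListSet needs a total order over (Classifier, Subscriber) pairs
  protected def ordering: Comparator[(String, ActorRef)] = new Comparator[(String, ActorRef)] {
    def compare(a: (String, ActorRef), b: (String, ActorRef)): Int = {
      val c = a._1 compareTo b._1
      if (c != 0) c else a._2.address compareTo b._2.address
    }
  }

  // an event is delivered to every subscriber whose classifier is a prefix of it
  protected def matches(classifier: String, event: String): Boolean = event startsWith classifier

  protected def publish(event: String, subscriber: ActorRef): Unit = subscriber ! event
}

Subscribing the same actor under both "ap" and "app" would deliver "apple" twice (once per matching pair), which is the main behavioural difference from LookupClassification.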
diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala
index cc710a831a..b8d776e1a2 100644
--- a/akka-actor/src/main/scala/akka/util/Index.scala
+++ b/akka-actor/src/main/scala/akka/util/Index.scala
@@ -16,8 +16,7 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
  *
  * @author Viktor Klang
  */
-class Index[K <: AnyRef, V <: AnyRef: Manifest] {
-  private val Naught = Array[V]() //Nil for Arrays
+class Index[K, V] {
   private val container = new ConcurrentHashMap[K, JSet[V]]
   private val emptySet = new ConcurrentSkipListSet[V]

@@ -65,15 +64,6 @@ class Index[K <: AnyRef, V <: AnyRef: Manifest] {
     spinPut(key, value)
   }
-  /**
-   * @return a _new_ array of all existing values for the given key at the time of the call
-   */
-  def values(key: K): Array[V] = {
-    val set: JSet[V] = container get key
-    val result = if (set ne null) set toArray Naught else Naught
-    result.asInstanceOf[Array[V]]
-  }
-
   /**
    * @return Some(value) for the first matching value where the supplied function returns true for the given key,
    *         if no matches it returns None
   */
@@ -85,6 +75,16 @@
     else None
   }

+  /**
+   * Returns an Iterator of V containing the values for the supplied key, or an empty iterator if the key doesn't exist
+   */
+  def valueIterator(key: K): scala.Iterator[V] = {
+    container.get(key) match {
+      case null ⇒ Iterator.empty
+      case some ⇒ scala.collection.JavaConversions.asScalaIterator(some.iterator())
+    }
+  }
+
   /**
    * Applies the supplied function to all keys and their values
   */
@@ -112,6 +112,10 @@
       } else false //Remove failed
   }

+  /**
+   * Disassociates all the values for the specified key
+   * @return None if the key wasn't associated at all, or Some(scala.Iterable[V]) if it was associated
+   */
   def remove(key: K): Option[Iterable[V]] = {
     val set = container get key

@@ -123,6 +127,26 @@
       } else None //Remove failed
   }

+  /**
+   * Removes the specified value from all keys
+   */
+  def removeValue(value: V): Unit = {
+    val i = container.entrySet().iterator()
+    while (i.hasNext) {
+      val e = i.next()
+      val set = e.getValue()
+
+      if (set ne null) {
+        set.synchronized {
+          if (set.remove(value)) { //If we can remove the value
+            if (set.isEmpty) //and the set becomes empty
+              container.remove(e.getKey, emptySet) //We try to remove the key if it's mapped to an empty set
+          }
+        }
+      }
+    }
+  }
+
   /**
    * @return true if the underlying containers is empty, may report false negatives when the last remove is underway
   */
@@ -131,5 +155,21 @@
   /**
    * Removes all keys and all values
   */
-  def clear = foreach { case (k, v) ⇒ remove(k, v) }
+  def clear(): Unit = {
+    val i = container.entrySet().iterator()
+    while (i.hasNext) {
+      val e = i.next()
+      val set = e.getValue()
+      if (set ne null) { set.synchronized { set.clear(); container.remove(e.getKey, emptySet) } }
+    }
+  }
 }
+
+/**
+ * An implementation of a ConcurrentMultiMap
+ * Adds/remove is serialized over the specified key
+ * Reads are fully concurrent <-- el-cheapo
+ *
+ * @author Viktor Klang
+ */
+class ConcurrentMultiMap[K, V] extends Index[K, V]
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index aac831dec4..8464759f60 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -1370,7 +1370,7 @@ class DefaultClusterNode private[akka] (

   private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
     EventHandler.info(this, "Failing over ClusterActorRef from %s to %s".format(from, to))
-    clusterActorRefs.values(from) foreach (_.failOver(from, to))
+    clusterActorRefs.valueIterator(from) foreach (_.failOver(from, to))
   }

   private[cluster] def migrateActorsOnFailedNodes(
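The Cluster.scala hunk above shows the migration pattern for callers of the removed Index#values: switch to valueIterator. A small illustrative sketch, not part of the patch; the names are hypothetical and String values keep the example simple.

import akka.util.Index

object IndexSketch {
  val listeners = new Index[String, String]
  listeners.put("tickets", "alice")
  listeners.put("tickets", "bob")

  // before this patch: listeners.values("tickets") allocated a fresh Array[V]
  listeners.valueIterator("tickets") foreach println

  listeners.removeValue("bob") // drops "bob" under every key it was registered for
  listeners.clear()
}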