EventStream should publish to all matching traits. See #2525
This commit is contained in:
parent
3307b6ebce
commit
426120f50c
3 changed files with 138 additions and 27 deletions
|
|
@ -5,13 +5,13 @@ package akka.event
|
|||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.event.Logging.InitializeLogger
|
||||
import akka.pattern.gracefulStop
|
||||
import akka.testkit.{ TestProbe, AkkaSpec }
|
||||
|
||||
object EventStreamSpec {
|
||||
|
||||
|
|
@ -53,6 +53,12 @@ object EventStreamSpec {
|
|||
class B1 extends A
|
||||
class B2 extends A
|
||||
class C extends B1
|
||||
|
||||
trait T
|
||||
trait AT extends T
|
||||
trait BT extends T
|
||||
class TA
|
||||
class TAATBT extends TA with AT with BT
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -117,7 +123,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
|
|||
}
|
||||
}
|
||||
|
||||
"manage sub-channels" in {
|
||||
"manage sub-channels using classes" in {
|
||||
val a = new A
|
||||
val b1 = new B1
|
||||
val b2 = new B2
|
||||
|
|
@ -143,6 +149,89 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
|
|||
}
|
||||
}
|
||||
|
||||
"manage sub-channels using classes and traits (update on subscribe)" in {
|
||||
val es = new EventStream(false)
|
||||
val tm1 = new TA
|
||||
val tm2 = new TAATBT
|
||||
val a1, a2, a3, a4 = TestProbe()
|
||||
|
||||
es.subscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.subscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.subscribe(a3.ref, classOf[TA]) must be === true
|
||||
es.subscribe(a4.ref, classOf[TAATBT]) must be === true
|
||||
es.publish(tm1)
|
||||
es.publish(tm2)
|
||||
a1.expectMsgType[AT] must be === tm2
|
||||
a2.expectMsgType[BT] must be === tm2
|
||||
a3.expectMsgType[TA] must be === tm1
|
||||
a3.expectMsgType[TA] must be === tm2
|
||||
a4.expectMsgType[TAATBT] must be === tm2
|
||||
es.unsubscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.unsubscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.unsubscribe(a3.ref, classOf[TA]) must be === true
|
||||
es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true
|
||||
}
|
||||
|
||||
"manage sub-channels using classes and traits (update on unsubscribe)" in {
|
||||
val es = new EventStream(false)
|
||||
val tm1 = new TA
|
||||
val tm2 = new TAATBT
|
||||
val a1, a2, a3, a4 = TestProbe()
|
||||
|
||||
es.subscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.subscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.subscribe(a3.ref, classOf[TA]) must be === true
|
||||
es.subscribe(a4.ref, classOf[TAATBT]) must be === true
|
||||
es.unsubscribe(a3.ref, classOf[TA]) must be === true
|
||||
es.publish(tm1)
|
||||
es.publish(tm2)
|
||||
a1.expectMsgType[AT] must be === tm2
|
||||
a2.expectMsgType[BT] must be === tm2
|
||||
a3.expectNoMsg(1 second)
|
||||
a4.expectMsgType[TAATBT] must be === tm2
|
||||
es.unsubscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.unsubscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true
|
||||
}
|
||||
|
||||
"manage sub-channels using classes and traits (update on unsubscribe all)" in {
|
||||
val es = new EventStream(false)
|
||||
val tm1 = new TA
|
||||
val tm2 = new TAATBT
|
||||
val a1, a2, a3, a4 = TestProbe()
|
||||
|
||||
es.subscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.subscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.subscribe(a3.ref, classOf[TA]) must be === true
|
||||
es.subscribe(a4.ref, classOf[TAATBT]) must be === true
|
||||
es.unsubscribe(a3.ref)
|
||||
es.publish(tm1)
|
||||
es.publish(tm2)
|
||||
a1.expectMsgType[AT] must be === tm2
|
||||
a2.expectMsgType[BT] must be === tm2
|
||||
a3.expectNoMsg(1 second)
|
||||
a4.expectMsgType[TAATBT] must be === tm2
|
||||
es.unsubscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.unsubscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true
|
||||
}
|
||||
|
||||
"manage sub-channels using classes and traits (update on publish)" in {
|
||||
val es = new EventStream(false)
|
||||
val tm1 = new TA
|
||||
val tm2 = new TAATBT
|
||||
val a1, a2 = TestProbe()
|
||||
|
||||
es.subscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.subscribe(a2.ref, classOf[BT]) must be === true
|
||||
es.publish(tm1)
|
||||
es.publish(tm2)
|
||||
a1.expectMsgType[AT] must be === tm2
|
||||
a2.expectMsgType[BT] must be === tm2
|
||||
es.unsubscribe(a1.ref, classOf[AT]) must be === true
|
||||
es.unsubscribe(a2.ref, classOf[BT]) must be === true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
|
||||
|
|
@ -150,7 +239,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
|
|||
val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error"))
|
||||
val msg = allmsg filter (_.level <= level)
|
||||
allmsg foreach bus.publish
|
||||
msg foreach (x ⇒ expectMsg(x))
|
||||
msg foreach (expectMsg(_))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -139,7 +139,7 @@ trait SubchannelClassification { this: EventBus ⇒
|
|||
val diff = subscriptions.addValue(to, subscriber)
|
||||
if (diff.isEmpty) false
|
||||
else {
|
||||
cache = cache ++ diff
|
||||
cache ++= diff
|
||||
true
|
||||
}
|
||||
}
|
||||
|
|
@ -148,16 +148,19 @@ trait SubchannelClassification { this: EventBus ⇒
|
|||
val diff = subscriptions.removeValue(from, subscriber)
|
||||
if (diff.isEmpty) false
|
||||
else {
|
||||
cache = cache ++ diff
|
||||
removeFromCache(diff)
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized {
|
||||
val diff = subscriptions.removeValue(subscriber)
|
||||
if (diff.nonEmpty) cache = cache ++ diff
|
||||
if (diff.nonEmpty) removeFromCache(diff)
|
||||
}
|
||||
|
||||
private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit =
|
||||
cache ++= changes map { case (c, s) ⇒ (c, cache.getOrElse(c, Set[Subscriber]()) -- s) }
|
||||
|
||||
def publish(event: Event): Unit = {
|
||||
val c = classify(event)
|
||||
val recv =
|
||||
|
|
@ -166,7 +169,7 @@ trait SubchannelClassification { this: EventBus ⇒
|
|||
if (cache contains c) cache(c)
|
||||
else {
|
||||
val diff = subscriptions.addKey(c)
|
||||
cache = cache ++ diff
|
||||
cache ++= diff
|
||||
cache(c)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,9 +21,9 @@ object SubclassifiedIndex {
|
|||
|
||||
class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) {
|
||||
|
||||
override def addValue(key: K, value: V): Changes = {
|
||||
override def innerAddValue(key: K, value: V): Changes = {
|
||||
// break the recursion on super when key is found and transition to recursive add-to-set
|
||||
if (sc.isEqual(key, this.key)) addValue(value) else super.addValue(key, value)
|
||||
if (sc.isEqual(key, this.key)) addValue(value) else super.innerAddValue(key, value)
|
||||
}
|
||||
|
||||
private def addValue(value: V): Changes = {
|
||||
|
|
@ -34,23 +34,24 @@ object SubclassifiedIndex {
|
|||
} else kids
|
||||
}
|
||||
|
||||
override def removeValue(key: K, value: V): Changes = {
|
||||
override def innerRemoveValue(key: K, value: V): Changes = {
|
||||
// break the recursion on super when key is found and transition to recursive remove-from-set
|
||||
if (sc.isEqual(key, this.key)) removeValue(value) else super.removeValue(key, value)
|
||||
if (sc.isEqual(key, this.key)) removeValue(value) else super.innerRemoveValue(key, value)
|
||||
}
|
||||
|
||||
override def removeValue(value: V): Changes = {
|
||||
val kids = subkeys flatMap (_ removeValue value)
|
||||
if (values contains value) {
|
||||
values -= value
|
||||
kids :+ ((key, values))
|
||||
kids :+ ((key, Set(value)))
|
||||
} else kids
|
||||
}
|
||||
|
||||
override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")")
|
||||
|
||||
}
|
||||
|
||||
private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]]
|
||||
private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ ⇒ Set[AnyRef]())
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -79,44 +80,58 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
|
|||
* Add key to this index which inherits its value set from the most specific
|
||||
* super-class which is known.
|
||||
*/
|
||||
def addKey(key: K): Changes =
|
||||
subkeys collectFirst {
|
||||
case n if sc.isEqual(n.key, key) ⇒ Nil
|
||||
case n if sc.isSubclass(key, n.key) ⇒ n.addKey(key)
|
||||
} getOrElse {
|
||||
integrate(new Nonroot(key, values))
|
||||
List((key, values))
|
||||
def addKey(key: K): Changes = mergeChangesByKey(innerAddKey(key))
|
||||
|
||||
protected def innerAddKey(key: K): Changes = {
|
||||
var found = false
|
||||
val ch = subkeys flatMap { n ⇒
|
||||
if (sc.isEqual(key, n.key)) {
|
||||
found = true
|
||||
Nil
|
||||
} else if (sc.isSubclass(key, n.key)) {
|
||||
found = true
|
||||
n.innerAddKey(key)
|
||||
} else Nil
|
||||
}
|
||||
if (!found) {
|
||||
integrate(new Nonroot(key, values))
|
||||
Seq((key, values))
|
||||
} else ch
|
||||
}
|
||||
|
||||
/**
|
||||
* Add value to all keys which are subclasses of the given key. If the key
|
||||
* is not known yet, it is inserted as if using addKey.
|
||||
*/
|
||||
def addValue(key: K, value: V): Changes = {
|
||||
def addValue(key: K, value: V): Changes = mergeChangesByKey(innerAddValue(key, value))
|
||||
|
||||
protected def innerAddValue(key: K, value: V): Changes = {
|
||||
var found = false
|
||||
val ch = subkeys flatMap { n ⇒
|
||||
if (sc.isSubclass(key, n.key)) {
|
||||
found = true
|
||||
n.addValue(key, value)
|
||||
n.innerAddValue(key, value)
|
||||
} else Nil
|
||||
}
|
||||
if (!found) {
|
||||
val v = values + value
|
||||
val n = new Nonroot(key, v)
|
||||
integrate(n)
|
||||
n.addValue(key, value) :+ ((key, v))
|
||||
n.innerAddValue(key, value) :+ (key -> v)
|
||||
} else ch
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove value from all keys which are subclasses of the given key.
|
||||
* @return The keys and values that have been removed.
|
||||
*/
|
||||
def removeValue(key: K, value: V): Changes = {
|
||||
def removeValue(key: K, value: V): Changes = mergeChangesByKey(innerRemoveValue(key, value))
|
||||
protected def innerRemoveValue(key: K, value: V): Changes = {
|
||||
var found = false
|
||||
val ch = subkeys flatMap { n ⇒
|
||||
if (sc.isSubclass(key, n.key)) {
|
||||
found = true
|
||||
n.removeValue(key, value)
|
||||
n.innerRemoveValue(key, value)
|
||||
} else Nil
|
||||
}
|
||||
if (!found) {
|
||||
|
|
@ -129,7 +144,7 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
|
|||
/**
|
||||
* Remove value from all keys in the index.
|
||||
*/
|
||||
def removeValue(value: V): Changes = subkeys flatMap (_ removeValue value)
|
||||
def removeValue(value: V): Changes = mergeChangesByKey(subkeys flatMap (_ removeValue value))
|
||||
|
||||
override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")")
|
||||
|
||||
|
|
@ -148,4 +163,8 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
private def mergeChangesByKey(changes: Changes): Changes =
|
||||
(emptyMergeMap[K, V] /: changes) {
|
||||
case (m, (k, s)) ⇒ m.updated(k, m(k) ++ s)
|
||||
}.toSeq
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue