/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
|
|
|
|
|
package akka.event
|
|
|
|
|
|
2012-06-21 16:09:14 +02:00
|
|
|
import language.postfixOps
|
|
|
|
|
|
2012-09-21 14:50:06 +02:00
|
|
|
import scala.concurrent.duration._
|
2014-02-13 22:52:01 +00:00
|
|
|
import akka.actor._
|
2011-11-15 11:34:39 +01:00
|
|
|
import com.typesafe.config.ConfigFactory
|
|
|
|
|
import scala.collection.JavaConverters._
|
2012-04-23 15:40:21 +02:00
|
|
|
import akka.event.Logging.InitializeLogger
|
|
|
|
|
import akka.pattern.gracefulStop
|
2013-08-23 14:39:21 +02:00
|
|
|
import akka.testkit.{ EventFilter, TestEvent, TestProbe, AkkaSpec }
|
2011-10-27 12:23:01 +02:00
|
|
|
|
2011-11-13 11:34:11 +01:00
|
|
|
/**
 * Shared fixtures for [[EventStreamSpec]]: actor-system configurations, the
 * message types published on the bus, a logger actor that relays what it
 * receives to a configurable target, and small class/trait hierarchies used
 * by the sub-channel classification tests.
 */
object EventStreamSpec {

  /**
   * Configuration of the spec's main actor system. It registers two loggers:
   * the [[MyLog]] actor defined below and the standard-out logger, whose
   * class name is substituted for the `%s` placeholder.
   */
  val config = ConfigFactory.parseString("""
      akka {
        actor.serialize-messages = off
        stdout-loglevel = WARNING
        loglevel = INFO
        loggers = ["akka.event.EventStreamSpec$MyLog", "%s"]
      }
      """.format(Logging.StandardOutLogger.getClass.getName))

  /** Configuration for the separate systems that switch `actor.debug.unhandled` on. */
  val configUnhandled = ConfigFactory.parseString("""
      akka {
        actor.serialize-messages = off
        stdout-loglevel = WARNING
        loglevel = WARNING
        actor.debug.unhandled = on
      }
      """)

  /** [[configUnhandled]] with `akka.actor.debug.event-stream = on` layered on top. */
  val configUnhandledWithDebug =
    ConfigFactory.parseString("akka.actor.debug.event-stream = on")
      .withFallback(configUnhandled)

  /** Simple payload published on the bus by the subscription tests. */
  final case class M(i: Int)

  /** Tells [[MyLog]] which actor it should relay subsequent events to. */
  final case class SetTarget(ref: ActorRef)

  /**
   * Logger actor used by the "manage log levels" test. Everything it receives
   * after initialization is passed on to `dst`.
   */
  class MyLog extends Actor {

    // Relay destination; dead letters until a SetTarget message replaces it.
    var dst: ActorRef = context.system.deadLetters

    def receive: Receive = {
      case Logging.InitializeLogger(bus) ⇒
        // Subscribe to the two non-log event types this actor handles,
        // then acknowledge initialization to the sender.
        bus.subscribe(context.self, classOf[SetTarget])
        bus.subscribe(context.self, classOf[UnhandledMessage])
        sender() ! Logging.LoggerInitialized

      case SetTarget(ref) ⇒
        dst = ref
        // Confirm to the new target that the switch happened.
        dst ! "OK"

      case e: Logging.LogEvent ⇒ dst ! e
      case u: UnhandledMessage ⇒ dst ! u
    }
  }

  // class hierarchy for subchannel test
  class A
  class B1 extends A
  class B2 extends A
  class C extends B1

  // trait hierarchy for the class-and-trait subchannel tests;
  // CCATBT is a CC that also mixes in both trait branches (AT via ATT, BT via BTT)
  trait T
  trait AT extends T
  trait ATT extends AT
  trait BT extends T
  trait BTT extends BT
  class CC
  class CCATBT extends CC with ATT with BTT
}
|
|
|
|
|
|
2011-11-15 11:34:39 +01:00
|
|
|
/**
 * Behavioural tests for `EventStream`: subscription management, null-argument
 * rejection, unhandled-message logging, log-level filtering, sub-channel
 * classification over classes and traits, and the interaction with the
 * unsubscriber that watches subscribed actors.
 *
 * Tests that need debug output of the event stream start their own
 * `ActorSystem` and always shut it down in a `finally` block.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {

  import EventStreamSpec._

  // startDefaultLoggers requires the implementation type of the system.
  val impl = system.asInstanceOf[ActorSystemImpl]

  "An EventStream" must {

    "manage subscriptions" in {
      //#event-bus-start-unsubscriber-scala
      val bus = new EventStream(system, true)
      bus.startUnsubscriber()
      //#event-bus-start-unsubscriber-scala

      bus.subscribe(testActor, classOf[M])
      bus.publish(M(42))
      within(1 second) {
        expectMsg(M(42))
        // once unsubscribed, further publications must not reach testActor
        bus.unsubscribe(testActor)
        bus.publish(M(13))
        expectNoMsg()
      }
    }

    "not allow null as subscriber" in {
      val bus = new EventStream(system, true)
      intercept[IllegalArgumentException] {
        bus.subscribe(null, classOf[M])
      }.getMessage should be("subscriber is null")
    }

    "not allow null as unsubscriber" in {
      val bus = new EventStream(system, true)
      intercept[IllegalArgumentException] {
        bus.unsubscribe(null, classOf[M])
      }.getMessage should be("subscriber is null")
      intercept[IllegalArgumentException] {
        bus.unsubscribe(null)
      }.getMessage should be("subscriber is null")
    }

    "be able to log unhandled messages" in {
      val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled)
      try {
        sys.eventStream.subscribe(testActor, classOf[AnyRef])
        val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters)
        sys.eventStream.publish(m)
        // both the original event and the Debug entry derived from it are expected
        expectMsgAllOf(
          m,
          Logging.Debug(
            sys.deadLetters.path.toString,
            sys.deadLetters.getClass,
            "unhandled message from " + sys.deadLetters + ": 42"))
        sys.eventStream.unsubscribe(testActor)
      } finally {
        shutdown(sys)
      }
    }

    "manage log levels" in {
      val bus = new EventStream(system, false)
      bus.startDefaultLoggers(impl)
      // route everything MyLog receives to testActor; MyLog answers "OK"
      bus.publish(SetTarget(testActor))
      expectMsg("OK")
      within(2 seconds) {
        import Logging._
        verifyLevel(bus, InfoLevel)
        bus.setLogLevel(WarningLevel)
        verifyLevel(bus, WarningLevel)
        bus.setLogLevel(DebugLevel)
        verifyLevel(bus, DebugLevel)
        bus.setLogLevel(ErrorLevel)
        verifyLevel(bus, ErrorLevel)
      }
    }

    "manage sub-channels using classes" in {
      val a = new A
      val b1 = new B1
      val b2 = new B2
      val c = new C
      val bus = new EventStream(system, false)
      within(2 seconds) {
        // subscribed to B2 only: c (a B1) is not delivered, b2 is
        bus.subscribe(testActor, classOf[B2]) should be(true)
        bus.publish(c)
        bus.publish(b2)
        expectMsg(b2)

        // subscribed to A as well: every subclass instance is delivered
        bus.subscribe(testActor, classOf[A]) should be(true)
        bus.publish(c)
        expectMsg(c)
        bus.publish(b1)
        expectMsg(b1)

        // after unsubscribing from B1 only b2 and a are expected, c is not
        bus.unsubscribe(testActor, classOf[B1]) should be(true)
        bus.publish(c)
        bus.publish(b2)
        bus.publish(a)
        expectMsg(b2)
        expectMsg(a)
        expectNoMsg()
      }
    }

    "manage sub-channels using classes and traits (update on subscribe)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3, a4 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.subscribe(a4.ref, classOf[CCATBT]) should be(true)

      es.publish(tm1)
      es.publish(tm2)

      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      // the CC subscriber sees both the plain CC and the CCATBT instance
      a3.expectMsgType[CC] should be(tm1)
      a3.expectMsgType[CC] should be(tm2)
      a4.expectMsgType[CCATBT] should be(tm2)

      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a3.ref, classOf[CC]) should be(true)
      es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
    }

    "manage sub-channels using classes and traits (update on unsubscribe)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3, a4 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
      // a3 leaves the CC channel before anything is published
      es.unsubscribe(a3.ref, classOf[CC]) should be(true)

      es.publish(tm1)
      es.publish(tm2)

      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectNoMsg(1 second)
      a4.expectMsgType[CCATBT] should be(tm2)

      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
    }

    "manage sub-channels using classes and traits (update on unsubscribe all)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3, a4 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
      // a3 is removed from every channel at once
      es.unsubscribe(a3.ref)

      es.publish(tm1)
      es.publish(tm2)

      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectNoMsg(1 second)
      a4.expectMsgType[CCATBT] should be(tm2)

      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
    }

    "manage sub-channels using classes and traits (update on publish)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)

      es.publish(tm1)
      es.publish(tm2)

      // only tm2 carries the AT/BT traits, so it is the first message each probe sees
      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)

      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
    }

    "manage sub-channels using classes and traits (unsubscribe classes used with trait)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a2.ref, classOf[CC]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.unsubscribe(a2.ref, classOf[CC]) should be(true)
      es.unsubscribe(a3.ref, classOf[CCATBT]) should be(true)

      es.publish(tm1)
      es.publish(tm2)

      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectMsgType[CC] should be(tm1)

      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a3.ref, classOf[CC]) should be(true)
    }

    "manage sub-channels using classes and traits (subscribe after publish)" in {
      val es = new EventStream(system, false)
      val tm1 = new CCATBT
      val a1, a2 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.publish(tm1)
      a1.expectMsgType[AT] should be(tm1)
      a2.expectNoMsg(1 second)

      // a subscription added after the first publication must see the next one
      es.subscribe(a2.ref, classOf[BTT]) should be(true)
      es.publish(tm1)
      a1.expectMsgType[AT] should be(tm1)
      a2.expectMsgType[BTT] should be(tm1)

      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BTT]) should be(true)
    }

    "unsubscribe an actor on its termination" in {
      val sys = ActorSystem("EventStreamSpecUnsubscribeOnTerminated", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()
        val tm = new A

        // forwards everything it receives to a1, so a1 shows whether it is still subscribed
        val target = sys.actorOf(Props(new Actor {
          def receive = { case in ⇒ a1.ref forward in }
        }), "to-be-killed")

        es.subscribe(a2.ref, classOf[Any])
        es.subscribe(target, classOf[A]) should be(true)
        // a second identical subscription is reported as not added
        es.subscribe(target, classOf[A]) should be(false)

        target ! PoisonPill
        fishForDebugMessage(a2, s"unsubscribing $target from all channels")
        fishForDebugMessage(a2, s"unwatching $target")

        es.publish(tm)

        a1.expectNoMsg(1 second)
        a2.expectMsg(tm)
      } finally {
        shutdown(sys)
      }
    }

    "unsubscribe the actor, when it subscribes already in terminated state" in {
      val sys = ActorSystem("EventStreamSpecUnsubscribeTerminated", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()

        // NOTE(review): target is created in the spec's own `system`, not in `sys`;
        // testActor (which watches it below) lives in `system` as well. Confirm this
        // is intentional before changing it.
        val target = system.actorOf(Props(new Actor {
          def receive = { case in ⇒ a1.ref forward in }
        }), "to-be-killed")

        watch(target)
        target ! PoisonPill
        expectTerminated(target)

        es.subscribe(a2.ref, classOf[Any])

        // target is Terminated; When subscribing, it will be unsubscribed by the Unsubscriber right away
        es.subscribe(target, classOf[A]) should be(true)
        fishForDebugMessage(a2, s"unsubscribing $target from all channels")

        es.subscribe(target, classOf[A]) should be(true)
        fishForDebugMessage(a2, s"unsubscribing $target from all channels")
      } finally {
        shutdown(sys)
      }
    }

    "not allow initializing a TerminatedUnsubscriber twice" in {
      val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config)
      // initializes an TerminatedUnsubscriber during start

      try {
        val es = sys.eventStream
        val p = TestProbe()

        val refWillBeUsedAsUnsubscriber = es.initUnsubscriber(p.ref)

        refWillBeUsedAsUnsubscriber should equal(false)
      } finally {
        shutdown(sys)
      }
    }

    "unwatch an actor from unsubscriber when that actor unsubscribes from the stream" in {
      val sys = ActorSystem("MustUnregisterDuringUnsubscribe", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()

        // a1 observes the event stream's own debug output
        es.subscribe(a1.ref, classOf[Logging.Debug])

        es.subscribe(a2.ref, classOf[A])
        fishForDebugMessage(a1, s"watching ${a2.ref}")

        es.unsubscribe(a2.ref)
        fishForDebugMessage(a1, s"unwatching ${a2.ref}")
      } finally {
        shutdown(sys)
      }
    }

    "unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed" in {
      val sys = ActorSystem("MustUnregisterWhenNoMoreChannelSubscriptions", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()

        // a1 observes the event stream's own debug output
        es.subscribe(a1.ref, classOf[Logging.Debug])

        es.subscribe(a2.ref, classOf[A])
        es.subscribe(a2.ref, classOf[T])
        fishForDebugMessage(a1, s"watching ${a2.ref}")

        // one subscription remains, so no further debug output is expected yet
        es.unsubscribe(a2.ref, classOf[A]) should equal(true)
        fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel class akka.event.EventStreamSpec$$A")
        a1.expectNoMsg(1 second)

        // the last subscription is gone, which must trigger the unwatch
        es.unsubscribe(a2.ref, classOf[T]) should equal(true)
        fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel interface akka.event.EventStreamSpec$$T")
        fishForDebugMessage(a1, s"unwatching ${a2.ref}, since has no subscriptions")
        a1.expectNoMsg(1 second)

        // unsubscribing again reports that nothing was removed
        es.unsubscribe(a2.ref, classOf[T]) should equal(false)
      } finally {
        shutdown(sys)
      }
    }

  }

  /**
   * Publishes one Debug, Info, Warning and Error event on `bus` and expects,
   * in that order, exactly those events whose level compares `<=` to `level`.
   *
   * @param bus   the logging bus to publish on
   * @param level the level the bus is expected to be filtering at
   */
  private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel): Unit = {
    import Logging._
    val allEvents = Seq(
      Debug("", null, "debug"),
      Info("", null, "info"),
      Warning("", null, "warning"),
      Error("", null, "error"))
    val expected = allEvents filter (_.level <= level)
    allEvents foreach bus.publish
    expected foreach (expectMsg(_))
  }

  /**
   * Waits up to three seconds for `a` to receive a `Logging.Debug` event whose
   * String message starts with `messagePrefix`; any other message is skipped.
   *
   * @param a             the probe receiving the debug events
   * @param messagePrefix the prefix the debug message has to start with
   */
  private def fishForDebugMessage(a: TestProbe, messagePrefix: String): Unit = {
    a.fishForMessage(3 seconds, hint = "expected debug message prefix: " + messagePrefix) {
      case Logging.Debug(_, _, msg: String) if msg startsWith messagePrefix ⇒ true
      case _ ⇒ false
    }
  }

}
|