=act,rem Clean up some test serialization issues for Scala 2.12
This commit is contained in:
parent
53b9a2650c
commit
de47178eb5
3 changed files with 46 additions and 37 deletions
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import akka.actor.Props.EmptyActor
|
||||
import language.postfixOps
|
||||
import akka.dispatch.sysmsg.{ DeathWatchNotification, Failed }
|
||||
import akka.pattern.ask
|
||||
|
|
@ -12,17 +13,48 @@ import scala.concurrent.duration._
|
|||
import scala.concurrent.Await
|
||||
|
||||
class LocalDeathWatchSpec extends AkkaSpec("""
|
||||
akka.actor.serialize-messages = off
|
||||
akka.actor.serialize-messages = on
|
||||
""") with ImplicitSender with DefaultTimeout with DeathWatchSpec
|
||||
|
||||
object DeathWatchSpec {
|
||||
def props(target: ActorRef, testActor: ActorRef) = Props(new Actor {
|
||||
class Watcher(target: ActorRef, testActor: ActorRef) extends Actor {
|
||||
context.watch(target)
|
||||
def receive = {
|
||||
case t: Terminated ⇒ testActor forward WrappedTerminated(t)
|
||||
case x ⇒ testActor forward x
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
def props(target: ActorRef, testActor: ActorRef) =
|
||||
Props(classOf[Watcher], target, testActor)
|
||||
|
||||
class EmptyWatcher(target: ActorRef) extends Actor {
|
||||
context.watch(target)
|
||||
def receive = Actor.emptyBehavior
|
||||
}
|
||||
|
||||
class NKOTBWatcher(testActor: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case "NKOTB" ⇒
|
||||
val currentKid = context.watch(context.actorOf(Props(new Actor { def receive = { case "NKOTB" ⇒ context stop self } }), "kid"))
|
||||
currentKid forward "NKOTB"
|
||||
context become {
|
||||
case Terminated(`currentKid`) ⇒
|
||||
testActor ! "GREEN"
|
||||
context unbecome
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class WUWatcher extends Actor {
|
||||
def receive = {
|
||||
case W(ref) ⇒ context watch ref
|
||||
case U(ref) ⇒ context unwatch ref
|
||||
case Latches(t1: TestLatch, t2: TestLatch) ⇒
|
||||
t1.countDown()
|
||||
Await.ready(t2, 3.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Forwarding `Terminated` to non-watching testActor is not possible,
|
||||
|
|
@ -41,7 +73,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
import DeathWatchSpec._
|
||||
|
||||
lazy val supervisor = system.actorOf(Props(new Supervisor(SupervisorStrategy.defaultStrategy)), "watchers")
|
||||
lazy val supervisor = system.actorOf(Props(classOf[Supervisor], SupervisorStrategy.defaultStrategy), "watchers")
|
||||
|
||||
def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds)
|
||||
|
||||
|
|
@ -142,10 +174,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
val supervisor = system.actorOf(Props(new Supervisor(strategy)).withDeploy(Deploy.local))
|
||||
|
||||
val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration)
|
||||
val brother = Await.result((supervisor ? Props(new Actor {
|
||||
context.watch(failed)
|
||||
def receive = Actor.emptyBehavior
|
||||
})).mapTo[ActorRef], timeout.duration)
|
||||
val brother = Await.result((supervisor ? Props(classOf[EmptyWatcher], failed)).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
startWatching(brother)
|
||||
|
||||
|
|
@ -161,18 +190,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
}
|
||||
|
||||
"be able to watch a child with the same name after the old died" in {
|
||||
val parent = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "NKOTB" ⇒
|
||||
val currentKid = context.watch(context.actorOf(Props(new Actor { def receive = { case "NKOTB" ⇒ context stop self } }), "kid"))
|
||||
currentKid forward "NKOTB"
|
||||
context become {
|
||||
case Terminated(`currentKid`) ⇒
|
||||
testActor ! "GREEN"
|
||||
context unbecome
|
||||
}
|
||||
}
|
||||
}).withDeploy(Deploy.local))
|
||||
val parent = system.actorOf(Props(classOf[NKOTBWatcher], testActor).withDeploy(Deploy.local))
|
||||
|
||||
parent ! "NKOTB"
|
||||
expectMsg("GREEN")
|
||||
|
|
@ -181,7 +199,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
}
|
||||
|
||||
"only notify when watching" in {
|
||||
val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }))
|
||||
val subject = system.actorOf(Props[EmptyActor]())
|
||||
|
||||
testActor.asInstanceOf[InternalActorRef]
|
||||
.sendSystemMessage(DeathWatchNotification(subject, existenceConfirmed = true, addressTerminated = false))
|
||||
|
|
@ -191,18 +209,8 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
}
|
||||
|
||||
"discard Terminated when unwatched between sysmsg and processing" in {
|
||||
class Watcher extends Actor {
|
||||
def receive = {
|
||||
case W(ref) ⇒ context watch ref
|
||||
case U(ref) ⇒ context unwatch ref
|
||||
case Latches(t1: TestLatch, t2: TestLatch) ⇒
|
||||
t1.countDown()
|
||||
Await.ready(t2, 3.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
val t1, t2 = TestLatch()
|
||||
val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher")
|
||||
val w = system.actorOf(Props[WUWatcher]().withDeploy(Deploy.local), "myDearWatcher")
|
||||
val p = TestProbe()
|
||||
w ! W(p.ref)
|
||||
w ! Latches(t1, t2)
|
||||
|
|
@ -213,7 +221,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
w ! U(p.ref)
|
||||
t2.countDown()
|
||||
/*
|
||||
* now the Watcher will
|
||||
* now the WUWatcher will
|
||||
* - process the DeathWatchNotification and enqueue Terminated
|
||||
* - process the unwatch command
|
||||
* - process the Terminated
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import com.typesafe.config._
|
|||
import scala.concurrent.{ Await, Future }
|
||||
import TypedActorRemoteDeploySpec._
|
||||
import akka.actor.{ Deploy, ActorSystem, TypedProps, TypedActor }
|
||||
import akka.util.IgnoreForScala212
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object TypedActorRemoteDeploySpec {
|
||||
|
|
@ -46,11 +47,11 @@ class TypedActorRemoteDeploySpec extends AkkaSpec(conf) {
|
|||
|
||||
"Typed actors" must {
|
||||
|
||||
"be possible to deploy remotely and communicate with" in {
|
||||
"be possible to deploy remotely and communicate with" taggedAs IgnoreForScala212 in {
|
||||
verify({ _.getName }, remoteName)
|
||||
}
|
||||
|
||||
"be possible to deploy remotely and be able to dereference self" in {
|
||||
"be possible to deploy remotely and be able to dereference self" taggedAs IgnoreForScala212 in {
|
||||
verify({ _.getNameSelfDeref }, remoteName)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,13 +5,13 @@
|
|||
package akka.remote.serialization
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.serialization.SerializationExtension
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.{ Actor, Address, Props, Deploy, OneForOneStrategy, SupervisorStrategy }
|
||||
import akka.remote.{ DaemonMsgCreate, RemoteScope }
|
||||
import akka.routing.{ RoundRobinPool, FromConfig }
|
||||
import akka.util.IgnoreForScala212
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object DaemonMsgCreateSerializerSpec {
|
||||
|
|
@ -56,7 +56,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"serialize and de-serialize DaemonMsgCreate with function creator" in {
|
||||
"serialize and de-serialize DaemonMsgCreate with function creator" taggedAs IgnoreForScala212 in {
|
||||
verifySerialization {
|
||||
DaemonMsgCreate(
|
||||
props = Props(new MyActor),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue