fixed bug in nested supervisors + added tests + added latch to agent tests
This commit is contained in:
parent
59e4b53bf4
commit
4dee6cbd64
3 changed files with 96 additions and 19 deletions
|
|
@ -1031,7 +1031,9 @@ trait Actor extends TransactionManagement with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
private[Actor] def restart(reason: Throwable) = synchronized {
|
||||
private[Actor] def restart(reason: Throwable): Unit = synchronized {
|
||||
getLinkedActors.toArray.foreach(s => println("---------- " + s))
|
||||
restartLinkedActors(reason)
|
||||
preRestart(reason)
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
postRestart(reason)
|
||||
|
|
|
|||
|
|
@ -12,9 +12,11 @@ import org.scalatest.matchers.MustMatchers
|
|||
import org.junit.runner.RunWith
|
||||
import org.junit.{Test}
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class AgentSpec extends junit.framework.TestCase
|
||||
with Suite with MustMatchers
|
||||
class AgentSpec extends junit.framework.TestCase
|
||||
with Suite with MustMatchers
|
||||
with ActorTestUtil with Logging {
|
||||
|
||||
@Test def testSendFun = verify(new TestActor {
|
||||
|
|
@ -44,10 +46,11 @@ with ActorTestUtil with Logging {
|
|||
def test = {
|
||||
val agent = Agent(5)
|
||||
var result = 0
|
||||
val latch = new CountDownLatch(2)
|
||||
handle(agent) {
|
||||
agent sendProc (result += _)
|
||||
agent sendProc (result += _)
|
||||
Thread.sleep(1000)
|
||||
agent sendProc { e => result += e; latch.countDown }
|
||||
agent sendProc { e => result += e; latch.countDown }
|
||||
assert(latch.await(1, TimeUnit.SECONDS))
|
||||
result must be(10)
|
||||
}
|
||||
}
|
||||
|
|
@ -56,11 +59,12 @@ with ActorTestUtil with Logging {
|
|||
@Test def testOneAgentsendWithinEnlosingTransactionSuccess = {
|
||||
case object Go
|
||||
val agent = Agent(5)
|
||||
val latch = new CountDownLatch(1)
|
||||
val tx = transactor {
|
||||
case Go => agent send (_ + 1)
|
||||
case Go => agent send { e => latch.countDown; e + 1 }
|
||||
}
|
||||
tx ! Go
|
||||
Thread.sleep(1000)
|
||||
assert(latch.await(1, TimeUnit.SECONDS))
|
||||
val result = agent()
|
||||
result must be(6)
|
||||
agent.close
|
||||
|
|
@ -68,18 +72,17 @@ with ActorTestUtil with Logging {
|
|||
}
|
||||
|
||||
@Test def testDoingAgentGetInEnlosingTransactionShouldYieldException = {
|
||||
import java.util.concurrent.CountDownLatch
|
||||
case object Go
|
||||
val latch = new CountDownLatch(1)
|
||||
val agent = Agent(5)
|
||||
val tx = transactor {
|
||||
case Go =>
|
||||
case Go =>
|
||||
agent send (_ * 2)
|
||||
try { agent() }
|
||||
catch {
|
||||
case _ => latch.countDown
|
||||
catch {
|
||||
case _ => latch.countDown
|
||||
}
|
||||
}
|
||||
}
|
||||
tx ! Go
|
||||
assert(latch.await(1, TimeUnit.SECONDS))
|
||||
agent.close
|
||||
|
|
@ -92,7 +95,7 @@ with ActorTestUtil with Logging {
|
|||
val agent1 = Agent(3)
|
||||
var result = 0
|
||||
for (first <- agent1) {
|
||||
result = first + 1
|
||||
result = first + 1
|
||||
}
|
||||
result must be(4)
|
||||
agent1.close
|
||||
|
|
@ -102,7 +105,7 @@ with ActorTestUtil with Logging {
|
|||
@Test def testAgentMap = verify(new TestActor {
|
||||
def test = {
|
||||
val agent1 = Agent(3)
|
||||
val result = for (first <- agent1) yield first + 1
|
||||
val result = for (first <- agent1) yield first + 1
|
||||
result() must be(4)
|
||||
result.close
|
||||
agent1.close
|
||||
|
|
@ -116,7 +119,7 @@ with ActorTestUtil with Logging {
|
|||
val result = for {
|
||||
first <- agent1
|
||||
second <- agent2
|
||||
} yield second + first
|
||||
} yield second + first
|
||||
result() must be(8)
|
||||
result.close
|
||||
agent1.close
|
||||
|
|
|
|||
|
|
@ -34,6 +34,16 @@ class SupervisorSpec extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
@Test def shouldStartServerForNestedSupervisorHierarchy = {
|
||||
messageLog.clear
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup.start
|
||||
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillSingleActorOneForOne = {
|
||||
messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
|
|
@ -294,12 +304,74 @@ class SupervisorSpec extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
@Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = {
|
||||
messageLog.clear
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup.start
|
||||
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! Ping).getOrElse("nil")
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! Ping).getOrElse("nil")
|
||||
}
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! Die
|
||||
}
|
||||
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! Ping).getOrElse("nil")
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! Ping).getOrElse("nil")
|
||||
}
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================
|
||||
// Creat some supervisors with different configurations
|
||||
|
||||
def getSingleActorAllForOneSupervisor: Supervisor = {
|
||||
pingpong1 = new PingPong1Actor
|
||||
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
|
||||
|
|
@ -382,7 +454,7 @@ class SupervisorSpec extends JUnitSuite {
|
|||
LifeCycle(Permanent))
|
||||
::
|
||||
SupervisorConfig(
|
||||
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
|
||||
RestartStrategy(AllForOne, 3, 100, Nil),
|
||||
Supervise(
|
||||
pingpong2,
|
||||
LifeCycle(Permanent))
|
||||
|
|
@ -403,7 +475,7 @@ class SupervisorSpec extends JUnitSuite {
|
|||
|
||||
case OneWay =>
|
||||
oneWayLog.put("oneway")
|
||||
|
||||
|
||||
case Die =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue