diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index e86b026bfd..ad90cc5287 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -47,23 +47,23 @@ abstract class ClientDowningNodeThatIsUnreachableSpec // mark 'third' node as DOWN cluster.down(thirdAddress) - enter("down-third-node") + enterBarrier("down-third-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { - enter("down-third-node") + enterBarrier("down-third-node") } runOn(second, fourth) { - enter("down-third-node") + enterBarrier("down-third-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) } - enter("await-completion") + enterBarrier("await-completion") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 228f5b6d98..dabd92abb0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -43,7 +43,7 @@ abstract class ClientDowningNodeThatIsUpSpec runOn(first) { // mark 'third' node as DOWN cluster.down(thirdAddress) - enter("down-third-node") + enterBarrier("down-third-node") markNodeAsUnavailable(thirdAddress) @@ -52,16 +52,16 @@ abstract class ClientDowningNodeThatIsUpSpec } runOn(third) { - enter("down-third-node") + enterBarrier("down-third-node") } runOn(second, fourth) { - enter("down-third-node") + enterBarrier("down-third-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) } - enter("await-completion") + enterBarrier("await-completion") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 2e496c9b2c..a8e985412f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -46,12 +46,12 @@ abstract class ConvergenceSpec // doesn't join immediately } - enter("after-1") + enterBarrier("after-1") } "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { val thirdAddress = node(third).address - enter("before-shutdown") + enterBarrier("before-shutdown") runOn(first) { // kill 'third' node @@ -78,7 +78,7 @@ abstract class ConvergenceSpec } } - enter("after-2") + enterBarrier("after-2") } "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { @@ -116,7 +116,7 @@ abstract class ConvergenceSpec } } - enter("after-3") + enterBarrier("after-3") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index d66fb95692..d0e255fb81 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -43,7 +43,7 @@ abstract class GossipingAccrualFailureDetectorSpec cluster.failureDetector.isAvailable(secondAddress) must be(true) cluster.failureDetector.isAvailable(thirdAddress) must be(true) - enter("after-1") + enterBarrier("after-1") } "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { @@ -59,7 +59,7 @@ abstract class GossipingAccrualFailureDetectorSpec cluster.failureDetector.isAvailable(secondAddress) must be(true) } - enter("after-2") + enterBarrier("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 9953a4c61f..1ec4f47fcf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -46,7 +46,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec runOn(first) { // kill 'fourth' node testConductor.shutdown(fourth, 0) - enter("down-fourth-node") + enterBarrier("down-fourth-node") // mark the node as unreachable in the failure detector markNodeAsUnavailable(fourthAddress) @@ -57,26 +57,26 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec } runOn(fourth) { - enter("down-fourth-node") + enterBarrier("down-fourth-node") } runOn(second, third) { - enter("down-fourth-node") + enterBarrier("down-fourth-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) } - enter("await-completion-1") + enterBarrier("await-completion-1") } "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { val secondAddress = node(second).address - enter("before-down-second-node") + enterBarrier("before-down-second-node") runOn(first) { // kill 'second' node testConductor.shutdown(second, 0) - enter("down-second-node") + enterBarrier("down-second-node") // mark the node as unreachable in the failure detector markNodeAsUnavailable(secondAddress) @@ -87,16 +87,16 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec } runOn(second) { - enter("down-second-node") + enterBarrier("down-second-node") } runOn(third) { - enter("down-second-node") + enterBarrier("down-second-node") awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30 seconds) } - enter("await-completion-2") + enterBarrier("await-completion-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 28a684eb7b..965efc1555 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -50,7 +50,7 @@ abstract class LeaderElectionSpec assertLeaderIn(sortedRoles) } - enter("after") + enterBarrier("after") } def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { @@ -64,33 +64,33 @@ abstract class LeaderElectionSpec case `controller` ⇒ val leaderAddress = node(leader).address - enter("before-shutdown") + enterBarrier("before-shutdown") testConductor.shutdown(leader, 0) - enter("after-shutdown", "after-down", "completed") + enterBarrier("after-shutdown", "after-down", "completed") markNodeAsUnavailable(leaderAddress) case `leader` ⇒ - enter("before-shutdown", "after-shutdown") + enterBarrier("before-shutdown", "after-shutdown") // this node will be shutdown by the controller and doesn't participate in more barriers case `aUser` ⇒ val leaderAddress = node(leader).address - enter("before-shutdown", "after-shutdown") + enterBarrier("before-shutdown", "after-shutdown") // user marks the shutdown leader as DOWN cluster.down(leaderAddress) - enter("after-down", "completed") + enterBarrier("after-down", "completed") markNodeAsUnavailable(leaderAddress) case _ if remainingRoles.contains(myself) ⇒ // remaining cluster nodes, not shutdown - enter("before-shutdown", "after-shutdown", "after-down") + enterBarrier("before-shutdown", "after-shutdown", "after-down") awaitUpConvergence(currentRoles.size - 1) val nextExpectedLeader = remainingRoles.head cluster.isLeader must be(myself == nextExpectedLeader) assertLeaderIn(remainingRoles) - enter("completed") + enterBarrier("completed") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index f8ad009bc2..5b396cd2b9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -47,12 +47,12 @@ abstract class MembershipChangeListenerExitingSpec awaitClusterUp(first, second, third) runOn(first) { - enter("registered-listener") + enterBarrier("registered-listener") cluster.leave(secondAddress) } runOn(second) { - enter("registered-listener") + enterBarrier("registered-listener") } runOn(third) { @@ -63,11 +63,11 @@ abstract class MembershipChangeListenerExitingSpec exitingLatch.countDown() } }) - enter("registered-listener") + enterBarrier("registered-listener") exitingLatch.await } - enter("finished") + enterBarrier("finished") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index af3fe0aeee..56320f8c20 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -45,19 +45,19 @@ abstract class MembershipChangeListenerJoinSpec joinLatch.countDown() } }) - enter("registered-listener") + enterBarrier("registered-listener") joinLatch.await } runOn(second) { - enter("registered-listener") + enterBarrier("registered-listener") cluster.join(firstAddress) } awaitUpConvergence(2) - enter("after") + enterBarrier("after") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 81556a44e8..2d93f4a5dd 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -44,12 +44,12 @@ abstract class MembershipChangeListenerLeavingSpec awaitClusterUp(first, second, third) runOn(first) { - enter("registered-listener") + enterBarrier("registered-listener") cluster.leave(secondAddress) } runOn(second) { - enter("registered-listener") + enterBarrier("registered-listener") } runOn(third) { @@ -62,11 +62,11 @@ abstract class MembershipChangeListenerLeavingSpec latch.countDown() } }) - enter("registered-listener") + enterBarrier("registered-listener") latch.await } - enter("finished") + enterBarrier("finished") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 7aea16ed99..f8e7475501 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -46,16 +46,16 @@ abstract class MembershipChangeListenerUpSpec latch.countDown() } }) - enter("listener-1-registered") + enterBarrier("listener-1-registered") cluster.join(firstAddress) latch.await } runOn(third) { - enter("listener-1-registered") + enterBarrier("listener-1-registered") } - enter("after-1") + enterBarrier("after-1") } "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { @@ -68,7 +68,7 @@ abstract class MembershipChangeListenerUpSpec latch.countDown() } }) - enter("listener-2-registered") + enterBarrier("listener-2-registered") runOn(third) { cluster.join(firstAddress) @@ -76,7 +76,7 @@ abstract class MembershipChangeListenerUpSpec latch.await - enter("after-2") + enterBarrier("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index d67dedc34f..0b1703642f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -71,14 +71,14 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec // make sure that the node-to-join is started before other join startClusterNode() } - enter(roles.head.name + "-started") + enterBarrier(roles.head.name + "-started") if (roles.tail.contains(myself)) { cluster.join(node(roles.head).address) } if (upConvergence && roles.contains(myself)) { awaitUpConvergence(numberOfMembers = roles.length) } - enter(roles.map(_.name).mkString("-") + "-joined") + enterBarrier(roles.map(_.name).mkString("-") + "-joined") } /** diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 4a93655fef..00563d8775 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -45,7 +45,7 @@ abstract class NodeJoinSpec awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) - enter("after") + enterBarrier("after") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index d7cf74af75..426f26ef6a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -44,7 +44,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec runOn(first) { cluster.leave(secondAddress) } - enter("second-left") + enterBarrier("second-left") runOn(first, third) { // verify that the 'second' node is no longer part of the 'members' set @@ -59,7 +59,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec isRemoved.get.address must be(secondAddress) } - enter("finished") + enterBarrier("finished") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index be28235c33..c101c0d9a1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -50,7 +50,7 @@ abstract class NodeLeavingAndExitingSpec runOn(first) { cluster.leave(secondAddress) } - enter("second-left") + enterBarrier("second-left") runOn(first, third) { @@ -69,7 +69,7 @@ abstract class NodeLeavingAndExitingSpec hasExited.get.address must be(secondAddress) } - enter("finished") + enterBarrier("finished") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index 5c5ffb16e0..11d943d02f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -44,7 +44,7 @@ abstract class NodeLeavingSpec runOn(first) { cluster.leave(secondAddress) } - enter("second-left") + enterBarrier("second-left") runOn(first, third) { awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) @@ -54,7 +54,7 @@ abstract class NodeLeavingSpec hasLeft.get.address must be(secondAddress) } - enter("finished") + enterBarrier("finished") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 350e43a54b..067fcc4063 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -38,7 +38,7 @@ abstract class NodeMembershipSpec runOn(first) { startClusterNode() } - enter("first-started") + enterBarrier("first-started") runOn(first, second) { cluster.join(firstAddress) @@ -50,7 +50,7 @@ abstract class NodeMembershipSpec awaitCond(cluster.convergence.isDefined) } - enter("after-1") + enterBarrier("after-1") } "(when three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in { @@ -66,7 +66,7 @@ abstract class NodeMembershipSpec } awaitCond(cluster.convergence.isDefined) - enter("after-2") + enterBarrier("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 2e4691b1a4..c6ab7c0860 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -33,7 +33,7 @@ abstract class NodeUpSpec awaitClusterUp(first, second) - enter("after-1") + enterBarrier("after-1") } "be unaffected when joining again" taggedAs LongRunningTest in { @@ -45,12 +45,12 @@ abstract class NodeUpSpec unexpected.set(members) } }) - enter("listener-registered") + enterBarrier("listener-registered") runOn(second) { cluster.join(node(first).address) } - enter("joined-again") + enterBarrier("joined-again") // let it run for a while to make sure that nothing bad happens for (n ← 1 to 20) { @@ -59,7 +59,7 @@ abstract class NodeUpSpec cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true) } - enter("after-2") + enterBarrier("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 9137abbb1a..a747f93615 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -43,7 +43,7 @@ abstract class SingletonClusterSpec cluster.isSingletonCluster must be(false) assertLeader(first, second) - enter("after-1") + enterBarrier("after-1") } "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { @@ -58,7 +58,7 @@ abstract class SingletonClusterSpec assertLeader(first) } - enter("after-2") + enterBarrier("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 5a4699c91f..33b7a9ccc0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -62,7 +62,7 @@ abstract class SunnyWeatherSpec }) for (n ← 1 to 30) { - enter("period-" + n) + enterBarrier("period-" + n) unexpected.get must be(null) awaitUpConvergence(roles.size) assertLeaderIn(roles) @@ -70,7 +70,7 @@ abstract class SunnyWeatherSpec 1.seconds.sleep } - enter("after") + enterBarrier("after") } } } diff --git a/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java b/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java index 14fe236950..bd8de8a052 100644 --- a/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java +++ b/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java @@ -8,6 +8,81 @@ public final class TestConductorProtocol { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } + public enum BarrierOp + implements com.google.protobuf.ProtocolMessageEnum { + Enter(0, 1), + Fail(1, 2), + Succeeded(2, 3), + Failed(3, 4), + ; + + public static final int Enter_VALUE = 1; + public static final int Fail_VALUE = 2; + public static final int Succeeded_VALUE = 3; + public static final int Failed_VALUE = 4; + + + public final int getNumber() { return value; } + + public static BarrierOp valueOf(int value) { + switch (value) { + case 1: return Enter; + case 2: return Fail; + case 3: return Succeeded; + case 4: return Failed; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public BarrierOp findValueByNumber(int number) { + return BarrierOp.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return akka.remote.testconductor.TestConductorProtocol.getDescriptor().getEnumTypes().get(0); + } + + private static final BarrierOp[] VALUES = { + Enter, Fail, Succeeded, Failed, + }; + + public static BarrierOp valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private BarrierOp(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:BarrierOp) + } + public enum FailType implements com.google.protobuf.ProtocolMessageEnum { Throttle(0, 1), @@ -56,7 +131,7 @@ public final class TestConductorProtocol { } public static final com.google.protobuf.Descriptors.EnumDescriptor getDescriptor() { - return akka.remote.testconductor.TestConductorProtocol.getDescriptor().getEnumTypes().get(0); + return akka.remote.testconductor.TestConductorProtocol.getDescriptor().getEnumTypes().get(1); } private static final FailType[] VALUES = { @@ -128,7 +203,7 @@ public final class TestConductorProtocol { } public static final com.google.protobuf.Descriptors.EnumDescriptor getDescriptor() { - return akka.remote.testconductor.TestConductorProtocol.getDescriptor().getEnumTypes().get(1); + return akka.remote.testconductor.TestConductorProtocol.getDescriptor().getEnumTypes().get(2); } private static final Direction[] VALUES = { @@ -1699,17 +1774,13 @@ public final class TestConductorProtocol { boolean hasName(); String getName(); - // optional bool status = 2; - boolean hasStatus(); - boolean getStatus(); + // required .BarrierOp op = 2; + boolean hasOp(); + akka.remote.testconductor.TestConductorProtocol.BarrierOp getOp(); // optional int64 timeout = 3; boolean hasTimeout(); long getTimeout(); - - // optional bool failed = 4; - boolean hasFailed(); - boolean getFailed(); } public static final class EnterBarrier extends com.google.protobuf.GeneratedMessage @@ -1772,14 +1843,14 @@ public final class TestConductorProtocol { } } - // optional bool status = 2; - public static final int STATUS_FIELD_NUMBER = 2; - private boolean status_; - public boolean hasStatus() { + // required .BarrierOp op = 2; + public static final int OP_FIELD_NUMBER = 2; + private akka.remote.testconductor.TestConductorProtocol.BarrierOp op_; + public boolean hasOp() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public boolean getStatus() { - return status_; + public akka.remote.testconductor.TestConductorProtocol.BarrierOp getOp() { + return op_; } // optional int64 timeout = 3; @@ -1792,21 +1863,10 @@ public final class TestConductorProtocol { return timeout_; } - // optional bool failed = 4; - public static final int FAILED_FIELD_NUMBER = 4; - private boolean failed_; - public boolean hasFailed() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - public boolean getFailed() { - return failed_; - } - private void initFields() { name_ = ""; - status_ = false; + op_ = akka.remote.testconductor.TestConductorProtocol.BarrierOp.Enter; timeout_ = 0L; - failed_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1817,6 +1877,10 @@ public final class TestConductorProtocol { memoizedIsInitialized = 0; return false; } + if (!hasOp()) { + memoizedIsInitialized = 0; + return false; + } memoizedIsInitialized = 1; return true; } @@ -1828,14 +1892,11 @@ public final class TestConductorProtocol { output.writeBytes(1, getNameBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBool(2, status_); + output.writeEnum(2, op_.getNumber()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeInt64(3, timeout_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeBool(4, failed_); - } getUnknownFields().writeTo(output); } @@ -1851,16 +1912,12 @@ public final class TestConductorProtocol { } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeBoolSize(2, status_); + .computeEnumSize(2, op_.getNumber()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeInt64Size(3, timeout_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - size += com.google.protobuf.CodedOutputStream - .computeBoolSize(4, failed_); - } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1987,12 +2044,10 @@ public final class TestConductorProtocol { super.clear(); name_ = ""; bitField0_ = (bitField0_ & ~0x00000001); - status_ = false; + op_ = akka.remote.testconductor.TestConductorProtocol.BarrierOp.Enter; bitField0_ = (bitField0_ & ~0x00000002); timeout_ = 0L; bitField0_ = (bitField0_ & ~0x00000004); - failed_ = false; - bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -2038,15 +2093,11 @@ public final class TestConductorProtocol { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.status_ = status_; + result.op_ = op_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } result.timeout_ = timeout_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } - result.failed_ = failed_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -2066,15 +2117,12 @@ public final class TestConductorProtocol { if (other.hasName()) { setName(other.getName()); } - if (other.hasStatus()) { - setStatus(other.getStatus()); + if (other.hasOp()) { + setOp(other.getOp()); } if (other.hasTimeout()) { setTimeout(other.getTimeout()); } - if (other.hasFailed()) { - setFailed(other.getFailed()); - } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2084,6 +2132,10 @@ public final class TestConductorProtocol { return false; } + if (!hasOp()) { + + return false; + } return true; } @@ -2116,8 +2168,14 @@ public final class TestConductorProtocol { break; } case 16: { - bitField0_ |= 0x00000002; - status_ = input.readBool(); + int rawValue = input.readEnum(); + akka.remote.testconductor.TestConductorProtocol.BarrierOp value = akka.remote.testconductor.TestConductorProtocol.BarrierOp.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(2, rawValue); + } else { + bitField0_ |= 0x00000002; + op_ = value; + } break; } case 24: { @@ -2125,11 +2183,6 @@ public final class TestConductorProtocol { timeout_ = input.readInt64(); break; } - case 32: { - bitField0_ |= 0x00000008; - failed_ = input.readBool(); - break; - } } } } @@ -2172,23 +2225,26 @@ public final class TestConductorProtocol { onChanged(); } - // optional bool status = 2; - private boolean status_ ; - public boolean hasStatus() { + // required .BarrierOp op = 2; + private akka.remote.testconductor.TestConductorProtocol.BarrierOp op_ = akka.remote.testconductor.TestConductorProtocol.BarrierOp.Enter; + public boolean hasOp() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public boolean getStatus() { - return status_; + public akka.remote.testconductor.TestConductorProtocol.BarrierOp getOp() { + return op_; } - public Builder setStatus(boolean value) { + public Builder setOp(akka.remote.testconductor.TestConductorProtocol.BarrierOp value) { + if (value == null) { + throw new NullPointerException(); + } bitField0_ |= 0x00000002; - status_ = value; + op_ = value; onChanged(); return this; } - public Builder clearStatus() { + public Builder clearOp() { bitField0_ = (bitField0_ & ~0x00000002); - status_ = false; + op_ = akka.remote.testconductor.TestConductorProtocol.BarrierOp.Enter; onChanged(); return this; } @@ -2214,27 +2270,6 @@ public final class TestConductorProtocol { return this; } - // optional bool failed = 4; - private boolean failed_ ; - public boolean hasFailed() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - public boolean getFailed() { - return failed_; - } - public Builder setFailed(boolean value) { - bitField0_ |= 0x00000008; - failed_ = value; - onChanged(); - return this; - } - public Builder clearFailed() { - bitField0_ = (bitField0_ & ~0x00000008); - failed_ = false; - onChanged(); - return this; - } - // @@protoc_insertion_point(builder_scope:EnterBarrier) } @@ -4170,19 +4205,21 @@ public final class TestConductorProtocol { "\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" + "ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." + "AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" + - "\007address\030\002 \002(\0132\010.Address\"M\n\014EnterBarrier" + - "\022\014\n\004name\030\001 \002(\t\022\016\n\006status\030\002 \001(\010\022\017\n\007timeou" + - "t\030\003 \001(\003\022\016\n\006failed\030\004 \001(\010\"6\n\016AddressReques" + - "t\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"" + - "G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002", - " \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInj" + - "ectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035" + - "\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007address" + - "\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\tex" + - "itValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Throttle\020\001\022" + - "\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shutdown\020\004" + - "*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receive\020\002\022\010\n\004" + - "Both\020\003B\035\n\031akka.remote.testconductorH\001" + "\007address\030\002 \002(\0132\010.Address\"E\n\014EnterBarrier" + + "\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.BarrierOp\022\017" + + "\n\007timeout\030\003 \001(\003\"6\n\016AddressRequest\022\014\n\004nod" + + "e\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"G\n\007Addre" + + "ss\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004", + "host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailu" + + "re\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n\tdirect" + + "ion\030\002 \001(\0162\n.Direction\022\031\n\007address\030\003 \001(\0132\010" + + ".Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030" + + "\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010\n\004Fail\020\002\022" + + "\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*A\n\010FailType\022\014" + + "\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022" + + "\014\n\010Shutdown\020\004*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007" + + "Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testc" + + "onductorH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4210,7 +4247,7 @@ public final class TestConductorProtocol { internal_static_EnterBarrier_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_EnterBarrier_descriptor, - new java.lang.String[] { "Name", "Status", "Timeout", "Failed", }, + new java.lang.String[] { "Name", "Op", "Timeout", }, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.class, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.Builder.class); internal_static_AddressRequest_descriptor = diff --git a/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto b/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto index bef4aad5c1..1ff8a83c24 100644 --- a/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto +++ b/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto @@ -26,11 +26,17 @@ message Hello { required Address address = 2; } +enum BarrierOp { + Enter = 1; + Fail = 2; + Succeeded = 3; + Failed = 4; +} + message EnterBarrier { required string name = 1; - optional bool status = 2; + required BarrierOp op = 2; optional int64 timeout = 3; - optional bool failed = 4; } message AddressRequest { @@ -51,11 +57,13 @@ enum FailType { Abort = 3; Shutdown = 4; } + enum Direction { Send = 1; Receive = 2; Both = 3; } + message InjectFailure { required FailType failure = 1; optional Direction direction = 2; diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 7264948b0f..9256ec1abc 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -8,8 +8,6 @@ import RemoteConnection.getAddrString import TestConductorProtocol._ import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent } import com.typesafe.config.ConfigFactory -import akka.util.Timeout -import akka.util.Duration import akka.util.duration._ import akka.pattern.ask import java.util.concurrent.TimeUnit.MILLISECONDS @@ -26,6 +24,7 @@ import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy import java.util.concurrent.ConcurrentHashMap import akka.actor.Status +import akka.util.{ Deadline, Timeout, Duration } sealed trait Direction { def includes(other: Direction): Boolean @@ -376,7 +375,8 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP * BarrierTimeouts in the players). */ override def supervisorStrategy = OneForOneStrategy() { - case BarrierTimeout(data) ⇒ SupervisorStrategy.Restart + case BarrierTimeout(data) ⇒ failBarrier(data) + case FailedBarrier(data) ⇒ failBarrier(data) case BarrierEmpty(data, msg) ⇒ SupervisorStrategy.Resume case WrongBarrier(name, client, data) ⇒ client ! ToClient(BarrierResult(name, false)); failBarrier(data) case ClientLost(data, node) ⇒ failBarrier(data) @@ -464,7 +464,7 @@ private[akka] object BarrierCoordinator { case class RemoveClient(name: RoleName) - case class Data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef]) + case class Data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef], deadline: Deadline) trait Printer { this: Product with Throwable with NoStackTrace ⇒ override def toString = productPrefix + productIterator.mkString("(", ", ", ")") @@ -472,6 +472,8 @@ private[akka] object BarrierCoordinator { case class BarrierTimeout(data: Data) extends RuntimeException("timeout while waiting for barrier '" + data.barrier + "'") with NoStackTrace with Printer + case class FailedBarrier(data: Data) + extends RuntimeException("failing barrier '" + data.barrier + "'") with NoStackTrace with Printer case class DuplicateNode(data: Data, node: Controller.NodeInfo) extends RuntimeException(node.toString) with NoStackTrace with Printer case class WrongBarrier(barrier: String, client: ActorRef, data: Data) @@ -503,20 +505,18 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor // this shall be set to true if all subsequent barriers shall fail var failed = false - var barrierTimeout: Option[auTimeout] = None - override def preRestart(reason: Throwable, message: Option[Any]) {} override def postRestart(reason: Throwable) { failed = true } // TODO what happens with the other waiting players in case of a test failure? - startWith(Idle, Data(Set(), "", Nil)) + startWith(Idle, Data(Set(), "", Nil, null)) whenUnhandled { - case Event(n: NodeInfo, d @ Data(clients, _, _)) ⇒ + case Event(n: NodeInfo, d @ Data(clients, _, _, _)) ⇒ if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) stay using d.copy(clients = clients + n) - case Event(ClientDisconnected(name), d @ Data(clients, _, arrived)) ⇒ + case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒ if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect") (clients find (_.name == name)) match { case None ⇒ stay @@ -525,7 +525,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor } when(Idle) { - case Event(EnterBarrier(name, timeout), d @ Data(clients, _, _)) ⇒ + case Event(EnterBarrier(name, timeout), d @ Data(clients, _, _, _)) ⇒ if (failed) stay replying ToClient(BarrierResult(name, false)) else if (clients.map(_.fsm) == Set(sender)) @@ -533,56 +533,61 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor else if (clients.find(_.fsm == sender).isEmpty) stay replying ToClient(BarrierResult(name, false)) else { - barrierTimeout = timeout - goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil) + goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil, + deadline = getDeadline(timeout)) } - case Event(RemoveClient(name), d @ Data(clients, _, _)) ⇒ + case Event(RemoveClient(name), d @ Data(clients, _, _, _)) ⇒ if (clients.isEmpty) throw BarrierEmpty(d, "cannot remove " + name + ": no client to remove") stay using d.copy(clients = clients filterNot (_.name == name)) } onTransition { - case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, barrierTimeout.getOrElse[auTimeout](TestConductor().Settings.BarrierTimeout).duration, false) + case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, nextStateData.deadline - Deadline.now, false) case Waiting -> Idle ⇒ cancelTimer("Timeout") } when(Waiting) { - case Event(EnterBarrier(name, timeout), d @ Data(clients, barrier, arrived)) ⇒ + case Event(EnterBarrier(name, timeout), d @ Data(clients, barrier, arrived, deadline)) ⇒ if (name != barrier) throw WrongBarrier(name, sender, d) val together = if (clients.exists(_.fsm == sender)) sender :: arrived else arrived - handleBarrier(d.copy(arrived = together)) - case Event(RemoveClient(name), d @ Data(clients, barrier, arrived)) ⇒ + val enterDeadline = getDeadline(timeout) + // we only allow the deadlines to get shorter + val newDeadline = if ((enterDeadline - deadline) < Duration.Zero) enterDeadline else deadline + if (newDeadline != deadline) { + cancelTimer("Timeout") + setTimer("Timeout", StateTimeout, newDeadline - Deadline.now, false) + } + handleBarrier(d.copy(arrived = together, deadline = newDeadline)) + case Event(RemoveClient(name), d @ Data(clients, barrier, arrived, _)) ⇒ clients find (_.name == name) match { case None ⇒ stay case Some(client) ⇒ handleBarrier(d.copy(clients = clients - client, arrived = arrived filterNot (_ == client.fsm))) } - case Event(FailBarrier(name), d @ Data(clients, barrier, arrived)) ⇒ + case Event(FailBarrier(name), d @ Data(_, barrier, _, _)) ⇒ if (name != barrier) throw WrongBarrier(name, sender, d) - failed = true - handleBarrier(d, false) - - case Event(StateTimeout, d @ Data(clients, barrier, arrived)) ⇒ - handleBarrier(d, false) + throw FailedBarrier(d) + case Event(StateTimeout, d) ⇒ throw BarrierTimeout(d) } initialize - def handleBarrier(data: Data, status: Boolean = true): State = { - log.debug("handleBarrier({}, {})", data, status) - if (!status) { - data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, status))) - goto(Idle) using data.copy(barrier = "", arrived = Nil) - } else if (data.arrived.isEmpty) { + def handleBarrier(data: Data): State = { + log.debug("handleBarrier({})", data) + if (data.arrived.isEmpty) { goto(Idle) using data.copy(barrier = "") } else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) { - data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, status))) + data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, true))) goto(Idle) using data.copy(barrier = "", arrived = Nil) } else { stay using data } } + def getDeadline(timeout: Option[Duration]): Deadline = { + Deadline.now + timeout.getOrElse(TestConductor().Settings.BarrierTimeout.duration) + } + } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala index 4730bbd508..5adc07bef2 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala @@ -10,7 +10,8 @@ import akka.remote.testconductor.{ TestConductorProtocol ⇒ TCP } import com.google.protobuf.Message import akka.actor.Address import org.jboss.netty.handler.codec.oneone.OneToOneDecoder -import akka.util.Timeout +import akka.util.Duration +import akka.remote.testconductor.TestConductorProtocol.BarrierOp case class RoleName(name: String) @@ -29,7 +30,7 @@ private[akka] sealed trait ConfirmedClientOp extends ClientOp */ private[akka] case class Hello(name: String, addr: Address) extends NetworkOp -private[akka] case class EnterBarrier(name: String, timeout: Option[Timeout] = None) extends ServerOp with NetworkOp +private[akka] case class EnterBarrier(name: String, timeout: Option[Duration] = None) extends ServerOp with NetworkOp private[akka] case class FailBarrier(name: String) extends ServerOp with NetworkOp private[akka] case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp @@ -76,12 +77,14 @@ private[akka] class MsgEncoder extends OneToOneEncoder { w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr)) case EnterBarrier(name, timeout) ⇒ val barrier = TCP.EnterBarrier.newBuilder.setName(name) - timeout foreach (t ⇒ barrier.setTimeout(t.duration.toMillis)) + timeout foreach (t ⇒ barrier.setTimeout(t.toNanos)) + barrier.setOp(BarrierOp.Enter) w.setBarrier(barrier) case BarrierResult(name, success) ⇒ - w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setStatus(success)) + val res = if (success) BarrierOp.Succeeded else BarrierOp.Failed + w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setOp(res)) case FailBarrier(name) ⇒ - w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setFailed(true)) + w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setOp(BarrierOp.Fail)) case ThrottleMsg(target, dir, rate) ⇒ w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target) .setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate)) @@ -120,9 +123,13 @@ private[akka] class MsgDecoder extends OneToOneDecoder { Hello(h.getName, h.getAddress) } else if (w.hasBarrier) { val barrier = w.getBarrier - if (barrier.hasStatus) BarrierResult(barrier.getName, barrier.getStatus) - else if (barrier.hasFailed) FailBarrier(barrier.getName) - else EnterBarrier(w.getBarrier.getName, if (barrier.hasTimeout) Option(Timeout.longToTimeout(barrier.getTimeout)) else None) + barrier.getOp match { + case BarrierOp.Succeeded ⇒ BarrierResult(barrier.getName, true) + case BarrierOp.Failed ⇒ BarrierResult(barrier.getName, false) + case BarrierOp.Fail ⇒ FailBarrier(barrier.getName) + case BarrierOp.Enter ⇒ EnterBarrier(barrier.getName, + if (barrier.hasTimeout) Option(Duration.fromNanos(barrier.getTimeout)) else None) + } } else if (w.hasFailure) { val f = w.getFailure import TCP.{ FailType ⇒ FT } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index bed14725b4..46b7106a19 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -26,6 +26,7 @@ import org.jboss.netty.channel.WriteCompletionEvent import java.net.ConnectException import akka.util.Deadline import akka.actor.Scheduler +import java.util.concurrent.TimeoutException /** * The Player is the client component of the @@ -79,8 +80,6 @@ trait Player { this: TestConductorExt ⇒ enter(Settings.BarrierTimeout, name) } - case class OutOfTimeException(barrier: String) extends RuntimeException("Ran out of time while waiting for barrier '" + barrier + "'") with NoStackTrace - /** * Enter the named barriers, one after the other, in the order given. Will * throw an exception in case of timeouts or other errors. @@ -94,7 +93,7 @@ trait Player { this: TestConductorExt ⇒ val barrierTimeout = stop - now if (barrierTimeout < Duration.Zero) { client ! ToServer(FailBarrier(b)) - throw OutOfTimeException(b) + throw new TimeoutException("Server timed out while waiting for barrier " + b); } try { implicit val timeout = Timeout(barrierTimeout + Settings.QueryTimeout.duration) @@ -102,7 +101,8 @@ trait Player { this: TestConductorExt ⇒ } catch { case e: AskTimeoutException ⇒ client ! ToServer(FailBarrier(b)) - throw e + // Why don't TimeoutException have a constructor that takes a cause? + throw new TimeoutException("Client timed out while waiting for barrier " + b); } system.log.debug("passed barrier {}", b) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala index 999e152b0f..f49dc53e2b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala @@ -47,7 +47,7 @@ class LookupRemoteActorSpec extends MultiNodeSpec(LookupRemoteActorMultiJvmSpec) val masterAddress = testConductor.getAddressFor(master).await (hello ? "identify").await.asInstanceOf[ActorRef].path.address must equal(masterAddress) } - enter("done") + enterBarrier("done") } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index 5b4e19df98..eca91495d6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -56,7 +56,7 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) system.stop(actor) } - enter("done") + enterBarrier("done") } "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef (with deployOnAll)" taggedAs LongRunningTest in { @@ -74,7 +74,7 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) system.stop(actor) } - enter("done") + enterBarrier("done") } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala index eeb09d6174..44c7ae5047 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala @@ -55,11 +55,11 @@ class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorM "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { runOn(first, second, third) { - enter("start", "broadcast-end", "end", "done") + enterBarrier("start", "broadcast-end", "end", "done") } runOn(fourth) { - enter("start") + enterBarrier("start") val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) @@ -76,17 +76,17 @@ class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorM case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } - enter("broadcast-end") + enterBarrier("broadcast-end") actor ! Broadcast(PoisonPill) - enter("end") + enterBarrier("end") replies.values foreach { _ must be > (0) } replies.get(node(fourth).address) must be(None) // shut down the actor before we let the other node(s) shut down so we don't try to send // "Terminate" to a shut down node system.stop(actor) - enter("done") + enterBarrier("done") } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala index f69989f41f..76a7e41ad1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala @@ -55,11 +55,11 @@ class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemo "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { runOn(first, second, third) { - enter("start", "broadcast-end", "end", "done") + enterBarrier("start", "broadcast-end", "end", "done") } runOn(fourth) { - enter("start") + enterBarrier("start") val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) @@ -76,17 +76,17 @@ class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemo case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } - enter("broadcast-end") + enterBarrier("broadcast-end") actor ! Broadcast(PoisonPill) - enter("end") + enterBarrier("end") replies.values foreach { _ must be(iterationCount) } replies.get(node(fourth).address) must be(None) // shut down the actor before we let the other node(s) shut down so we don't try to send // "Terminate" to a shut down node system.stop(actor) - enter("done") + enterBarrier("done") } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala index 3c18518503..b77b0c196e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala @@ -55,11 +55,11 @@ class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRout "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { runOn(first, second, third) { - enter("start", "broadcast-end", "end", "done") + enterBarrier("start", "broadcast-end", "end", "done") } runOn(fourth) { - enter("start") + enterBarrier("start") val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) @@ -76,17 +76,17 @@ class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRout case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } - enter("broadcast-end") + enterBarrier("broadcast-end") actor ! Broadcast(PoisonPill) - enter("end") + enterBarrier("end") replies.values.sum must be === connectionCount * iterationCount replies.get(node(fourth).address) must be(None) // shut down the actor before we let the other node(s) shut down so we don't try to send // "Terminate" to a shut down node system.stop(actor) - enter("done") + enterBarrier("done") } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index 5898fd458c..86fabc489d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -46,7 +46,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im }), "echo") } - enter("name") + enterBarrier("name") } "support throttling of network connections" taggedAs LongRunningTest in { @@ -62,7 +62,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im testConductor.throttle(slave, master, Direction.Send, rateMBit = 0.01).await } - enter("throttled_send") + enterBarrier("throttled_send") runOn(slave) { for (i ← 0 to 9) echo ! i @@ -73,14 +73,14 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im receiveN(9) must be(1 to 9) } - enter("throttled_send2") + enterBarrier("throttled_send2") runOn(master) { testConductor.throttle(slave, master, Direction.Send, -1).await testConductor.throttle(slave, master, Direction.Receive, rateMBit = 0.01).await } - enter("throttled_recv") + enterBarrier("throttled_recv") runOn(slave) { for (i ← 10 to 19) echo ! i @@ -98,7 +98,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im receiveN(9) must be(11 to 19) } - enter("throttled_recv2") + enterBarrier("throttled_recv2") runOn(master) { testConductor.throttle(slave, master, Direction.Receive, -1).await diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index 79dfda7559..779c02b670 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -29,10 +29,10 @@ object BarrierSpec { akka.remote.netty.port = 0 akka.actor.debug.fsm = on akka.actor.debug.lifecycle = on - """ + """ } -class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with BeforeAndAfterEach { +class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { import BarrierSpec._ import Controller._ @@ -42,10 +42,6 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with val B = RoleName("b") val C = RoleName("c") - override def afterEach { - system.eventStream.setLogLevel(Logging.WarningLevel) - } - "A BarrierCoordinator" must { "register clients and remove them" taggedAs TimingTest in { @@ -56,7 +52,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[BarrierEmpty](occurrences = 1) intercept { b ! RemoveClient(A) } - expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove"))) + expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot remove RoleName(a): no client to remove"))) } "register clients and disconnect them" taggedAs TimingTest in { @@ -66,11 +62,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[ClientLost](occurrences = 1) intercept { b ! ClientDisconnected(A) } - expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil), A))) + expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A))) EventFilter[BarrierEmpty](occurrences = 1) intercept { b ! ClientDisconnected(A) } - expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "cannot disconnect RoleName(a): no client to disconnect"))) + expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect"))) } "fail entering barrier when nobody registered" taggedAs TimingTest in { @@ -86,7 +82,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.send(barrier, EnterBarrier("bar2")) noMsg(a, b) - within(2 second) { + within(2 seconds) { b.send(barrier, EnterBarrier("bar2")) a.expectMsg(ToClient(BarrierResult("bar2", true))) b.expectMsg(ToClient(BarrierResult("bar2", true))) @@ -102,7 +98,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) b.send(barrier, EnterBarrier("bar3")) noMsg(a, b, c) - within(2 second) { + within(2 seconds) { c.send(barrier, EnterBarrier("bar3")) a.expectMsg(ToClient(BarrierResult("bar3", true))) b.expectMsg(ToClient(BarrierResult("bar3", true))) @@ -121,7 +117,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! RemoveClient(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) - b.within(2 second) { + b.within(2 seconds) { barrier ! RemoveClient(C) b.expectMsg(ToClient(BarrierResult("bar4", true))) } @@ -150,7 +146,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[ClientLost](occurrences = 1) intercept { barrier ! ClientDisconnected(B) } - expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil), B))) + val msg = expectMsgType[Failed] + msg match { + case Failed(barrier, thr: ClientLost) if (thr == ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil, thr.data.deadline), B)) ⇒ + case x ⇒ fail("Expected " + Failed(barrier, ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil, null), B)) + " but got " + x) + } } "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { @@ -166,7 +166,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[ClientLost](occurrences = 1) intercept { barrier ! ClientDisconnected(B) } - expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil), B))) + val msg = expectMsgType[Failed] + msg match { + case Failed(barrier, thr: ClientLost) if (thr == ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil, thr.data.deadline), B)) ⇒ + case x ⇒ fail("Expected " + Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil, null), B)) + " but got " + x) + } } "fail when entering wrong barrier" taggedAs TimingTest in { @@ -180,7 +184,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[WrongBarrier](occurrences = 1) intercept { b.send(barrier, EnterBarrier("foo")) } - expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil)))) + val msg = expectMsgType[Failed] + msg match { + case Failed(barrier, thr: WrongBarrier) if (thr == WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil, thr.data.deadline))) ⇒ + case x ⇒ fail("Expected " + Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil, null))) + " but got " + x) + } } "fail barrier after first failure" taggedAs TimingTest in { @@ -189,7 +197,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[BarrierEmpty](occurrences = 1) intercept { barrier ! RemoveClient(A) } - expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove"))) + val msg = expectMsgType[Failed] + msg match { + case Failed(barrier, thr: BarrierEmpty) if (thr == BarrierEmpty(Data(Set(), "", Nil, thr.data.deadline), "cannot remove RoleName(a): no client to remove")) ⇒ + case x ⇒ fail("Expected " + Failed(barrier, BarrierEmpty(Data(Set(), "", Nil, null), "cannot remove RoleName(a): no client to remove")) + " but got " + x) + } barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) a.send(barrier, EnterBarrier("bar9")) a.expectMsg(ToClient(BarrierResult("bar9", false))) @@ -204,7 +216,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeB a.send(barrier, EnterBarrier("bar10")) EventFilter[BarrierTimeout](occurrences = 1) intercept { - expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil)))) + val msg = expectMsgType[Failed](7 seconds) + msg match { + case Failed(barrier, thr: BarrierTimeout) if (thr == BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil, thr.data.deadline))) ⇒ + case x ⇒ fail("Expected " + Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil, null))) + " but got " + x) + } } } @@ -217,7 +233,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with EventFilter[DuplicateNode](occurrences = 1) intercept { barrier ! nodeB } - expectMsg(Failed(barrier, DuplicateNode(Data(Set(nodeA), "", Nil), nodeB))) + val msg = expectMsgType[Failed] + msg match { + case Failed(barrier, thr: DuplicateNode) if (thr == DuplicateNode(Data(Set(nodeA), "", Nil, thr.data.deadline), nodeB)) ⇒ + case x ⇒ fail("Expected " + Failed(barrier, DuplicateNode(Data(Set(nodeA), "", Nil, null), nodeB)) + " but got " + x) + } } "finally have no failure messages left" taggedAs TimingTest in { @@ -267,7 +287,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with b.expectMsg(ToClient(Done)) a.send(barrier, EnterBarrier("bar11")) noMsg(a, b) - within(2 second) { + within(2 seconds) { b.send(barrier, EnterBarrier("bar11")) a.expectMsg(ToClient(BarrierResult("bar11", true))) b.expectMsg(ToClient(BarrierResult("bar11", true))) @@ -286,7 +306,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with c.expectMsg(ToClient(Done)) b.send(barrier, EnterBarrier("bar12")) noMsg(a, b, c) - within(2 second) { + within(2 seconds) { c.send(barrier, EnterBarrier("bar12")) a.expectMsg(ToClient(BarrierResult("bar12", true))) b.expectMsg(ToClient(BarrierResult("bar12", true))) @@ -308,7 +328,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! Remove(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) - b.within(2 second) { + b.within(2 seconds) { barrier ! Remove(C) b.expectMsg(ToClient(BarrierResult("bar13", true))) } @@ -391,7 +411,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeB a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar18", Option(Timeout.durationToTimeout(2 seconds)))) + a.send(barrier, EnterBarrier("bar18", Option(2 seconds))) EventFilter[BarrierTimeout](occurrences = 1) intercept { Thread.sleep(4000) } @@ -437,16 +457,64 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeB a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar20", Option(Timeout.durationToTimeout(2 seconds)))) - b.send(barrier, FailBarrier("bar20")) - a.expectMsg(ToClient(BarrierResult("bar20", false))) - b.expectNoMsg(1 second) + a.send(barrier, EnterBarrier("bar20", Option(2 seconds))) + EventFilter[FailedBarrier](occurrences = 1) intercept { + b.send(barrier, FailBarrier("bar20")) + a.expectMsg(ToClient(BarrierResult("bar20", false))) + b.expectNoMsg(1 second) + } a.send(barrier, EnterBarrier("bar21")) b.send(barrier, EnterBarrier("bar21")) a.expectMsg(ToClient(BarrierResult("bar21", false))) b.expectMsg(ToClient(BarrierResult("bar21", false))) } + "timeout within the shortest timeout if the new timeout is shorter" taggedAs TimingTest in { + val barrier = getController(3) + val a, b, c = TestProbe() + val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) + val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) + barrier ! nodeA + barrier ! nodeB + barrier ! nodeC + a.expectMsg(ToClient(Done)) + b.expectMsg(ToClient(Done)) + c.expectMsg(ToClient(Done)) + a.send(barrier, EnterBarrier("bar22", Option(10 seconds))) + b.send(barrier, EnterBarrier("bar22", Option(2 seconds))) + EventFilter[BarrierTimeout](occurrences = 1) intercept { + Thread.sleep(4000) + } + c.send(barrier, EnterBarrier("bar22")) + a.expectMsg(ToClient(BarrierResult("bar22", false))) + b.expectMsg(ToClient(BarrierResult("bar22", false))) + c.expectMsg(ToClient(BarrierResult("bar22", false))) + } + + "timeout within the shortest timeout if the new timeout is longer" taggedAs TimingTest in { + val barrier = getController(3) + val a, b, c = TestProbe() + val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) + val nodeC = NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) + barrier ! nodeA + barrier ! nodeB + barrier ! nodeC + a.expectMsg(ToClient(Done)) + b.expectMsg(ToClient(Done)) + c.expectMsg(ToClient(Done)) + a.send(barrier, EnterBarrier("bar23", Option(2 seconds))) + b.send(barrier, EnterBarrier("bar23", Option(10 seconds))) + EventFilter[BarrierTimeout](occurrences = 1) intercept { + Thread.sleep(4000) + } + c.send(barrier, EnterBarrier("bar23")) + a.expectMsg(ToClient(BarrierResult("bar23", false))) + b.expectMsg(ToClient(BarrierResult("bar23", false))) + c.expectMsg(ToClient(BarrierResult("bar23", false))) + } + "finally have no failure messages left" taggedAs TimingTest in { expectNoMsg(1 second) } @@ -489,4 +557,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with probes foreach (_.msgAvailable must be(false)) } + private def data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef], previous: Data): Data = { + Data(clients, barrier, arrived, previous.deadline) + } } \ No newline at end of file diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 62539e981d..4d65a2084e 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -186,7 +186,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: * Enter the named barriers in the order given. Use the remaining duration from * the innermost enclosing `within` block or the default `BarrierTimeout` */ - def enter(name: String*) { + def enterBarrier(name: String*) { testConductor.enter(Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), name) } @@ -202,13 +202,11 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: /** * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost - * enclosing `within` block or BarrierTimeout. - * - * FIXME Is it really BarrierTimeout we want here? That seems like an awfully long time. + * enclosing `within` block or QueryTimeout. */ implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w) class AwaitHelper[T](w: Awaitable[T]) { - def await: T = Await.result(w, remainingOr(testConductor.Settings.BarrierTimeout.duration)) + def await: T = Await.result(w, remainingOr(testConductor.Settings.QueryTimeout.duration)) } /* @@ -217,9 +215,11 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: private val controllerAddr = new InetSocketAddress(nodeNames(0), 4711) if (selfIndex == 0) { - testConductor.startController(initialParticipants, myself, controllerAddr).await + Await.result(testConductor.startController(initialParticipants, myself, controllerAddr), + testConductor.Settings.BarrierTimeout.duration) } else { - testConductor.startClient(myself, controllerAddr).await + Await.result(testConductor.startClient(myself, controllerAddr), + testConductor.Settings.BarrierTimeout.duration) } // now add deployments, if so desired