diff --git a/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java b/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java index 99c33e6728..ec84e42331 100644 --- a/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java +++ b/akka-remote-tests/src/main/java/akka/remote/testconductor/TestConductorProtocol.java @@ -492,7 +492,7 @@ public final class TestConductorProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -1397,7 +1397,7 @@ public final class TestConductorProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -1702,6 +1702,14 @@ public final class TestConductorProtocol { // optional bool status = 2; boolean hasStatus(); boolean getStatus(); + + // optional int64 timeout = 3; + boolean hasTimeout(); + long getTimeout(); + + // optional bool failed = 4; + boolean hasFailed(); + boolean getFailed(); } public static final class EnterBarrier extends com.google.protobuf.GeneratedMessage @@ -1774,9 +1782,31 @@ public final class TestConductorProtocol { return status_; } + // optional int64 timeout = 3; + public static final int TIMEOUT_FIELD_NUMBER = 3; + private long timeout_; + public boolean hasTimeout() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getTimeout() { + return timeout_; + } + + // optional bool failed = 4; + public static final int FAILED_FIELD_NUMBER = 4; + private boolean failed_; + public boolean hasFailed() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getFailed() { + return failed_; + } + private void initFields() { name_ = ""; status_ = false; + timeout_ = 0L; + failed_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1800,6 +1830,12 @@ public final class TestConductorProtocol { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBool(2, status_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, timeout_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(4, failed_); + } getUnknownFields().writeTo(output); } @@ -1817,6 +1853,14 @@ public final class TestConductorProtocol { size += com.google.protobuf.CodedOutputStream .computeBoolSize(2, status_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(3, timeout_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, failed_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1927,7 +1971,7 @@ public final class TestConductorProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -1945,6 +1989,10 @@ public final class TestConductorProtocol { bitField0_ = (bitField0_ & ~0x00000001); status_ = false; bitField0_ = (bitField0_ & ~0x00000002); + timeout_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + failed_ = false; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -1991,6 +2039,14 @@ public final class TestConductorProtocol { to_bitField0_ |= 0x00000002; } result.status_ = status_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.timeout_ = timeout_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.failed_ = failed_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -2013,6 +2069,12 @@ public final class TestConductorProtocol { if (other.hasStatus()) { setStatus(other.getStatus()); } + if (other.hasTimeout()) { + setTimeout(other.getTimeout()); + } + if (other.hasFailed()) { + setFailed(other.getFailed()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2058,6 +2120,16 @@ public final class TestConductorProtocol { status_ = input.readBool(); break; } + case 24: { + bitField0_ |= 0x00000004; + timeout_ = input.readInt64(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + failed_ = input.readBool(); + break; + } } } } @@ -2121,6 +2193,48 @@ public final class TestConductorProtocol { return this; } + // optional int64 timeout = 3; + private long timeout_ ; + public boolean hasTimeout() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getTimeout() { + return timeout_; + } + public Builder setTimeout(long value) { + bitField0_ |= 0x00000004; + timeout_ = value; + onChanged(); + return this; + } + public Builder clearTimeout() { + bitField0_ = (bitField0_ & ~0x00000004); + timeout_ = 0L; + onChanged(); + return this; + } + + // optional bool failed = 4; + private boolean failed_ ; + public boolean hasFailed() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getFailed() { + return failed_; + } + public Builder setFailed(boolean value) { + bitField0_ |= 0x00000008; + failed_ = value; + onChanged(); + return this; + } + public Builder clearFailed() { + bitField0_ = (bitField0_ & ~0x00000008); + failed_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:EnterBarrier) } @@ -2377,7 +2491,7 @@ public final class TestConductorProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -3005,7 +3119,7 @@ public final class TestConductorProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -3611,7 +3725,7 @@ public final class TestConductorProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -4056,19 +4170,19 @@ public final class TestConductorProtocol { "\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" + "ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." + "AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" + - "\007address\030\002 \002(\0132\010.Address\",\n\014EnterBarrier" + - "\022\014\n\004name\030\001 \002(\t\022\016\n\006status\030\002 \001(\010\"6\n\016Addres" + - "sRequest\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.A" + - "ddress\"G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006s" + - "ystem\030\002 \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"", - "\212\001\n\rInjectFailure\022\032\n\007failure\030\001 \002(\0162\t.Fai" + - "lType\022\035\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007" + - "address\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(" + - "\002\022\021\n\texitValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Thro" + - "ttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shu" + - "tdown\020\004*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receiv" + - "e\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testconduct" + - "orH\001" + "\007address\030\002 \002(\0132\010.Address\"M\n\014EnterBarrier" + + "\022\014\n\004name\030\001 \002(\t\022\016\n\006status\030\002 \001(\010\022\017\n\007timeou" + + "t\030\003 \001(\003\022\016\n\006failed\030\004 \001(\010\"6\n\016AddressReques" + + "t\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"" + + "G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002", + " \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInj" + + "ectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035" + + "\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007address" + + "\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\tex" + + "itValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Throttle\020\001\022" + + "\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shutdown\020\004" + + "*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receive\020\002\022\010\n\004" + + "Both\020\003B\035\n\031akka.remote.testconductorH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4096,7 +4210,7 @@ public final class TestConductorProtocol { internal_static_EnterBarrier_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_EnterBarrier_descriptor, - new java.lang.String[] { "Name", "Status", }, + new java.lang.String[] { "Name", "Status", "Timeout", "Failed", }, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.class, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.Builder.class); internal_static_AddressRequest_descriptor = diff --git a/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto b/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto index 648234614e..b35bbd23d8 100644 --- a/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto +++ b/akka-remote-tests/src/main/protocol/TestConductorProtocol.proto @@ -7,7 +7,7 @@ option optimize_for = SPEED; /****************************************** Compile with: - cd ./akka-remote/src/main/protocol + cd ./akka-remote-tests/src/main/protocol protoc TestConductorProtocol.proto --java_out ../java *******************************************/ @@ -27,6 +27,8 @@ message Hello { message EnterBarrier { required string name = 1; optional bool status = 2; + optional int64 timeout = 3; + optional bool failed = 4; } message AddressRequest { diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 17a2bfcd5f..7264948b0f 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -376,7 +376,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP * BarrierTimeouts in the players). */ override def supervisorStrategy = OneForOneStrategy() { - case BarrierTimeout(data) ⇒ SupervisorStrategy.Resume + case BarrierTimeout(data) ⇒ SupervisorStrategy.Restart case BarrierEmpty(data, msg) ⇒ SupervisorStrategy.Resume case WrongBarrier(name, client, data) ⇒ client ! ToClient(BarrierResult(name, false)); failBarrier(data) case ClientLost(data, node) ⇒ failBarrier(data) @@ -426,6 +426,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP case op: ServerOp ⇒ op match { case _: EnterBarrier ⇒ barrier forward op + case _: FailBarrier ⇒ barrier forward op case GetAddress(node) ⇒ if (nodes contains node) sender ! ToClient(AddressReply(node, nodes(node).addr)) else addrInterest += node -> ((addrInterest get node getOrElse Set()) + sender) @@ -497,9 +498,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor import BarrierCoordinator._ import akka.actor.FSM._ import Controller._ + import akka.util.{ Timeout ⇒ auTimeout } - // this shall be set to false if all subsequent barriers shall fail + // this shall be set to true if all subsequent barriers shall fail var failed = false + + var barrierTimeout: Option[auTimeout] = None + override def preRestart(reason: Throwable, message: Option[Any]) {} override def postRestart(reason: Throwable) { failed = true } @@ -520,27 +525,29 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor } when(Idle) { - case Event(EnterBarrier(name), d @ Data(clients, _, _)) ⇒ + case Event(EnterBarrier(name, timeout), d @ Data(clients, _, _)) ⇒ if (failed) stay replying ToClient(BarrierResult(name, false)) else if (clients.map(_.fsm) == Set(sender)) stay replying ToClient(BarrierResult(name, true)) else if (clients.find(_.fsm == sender).isEmpty) stay replying ToClient(BarrierResult(name, false)) - else + else { + barrierTimeout = timeout goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil) + } case Event(RemoveClient(name), d @ Data(clients, _, _)) ⇒ if (clients.isEmpty) throw BarrierEmpty(d, "cannot remove " + name + ": no client to remove") stay using d.copy(clients = clients filterNot (_.name == name)) } onTransition { - case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, TestConductor().Settings.BarrierTimeout.duration, false) + case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, barrierTimeout.getOrElse[auTimeout](TestConductor().Settings.BarrierTimeout).duration, false) case Waiting -> Idle ⇒ cancelTimer("Timeout") } when(Waiting) { - case Event(EnterBarrier(name), d @ Data(clients, barrier, arrived)) ⇒ + case Event(EnterBarrier(name, timeout), d @ Data(clients, barrier, arrived)) ⇒ if (name != barrier) throw WrongBarrier(name, sender, d) val together = if (clients.exists(_.fsm == sender)) sender :: arrived else arrived handleBarrier(d.copy(arrived = together)) @@ -550,18 +557,27 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor case Some(client) ⇒ handleBarrier(d.copy(clients = clients - client, arrived = arrived filterNot (_ == client.fsm))) } - case Event(StateTimeout, data) ⇒ - throw BarrierTimeout(data) + case Event(FailBarrier(name), d @ Data(clients, barrier, arrived)) ⇒ + if (name != barrier) throw WrongBarrier(name, sender, d) + failed = true + handleBarrier(d, false) + + case Event(StateTimeout, d @ Data(clients, barrier, arrived)) ⇒ + handleBarrier(d, false) + throw BarrierTimeout(d) } initialize - def handleBarrier(data: Data): State = { - log.debug("handleBarrier({})", data) - if (data.arrived.isEmpty) { + def handleBarrier(data: Data, status: Boolean = true): State = { + log.debug("handleBarrier({}, {})", data, status) + if (!status) { + data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, status))) + goto(Idle) using data.copy(barrier = "", arrived = Nil) + } else if (data.arrived.isEmpty) { goto(Idle) using data.copy(barrier = "") } else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) { - data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, true))) + data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, status))) goto(Idle) using data.copy(barrier = "", arrived = Nil) } else { stay using data diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala index 022ae2d89b..4730bbd508 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala @@ -10,6 +10,7 @@ import akka.remote.testconductor.{ TestConductorProtocol ⇒ TCP } import com.google.protobuf.Message import akka.actor.Address import org.jboss.netty.handler.codec.oneone.OneToOneDecoder +import akka.util.Timeout case class RoleName(name: String) @@ -28,7 +29,8 @@ private[akka] sealed trait ConfirmedClientOp extends ClientOp */ private[akka] case class Hello(name: String, addr: Address) extends NetworkOp -private[akka] case class EnterBarrier(name: String) extends ServerOp with NetworkOp +private[akka] case class EnterBarrier(name: String, timeout: Option[Timeout] = None) extends ServerOp with NetworkOp +private[akka] case class FailBarrier(name: String) extends ServerOp with NetworkOp private[akka] case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp private[akka] case class Throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Float) extends CommandOp @@ -72,10 +74,14 @@ private[akka] class MsgEncoder extends OneToOneEncoder { x match { case Hello(name, addr) ⇒ w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr)) - case EnterBarrier(name) ⇒ - w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name)) + case EnterBarrier(name, timeout) ⇒ + val barrier = TCP.EnterBarrier.newBuilder.setName(name) + timeout foreach (t ⇒ barrier.setTimeout(t.duration.toMillis)) + w.setBarrier(barrier) case BarrierResult(name, success) ⇒ w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setStatus(success)) + case FailBarrier(name) ⇒ + w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setFailed(true)) case ThrottleMsg(target, dir, rate) ⇒ w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target) .setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate)) @@ -115,7 +121,8 @@ private[akka] class MsgDecoder extends OneToOneDecoder { } else if (w.hasBarrier) { val barrier = w.getBarrier if (barrier.hasStatus) BarrierResult(barrier.getName, barrier.getStatus) - else EnterBarrier(w.getBarrier.getName) + else if (barrier.hasFailed) FailBarrier(barrier.getName) + else EnterBarrier(w.getBarrier.getName, if (barrier.hasTimeout) Option(Timeout.longToTimeout(barrier.getTimeout)) else None) } else if (w.hasFailure) { val f = w.getFailure import TCP.{ FailType ⇒ FT } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 53c03d5d40..bed14725b4 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -11,7 +11,7 @@ import com.typesafe.config.ConfigFactory import akka.util.Timeout import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.pattern.{ ask, pipe } +import akka.pattern.{ ask, pipe, AskTimeoutException } import akka.dispatch.Await import scala.util.control.NoStackTrace import akka.actor.Status @@ -76,10 +76,34 @@ trait Player { this: TestConductorExt ⇒ * throw an exception in case of timeouts or other errors. */ def enter(name: String*) { + enter(Settings.BarrierTimeout, name) + } + + case class OutOfTimeException(barrier: String) extends RuntimeException("Ran out of time while waiting for barrier '" + barrier + "'") with NoStackTrace + + /** + * Enter the named barriers, one after the other, in the order given. Will + * throw an exception in case of timeouts or other errors. + */ + def enter(timeout: Timeout, name: Seq[String]) { + def now: Duration = System.nanoTime.nanos + system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) + val stop = now + timeout.duration name foreach { b ⇒ - import Settings.BarrierTimeout - Await.result(client ? ToServer(EnterBarrier(b)), Duration.Inf) + val barrierTimeout = stop - now + if (barrierTimeout < Duration.Zero) { + client ! ToServer(FailBarrier(b)) + throw OutOfTimeException(b) + } + try { + implicit val timeout = Timeout(barrierTimeout + Settings.QueryTimeout.duration) + Await.result(client ? ToServer(EnterBarrier(b, Option(barrierTimeout))), Duration.Inf) + } catch { + case e: AskTimeoutException ⇒ + client ! ToServer(FailBarrier(b)) + throw e + } system.log.debug("passed barrier {}", b) } } @@ -88,7 +112,7 @@ trait Player { this: TestConductorExt ⇒ * Query remote transport address of named node. */ def getAddressFor(name: RoleName): Future[Address] = { - import Settings.BarrierTimeout + import Settings.QueryTimeout client ? ToServer(GetAddress(name)) mapTo } } @@ -168,8 +192,8 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) case Event(ToServer(msg), d @ Data(Some(channel), None)) ⇒ channel.write(msg) val token = msg match { - case EnterBarrier(barrier) ⇒ barrier - case GetAddress(node) ⇒ node.name + case EnterBarrier(barrier, timeout) ⇒ barrier + case GetAddress(node) ⇒ node.name } stay using d.copy(runningOp = Some(token, sender)) case Event(ToServer(op), Data(channel, Some((token, _)))) ⇒ diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index 37ebd0a193..79dfda7559 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -19,6 +19,7 @@ import org.scalatest.BeforeAndAfterEach import java.net.InetSocketAddress import java.net.InetAddress import akka.testkit.TimingTest +import akka.util.{ Timeout, Duration } object BarrierSpec { case class Failed(ref: ActorRef, thr: Throwable) @@ -74,8 +75,8 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with "fail entering barrier when nobody registered" taggedAs TimingTest in { val b = getBarrier() - b ! EnterBarrier("b") - expectMsg(ToClient(BarrierResult("b", false))) + b ! EnterBarrier("bar1") + expectMsg(ToClient(BarrierResult("bar1", false))) } "enter barrier" taggedAs TimingTest in { @@ -83,12 +84,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with val a, b = TestProbe() barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar2")) noMsg(a, b) within(2 second) { - b.send(barrier, EnterBarrier("bar")) - a.expectMsg(ToClient(BarrierResult("bar", true))) - b.expectMsg(ToClient(BarrierResult("bar", true))) + b.send(barrier, EnterBarrier("bar2")) + a.expectMsg(ToClient(BarrierResult("bar2", true))) + b.expectMsg(ToClient(BarrierResult("bar2", true))) } } @@ -97,15 +98,15 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with val a, b, c = TestProbe() barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar3")) barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) - b.send(barrier, EnterBarrier("bar")) + b.send(barrier, EnterBarrier("bar3")) noMsg(a, b, c) within(2 second) { - c.send(barrier, EnterBarrier("bar")) - a.expectMsg(ToClient(BarrierResult("bar", true))) - b.expectMsg(ToClient(BarrierResult("bar", true))) - c.expectMsg(ToClient(BarrierResult("bar", true))) + c.send(barrier, EnterBarrier("bar3")) + a.expectMsg(ToClient(BarrierResult("bar3", true))) + b.expectMsg(ToClient(BarrierResult("bar3", true))) + c.expectMsg(ToClient(BarrierResult("bar3", true))) } } @@ -115,14 +116,14 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) - a.send(barrier, EnterBarrier("bar")) - b.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar4")) + b.send(barrier, EnterBarrier("bar4")) barrier ! RemoveClient(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) b.within(2 second) { barrier ! RemoveClient(C) - b.expectMsg(ToClient(BarrierResult("bar", true))) + b.expectMsg(ToClient(BarrierResult("bar4", true))) } barrier ! ClientDisconnected(C) expectNoMsg(1 second) @@ -133,7 +134,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with val a, b = TestProbe() barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar5")) barrier ! RemoveClient(A) b.send(barrier, EnterBarrier("foo")) b.expectMsg(ToClient(BarrierResult("foo", true))) @@ -145,11 +146,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! nodeA barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar6")) EventFilter[ClientLost](occurrences = 1) intercept { barrier ! ClientDisconnected(B) } - expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar", a.ref :: Nil), B))) + expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil), B))) } "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { @@ -160,12 +161,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeA barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! nodeC - a.send(barrier, EnterBarrier("bar")) - b.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar7")) + b.send(barrier, EnterBarrier("bar7")) EventFilter[ClientLost](occurrences = 1) intercept { barrier ! ClientDisconnected(B) } - expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar", a.ref :: Nil), B))) + expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil), B))) } "fail when entering wrong barrier" taggedAs TimingTest in { @@ -175,11 +176,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeA val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! nodeB - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar8")) EventFilter[WrongBarrier](occurrences = 1) intercept { b.send(barrier, EnterBarrier("foo")) } - expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar", a.ref :: Nil)))) + expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil)))) } "fail barrier after first failure" taggedAs TimingTest in { @@ -190,8 +191,8 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with } expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove"))) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) - a.send(barrier, EnterBarrier("right")) - a.expectMsg(ToClient(BarrierResult("right", false))) + a.send(barrier, EnterBarrier("bar9")) + a.expectMsg(ToClient(BarrierResult("bar9", false))) } "fail after barrier timeout" taggedAs TimingTest in { @@ -201,9 +202,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! nodeA barrier ! nodeB - a.send(barrier, EnterBarrier("right")) + a.send(barrier, EnterBarrier("bar10")) EventFilter[BarrierTimeout](occurrences = 1) intercept { - expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "right", a.ref :: Nil)))) + expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil)))) } } @@ -264,12 +265,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar11")) noMsg(a, b) within(2 second) { - b.send(barrier, EnterBarrier("bar")) - a.expectMsg(ToClient(BarrierResult("bar", true))) - b.expectMsg(ToClient(BarrierResult("bar", true))) + b.send(barrier, EnterBarrier("bar11")) + a.expectMsg(ToClient(BarrierResult("bar11", true))) + b.expectMsg(ToClient(BarrierResult("bar11", true))) } } @@ -280,16 +281,16 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar12")) barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) c.expectMsg(ToClient(Done)) - b.send(barrier, EnterBarrier("bar")) + b.send(barrier, EnterBarrier("bar12")) noMsg(a, b, c) within(2 second) { - c.send(barrier, EnterBarrier("bar")) - a.expectMsg(ToClient(BarrierResult("bar", true))) - b.expectMsg(ToClient(BarrierResult("bar", true))) - c.expectMsg(ToClient(BarrierResult("bar", true))) + c.send(barrier, EnterBarrier("bar12")) + a.expectMsg(ToClient(BarrierResult("bar12", true))) + b.expectMsg(ToClient(BarrierResult("bar12", true))) + c.expectMsg(ToClient(BarrierResult("bar12", true))) } } @@ -302,14 +303,14 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) c.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) - b.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar13")) + b.send(barrier, EnterBarrier("bar13")) barrier ! Remove(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) b.within(2 second) { barrier ! Remove(C) - b.expectMsg(ToClient(BarrierResult("bar", true))) + b.expectMsg(ToClient(BarrierResult("bar13", true))) } barrier ! ClientDisconnected(C) expectNoMsg(1 second) @@ -322,7 +323,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar14")) barrier ! Remove(A) b.send(barrier, EnterBarrier("foo")) b.expectMsg(ToClient(BarrierResult("foo", true))) @@ -336,13 +337,13 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar15")) barrier ! ClientDisconnected(RoleName("unknown")) noMsg(a) EventFilter[ClientLost](occurrences = 1) intercept { barrier ! ClientDisconnected(B) } - a.expectMsg(ToClient(BarrierResult("bar", false))) + a.expectMsg(ToClient(BarrierResult("bar15", false))) } "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { @@ -356,12 +357,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) c.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) - b.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar16")) + b.send(barrier, EnterBarrier("bar16")) EventFilter[ClientLost](occurrences = 1) intercept { barrier ! ClientDisconnected(B) } - a.expectMsg(ToClient(BarrierResult("bar", false))) + a.expectMsg(ToClient(BarrierResult("bar16", false))) } "fail when entering wrong barrier" taggedAs TimingTest in { @@ -373,15 +374,15 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeB a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("bar")) + a.send(barrier, EnterBarrier("bar17")) EventFilter[WrongBarrier](occurrences = 1) intercept { b.send(barrier, EnterBarrier("foo")) } - a.expectMsg(ToClient(BarrierResult("bar", false))) + a.expectMsg(ToClient(BarrierResult("bar17", false))) b.expectMsg(ToClient(BarrierResult("foo", false))) } - "not really fail after barrier timeout" taggedAs TimingTest in { + "fail after barrier timeout" taggedAs TimingTest in { val barrier = getController(2) val a, b = TestProbe() val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) @@ -390,13 +391,13 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! nodeB a.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done)) - a.send(barrier, EnterBarrier("right")) + a.send(barrier, EnterBarrier("bar18", Option(Timeout.durationToTimeout(2 seconds)))) EventFilter[BarrierTimeout](occurrences = 1) intercept { - Thread.sleep(5000) + Thread.sleep(4000) } - b.send(barrier, EnterBarrier("right")) - a.expectMsg(ToClient(BarrierResult("right", true))) - b.expectMsg(ToClient(BarrierResult("right", true))) + b.send(barrier, EnterBarrier("bar18")) + a.expectMsg(ToClient(BarrierResult("bar18", false))) + b.expectMsg(ToClient(BarrierResult("bar18", false))) } "fail if a node registers twice" taggedAs TimingTest in { @@ -423,8 +424,27 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with controller ! nodeB b.expectMsg(ToClient(BarrierResult("initial startup", false))) } - a.send(controller, EnterBarrier("x")) - a.expectMsg(ToClient(BarrierResult("x", false))) + a.send(controller, EnterBarrier("bar19")) + a.expectMsg(ToClient(BarrierResult("bar19", false))) + } + + "fail subsequent barriers after foreced failure" taggedAs TimingTest in { + val barrier = getController(2) + val a, b = TestProbe() + val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeA + barrier ! nodeB + a.expectMsg(ToClient(Done)) + b.expectMsg(ToClient(Done)) + a.send(barrier, EnterBarrier("bar20", Option(Timeout.durationToTimeout(2 seconds)))) + b.send(barrier, FailBarrier("bar20")) + a.expectMsg(ToClient(BarrierResult("bar20", false))) + b.expectNoMsg(1 second) + a.send(barrier, EnterBarrier("bar21")) + b.send(barrier, EnterBarrier("bar21")) + a.expectMsg(ToClient(BarrierResult("bar21", false))) + b.expectMsg(ToClient(BarrierResult("bar21", false))) } "finally have no failure messages left" taggedAs TimingTest in { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index faaab5cdc4..62539e981d 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -12,7 +12,7 @@ import akka.dispatch.Await import akka.dispatch.Await.Awaitable import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec -import akka.util.{ NonFatal, Duration } +import akka.util.{ Timeout, NonFatal, Duration } /** * Configure the role names and participants of the test, including configuration settings. @@ -182,6 +182,14 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: if (nodes exists (_ == myself)) yes else no } + /** + * Enter the named barriers in the order given. Use the remaining duration from + * the innermost enclosing `within` block or the default `BarrierTimeout` + */ + def enter(name: String*) { + testConductor.enter(Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), name) + } + /** * Query the controller for the transport address of the given node (by role name) and * return that as an ActorPath for easy composition: @@ -193,11 +201,14 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await) /** - * Enrich `.await()` onto all Awaitables, using BarrierTimeout. + * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost + * enclosing `within` block or BarrierTimeout. + * + * FIXME Is it really BarrierTimeout we want here? That seems like an awfully long time. */ implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w) class AwaitHelper[T](w: Awaitable[T]) { - def await: T = Await.result(w, testConductor.Settings.BarrierTimeout.duration) + def await: T = Await.result(w, remainingOr(testConductor.Settings.BarrierTimeout.duration)) } /* diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index c0fb6e5267..0f19e4e6c7 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -158,7 +158,13 @@ trait TestKitBase { * block or missing that it returns the properly dilated default for this * case from settings (key "akka.test.single-expect-default"). */ - def remaining: Duration = if (end == Duration.Undefined) testKitSettings.SingleExpectDefaultTimeout.dilated else end - now + def remaining: Duration = remainingOr(testKitSettings.SingleExpectDefaultTimeout.dilated) + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or missing that it returns the given duration. + */ + def remainingOr(duration: Duration): Duration = if (end == Duration.Undefined) duration else end - now /** * Query queue status. @@ -605,12 +611,6 @@ object TestKit { /** * Await until the given condition evaluates to `true` or the timeout * expires, whichever comes first. - * - * If no timeout is given, take it from the innermost enclosing `within` - * block. - * - * Note that the timeout is scaled using Duration.dilated, which uses the - * configuration entry "akka.test.timefactor" */ def awaitCond(p: ⇒ Boolean, max: Duration, interval: Duration = 100.millis, noThrow: Boolean = false): Boolean = { val stop = now + max