Make test conductor barriers fail for all on timeouts and care about within() blocks. See #2218

This commit is contained in:
Björn Antonsson 2012-06-13 13:52:58 +02:00
parent 8d12385a3e
commit 463e62926e
8 changed files with 303 additions and 109 deletions

View file

@ -492,7 +492,7 @@ public final class TestConductorProtocol {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { private Builder(BuilderParent parent) {
super(parent); super(parent);
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -1397,7 +1397,7 @@ public final class TestConductorProtocol {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { private Builder(BuilderParent parent) {
super(parent); super(parent);
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -1702,6 +1702,14 @@ public final class TestConductorProtocol {
// optional bool status = 2; // optional bool status = 2;
boolean hasStatus(); boolean hasStatus();
boolean getStatus(); boolean getStatus();
// optional int64 timeout = 3;
boolean hasTimeout();
long getTimeout();
// optional bool failed = 4;
boolean hasFailed();
boolean getFailed();
} }
public static final class EnterBarrier extends public static final class EnterBarrier extends
com.google.protobuf.GeneratedMessage com.google.protobuf.GeneratedMessage
@ -1774,9 +1782,31 @@ public final class TestConductorProtocol {
return status_; return status_;
} }
// optional int64 timeout = 3;
public static final int TIMEOUT_FIELD_NUMBER = 3;
private long timeout_;
public boolean hasTimeout() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public long getTimeout() {
return timeout_;
}
// optional bool failed = 4;
public static final int FAILED_FIELD_NUMBER = 4;
private boolean failed_;
public boolean hasFailed() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public boolean getFailed() {
return failed_;
}
private void initFields() { private void initFields() {
name_ = ""; name_ = "";
status_ = false; status_ = false;
timeout_ = 0L;
failed_ = false;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -1800,6 +1830,12 @@ public final class TestConductorProtocol {
if (((bitField0_ & 0x00000002) == 0x00000002)) { if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBool(2, status_); output.writeBool(2, status_);
} }
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt64(3, timeout_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBool(4, failed_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -1817,6 +1853,14 @@ public final class TestConductorProtocol {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(2, status_); .computeBoolSize(2, status_);
} }
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(3, timeout_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(4, failed_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -1927,7 +1971,7 @@ public final class TestConductorProtocol {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { private Builder(BuilderParent parent) {
super(parent); super(parent);
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -1945,6 +1989,10 @@ public final class TestConductorProtocol {
bitField0_ = (bitField0_ & ~0x00000001); bitField0_ = (bitField0_ & ~0x00000001);
status_ = false; status_ = false;
bitField0_ = (bitField0_ & ~0x00000002); bitField0_ = (bitField0_ & ~0x00000002);
timeout_ = 0L;
bitField0_ = (bitField0_ & ~0x00000004);
failed_ = false;
bitField0_ = (bitField0_ & ~0x00000008);
return this; return this;
} }
@ -1991,6 +2039,14 @@ public final class TestConductorProtocol {
to_bitField0_ |= 0x00000002; to_bitField0_ |= 0x00000002;
} }
result.status_ = status_; result.status_ = status_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.timeout_ = timeout_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.failed_ = failed_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -2013,6 +2069,12 @@ public final class TestConductorProtocol {
if (other.hasStatus()) { if (other.hasStatus()) {
setStatus(other.getStatus()); setStatus(other.getStatus());
} }
if (other.hasTimeout()) {
setTimeout(other.getTimeout());
}
if (other.hasFailed()) {
setFailed(other.getFailed());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -2058,6 +2120,16 @@ public final class TestConductorProtocol {
status_ = input.readBool(); status_ = input.readBool();
break; break;
} }
case 24: {
bitField0_ |= 0x00000004;
timeout_ = input.readInt64();
break;
}
case 32: {
bitField0_ |= 0x00000008;
failed_ = input.readBool();
break;
}
} }
} }
} }
@ -2121,6 +2193,48 @@ public final class TestConductorProtocol {
return this; return this;
} }
// optional int64 timeout = 3;
private long timeout_ ;
public boolean hasTimeout() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public long getTimeout() {
return timeout_;
}
public Builder setTimeout(long value) {
bitField0_ |= 0x00000004;
timeout_ = value;
onChanged();
return this;
}
public Builder clearTimeout() {
bitField0_ = (bitField0_ & ~0x00000004);
timeout_ = 0L;
onChanged();
return this;
}
// optional bool failed = 4;
private boolean failed_ ;
public boolean hasFailed() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public boolean getFailed() {
return failed_;
}
public Builder setFailed(boolean value) {
bitField0_ |= 0x00000008;
failed_ = value;
onChanged();
return this;
}
public Builder clearFailed() {
bitField0_ = (bitField0_ & ~0x00000008);
failed_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:EnterBarrier) // @@protoc_insertion_point(builder_scope:EnterBarrier)
} }
@ -2377,7 +2491,7 @@ public final class TestConductorProtocol {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { private Builder(BuilderParent parent) {
super(parent); super(parent);
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -3005,7 +3119,7 @@ public final class TestConductorProtocol {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { private Builder(BuilderParent parent) {
super(parent); super(parent);
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -3611,7 +3725,7 @@ public final class TestConductorProtocol {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { private Builder(BuilderParent parent) {
super(parent); super(parent);
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -4056,19 +4170,19 @@ public final class TestConductorProtocol {
"\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" + "\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" +
"ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." + "ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." +
"AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" + "AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" +
"\007address\030\002 \002(\0132\010.Address\",\n\014EnterBarrier" + "\007address\030\002 \002(\0132\010.Address\"M\n\014EnterBarrier" +
"\022\014\n\004name\030\001 \002(\t\022\016\n\006status\030\002 \001(\010\"6\n\016Addres" + "\022\014\n\004name\030\001 \002(\t\022\016\n\006status\030\002 \001(\010\022\017\n\007timeou" +
"sRequest\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.A" + "t\030\003 \001(\003\022\016\n\006failed\030\004 \001(\010\"6\n\016AddressReques" +
"ddress\"G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006s" + "t\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"" +
"ystem\030\002 \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"", "G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002",
"\212\001\n\rInjectFailure\022\032\n\007failure\030\001 \002(\0162\t.Fai" + " \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInj" +
"lType\022\035\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007" + "ectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035" +
"address\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(" + "\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007address" +
"\002\022\021\n\texitValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Thro" + "\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\tex" +
"ttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shu" + "itValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Throttle\020\001\022" +
"tdown\020\004*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receiv" + "\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shutdown\020\004" +
"e\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testconduct" + "*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receive\020\002\022\010\n\004" +
"orH\001" "Both\020\003B\035\n\031akka.remote.testconductorH\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -4096,7 +4210,7 @@ public final class TestConductorProtocol {
internal_static_EnterBarrier_fieldAccessorTable = new internal_static_EnterBarrier_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_EnterBarrier_descriptor, internal_static_EnterBarrier_descriptor,
new java.lang.String[] { "Name", "Status", }, new java.lang.String[] { "Name", "Status", "Timeout", "Failed", },
akka.remote.testconductor.TestConductorProtocol.EnterBarrier.class, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.class,
akka.remote.testconductor.TestConductorProtocol.EnterBarrier.Builder.class); akka.remote.testconductor.TestConductorProtocol.EnterBarrier.Builder.class);
internal_static_AddressRequest_descriptor = internal_static_AddressRequest_descriptor =

View file

@ -7,7 +7,7 @@ option optimize_for = SPEED;
/****************************************** /******************************************
Compile with: Compile with:
cd ./akka-remote/src/main/protocol cd ./akka-remote-tests/src/main/protocol
protoc TestConductorProtocol.proto --java_out ../java protoc TestConductorProtocol.proto --java_out ../java
*******************************************/ *******************************************/
@ -27,6 +27,8 @@ message Hello {
message EnterBarrier { message EnterBarrier {
required string name = 1; required string name = 1;
optional bool status = 2; optional bool status = 2;
optional int64 timeout = 3;
optional bool failed = 4;
} }
message AddressRequest { message AddressRequest {

View file

@ -376,7 +376,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
* BarrierTimeouts in the players). * BarrierTimeouts in the players).
*/ */
override def supervisorStrategy = OneForOneStrategy() { override def supervisorStrategy = OneForOneStrategy() {
case BarrierTimeout(data) SupervisorStrategy.Resume case BarrierTimeout(data) SupervisorStrategy.Restart
case BarrierEmpty(data, msg) SupervisorStrategy.Resume case BarrierEmpty(data, msg) SupervisorStrategy.Resume
case WrongBarrier(name, client, data) client ! ToClient(BarrierResult(name, false)); failBarrier(data) case WrongBarrier(name, client, data) client ! ToClient(BarrierResult(name, false)); failBarrier(data)
case ClientLost(data, node) failBarrier(data) case ClientLost(data, node) failBarrier(data)
@ -426,6 +426,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case op: ServerOp case op: ServerOp
op match { op match {
case _: EnterBarrier barrier forward op case _: EnterBarrier barrier forward op
case _: FailBarrier barrier forward op
case GetAddress(node) case GetAddress(node)
if (nodes contains node) sender ! ToClient(AddressReply(node, nodes(node).addr)) if (nodes contains node) sender ! ToClient(AddressReply(node, nodes(node).addr))
else addrInterest += node -> ((addrInterest get node getOrElse Set()) + sender) else addrInterest += node -> ((addrInterest get node getOrElse Set()) + sender)
@ -497,9 +498,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
import BarrierCoordinator._ import BarrierCoordinator._
import akka.actor.FSM._ import akka.actor.FSM._
import Controller._ import Controller._
import akka.util.{ Timeout auTimeout }
// this shall be set to false if all subsequent barriers shall fail // this shall be set to true if all subsequent barriers shall fail
var failed = false var failed = false
var barrierTimeout: Option[auTimeout] = None
override def preRestart(reason: Throwable, message: Option[Any]) {} override def preRestart(reason: Throwable, message: Option[Any]) {}
override def postRestart(reason: Throwable) { failed = true } override def postRestart(reason: Throwable) { failed = true }
@ -520,27 +525,29 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
} }
when(Idle) { when(Idle) {
case Event(EnterBarrier(name), d @ Data(clients, _, _)) case Event(EnterBarrier(name, timeout), d @ Data(clients, _, _))
if (failed) if (failed)
stay replying ToClient(BarrierResult(name, false)) stay replying ToClient(BarrierResult(name, false))
else if (clients.map(_.fsm) == Set(sender)) else if (clients.map(_.fsm) == Set(sender))
stay replying ToClient(BarrierResult(name, true)) stay replying ToClient(BarrierResult(name, true))
else if (clients.find(_.fsm == sender).isEmpty) else if (clients.find(_.fsm == sender).isEmpty)
stay replying ToClient(BarrierResult(name, false)) stay replying ToClient(BarrierResult(name, false))
else else {
barrierTimeout = timeout
goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil) goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil)
}
case Event(RemoveClient(name), d @ Data(clients, _, _)) case Event(RemoveClient(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw BarrierEmpty(d, "cannot remove " + name + ": no client to remove") if (clients.isEmpty) throw BarrierEmpty(d, "cannot remove " + name + ": no client to remove")
stay using d.copy(clients = clients filterNot (_.name == name)) stay using d.copy(clients = clients filterNot (_.name == name))
} }
onTransition { onTransition {
case Idle -> Waiting setTimer("Timeout", StateTimeout, TestConductor().Settings.BarrierTimeout.duration, false) case Idle -> Waiting setTimer("Timeout", StateTimeout, barrierTimeout.getOrElse[auTimeout](TestConductor().Settings.BarrierTimeout).duration, false)
case Waiting -> Idle cancelTimer("Timeout") case Waiting -> Idle cancelTimer("Timeout")
} }
when(Waiting) { when(Waiting) {
case Event(EnterBarrier(name), d @ Data(clients, barrier, arrived)) case Event(EnterBarrier(name, timeout), d @ Data(clients, barrier, arrived))
if (name != barrier) throw WrongBarrier(name, sender, d) if (name != barrier) throw WrongBarrier(name, sender, d)
val together = if (clients.exists(_.fsm == sender)) sender :: arrived else arrived val together = if (clients.exists(_.fsm == sender)) sender :: arrived else arrived
handleBarrier(d.copy(arrived = together)) handleBarrier(d.copy(arrived = together))
@ -550,18 +557,27 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
case Some(client) case Some(client)
handleBarrier(d.copy(clients = clients - client, arrived = arrived filterNot (_ == client.fsm))) handleBarrier(d.copy(clients = clients - client, arrived = arrived filterNot (_ == client.fsm)))
} }
case Event(StateTimeout, data) case Event(FailBarrier(name), d @ Data(clients, barrier, arrived))
throw BarrierTimeout(data) if (name != barrier) throw WrongBarrier(name, sender, d)
failed = true
handleBarrier(d, false)
case Event(StateTimeout, d @ Data(clients, barrier, arrived))
handleBarrier(d, false)
throw BarrierTimeout(d)
} }
initialize initialize
def handleBarrier(data: Data): State = { def handleBarrier(data: Data, status: Boolean = true): State = {
log.debug("handleBarrier({})", data) log.debug("handleBarrier({}, {})", data, status)
if (data.arrived.isEmpty) { if (!status) {
data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, status)))
goto(Idle) using data.copy(barrier = "", arrived = Nil)
} else if (data.arrived.isEmpty) {
goto(Idle) using data.copy(barrier = "") goto(Idle) using data.copy(barrier = "")
} else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) { } else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) {
data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, true))) data.arrived foreach (_ ! ToClient(BarrierResult(data.barrier, status)))
goto(Idle) using data.copy(barrier = "", arrived = Nil) goto(Idle) using data.copy(barrier = "", arrived = Nil)
} else { } else {
stay using data stay using data

View file

@ -10,6 +10,7 @@ import akka.remote.testconductor.{ TestConductorProtocol ⇒ TCP }
import com.google.protobuf.Message import com.google.protobuf.Message
import akka.actor.Address import akka.actor.Address
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
import akka.util.Timeout
case class RoleName(name: String) case class RoleName(name: String)
@ -28,7 +29,8 @@ private[akka] sealed trait ConfirmedClientOp extends ClientOp
*/ */
private[akka] case class Hello(name: String, addr: Address) extends NetworkOp private[akka] case class Hello(name: String, addr: Address) extends NetworkOp
private[akka] case class EnterBarrier(name: String) extends ServerOp with NetworkOp private[akka] case class EnterBarrier(name: String, timeout: Option[Timeout] = None) extends ServerOp with NetworkOp
private[akka] case class FailBarrier(name: String) extends ServerOp with NetworkOp
private[akka] case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp private[akka] case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp
private[akka] case class Throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Float) extends CommandOp private[akka] case class Throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Float) extends CommandOp
@ -72,10 +74,14 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
x match { x match {
case Hello(name, addr) case Hello(name, addr)
w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr)) w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr))
case EnterBarrier(name) case EnterBarrier(name, timeout)
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name)) val barrier = TCP.EnterBarrier.newBuilder.setName(name)
timeout foreach (t barrier.setTimeout(t.duration.toMillis))
w.setBarrier(barrier)
case BarrierResult(name, success) case BarrierResult(name, success)
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setStatus(success)) w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setStatus(success))
case FailBarrier(name)
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setFailed(true))
case ThrottleMsg(target, dir, rate) case ThrottleMsg(target, dir, rate)
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target) w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
.setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate)) .setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate))
@ -115,7 +121,8 @@ private[akka] class MsgDecoder extends OneToOneDecoder {
} else if (w.hasBarrier) { } else if (w.hasBarrier) {
val barrier = w.getBarrier val barrier = w.getBarrier
if (barrier.hasStatus) BarrierResult(barrier.getName, barrier.getStatus) if (barrier.hasStatus) BarrierResult(barrier.getName, barrier.getStatus)
else EnterBarrier(w.getBarrier.getName) else if (barrier.hasFailed) FailBarrier(barrier.getName)
else EnterBarrier(w.getBarrier.getName, if (barrier.hasTimeout) Option(Timeout.longToTimeout(barrier.getTimeout)) else None)
} else if (w.hasFailure) { } else if (w.hasFailure) {
val f = w.getFailure val f = w.getFailure
import TCP.{ FailType FT } import TCP.{ FailType FT }

View file

@ -11,7 +11,7 @@ import com.typesafe.config.ConfigFactory
import akka.util.Timeout import akka.util.Timeout
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.pattern.{ ask, pipe } import akka.pattern.{ ask, pipe, AskTimeoutException }
import akka.dispatch.Await import akka.dispatch.Await
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.actor.Status import akka.actor.Status
@ -76,10 +76,34 @@ trait Player { this: TestConductorExt ⇒
* throw an exception in case of timeouts or other errors. * throw an exception in case of timeouts or other errors.
*/ */
def enter(name: String*) { def enter(name: String*) {
enter(Settings.BarrierTimeout, name)
}
case class OutOfTimeException(barrier: String) extends RuntimeException("Ran out of time while waiting for barrier '" + barrier + "'") with NoStackTrace
/**
* Enter the named barriers, one after the other, in the order given. Will
* throw an exception in case of timeouts or other errors.
*/
def enter(timeout: Timeout, name: Seq[String]) {
def now: Duration = System.nanoTime.nanos
system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
val stop = now + timeout.duration
name foreach { b name foreach { b
import Settings.BarrierTimeout val barrierTimeout = stop - now
Await.result(client ? ToServer(EnterBarrier(b)), Duration.Inf) if (barrierTimeout < Duration.Zero) {
client ! ToServer(FailBarrier(b))
throw OutOfTimeException(b)
}
try {
implicit val timeout = Timeout(barrierTimeout + Settings.QueryTimeout.duration)
Await.result(client ? ToServer(EnterBarrier(b, Option(barrierTimeout))), Duration.Inf)
} catch {
case e: AskTimeoutException
client ! ToServer(FailBarrier(b))
throw e
}
system.log.debug("passed barrier {}", b) system.log.debug("passed barrier {}", b)
} }
} }
@ -88,7 +112,7 @@ trait Player { this: TestConductorExt ⇒
* Query remote transport address of named node. * Query remote transport address of named node.
*/ */
def getAddressFor(name: RoleName): Future[Address] = { def getAddressFor(name: RoleName): Future[Address] = {
import Settings.BarrierTimeout import Settings.QueryTimeout
client ? ToServer(GetAddress(name)) mapTo client ? ToServer(GetAddress(name)) mapTo
} }
} }
@ -168,8 +192,8 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
case Event(ToServer(msg), d @ Data(Some(channel), None)) case Event(ToServer(msg), d @ Data(Some(channel), None))
channel.write(msg) channel.write(msg)
val token = msg match { val token = msg match {
case EnterBarrier(barrier) barrier case EnterBarrier(barrier, timeout) barrier
case GetAddress(node) node.name case GetAddress(node) node.name
} }
stay using d.copy(runningOp = Some(token, sender)) stay using d.copy(runningOp = Some(token, sender))
case Event(ToServer(op), Data(channel, Some((token, _)))) case Event(ToServer(op), Data(channel, Some((token, _))))

View file

@ -19,6 +19,7 @@ import org.scalatest.BeforeAndAfterEach
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.InetAddress import java.net.InetAddress
import akka.testkit.TimingTest import akka.testkit.TimingTest
import akka.util.{ Timeout, Duration }
object BarrierSpec { object BarrierSpec {
case class Failed(ref: ActorRef, thr: Throwable) case class Failed(ref: ActorRef, thr: Throwable)
@ -74,8 +75,8 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
"fail entering barrier when nobody registered" taggedAs TimingTest in { "fail entering barrier when nobody registered" taggedAs TimingTest in {
val b = getBarrier() val b = getBarrier()
b ! EnterBarrier("b") b ! EnterBarrier("bar1")
expectMsg(ToClient(BarrierResult("b", false))) expectMsg(ToClient(BarrierResult("bar1", false)))
} }
"enter barrier" taggedAs TimingTest in { "enter barrier" taggedAs TimingTest in {
@ -83,12 +84,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
val a, b = TestProbe() val a, b = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar2"))
noMsg(a, b) noMsg(a, b)
within(2 second) { within(2 second) {
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar2"))
a.expectMsg(ToClient(BarrierResult("bar", true))) a.expectMsg(ToClient(BarrierResult("bar2", true)))
b.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar2", true)))
} }
} }
@ -97,15 +98,15 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
val a, b, c = TestProbe() val a, b, c = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar3"))
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar3"))
noMsg(a, b, c) noMsg(a, b, c)
within(2 second) { within(2 second) {
c.send(barrier, EnterBarrier("bar")) c.send(barrier, EnterBarrier("bar3"))
a.expectMsg(ToClient(BarrierResult("bar", true))) a.expectMsg(ToClient(BarrierResult("bar3", true)))
b.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar3", true)))
c.expectMsg(ToClient(BarrierResult("bar", true))) c.expectMsg(ToClient(BarrierResult("bar3", true)))
} }
} }
@ -115,14 +116,14 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar4"))
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar4"))
barrier ! RemoveClient(A) barrier ! RemoveClient(A)
barrier ! ClientDisconnected(A) barrier ! ClientDisconnected(A)
noMsg(a, b, c) noMsg(a, b, c)
b.within(2 second) { b.within(2 second) {
barrier ! RemoveClient(C) barrier ! RemoveClient(C)
b.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar4", true)))
} }
barrier ! ClientDisconnected(C) barrier ! ClientDisconnected(C)
expectNoMsg(1 second) expectNoMsg(1 second)
@ -133,7 +134,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
val a, b = TestProbe() val a, b = TestProbe()
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar5"))
barrier ! RemoveClient(A) barrier ! RemoveClient(A)
b.send(barrier, EnterBarrier("foo")) b.send(barrier, EnterBarrier("foo"))
b.expectMsg(ToClient(BarrierResult("foo", true))) b.expectMsg(ToClient(BarrierResult("foo", true)))
@ -145,11 +146,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA barrier ! nodeA
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar6"))
EventFilter[ClientLost](occurrences = 1) intercept { EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B) barrier ! ClientDisconnected(B)
} }
expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar", a.ref :: Nil), B))) expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar6", a.ref :: Nil), B)))
} }
"fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
@ -160,12 +161,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! nodeA barrier ! nodeA
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeC barrier ! nodeC
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar7"))
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar7"))
EventFilter[ClientLost](occurrences = 1) intercept { EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B) barrier ! ClientDisconnected(B)
} }
expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar", a.ref :: Nil), B))) expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar7", a.ref :: Nil), B)))
} }
"fail when entering wrong barrier" taggedAs TimingTest in { "fail when entering wrong barrier" taggedAs TimingTest in {
@ -175,11 +176,11 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! nodeA barrier ! nodeA
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeB barrier ! nodeB
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar8"))
EventFilter[WrongBarrier](occurrences = 1) intercept { EventFilter[WrongBarrier](occurrences = 1) intercept {
b.send(barrier, EnterBarrier("foo")) b.send(barrier, EnterBarrier("foo"))
} }
expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar", a.ref :: Nil)))) expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar8", a.ref :: Nil))))
} }
"fail barrier after first failure" taggedAs TimingTest in { "fail barrier after first failure" taggedAs TimingTest in {
@ -190,8 +191,8 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
} }
expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove"))) expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "cannot remove RoleName(a): no client to remove")))
barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) barrier ! NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
a.send(barrier, EnterBarrier("right")) a.send(barrier, EnterBarrier("bar9"))
a.expectMsg(ToClient(BarrierResult("right", false))) a.expectMsg(ToClient(BarrierResult("bar9", false)))
} }
"fail after barrier timeout" taggedAs TimingTest in { "fail after barrier timeout" taggedAs TimingTest in {
@ -201,9 +202,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA barrier ! nodeA
barrier ! nodeB barrier ! nodeB
a.send(barrier, EnterBarrier("right")) a.send(barrier, EnterBarrier("bar10"))
EventFilter[BarrierTimeout](occurrences = 1) intercept { EventFilter[BarrierTimeout](occurrences = 1) intercept {
expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "right", a.ref :: Nil)))) expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "bar10", a.ref :: Nil))))
} }
} }
@ -264,12 +265,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar11"))
noMsg(a, b) noMsg(a, b)
within(2 second) { within(2 second) {
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar11"))
a.expectMsg(ToClient(BarrierResult("bar", true))) a.expectMsg(ToClient(BarrierResult("bar11", true)))
b.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar11", true)))
} }
} }
@ -280,16 +281,16 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar12"))
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
c.expectMsg(ToClient(Done)) c.expectMsg(ToClient(Done))
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar12"))
noMsg(a, b, c) noMsg(a, b, c)
within(2 second) { within(2 second) {
c.send(barrier, EnterBarrier("bar")) c.send(barrier, EnterBarrier("bar12"))
a.expectMsg(ToClient(BarrierResult("bar", true))) a.expectMsg(ToClient(BarrierResult("bar12", true)))
b.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar12", true)))
c.expectMsg(ToClient(BarrierResult("bar", true))) c.expectMsg(ToClient(BarrierResult("bar12", true)))
} }
} }
@ -302,14 +303,14 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
c.expectMsg(ToClient(Done)) c.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar13"))
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar13"))
barrier ! Remove(A) barrier ! Remove(A)
barrier ! ClientDisconnected(A) barrier ! ClientDisconnected(A)
noMsg(a, b, c) noMsg(a, b, c)
b.within(2 second) { b.within(2 second) {
barrier ! Remove(C) barrier ! Remove(C)
b.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar13", true)))
} }
barrier ! ClientDisconnected(C) barrier ! ClientDisconnected(C)
expectNoMsg(1 second) expectNoMsg(1 second)
@ -322,7 +323,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar14"))
barrier ! Remove(A) barrier ! Remove(A)
b.send(barrier, EnterBarrier("foo")) b.send(barrier, EnterBarrier("foo"))
b.expectMsg(ToClient(BarrierResult("foo", true))) b.expectMsg(ToClient(BarrierResult("foo", true)))
@ -336,13 +337,13 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar15"))
barrier ! ClientDisconnected(RoleName("unknown")) barrier ! ClientDisconnected(RoleName("unknown"))
noMsg(a) noMsg(a)
EventFilter[ClientLost](occurrences = 1) intercept { EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B) barrier ! ClientDisconnected(B)
} }
a.expectMsg(ToClient(BarrierResult("bar", false))) a.expectMsg(ToClient(BarrierResult("bar15", false)))
} }
"fail barrier with disconnecing node who already arrived" taggedAs TimingTest in { "fail barrier with disconnecing node who already arrived" taggedAs TimingTest in {
@ -356,12 +357,12 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
c.expectMsg(ToClient(Done)) c.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar16"))
b.send(barrier, EnterBarrier("bar")) b.send(barrier, EnterBarrier("bar16"))
EventFilter[ClientLost](occurrences = 1) intercept { EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected(B) barrier ! ClientDisconnected(B)
} }
a.expectMsg(ToClient(BarrierResult("bar", false))) a.expectMsg(ToClient(BarrierResult("bar16", false)))
} }
"fail when entering wrong barrier" taggedAs TimingTest in { "fail when entering wrong barrier" taggedAs TimingTest in {
@ -373,15 +374,15 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! nodeB barrier ! nodeB
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar")) a.send(barrier, EnterBarrier("bar17"))
EventFilter[WrongBarrier](occurrences = 1) intercept { EventFilter[WrongBarrier](occurrences = 1) intercept {
b.send(barrier, EnterBarrier("foo")) b.send(barrier, EnterBarrier("foo"))
} }
a.expectMsg(ToClient(BarrierResult("bar", false))) a.expectMsg(ToClient(BarrierResult("bar17", false)))
b.expectMsg(ToClient(BarrierResult("foo", false))) b.expectMsg(ToClient(BarrierResult("foo", false)))
} }
"not really fail after barrier timeout" taggedAs TimingTest in { "fail after barrier timeout" taggedAs TimingTest in {
val barrier = getController(2) val barrier = getController(2)
val a, b = TestProbe() val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref) val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
@ -390,13 +391,13 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
barrier ! nodeB barrier ! nodeB
a.expectMsg(ToClient(Done)) a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done)) b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("right")) a.send(barrier, EnterBarrier("bar18", Option(Timeout.durationToTimeout(2 seconds))))
EventFilter[BarrierTimeout](occurrences = 1) intercept { EventFilter[BarrierTimeout](occurrences = 1) intercept {
Thread.sleep(5000) Thread.sleep(4000)
} }
b.send(barrier, EnterBarrier("right")) b.send(barrier, EnterBarrier("bar18"))
a.expectMsg(ToClient(BarrierResult("right", true))) a.expectMsg(ToClient(BarrierResult("bar18", false)))
b.expectMsg(ToClient(BarrierResult("right", true))) b.expectMsg(ToClient(BarrierResult("bar18", false)))
} }
"fail if a node registers twice" taggedAs TimingTest in { "fail if a node registers twice" taggedAs TimingTest in {
@ -423,8 +424,27 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
controller ! nodeB controller ! nodeB
b.expectMsg(ToClient(BarrierResult("initial startup", false))) b.expectMsg(ToClient(BarrierResult("initial startup", false)))
} }
a.send(controller, EnterBarrier("x")) a.send(controller, EnterBarrier("bar19"))
a.expectMsg(ToClient(BarrierResult("x", false))) a.expectMsg(ToClient(BarrierResult("bar19", false)))
}
"fail subsequent barriers after foreced failure" taggedAs TimingTest in {
val barrier = getController(2)
val a, b = TestProbe()
val nodeA = NodeInfo(A, AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
barrier ! nodeB
a.expectMsg(ToClient(Done))
b.expectMsg(ToClient(Done))
a.send(barrier, EnterBarrier("bar20", Option(Timeout.durationToTimeout(2 seconds))))
b.send(barrier, FailBarrier("bar20"))
a.expectMsg(ToClient(BarrierResult("bar20", false)))
b.expectNoMsg(1 second)
a.send(barrier, EnterBarrier("bar21"))
b.send(barrier, EnterBarrier("bar21"))
a.expectMsg(ToClient(BarrierResult("bar21", false)))
b.expectMsg(ToClient(BarrierResult("bar21", false)))
} }
"finally have no failure messages left" taggedAs TimingTest in { "finally have no failure messages left" taggedAs TimingTest in {

View file

@ -12,7 +12,7 @@ import akka.dispatch.Await
import akka.dispatch.Await.Awaitable import akka.dispatch.Await.Awaitable
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.util.{ NonFatal, Duration } import akka.util.{ Timeout, NonFatal, Duration }
/** /**
* Configure the role names and participants of the test, including configuration settings. * Configure the role names and participants of the test, including configuration settings.
@ -182,6 +182,14 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
if (nodes exists (_ == myself)) yes else no if (nodes exists (_ == myself)) yes else no
} }
/**
* Enter the named barriers in the order given. Use the remaining duration from
* the innermost enclosing `within` block or the default `BarrierTimeout`
*/
def enter(name: String*) {
testConductor.enter(Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), name)
}
/** /**
* Query the controller for the transport address of the given node (by role name) and * Query the controller for the transport address of the given node (by role name) and
* return that as an ActorPath for easy composition: * return that as an ActorPath for easy composition:
@ -193,11 +201,14 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await) def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await)
/** /**
* Enrich `.await()` onto all Awaitables, using BarrierTimeout. * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost
* enclosing `within` block or BarrierTimeout.
*
* FIXME Is it really BarrierTimeout we want here? That seems like an awfully long time.
*/ */
implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w) implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w)
class AwaitHelper[T](w: Awaitable[T]) { class AwaitHelper[T](w: Awaitable[T]) {
def await: T = Await.result(w, testConductor.Settings.BarrierTimeout.duration) def await: T = Await.result(w, remainingOr(testConductor.Settings.BarrierTimeout.duration))
} }
/* /*

View file

@ -158,7 +158,13 @@ trait TestKitBase {
* block or missing that it returns the properly dilated default for this * block or missing that it returns the properly dilated default for this
* case from settings (key "akka.test.single-expect-default"). * case from settings (key "akka.test.single-expect-default").
*/ */
def remaining: Duration = if (end == Duration.Undefined) testKitSettings.SingleExpectDefaultTimeout.dilated else end - now def remaining: Duration = remainingOr(testKitSettings.SingleExpectDefaultTimeout.dilated)
/**
* Obtain time remaining for execution of the innermost enclosing `within`
* block or missing that it returns the given duration.
*/
def remainingOr(duration: Duration): Duration = if (end == Duration.Undefined) duration else end - now
/** /**
* Query queue status. * Query queue status.
@ -605,12 +611,6 @@ object TestKit {
/** /**
* Await until the given condition evaluates to `true` or the timeout * Await until the given condition evaluates to `true` or the timeout
* expires, whichever comes first. * expires, whichever comes first.
*
* If no timeout is given, take it from the innermost enclosing `within`
* block.
*
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "akka.test.timefactor"
*/ */
def awaitCond(p: Boolean, max: Duration, interval: Duration = 100.millis, noThrow: Boolean = false): Boolean = { def awaitCond(p: Boolean, max: Duration, interval: Duration = 100.millis, noThrow: Boolean = false): Boolean = {
val stop = now + max val stop = now + max