diff --git a/akka-remote/src/main/java/akka/remote/testconductor/TestConductorProtocol.java b/akka-remote/src/main/java/akka/remote/testconductor/TestConductorProtocol.java index 4b9da03059..3d6c145097 100644 --- a/akka-remote/src/main/java/akka/remote/testconductor/TestConductorProtocol.java +++ b/akka-remote/src/main/java/akka/remote/testconductor/TestConductorProtocol.java @@ -1543,6 +1543,10 @@ public final class TestConductorProtocol { // required string name = 1; boolean hasName(); String getName(); + + // optional bool failed = 2; + boolean hasFailed(); + boolean getFailed(); } public static final class EnterBarrier extends com.google.protobuf.GeneratedMessage @@ -1605,8 +1609,19 @@ public final class TestConductorProtocol { } } + // optional bool failed = 2; + public static final int FAILED_FIELD_NUMBER = 2; + private boolean failed_; + public boolean hasFailed() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public boolean getFailed() { + return failed_; + } + private void initFields() { name_ = ""; + failed_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1627,6 +1642,9 @@ public final class TestConductorProtocol { if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeBytes(1, getNameBytes()); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBool(2, failed_); + } getUnknownFields().writeTo(output); } @@ -1640,6 +1658,10 @@ public final class TestConductorProtocol { size += com.google.protobuf.CodedOutputStream .computeBytesSize(1, getNameBytes()); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(2, failed_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1766,6 +1788,8 @@ public final class TestConductorProtocol { super.clear(); name_ = ""; bitField0_ = (bitField0_ & ~0x00000001); + failed_ = false; + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -1808,6 +1832,10 @@ public final class TestConductorProtocol { to_bitField0_ |= 0x00000001; } result.name_ = name_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.failed_ = failed_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1827,6 +1855,9 @@ public final class TestConductorProtocol { if (other.hasName()) { setName(other.getName()); } + if (other.hasFailed()) { + setFailed(other.getFailed()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1867,6 +1898,11 @@ public final class TestConductorProtocol { name_ = input.readBytes(); break; } + case 16: { + bitField0_ |= 0x00000002; + failed_ = input.readBool(); + break; + } } } } @@ -1909,6 +1945,27 @@ public final class TestConductorProtocol { onChanged(); } + // optional bool failed = 2; + private boolean failed_ ; + public boolean hasFailed() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public boolean getFailed() { + return failed_; + } + public Builder setFailed(boolean value) { + bitField0_ |= 0x00000002; + failed_ = value; + onChanged(); + return this; + } + public Builder clearFailed() { + bitField0_ = (bitField0_ & ~0x00000002); + failed_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:EnterBarrier) } @@ -3300,17 +3357,17 @@ public final class TestConductorProtocol { "\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007barrier\030\002 \001(\013" + "2\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Injec" + "tFailure\022\014\n\004done\030\004 \001(\t\"0\n\005Hello\022\014\n\004name\030" + - "\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\"\034\n\014Ente" + - "rBarrier\022\014\n\004name\030\001 \002(\t\"G\n\007Address\022\020\n\010pro" + - "tocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004host\030\003 \002(" + - "\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailure\022\032\n\007fai" + - "lure\030\001 \002(\0162\t.FailType\022\035\n\tdirection\030\002 \001(\016" + - "2\n.Direction\022\031\n\007address\030\003 \001(\0132\010.Address\022", - "\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030\007 \001(\005*A\n\010" + - "FailType\022\014\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t" + - "\n\005Abort\020\003\022\014\n\010Shutdown\020\004*,\n\tDirection\022\010\n\004" + - "Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" + - "mote.testconductorH\001" + "\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\",\n\014Ente" + + "rBarrier\022\014\n\004name\030\001 \002(\t\022\016\n\006failed\030\002 \001(\010\"G" + + "\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 " + + "\002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInje" + + "ctFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n" + + "\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007address\030", + "\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texi" + + "tValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Throttle\020\001\022\016" + + "\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shutdown\020\004*" + + ",\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receive\020\002\022\010\n\004B" + + "oth\020\003B\035\n\031akka.remote.testconductorH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3338,7 +3395,7 @@ public final class TestConductorProtocol { internal_static_EnterBarrier_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_EnterBarrier_descriptor, - new java.lang.String[] { "Name", }, + new java.lang.String[] { "Name", "Failed", }, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.class, akka.remote.testconductor.TestConductorProtocol.EnterBarrier.Builder.class); internal_static_Address_descriptor = diff --git a/akka-remote/src/main/protocol/TestConductorProtocol.proto b/akka-remote/src/main/protocol/TestConductorProtocol.proto index e483bf4f01..007965b2e8 100644 --- a/akka-remote/src/main/protocol/TestConductorProtocol.proto +++ b/akka-remote/src/main/protocol/TestConductorProtocol.proto @@ -25,6 +25,7 @@ message Hello { message EnterBarrier { required string name = 1; + optional bool failed = 2; } message Address { diff --git a/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala index 7e3d315fea..2bbae6d28b 100644 --- a/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -22,6 +22,8 @@ import akka.event.LoggingReceive import akka.actor.Address import java.net.InetSocketAddress import akka.dispatch.Future +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy trait Conductor extends RunControl with FailureInject { this: TestConductorExt ⇒ @@ -194,6 +196,15 @@ class Controller(_participants: Int) extends Actor { val connection = RemoteConnection(Server, settings.host, settings.port, new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler"))) + override def supervisorStrategy = OneForOneStrategy() { + case e: BarrierCoordinator.BarrierTimeoutException ⇒ SupervisorStrategy.Resume + case e: BarrierCoordinator.WrongBarrierException ⇒ + // I think we are lacking a means of communication here: this is not correct! + for (i ← 1 to e.data.clients) barrier ! ClientConnected + for (c ← e.data.arrived) c ! BarrierFailed(e.barrier) + SupervisorStrategy.Restart + } + val barrier = context.actorOf(Props[BarrierCoordinator], "barriers") var nodes = Map[String, NodeInfo]() @@ -240,7 +251,8 @@ object BarrierCoordinator { case object Waiting extends State case class Data(clients: Int, barrier: String, arrived: List[ActorRef]) - class BarrierTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace + class BarrierTimeoutException(val data: Data) extends RuntimeException(data.barrier) with NoStackTrace + class WrongBarrierException(val barrier: String, val client: ActorRef, val data: Data) extends RuntimeException(barrier) with NoStackTrace } class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] { @@ -262,13 +274,13 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, } onTransition { - case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, 30 seconds, false) + case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, TestConductor().Settings.BarrierTimeout.duration, false) case Waiting -> Idle ⇒ cancelTimer("Timeout") } when(Waiting) { case Event(e @ EnterBarrier(name), d @ Data(num, barrier, arrived)) ⇒ - if (name != barrier) throw new IllegalStateException("trying enter barrier '" + name + "' while barrier '" + barrier + "' is active") + if (name != barrier) throw new WrongBarrierException(barrier, sender, d) val together = sender :: arrived if (together.size == num) { together foreach (_ ! Send(e)) @@ -287,8 +299,8 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, } else { stay using d.copy(clients = expected) } - case Event(StateTimeout, Data(num, barrier, arrived)) ⇒ - throw new BarrierTimeoutException("only " + arrived.size + " of " + num + " arrived at barrier " + barrier) + case Event(StateTimeout, data) ⇒ + throw new BarrierTimeoutException(data) } initialize diff --git a/akka-remote/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-remote/src/main/scala/akka/remote/testconductor/DataTypes.scala index 90d7eeccd5..cadd69f786 100644 --- a/akka-remote/src/main/scala/akka/remote/testconductor/DataTypes.scala +++ b/akka-remote/src/main/scala/akka/remote/testconductor/DataTypes.scala @@ -19,6 +19,7 @@ sealed trait NetworkOp // messages sent over the wire case class Hello(name: String, addr: Address) extends NetworkOp case class EnterBarrier(name: String) extends ClientOp with ServerOp with NetworkOp +case class BarrierFailed(name: String) extends NetworkOp case class Throttle(node: String, target: String, direction: Direction, rateMBit: Float) extends ServerOp case class ThrottleMsg(target: Address, direction: Direction, rateMBit: Float) extends NetworkOp case class Disconnect(node: String, target: String, abort: Boolean) extends ServerOp @@ -41,6 +42,8 @@ class MsgEncoder extends OneToOneEncoder { w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr)) case EnterBarrier(name) ⇒ w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name)) + case BarrierFailed(name) ⇒ + w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setFailed(true)) case ThrottleMsg(target, dir, rate) ⇒ w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target) .setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate)) @@ -64,7 +67,9 @@ class MsgDecoder extends OneToOneDecoder { val h = w.getHello Hello(h.getName, h.getAddress) } else if (w.hasBarrier) { - EnterBarrier(w.getBarrier.getName) + val barrier = w.getBarrier + if (barrier.hasFailed && barrier.getFailed) BarrierFailed(barrier.getName) + else EnterBarrier(w.getBarrier.getName) } else if (w.hasFailure) { val f = w.getFailure import TCP.{ FailType ⇒ FT } diff --git a/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala index f7d2fbd532..6e78610cfb 100644 --- a/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala @@ -129,6 +129,9 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client sender ! b } stay using Data(channel, None) + case Event(BarrierFailed(b), Data(channel, Some((_, sender)))) ⇒ + sender ! Status.Failure(new RuntimeException("barrier failed: " + b)) + stay using Data(channel, None) case Event(ThrottleMsg(target, dir, rate), _) ⇒ import settings.QueryTimeout import context.dispatcher