handle barrier failures better
This commit is contained in:
parent
33cea733a3
commit
e950045015
5 changed files with 96 additions and 18 deletions
|
|
@ -1543,6 +1543,10 @@ public final class TestConductorProtocol {
|
|||
// required string name = 1;
|
||||
boolean hasName();
|
||||
String getName();
|
||||
|
||||
// optional bool failed = 2;
|
||||
boolean hasFailed();
|
||||
boolean getFailed();
|
||||
}
|
||||
public static final class EnterBarrier extends
|
||||
com.google.protobuf.GeneratedMessage
|
||||
|
|
@ -1605,8 +1609,19 @@ public final class TestConductorProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
// optional bool failed = 2;
|
||||
public static final int FAILED_FIELD_NUMBER = 2;
|
||||
private boolean failed_;
|
||||
/**
 * Reports whether the optional {@code failed} field is present on this message.
 *
 * @return {@code true} when bit {@code 0x00000002} of {@code bitField0_} is set
 */
public boolean hasFailed() {
|
||||
// Bit 0x00000002 of bitField0_ is the presence flag tested for this field.
return ((bitField0_ & 0x00000002) == 0x00000002);
|
||||
}
|
||||
/**
 * Returns the stored value of the optional {@code failed} field.
 *
 * <p>No presence check is done here; use {@code hasFailed()} to tell an
 * explicitly set value apart from the initial one.
 *
 * @return the current content of {@code failed_}
 */
public boolean getFailed() {
|
||||
return failed_;
|
||||
}
|
||||
|
||||
/**
 * Resets the message fields to their initial values: an empty {@code name_}
 * and {@code failed_ == false}.
 */
private void initFields() {
|
||||
name_ = "";
|
||||
failed_ = false;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
|
@ -1627,6 +1642,9 @@ public final class TestConductorProtocol {
|
|||
if (((bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
output.writeBytes(1, getNameBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
output.writeBool(2, failed_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
|
@ -1640,6 +1658,10 @@ public final class TestConductorProtocol {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(1, getNameBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBoolSize(2, failed_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
|
@ -1766,6 +1788,8 @@ public final class TestConductorProtocol {
|
|||
super.clear();
|
||||
name_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
failed_ = false;
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -1808,6 +1832,10 @@ public final class TestConductorProtocol {
|
|||
to_bitField0_ |= 0x00000001;
|
||||
}
|
||||
result.name_ = name_;
|
||||
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
to_bitField0_ |= 0x00000002;
|
||||
}
|
||||
result.failed_ = failed_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
|
@ -1827,6 +1855,9 @@ public final class TestConductorProtocol {
|
|||
if (other.hasName()) {
|
||||
setName(other.getName());
|
||||
}
|
||||
if (other.hasFailed()) {
|
||||
setFailed(other.getFailed());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
|
@ -1867,6 +1898,11 @@ public final class TestConductorProtocol {
|
|||
name_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
case 16: {
|
||||
bitField0_ |= 0x00000002;
|
||||
failed_ = input.readBool();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1909,6 +1945,27 @@ public final class TestConductorProtocol {
|
|||
onChanged();
|
||||
}
|
||||
|
||||
// optional bool failed = 2;
|
||||
private boolean failed_ ;
|
||||
/**
 * Reports whether {@code failed} has been set on this builder.
 *
 * @return {@code true} when bit {@code 0x00000002} of {@code bitField0_} is set
 */
public boolean hasFailed() {
|
||||
return ((bitField0_ & 0x00000002) == 0x00000002);
|
||||
}
|
||||
/**
 * Returns the builder's current {@code failed} value, without checking
 * whether it was explicitly set.
 *
 * @return the current content of {@code failed_}
 */
public boolean getFailed() {
|
||||
return failed_;
|
||||
}
|
||||
/**
 * Sets the optional {@code failed} field and marks it as present.
 *
 * @param value the value to store in {@code failed_}
 * @return this builder, so calls can be chained
 */
public Builder setFailed(boolean value) {
|
||||
// Raise the presence bit so hasFailed() reports true from now on.
bitField0_ |= 0x00000002;
|
||||
failed_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
 * Clears the optional {@code failed} field: drops its presence bit and
 * resets the stored value to {@code false}.
 *
 * @return this builder, so calls can be chained
 */
public Builder clearFailed() {
|
||||
// Clear only bit 0x00000002; other presence bits are left untouched.
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
failed_ = false;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:EnterBarrier)
|
||||
}
|
||||
|
||||
|
|
@ -3300,17 +3357,17 @@ public final class TestConductorProtocol {
|
|||
"\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007barrier\030\002 \001(\013" +
|
||||
"2\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Injec" +
|
||||
"tFailure\022\014\n\004done\030\004 \001(\t\"0\n\005Hello\022\014\n\004name\030" +
|
||||
"\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\"\034\n\014Ente" +
|
||||
"rBarrier\022\014\n\004name\030\001 \002(\t\"G\n\007Address\022\020\n\010pro" +
|
||||
"tocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004host\030\003 \002(" +
|
||||
"\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailure\022\032\n\007fai" +
|
||||
"lure\030\001 \002(\0162\t.FailType\022\035\n\tdirection\030\002 \001(\016" +
|
||||
"2\n.Direction\022\031\n\007address\030\003 \001(\0132\010.Address\022",
|
||||
"\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030\007 \001(\005*A\n\010" +
|
||||
"FailType\022\014\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t" +
|
||||
"\n\005Abort\020\003\022\014\n\010Shutdown\020\004*,\n\tDirection\022\010\n\004" +
|
||||
"Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" +
|
||||
"mote.testconductorH\001"
|
||||
"\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\",\n\014Ente" +
|
||||
"rBarrier\022\014\n\004name\030\001 \002(\t\022\016\n\006failed\030\002 \001(\010\"G" +
|
||||
"\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 " +
|
||||
"\002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInje" +
|
||||
"ctFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n" +
|
||||
"\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007address\030",
|
||||
"\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texi" +
|
||||
"tValue\030\007 \001(\005*A\n\010FailType\022\014\n\010Throttle\020\001\022\016" +
|
||||
"\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022\014\n\010Shutdown\020\004*" +
|
||||
",\n\tDirection\022\010\n\004Send\020\001\022\013\n\007Receive\020\002\022\010\n\004B" +
|
||||
"oth\020\003B\035\n\031akka.remote.testconductorH\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
|
@ -3338,7 +3395,7 @@ public final class TestConductorProtocol {
|
|||
internal_static_EnterBarrier_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_EnterBarrier_descriptor,
|
||||
new java.lang.String[] { "Name", },
|
||||
new java.lang.String[] { "Name", "Failed", },
|
||||
akka.remote.testconductor.TestConductorProtocol.EnterBarrier.class,
|
||||
akka.remote.testconductor.TestConductorProtocol.EnterBarrier.Builder.class);
|
||||
internal_static_Address_descriptor =
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ message Hello {
|
|||
|
||||
// Barrier message exchanged between test conductor participants.
message EnterBarrier {
|
||||
// Name of the barrier this message refers to.
required string name = 1;
|
||||
// When present and true, the message reports that the named barrier failed
// (the Scala decoder turns it into BarrierFailed instead of EnterBarrier).
optional bool failed = 2;
|
||||
}
|
||||
|
||||
message Address {
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import akka.event.LoggingReceive
|
|||
import akka.actor.Address
|
||||
import java.net.InetSocketAddress
|
||||
import akka.dispatch.Future
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.SupervisorStrategy
|
||||
|
||||
trait Conductor extends RunControl with FailureInject { this: TestConductorExt ⇒
|
||||
|
||||
|
|
@ -194,6 +196,15 @@ class Controller(_participants: Int) extends Actor {
|
|||
val connection = RemoteConnection(Server, settings.host, settings.port,
|
||||
new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler")))
|
||||
|
||||
// Supervision decisions for failures raised by the barrier coordinator child:
//  - BarrierTimeoutException: the child is resumed.
//  - WrongBarrierException: ClientConnected is sent to `barrier` once per
//    client recorded in the exception's data, every client that had already
//    arrived is sent BarrierFailed for the active barrier, and the child is
//    restarted.
override def supervisorStrategy = OneForOneStrategy() {
|
||||
case e: BarrierCoordinator.BarrierTimeoutException ⇒ SupervisorStrategy.Resume
|
||||
case e: BarrierCoordinator.WrongBarrierException ⇒
|
||||
// I think we are lacking a means of communication here: this is not correct!
// NOTE(review): the ClientConnected messages below are sent before Restart is
// returned — presumably meant to re-seed the restarted coordinator's client
// count; confirm they are actually handled by the fresh instance.
|
||||
for (i ← 1 to e.data.clients) barrier ! ClientConnected
|
||||
// e.barrier is the barrier that was active, not the one the offending client asked for.
for (c ← e.data.arrived) c ! BarrierFailed(e.barrier)
|
||||
SupervisorStrategy.Restart
|
||||
}
|
||||
|
||||
val barrier = context.actorOf(Props[BarrierCoordinator], "barriers")
|
||||
var nodes = Map[String, NodeInfo]()
|
||||
|
||||
|
|
@ -240,7 +251,8 @@ object BarrierCoordinator {
|
|||
case object Waiting extends State
|
||||
|
||||
case class Data(clients: Int, barrier: String, arrived: List[ActorRef])
|
||||
class BarrierTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace
|
||||
class BarrierTimeoutException(val data: Data) extends RuntimeException(data.barrier) with NoStackTrace
|
||||
class WrongBarrierException(val barrier: String, val client: ActorRef, val data: Data) extends RuntimeException(barrier) with NoStackTrace
|
||||
}
|
||||
|
||||
class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] {
|
||||
|
|
@ -262,13 +274,13 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
|
|||
}
|
||||
|
||||
onTransition {
|
||||
case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, 30 seconds, false)
|
||||
case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, TestConductor().Settings.BarrierTimeout.duration, false)
|
||||
case Waiting -> Idle ⇒ cancelTimer("Timeout")
|
||||
}
|
||||
|
||||
when(Waiting) {
|
||||
case Event(e @ EnterBarrier(name), d @ Data(num, barrier, arrived)) ⇒
|
||||
if (name != barrier) throw new IllegalStateException("trying enter barrier '" + name + "' while barrier '" + barrier + "' is active")
|
||||
if (name != barrier) throw new WrongBarrierException(barrier, sender, d)
|
||||
val together = sender :: arrived
|
||||
if (together.size == num) {
|
||||
together foreach (_ ! Send(e))
|
||||
|
|
@ -287,8 +299,8 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
|
|||
} else {
|
||||
stay using d.copy(clients = expected)
|
||||
}
|
||||
case Event(StateTimeout, Data(num, barrier, arrived)) ⇒
|
||||
throw new BarrierTimeoutException("only " + arrived.size + " of " + num + " arrived at barrier " + barrier)
|
||||
case Event(StateTimeout, data) ⇒
|
||||
throw new BarrierTimeoutException(data)
|
||||
}
|
||||
|
||||
initialize
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ sealed trait NetworkOp // messages sent over the wire
|
|||
|
||||
case class Hello(name: String, addr: Address) extends NetworkOp
|
||||
case class EnterBarrier(name: String) extends ClientOp with ServerOp with NetworkOp
|
||||
case class BarrierFailed(name: String) extends NetworkOp
|
||||
case class Throttle(node: String, target: String, direction: Direction, rateMBit: Float) extends ServerOp
|
||||
case class ThrottleMsg(target: Address, direction: Direction, rateMBit: Float) extends NetworkOp
|
||||
case class Disconnect(node: String, target: String, abort: Boolean) extends ServerOp
|
||||
|
|
@ -41,6 +42,8 @@ class MsgEncoder extends OneToOneEncoder {
|
|||
w.setHello(TCP.Hello.newBuilder.setName(name).setAddress(addr))
|
||||
case EnterBarrier(name) ⇒
|
||||
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name))
|
||||
case BarrierFailed(name) ⇒
|
||||
w.setBarrier(TCP.EnterBarrier.newBuilder.setName(name).setFailed(true))
|
||||
case ThrottleMsg(target, dir, rate) ⇒
|
||||
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
|
||||
.setFailure(TCP.FailType.Throttle).setDirection(dir).setRateMBit(rate))
|
||||
|
|
@ -64,7 +67,9 @@ class MsgDecoder extends OneToOneDecoder {
|
|||
val h = w.getHello
|
||||
Hello(h.getName, h.getAddress)
|
||||
} else if (w.hasBarrier) {
|
||||
EnterBarrier(w.getBarrier.getName)
|
||||
val barrier = w.getBarrier
|
||||
if (barrier.hasFailed && barrier.getFailed) BarrierFailed(barrier.getName)
|
||||
else EnterBarrier(w.getBarrier.getName)
|
||||
} else if (w.hasFailure) {
|
||||
val f = w.getFailure
|
||||
import TCP.{ FailType ⇒ FT }
|
||||
|
|
|
|||
|
|
@ -129,6 +129,9 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client
|
|||
sender ! b
|
||||
}
|
||||
stay using Data(channel, None)
|
||||
case Event(BarrierFailed(b), Data(channel, Some((_, sender)))) ⇒
|
||||
sender ! Status.Failure(new RuntimeException("barrier failed: " + b))
|
||||
stay using Data(channel, None)
|
||||
case Event(ThrottleMsg(target, dir, rate), _) ⇒
|
||||
import settings.QueryTimeout
|
||||
import context.dispatcher
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue