+sam #3925 Adding Supervision for Java with Lambda Support Activator template

This commit is contained in:
Björn Antonsson 2014-03-17 14:56:22 +01:00
parent 4b57392143
commit ea6e459adf
13 changed files with 952 additions and 12 deletions

View file

@ -0,0 +1,75 @@
package supervision;
import akka.actor.*;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.util.HashMap;
import java.util.Map;
import static supervision.FlakyExpressionCalculator.FlakinessException;
import static supervision.FlakyExpressionCalculator.Result;
import static supervision.FlakyExpressionCalculator.Position.Left;
// A very simple service that accepts arithmetic expressions and tries to
// evaluate them. Since the calculation is dangerous (at least for the sake
// of this example) it is delegated to a worker actor of type
// FlakyExpressionCalculator.
class ArithmeticService extends AbstractLoggingActor {
// Map of workers to the original actors requesting the calculation
Map<ActorRef, ActorRef> pendingWorkers = new HashMap<>();
private SupervisorStrategy strategy = new OneForOneStrategy(false, DeciderBuilder.
match(FlakinessException.class, e -> {
log().warning("Evaluation of a top level expression failed, restarting.");
return SupervisorStrategy.restart();
}).
match(ArithmeticException.class, e -> {
log().error("Evaluation failed because of: {}", e.getMessage());
notifyConsumerFailure(sender(), e);
return SupervisorStrategy.stop();
}).
match(Throwable.class, e -> {
log().error("Unexpected failure: {}", e.getMessage());
notifyConsumerFailure(sender(), e);
return SupervisorStrategy.stop();
}).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
private void notifyConsumerFailure(ActorRef worker, Throwable failure) {
// Status.Failure is a message type provided by the Akka library. The
// reason why it is used is because it is recognized by the "ask" pattern
// and the Future returned by ask will fail with the provided exception.
ActorRef pending = pendingWorkers.get(worker);
if (pending != null) {
pending.tell(new Status.Failure(failure), self());
pendingWorkers.remove(worker);
}
}
private void notifyConsumerSuccess(ActorRef worker, Integer result) {
ActorRef pending = pendingWorkers.get(worker);
if (pending != null) {
pending.tell(result, self());
pendingWorkers.remove(worker);
}
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Expression.class, expr -> {
// We delegate the dangerous task of calculation to a worker, passing the
// expression as a constructor argument to the actor.
ActorRef worker = context().actorOf(FlakyExpressionCalculator.props(expr, Left));
pendingWorkers.put(worker, sender());
}).
match(Result.class, r -> notifyConsumerSuccess(sender(), r.getValue())).build();
}
}

View file

@ -0,0 +1,122 @@
package supervision;
// Represents an arithmetic expression involving integer numbers
public interface Expression {
public Expression getLeft();
public Expression getRight();
// Basic arithmetic operations that are supported by the ArithmeticService. Every
// operation except the constant value has a left and right side. For example
// the addition in (3 * 2) + (6 * 6) has the left side (3 * 2) and the right
// side (6 * 6).
public static abstract class AbstractExpression implements Expression {
private final Expression left;
private final Expression right;
private final String operator;
protected AbstractExpression(Expression left, Expression right, String operator) {
this.left = left;
this.right = right;
this.operator = operator;
}
public Expression getLeft() {
return left;
}
public Expression getRight() {
return right;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AbstractExpression)) return false;
AbstractExpression that = (AbstractExpression) o;
if (!left.equals(that.left)) return false;
if (!operator.equals(that.operator)) return false;
if (!right.equals(that.right)) return false;
return true;
}
@Override
public int hashCode() {
int result = left.hashCode();
result = 31 * result + right.hashCode();
result = 31 * result + operator.hashCode();
return result;
}
@Override
public String toString() {
return "(" + getLeft() + " " + operator + " " + getRight() + ")";
}
}
public static final class Add extends AbstractExpression {
public Add(Expression left, Expression right) {
super(left, right, "+");
}
}
public static final class Multiply extends AbstractExpression {
public Multiply(Expression left, Expression right) {
super(left, right, "*");
}
}
public static final class Divide extends AbstractExpression {
public Divide(Expression left, Expression right) {
super(left, right, "/");
}
}
public static final class Const implements Expression{
private final int value;
public Const(int value) {
this.value = value;
}
@Override
public Expression getLeft() {
return this;
}
@Override
public Expression getRight() {
return this;
}
public int getValue() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Const)) return false;
Const aConst = (Const) o;
if (value != aConst.value) return false;
return true;
}
@Override
public int hashCode() {
return value;
}
@Override
public String toString() {
return String.valueOf(value);
}
}
}

View file

@ -0,0 +1,148 @@
package supervision;
import akka.actor.*;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static supervision.Expression.*;
import static supervision.FlakyExpressionCalculator.Position.*;
public class FlakyExpressionCalculator extends AbstractLoggingActor {
public static Props props(Expression expr, Position position) {
return Props.create(FlakyExpressionCalculator.class, expr, position);
}
// Encodes the original position of a sub-expression in its parent expression
// Example: (4 / 2) has position Left in the original expression (4 / 2) * 3
public static enum Position {
Left, Right
}
public static class Result {
private final Expression originalExpression;
private final Integer value;
private final Position position;
public Result(Expression originalExpression, Integer value, Position position) {
this.originalExpression = originalExpression;
this.value = value;
this.position = position;
}
public Expression getOriginalExpression() {
return originalExpression;
}
public Integer getValue() {
return value;
}
public Position getPosition() {
return position;
}
}
public static class FlakinessException extends RuntimeException {
static final long serialVersionUID = 1;
public FlakinessException() {
super("Flakiness");
}
}
// This actor has the sole purpose of calculating a given expression and
// return the result to its parent. It takes an additional argument,
// myPosition, which is used to signal the parent which side of its
// expression has been calculated.
private final Expression expr;
private final Position myPosition;
public FlakyExpressionCalculator(Expression expr, Position myPosition) {
this.expr = expr;
this.myPosition = myPosition;
}
private Expression getExpr() {
return expr;
}
private SupervisorStrategy strategy = new OneForOneStrategy(false, DeciderBuilder.
match(FlakinessException.class, e -> {
log().warning("Evaluation of {} failed, restarting.", getExpr());
return SupervisorStrategy.restart();
}).
matchAny(e -> SupervisorStrategy.escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
// The value of these variables will be reinitialized after every restart.
// The only stable data the actor has during restarts is those embedded in
// the Props when it was created. In this case expr, and myPosition.
Map<Position, Integer> results = new HashMap<>();
Set<Position> expected = Stream.of(Left, Right).collect(Collectors.toSet());
@Override
public void preStart() {
if (expr instanceof Const) {
Integer value = ((Const) expr).getValue();
context().parent().tell(new Result(expr, value, myPosition), self());
// Don't forget to stop the actor after it has nothing more to do
context().stop(self());
}
else {
context().actorOf(FlakyExpressionCalculator.props(expr.getLeft(), Left), "left");
context().actorOf(FlakyExpressionCalculator.props(expr.getRight(), Right), "right");
}
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Result.class, r -> expected.contains(r.getPosition()), r -> {
expected.remove(r.getPosition());
results.put(r.getPosition(), r.getValue());
if (results.size() == 2) {
// Sometimes we fail to calculate
flakiness();
Integer result = evaluate(expr, results.get(Left),results.get(Right));
log().info("Evaluated expression {} to value {}", expr, result);
context().parent().tell(new Result(expr, result, myPosition), self());
// Don't forget to stop the actor after it has nothing more to do
context().stop(self());
}
}).match(Result.class, r -> {
throw new IllegalStateException("Expected results for positions " +
expected.stream().map(Object::toString).collect(Collectors.joining(", ")) +
" but got position " + r.getPosition());
}).build();
}
private Integer evaluate(Expression expr, Integer left, Integer right) {
if (expr instanceof Add) {
return left + right;
} else if( expr instanceof Multiply) {
return left * right;
} else if (expr instanceof Divide) {
return left / right;
} else {
throw new IllegalStateException("Unknown expression type " + expr.getClass());
}
}
private void flakiness() throws FlakinessException {
if (ThreadLocalRandom.current().nextDouble() < 0.2)
throw new FlakinessException();
}
}

View file

@ -0,0 +1,39 @@
package supervision;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ActorSystem;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import static supervision.Expression.*;
import static akka.pattern.Patterns.ask;
import static akka.japi.Util.classTag;
public class Main {
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create("calculator-system");
ActorRef calculatorService =
system.actorOf(Props.create(ArithmeticService.class), "arithmetic-service");
// (3 + 5) / (2 * (1 + 1))
Expression task = new Divide(
new Add(new Const(3), new Const(5)),
new Multiply(
new Const(2),
new Add(new Const(1), new Const(1))
)
);
FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
Integer result = Await.result(ask(calculatorService, task, new Timeout(duration)).mapTo(classTag(Integer.class)), duration);
System.out.println("Got result: " + result);
system.shutdown();
system.awaitTermination();
}
}

View file

@ -0,0 +1,97 @@
package supervision;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.stream.IntStream;
import static supervision.Expression.*;
public class ArithmeticServiceTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("BuncherTest");
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void TheArithmeticServiceShouldCalculateConstantExpressionsProperly(){
new JavaTestKit(system) {{
final ActorRef service =
system.actorOf(Props.create(ArithmeticService.class));
final ActorRef probe = getRef();
IntStream.range(-2, 3).forEach(x -> {
service.tell(new Const(x), probe);
expectMsgEquals(x);
});
}};
}
@Test
public void TheArithmeticServiceShouldCalculateAdditionProperly(){
new JavaTestKit(system) {{
final ActorRef service =
system.actorOf(Props.create(ArithmeticService.class));
final ActorRef probe = getRef();
IntStream.range(-2, 3).forEach(x ->
IntStream.range(-2, 3).forEach(y -> {
service.tell(new Add(new Const(x), new Const(y)), probe);
expectMsgEquals(x + y);
})
);
}};
}
@Test
public void TheArithmeticServiceShouldCalculateMultiplicationAndDivisionProperly(){
new JavaTestKit(system) {{
final ActorRef service =
system.actorOf(Props.create(ArithmeticService.class));
final ActorRef probe = getRef();
IntStream.range(-2, 3).forEach(x ->
IntStream.range(-2, 3).forEach(y -> {
service.tell(new Multiply(new Const(x), new Const(y)), probe);
expectMsgEquals(x * y);
})
);
// Skip zero in the second parameter
IntStream.range(-2, 3).forEach(x ->
IntStream.of(-2, -1, 1, 2).forEach(y -> {
service.tell(new Divide(new Const(x), new Const(y)), probe);
expectMsgEquals(x / y);
})
);
}};
}
@Test
public void TheArithmeticServiceShouldSurviveIllegalExpressions(){
new JavaTestKit(system) {{
final ActorRef service =
system.actorOf(Props.create(ArithmeticService.class));
final ActorRef probe = getRef();
service.tell(new Divide(new Const(1), new Const(0)), probe);
expectMsgClass(Status.Failure.class);
service.tell(new Add(null, new Const(0)), probe);
expectMsgClass(Status.Failure.class);
service.tell(new Add(new Const(1), new Const(0)), probe);
expectMsgEquals(1);
}};
}
}