diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala new file mode 100644 index 0000000000..35f55d703d --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.testkit._ +import akka.util.duration._ +import org.scalatest.BeforeAndAfter +import akka.dispatch.{ Promise, Await, Future } + +class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { + + @volatile + var breakers: BreakerState = null + + class BreakerState { + + val halfOpenLatch = new TestLatch(1) + + val breaker = new CircuitBreaker(system.scheduler, 5, 100.millis.dilated, 500.millis.dilated) + .onHalfOpen(halfOpenLatch.countDown()) + + } + + before { + breakers = new BreakerState() + } + + def unreliableCall(param: String) = { + param match { + case "fail" ⇒ throw new RuntimeException("FAIL") + case _ ⇒ param + } + } + + def openBreaker: Unit = { + for (i ← 1 to 5) + Await.result(breakers.breaker.withCircuitBreaker(Future(unreliableCall("fail"))) recoverWith { + case _ ⇒ Promise.successful("OK") + }, 1.second.dilated) + } + + "A circuit breaker being called by many threads" must { + "allow many calls while in closed state with no errors" in { + + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { Thread.sleep(10); unreliableCall("succeed") }) + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be(100) + result.distinct.size must be(1) + result.distinct must contain("succeed") + + } + + "transition to open state upon reaching failure limit and fail-fast" in { + + openBreaker + + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("success") + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + } + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be(100) + result.distinct.size must be(1) + result.distinct must contain("CBO") + } + + "allow a single call through in half-open state" in { + openBreaker + + Await.ready(breakers.halfOpenLatch, 2.seconds.dilated) + + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("succeed") + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + } + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be(100) + result.distinct.size must be(2) + result.distinct must contain("succeed") + result.distinct must contain("CBO") + } + + "recover and reset the breaker after the reset timeout" in { + openBreaker + + Await.ready(breakers.halfOpenLatch, 2.seconds.dilated) + + Await.ready(breakers.breaker.withCircuitBreaker(Future(unreliableCall("succeed"))), 1.second.dilated) + + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("succeed") + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + } + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be(100) + result.distinct.size must be(1) + result.distinct must contain("succeed") + } + } + 
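For orientation, the calling pattern these multi-threaded tests exercise reduces to the sketch below. It is not part of the patch; `system`, the `callRemote` helper and the timeout values are placeholder assumptions.

    import akka.dispatch.{ ExecutionContext, Future }
    import akka.pattern.CircuitBreaker
    import akka.util.duration._

    implicit val ec: ExecutionContext = system.dispatcher

    // Five failures trip the breaker, calls longer than 100 ms count as failures,
    // and a reset is attempted 500 ms after the breaker opens.
    val breaker = new CircuitBreaker(system.scheduler, 5, 100.millis, 500.millis)

    // The protected body runs as a Future; while the breaker is open the returned
    // Future fails fast with a CircuitBreakerOpenException instead of running the body.
    val result: Future[String] =
      breaker.withCircuitBreaker(Future { callRemote("payload") })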
+} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala new file mode 100644 index 0000000000..2c2a07ee3f --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -0,0 +1,243 @@ + +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern + +import akka.util.duration._ +import akka.testkit._ +import org.scalatest.BeforeAndAfter +import akka.dispatch.Future +import akka.dispatch.Await + +object CircuitBreakerSpec { + + class TestException extends RuntimeException + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { + + import CircuitBreakerSpec.TestException + + val awaitTimeout = 2.seconds.dilated + + @volatile + var breakers: TestCircuitBreakers = null + + class TestCircuitBreakers { + val halfOpenLatch = new TestLatch(1) + val openLatch = new TestLatch(1) + val closedLatch = new TestLatch(1) + + val shortCallTimeoutCb = new CircuitBreaker(system.scheduler, 1, 50.millis.dilated, 500.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val shortResetTimeoutCb = new CircuitBreaker(system.scheduler, 1, 1000.millis.dilated, 50.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val longCallTimeoutCb = new CircuitBreaker(system.scheduler, 1, 5 seconds, 500.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val longResetTimeoutCb = new CircuitBreaker(system.scheduler, 1, 100.millis.dilated, 5 seconds) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val multiFailureCb = new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + } + + before { + breakers = new TestCircuitBreakers + } + + def checkLatch(latch: TestLatch) { + Await.ready(latch, awaitTimeout) + } + + def throwException = throw new TestException + + def sayHi = "hi" + + "A synchronous circuit breaker that is open" must { + "throw exceptions when called before reset timeout" in { + + intercept[TestException] { + breakers.longResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.openLatch) + + intercept[CircuitBreakerOpenException] { + breakers.longResetTimeoutCb.withSyncCircuitBreaker(sayHi) + } + } + + "transition to half-open on reset timeout" in { + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.halfOpenLatch) + } + } + + "A synchronous circuit breaker that is half-open" must { + "pass through next call and close on success" in { + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.halfOpenLatch) + assert("hi" == breakers.shortResetTimeoutCb.withSyncCircuitBreaker(sayHi)) + checkLatch(breakers.closedLatch) + } + + "open on exception in call" in { + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.halfOpenLatch) + intercept[TestException] { + 
breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.openLatch) + } + } + + "A synchronous circuit breaker that is closed" must { + "allow calls through" in { + breakers.longCallTimeoutCb.withSyncCircuitBreaker(sayHi) must be("hi") + } + + "increment failure count on failure" in { + intercept[TestException] { + breakers.longCallTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.openLatch) + breakers.longCallTimeoutCb.currentFailureCount must be(1) + } + + "reset failure count after success" in { + intercept[TestException] { + breakers.multiFailureCb.withSyncCircuitBreaker(throwException) + } + + breakers.multiFailureCb.currentFailureCount must be(1) + breakers.multiFailureCb.withSyncCircuitBreaker(sayHi) + breakers.multiFailureCb.currentFailureCount must be(0) + } + + "increment failure count on callTimeout" in { + breakers.shortCallTimeoutCb.withSyncCircuitBreaker({ + 100.millis.dilated.sleep() + }) + breakers.shortCallTimeoutCb.currentFailureCount must be(1) + } + } + + "An asynchronous circuit breaker that is open" must { + "throw exceptions when called before reset timeout" in { + breakers.longResetTimeoutCb.withCircuitBreaker(Future(throwException)) + + checkLatch(breakers.openLatch) + + intercept[CircuitBreakerOpenException] { + Await.result( + breakers.longResetTimeoutCb.withCircuitBreaker(Future(sayHi)), + awaitTimeout) + } + } + + "transition to half-open on reset timeout" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + } + } + + "An asynchronous circuit breaker that is half-open" must { + "pass through next call and close on success" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + + Await.result( + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(sayHi)), + awaitTimeout) must be("hi") + checkLatch(breakers.closedLatch) + } + + "re-open on exception in call" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + + intercept[TestException] { + Await.result( + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)), + awaitTimeout) + } + checkLatch(breakers.openLatch) + } + + "re-open on async failure" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.openLatch) + } + } + + "An asynchronous circuit breaker that is closed" must { + "allow calls through" in { + Await.result( + breakers.longCallTimeoutCb.withCircuitBreaker(Future(sayHi)), + awaitTimeout) must be("hi") + } + + "increment failure count on exception" in { + intercept[TestException] { + Await.result( + breakers.longCallTimeoutCb.withCircuitBreaker(Future(throwException)), + awaitTimeout) + } + checkLatch(breakers.openLatch) + breakers.longCallTimeoutCb.currentFailureCount must be(1) + } + + "increment failure count on async failure" in { + breakers.longCallTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.openLatch) + breakers.longCallTimeoutCb.currentFailureCount must be(1) + } + + "reset failure count after success" in { + breakers.multiFailureCb.withCircuitBreaker(Future(sayHi)) + val latch = TestLatch(4) + for (n ← 1 to 4) breakers.multiFailureCb.withCircuitBreaker(Future(throwException)) + awaitCond(breakers.multiFailureCb.currentFailureCount == 4, 
awaitTimeout) + breakers.multiFailureCb.withCircuitBreaker(Future(sayHi)) + awaitCond(breakers.multiFailureCb.currentFailureCount == 0, awaitTimeout) + } + + "increment failure count on callTimeout" in { + breakers.shortCallTimeoutCb.withCircuitBreaker { + Future { + 100.millis.dilated.sleep() + sayHi + } + } + + checkLatch(breakers.openLatch) + breakers.shortCallTimeoutCb.currentFailureCount must be(1) + } + } + +} diff --git a/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java new file mode 100644 index 0000000000..44482bb357 --- /dev/null +++ b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern; + +import akka.util.Unsafe; + +class AbstractCircuitBreaker { + protected final static long stateOffset; + + static { + try { + stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala new file mode 100644 index 0000000000..ac8fd1c5ed --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -0,0 +1,560 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean } +import akka.AkkaException +import akka.actor.Scheduler +import akka.dispatch.{ Future, ExecutionContext, Await, Promise } +import akka.util.{ Deadline, Duration, NonFatal, Unsafe } +import akka.util.duration._ +import util.control.NoStackTrace +import java.util.concurrent.{ Callable, CopyOnWriteArrayList } + +/** + * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread + */ +object CircuitBreaker { + + /** + * Synchronous execution context to run in caller's thread - used by companion object factory methods + */ + private[CircuitBreaker] val syncExecutionContext = new ExecutionContext { + def execute(runnable: Runnable): Unit = runnable.run() + + def reportFailure(t: Throwable): Unit = () + } + + /** + * Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed + * in Future when using withCircuitBreaker. To use another ExecutionContext for the callbacks you can specify the + * executor in the constructor. 
+ * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit + */ + def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker = + new CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(syncExecutionContext) + + /** + * Java API alias for apply + * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit + */ + def create(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker = + apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) +} + +/** + * Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to + * remote systems + * + * Transitions through three states: + * - In *Closed* state, calls pass through until the `maxFailures` count is reached. This causes the circuit breaker + * to open. Both exceptions and calls exceeding `callTimeout` are considered failures. + * - In *Open* state, calls fail-fast with an exception. After `resetTimeout`, circuit breaker transitions to + * half-open state. + * - In *Half-Open* state, the first call will be allowed through, if it succeeds the circuit breaker will reset to + * closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that + * execute while the first is running will fail-fast with an exception. 
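The state machine described above can be observed through the transition hooks; a minimal sketch, again assuming `system` is an ActorSystem in scope:

    import akka.pattern.CircuitBreaker
    import akka.util.duration._

    // onOpen/onHalfOpen/onClose return the breaker itself, so listeners chain fluently.
    val breaker = new CircuitBreaker(system.scheduler, 5, 1.second, 30.seconds)(system.dispatcher)
      .onOpen(println("breaker opened: failing fast for 30 seconds"))
      .onHalfOpen(println("breaker half-open: next call is a trial"))
      .onClose(println("breaker closed: calls pass through again"))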
+ * + * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit + * @param executor [[akka.dispatch.ExecutionContext]] used for execution of state transition listeners + */ +class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { + + def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) = { + this(scheduler, maxFailures, callTimeout, resetTimeout)(executor) + } + + /** + * Holds reference to current state of CircuitBreaker - *access only via helper methods* + */ + @volatile + private[this] var _currentStateDoNotCallMeDirectly: State = Closed + + /** + * Helper method for access to underlying state via Unsafe + * + * @param oldState Previous state on transition + * @param newState Next state on transition + * @return Whether the previous state matched correctly + */ + @inline + private[this] def swapState(oldState: State, newState: State): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.stateOffset, oldState, newState) + + /** + * Helper method for accessing underlying state via Unsafe + * + * @return Reference to current state + */ + @inline + private[this] def currentState: State = + Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State] + + /** + * Wraps invocations of asynchronous calls that need to be protected + * + * @param body Call needing protected + * @tparam T return type from call + * @return [[akka.dispatch.Future]] containing the call result + */ + def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = { + currentState.invoke(body) + } + + /** + * Java API for withCircuitBreaker + * + * @param body Call needing protected + * @tparam T return type from call + * @return [[akka.dispatch.Future]] containing the call result + */ + def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = { + withCircuitBreaker(body.call) + } + + /** + * Wraps invocations of synchronous calls that need to be protected + * + * Calls are run in caller's thread + * + * @param body Call needing protected + * @tparam T return type from call + * @return The result of the call + */ + def withSyncCircuitBreaker[T](body: ⇒ T): T = { + Await.result(withCircuitBreaker( + { + try + Promise.successful(body)(CircuitBreaker.syncExecutionContext) + catch { + case NonFatal(t) ⇒ Promise.failed(t)(CircuitBreaker.syncExecutionContext) + } + }), callTimeout) + } + + /** + * Java API for withSyncCircuitBreaker + * + * @param body Call needing protected + * @tparam T return type from call + * @return The result of the call + */ + + def callWithSyncCircuitBreaker[T](body: Callable[T]): T = { + withSyncCircuitBreaker(body.call) + } + + /** + * Adds a callback to execute when circuit breaker opens + * + * The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor. 
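For the synchronous variant, a one-line sketch with `breaker` and `lookupUser` as assumed placeholders: the body runs on the calling thread and the caller blocks for at most `callTimeout`.

    // Throws CircuitBreakerOpenException immediately if the breaker is open,
    // otherwise returns the value (or rethrows the exception from the body).
    val name: String = breaker.withSyncCircuitBreaker(lookupUser("bob"))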
+ * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onOpen[T](callback: ⇒ T): CircuitBreaker = { + Open.addListener(() ⇒ callback) + this + } + + /** + * Java API for onOpen + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onOpen[T](callback: Callable[T]): CircuitBreaker = { + onOpen(callback.call) + } + + /** + * Adds a callback to execute when circuit breaker transitions to half-open + * + * The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor. + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onHalfOpen[T](callback: ⇒ T): CircuitBreaker = { + HalfOpen.addListener(() ⇒ callback) + this + } + + /** + * JavaAPI for onHalfOpen + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = { + onHalfOpen(callback.call) + } + + /** + * Adds a callback to execute when circuit breaker state closes + * + * The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor. + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onClose[T](callback: ⇒ T): CircuitBreaker = { + Closed.addListener(() ⇒ callback) + this + } + + /** + * JavaAPI for onClose + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onClose[T](callback: Callable[T]): CircuitBreaker = { + onClose(callback.call) + } + + /** + * Retrieves current failure count. + * + * @return count + */ + private[akka] def currentFailureCount: Int = Closed.get + + /** + * Implements consistent transition between states + * + * @param fromState State being transitioning from + * @param toState State being transitioning from + * @throws IllegalStateException if an invalid transition is attempted + */ + private def transition(fromState: State, toState: State): Unit = { + if (swapState(fromState, toState)) + toState.enter() + else + throw new IllegalStateException("Illegal transition attempted from: " + fromState + " to " + toState) + } + + /** + * Trips breaker to an open state. This is valid from Closed or Half-Open states. + * + * @param fromState State we're coming from (Closed or Half-Open) + */ + private def tripBreaker(fromState: State): Unit = { + transition(fromState, Open) + } + + /** + * Resets breaker to a closed state. This is valid from an Half-Open state only. + * + */ + private def resetBreaker(): Unit = { + transition(HalfOpen, Closed) + } + + /** + * Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only. 
+ * + */ + private def attemptReset(): Unit = { + transition(Open, HalfOpen) + } + + /** + * Internal state abstraction + */ + private sealed trait State { + private val listeners = new CopyOnWriteArrayList[() ⇒ _] + + /** + * Add a listener function which is invoked on state entry + * + * @param listener listener implementation + * @tparam T return type of listener, not used - but supplied for type inference purposes + */ + def addListener[T](listener: () ⇒ T) { + listeners add listener + } + + /** + * Test for whether listeners exist + * + * @return whether listeners exist + */ + private def hasListeners: Boolean = !listeners.isEmpty + + /** + * Notifies the listeners of the transition event via a Future executed in implicit parameter ExecutionContext + * + * @return Promise which executes listener in supplied [[akka.dispatch.ExecutionContext]] + */ + protected def notifyTransitionListeners() { + if (hasListeners) { + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + //FIXME per @viktorklang: it's a bit wasteful to create Futures for one-offs, just use EC.execute instead + Future(listener()) + } + } + } + + /** + * Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed + * call timeout is counted as a failed call, otherwise a successful call + * + * @param body Implementation of the call + * @tparam T Return type of the call's implementation + * @return Future containing the result of the call + */ + def callThrough[T](body: ⇒ Future[T]): Future[T] = { + val deadline = callTimeout.fromNow + val bodyFuture = try body catch { + case NonFatal(t) ⇒ Promise.failed(t) + } + bodyFuture onFailure { + case _ ⇒ callFails() + } onSuccess { + case _ ⇒ + if (deadline.isOverdue()) callFails() + else callSucceeds() + } + } + + /** + * Abstract entry point for all states + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + def invoke[T](body: ⇒ Future[T]): Future[T] + + /** + * Invoked when call succeeds + * + */ + def callSucceeds(): Unit + + /** + * Invoked when call fails + * + */ + def callFails(): Unit + + /** + * Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template + * method _enter + * + */ + final def enter(): Unit = { + _enter() + notifyTransitionListeners() + } + + /** + * Template method for concrete traits + * + */ + def _enter(): Unit + } + + /** + * Concrete implementation of Closed state + */ + private object Closed extends AtomicInteger with State { + + /** + * Implementation of invoke, which simply attempts the call + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + override def invoke[T](body: ⇒ Future[T]): Future[T] = { + callThrough(body) + } + + /** + * On successful call, the failure count is reset to 0 + * + * @return + */ + override def callSucceeds(): Unit = { set(0) } + + /** + * On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and + * the breaker is tripped if we have reached maxFailures. + * + * @return + */ + override def callFails(): Unit = { + if (incrementAndGet() == maxFailures) tripBreaker(Closed) + } + + /** + * On entry of this state, failure count is reset. 
+ * + * @return + */ + override def _enter(): Unit = { + set(0) + } + + /** + * Override for more descriptive toString + * + * @return + */ + override def toString: String = { + "Closed with failure count = " + get() + } + } + + /** + * Concrete implementation of half-open state + */ + private object HalfOpen extends AtomicBoolean(true) with State { + + /** + * Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. + * If the call succeeds the breaker closes. + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + override def invoke[T](body: ⇒ Future[T]): Future[T] = { + if (compareAndSet(true, false)) + callThrough(body) + else + Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)) + } + + /** + * Reset breaker on successful call. + * + * @return + */ + override def callSucceeds(): Unit = { resetBreaker() } + + /** + * Reopen breaker on failed call. + * + * @return + */ + override def callFails(): Unit = { tripBreaker(HalfOpen) } + + /** + * On entry, guard should be reset for that first call to get in + * + * @return + */ + override def _enter(): Unit = { + set(true) + } + + /** + * Override for more descriptive toString + * + * @return + */ + override def toString: String = { + "Half-Open currently testing call for success = " + get() + } + } + + /** + * Concrete implementation of Open state + */ + private object Open extends AtomicLong with State { + + /** + * Fail-fast on any invocation + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + override def invoke[T](body: ⇒ Future[T]): Future[T] = { + Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)) + } + + /** + * Calculate remaining timeout to inform the caller in case a backoff algorithm is useful + * + * @return [[akka.util.Deadline]] to when the breaker will attempt a reset by transitioning to half-open + */ + private def remainingTimeout(): Deadline = get match { + case 0L ⇒ Deadline.now + case t ⇒ (t.millis + resetTimeout).fromNow + } + + /** + * No-op for open, calls are never executed so cannot succeed or fail + * + * @return + */ + override def callSucceeds(): Unit = {} + + /** + * No-op for open, calls are never executed so cannot succeed or fail + * + * @return + */ + override def callFails(): Unit = {} + + /** + * On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to + * calculate remaining time before attempted reset. + * + * @return + */ + override def _enter(): Unit = { + set(System.currentTimeMillis) + scheduler.scheduleOnce(resetTimeout) { + attemptReset() + } + } + + /** + * Override for more descriptive toString + * + * @return + */ + override def toString: String = { + "Open" + } + } + +} + +/** + * Exception thrown when Circuit Breaker is open. + * + * @param remainingDuration Stores remaining time before attempting a reset. Zero duration means the breaker is + * currently in half-open state. 
+ * @param message Defaults to "Circuit Breaker is open; calls are failing fast" + */ +class CircuitBreakerOpenException( + val remainingDuration: Duration, + message: String = "Circuit Breaker is open; calls are failing fast") + extends AkkaException(message) with NoStackTrace diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c21bcf50c2..c5ad773989 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -18,7 +18,7 @@ import akka.ConfigurationException import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException -import java.security.SecureRandom +import akka.jsr166y.ThreadLocalRandom import java.lang.management.ManagementFactory import javax.management._ @@ -58,11 +58,6 @@ object ClusterAction { */ case class Join(address: Address) extends ClusterMessage - /** - * Command to set a node to Up (from Joining). - */ - case class Up(address: Address) extends ClusterMessage - /** * Command to leave the cluster. */ @@ -73,15 +68,16 @@ object ClusterAction { */ case class Down(address: Address) extends ClusterMessage - /** - * Command to mark a node to be removed from the cluster immediately. - */ - case class Exit(address: Address) extends ClusterMessage - /** * Command to remove a node from the cluster immediately. */ case class Remove(address: Address) extends ClusterMessage + + /** + * Command to mark a node to be removed from the cluster immediately. + * Can only be sent by the leader. + */ + private[akka] case class Exit(address: Address) extends ClusterMessage } /** @@ -158,12 +154,10 @@ object MemberStatus { case object Down extends MemberStatus case object Removed extends MemberStatus - def isUnavailable(status: MemberStatus): Boolean = { - status == MemberStatus.Down || - status == MemberStatus.Exiting || - status == MemberStatus.Removed || - status == MemberStatus.Leaving - } + /** + * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. + */ + def isUnavailable(status: MemberStatus): Boolean = status == MemberStatus.Down || status == MemberStatus.Removed } /** @@ -203,7 +197,7 @@ case class Gossip( } /** - * Marks the gossip as seen by this node (remoteAddress) by updating the address entry in the 'gossip.overview.seen' + * Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen' * Map with the VectorClock for the new gossip. 
*/ def seen(address: Address): Gossip = { @@ -266,7 +260,6 @@ final class ClusterCommandDaemon extends Actor { def receive = { case Join(address) ⇒ cluster.joining(address) - case Up(address) ⇒ cluster.up(address) case Down(address) ⇒ cluster.downing(address) case Leave(address) ⇒ cluster.leaving(address) case Exit(address) ⇒ cluster.exiting(address) @@ -380,11 +373,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val remoteSettings = new RemoteSettings(system.settings.config, system.name) val clusterSettings = new ClusterSettings(system.settings.config, system.name) - val remoteAddress = remote.transport.address + val selfAddress = remote.transport.address val failureDetector = new AccrualFailureDetector( - system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) + system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) - private val vclockNode = VectorClock.Node(remoteAddress.toString) + private val vclockNode = VectorClock.Node(selfAddress.toString) private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay private val gossipFrequency = clusterSettings.GossipFrequency @@ -396,18 +389,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val autoDown = clusterSettings.AutoDown private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons - private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress) + private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != selfAddress) private val serialization = remote.serialization private val isRunning = new AtomicBoolean(true) private val log = Logging(system, "Node") - private val random = SecureRandom.getInstance("SHA1PRNG") private val mBeanServer = ManagementFactory.getPlatformMBeanServer private val clusterMBeanName = new ObjectName("akka:type=Cluster") - log.info("Cluster Node [{}] - is starting up...", remoteAddress) + log.info("Cluster Node [{}] - is starting up...", selfAddress) // create superisor for daemons under path "/system/cluster" private val clusterDaemons = { @@ -419,7 +411,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } private val state = { - val member = Member(remoteAddress, MemberStatus.Joining) + val member = Member(selfAddress, MemberStatus.Joining) val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock new AtomicReference[State](State(gossip)) } @@ -448,15 +440,22 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ createMBean() - log.info("Cluster Node [{}] - has started up successfully", remoteAddress) + log.info("Cluster Node [{}] - has started up successfully", selfAddress) // ====================================================== // ===================== PUBLIC API ===================== // ====================================================== - def self: Member = latestGossip.members - .find(_.address == remoteAddress) - .getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + remoteAddress + ") in the cluster membership ring")) + def self: Member = { + val gossip = latestGossip + gossip.members + .find(_.address == selfAddress) + .getOrElse { + gossip.overview.unreachable + .find(_.address == selfAddress) + .getOrElse(throw new 
IllegalStateException("Can't find 'this' Member [" + selfAddress + "] in the cluster membership ring or in the unreachable set")) + } + } /** * Latest gossip. @@ -473,7 +472,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ */ def isLeader: Boolean = { val members = latestGossip.members - !members.isEmpty && (remoteAddress == members.head.address) + members.nonEmpty && (selfAddress == members.head.address) } /** @@ -501,9 +500,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. */ - def shutdown() { + def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { - log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress) + log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) gossipCanceller.cancel() failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() @@ -520,7 +519,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Registers a listener to subscribe to cluster membership changes. */ @tailrec - final def registerListener(listener: MembershipChangeListener) { + final def registerListener(listener: MembershipChangeListener): Unit = { val localState = state.get val newListeners = localState.memberMembershipChangeListeners + listener val newState = localState copy (memberMembershipChangeListeners = newListeners) @@ -531,7 +530,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Unsubscribes to cluster membership changes. */ @tailrec - final def unregisterListener(listener: MembershipChangeListener) { + final def unregisterListener(listener: MembershipChangeListener): Unit = { val localState = state.get val newListeners = localState.memberMembershipChangeListeners - listener val newState = localState copy (memberMembershipChangeListeners = newListeners) @@ -542,31 +541,31 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. */ - def join(address: Address) { + def join(address: Address): Unit = { val connection = clusterCommandConnectionFor(address) - val command = ClusterAction.Join(remoteAddress) - log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", remoteAddress, address, connection) + val command = ClusterAction.Join(selfAddress) + log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) connection ! command } /** * Send command to issue state transition to LEAVING for the node specified by 'address'. */ - def leave(address: Address) { + def leave(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Leave(address) } /** * Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'. */ - def down(address: Address) { + def down(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Down(address) } /** * Send command to issue state transition to REMOVED for the node specified by 'address'. */ - def remove(address: Address) { + def remove(address: Address): Unit = { clusterCommandDaemon ! ClusterAction.Remove(address) } @@ -579,8 +578,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * New node joining. 
*/ @tailrec - private[cluster] final def joining(node: Address) { - log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node) + private[cluster] final def joining(node: Address): Unit = { + log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) val localState = state.get val localGossip = localState.latestGossip @@ -596,45 +595,60 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { - failureDetector heartbeat node // update heartbeat in failure detector + if (node != selfAddress) failureDetector heartbeat node + if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newMembers } } } } - /** - * State transition to UP. - */ - private[cluster] final def up(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address) - } - /** * State transition to LEAVING. */ + @tailrec private[cluster] final def leaving(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address) + log.info("Cluster Node [{}] - Marking address [{}] as LEAVING", selfAddress, address) + + val localState = state.get + val localGossip = localState.latestGossip + val localMembers = localGossip.members + + val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING + val newGossip = localGossip copy (members = newMembers) + + val versionedGossip = newGossip + vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress + + val newState = localState copy (latestGossip = seenVersionedGossip) + + if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update + else { + failureDetector heartbeat address // update heartbeat in failure detector + if (convergence(newState.latestGossip).isDefined) { + newState.memberMembershipChangeListeners foreach { _ notify newMembers } + } + } } /** * State transition to EXITING. */ - private[cluster] final def exiting(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address) + private[cluster] final def exiting(address: Address): Unit = { + log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address) } /** * State transition to REMOVED. */ - private[cluster] final def removing(address: Address) { - log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address) + private[cluster] final def removing(address: Address): Unit = { + log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address) } /** @@ -645,7 +659,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * to this node and it will then go through the normal JOINING procedure. 
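These internal transitions back the user-level commands added above; a hypothetical administrative sequence, assuming a `cluster` extension instance and the relevant addresses are available in scope:

    // Ask to join the ring through a known seed node.
    cluster.join(seedNodeAddress)

    // Graceful departure of this node: LEAVING, then EXITING, then REMOVED
    // once the leader sees convergence and partition handoff has completed.
    cluster.leave(cluster.selfAddress)

    // Operator decision: mark an unreachable node as DOWN so the ring can converge.
    cluster.down(unreachableAddress)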
*/ @tailrec - final private[cluster] def downing(address: Address) { + final private[cluster] def downing(address: Address): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -659,7 +673,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ localMembers .map { member ⇒ if (member.address == address) { - log.info("Cluster Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address) + log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address) val newMember = member copy (status = MemberStatus.Down) downedMember = Some(newMember) newMember @@ -673,7 +687,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN .map { member ⇒ if (member.address == address) { - log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address) + log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address) member copy (status = MemberStatus.Down) } else member } @@ -692,7 +706,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip val versionedGossip = newGossip + vclockNode - val newState = localState copy (latestGossip = versionedGossip seen remoteAddress) + val newState = localState copy (latestGossip = versionedGossip seen selfAddress) if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update else { @@ -706,7 +720,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Receive new gossip. */ @tailrec - final private[cluster] def receive(sender: Member, remoteGossip: Gossip) { + final private[cluster] def receive(sender: Member, remoteGossip: Gossip): Unit = { val localState = state.get val localGossip = localState.latestGossip @@ -731,14 +745,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ remoteGossip } - val newState = localState copy (latestGossip = winningGossip seen remoteAddress) + val newState = localState copy (latestGossip = winningGossip seen selfAddress) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address) + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) - failureDetector heartbeat sender.address // update heartbeat in failure detector + if (sender.address != selfAddress) failureDetector heartbeat sender.address if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } @@ -747,14 +761,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } /** - * Joins the pre-configured contact point and retrieves current gossip state. + * Joins the pre-configured contact point. 
*/ - private def autoJoin() = nodeToJoin foreach { address ⇒ - val connection = clusterCommandConnectionFor(address) - val command = ClusterAction.Join(remoteAddress) - log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection) - connection ! command - } + private def autoJoin(): Unit = nodeToJoin foreach join /** * Switches the member status. @@ -764,7 +773,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * @return the updated new state with the new member status */ private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { - log.info("Cluster Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus) + log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus) val localSelf = self @@ -776,7 +785,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // change my state in 'gossip.members' val newMembersSet = localMembers map { member ⇒ - if (member.address == remoteAddress) newSelf + if (member.address == selfAddress) newSelf else member } @@ -786,7 +795,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // version my changes val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress state copy (latestGossip = seenVersionedGossip) } @@ -794,9 +803,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Gossips latest gossip to an address. */ - private def gossipTo(address: Address) { + private def gossipTo(address: Address): Unit = { val connection = clusterGossipConnectionFor(address) - log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection) + log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection) connection ! GossipEnvelope(self, latestGossip) } @@ -806,10 +815,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * @return 'true' if it gossiped to a "deputy" member. */ private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = { - log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", remoteAddress, addresses.mkString(", ")) + log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", ")) if (addresses.isEmpty) false else { - val peers = addresses filter (_ != remoteAddress) // filter out myself + val peers = addresses filter (_ != selfAddress) // filter out myself val peer = selectRandomNode(peers) gossipTo(peer) deputyNodes exists (peer == _) @@ -819,15 +828,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Initates a new round of gossip. */ - private def gossip() { + private def gossip(): Unit = { val localState = state.get - val localGossip = localState.latestGossip - val localMembers = localGossip.members - if (!isSingletonCluster(localState) && isAvailable(localState)) { - // only gossip if we are a non-singleton cluster and available + if (isSingletonCluster(localState)) { + // gossip to myself + // TODO could perhaps be optimized, no need to gossip to myself when Up? 
+ gossipTo(selfAddress) - log.debug("Cluster Node [{}] - Initiating new round of gossip", remoteAddress) + } else if (isAvailable(localState)) { + log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -842,16 +852,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // 2. gossip to unreachable members if (localUnreachableSize > 0) { val probability: Double = localUnreachableSize / (localMembersSize + 1) - if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) + if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) } // 3. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes - if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) { + if ((!gossipedToDeputy || localMembersSize < 1) && deputies.nonEmpty) { if (localMembersSize == 0) gossipToRandomNodeOf(deputies) else { val probability = 1.0 / localMembersSize + localUnreachableSize - if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies) + if (ThreadLocalRandom.current.nextDouble() <= probability) gossipToRandomNodeOf(deputies) } } } @@ -861,7 +871,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ @tailrec - final private def reapUnreachableMembers() { + final private def reapUnreachableMembers(): Unit = { val localState = state.get if (!isSingletonCluster(localState) && isAvailable(localState)) { @@ -875,7 +885,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) } - if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable + if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable val newMembers = localMembers diff newlyDetectedUnreachableMembers val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers @@ -885,14 +895,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // updating vclock and 'seen' table val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen remoteAddress + val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur else { - log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", ")) + log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newMembers } @@ -906,26 +916,32 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. 
*/ @tailrec - final private def leaderActions() { + final private def leaderActions(): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members - val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address) + val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) + + // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully + def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = { + true + } if (isLeader && isAvailable(localState)) { // only run the leader actions if we are the LEADER and available val localOverview = localGossip.overview val localSeen = localOverview.seen - val localUnreachableMembers = localGossip.overview.unreachable + val localUnreachableMembers = localOverview.unreachable // Leader actions are as follows: - // 1. Move JOINING => UP - // 2. Move EXITING => REMOVED - // 3. Move UNREACHABLE => DOWN (auto-downing by leader) - // 4. Updating the vclock version for the changes - // 5. Updating the 'seen' table + // 1. Move JOINING => UP -- When a node joins the cluster + // 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) + // 3. Move LEAVING => EXITING -- When all partition handoff has completed + // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader + // 5. Updating the vclock version for the changes + // 6. Updating the 'seen' table var hasChangedState = false val newGossip = @@ -934,20 +950,37 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // we have convergence - so we can't have unreachable nodes val newMembers = + localMembers map { member ⇒ - // 1. Move JOINING => UP + // ---------------------- + // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) + // ---------------------- if (member.status == MemberStatus.Joining) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address) + log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Up) } else member + } map { member ⇒ - // 2. Move EXITING => REMOVED + // ---------------------- + // 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence) + // ---------------------- if (member.status == MemberStatus.Exiting) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address) + log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Removed) } else member + + } map { member ⇒ + // ---------------------- + // 3. 
Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) + // ---------------------- + if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { + log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address) + hasChangedState = true + member copy (status = MemberStatus.Exiting) + } else member + } localGossip copy (members = newMembers) // update gossip @@ -955,12 +988,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // we don't have convergence - so we might have unreachable nodes // if 'auto-down' is turned on, then try to auto-down any unreachable nodes - // 3. Move UNREACHABLE => DOWN (auto-downing by leader) + // ---------------------- + // 4. Move UNREACHABLE => DOWN (auto-downing by leader) + // ---------------------- val newUnreachableMembers = localUnreachableMembers .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN .map { member ⇒ - log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address) + log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) hasChangedState = true member copy (status = MemberStatus.Down) } @@ -975,11 +1010,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (hasChangedState) { // we have a change of state - version it and try to update - // 4. Updating the vclock version for the changes + // ---------------------- + // 5. Updating the vclock version for the changes + // ---------------------- val versionedGossip = newGossip + vclockNode - // 5. Updating the 'seen' table - val seenVersionedGossip = versionedGossip seen remoteAddress + // ---------------------- + // 6. Updating the 'seen' table + // ---------------------- + val seenVersionedGossip = versionedGossip seen selfAddress val newState = localState copy (latestGossip = seenVersionedGossip) @@ -987,7 +1026,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (!state.compareAndSet(localState, newState)) leaderActions() // recur else { if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners map { _ notify newGossip.members } + newState.memberMembershipChangeListeners foreach { _ notify newGossip.members } } } } @@ -1009,12 +1048,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // 2. 
all unreachable members in the set have status DOWN // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version - if (unreachable.isEmpty || !unreachable.exists(m ⇒ (m.status != MemberStatus.Down) && (m.status != MemberStatus.Removed))) { + if (unreachable.isEmpty || !unreachable.exists { m ⇒ + m.status != MemberStatus.Down && + m.status != MemberStatus.Removed + }) { val seen = gossip.overview.seen val views = Set.empty[VectorClock] ++ seen.values if (views.size == 1) { - log.debug("Cluster Node [{}] - Cluster convergence reached", remoteAddress) + log.debug("Cluster Node [{}] - Cluster convergence reached", selfAddress) Some(gossip) } else None } else None @@ -1027,7 +1069,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localOverview = localGossip.overview val localMembers = localGossip.members val localUnreachableMembers = localOverview.unreachable - val isUnreachable = localUnreachableMembers exists { _.address == remoteAddress } + val isUnreachable = localUnreachableMembers exists { _.address == selfAddress } val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) } isUnreachable || hasUnavailableMemberStatus } @@ -1035,7 +1077,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Looks up and returns the local cluster command connection. */ - private def clusterCommandDaemon = system.actorFor(RootActorPath(remoteAddress) / "system" / "cluster" / "commands") + private def clusterCommandDaemon = system.actorFor(RootActorPath(selfAddress) / "system" / "cluster" / "commands") /** * Looks up and returns the remote cluster command connection for the specific address. @@ -1050,9 +1092,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Gets an Iterable with the addresses of a all the 'deputy' nodes - excluding this node if part of the group. 
*/ - private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress) + private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != selfAddress) - private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size) + private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(ThreadLocalRandom.current nextInt addresses.size) private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1 @@ -1079,8 +1121,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val unreachable = gossip.overview.unreachable val metaData = gossip.meta "\nMembers:\n\t" + gossip.members.mkString("\n\t") + - { if (!unreachable.isEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + - { if (!metaData.isEmpty) "\nMeta Data:\t" + metaData.toString else "" } + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + + { if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" } } def getMemberStatus: String = clusterNode.status.toString @@ -1105,7 +1147,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ def shutdown() = clusterNode.shutdown() } - log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", remoteAddress, clusterMBeanName) + log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName) try { mBeanServer.registerMBean(mbean, clusterMBeanName) } catch { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index ee798d5a8a..6b0bbae22e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -43,7 +43,6 @@ class ClientDowningNodeThatIsUpSpec testConductor.enter("all-up") // mark 'third' node as DOWN - testConductor.removeNode(third) cluster.down(thirdAddress) testConductor.enter("down-third-node") @@ -56,6 +55,8 @@ class ClientDowningNodeThatIsUpSpec cluster.join(node(first).address) awaitUpConvergence(numberOfMembers = 4) testConductor.enter("all-up") + testConductor.enter("down-third-node") + testConductor.enter("await-completion") } runOn(second, fourth) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 1017c8a33a..9f1395b5dd 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -4,7 +4,6 @@ package akka.cluster -import org.scalatest.BeforeAndAfter import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -19,7 +18,6 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c2 = role("c2") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - } class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec @@ -29,15 +27,11 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class 
JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender { import JoinTwoClustersMultiJvmSpec._ override def initialParticipants = 6 - after { - testConductor.enter("after") - } - lazy val a1Address = node(a1).address lazy val b1Address = node(b1).address lazy val c1Address = node(c1).address @@ -67,6 +61,8 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm assertLeader(b1, b2) assertLeader(c1, c2) + testConductor.enter("two-members") + runOn(b2) { cluster.join(a1Address) } @@ -78,6 +74,8 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm assertLeader(a1, a2, b1, b2) assertLeader(c1, c2) + testConductor.enter("four-members") + } "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { @@ -89,6 +87,8 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 6) assertLeader(a1, a2, b1, b2, c1, c2) + + testConductor.enter("six-members") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index cda794fe21..63665d3c57 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -17,7 +17,7 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false). + commonConfig(debugConfig(on = true). 
withFallback(ConfigFactory.parseString(""" akka.cluster { auto-down = on @@ -57,7 +57,7 @@ class LeaderDowningNodeThatIsUnreachableSpec // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds.dilated) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) testConductor.enter("await-completion") } @@ -77,7 +77,7 @@ class LeaderDowningNodeThatIsUnreachableSpec testConductor.enter("down-fourth-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds.dilated) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) testConductor.enter("await-completion") } } @@ -97,7 +97,7 @@ class LeaderDowningNodeThatIsUnreachableSpec // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds.dilated) + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) testConductor.enter("await-completion") } @@ -108,7 +108,7 @@ class LeaderDowningNodeThatIsUnreachableSpec testConductor.enter("all-up") } - runOn(second, third) { + runOn(third) { cluster.join(node(first).address) awaitUpConvergence(numberOfMembers = 3) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 7053ba5b50..ba0471bedb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -49,6 +49,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp cluster.join(firstAddress) awaitUpConvergence(numberOfMembers = roles.size) cluster.isLeader must be(myself == roles.head) + assertLeaderIn(roles) } testConductor.enter("after") } @@ -58,6 +59,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp currentRoles.size must be >= (2) val leader = currentRoles.head val aUser = currentRoles.last + val remainingRoles = currentRoles.tail myself match { @@ -78,13 +80,14 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp cluster.down(leaderAddress) testConductor.enter("after-down", "completed") - case _ if currentRoles.tail.contains(myself) ⇒ + case _ if remainingRoles.contains(myself) ⇒ // remaining cluster nodes, not shutdown testConductor.enter("before-shutdown", "after-shutdown", "after-down") awaitUpConvergence(currentRoles.size - 1) - val nextExpectedLeader = currentRoles.tail.head + val nextExpectedLeader = remainingRoles.head cluster.isLeader must be(myself == nextExpectedLeader) + assertLeaderIn(remainingRoles) testConductor.enter("completed") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index ae9d3e9fb7..4d0c7f4720 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -42,15 +42,20 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address must be(a) } } + def assertLeader(nodesInCluster: RoleName*): Unit = if 
(nodesInCluster.contains(myself)) { + assertLeaderIn(nodesInCluster) + } + /** * Assert that the cluster has elected the correct leader * out of all nodes in the cluster. First * member in the cluster ring is expected leader. */ - def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(myself)) { + def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { nodesInCluster.length must not be (0) val expectedLeader = roleOfLeader(nodesInCluster) cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) + cluster.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) } /** @@ -60,14 +65,15 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ def awaitUpConvergence( numberOfMembers: Int, canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], - timeout: Duration = 10.seconds.dilated): Unit = { - awaitCond(cluster.latestGossip.members.size == numberOfMembers, timeout) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up), timeout) - awaitCond(cluster.convergence.isDefined, timeout) - if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set - awaitCond( - canNotBePartOfMemberRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address))), - timeout) + timeout: Duration = 20.seconds): Unit = { + within(timeout) { + awaitCond(cluster.latestGossip.members.size == numberOfMembers) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(cluster.convergence.isDefined) + if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set + awaitCond( + canNotBePartOfMemberRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address)))) + } } def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala new file mode 100644 index 0000000000..058bfca7e9 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object NodeLeavingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + """)) + .withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec +class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec +class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec + +abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeLeavingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A node that is LEAVING a non-singleton cluster" must { + + "be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(first) { + cluster.leave(secondAddress) + } + testConductor.enter("second-left") + + runOn(first, third) { + awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) + + val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) + hasLeft must be('defined) + hasLeft.get.address must be(secondAddress) + } + + testConductor.enter("finished") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala new file mode 100644 index 0000000000..3fe9e220f6 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state + unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec + +abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeLeavingAndExitingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A node that is LEAVING a non-singleton cluster" must { + + "be moved to EXITING by the leader" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(first) { + cluster.leave(secondAddress) + } + testConductor.enter("second-left") + + runOn(first, third) { + + // 1. Verify that 'second' node is set to LEAVING + // We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a + // chance to test the LEAVING state before the leader moves the node to EXITING + awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING + val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left + hasLeft must be('defined) + hasLeft.get.address must be(secondAddress) + + // 2. Verify that 'second' node is set to EXITING + awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Exiting)) // wait on EXITING + val hasExited = cluster.latestGossip.members.find(_.status == MemberStatus.Exiting) // verify node that exited + hasExited must be('defined) + hasExited.get.address must be(secondAddress) + } + + testConductor.enter("finished") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala new file mode 100644 index 0000000000..7c1037a624 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec + +abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + val reaperWaitingTime = 30.seconds.dilated + + "A node that is LEAVING a non-singleton cluster" must { + + "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(first) { + cluster.leave(secondAddress) + } + testConductor.enter("second-left") + + runOn(first, third) { + // verify that the 'second' node is no longer part of the 'members' set + awaitCond(cluster.latestGossip.members.forall(_.address != secondAddress), reaperWaitingTime) + + // verify that the 'second' node is part of the 'unreachable' set + awaitCond(cluster.latestGossip.overview.unreachable.exists(_.status == MemberStatus.Removed), reaperWaitingTime) + + // verify node that got removed is 'second' node + val isRemoved = cluster.latestGossip.overview.unreachable.find(_.status == MemberStatus.Removed) + isRemoved must be('defined) + isRemoved.get.address must be(secondAddress) + } + + testConductor.enter("finished") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 312ac6dbe8..fecb53c898 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -37,7 +37,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp "A set of connected cluster systems" must { - "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + "(when two nodes) start gossiping to each other so that both nodes gets the same gossip info" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { @@ -57,7 +57,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp } - "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + "(when 
three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in { runOn(third) { cluster.join(firstAddress) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala new file mode 100644 index 0000000000..c0c12f4582 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeShutdownMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + auto-down = on + failure-detector.threshold = 4 + } + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class NodeShutdownMultiJvmNode1 extends NodeShutdownSpec +class NodeShutdownMultiJvmNode2 extends NodeShutdownSpec + +abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeShutdownMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + "A cluster of 2 nodes" must { + + "not be singleton cluster when joined" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second) { + cluster.join(node(first).address) + } + awaitUpConvergence(numberOfMembers = 2) + cluster.isSingletonCluster must be(false) + assertLeader(first, second) + } + + "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { + runOn(first) { + val secondAddress = node(second).address + testConductor.shutdown(second, 0) + testConductor.removeNode(second) + awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) + cluster.isSingletonCluster must be(true) + assertLeader(first) + } + + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index fcbcce746f..b2b98f94fa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -37,19 +37,8 @@ abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) wi "be a singleton cluster when started up" taggedAs LongRunningTest in { runOn(first) { awaitCond(cluster.isSingletonCluster) - // FIXME #2117 singletonCluster should reach convergence - //awaitCond(cluster.convergence.isDefined) - } - } - - "be in 'Joining' phase when started up" taggedAs LongRunningTest in { - runOn(first) { - val members = cluster.latestGossip.members - members.size must be(1) - - val joiningMember = members find (_.address == firstAddress) - joiningMember must not be (None) - joiningMember.get.status must be(MemberStatus.Joining) + awaitUpConvergence(numberOfMembers = 1) + assertLeader(first) } } } @@ -68,6 +57,7 @@ abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) wi } cluster.latestGossip.members.size must be(2) 
awaitCond(cluster.convergence.isDefined) + assertLeader(first, second) } } diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 231830cecb..fb53f13131 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -5,7 +5,8 @@ Cluster Specification ###################### -.. note:: *This document describes the new clustering coming in Akka 2.1 (not 2.0)* +.. note:: *This document describes the new clustering coming in Akka Coltrane, which is +   not available in the latest stable release* Intro ===== @@ -81,16 +82,6 @@ can later explicitly send a ``Join`` message to another node to form a N-node cluster. It is also possible to link multiple N-node clusters by ``joining`` them. -Singleton Cluster ------------------ - -If a node does not have a preconfigured contact point to join in the Akka -configuration, then it is considered a singleton cluster (single node cluster) -and will automatically transition from ``joining`` to ``up``. Singleton clusters -can later explicitly send a ``Join`` message to another node to form a N-node -cluster. It is also possible to link multiple N-node clusters by ``joining`` them. - - Gossip ------ diff --git a/akka-docs/common/circuitbreaker.rst b/akka-docs/common/circuitbreaker.rst new file mode 100644 index 0000000000..bd13927c8e --- /dev/null +++ b/akka-docs/common/circuitbreaker.rst @@ -0,0 +1,130 @@ +.. _circuit-breaker: + +############### +Circuit Breaker +############### + +================== +Why are they used? +================== +A circuit breaker is used to provide stability and prevent cascading failures in distributed +systems. These should be used in conjunction with judicious timeouts at the interfaces between +remote systems to prevent the failure of a single component from bringing down all components. + +As an example, we have a web application interacting with a remote third party web service. +Let's say the third party has oversold their capacity and their database melts down under load. +Assume that the database fails in such a way that it takes a very long time to hand back an +error to the third party web service. This in turn makes calls fail after a long period of +time. Back to our web application, the users have noticed that their form submissions take +much longer, seeming to hang. The users then do what they know to do, which is hit the refresh +button, adding more requests to those already running. This eventually causes the +failure of the web application due to resource exhaustion. This will affect all users, even +those who are not using functionality dependent on this third party web service. + +Introducing circuit breakers on the web service call would cause the requests to begin to +fail-fast, letting the user know that something is wrong and that they need not refresh +their request. This also confines the failure behavior to only those users that are using +functionality dependent on the third party; other users are no longer affected since there is +no resource exhaustion. Circuit breakers can also allow savvy developers to mark portions of +the site that use the functionality as unavailable, or show cached content as appropriate, +while the breaker is open. + +The Akka library provides an implementation of a circuit breaker called +:class:`akka.pattern.CircuitBreaker` which has the behavior described below. + +================= +What do they do?
+================= +* During normal operation, a circuit breaker is in the `Closed` state: + * Exceptions or calls exceeding the configured `callTimeout` increment a failure counter + * Successes reset the failure count to zero + * When the failure counter reaches a `maxFailures` count, the breaker is tripped into `Open` state +* While in `Open` state: + * All calls fail-fast with a :class:`CircuitBreakerOpenException` + * After the configured `resetTimeout`, the circuit breaker enters a `Half-Open` state +* In `Half-Open` state: + * The first call attempted is allowed through without failing fast + * All other calls fail-fast with an exception just as in `Open` state + * If the first call succeeds, the breaker is reset back to `Closed` state + * If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout` +* State transition listeners: + * Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen` + * These are executed in the :class:`ExecutionContext` provided. + +.. graphviz:: + + digraph circuit_breaker { + rankdir = "LR"; + size = "6,5"; + graph [ bgcolor = "transparent" ] + node [ fontname = "Helvetica", + fontsize = 14, + shape = circle, + color = white, + style = filled ]; + edge [ fontname = "Helvetica", fontsize = 12 ] + Closed [ fillcolor = green2 ]; + "Half-Open" [fillcolor = yellow2 ]; + Open [ fillcolor = red2 ]; + Closed -> Closed [ label = "Success" ]; + "Half-Open" -> Open [ label = "Trip Breaker" ]; + "Half-Open" -> Closed [ label = "Reset Breaker" ]; + Closed -> Open [ label = "Trip Breaker" ]; + Open -> Open [ label = "Calls failing fast" ]; + Open -> "Half-Open" [ label = "Attempt Reset" ]; + } + +======== +Examples +======== + +-------------- +Initialization +-------------- + +Here's how a :class:`CircuitBreaker` would be configured for: + * 5 maximum failures + * a call timeout of 10 seconds + * a reset timeout of 1 minute + +^^^^^^^ +Scala +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala + :include: imports1,circuit-breaker-initialization + +^^^^^^^ +Java +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/DangerousJavaActor.java + :include: imports1,circuit-breaker-initialization + +--------------- +Call Protection +--------------- + +Here's how the :class:`CircuitBreaker` would be used to protect an asynchronous +call as well as a synchronous one: + +^^^^^^^ +Scala +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala + :include: circuit-breaker-usage + +^^^^^^ +Java +^^^^^^ + +.. includecode:: code/docs/circuitbreaker/DangerousJavaActor.java + :include: circuit-breaker-usage + +.. note:: + + Using the :class:`CircuitBreaker` companion object's `apply` or `create` methods + will return a :class:`CircuitBreaker` where callbacks are executed in the caller's thread. + This can be useful if the asynchronous :class:`Future` behavior is unnecessary, for + example invoking a synchronous-only API. diff --git a/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala new file mode 100644 index 0000000000..c4603017e3 --- /dev/null +++ b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package docs.circuitbreaker + +//#imports1 +import akka.util.duration._ // small d is important here +import akka.pattern.CircuitBreaker +import akka.actor.Actor +import akka.dispatch.Future +import akka.event.Logging + +//#imports1 + +class CircuitBreakerDocSpec {} + +//#circuit-breaker-initialization +class DangerousActor extends Actor { + + val log = Logging(context.system, this) + implicit val executionContext = context.dispatcher + val breaker = + new CircuitBreaker(context.system.scheduler, 5, 10.seconds, 1.minute) + .onOpen(notifyMeOnOpen) + + def notifyMeOnOpen = + log.warning("My CircuitBreaker is now open, and will not close for one minute") + //#circuit-breaker-initialization + + //#circuit-breaker-usage + def dangerousCall: String = "This really isn't that dangerous of a call after all" + + def receive = { + case "is my middle name" ⇒ + sender ! breaker.withCircuitBreaker(Future(dangerousCall)) + case "block for me" ⇒ + sender ! breaker.withSyncCircuitBreaker(dangerousCall) + } + //#circuit-breaker-usage + +} + diff --git a/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java new file mode 100644 index 0000000000..1562338e04 --- /dev/null +++ b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.circuitbreaker; + +//#imports1 + +import akka.actor.UntypedActor; +import akka.dispatch.Future; +import akka.event.LoggingAdapter; +import akka.util.Duration; +import akka.pattern.CircuitBreaker; +import akka.event.Logging; + +import static akka.dispatch.Futures.future; + +import java.util.concurrent.Callable; + +//#imports1 + +//#circuit-breaker-initialization +public class DangerousJavaActor extends UntypedActor { + + private final CircuitBreaker breaker; + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + public DangerousJavaActor() { + this.breaker = new CircuitBreaker( + getContext().dispatcher(), getContext().system().scheduler(), + 5, Duration.parse("10s"), Duration.parse("1m")) + .onOpen(new Callable() { + public Object call() throws Exception { + notifyMeOnOpen(); + return null; + } + }); + } + + public void notifyMeOnOpen() { + log.warning("My CircuitBreaker is now open, and will not close for one minute"); + } +//#circuit-breaker-initialization + + //#circuit-breaker-usage + public String dangerousCall() { + return "This really isn't that dangerous of a call after all"; + } + + @Override + public void onReceive(Object message) { + if (message instanceof String) { + String m = (String) message; + if ("is my middle name".equals(m)) { + final Future f = future( + new Callable() { + public String call() { + return dangerousCall(); + } + }, getContext().dispatcher()); + + getSender().tell(breaker + .callWithCircuitBreaker( + new Callable>() { + public Future call() throws Exception { + return f; + } + })); + } + if ("block for me".equals(m)) { + getSender().tell(breaker + .callWithSyncCircuitBreaker( + new Callable() { + @Override + public String call() throws Exception { + return dangerousCall(); + } + })); + } + } + } +//#circuit-breaker-usage + +} \ No newline at end of file diff --git a/akka-docs/common/index.rst b/akka-docs/common/index.rst index 4e19d1a1aa..de9c7016fc 100644 --- a/akka-docs/common/index.rst +++ b/akka-docs/common/index.rst @@ -5,3 +5,4 @@ Common utilities :maxdepth: 2 duration + circuitbreaker diff --git a/akka-docs/conf.py 
b/akka-docs/conf.py index b632430b59..77b7c80be0 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -8,7 +8,7 @@ import sys, os # -- General configuration ----------------------------------------------------- sys.path.append(os.path.abspath('_sphinx/exts')) -extensions = ['sphinx.ext.todo', 'includecode'] +extensions = ['sphinx.ext.todo', 'includecode', 'sphinx.ext.graphviz'] templates_path = ['_templates'] source_suffix = '.rst' diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index d9aff609d8..b973a1d18c 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -185,3 +185,6 @@ External Akka Serializers `Akka-quickser by Roman Levenstein `_ + + +`Akka-kryo by Roman Levenstein `_ diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index 90bdc5616c..6ad870b309 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -99,7 +99,7 @@ Methods returning: * ``void`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell`` * ``akka.dispatch.Future`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask`` * ``scala.Option`` or ``akka.japi.Option`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer, - and return None if no answer was produced within the timout, or scala.Some/akka.japi.Some containing the result otherwise. + and return None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise. Any exception that was thrown during this call will be rethrown. * Any other type of value will use ``send-request-reply`` semantics, but *will* block to wait for an answer, throwing ``java.util.concurrent.TimeoutException`` if there was a timeout or rethrow any exception that was thrown during this call. diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 7df286d7f7..31a0df9674 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -370,7 +370,7 @@ specified as parameter to the ``ask`` method; this will complete the See :ref:`futures-java` for more information on how to await or query a future. -The ``onComplete``, ``onResult``, or ``onTimeout`` methods of the ``Future`` can be +The ``onComplete``, ``onSuccess``, or ``onFailure`` methods of the ``Future`` can be used to register a callback to get a notification when the Future completes. Gives you a way to avoid blocking. 
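The documentation change above points readers at the ``onComplete``, ``onSuccess``, and ``onFailure`` callback hooks on ``Future``. A minimal Scala sketch of registering such callbacks, assuming the 2.0-era ``akka.dispatch.Future`` API in which ``onComplete`` receives an ``Either[Throwable, T]``; the object name and actor-system name below are illustrative only and not part of this patch:

import akka.actor.ActorSystem
import akka.dispatch.Future

object FutureCallbackSketch extends App {
  val system = ActorSystem("callback-sketch") // hypothetical system name
  implicit val executionContext = system.dispatcher // Future.apply needs an implicit executor

  val f: Future[Int] = Future { 21 * 2 }

  // onSuccess/onFailure take partial functions and fire only for the matching outcome
  f onSuccess { case result ⇒ println("got " + result) }
  f onFailure { case e: Throwable ⇒ println("failed with " + e.getMessage) }

  // onComplete sees both outcomes; assumed here to receive an Either[Throwable, T]
  f onComplete {
    case Right(result) ⇒ println("completed with " + result)
    case Left(e) ⇒ println("completed with failure " + e.getMessage)
  }

  system.shutdown()
}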
diff --git a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala index ac6c58ad08..b51c7bb170 100644 --- a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -50,6 +50,8 @@ import akka.dispatch.MailboxType import akka.dispatch.MessageQueue import akka.actor.mailbox.DurableMessageQueue import akka.actor.mailbox.DurableMessageSerialization +import akka.pattern.CircuitBreaker +import akka.util.duration._ class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { @@ -65,20 +67,23 @@ class MyMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { val storage = new QueueStorage + // A real-world implementation would use configuration to set the last + // three parameters below + val breaker = CircuitBreaker(_owner.system.scheduler, 5, 30.seconds, 1.minute) - def enqueue(receiver: ActorRef, envelope: Envelope) { + def enqueue(receiver: ActorRef, envelope: Envelope): Unit = breaker.withSyncCircuitBreaker { val data: Array[Byte] = serialize(envelope) storage.push(data) } - def dequeue(): Envelope = { + def dequeue(): Envelope = breaker.withSyncCircuitBreaker { val data: Option[Array[Byte]] = storage.pull() data.map(deserialize).orNull } - def hasMessages: Boolean = !storage.isEmpty + def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty } - def numberOfMessages: Int = storage.size + def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size } /** * Called when the mailbox is disposed. diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 2a9ca174cf..5be40320d0 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -80,7 +80,9 @@ a configurator (MailboxType) and a queue implementation (DurableMessageQueue). The envelope contains the message sent to the actor, and information about sender. It is the envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala` -mechanism. This optional and you may store the envelope data in any way you like. +mechanism. This is optional and you may store the envelope data in any way you like. Durable +mailboxes are an excellent fit for circuit breakers. These are described in the +:ref:`circuit-breaker` documentation. .. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala :include: custom-mailbox diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 291d06e567..4a556cf6c2 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -415,7 +415,7 @@ taken from one of the following locations in order of precedence: See :ref:`futures-scala` for more information on how to await or query a future. -The ``onComplete``, ``onResult``, or ``onTimeout`` methods of the ``Future`` can be +The ``onComplete``, ``onSuccess``, or ``onFailure`` methods of the ``Future`` can be used to register a callback to get a notification when the Future completes. Gives you a way to avoid blocking.
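These callback hooks compose naturally with the circuit breaker shown in ``DurableMailboxDocSpec.scala`` above: ``withCircuitBreaker`` wraps a ``Future``, so an open breaker surfaces as a failed ``Future`` carrying a ``CircuitBreakerOpenException``. A minimal sketch of that combination, reusing the constructor arguments from the circuit breaker documentation (5 failures, 10 second call timeout, 1 minute reset timeout); the object name, system name, and the deliberately failing call are illustrative only:

import akka.actor.ActorSystem
import akka.dispatch.Future
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import akka.util.duration._

object BreakerCallbackSketch extends App {
  val system = ActorSystem("breaker-sketch") // hypothetical system name
  implicit val executionContext = system.dispatcher

  // positional arguments: scheduler, max failures, call timeout, reset timeout
  val breaker = new CircuitBreaker(system.scheduler, 5, 10.seconds, 1.minute)

  def dangerousCall: String = sys.error("remote service is down") // stand-in for a flaky call

  // withCircuitBreaker returns a Future, so the callbacks described above apply directly
  val f: Future[String] = breaker.withCircuitBreaker(Future(dangerousCall))

  f onSuccess { case reply ⇒ println("reply: " + reply) }
  f onFailure {
    case _: CircuitBreakerOpenException ⇒ println("breaker is open - failing fast")
    case e: Throwable ⇒ println("call failed: " + e.getMessage)
  }
}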
diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index 404847affc..10283b441f 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -192,3 +192,6 @@ External Akka Serializers `Akka-quickser by Roman Levenstein `_ + + +`Akka-kryo by Roman Levenstein `_ diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index 349b574888..bd7d92f924 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -99,7 +99,7 @@ Methods returning: * ``Unit`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell`` * ``akka.dispatch.Future[_]`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask`` * ``scala.Option[_]`` or ``akka.japi.Option`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer, - and return None if no answer was produced within the timout, or scala.Some/akka.japi.Some containing the result otherwise. + and return None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise. Any exception that was thrown during this call will be rethrown. * Any other type of value will use ``send-request-reply`` semantics, but *will* block to wait for an answer, throwing ``java.util.concurrent.TimeoutException`` if there was a timeout or rethrow any exception that was thrown during this call. diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index 1a1b7b721b..f454716af0 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -45,7 +45,19 @@ akka { keep-journal = on # whether to sync the journal after each transaction - sync-journal = off + sync-journal = off + + # circuit breaker configuration + circuit-breaker { + # maximum number of failures before opening breaker + max-failures = 3 + + # duration of time beyond which a call is assumed to be timed out and considered a failure + call-timeout = 3 seconds + + # duration of time to wait until attempting to reset the breaker during which all calls fail-fast + reset-timeout = 30 seconds + } } } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index c595fdcdd3..fccb6b5aea 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -5,14 +5,14 @@ package akka.actor.mailbox import akka.actor.ActorContext -import akka.dispatch.{ Envelope, MessageQueue } import akka.event.Logging import akka.actor.ActorRef -import akka.dispatch.MailboxType import com.typesafe.config.Config -import akka.util.NonFatal import akka.ConfigurationException import akka.actor.ActorSystem +import akka.dispatch._ +import akka.util.{ Duration, NonFatal } +import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker } class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { private val settings = new FileBasedMailboxSettings(systemSettings, config) @@ -26,6 +26,8 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox // TODO Is it 
reasonable for all FileBasedMailboxes to have their own logger? private val log = Logging(system, "FileBasedMessageQueue") + val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + private val queue = try { (new java.io.File(settings.QueuePath)) match { case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) @@ -42,18 +44,28 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox throw e } - def enqueue(receiver: ActorRef, envelope: Envelope): Unit = queue.add(serialize(envelope)) - - def dequeue(): Envelope = try { - queue.remove.map(item ⇒ { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull - } catch { - case _: java.util.NoSuchElementException ⇒ null - case NonFatal(e) ⇒ - log.error(e, "Couldn't dequeue from file-based mailbox") - throw e + def enqueue(receiver: ActorRef, envelope: Envelope) { + breaker.withSyncCircuitBreaker(queue.add(serialize(envelope))) } - def numberOfMessages: Int = queue.length.toInt + def dequeue(): Envelope = { + breaker.withSyncCircuitBreaker( + try { + queue.remove.map(item ⇒ { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull + } catch { + case _: java.util.NoSuchElementException ⇒ null + case e: CircuitBreakerOpenException ⇒ + log.debug(e.getMessage()) + throw e + case NonFatal(e) ⇒ + log.error(e, "Couldn't dequeue from file-based mailbox, due to [{}]", e.getMessage()) + throw e + }) + } + + def numberOfMessages: Int = { + breaker.withSyncCircuitBreaker(queue.length.toInt) + } def hasMessages: Boolean = numberOfMessages > 0 diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index 87dc25840f..dff4021d96 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -29,4 +29,7 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use val KeepJournal: Boolean = getBoolean("keep-journal") val SyncJournal: Boolean = getBoolean("sync-journal") + val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") + val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) + val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 6c97142068..e3ad811b52 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,7 +1,6 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils -import com.typesafe.config.ConfigFactory import akka.dispatch.Mailbox object FileBasedMailboxSpec { @@ -10,23 +9,32 @@ object FileBasedMailboxSpec { mailbox-type = akka.actor.mailbox.FileBasedMailboxType throughput = 1 file-based.directory-path = "file-based" + 
file-based.circuit-breaker.max-failures = 5 + file-based.circuit-breaker.call-timeout = 5 seconds } - """ + """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { - val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath + val settings = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")) "FileBasedMailboxSettings" must { "read the file-based section" in { - queuePath must be("file-based") + settings.QueuePath must be("file-based") + settings.CircuitBreakerMaxFailures must be(5) + + import akka.util.duration._ + + settings.CircuitBreakerCallTimeout must be(5 seconds) } } + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] + def clean() { - FileUtils.deleteDirectory(new java.io.File(queuePath)) + FileUtils.deleteDirectory(new java.io.File(settings.QueuePath)) } override def atStartup() { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala new file mode 100644 index 0000000000..3b3527240e --- /dev/null +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala @@ -0,0 +1,148 @@ +package akka.remote.testkit + +import java.awt.Toolkit +import java.awt.datatransfer.Clipboard +import java.awt.datatransfer.ClipboardOwner +import java.awt.datatransfer.DataFlavor +import java.awt.datatransfer.StringSelection +import java.awt.datatransfer.Transferable +import java.io.BufferedReader +import java.io.FileReader +import java.io.FileWriter +import java.io.InputStreamReader +import java.io.OutputStreamWriter +import java.io.PrintWriter +import java.io.StringReader +import java.io.StringWriter +import scala.annotation.tailrec + +/** + * Utility to make log files from multi-node tests easier to analyze. + * Replaces jvm names and host:port with corresponding logical role name. + */ +object LogRoleReplace extends ClipboardOwner { + + /** + * Main program. Use with 0, 1 or 2 arguments. + * + * When using 0 arguments it reads from standard input + * (System.in) and writes to standard output (System.out). + * + * With 1 argument it reads from the file specified in the first argument + * and writes to standard output. + * + * With 2 arguments it reads the file specified in the first argument + * and writes to the file specified in the second argument. 
+ * + * You can also replace the contents of the clipboard instead of using files + * by supplying `clipboard` as argument + */ + def main(args: Array[String]): Unit = { + val replacer = new LogRoleReplace + + if (args.length == 0) { + replacer.process( + new BufferedReader(new InputStreamReader(System.in)), + new PrintWriter(new OutputStreamWriter(System.out))) + + } else if (args(0) == "clipboard") { + val clipboard = Toolkit.getDefaultToolkit.getSystemClipboard + val contents = clipboard.getContents(null) + if (contents != null && contents.isDataFlavorSupported(DataFlavor.stringFlavor)) { + val text = contents.getTransferData(DataFlavor.stringFlavor).asInstanceOf[String] + val result = new StringWriter + replacer.process( + new BufferedReader(new StringReader(text)), + new PrintWriter(result)) + clipboard.setContents(new StringSelection(result.toString), this) + println("Replaced clipboard contents") + } + + } else if (args.length == 1) { + val inputFile = new BufferedReader(new FileReader(args(0))) + try { + replacer.process( + inputFile, + new PrintWriter(new OutputStreamWriter(System.out))) + } finally { + inputFile.close() + } + + } else if (args.length == 2) { + val outputFile = new PrintWriter(new FileWriter(args(1))) + val inputFile = new BufferedReader(new FileReader(args(0))) + try { + replacer.process(inputFile, outputFile) + } finally { + outputFile.close() + inputFile.close() + } + } + } + + /** + * Empty implementation of the ClipboardOwner interface + */ + def lostOwnership(clipboard: Clipboard, contents: Transferable): Unit = () +} + +class LogRoleReplace { + + private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started""".r + private val RemoteServerStarted = """\[([\w\-]+)\].*RemoteServerStarted@akka://.*@([\w\-\.]+):([0-9]+)""".r + + private var replacements: Map[String, String] = Map.empty + private var jvmToAddress: Map[String, String] = Map.empty + + def process(in: BufferedReader, out: PrintWriter): Unit = { + + @tailrec + def processLines(line: String): Unit = if (line ne null) { + out.println(processLine(line)) + processLines(in.readLine) + } + + processLines(in.readLine()) + } + + def processLine(line: String): String = { + if (updateReplacements(line)) + replaceLine(line) + else + line + } + + private def updateReplacements(line: String): Boolean = { + if (line.startsWith("[info] * ")) { + // reset when new test begins + replacements = Map.empty + jvmToAddress = Map.empty + } + + line match { + case RemoteServerStarted(jvm, host, port) ⇒ + jvmToAddress += (jvm -> (host + ":" + port)) + false + + case RoleStarted(jvm, role) ⇒ + jvmToAddress.get(jvm) match { + case Some(address) ⇒ + replacements += (jvm -> role) + replacements += (address -> role) + false + case None ⇒ false + } + + case _ ⇒ true + } + } + + private def replaceLine(line: String): String = { + var result = line + for ((from, to) ← replacements) { + result = result.replaceAll(from, to) + } + result + } + +} \ No newline at end of file diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 1745d15b61..8ab65aa2c3 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -44,7 +44,7 @@ abstract class MultiNodeConfig { /** * Include for verbose debug logging - * @param on when `true` debug Config is returned, otherwise empty Config + * @param on when `true` debug Config is 
returned, otherwise config with info logging */ def debugConfig(on: Boolean): Config = if (on) @@ -59,7 +59,8 @@ abstract class MultiNodeConfig { fsm = on } """) - else ConfigFactory.empty + else + ConfigFactory.parseString("akka.loglevel = INFO") /** * Construct a RoleName and return it, to be used as an identifier in the @@ -248,4 +249,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, roles: } } + // useful to see which jvm is running which role + log.info("Role [{}] started", myself.name) + } \ No newline at end of file diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a56ea16c9a..b7aeb9a7e9 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -131,6 +131,18 @@ akka { # (I) Maximum total size of all channels, 0 for off max-total-memory-size = 0b + # (I&O) Sets the high water mark for the in and outbound sockets, set to 0b for platform default + write-buffer-high-water-mark = 0b + + # (I&O) Sets the low water mark for the in and outbound sockets, set to 0b for platform default + write-buffer-low-water-mark = 0b + + # (I&O) Sets the send buffer size of the Sockets, set to 0b for platform default + send-buffer-size = 0b + + # (I&O) Sets the receive buffer size of the Sockets, set to 0b for platform default + receive-buffer-size = 0b + # (O) Time between reconnect attempts for active clients reconnect-delay = 5s diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index c1737831da..c6d23e71f3 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -147,6 +147,10 @@ private[akka] class ActiveRemoteClient private[akka] ( b.setOption("tcpNoDelay", true) b.setOption("keepAlive", true) b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) + settings.ReceiveBufferSize.foreach(sz ⇒ b.setOption("receiveBufferSize", sz)) + settings.SendBufferSize.foreach(sz ⇒ b.setOption("sendBufferSize", sz)) + settings.WriteBufferHighWaterMark.foreach(sz ⇒ b.setOption("writeBufferHighWaterMark", sz)) + settings.WriteBufferLowWaterMark.foreach(sz ⇒ b.setOption("writeBufferLowWaterMark", sz)) settings.OutboundLocalAddress.foreach(s ⇒ b.setOption("localAddress", new InetSocketAddress(s, 0))) bootstrap = b diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index cc3310fada..04dfbe525e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -45,6 +45,10 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { b.setOption("tcpNoDelay", true) b.setOption("child.keepAlive", true) b.setOption("reuseAddress", true) + settings.ReceiveBufferSize.foreach(sz ⇒ b.setOption("receiveBufferSize", sz)) + settings.SendBufferSize.foreach(sz ⇒ b.setOption("sendBufferSize", sz)) + settings.WriteBufferHighWaterMark.foreach(sz ⇒ b.setOption("writeBufferHighWaterMark", sz)) + settings.WriteBufferLowWaterMark.foreach(sz ⇒ b.setOption("writeBufferLowWaterMark", sz)) b } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 64bc184408..0d105eda1d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ 
b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -37,8 +37,21 @@ private[akka] class NettySettings(config: Config, val systemName: String) { val WriteTimeout: Duration = Duration(getMilliseconds("write-timeout"), MILLISECONDS) val AllTimeout: Duration = Duration(getMilliseconds("all-timeout"), MILLISECONDS) val ReconnectDelay: Duration = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS) + val MessageFrameSize: Int = getBytes("message-frame-size").toInt + private[this] def optionSize(s: String): Option[Int] = getBytes(s).toInt match { + case 0 ⇒ None + case x if x < 0 ⇒ + throw new ConfigurationException("Setting '%s' must be 0 or positive (and fit in an Int)" format s) + case other ⇒ Some(other) + } + + val WriteBufferHighWaterMark: Option[Int] = optionSize("write-buffer-high-water-mark") + val WriteBufferLowWaterMark: Option[Int] = optionSize("write-buffer-low-water-mark") + val SendBufferSize: Option[Int] = optionSize("send-buffer-size") + val ReceiveBufferSize: Option[Int] = optionSize("receive-buffer-size") + val Hostname: String = getString("hostname") match { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index f1809d42a5..8ac11e2440 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -56,6 +56,10 @@ class RemoteConfigSpec extends AkkaSpec( WriteTimeout must be(10 seconds) AllTimeout must be(0 millis) ReconnectionTimeWindow must be(10 minutes) + WriteBufferHighWaterMark must be(None) + WriteBufferLowWaterMark must be(None) + SendBufferSize must be(None) + ReceiveBufferSize must be(None) } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index e20144b418..23d51fe77c 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -7,11 +7,14 @@ package akka import sbt._ import sbt.Keys._ import com.typesafe.sbtmultijvm.MultiJvmPlugin -import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions, multiNodeTest } +import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions, multiNodeExecuteTests } import com.typesafe.sbtscalariform.ScalariformPlugin import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys import com.typesafe.sbtosgi.OsgiPlugin.{ OsgiKeys, osgiSettings } +import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings +import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact import java.lang.Boolean.getBoolean +import sbt.Tests import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments, sphinxTags } object AkkaBuild extends Build { @@ -26,7 +29,8 @@ object AkkaBuild extends Build { lazy val akka = Project( id = "akka", base = file("."), - settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Sphinx.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq( + settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Sphinx.settings ++ Publish.versionSettings ++ + Dist.settings ++ mimaSettings ++ Seq( testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean, parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", @@ -53,7 +57,8 @@ object AkkaBuild extends Build { artifact in (Compile, 
packageBin) ~= (_.copy(`type` = "bundle")), // to fix scaladoc generation fullClasspath in doc in Compile <<= fullClasspath in Compile, - libraryDependencies ++= Dependencies.actor + libraryDependencies ++= Dependencies.actor, + previousArtifact := akkaPreviousArtifact("akka-actor") ) ) @@ -62,7 +67,8 @@ object AkkaBuild extends Build { base = file("akka-testkit"), dependencies = Seq(actor), settings = defaultSettings ++ Seq( - libraryDependencies ++= Dependencies.testkit + libraryDependencies ++= Dependencies.testkit, + previousArtifact := akkaPreviousArtifact("akka-testkit") ) ) @@ -100,7 +106,8 @@ object AkkaBuild extends Build { (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, - jvmOptions in MultiJvm := defaultMultiJvmOptions + jvmOptions in MultiJvm := defaultMultiJvmOptions, + previousArtifact := akkaPreviousArtifact("akka-remote") ) ) configs (MultiJvm) @@ -116,7 +123,8 @@ object AkkaBuild extends Build { (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, - jvmOptions in MultiJvm := defaultMultiJvmOptions + jvmOptions in MultiJvm := defaultMultiJvmOptions, + previousArtifact := akkaPreviousArtifact("akka-remote") ) ) configs (MultiJvm) @@ -134,7 +142,8 @@ object AkkaBuild extends Build { base = file("akka-agent"), dependencies = Seq(actor, testkit % "test->test"), settings = defaultSettings ++ OSGi.agent ++ Seq( - libraryDependencies ++= Dependencies.agent + libraryDependencies ++= Dependencies.agent, + previousArtifact := akkaPreviousArtifact("akka-agent") ) ) @@ -143,7 +152,8 @@ object AkkaBuild extends Build { base = file("akka-transactor"), dependencies = Seq(actor, testkit % "test->test"), settings = defaultSettings ++ OSGi.transactor ++ Seq( - libraryDependencies ++= Dependencies.transactor + libraryDependencies ++= Dependencies.transactor, + previousArtifact := akkaPreviousArtifact("akka-transactor") ) ) @@ -162,7 +172,8 @@ object AkkaBuild extends Build { dependencies = Seq(remote, testkit % "compile;test->test"), settings = defaultSettings ++ OSGi.mailboxesCommon ++ Seq( libraryDependencies ++= Dependencies.mailboxes, - // DurableMailboxSpec published in akka-mailboxes-common-test + previousArtifact := akkaPreviousArtifact("akka-mailboxes-common"), + // DurableMailboxSpec published in akka-mailboxes-common-test publishArtifact in Test := true ) ) @@ -172,7 +183,8 @@ object AkkaBuild extends Build { base = file("akka-durable-mailboxes/akka-file-mailbox"), dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"), settings = defaultSettings ++ OSGi.fileMailbox ++ Seq( - libraryDependencies ++= Dependencies.fileMailbox + libraryDependencies ++= Dependencies.fileMailbox, + previousArtifact := akkaPreviousArtifact("akka-file-mailbox") ) ) @@ -181,7 +193,8 @@ object AkkaBuild extends Build { base = file("akka-zeromq"), dependencies = Seq(actor, testkit % "test;test->test"), settings = defaultSettings ++ OSGi.zeroMQ ++ Seq( - libraryDependencies ++= Dependencies.zeroMQ + libraryDependencies ++= Dependencies.zeroMQ, + previousArtifact := akkaPreviousArtifact("akka-zeromq") ) ) @@ -190,7 +203,8 @@ object AkkaBuild extends Build { base = file("akka-kernel"), dependencies = Seq(actor, testkit % "test->test"), settings = defaultSettings ++ Seq( - libraryDependencies ++= Dependencies.kernel + libraryDependencies ++= 
Dependencies.kernel, + previousArtifact := akkaPreviousArtifact("akka-kernel") ) ) @@ -329,14 +343,16 @@ object AkkaBuild extends Build { if (prop.isEmpty) Seq.empty else prop.split(",").toSeq } + val multiNodeEnabled = java.lang.Boolean.getBoolean("akka.test.multi-node") + lazy val defaultMultiJvmScalatestOptions: Seq[String] = { val excludeTags = (useExcludeTestTags -- useIncludeTestTags).toSeq Seq("-r", "org.scalatest.akka.QuietReporter") ++ - (if (excludeTags.isEmpty) Seq.empty else Seq("-l", excludeTags.mkString("\"", " ", "\""))) ++ - (if (useOnlyTestTags.isEmpty) Seq.empty else Seq("-n", useOnlyTestTags.mkString("\"", " ", "\""))) + (if (excludeTags.isEmpty) Seq.empty else Seq("-l", if (multiNodeEnabled) excludeTags.mkString("\"", " ", "\"") else excludeTags.mkString(" "))) ++ + (if (useOnlyTestTags.isEmpty) Seq.empty else Seq("-n", if (multiNodeEnabled) useOnlyTestTags.mkString("\"", " ", "\"") else useOnlyTestTags.mkString(" "))) } - lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq( + lazy val defaultSettings = baseSettings ++ formatSettings ++ mimaSettings ++ Seq( resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/", // compile options @@ -359,12 +375,12 @@ object AkkaBuild extends Build { // add arguments for tests excluded by tag - includes override excludes (opposite to scalatest) testOptions in Test <++= (excludeTestTags, includeTestTags) map { (excludes, includes) => val tags = (excludes -- includes) - if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-l", tags.mkString("\"", " ", "\""))) + if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-l", tags.mkString(" "))) }, // add arguments for running only tests by tag testOptions in Test <++= onlyTestTags map { tags => - if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-n", tags.mkString("\"", " ", "\""))) + if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-n", tags.mkString(" "))) }, // show full stack traces @@ -387,11 +403,29 @@ object AkkaBuild extends Build { lazy val multiJvmSettings = MultiJvmPlugin.settings ++ inConfig(MultiJvm)(ScalariformPlugin.scalariformSettings) ++ Seq( compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (ScalariformKeys.format in MultiJvm), ScalariformKeys.preferences in MultiJvm := formattingPreferences, - if (java.lang.Boolean.getBoolean("akka.test.multi-node")) - test in Test <<= ((test in Test), (multiNodeTest in MultiJvm)) map { case x => x } + if (multiNodeEnabled) + executeTests in Test <<= ((executeTests in Test), (multiNodeExecuteTests in MultiJvm)) map { + case (tr, mr) => + val r = tr._2 ++ mr._2 + (Tests.overall(r.values), r) + } else - test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } + executeTests in Test <<= ((executeTests in Test), (executeTests in MultiJvm)) map { + case (tr, mr) => + val r = tr._2 ++ mr._2 + (Tests.overall(r.values), r) + } ) + + lazy val mimaSettings = mimaDefaultSettings ++ Seq( + // MiMa + previousArtifact := None + ) + + def akkaPreviousArtifact(id: String, organization: String = "com.typesafe.akka", version: String = "2.0"): Option[sbt.ModuleID] = { + // the artifact to compare binary compatibility with + Some(organization % id % version) + } } // Dependencies diff --git a/project/plugins.sbt b/project/plugins.sbt index 754b9eefa2..e077802cfa 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,12 +1,14 @@ resolvers += Classpaths.typesafeResolver -addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M2") 
+addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M3") addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0") addSbtPlugin("com.typesafe.sbtosgi" % "sbtosgi" % "0.2.0") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.3") + resolvers ++= Seq( // needed for sbt-assembly, which comes with sbt-multi-jvm Resolver.url("sbtonline", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns), diff --git a/project/scripts/multi-node-log-replace b/project/scripts/multi-node-log-replace new file mode 100755 index 0000000000..83f1b8a136 --- /dev/null +++ b/project/scripts/multi-node-log-replace @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +# +# Utility to make log files from multi-node tests easier to analyze. +# Replaces jvm names and host:port with corresponding logical role name. +# + + +# check for an sbt command +type -P sbt &> /dev/null || { echo "sbt command not found" >&2; exit 1; } + +sbt "project akka-remote-tests" "test:run-main akka.remote.testkit.LogRoleReplace $1 $2" \ No newline at end of file diff --git a/project/scripts/release b/project/scripts/release index 058d0d1615..886e6629b1 100755 --- a/project/scripts/release +++ b/project/scripts/release @@ -219,6 +219,13 @@ echolog "Creating gzipped tar download..." try tar -cz -C ${unzipped_dir} -f ${release_dir}/downloads/akka-${version}.tgz akka-${version} echolog "Successfully created local release" +# check binary compatibility for dry run +if [ $dry_run ]; then + echodry "Running migration manager report..." + sbt mima-report-binary-issues + echodry "Finished migration manager report" +fi + # commit and tag this release echolog "Committing and tagging..." try git add . diff --git a/scripts/multi-node-log-replace.sh b/scripts/multi-node-log-replace.sh new file mode 100755 index 0000000000..8e8af7112a --- /dev/null +++ b/scripts/multi-node-log-replace.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# +# Utility to make log files from multi-node tests easier to analyze. +# Replaces jvm names and host:port with corresponding logical role name. +# +# Use with 0, 1 or 2 arguments. +# +# When using 0 arguments it reads from standard input +# and writes to standard output. +# +# With 1 argument it reads from the file specified in the first argument +# and writes to standard output. +# +# With 2 arguments it reads the file specified in the first argument +# and writes to the file specified in the second argument. +# +# You can also replace the contents of the clipboard instead of using files +# by supplying `clipboard` as the argument +# + + +# check for an sbt command +type -P sbt &> /dev/null || { echo "sbt command not found" >&2; exit 1; } + +sbt "project akka-remote-tests" "test:run-main akka.remote.testkit.LogRoleReplace $1 $2" \ No newline at end of file
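
To see what the LogRoleReplace utility above does to a log, here is a minimal, self-contained sketch of the same idea in plain Scala: learn each jvm's address from the RemoteServerStarted line, bind both the jvm name and the address to the logical role when the "Role started" line arrives, and substitute the role name into every later line. The object name and the sample log lines are invented for illustration; the two regular expressions are the ones from the patch, and a plain literal replace is used here where the patch uses regex-based replaceAll.

object LogRoleReplaceSketch extends App {
  // Regexes copied from LogRoleReplace above
  val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started""".r
  val RemoteServerStarted = """\[([\w\-]+)\].*RemoteServerStarted@akka://.*@([\w\-\.]+):([0-9]+)""".r

  // Invented sample lines, roughly what a multi-node test log looks like
  val lines = Seq(
    "[JVM-1] [INFO] RemoteServerStarted@akka://MultiNodeSpec@127.0.0.1:2552",
    "[JVM-1] [INFO] Role [first] started",
    "[JVM-1] [INFO] received reply from 127.0.0.1:2552")

  var jvmToAddress = Map.empty[String, String]
  var replacements = Map.empty[String, String]

  def processLine(line: String): String = line match {
    case RemoteServerStarted(jvm, host, port) ⇒
      // remember which address belongs to which jvm
      jvmToAddress += (jvm -> (host + ":" + port))
      line
    case RoleStarted(jvm, role) ⇒
      // once the role is known, both the jvm name and its address map to it
      jvmToAddress.get(jvm) foreach { address ⇒
        replacements += (jvm -> role)
        replacements += (address -> role)
      }
      line
    case _ ⇒
      // literal replacement here, where the patch uses regex-based replaceAll
      replacements.foldLeft(line) { case (acc, (from, to)) ⇒ acc.replace(from, to) }
  }

  lines map processLine foreach println
  // the last sample line is printed as: [first] [INFO] received reply from first
}

Against a real multi-node log the same mapping is driven by the multi-node-log-replace wrapper scripts above, which simply forward stdin, file arguments or the clipboard to akka.remote.testkit.LogRoleReplace.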
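
The four buffer settings added to reference.conf above default to 0b, which the new optionSize helper in NettySettings turns into None so that the platform default is kept; a negative value is rejected with a ConfigurationException. Below is a hedged sketch of overriding them from code, assuming the keys live in the akka.remote.netty section like the surrounding settings and that akka-remote is on the classpath; the system name and the byte values are arbitrary examples.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object BufferSettingsExample extends App {
  // Arbitrary example values; 0b would keep the platform default
  val tuning = ConfigFactory.parseString("""
    akka.remote.netty {
      send-buffer-size = 1MiB
      receive-buffer-size = 1MiB
      write-buffer-high-water-mark = 512KiB
      write-buffer-low-water-mark = 128KiB
    }
  """)

  // overrides take precedence over application.conf and reference.conf via withFallback
  val system = ActorSystem("Example", tuning.withFallback(ConfigFactory.load()))
  system.shutdown()
}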