Patrik’s comments: change CircuitBreaker to Runnable

was using Callable for no apparent reason; also fix
FaultHandlingDocTestBase.java
This commit is contained in:
Roland 2012-10-16 14:38:23 +02:00
parent e86144499c
commit 6792d11b36
5 changed files with 32 additions and 31 deletions

View file

@ -234,8 +234,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] = private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] =
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries) if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
private[akka] val escalateDefault = (_: Any) => Escalate private[akka] val escalateDefault = (_: Any) Escalate
} }
/** /**

View file

@ -155,22 +155,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor.
* *
* @param callback Handler to be invoked on state change * @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage * @return CircuitBreaker for fluent usage
*/ */
def onOpen[T](callback: T): CircuitBreaker = { def onOpen(callback: Unit): CircuitBreaker = onOpen(new Runnable { def run = callback })
Open.addListener(() callback)
this
}
/** /**
* Java API for onOpen * Java API for onOpen
* *
* @param callback Handler to be invoked on state change * @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage * @return CircuitBreaker for fluent usage
*/ */
def onOpen[T](callback: Callable[T]): CircuitBreaker = onOpen(callback.call) def onOpen(callback: Runnable): CircuitBreaker = {
Open addListener callback
this
}
/** /**
* Adds a callback to execute when circuit breaker transitions to half-open * Adds a callback to execute when circuit breaker transitions to half-open
@ -178,22 +176,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor.
* *
* @param callback Handler to be invoked on state change * @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage * @return CircuitBreaker for fluent usage
*/ */
def onHalfOpen[T](callback: T): CircuitBreaker = { def onHalfOpen(callback: Unit): CircuitBreaker = onHalfOpen(new Runnable { def run = callback })
HalfOpen.addListener(() callback)
this
}
/** /**
* JavaAPI for onHalfOpen * JavaAPI for onHalfOpen
* *
* @param callback Handler to be invoked on state change * @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage * @return CircuitBreaker for fluent usage
*/ */
def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = onHalfOpen(callback.call) def onHalfOpen(callback: Runnable): CircuitBreaker = {
HalfOpen addListener callback
this
}
/** /**
* Adds a callback to execute when circuit breaker state closes * Adds a callback to execute when circuit breaker state closes
@ -201,22 +197,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor.
* *
* @param callback Handler to be invoked on state change * @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage * @return CircuitBreaker for fluent usage
*/ */
def onClose[T](callback: T): CircuitBreaker = { def onClose(callback: Unit): CircuitBreaker = onClose(new Runnable { def run = callback })
Closed.addListener(() callback)
this
}
/** /**
* JavaAPI for onClose * JavaAPI for onClose
* *
* @param callback Handler to be invoked on state change * @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage * @return CircuitBreaker for fluent usage
*/ */
def onClose[T](callback: Callable[T]): CircuitBreaker = onClose(callback.call) def onClose(callback: Runnable): CircuitBreaker = {
Closed addListener callback
this
}
/** /**
* Retrieves current failure count. * Retrieves current failure count.
@ -261,7 +255,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* Internal state abstraction * Internal state abstraction
*/ */
private sealed trait State { private sealed trait State {
private val listeners = new CopyOnWriteArrayList[() _] private val listeners = new CopyOnWriteArrayList[Runnable]
/** /**
* Add a listener function which is invoked on state entry * Add a listener function which is invoked on state entry
@ -269,7 +263,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
* @param listener listener implementation * @param listener listener implementation
* @tparam T return type of listener, not used - but supplied for type inference purposes * @tparam T return type of listener, not used - but supplied for type inference purposes
*/ */
def addListener[T](listener: () T): Unit = listeners add listener def addListener(listener: Runnable): Unit = listeners add listener
/** /**
* Test for whether listeners exist * Test for whether listeners exist
@ -288,7 +282,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
val iterator = listeners.iterator val iterator = listeners.iterator
while (iterator.hasNext) { while (iterator.hasNext) {
val listener = iterator.next val listener = iterator.next
executor.execute(new Runnable { def run = listener() }) executor.execute(listener)
} }
} }
} }

View file

@ -29,10 +29,9 @@ public class DangerousJavaActor extends UntypedActor {
this.breaker = new CircuitBreaker( this.breaker = new CircuitBreaker(
getContext().dispatcher(), getContext().system().scheduler(), getContext().dispatcher(), getContext().system().scheduler(),
5, Duration.create(10, "s"), Duration.create(1, "m")) 5, Duration.create(10, "s"), Duration.create(1, "m"))
.onOpen(new Callable<Object>() { .onOpen(new Runnable() {
public Object call() throws Exception { public void run() {
notifyMeOnOpen(); notifyMeOnOpen();
return null;
} }
}); });
} }

View file

@ -7,7 +7,11 @@ package docs.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.*; import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.OneForOneStrategy; import akka.actor.OneForOneStrategy;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.Terminated; import akka.actor.Terminated;

View file

@ -22,7 +22,11 @@ import com.typesafe.config.ConfigFactory;
import static akka.japi.Util.classTag; import static akka.japi.Util.classTag;
import static akka.actor.SupervisorStrategy.*; import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe; import static akka.pattern.Patterns.pipe;