Merge branch 'master' into 911-krasserm
This commit is contained in:
commit
2bd7751cba
16 changed files with 128 additions and 173 deletions
|
|
@ -146,30 +146,30 @@ class FutureSpec extends JUnitSuite {
|
|||
val future2 = future1 map (_ / 0)
|
||||
val future3 = future2 map (_.toString)
|
||||
|
||||
val future4 = future1 failure {
|
||||
val future4 = future1 recover {
|
||||
case e: ArithmeticException ⇒ 0
|
||||
} map (_.toString)
|
||||
|
||||
val future5 = future2 failure {
|
||||
val future5 = future2 recover {
|
||||
case e: ArithmeticException ⇒ 0
|
||||
} map (_.toString)
|
||||
|
||||
val future6 = future2 failure {
|
||||
val future6 = future2 recover {
|
||||
case e: MatchError ⇒ 0
|
||||
} map (_.toString)
|
||||
|
||||
val future7 = future3 failure { case e: ArithmeticException ⇒ "You got ERROR" }
|
||||
val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" }
|
||||
|
||||
val actor = actorOf[TestActor].start()
|
||||
|
||||
val future8 = actor !!! "Failure"
|
||||
val future9 = actor !!! "Failure" failure {
|
||||
val future9 = actor !!! "Failure" recover {
|
||||
case e: RuntimeException ⇒ "FAIL!"
|
||||
}
|
||||
val future10 = actor !!! "Hello" failure {
|
||||
val future10 = actor !!! "Hello" recover {
|
||||
case e: RuntimeException ⇒ "FAIL!"
|
||||
}
|
||||
val future11 = actor !!! "Failure" failure { case _ ⇒ "Oops!" }
|
||||
val future11 = actor !!! "Failure" recover { case _ ⇒ "Oops!" }
|
||||
|
||||
assert(future1.get === 5)
|
||||
intercept[ArithmeticException] { future2.get }
|
||||
|
|
@ -269,7 +269,7 @@ class FutureSpec extends JUnitSuite {
|
|||
def receiveShouldExecuteOnComplete {
|
||||
val latch = new StandardLatch
|
||||
val actor = actorOf[TestActor].start()
|
||||
actor !!! "Hello" receive { case "World" ⇒ latch.open }
|
||||
actor !!! "Hello" onResult { case "World" ⇒ latch.open }
|
||||
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
||||
actor.stop()
|
||||
}
|
||||
|
|
@ -304,13 +304,13 @@ class FutureSpec extends JUnitSuite {
|
|||
val latch = new StandardLatch
|
||||
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
|
||||
f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach"))
|
||||
f2 receive { case _ ⇒ throw new ThrowableTest("dispatcher receive") }
|
||||
f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") }
|
||||
val f3 = f2 map (s ⇒ s.toUpperCase)
|
||||
latch.open
|
||||
f2.await
|
||||
assert(f2.resultOrException === Some("success"))
|
||||
f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach"))
|
||||
f2 receive { case _ ⇒ throw new ThrowableTest("current thread receive") }
|
||||
f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") }
|
||||
f3.await
|
||||
assert(f3.resultOrException === Some("SUCCESS"))
|
||||
|
||||
|
|
|
|||
|
|
@ -166,38 +166,6 @@ class RoutingSpec extends WordSpec with MustMatchers {
|
|||
|
||||
for (a ← List(broadcast, a1, a2, a3)) a.stop()
|
||||
}
|
||||
|
||||
"be defined at" in {
|
||||
import akka.actor.ActorRef
|
||||
|
||||
val Yes = "yes"
|
||||
val No = "no"
|
||||
|
||||
def testActor() = actorOf(new Actor() {
|
||||
def receive = {
|
||||
case Yes ⇒ "yes"
|
||||
}
|
||||
}).start()
|
||||
|
||||
val t1 = testActor()
|
||||
val t2 = testActor()
|
||||
val t3 = testActor()
|
||||
val t4 = testActor()
|
||||
|
||||
val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
|
||||
val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil))
|
||||
|
||||
t1.isDefinedAt(Yes) must be(true)
|
||||
t1.isDefinedAt(No) must be(false)
|
||||
t2.isDefinedAt(Yes) must be(true)
|
||||
t2.isDefinedAt(No) must be(false)
|
||||
d1.isDefinedAt(Yes) must be(true)
|
||||
d1.isDefinedAt(No) must be(false)
|
||||
d2.isDefinedAt(Yes) must be(true)
|
||||
d2.isDefinedAt(No) must be(false)
|
||||
|
||||
for (a ← List(t1, t2, d1, d2)) a.stop()
|
||||
}
|
||||
}
|
||||
|
||||
"Actor Pool" must {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import akka.AkkaException
|
|||
import akka.serialization.{ Format, Serializer }
|
||||
import akka.cluster.ClusterNode
|
||||
import akka.event.EventHandler
|
||||
import scala.collection.immutable.Stack
|
||||
|
||||
import scala.reflect.BeanProperty
|
||||
|
||||
|
|
@ -135,8 +136,8 @@ object Actor extends ListenerManagement {
|
|||
hook
|
||||
}
|
||||
|
||||
private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] {
|
||||
override def initialValue = None
|
||||
private[actor] val actorRefInCreation = new ThreadLocal[Stack[ActorRef]] {
|
||||
override def initialValue = Stack[ActorRef]()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -396,14 +397,15 @@ object Actor extends ListenerManagement {
|
|||
"] for serialization of actor [" + address +
|
||||
"] since " + reason)
|
||||
|
||||
val serializer: Serializer = {
|
||||
if ((serializerClassName eq null) ||
|
||||
(serializerClassName == "") ||
|
||||
(serializerClassName == Format.defaultSerializerName)) {
|
||||
Format.Default
|
||||
} else {
|
||||
val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match {
|
||||
case Right(clazz) ⇒ clazz
|
||||
val serializer: Serializer = serializerClassName match {
|
||||
case null | "" | Format.`defaultSerializerName` ⇒ Format.Default
|
||||
case specialSerializer ⇒
|
||||
ReflectiveAccess.getClassFor(specialSerializer) match {
|
||||
case Right(clazz) ⇒
|
||||
clazz.newInstance match {
|
||||
case s: Serializer ⇒ s
|
||||
case other ⇒ serializerErrorDueTo("class must be of type [akka.serialization.Serializer]")
|
||||
}
|
||||
case Left(exception) ⇒
|
||||
val cause = exception match {
|
||||
case i: InvocationTargetException ⇒ i.getTargetException
|
||||
|
|
@ -411,15 +413,11 @@ object Actor extends ListenerManagement {
|
|||
}
|
||||
serializerErrorDueTo(cause.toString)
|
||||
}
|
||||
val f = clazz.newInstance.asInstanceOf[AnyRef]
|
||||
if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer]
|
||||
else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]")
|
||||
}
|
||||
}
|
||||
|
||||
val isStateful = state match {
|
||||
case Stateless ⇒ false
|
||||
case Stateful ⇒ true
|
||||
case _: Stateless | Stateless ⇒ false
|
||||
case _: Stateful | Stateful ⇒ true
|
||||
}
|
||||
|
||||
if (isStateful && isHomeNode) { // stateful actor's home node
|
||||
|
|
@ -505,16 +503,18 @@ trait Actor {
|
|||
*/
|
||||
@transient
|
||||
implicit val someSelf: Some[ActorRef] = {
|
||||
val optRef = Actor.actorRefInCreation.get
|
||||
if (optRef.isEmpty) throw new ActorInitializationException(
|
||||
val refStack = Actor.actorRefInCreation.get
|
||||
if (refStack.isEmpty) throw new ActorInitializationException(
|
||||
"ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
|
||||
"\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
|
||||
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
|
||||
"\n\tEither use:" +
|
||||
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
|
||||
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))'")
|
||||
Actor.actorRefInCreation.set(None)
|
||||
optRef.asInstanceOf[Some[ActorRef]]
|
||||
|
||||
val ref = refStack.head
|
||||
Actor.actorRefInCreation.set(refStack.pop)
|
||||
Some(ref)
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -616,21 +616,6 @@ trait Actor {
|
|||
throw new UnhandledMessageException(msg, self)
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the actor able to handle the message passed in as arguments?
|
||||
*/
|
||||
def isDefinedAt(message: Any): Boolean = {
|
||||
val behaviorStack = self.hotswap
|
||||
message match { //Same logic as apply(msg) but without the unhandled catch-all
|
||||
case l: AutoReceivedMessage ⇒ true
|
||||
case msg if behaviorStack.nonEmpty &&
|
||||
behaviorStack.head.isDefinedAt(msg) ⇒ true
|
||||
case msg if behaviorStack.isEmpty &&
|
||||
processingBehavior.isDefinedAt(msg) ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
|
||||
* Puts the behavior on top of the hotswap stack.
|
||||
|
|
|
|||
|
|
@ -233,12 +233,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
*/
|
||||
def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED
|
||||
|
||||
/**
|
||||
* Is the actor able to handle the message passed in as arguments?
|
||||
*/
|
||||
@deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1")
|
||||
def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message)
|
||||
|
||||
/**
|
||||
* Only for internal use. UUID is effectively final.
|
||||
*/
|
||||
|
|
@ -685,6 +679,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
|
|||
def supervisor: Option[ActorRef] = _supervisor
|
||||
|
||||
// ========= AKKA PROTECTED FUNCTIONS =========
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = {
|
||||
val inetaddr = Actor.remote.address
|
||||
SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort, timeout)
|
||||
}
|
||||
|
||||
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
|
||||
_supervisor = sup
|
||||
|
|
@ -801,11 +800,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
|
|||
}
|
||||
|
||||
def tooManyRestarts() {
|
||||
_supervisor.foreach { sup ⇒
|
||||
// can supervisor handle the notification?
|
||||
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
|
||||
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
|
||||
}
|
||||
notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason))
|
||||
stop()
|
||||
}
|
||||
|
||||
|
|
@ -869,13 +864,20 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
|
|||
// ========= PRIVATE FUNCTIONS =========
|
||||
|
||||
private[this] def newActor: Actor = {
|
||||
try {
|
||||
Actor.actorRefInCreation.set(Some(this))
|
||||
val a = actorFactory()
|
||||
if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
|
||||
a
|
||||
} finally {
|
||||
Actor.actorRefInCreation.set(None)
|
||||
import Actor.{ actorRefInCreation ⇒ refStack }
|
||||
(try {
|
||||
refStack.set(refStack.get.push(this))
|
||||
actorFactory()
|
||||
} catch {
|
||||
case e ⇒
|
||||
val stack = refStack.get
|
||||
//Clean up if failed
|
||||
if ((stack.nonEmpty) && (stack.head eq this)) refStack.set(stack.pop)
|
||||
//Then rethrow
|
||||
throw e
|
||||
}) match {
|
||||
case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
|
||||
case valid ⇒ valid
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1031,6 +1033,12 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
}
|
||||
|
||||
// ==== NOT SUPPORTED ====
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = {
|
||||
SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, timeout)
|
||||
}
|
||||
|
||||
@deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
|
||||
def actorClass: Class[_ <: Actor] = unsupported
|
||||
def dispatcher_=(md: MessageDispatcher) {
|
||||
|
|
@ -1251,3 +1259,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
|
|||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
case class SerializedActorRef(val uuid: Uuid,
|
||||
val address: String,
|
||||
val hostname: String,
|
||||
val port: Int,
|
||||
val timeout: Long) {
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = Actor.registry.local.actorFor(uuid) match {
|
||||
case Some(actor) ⇒ actor
|
||||
case None ⇒ RemoteActorRef(new InetSocketAddress(hostname, port), address, timeout, None)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
|
|||
import java.util.{ Set ⇒ JSet }
|
||||
|
||||
import akka.util.ReflectiveAccess._
|
||||
import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement }
|
||||
import akka.util.ListenerManagement
|
||||
import akka.serialization._
|
||||
|
||||
/**
|
||||
|
|
@ -36,7 +36,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
private val actorsByAddress = new ConcurrentHashMap[String, ActorRef]
|
||||
private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef]
|
||||
private val guard = new ReadWriteGuard
|
||||
|
||||
val local = new LocalActorRegistry(actorsByAddress, actorsByUuid, typedActorsByUuid)
|
||||
|
||||
|
|
|
|||
|
|
@ -139,6 +139,7 @@ object DeploymentConfig {
|
|||
case LeastRAM() ⇒ RouterType.LeastRAM
|
||||
case LeastMessages ⇒ RouterType.LeastMessages
|
||||
case LeastMessages() ⇒ RouterType.LeastMessages
|
||||
case c: CustomRouter ⇒ throw new UnsupportedOperationException("routerTypeFor: " + c)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -217,20 +217,6 @@ object Future {
|
|||
def apply[T](body: ⇒ T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] =
|
||||
dispatcher.dispatchFuture(() ⇒ body, timeout)
|
||||
|
||||
/**
|
||||
* Construct a completable channel
|
||||
*/
|
||||
def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
|
||||
val future = empty[Any](timeout)
|
||||
def !(msg: Any) = future completeWithResult msg
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an empty Future with default timeout
|
||||
*/
|
||||
@deprecated("Superceded by Promise.apply", "1.2")
|
||||
def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout)
|
||||
|
||||
import scala.collection.mutable.Builder
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
|
||||
|
|
@ -273,9 +259,11 @@ object Future {
|
|||
*/
|
||||
def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = {
|
||||
val future = Promise[A](timeout)
|
||||
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒
|
||||
val opte = f.exception
|
||||
if (opte.isDefined) future completeWithException (opte.get)
|
||||
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete {
|
||||
_.exception match {
|
||||
case Some(e) ⇒ future completeWithException e
|
||||
case None ⇒
|
||||
}
|
||||
}
|
||||
future
|
||||
}
|
||||
|
|
@ -320,14 +308,6 @@ sealed trait Future[+T] {
|
|||
*/
|
||||
def await(atMost: Duration): Future[T]
|
||||
|
||||
/**
|
||||
* Blocks the current thread until the Future has been completed. Use
|
||||
* caution with this method as it ignores the timeout and will block
|
||||
* indefinitely if the Future is never completed.
|
||||
*/
|
||||
@deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1")
|
||||
def awaitBlocking: Future[T]
|
||||
|
||||
/**
|
||||
* Tests whether this Future has been completed.
|
||||
*/
|
||||
|
|
@ -383,17 +363,35 @@ sealed trait Future[+T] {
|
|||
* When the future is completed with a valid result, apply the provided
|
||||
* PartialFunction to the result.
|
||||
* <pre>
|
||||
* val result = future receive {
|
||||
* val result = future onResult {
|
||||
* case Foo => "foo"
|
||||
* case Bar => "bar"
|
||||
* }.await.result
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒
|
||||
final def onResult(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒
|
||||
val optr = f.result
|
||||
if (optr.isDefined) {
|
||||
val r = optr.get
|
||||
if (pf.isDefinedAt(r)) pf(r)
|
||||
if (pf isDefinedAt r) pf(r)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When the future is completed with an exception, apply the provided
|
||||
* PartialFunction to the exception.
|
||||
* <pre>
|
||||
* val result = future onException {
|
||||
* case Foo => "foo"
|
||||
* case Bar => "bar"
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒
|
||||
val opte = f.exception
|
||||
if (opte.isDefined) {
|
||||
val e = opte.get
|
||||
if (pf isDefinedAt e) pf(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -439,12 +437,12 @@ sealed trait Future[+T] {
|
|||
* a valid result then the new Future will contain the same.
|
||||
* Example:
|
||||
* <pre>
|
||||
* Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
|
||||
* Future(6 / 0) failure { case e: NotFoundException => 0 } // result: exception
|
||||
* Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
|
||||
* Future(6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
|
||||
* Future(6 / 0) recover { case e: NotFoundException => 0 } // result: exception
|
||||
* Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
|
||||
* </pre>
|
||||
*/
|
||||
final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
|
||||
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
|
||||
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
|
||||
onComplete { ft ⇒
|
||||
val opte = ft.exception
|
||||
|
|
@ -606,6 +604,14 @@ object Promise {
|
|||
|
||||
def apply[A](): Promise[A] = apply(Actor.TIMEOUT)
|
||||
|
||||
/**
|
||||
* Construct a completable channel
|
||||
*/
|
||||
def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
|
||||
val promise = Promise[Any](timeout)
|
||||
def !(msg: Any) = promise completeWithResult msg
|
||||
}
|
||||
|
||||
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
|
||||
override def initialValue = None
|
||||
}
|
||||
|
|
@ -708,18 +714,6 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
|
|||
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
|
||||
}
|
||||
|
||||
def awaitBlocking = {
|
||||
_lock.lock
|
||||
try {
|
||||
while (_value.isEmpty) {
|
||||
_signal.await
|
||||
}
|
||||
this
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
}
|
||||
|
||||
def isExpired: Boolean = timeLeft() <= 0
|
||||
|
||||
def value: Option[Either[Throwable, T]] = {
|
||||
|
|
@ -734,11 +728,16 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
|
|||
def complete(value: Either[Throwable, T]): DefaultPromise[T] = {
|
||||
_lock.lock
|
||||
val notifyTheseListeners = try {
|
||||
if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired
|
||||
_value = Some(value)
|
||||
val existingListeners = _listeners
|
||||
_listeners = Nil
|
||||
existingListeners
|
||||
if (_value.isEmpty) { //Only complete if we aren't expired
|
||||
if (!isExpired) {
|
||||
_value = Some(value)
|
||||
val existingListeners = _listeners
|
||||
_listeners = Nil
|
||||
existingListeners
|
||||
} else {
|
||||
_listeners = Nil
|
||||
Nil
|
||||
}
|
||||
} else Nil
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
|
|
@ -816,7 +815,6 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise
|
|||
def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this }
|
||||
def await(atMost: Duration): Future[T] = this
|
||||
def await: Future[T] = this
|
||||
def awaitBlocking: Future[T] = this
|
||||
def isExpired: Boolean = true
|
||||
def timeoutInNanos: Long = 0
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,8 +90,6 @@ trait LoadBalancer extends Router { self: Actor ⇒
|
|||
}
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -106,8 +104,6 @@ abstract class UntypedLoadBalancer extends UntypedRouter {
|
|||
else null
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
|
||||
}
|
||||
|
||||
object Routing {
|
||||
|
|
|
|||
|
|
@ -295,7 +295,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
|
|||
}
|
||||
|
||||
def actorClass: Class[_ <: Actor] = unsupported
|
||||
def actorClassName = unsupported
|
||||
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
|
||||
def dispatcher: MessageDispatcher = unsupported
|
||||
def makeRemote(hostname: String, port: Int): Unit = unsupported
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
|
|||
|
||||
val deployments2 = ClusterDeployer.fetchDeploymentsFromCluster
|
||||
deployments2.size must equal(1)
|
||||
deployments2.first must equal(deployments1.first)
|
||||
deployments2.head must equal(deployments1.head)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -170,7 +170,6 @@ The 'Future' interface looks like this:
|
|||
|
||||
interface Future<T> {
|
||||
void await();
|
||||
void awaitBlocking();
|
||||
boolean isCompleted();
|
||||
boolean isExpired();
|
||||
long timeoutInNanos();
|
||||
|
|
|
|||
|
|
@ -238,12 +238,12 @@ Exceptions
|
|||
|
||||
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``Actor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``get`` will cause it to be thrown again so it can be handled properly.
|
||||
|
||||
It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``failure`` method. For example:
|
||||
It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``recover`` method. For example:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val future = actor !!! msg1 failure {
|
||||
val future = actor !!! msg1 recover {
|
||||
case e: ArithmeticException => 0
|
||||
}
|
||||
|
||||
In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``failure`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``failure`` method.
|
||||
In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``recover`` method.
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ trait EmbeddedAppServer extends Bootable {
|
|||
super.onLoad
|
||||
if (isRestEnabled) {
|
||||
|
||||
val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(error("microkernel-server.xml not found!")))
|
||||
val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(sys.error("microkernel-server.xml not found!")))
|
||||
|
||||
System.setProperty("jetty.port", REST_PORT.toString)
|
||||
System.setProperty("jetty.host", REST_HOSTNAME)
|
||||
|
|
|
|||
|
|
@ -24,11 +24,6 @@ class TestActorRef[T <: Actor](factory: () ⇒ T, address: String) extends Local
|
|||
dispatcher = CallingThreadDispatcher.global
|
||||
receiveTimeout = None
|
||||
|
||||
/**
|
||||
* Query actor's current receive behavior.
|
||||
*/
|
||||
override def isDefinedAt(o: Any) = actor.isDefinedAt(o)
|
||||
|
||||
/**
|
||||
* Directly inject messages into actor receive behavior. Any exceptions
|
||||
* thrown will be available to you, while still being able to use
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec }
|
|||
import akka.actor._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.Future
|
||||
import akka.dispatch.{ Future, Promise }
|
||||
|
||||
/**
|
||||
* Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
|
||||
|
|
@ -234,14 +234,12 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
|
|||
EventHandler.removeListener(log)
|
||||
}
|
||||
|
||||
"proxy isDefinedAt/apply for the underlying actor" in {
|
||||
"proxy apply for the underlying actor" in {
|
||||
val ref = TestActorRef[WorkerActor].start()
|
||||
ref.isDefinedAt("work") must be(true)
|
||||
ref.isDefinedAt("sleep") must be(false)
|
||||
intercept[IllegalActorStateException] { ref("work") }
|
||||
val ch = Future.channel()
|
||||
val ch = Promise.channel()
|
||||
ref ! ch
|
||||
val f = ch.future
|
||||
val f = ch.promise
|
||||
f must be('completed)
|
||||
f.get must be("complexReply")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,7 +67,6 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
|||
lazy val beanstalkModuleConfig = ModuleConfiguration("beanstalk", AkkaRepo)
|
||||
lazy val lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", "h2-lzf", AkkaRepo)
|
||||
lazy val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo)
|
||||
lazy val aspectWerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", "aspectwerkz", "2.2.3", AkkaRepo)
|
||||
lazy val objenesisModuleConfig = ModuleConfiguration("org.objenesis", sbt.DefaultMavenRepository)
|
||||
lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
|
||||
lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
|
||||
|
|
@ -101,8 +100,6 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
|||
|
||||
|
||||
// Compile
|
||||
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
|
||||
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2
|
||||
lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD
|
||||
lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2
|
||||
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_PATCH_VERSION % "compile" //ApacheV2
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue