diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index ab3f45e388..1d18e2c60f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -146,30 +146,30 @@ class FutureSpec extends JUnitSuite { val future2 = future1 map (_ / 0) val future3 = future2 map (_.toString) - val future4 = future1 failure { + val future4 = future1 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) - val future5 = future2 failure { + val future5 = future2 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) - val future6 = future2 failure { + val future6 = future2 recover { case e: MatchError ⇒ 0 } map (_.toString) - val future7 = future3 failure { case e: ArithmeticException ⇒ "You got ERROR" } + val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" } val actor = actorOf[TestActor].start() val future8 = actor !!! "Failure" - val future9 = actor !!! "Failure" failure { + val future9 = actor !!! "Failure" recover { case e: RuntimeException ⇒ "FAIL!" } - val future10 = actor !!! "Hello" failure { + val future10 = actor !!! "Hello" recover { case e: RuntimeException ⇒ "FAIL!" } - val future11 = actor !!! "Failure" failure { case _ ⇒ "Oops!" } + val future11 = actor !!! "Failure" recover { case _ ⇒ "Oops!" } assert(future1.get === 5) intercept[ArithmeticException] { future2.get } @@ -269,7 +269,7 @@ class FutureSpec extends JUnitSuite { def receiveShouldExecuteOnComplete { val latch = new StandardLatch val actor = actorOf[TestActor].start() - actor !!! "Hello" receive { case "World" ⇒ latch.open } + actor !!! "Hello" onResult { case "World" ⇒ latch.open } assert(latch.tryAwait(5, TimeUnit.SECONDS)) actor.stop() } @@ -304,13 +304,13 @@ class FutureSpec extends JUnitSuite { val latch = new StandardLatch val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) - f2 receive { case _ ⇒ throw new ThrowableTest("dispatcher receive") } + f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open f2.await assert(f2.resultOrException === Some("success")) f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) - f2 receive { case _ ⇒ throw new ThrowableTest("current thread receive") } + f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } f3.await assert(f3.resultOrException === Some("SUCCESS")) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 0fa5b8e017..ddd37f760c 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -166,38 +166,6 @@ class RoutingSpec extends WordSpec with MustMatchers { for (a ← List(broadcast, a1, a2, a3)) a.stop() } - - "be defined at" in { - import akka.actor.ActorRef - - val Yes = "yes" - val No = "no" - - def testActor() = actorOf(new Actor() { - def receive = { - case Yes ⇒ "yes" - } - }).start() - - val t1 = testActor() - val t2 = testActor() - val t3 = testActor() - val t4 = testActor() - - val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) - - t1.isDefinedAt(Yes) must be(true) - t1.isDefinedAt(No) must be(false) - t2.isDefinedAt(Yes) must be(true) - t2.isDefinedAt(No) must be(false) - d1.isDefinedAt(Yes) must be(true) - d1.isDefinedAt(No) must be(false) - d2.isDefinedAt(Yes) must be(true) - d2.isDefinedAt(No) must be(false) - - for (a ← List(t1, t2, d1, d2)) a.stop() - } } "Actor Pool" must { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2ad30bcd55..a8aab54a90 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -17,6 +17,7 @@ import akka.AkkaException import akka.serialization.{ Format, Serializer } import akka.cluster.ClusterNode import akka.event.EventHandler +import scala.collection.immutable.Stack import scala.reflect.BeanProperty @@ -135,8 +136,8 @@ object Actor extends ListenerManagement { hook } - private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] { - override def initialValue = None + private[actor] val actorRefInCreation = new ThreadLocal[Stack[ActorRef]] { + override def initialValue = Stack[ActorRef]() } /** @@ -396,14 +397,15 @@ object Actor extends ListenerManagement { "] for serialization of actor [" + address + "] since " + reason) - val serializer: Serializer = { - if ((serializerClassName eq null) || - (serializerClassName == "") || - (serializerClassName == Format.defaultSerializerName)) { - Format.Default - } else { - val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match { - case Right(clazz) ⇒ clazz + val serializer: Serializer = serializerClassName match { + case null | "" | Format.`defaultSerializerName` ⇒ Format.Default + case specialSerializer ⇒ + ReflectiveAccess.getClassFor(specialSerializer) match { + case Right(clazz) ⇒ + clazz.newInstance match { + case s: Serializer ⇒ s + case other ⇒ serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") + } case Left(exception) ⇒ val cause = exception match { case i: InvocationTargetException ⇒ i.getTargetException @@ -411,15 +413,11 @@ object Actor extends ListenerManagement { } serializerErrorDueTo(cause.toString) } - val f = clazz.newInstance.asInstanceOf[AnyRef] - if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer] - else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") - } } val isStateful = state match { - case Stateless ⇒ false - case Stateful ⇒ true + case _: Stateless | Stateless ⇒ false + case _: Stateful | Stateful ⇒ true } if (isStateful && isHomeNode) { // stateful actor's home node @@ -505,16 +503,18 @@ trait Actor { */ @transient implicit val someSelf: Some[ActorRef] = { - val optRef = Actor.actorRefInCreation.get - if (optRef.isEmpty) throw new ActorInitializationException( + val refStack = Actor.actorRefInCreation.get + if (refStack.isEmpty) throw new ActorInitializationException( "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + "\n\tEither use:" + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") - Actor.actorRefInCreation.set(None) - optRef.asInstanceOf[Some[ActorRef]] + + val ref = refStack.head + Actor.actorRefInCreation.set(refStack.pop) + Some(ref) } /* @@ -616,21 +616,6 @@ trait Actor { throw new UnhandledMessageException(msg, self) } - /** - * Is the actor able to handle the message passed in as arguments? - */ - def isDefinedAt(message: Any): Boolean = { - val behaviorStack = self.hotswap - message match { //Same logic as apply(msg) but without the unhandled catch-all - case l: AutoReceivedMessage ⇒ true - case msg if behaviorStack.nonEmpty && - behaviorStack.head.isDefinedAt(msg) ⇒ true - case msg if behaviorStack.isEmpty && - processingBehavior.isDefinedAt(msg) ⇒ true - case _ ⇒ false - } - } - /** * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Puts the behavior on top of the hotswap stack. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bc5efa02b0..54336dd2d2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -233,12 +233,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED - /** - * Is the actor able to handle the message passed in as arguments? - */ - @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1") - def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - /** * Only for internal use. UUID is effectively final. */ @@ -685,6 +679,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, def supervisor: Option[ActorRef] = _supervisor // ========= AKKA PROTECTED FUNCTIONS ========= + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = { + val inetaddr = Actor.remote.address + SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort, timeout) + } protected[akka] def supervisor_=(sup: Option[ActorRef]) { _supervisor = sup @@ -801,11 +800,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, } def tooManyRestarts() { - _supervisor.foreach { sup ⇒ - // can supervisor handle the notification? - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) - if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) - } + notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)) stop() } @@ -869,13 +864,20 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, // ========= PRIVATE FUNCTIONS ========= private[this] def newActor: Actor = { - try { - Actor.actorRefInCreation.set(Some(this)) - val a = actorFactory() - if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - a - } finally { - Actor.actorRefInCreation.set(None) + import Actor.{ actorRefInCreation ⇒ refStack } + (try { + refStack.set(refStack.get.push(this)) + actorFactory() + } catch { + case e ⇒ + val stack = refStack.get + //Clean up if failed + if ((stack.nonEmpty) && (stack.head eq this)) refStack.set(stack.pop) + //Then rethrow + throw e + }) match { + case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") + case valid ⇒ valid } } @@ -1031,6 +1033,12 @@ private[akka] case class RemoteActorRef private[akka] ( } // ==== NOT SUPPORTED ==== + + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = { + SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, timeout) + } + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher) { @@ -1251,3 +1259,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ } else false } } + +case class SerializedActorRef(val uuid: Uuid, + val address: String, + val hostname: String, + val port: Int, + val timeout: Long) { + @throws(classOf[java.io.ObjectStreamException]) + def readResolve(): AnyRef = Actor.registry.local.actorFor(uuid) match { + case Some(actor) ⇒ actor + case None ⇒ RemoteActorRef(new InetSocketAddress(hostname, port), address, timeout, None) + } +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 8aa34a3187..947d43b4b7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -12,7 +12,7 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } import java.util.{ Set ⇒ JSet } import akka.util.ReflectiveAccess._ -import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement } +import akka.util.ListenerManagement import akka.serialization._ /** @@ -36,7 +36,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef] private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef] - private val guard = new ReadWriteGuard val local = new LocalActorRegistry(actorsByAddress, actorsByUuid, typedActorsByUuid) diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 5943e28117..9da6df072f 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -139,6 +139,7 @@ object DeploymentConfig { case LeastRAM() ⇒ RouterType.LeastRAM case LeastMessages ⇒ RouterType.LeastMessages case LeastMessages() ⇒ RouterType.LeastMessages + case c: CustomRouter ⇒ throw new UnsupportedOperationException("routerTypeFor: " + c) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 84e4b2f79b..e760f97bb7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -217,20 +217,6 @@ object Future { def apply[T](body: ⇒ T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = dispatcher.dispatchFuture(() ⇒ body, timeout) - /** - * Construct a completable channel - */ - def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { - val future = empty[Any](timeout) - def !(msg: Any) = future completeWithResult msg - } - - /** - * Create an empty Future with default timeout - */ - @deprecated("Superceded by Promise.apply", "1.2") - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) - import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -273,9 +259,11 @@ object Future { */ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ - val opte = f.exception - if (opte.isDefined) future completeWithException (opte.get) + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { + _.exception match { + case Some(e) ⇒ future completeWithException e + case None ⇒ + } } future } @@ -320,14 +308,6 @@ sealed trait Future[+T] { */ def await(atMost: Duration): Future[T] - /** - * Blocks the current thread until the Future has been completed. Use - * caution with this method as it ignores the timeout and will block - * indefinitely if the Future is never completed. - */ - @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1") - def awaitBlocking: Future[T] - /** * Tests whether this Future has been completed. */ @@ -383,17 +363,35 @@ sealed trait Future[+T] { * When the future is completed with a valid result, apply the provided * PartialFunction to the result. *
- * val result = future receive {
+ * val result = future onResult {
* case Foo => "foo"
* case Bar => "bar"
- * }.await.result
+ * }
*
*/
- final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒
+ final def onResult(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒
val optr = f.result
if (optr.isDefined) {
val r = optr.get
- if (pf.isDefinedAt(r)) pf(r)
+ if (pf isDefinedAt r) pf(r)
+ }
+ }
+
+ /**
+ * When the future is completed with an exception, apply the provided
+ * PartialFunction to the exception.
+ *
+ * val result = future onException {
+ * case Foo => "foo"
+ * case Bar => "bar"
+ * }
+ *
+ */
+ final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒
+ val opte = f.exception
+ if (opte.isDefined) {
+ val e = opte.get
+ if (pf isDefinedAt e) pf(e)
}
}
@@ -439,12 +437,12 @@ sealed trait Future[+T] {
* a valid result then the new Future will contain the same.
* Example:
*
- * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
- * Future(6 / 0) failure { case e: NotFoundException => 0 } // result: exception
- * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
+ * Future(6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
+ * Future(6 / 0) recover { case e: NotFoundException => 0 } // result: exception
+ * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
*
*/
- final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
+ final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
onComplete { ft ⇒
val opte = ft.exception
@@ -606,6 +604,14 @@ object Promise {
def apply[A](): Promise[A] = apply(Actor.TIMEOUT)
+ /**
+ * Construct a completable channel
+ */
+ def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
+ val promise = Promise[Any](timeout)
+ def !(msg: Any) = promise completeWithResult msg
+ }
+
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
override def initialValue = None
}
@@ -708,18 +714,6 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
}
- def awaitBlocking = {
- _lock.lock
- try {
- while (_value.isEmpty) {
- _signal.await
- }
- this
- } finally {
- _lock.unlock
- }
- }
-
def isExpired: Boolean = timeLeft() <= 0
def value: Option[Either[Throwable, T]] = {
@@ -734,11 +728,16 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
def complete(value: Either[Throwable, T]): DefaultPromise[T] = {
_lock.lock
val notifyTheseListeners = try {
- if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired
- _value = Some(value)
- val existingListeners = _listeners
- _listeners = Nil
- existingListeners
+ if (_value.isEmpty) { //Only complete if we aren't expired
+ if (!isExpired) {
+ _value = Some(value)
+ val existingListeners = _listeners
+ _listeners = Nil
+ existingListeners
+ } else {
+ _listeners = Nil
+ Nil
+ }
} else Nil
} finally {
_signal.signalAll
@@ -816,7 +815,6 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise
def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this }
def await(atMost: Duration): Future[T] = this
def await: Future[T] = this
- def awaitBlocking: Future[T] = this
def isExpired: Boolean = true
def timeoutInNanos: Long = 0
}
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index 83fd45a5cb..454760594a 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -90,8 +90,6 @@ trait LoadBalancer extends Router { self: Actor ⇒
}
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
-
- override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
/**
@@ -106,8 +104,6 @@ abstract class UntypedLoadBalancer extends UntypedRouter {
else null
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
-
- override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
object Routing {
diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
index f1a643fc1a..d06bdcb70e 100644
--- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
@@ -295,7 +295,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
}
def actorClass: Class[_ <: Actor] = unsupported
- def actorClassName = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala
index 3ab9acdb8a..435f20d094 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala
@@ -47,7 +47,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
val deployments2 = ClusterDeployer.fetchDeploymentsFromCluster
deployments2.size must equal(1)
- deployments2.first must equal(deployments1.first)
+ deployments2.head must equal(deployments1.head)
}
}
diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst
index ddeedbe829..7d6cec6013 100644
--- a/akka-docs/java/untyped-actors.rst
+++ b/akka-docs/java/untyped-actors.rst
@@ -170,7 +170,6 @@ The 'Future' interface looks like this:
interface Future