From 49883d8c59f52743fefd8f3ffaa7c791d55d9c5c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 May 2011 20:38:42 +0200 Subject: [PATCH] Adding support for completing senderFutures when actor is stopped, closing ticket #894. Also renaming DurableEventBasedDispatcher to DurableDispatcher --- .../scala/akka/dispatch/ActorModelSpec.scala | 18 +++++++++++++++++- .../CallingThreadDispatcherModelSpec.scala | 4 ++++ .../main/scala/akka/dispatch/Dispatcher.scala | 18 ++++++++++++++++-- .../src/main/scala/akka/dispatch/Future.scala | 6 ++++-- .../scala/akka/dispatch/MessageHandling.scala | 7 +++++++ akka-docs/cluster/durable-mailbox.rst | 10 +++++----- .../akka/actor/mailbox/DurableDispatcher.scala | 12 +++++++----- .../actor/mailbox/DurableMailboxSpec.scala | 2 +- 8 files changed, 61 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 97dc67da22..07414be0bc 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -8,13 +8,13 @@ import org.junit.Test import org.scalatest.Assertions._ import akka.testing._ import akka.dispatch._ -import akka.actor.{ ActorRef, Actor } import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit } import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor import akka.util.{ Duration, Switch } import org.multiverse.api.latches.StandardLatch +import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor } object ActorModelSpec { @@ -342,6 +342,22 @@ abstract class ActorModelSpec extends JUnitSuite { assertDispatcher(dispatcher)(starts = run, stops = run) } } + + @Test + def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + dispatcher.suspend(a) + val f1: Future[String] = a !!! Reply("foo") + val stopped = a !!! PoisonPill + val shouldBeCompleted = for (i ← 1 to 10) yield a !!! Reply(i) + dispatcher.resume(a) + assert(f1.get === "foo") + stopped.await + for (each ← shouldBeCompleted) + assert(each.exception.get.isInstanceOf[ActorKilledException]) + a.stop() + } } class DispatcherModelTest extends ActorModelSpec { diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index 655083a6d8..e0aa5369e7 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -38,6 +38,10 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec { } } + override def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister { + //Can't handle this... + } + } // vim: set ts=2 sw=2 et: diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index ab2287a589..bf02af5997 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -5,10 +5,9 @@ package akka.dispatch import akka.event.EventHandler -import akka.actor.{ ActorRef } - import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } +import akka.actor.{ ActorKilledException, ActorRef } /** * Default settings are: @@ -159,6 +158,21 @@ class Dispatcher( private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = registerForExecution(mbox) + protected override def cleanUpMailboxFor(actorRef: ActorRef) { + val m = getMailbox(actorRef) + if (!m.isEmpty) { + var invocation = m.dequeue + lazy val exception = new ActorKilledException("Actor has been stopped") + while (invocation ne null) { + val f = invocation.senderFuture + if (f.isDefined) + f.get.completeWithException(exception) + + invocation = m.dequeue + } + } + } + override val toString = getClass.getSimpleName + "[" + name + "]" def suspend(actorRef: ActorRef) { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2e3d7cb826..84e4b2f79b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -228,6 +228,7 @@ object Future { /** * Create an empty Future with default timeout */ + @deprecated("Superceded by Promise.apply", "1.2") def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) import scala.collection.mutable.Builder @@ -540,7 +541,7 @@ sealed trait Future[+T] { } } - final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) + /*final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { def foreach(f: A ⇒ Unit): Unit = self filter p foreach f @@ -549,7 +550,8 @@ sealed trait Future[+T] { def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } - final def filter(p: T ⇒ Boolean): Future[T] = { + final def filter(p: T ⇒ Boolean): Future[T] = { */ + final def filter(p: Any ⇒ Boolean): Future[T] = { val f = new DefaultPromise[T](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index f5cda388c2..22c3b46c52 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -147,6 +147,7 @@ trait MessageDispatcher { private[akka] def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { + cleanUpMailboxFor(actorRef) actorRef.mailbox = null if (uuids.isEmpty && futures.get == 0) { shutdownSchedule match { @@ -161,6 +162,12 @@ trait MessageDispatcher { } } + /** + * Overridable callback to clean up the mailbox for a given actor, + * called when an actor is unregistered. + */ + protected def cleanUpMailboxFor(actorRef: ActorRef) {} + /** * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors */ diff --git a/akka-docs/cluster/durable-mailbox.rst b/akka-docs/cluster/durable-mailbox.rst index 5ebc34df5a..63564825a9 100644 --- a/akka-docs/cluster/durable-mailbox.rst +++ b/akka-docs/cluster/durable-mailbox.rst @@ -52,7 +52,7 @@ The durable dispatchers and their configuration options reside in the You configure durable mailboxes through the "Akka"-only durable dispatchers, the actor is oblivious to which type of mailbox it is using. Here is an example:: - val dispatcher = DurableEventBasedDispatcher( + val dispatcher = DurableDispatcher( "my:service", FileDurableMailboxStorage) // Then set the actors dispatcher to this dispatcher @@ -63,7 +63,7 @@ or for a thread-based durable dispatcher:: self, FileDurableMailboxStorage) -There are 2 different durable dispatchers, ``DurableEventBasedDispatcher`` and +There are 2 different durable dispatchers, ``DurableDispatcher`` and ``DurablePinnedDispatcher``, which are durable versions of ``Dispatcher`` and ``PinnedDispatcher``. @@ -114,7 +114,7 @@ mailboxes. Read more in the Redis documentation on how to do that. Here is an example of how you can configure your dispatcher to use this mailbox:: - val dispatcher = DurableEventBasedDispatcher( + val dispatcher = DurableDispatcher( "my:service", RedisDurableMailboxStorage) @@ -158,7 +158,7 @@ there will not be that much more work to set up this durable mailbox. Here is an example of how you can configure your dispatcher to use this mailbox:: - val dispatcher = DurableEventBasedDispatcher( + val dispatcher = DurableDispatcher( "my:service", ZooKeeperDurableMailboxStorage) @@ -196,7 +196,7 @@ Beanstalk is a simple, fast work queue. This means that you have to start up a Beanstalk server that can host these durable mailboxes. Read more in the Beanstalk documentation on how to do that. :: - val dispatcher = DurableEventBasedDispatcher( + val dispatcher = DurableDispatcher( "my:service", BeanstalkDurableMailboxStorage) diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala index 8a1b2c60f1..142fbea84f 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala @@ -56,7 +56,7 @@ case object ZooKeeperDurableMailboxStorage extends DurableMailboxStorage("akka.a * * @author Jonas Bonér */ -case class DurableEventBasedDispatcher( +case class DurableDispatcher( _name: String, _storage: DurableMailboxStorage, _throughput: Int = Dispatchers.THROUGHPUT, @@ -98,6 +98,8 @@ case class DurableEventBasedDispatcher( throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!") super.dispatch(invocation) } + + protected override def cleanUpMailboxFor(actorRef: ActorRef) {} //No need to clean up Futures since we don't support them } /** @@ -136,14 +138,14 @@ case class DurablePinnedDispatcher( } /** - * Configurator for the DurableEventBasedDispatcher + * Configurator for the DurableDispatcher * Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper" and "file" * * @author Jonas Bonér */ -class DurableEventBasedDispatcherConfigurator extends MessageDispatcherConfigurator { +class DurableDispatcherConfigurator extends MessageDispatcherConfigurator { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig => new DurableEventBasedDispatcher( + configureThreadPool(config, threadPoolConfig => new DurableDispatcher( config.getString("name", newUuid.toString), getStorage(config), config.getInt("throughput", Dispatchers.THROUGHPUT), @@ -161,6 +163,6 @@ class DurableEventBasedDispatcherConfigurator extends MessageDispatcherConfigura case unknown => throw new IllegalArgumentException("[%s] is not a valid storage, valid options are [redis, beanstalk, zookeeper, file]" format unknown) } - storage.getOrElse(throw new DurableMailboxException("No 'storage' defined for DurableEventBasedDispatcherConfigurator")) + storage.getOrElse(throw new DurableMailboxException("No 'storage' defined for DurableDispatcherConfigurator")) } } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index b1b3b57d9a..b7d2ee0ff1 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -32,7 +32,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll { import DurableMailboxSpecActorFactory._ - implicit val dispatcher = DurableEventBasedDispatcher(backendName, storage, 1) + implicit val dispatcher = DurableDispatcher(backendName, storage, 1) "A " + backendName + " based mailbox backed actor" should {