Adding support for completing senderFutures when actor is stopped, closing ticket #894. Also renaming DurableEventBasedDispatcher to DurableDispatcher

This commit is contained in:
Viktor Klang 2011-05-26 20:38:42 +02:00
parent e94b722a4b
commit 49883d8c59
8 changed files with 61 additions and 16 deletions

View file

@ -8,13 +8,13 @@ import org.junit.Test
import org.scalatest.Assertions._ import org.scalatest.Assertions._
import akka.testing._ import akka.testing._
import akka.dispatch._ import akka.dispatch._
import akka.actor.{ ActorRef, Actor }
import akka.actor.Actor._ import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit } import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{ Duration, Switch } import akka.util.{ Duration, Switch }
import org.multiverse.api.latches.StandardLatch import org.multiverse.api.latches.StandardLatch
import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor }
object ActorModelSpec { object ActorModelSpec {
@ -342,6 +342,22 @@ abstract class ActorModelSpec extends JUnitSuite {
assertDispatcher(dispatcher)(starts = run, stops = run) assertDispatcher(dispatcher)(starts = run, stops = run)
} }
} }
@Test
def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
dispatcher.suspend(a)
val f1: Future[String] = a !!! Reply("foo")
val stopped = a !!! PoisonPill
val shouldBeCompleted = for (i 1 to 10) yield a !!! Reply(i)
dispatcher.resume(a)
assert(f1.get === "foo")
stopped.await
for (each shouldBeCompleted)
assert(each.exception.get.isInstanceOf[ActorKilledException])
a.stop()
}
} }
class DispatcherModelTest extends ActorModelSpec { class DispatcherModelTest extends ActorModelSpec {

View file

@ -38,6 +38,10 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec {
} }
} }
override def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister {
//Can't handle this...
}
} }
// vim: set ts=2 sw=2 et: // vim: set ts=2 sw=2 et:

View file

@ -5,10 +5,9 @@
package akka.dispatch package akka.dispatch
import akka.event.EventHandler import akka.event.EventHandler
import akka.actor.{ ActorRef }
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue }
import akka.actor.{ ActorKilledException, ActorRef }
/** /**
* Default settings are: * Default settings are:
@ -159,6 +158,21 @@ class Dispatcher(
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox) registerForExecution(mbox)
protected override def cleanUpMailboxFor(actorRef: ActorRef) {
val m = getMailbox(actorRef)
if (!m.isEmpty) {
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
while (invocation ne null) {
val f = invocation.senderFuture
if (f.isDefined)
f.get.completeWithException(exception)
invocation = m.dequeue
}
}
}
override val toString = getClass.getSimpleName + "[" + name + "]" override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actorRef: ActorRef) { def suspend(actorRef: ActorRef) {

View file

@ -228,6 +228,7 @@ object Future {
/** /**
* Create an empty Future with default timeout * Create an empty Future with default timeout
*/ */
@deprecated("Superceded by Promise.apply", "1.2")
def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout)
import scala.collection.mutable.Builder import scala.collection.mutable.Builder
@ -540,7 +541,7 @@ sealed trait Future[+T] {
} }
} }
final def withFilter(p: T Boolean) = new FutureWithFilter[T](this, p) /*final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p)
final class FutureWithFilter[+A](self: Future[A], p: A Boolean) { final class FutureWithFilter[+A](self: Future[A], p: A Boolean) {
def foreach(f: A Unit): Unit = self filter p foreach f def foreach(f: A Unit): Unit = self filter p foreach f
@ -549,7 +550,8 @@ sealed trait Future[+T] {
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x)) def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
} }
final def filter(p: T Boolean): Future[T] = { final def filter(p: T Boolean): Future[T] = { */
final def filter(p: Any Boolean): Future[T] = {
val f = new DefaultPromise[T](timeoutInNanos, NANOS) val f = new DefaultPromise[T](timeoutInNanos, NANOS)
onComplete { ft onComplete { ft
val optv = ft.value val optv = ft.value

View file

@ -147,6 +147,7 @@ trait MessageDispatcher {
private[akka] def unregister(actorRef: ActorRef) = { private[akka] def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) { if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef)
actorRef.mailbox = null actorRef.mailbox = null
if (uuids.isEmpty && futures.get == 0) { if (uuids.isEmpty && futures.get == 0) {
shutdownSchedule match { shutdownSchedule match {
@ -161,6 +162,12 @@ trait MessageDispatcher {
} }
} }
/**
* Overridable callback to clean up the mailbox for a given actor,
* called when an actor is unregistered.
*/
protected def cleanUpMailboxFor(actorRef: ActorRef) {}
/** /**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
*/ */

View file

@ -52,7 +52,7 @@ The durable dispatchers and their configuration options reside in the
You configure durable mailboxes through the "Akka"-only durable dispatchers, the You configure durable mailboxes through the "Akka"-only durable dispatchers, the
actor is oblivious to which type of mailbox it is using. Here is an example:: actor is oblivious to which type of mailbox it is using. Here is an example::
val dispatcher = DurableEventBasedDispatcher( val dispatcher = DurableDispatcher(
"my:service", "my:service",
FileDurableMailboxStorage) FileDurableMailboxStorage)
// Then set the actors dispatcher to this dispatcher // Then set the actors dispatcher to this dispatcher
@ -63,7 +63,7 @@ or for a thread-based durable dispatcher::
self, self,
FileDurableMailboxStorage) FileDurableMailboxStorage)
There are 2 different durable dispatchers, ``DurableEventBasedDispatcher`` and There are 2 different durable dispatchers, ``DurableDispatcher`` and
``DurablePinnedDispatcher``, which are durable versions of ``DurablePinnedDispatcher``, which are durable versions of
``Dispatcher`` and ``PinnedDispatcher``. ``Dispatcher`` and ``PinnedDispatcher``.
@ -114,7 +114,7 @@ mailboxes. Read more in the Redis documentation on how to do that.
Here is an example of how you can configure your dispatcher to use this mailbox:: Here is an example of how you can configure your dispatcher to use this mailbox::
val dispatcher = DurableEventBasedDispatcher( val dispatcher = DurableDispatcher(
"my:service", "my:service",
RedisDurableMailboxStorage) RedisDurableMailboxStorage)
@ -158,7 +158,7 @@ there will not be that much more work to set up this durable mailbox.
Here is an example of how you can configure your dispatcher to use this mailbox:: Here is an example of how you can configure your dispatcher to use this mailbox::
val dispatcher = DurableEventBasedDispatcher( val dispatcher = DurableDispatcher(
"my:service", "my:service",
ZooKeeperDurableMailboxStorage) ZooKeeperDurableMailboxStorage)
@ -196,7 +196,7 @@ Beanstalk is a simple, fast work queue. This means that you have to start up a
Beanstalk server that can host these durable mailboxes. Read more in the Beanstalk server that can host these durable mailboxes. Read more in the
Beanstalk documentation on how to do that. :: Beanstalk documentation on how to do that. ::
val dispatcher = DurableEventBasedDispatcher( val dispatcher = DurableDispatcher(
"my:service", "my:service",
BeanstalkDurableMailboxStorage) BeanstalkDurableMailboxStorage)

View file

@ -56,7 +56,7 @@ case object ZooKeeperDurableMailboxStorage extends DurableMailboxStorage("akka.a
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
case class DurableEventBasedDispatcher( case class DurableDispatcher(
_name: String, _name: String,
_storage: DurableMailboxStorage, _storage: DurableMailboxStorage,
_throughput: Int = Dispatchers.THROUGHPUT, _throughput: Int = Dispatchers.THROUGHPUT,
@ -98,6 +98,8 @@ case class DurableEventBasedDispatcher(
throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!") throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!")
super.dispatch(invocation) super.dispatch(invocation)
} }
protected override def cleanUpMailboxFor(actorRef: ActorRef) {} //No need to clean up Futures since we don't support them
} }
/** /**
@ -136,14 +138,14 @@ case class DurablePinnedDispatcher(
} }
/** /**
* Configurator for the DurableEventBasedDispatcher * Configurator for the DurableDispatcher
* Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper" and "file" * Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper" and "file"
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class DurableEventBasedDispatcherConfigurator extends MessageDispatcherConfigurator { class DurableDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: Configuration): MessageDispatcher = { def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig => new DurableEventBasedDispatcher( configureThreadPool(config, threadPoolConfig => new DurableDispatcher(
config.getString("name", newUuid.toString), config.getString("name", newUuid.toString),
getStorage(config), getStorage(config),
config.getInt("throughput", Dispatchers.THROUGHPUT), config.getInt("throughput", Dispatchers.THROUGHPUT),
@ -161,6 +163,6 @@ class DurableEventBasedDispatcherConfigurator extends MessageDispatcherConfigura
case unknown => throw new IllegalArgumentException("[%s] is not a valid storage, valid options are [redis, beanstalk, zookeeper, file]" format unknown) case unknown => throw new IllegalArgumentException("[%s] is not a valid storage, valid options are [redis, beanstalk, zookeeper, file]" format unknown)
} }
storage.getOrElse(throw new DurableMailboxException("No 'storage' defined for DurableEventBasedDispatcherConfigurator")) storage.getOrElse(throw new DurableMailboxException("No 'storage' defined for DurableDispatcherConfigurator"))
} }
} }

View file

@ -32,7 +32,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM
WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll { WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll {
import DurableMailboxSpecActorFactory._ import DurableMailboxSpecActorFactory._
implicit val dispatcher = DurableEventBasedDispatcher(backendName, storage, 1) implicit val dispatcher = DurableDispatcher(backendName, storage, 1)
"A " + backendName + " based mailbox backed actor" should { "A " + backendName + " based mailbox backed actor" should {