diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 0e9a1b20cf..d3dd9e9209 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,12 +1,13 @@ package akka.dispatch + import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import java.util.concurrent.{ TimeUnit, BlockingQueue } +import java.util.concurrent.ConcurrentLinkedQueue import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec import akka.actor.ActorRef -import akka.actor.ActorCell -import java.util.concurrent.ConcurrentLinkedQueue +import akka.actor.ActorContext @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -155,7 +156,7 @@ object CustomMailboxSpec { } """ - class MyMailbox(owner: ActorCell) extends Mailbox(owner) + class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 633898c4e9..9940b16aa3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -12,6 +12,7 @@ import annotation.tailrec import akka.event.Logging.Error import com.typesafe.config.Config import java.lang.reflect.InvocationTargetException +import akka.actor.ActorContext class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -35,7 +36,17 @@ object Mailbox { final val debug = false } -abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { +/** + * Custom mailbox implementations are implemented by extending this class. + */ +abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell]) + +/** + * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation, + * but can't be exposed to user defined mailbox subclasses. + * + */ +private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { import Mailbox._ @volatile @@ -319,15 +330,15 @@ trait QueueBasedMessageQueue extends MessageQueue { * Mailbox configuration. */ trait MailboxType { - def create(receiver: ActorCell): Mailbox + def create(receiver: ActorContext): Mailbox } /** * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -337,16 +348,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut } } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } } @@ -356,8 +367,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } @@ -365,8 +376,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va class CustomMailboxType(mailboxFQN: String) extends MailboxType { - def create(receiver: ActorCell): Mailbox = { - val constructorSignature = Array[Class[_]](classOf[ActorCell]) + override def create(receiver: ActorContext): Mailbox = { + val constructorSignature = Array[Class[_]](classOf[ActorContext]) ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { case Right(instance) ⇒ instance.asInstanceOf[Mailbox] case Left(exception) ⇒ @@ -379,7 +390,7 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType { } } - private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { + private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorContext].getClassLoader) match { case Right(clazz) ⇒ clazz case Left(exception) ⇒ val cause = exception match { diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index c680511697..57d7b3e098 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef @@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess /** * @author Jonas Bonér */ -class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt @@ -78,7 +78,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) // TODO PN: Why volatile on local variable? @volatile var connected = false - // TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue + // TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue var attempts = 0 var client: Client = null while (!connected) { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 8fa7f81e25..55c96ea65c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -5,12 +5,12 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef -class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMailbox") diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 2c5fd7cb88..aa27a731e5 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -6,13 +6,13 @@ package akka.actor.mailbox import akka.util.ReflectiveAccess import java.lang.reflect.InvocationTargetException import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.actor.ActorRef import akka.actor.SerializedActorRef import akka.dispatch.Envelope import akka.dispatch.DefaultSystemMessageQueue import akka.dispatch.Dispatcher -import akka.dispatch.Mailbox +import akka.dispatch.CustomMailbox import akka.dispatch.MailboxType import akka.dispatch.MessageDispatcher import akka.dispatch.MessageQueue @@ -34,7 +34,7 @@ class DurableMailboxException private[akka] (message: String, cause: Throwable) def this(message: String) = this(message, null) } -abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with DefaultSystemMessageQueue { +abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ def system = owner.system @@ -46,7 +46,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa trait DurableMessageSerialization { - def owner: ActorCell + def owner: ActorContext def serialize(durableMessage: Envelope): Array[Byte] = { diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 6e1c28219d..b404e3c844 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -7,7 +7,7 @@ import akka.AkkaException import com.mongodb.async._ import com.mongodb.async.futures.RequestFutures import org.bson.collection._ -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise } @@ -26,7 +26,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message) * * @author Brendan W. McAdams */ -class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { +class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index f937be09e0..21c5555590 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -6,14 +6,14 @@ package akka.actor.mailbox import com.redis._ import akka.actor.LocalActorRef import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef class RedisBasedMailboxException(message: String) extends AkkaException(message) -class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = RedisBasedMailboxExtension(owner.system) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 3a50b93e93..1cdedba25d 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -8,7 +8,7 @@ import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import org.I0Itec.zkclient.serialize._ -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.cluster.zookeeper.AkkaZkClient import akka.dispatch.Envelope import akka.event.Logging @@ -17,7 +17,7 @@ import akka.actor.ActorRef class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) -class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues"