/** * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException} import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import se.scalablesolutions.akka.AkkaException import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue class MessageQueueAppendFailedException(message: String) extends AkkaException(message) /** * @author Jonas Bonér */ trait MessageQueue { val dispatcherLock = new SimpleLock def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int def isEmpty: Boolean } /** * Mailbox configuration. */ sealed trait MailboxType abstract class TransientMailboxType(val blocking: Boolean = false) extends MailboxType case class UnboundedMailbox(block: Boolean = false) extends TransientMailboxType(block) case class BoundedMailbox( block: Boolean = false, val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends TransientMailboxType(block) { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") } abstract class DurableMailboxType(val serializer: EnterpriseModule.Serializer) extends MailboxType { if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null") } case class FileBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class RedisBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class BeanstalkBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class ZooKeeperBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class AMQPBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class JMSBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) class DefaultUnboundedMessageQueue(blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { final def enqueue(handle: MessageInvocation) { this add handle } final def dequeue(): MessageInvocation = { if (blockDequeue) this.take() else this.poll() } } class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequeue: Boolean) extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue { final def enqueue(handle: MessageInvocation) { if (pushTimeOut.toMillis > 0) { if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit)) throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) } else this put handle } final def dequeue(): MessageInvocation = if (blockDequeue) this.take() else this.poll() } /** * @author Jonas Bonér */ trait MailboxFactory { val mailboxType: Option[MailboxType] /** * Creates a MessageQueue (Mailbox) with the specified properties. */ protected def createMailbox(actorRef: ActorRef): AnyRef = mailboxType.getOrElse(throw new IllegalStateException("No mailbox type defined")) match { case mb: TransientMailboxType => createTransientMailbox(actorRef, mb) case mb: DurableMailboxType => createDurableMailbox(actorRef, mb) } /** * Creates and returns a transient mailbox for the given actor. */ protected def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef /** * Creates and returns a durable mailbox for the given actor. */ protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef.uuid, serializer).asInstanceOf[MessageQueue] case RedisBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("RedisBasedDurableMailbox is not yet supported") case BeanstalkBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("BeanstalkBasedDurableMailbox is not yet supported") case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported") case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } }