diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala
index e8c38f2b76..f340533186 100644
--- a/akka-actor/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala
@@ -109,18 +109,15 @@ object ActorRegistry extends ListenerManagement {
* Finds all actors that has a specific id.
*/
def actorsFor(id: String): Array[ActorRef] = {
- if (actorsById.containsKey(id)) {
- actorsById.get(id).toArray(Naught)
- } else Naught
+ val set = actorsById get id
+ if (set ne null) set toArray Naught
+ else Naught
}
/**
* Finds the actor that has a specific UUID.
*/
- def actorFor(uuid: String): Option[ActorRef] = {
- if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
- else None
- }
+ def actorFor(uuid: String): Option[ActorRef] = Option(actorsByUUID get uuid)
/**
* Registers an actor in the ActorRegistry.
diff --git a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
index 6bacec73be..24c566b48c 100644
--- a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
@@ -12,22 +12,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
protected val queue = new ReactiveMessageQueue(name)
- protected val messageInvokers = new HashMap[ActorRef, ActorRef]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
- override def register(actorRef: ActorRef) = synchronized {
- messageInvokers.put(actorRef, actorRef)
- super.register(actorRef)
- }
-
- override def unregister(actorRef: ActorRef) = synchronized {
- messageInvokers.remove(actorRef)
- super.unregister(actorRef)
- }
-
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false
diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala
index f9ffc219cf..803fd700cc 100644
--- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala
@@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.Config.config
import net.lag.configgy.ConfigMap
import se.scalablesolutions.akka.util.UUID
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
+import java.util.concurrent.TimeUnit
/**
* Scala API. Dispatcher factory.
@@ -44,8 +45,8 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D
* @author Jonas Bonér
*/
object Dispatchers extends Logging {
- val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
- val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
+ val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
+ val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
@@ -75,6 +76,7 @@ object Dispatchers extends Logging {
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
+ * Uses the default timeout
*
* E.g. each actor consumes its own thread.
*/
@@ -82,11 +84,19 @@ object Dispatchers extends Logging {
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
+ * Uses the default timeout
*
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity)
+ /**
+ * Creates an thread based dispatcher serving a single actor through the same single thread.
+ *
+ * E.g. each actor consumes its own thread.
+ */
+ def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeout: Long, pushTimeUnit: TimeUnit) = new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeout, pushTimeUnit)
+
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
*
diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 1f8a6bfe9c..5f8469eb84 100644
--- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -89,12 +89,9 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
- override def register(actorRef: ActorRef) = {
- if (actorRef.mailbox eq null ) {
- if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedQueue[MessageInvocation]
- else actorRef.mailbox = new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
- }
- super.register(actorRef)
+ override def createMailbox(actorRef: ActorRef): AnyRef = {
+ if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation]
+ else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
}
def dispatch(receiver: ActorRef): Unit = if (active) {
diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index 4e5d626aed..f9409e91fb 100644
--- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -182,13 +182,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
+ protected override def createMailbox(actorRef: ActorRef): AnyRef = {
+ if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]
+ else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
+ }
+
override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef)
- // The actor will need a ConcurrentLinkedDeque based mailbox
- if (actorRef.mailbox == null) {
- if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]
- else actorRef.mailbox = new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
- }
pooledActors.add(actorRef)
super.register(actorRef)
}
diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala
index e0ddf05d26..cf3f71295c 100644
--- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala
@@ -185,16 +185,10 @@ class HawtDispatcher(val aggregate:Boolean=true, val parent:DispatchQueue=global
// TODO: figure out if this can be optional in akka
override def mailboxSize(actorRef: ActorRef) = 0
- override def register(actorRef: ActorRef) = {
- if( actorRef.mailbox == null ) {
- val queue = parent.createSerialQueue(actorRef.toString)
- if( aggregate ) {
- actorRef.mailbox = new AggregatingHawtDispatcherMailbox(queue)
- } else {
- actorRef.mailbox = new HawtDispatcherMailbox(queue)
- }
- }
- super.register(actorRef)
+ override def createMailbox(actorRef: ActorRef): AnyRef = {
+ val queue = parent.createSerialQueue(actorRef.toString)
+ if (aggregate) new AggregatingHawtDispatcherMailbox(queue)
+ else new HawtDispatcherMailbox(queue)
}
override def toString = "HawtDispatchEventDrivenDispatcher"
diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
index 20c58c9975..49f4cc3839 100644
--- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
@@ -10,6 +10,7 @@ import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
import org.multiverse.commitbarriers.CountDownCommitBarrier
+import se.scalablesolutions.akka.AkkaException
import java.util.concurrent.{ConcurrentSkipListSet}
/**
@@ -56,6 +57,8 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
+class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
+
/**
* @author Jonas Bonér
*/
@@ -68,21 +71,41 @@ trait MessageQueue {
*/
trait MessageDispatcher extends Logging {
protected val uuids = new ConcurrentSkipListSet[String]
+
def dispatch(invocation: MessageInvocation)
+
def start
+
def shutdown
- def register(actorRef: ActorRef) = uuids add actorRef.uuid
+
+ def register(actorRef: ActorRef) {
+ if(actorRef.mailbox eq null)
+ actorRef.mailbox = createMailbox(actorRef)
+ uuids add actorRef.uuid
+ }
def unregister(actorRef: ActorRef) = {
uuids remove actorRef.uuid
+ //actorRef.mailbox = null //FIXME should we null out the mailbox here?
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
}
+
def canBeShutDown: Boolean = uuids.isEmpty
+
def isShutdown: Boolean
+
+ /**
+ * Returns the size of the mailbox for the specified actor
+ */
def mailboxSize(actorRef: ActorRef):Int = 0
+
+ /**
+ * Creates and returns a mailbox for the given actor
+ */
+ protected def createMailbox(actorRef: ActorRef): AnyRef = null
}
/**
- * @author Jonas Bonér
+ * @author Jonas Bonér
*/
trait MessageDemultiplexer {
def select
diff --git a/akka-actor/src/main/scala/dispatch/Queues.scala b/akka-actor/src/main/scala/dispatch/Queues.scala
new file mode 100644
index 0000000000..5b5aa6683e
--- /dev/null
+++ b/akka-actor/src/main/scala/dispatch/Queues.scala
@@ -0,0 +1,148 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.dispatch
+
+import concurrent.forkjoin.LinkedTransferQueue
+import java.util.concurrent.{TimeUnit, Semaphore}
+import java.util.Iterator
+import se.scalablesolutions.akka.util.Logger
+
+class BoundedTransferQueue[E <: AnyRef](
+ val capacity: Int,
+ val pushTimeout: Long,
+ val pushTimeUnit: TimeUnit)
+ extends LinkedTransferQueue[E] {
+ require(capacity > 0)
+ require(pushTimeout > 0)
+ require(pushTimeUnit ne null)
+
+ protected val guard = new Semaphore(capacity)
+
+ override def take(): E = {
+ val e = super.take
+ if (e ne null) guard.release
+ e
+ }
+
+ override def poll(): E = {
+ val e = super.poll
+ if (e ne null) guard.release
+ e
+ }
+
+ override def poll(timeout: Long, unit: TimeUnit): E = {
+ val e = super.poll(timeout,unit)
+ if (e ne null) guard.release
+ e
+ }
+
+ override def remainingCapacity = guard.availablePermits
+
+ override def remove(o: AnyRef): Boolean = {
+ if (super.remove(o)) {
+ guard.release
+ true
+ } else {
+ false
+ }
+ }
+
+ override def offer(e: E): Boolean = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ val result = try {
+ super.offer(e)
+ } catch {
+ case e => guard.release; throw e
+ }
+ if (!result) guard.release
+ result
+ } else
+ false
+ }
+
+ override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ val result = try {
+ super.offer(e,timeout,unit)
+ } catch {
+ case e => guard.release; throw e
+ }
+ if (!result) guard.release
+ result
+ } else
+ false
+ }
+
+ override def add(e: E): Boolean = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ val result = try {
+ super.add(e)
+ } catch {
+ case e => guard.release; throw e
+ }
+ if (!result) guard.release
+ result
+ } else
+ false
+ }
+
+ override def put(e :E): Unit = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ try {
+ super.put(e)
+ } catch {
+ case e => guard.release; throw e
+ }
+ }
+ }
+
+ override def tryTransfer(e: E): Boolean = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ val result = try {
+ super.tryTransfer(e)
+ } catch {
+ case e => guard.release; throw e
+ }
+ if (!result) guard.release
+ result
+ } else
+ false
+ }
+
+ override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ val result = try {
+ super.tryTransfer(e,timeout,unit)
+ } catch {
+ case e => guard.release; throw e
+ }
+ if (!result) guard.release
+ result
+ } else
+ false
+ }
+
+ override def transfer(e: E): Unit = {
+ if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
+ try {
+ super.transfer(e)
+ } catch {
+ case e => guard.release; throw e
+ }
+ }
+ }
+
+ override def iterator: Iterator[E] = {
+ val it = super.iterator
+ new Iterator[E] {
+ def hasNext = it.hasNext
+ def next = it.next
+ def remove {
+ it.remove
+ guard.release //Assume remove worked if no exception was thrown
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
index d0850aa830..f76465f7c7 100644
--- a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
@@ -29,8 +29,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
- val invoker = messageInvokers.get(invocation.receiver)
- if (invoker ne null) invoker.invoke(invocation)
+ val invoker = invocation.receiver
+ if (invoker ne null) invoker invoke invocation
iter.remove
}
}
diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
index 530184d4b2..0bb8f3de45 100644
--- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
@@ -103,14 +103,14 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
var nrOfBusyMessages = 0
- val totalNrOfActors = messageInvokers.size
+ val totalNrOfActors = uuids.size
val totalNrOfBusyActors = busyActors.size
val invocations = selectedInvocations.iterator
while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
val invocation = invocations.next
if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
if (!busyActors.contains(invocation.receiver)) {
- val invoker = messageInvokers.get(invocation.receiver)
+ val invoker = invocation.receiver
if (invoker eq null) throw new IllegalActorStateException(
"Message invoker for invocation [" + invocation + "] is null")
resume(invocation.receiver)
diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index d33d0fb337..8fe07e17ac 100644
--- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -4,27 +4,47 @@
package se.scalablesolutions.akka.dispatch
-import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
+import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author Jonas Bonér
*/
-class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher {
+class ThreadBasedDispatcher(private val actor: ActorRef,
+ val mailboxCapacity: Int = Dispatchers.MAILBOX_CAPACITY,
+ val pushTimeout: Long = 10000,
+ val pushTimeoutUnit: TimeUnit = TimeUnit.MILLISECONDS
+ ) extends MessageDispatcher {
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
- private val queue = new BlockingMessageQueue(name, mailboxCapacity)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
- def dispatch(invocation: MessageInvocation) = queue.append(invocation)
+ override def createMailbox(actorRef: ActorRef): AnyRef = {
+ if (mailboxCapacity > 0)
+ new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue
+ else
+ new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue
+ }
+
+ override def register(actorRef: ActorRef) = {
+ if(actorRef != actor)
+ throw new IllegalArgumentException("Cannot register to anyone but " + actor)
+
+ super.register(actorRef)
+ }
+
+ def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue]
+
+ def dispatch(invocation: MessageInvocation) = mailbox append invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
@@ -33,7 +53,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
override def run = {
while (active) {
try {
- actor.invoke(queue.take)
+ actor.invoke(mailbox.next)
} catch { case e: InterruptedException => active = false }
}
}
@@ -53,12 +73,14 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
-// FIXME: configure the LinkedBlockingQueue in BlockingMessageQueue, use a Builder like in the ReactorBasedThreadPoolEventDrivenDispatcher
-class BlockingMessageQueue(name: String, mailboxCapacity: Int) extends MessageQueue {
- private val queue = if (mailboxCapacity > 0) new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
- else new LinkedBlockingQueue[MessageInvocation]
- def append(invocation: MessageInvocation) = queue.put(invocation)
- def take: MessageInvocation = queue.take
- def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
- def interrupt = throw new UnsupportedOperationException
+trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] =>
+
+ final def append(invocation: MessageInvocation): Unit = {
+ if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
+ if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
+ throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
+ }
+ }
+
+ final def next: MessageInvocation = self.take
}
diff --git a/akka-actor/src/main/scala/util/Logging.scala b/akka-actor/src/main/scala/util/Logging.scala
index f13c229653..b6ddaaa16a 100644
--- a/akka-actor/src/main/scala/util/Logging.scala
+++ b/akka-actor/src/main/scala/util/Logging.scala
@@ -99,18 +99,26 @@ class Logger(val logger: SLFLogger) {
warning(t,message(fmt,arg,argN:_*))
}
+ def warn(t: Throwable, fmt: => String, arg: Any, argN: Any*) = warning(t, fmt, arg, argN)
+
def warning(t: Throwable, msg: => String) {
if (warning_?) logger.warn(msg,t)
}
+ def warn(t: Throwable, msg: => String) = warning(t, msg)
+
def warning(fmt: => String, arg: Any, argN: Any*) {
warning(message(fmt,arg,argN:_*))
}
+ def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN)
+
def warning(msg: => String) {
if (warning_?) logger warn msg
}
+ def warn(msg: => String) = warning(msg)
+
//Error
def error(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
error(t,message(fmt,arg,argN:_*))
diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala
deleted file mode 100644
index 7f16f7aa57..0000000000
--- a/akka-core/src/main/scala/dispatch/Queues.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.dispatch
-
-import concurrent.forkjoin.LinkedTransferQueue
-import java.util.concurrent.{TimeUnit, Semaphore}
-import java.util.Iterator
-import se.scalablesolutions.akka.util.Logger
-
-class BoundedTransferQueue[E <: AnyRef](
- val capacity: Int,
- val pushTimeout: Long,
- val pushTimeUnit: TimeUnit)
- extends LinkedTransferQueue[E] {
- require(capacity > 0)
- require(pushTimeout > 0)
- require(pushTimeUnit ne null)
-
- protected val guard = new Semaphore(capacity)
-
- //Enqueue an item within the push timeout (acquire Semaphore)
- protected def enq(f: => Boolean): Boolean = {
- if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
- val result = try {
- f
- } catch {
- case e =>
- guard.release //If something broke, release
- throw e
- }
- if (!result) guard.release //Didn't add anything
- result
- } else
- false
- }
-
- //Dequeue an item (release Semaphore)
- protected def deq(e: E): E = {
- if (e ne null) guard.release //Signal removal of item
- e
- }
-
- override def take(): E = deq(super.take)
- override def poll(): E = deq(super.poll)
- override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit))
-
- override def remainingCapacity = guard.availablePermits
-
- override def remove(o: AnyRef): Boolean = {
- if (super.remove(o)) {
- guard.release
- true
- } else {
- false
- }
- }
-
- override def offer(e: E): Boolean =
- enq(super.offer(e))
-
- override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean =
- enq(super.offer(e,timeout,unit))
-
- override def add(e: E): Boolean =
- enq(super.add(e))
-
- override def put(e :E): Unit =
- enq({ super.put(e); true })
-
- override def tryTransfer(e: E): Boolean =
- enq(super.tryTransfer(e))
-
- override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean =
- enq(super.tryTransfer(e,timeout,unit))
-
- override def transfer(e: E): Unit =
- enq({ super.transfer(e); true })
-
- override def iterator: Iterator[E] = {
- val it = super.iterator
- new Iterator[E] {
- def hasNext = it.hasNext
- def next = it.next
- def remove {
- it.remove
- guard.release //Assume remove worked if no exception was thrown
- }
- }
- }
-}
\ No newline at end of file
diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala
index 0313dfe6f7..4a3d61cc10 100644
--- a/akka-http/src/main/scala/AkkaCometServlet.scala
+++ b/akka-http/src/main/scala/AkkaCometServlet.scala
@@ -50,11 +50,16 @@ class AkkaServlet extends AtmosphereServlet with Logging {
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
- c.getInt("akka.rest.maxInactiveActivity").foreach { value =>
+
+ c.getInt("akka.rest.maxInactiveActivity") foreach { value =>
log.info("MAX_INACTIVE:%s",value.toString)
addInitParameter(CometSupport.MAX_INACTIVE,value.toString)
}
+ c.getString("akka.rest.cometSupport") foreach { value =>
+ addInitParameter("cometSupport",value)
+ }
+
val servlet = new AtmosphereRestServlet {
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
@@ -86,14 +91,7 @@ class AkkaServlet extends AtmosphereServlet with Logging {
import scala.collection.JavaConversions._
new DefaultCometSupportResolver(config) {
- type CS = CometSupport[_ <: AtmosphereResource[_,_]]
- override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
- available.filter(_ != classOf[GrizzlyCometSupport]).toList match {
- case Nil => new GrizzlyCometSupport(config)
- case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]])
- case _ => super.resolveMultipleNativeSupportConflict(available)
- }
- }
+ type CS = CometSupport[_ <: AtmosphereResource[_,_]]
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
val predef = config.getInitParameter("cometSupport")
diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/EmbeddedAppServer.scala
index 94cc2a01d1..b77a215157 100644
--- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala
+++ b/akka-kernel/src/main/scala/EmbeddedAppServer.scala
@@ -4,16 +4,16 @@
package se.scalablesolutions.akka.kernel
-import com.sun.grizzly.http.SelectorThread
-import com.sun.grizzly.http.servlet.{ ServletAdapter }
-import com.sun.grizzly.standalone.StaticStreamAlgorithm
-
import javax.ws.rs.core.UriBuilder
import javax.servlet.ServletConfig
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.comet.{ AkkaServlet }
+import org.eclipse.jetty.xml.XmlConfiguration
+import java.io.File
+import org.eclipse.jetty.server.{Handler, Server}
+import org.eclipse.jetty.server.handler.{HandlerList, HandlerCollection, ContextHandler}
/**
* Handles the Akka Comet Support (load/unload)
@@ -24,67 +24,46 @@ trait EmbeddedAppServer extends Bootable with Logging {
import se.scalablesolutions.akka.config.Config._
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
- val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = config.getInt("akka.rest.port", 9998)
- protected var jerseySelectorThread: Option[SelectorThread] = None
+ protected var server: Option[Server] = None
abstract override def onLoad = {
super.onLoad
if (config.getBool("akka.rest.service", true)) {
+ log.info("Attempting to start Akka REST service (Jersey)")
+
+ System.setProperty("jetty.port",REST_PORT.toString)
+ System.setProperty("jetty.host",REST_HOSTNAME)
+ System.setProperty("jetty.home",HOME.get + "/deploy/root")
- val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
-
- val scheme = uri.getScheme
- if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
- "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
-
- log.info("Attempting to start REST service on uri [%s]",uri)
-
- val adapter = new ServletAdapter
- adapter.setHandleStaticResources(true)
- adapter.setServletInstance(new AkkaServlet {
- override def init(sc : ServletConfig): Unit = {
- val cl = Thread.currentThread.getContextClassLoader
- try {
- Thread.currentThread.setContextClassLoader(applicationLoader.get)
- super.init(sc)
+ val configuration = new XmlConfiguration(new File(HOME.get + "/config/microkernel-server.xml").toURI.toURL)
+
+ server = Option(configuration.configure.asInstanceOf[Server]) map { s => //Set the correct classloader to our contexts
+ applicationLoader foreach { loader =>
+ //We need to provide the correct classloader to the servlets
+ def setClassLoader(handlers: Seq[Handler]): Unit = {
+ handlers foreach {
+ case c: ContextHandler => c.setClassLoader(loader)
+ case c: HandlerCollection => setClassLoader(c.getHandlers)
+ case _ =>
+ }
}
- finally {
- Thread.currentThread.setContextClassLoader(cl)
- }
- }
- })
-
- adapter.setContextPath(uri.getPath)
- adapter.addInitParameter("cometSupport",
- "org.atmosphere.container.GrizzlyCometSupport")
- adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig")
-
- if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
- log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath)
-
- val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
- ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
- jerseySelectorThread = Some(new SelectorThread).map { t =>
- t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
- t.setPort(REST_PORT)
- t.setAdapter(adapter)
- t.setEnableAsyncExecution(true)
- t.setAsyncHandler(ah)
- t.listen
- t
+ setClassLoader(s.getHandlers)
+ }
+ //Start the server
+ s.start()
+ s
}
- log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
+ log.info("Akka REST service started (Jersey)")
}
}
abstract override def onUnload = {
super.onUnload
- jerseySelectorThread foreach { (t) => {
+ server foreach { t => {
log.info("Shutting down REST service (Jersey)")
- t.stopEndpoint
+ t.stop()
}
}
}
diff --git a/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala b/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala
index ac8943f3ee..fd8ea053c3 100644
--- a/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala
+++ b/akka-samples/akka-sample-lift/src/test/scala/RunWebApp.scala
@@ -1,6 +1,5 @@
-import _root_.org.mortbay.jetty.Connector
-import _root_.org.mortbay.jetty.Server
-import _root_.org.mortbay.jetty.webapp.WebAppContext
+import org.eclipse.jetty.webapp.WebAppContext
+import org.eclipse.jetty.server.Server
object RunWebApp extends Application {
val server = new Server(8080)
@@ -9,7 +8,7 @@ object RunWebApp extends Application {
context.setContextPath("/")
context.setWar("src/main/webapp")
- server.addHandler(context)
+ server.setHandler(context)
try {
println(">>> STARTING EMBEDDED JETTY SERVER, PRESS ANY KEY TO STOP")
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 891ff0f2a2..394cb8490e 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -80,6 +80,7 @@ akka {
service = on
hostname = "localhost"
port = 9998
+ #cometSupport = "org.atmosphere.container.Jetty7CometSupport" # Disregard autodetection, for valid values: http://doc.akkasource.org/comet
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala",
"sample.rest.java",
diff --git a/config/microkernel-server.xml b/config/microkernel-server.xml
new file mode 100644
index 0000000000..ecb4bee120
--- /dev/null
+++ b/config/microkernel-server.xml
@@ -0,0 +1,100 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 10
+ 200
+
+
+
+
+
+
+
+
+
+
+
+
+ 300000
+ 2
+ false
+ 8443
+ 20000
+ 5000
+
+
+
+
+
+
+
+
+
+
+
+
+
+ -
+
+ /
+
+ se.scalablesolutions.akka.comet.AkkaServlet
+ /*
+
+
+
+ -
+
+
+
+
+
+
+
+
+
+
+ true
+ true
+ true
+ 1000
+
+
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 0ec12a3472..e4806fbf7e 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -60,7 +60,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
import Repositories._
lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
- lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
+ lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository)
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
// lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo)
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
@@ -90,12 +90,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val JACKSON_VERSION = "1.2.1"
lazy val JERSEY_VERSION = "1.2"
lazy val LIFT_VERSION = "2.1-M1"
- lazy val MULTIVERSE_VERSION = "0.6"
+ lazy val MULTIVERSE_VERSION = "0.6.1"
lazy val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT"
lazy val LOGBACK_VERSION = "0.9.24"
lazy val SLF4J_VERSION = "1.6.0"
lazy val SPRING_VERSION = "3.0.3.RELEASE"
lazy val ASPECTWERKZ_VERSION = "2.2.1"
+ lazy val JETTY_VERSION = "7.1.6.v20100715"
// -------------------------------------------------------------------------------------------------------------------
// Dependencies
@@ -135,7 +136,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile"
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile"
- lazy val grizzly = "com.sun.grizzly" % "grizzly-comet-webserver" % "1.9.18-i" % "compile"
+ lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile"
+ lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile"
+ lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile"
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
@@ -207,7 +210,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test"
lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "test"
lazy val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test"
- lazy val jettyServer = "org.mortbay.jetty" % "jetty" % "6.1.22" % "test"
+ lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test"
+ lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test"
+
lazy val junit = "junit" % "junit" % "4.5" % "test"
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test"
lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
@@ -417,7 +422,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val atmo_runtime = Dependencies.atmo_runtime
val atmo_tomcat = Dependencies.atmo_tomcat
val atmo_weblogic = Dependencies.atmo_weblogic
- val grizzly = Dependencies.grizzly
+ val jetty = Dependencies.jetty
+ val jetty_util = Dependencies.jetty_util
+ val jetty_xml = Dependencies.jetty_xml
val jackson_core_asl = Dependencies.jackson_core_asl
val jersey = Dependencies.jersey
val jersey_contrib = Dependencies.jersey_contrib
@@ -656,8 +663,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val servlet = Dependencies.servlet
// testing
- val jettyServer = Dependencies.jettyServer
- val junit = Dependencies.junit
+ val testJetty = Dependencies.testJetty
+ val testJettyWebApp = Dependencies.testJettyWebApp
+ val junit = Dependencies.junit
def deployPath = AkkaParentProject.this.deployPath
override def jarPath = warPath
@@ -758,10 +766,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject {
+ lazy val sourceArtifact = Artifact(this.artifactID, "sources", "jar", Some("sources"), Nil, None)
+ lazy val docsArtifact = Artifact(this.artifactID, "docs", "jar", Some("docs"), Nil, None)
override def runClasspath = super.runClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
override def testClasspath = super.testClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
override def packageDocsJar = this.defaultJarPath("-docs.jar")
override def packageSrcJar = this.defaultJarPath("-sources.jar")
+ override def packageToPublishActions = super.packageToPublishActions ++ Seq(this.packageDocs, this.packageSrc)
}
}