diff --git a/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 916fe79228..0f77691d2d 100644 --- a/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actors/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -56,9 +56,10 @@ package se.scalablesolutions.akka.dispatch class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { @volatile private var active: Boolean = false - val name = "event-driven:executor:dispatcher:" + _name + val name: String = "event-driven:executor:dispatcher:" + _name withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { diff --git a/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala index 4a3659d981..6e975e885d 100644 --- a/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actors/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -12,18 +12,19 @@ import java.util.Collection trait ThreadPoolBuilder { val name: String - + private val NR_START_THREADS = 4 private val NR_MAX_THREADS = 128 private val KEEP_ALIVE_TIME = 60000L // default is one minute private val MILLISECONDS = TimeUnit.MILLISECONDS private var threadPoolBuilder: ThreadPoolExecutor = _ - private val threadFactory = new MonitorableThreadFactory(name) private var boundedExecutorBound = -1 private var inProcessOfBuilding = false private var blockingQueue: BlockingQueue[Runnable] = _ + private lazy val threadFactory = new MonitorableThreadFactory(name) + protected var executor: ExecutorService = _ def buildThreadPool = synchronized { @@ -38,7 +39,7 @@ trait ThreadPoolBuilder { } } - def withNewThreadPoolWithQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized { + def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase inProcessOfBuilding = false @@ -52,7 +53,7 @@ trait ThreadPoolBuilder { *
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed. */ - def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): ThreadPoolBuilder = synchronized { + def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase blockingQueue = new LinkedBlockingQueue[Runnable] @@ -61,19 +62,19 @@ trait ThreadPoolBuilder { this } - def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { + def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable](capacity) + blockingQueue = new LinkedBlockingQueue[Runnable] threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this } - def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { + def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] + blockingQueue = new LinkedBlockingQueue[Runnable](capacity) threadPoolBuilder = new ThreadPoolExecutor( NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this diff --git a/akka-samples-lift/src/main/scala/akka/SimpleService.scala b/akka-samples-lift/src/main/scala/akka/SimpleService.scala index 8bec513bb9..4f23ef965a 100644 --- a/akka-samples-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples-lift/src/main/scala/akka/SimpleService.scala @@ -1,6 +1,6 @@ package sample.lift -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Transactor, Actor} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} @@ -8,7 +8,6 @@ import java.lang.Integer import javax.ws.rs.{GET, Path, Produces} import java.nio.ByteBuffer - /** * Try service out by invoking (multiple times): *
@@ -17,9 +16,7 @@ import java.nio.ByteBuffer
* Or browse to the URL from a web browser.
*/
@Path("/liftcount")
-class SimpleService extends Actor {
- makeTransactionRequired
-
+class SimpleService extends Transactor {
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala
index 3ad42a4540..2ccd8f13a4 100644
--- a/akka-samples-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples-scala/src/main/scala/SimpleService.scala
@@ -4,7 +4,7 @@
package sample.scala
-import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
+import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
@@ -32,7 +32,7 @@ class Boot {
Supervise(
new PersistentSimpleService,
LifeCycle(Permanent)) ::
- Supervise(
+ Supervise(
new PubSub,
LifeCycle(Permanent))
:: Nil))
@@ -47,9 +47,8 @@ class Boot {
* Or browse to the URL from a web browser.
*/
@Path("/scalacount")
-class SimpleService extends Actor {
- makeTransactionRequired
-
+class SimpleService extends Transactor {
+
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
@@ -74,21 +73,21 @@ class SimpleService extends Actor {
@Path("/pubsub/")
class PubSub extends Actor {
- case class Msg(topic: String, message: String)
+ case class Msg(topic: String, message: String)
- @GET
- @Suspend
- @Produces(Array("text/plain;charset=ISO-8859-1"))
- @Path("/topic/{topic}/")
- def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
+ @GET
+ @Suspend
+ @Produces(Array("text/plain;charset=ISO-8859-1"))
+ @Path("/topic/{topic}/")
+ def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
- @GET
- @Broadcast
- @Path("/topic/{topic}/{message}/")
- @Produces(Array("text/plain;charset=ISO-8859-1"))
- def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
+ @GET
+ @Broadcast
+ @Path("/topic/{topic}/{message}/")
+ @Produces(Array("text/plain;charset=ISO-8859-1"))
+ def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
- override def receive = { case _ => }
+ def receive = { case _ => }
}
@@ -127,9 +126,7 @@ class PersistentSimpleService extends Actor {
}
@Path("/chat")
-class Chat extends Actor with Logging {
- makeTransactionRequired
-
+class Chat extends Transactor {
case class Chat(val who: String, val what: String, val msg: String)
@Suspend