fixed broken runtime name of threads + added Transactor trait to some samples
This commit is contained in:
parent
3a069f0e86
commit
c87812aac1
4 changed files with 30 additions and 34 deletions
|
|
@ -56,9 +56,10 @@ package se.scalablesolutions.akka.dispatch
|
|||
class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder {
|
||||
@volatile private var active: Boolean = false
|
||||
|
||||
val name = "event-driven:executor:dispatcher:" + _name
|
||||
val name: String = "event-driven:executor:dispatcher:" + _name
|
||||
|
||||
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = if (active) {
|
||||
executor.execute(new Runnable() {
|
||||
def run = {
|
||||
|
|
|
|||
|
|
@ -12,18 +12,19 @@ import java.util.Collection
|
|||
|
||||
trait ThreadPoolBuilder {
|
||||
val name: String
|
||||
|
||||
|
||||
private val NR_START_THREADS = 4
|
||||
private val NR_MAX_THREADS = 128
|
||||
private val KEEP_ALIVE_TIME = 60000L // default is one minute
|
||||
private val MILLISECONDS = TimeUnit.MILLISECONDS
|
||||
|
||||
private var threadPoolBuilder: ThreadPoolExecutor = _
|
||||
private val threadFactory = new MonitorableThreadFactory(name)
|
||||
private var boundedExecutorBound = -1
|
||||
private var inProcessOfBuilding = false
|
||||
private var blockingQueue: BlockingQueue[Runnable] = _
|
||||
|
||||
private lazy val threadFactory = new MonitorableThreadFactory(name)
|
||||
|
||||
protected var executor: ExecutorService = _
|
||||
|
||||
def buildThreadPool = synchronized {
|
||||
|
|
@ -38,7 +39,7 @@ trait ThreadPoolBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
def withNewThreadPoolWithQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
|
||||
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
inProcessOfBuilding = false
|
||||
|
|
@ -52,7 +53,7 @@ trait ThreadPoolBuilder {
|
|||
* <p/>
|
||||
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
|
||||
*/
|
||||
def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): ThreadPoolBuilder = synchronized {
|
||||
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
||||
|
|
@ -61,19 +62,19 @@ trait ThreadPoolBuilder {
|
|||
this
|
||||
}
|
||||
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized {
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
||||
threadPoolBuilder = new ThreadPoolExecutor(
|
||||
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
||||
this
|
||||
}
|
||||
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized {
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
|
||||
threadPoolBuilder = new ThreadPoolExecutor(
|
||||
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
||||
this
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
package sample.lift
|
||||
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
import se.scalablesolutions.akka.actor.{Transactor, Actor}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
|
||||
|
||||
|
|
@ -8,7 +8,6 @@ import java.lang.Integer
|
|||
import javax.ws.rs.{GET, Path, Produces}
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
* <pre>
|
||||
|
|
@ -17,9 +16,7 @@ import java.nio.ByteBuffer
|
|||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/liftcount")
|
||||
class SimpleService extends Actor {
|
||||
makeTransactionRequired
|
||||
|
||||
class SimpleService extends Transactor {
|
||||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package sample.scala
|
||||
|
||||
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
|
||||
import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
|
||||
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
|
@ -32,7 +32,7 @@ class Boot {
|
|||
Supervise(
|
||||
new PersistentSimpleService,
|
||||
LifeCycle(Permanent)) ::
|
||||
Supervise(
|
||||
Supervise(
|
||||
new PubSub,
|
||||
LifeCycle(Permanent))
|
||||
:: Nil))
|
||||
|
|
@ -47,9 +47,8 @@ class Boot {
|
|||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/scalacount")
|
||||
class SimpleService extends Actor {
|
||||
makeTransactionRequired
|
||||
|
||||
class SimpleService extends Transactor {
|
||||
|
||||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
|
|
@ -74,21 +73,21 @@ class SimpleService extends Actor {
|
|||
|
||||
@Path("/pubsub/")
|
||||
class PubSub extends Actor {
|
||||
case class Msg(topic: String, message: String)
|
||||
case class Msg(topic: String, message: String)
|
||||
|
||||
@GET
|
||||
@Suspend
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
@Path("/topic/{topic}/")
|
||||
def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
|
||||
@GET
|
||||
@Suspend
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
@Path("/topic/{topic}/")
|
||||
def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
|
||||
|
||||
@GET
|
||||
@Broadcast
|
||||
@Path("/topic/{topic}/{message}/")
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
||||
@GET
|
||||
@Broadcast
|
||||
@Path("/topic/{topic}/{message}/")
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
||||
|
||||
override def receive = { case _ => }
|
||||
def receive = { case _ => }
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -127,9 +126,7 @@ class PersistentSimpleService extends Actor {
|
|||
}
|
||||
|
||||
@Path("/chat")
|
||||
class Chat extends Actor with Logging {
|
||||
makeTransactionRequired
|
||||
|
||||
class Chat extends Transactor {
|
||||
case class Chat(val who: String, val what: String, val msg: String)
|
||||
|
||||
@Suspend
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue