Merge remote branch 'remotes/origin/master' into 458-krasserm and resolved conflict in akka-camel/src/main/scala/CamelService.scala

This commit is contained in:
Martin Krasser 2010-10-29 11:04:48 +02:00
commit 564692cd42
405 changed files with 3953 additions and 3120 deletions

View file

@ -2,19 +2,20 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently}
import se.scalablesolutions.akka.AkkaException
import akka.dispatch._
import akka.config.Config._
import akka.config.Supervision._
import akka.util.Helpers.{narrow, narrowSilently}
import akka.AkkaException
import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.util. {ReflectiveAccess, Logging, Duration}
import akka.util. {ReflectiveAccess, Logging, Duration}
import akka.japi.Procedure
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@ -44,7 +45,9 @@ abstract class RemoteActor(address: InetSocketAddress) extends Actor {
*/
@serializable sealed trait LifeCycleMessage
case class HotSwap(code: Actor.Receive) extends LifeCycleMessage
case class HotSwap(code: Actor.Receive) extends LifeCycleMessage {
def this(behavior: Procedure[Any]) = this({ case msg => behavior.apply(msg) }: Actor.Receive)
}
case object RevertHotSwap extends LifeCycleMessage

View file

@ -2,16 +2,16 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{ TransactionManagement, TransactionSetAbortedException }
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util._
import akka.dispatch._
import akka.config.Config._
import akka.config.Supervision._
import akka.stm.global._
import akka.stm.TransactionManagement._
import akka.stm.{ TransactionManagement, TransactionSetAbortedException }
import akka.AkkaException
import akka.util._
import ReflectiveAccess._
import org.multiverse.api.ThreadLocalTransaction._
@ -29,7 +29,7 @@ import scala.collection.immutable.Stack
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import annotation.tailrec
private[akka] object ActorRefInternals {
private[akka] object ActorRefInternals extends Logging {
/** LifeCycles for ActorRefs
*/
@ -77,7 +77,9 @@ private[akka] object ActorRefInternals {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends ActorRefShared with TransactionManagement with Logging with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
trait ActorRef extends ActorRefShared with TransactionManagement with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
//Reuse same logger
import Actor.log
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
@ -156,7 +158,7 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
* This means that all actors will share the same event-driven executor based dispatcher.
* <p/>
* You can override it so it fits the specific use-case that the actor is used for.
* See the <tt>se.scalablesolutions.akka.dispatch.Dispatchers</tt> class for the different
* See the <tt>akka.dispatch.Dispatchers</tt> class for the different
* dispatchers available.
* <p/>
* The default is also that all actors that are created and spawned from within this actor
@ -780,14 +782,13 @@ class LocalActorRef private[akka] (
if (isShutdown) throw new ActorStartException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
dispatcher.register(this)
dispatcher.start
dispatcher.attach(this)
if (isTransactor)
transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id)))
_status = ActorRefInternals.RUNNING
//If we are not currently creating this ActorRef instance
// If we are not currently creating this ActorRef instance
if ((actorInstance ne null) && (actorInstance.get ne null))
initializeActorInstance
@ -803,7 +804,7 @@ class LocalActorRef private[akka] (
if (isRunning) {
receiveTimeout = None
cancelReceiveTimeout
dispatcher.unregister(this)
dispatcher.detach(this)
transactorConfig = transactorConfig.copy(factory = None)
_status = ActorRefInternals.SHUTDOWN
actor.postStop
@ -854,11 +855,8 @@ class LocalActorRef private[akka] (
* To be invoked from within the actor itself.
*/
def startLink(actorRef: ActorRef): Unit = guard.withGuard {
try {
link(actorRef)
} finally {
actorRef.start
}
link(actorRef)
actorRef.start
}
/**
@ -868,12 +866,9 @@ class LocalActorRef private[akka] (
*/
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = guard.withGuard {
ensureRemotingEnabled
try {
actorRef.makeRemote(hostname, port)
link(actorRef)
} finally {
actorRef.start
}
actorRef.makeRemote(hostname, port)
link(actorRef)
actorRef.start
}
/**
@ -905,11 +900,8 @@ class LocalActorRef private[akka] (
*/
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard {
val actor = Actor.actorOf(clazz)
try {
link(actor)
} finally {
actor.start
}
link(actor)
actor.start
actor
}
@ -921,12 +913,9 @@ class LocalActorRef private[akka] (
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard {
ensureRemotingEnabled
val actor = Actor.actorOf(clazz)
try {
actor.makeRemote(hostname, port)
link(actor)
} finally {
actor.start
}
actor.makeRemote(hostname, port)
link(actor)
actor.start
actor
}
@ -965,7 +954,7 @@ class LocalActorRef private[akka] (
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
} else {
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
dispatcher dispatch invocation
dispatcher dispatchMessage invocation
}
}
@ -986,7 +975,7 @@ class LocalActorRef private[akka] (
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(
this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
dispatcher dispatch invocation
dispatcher dispatchMessage invocation
future
}
}
@ -995,8 +984,7 @@ class LocalActorRef private[akka] (
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown)
Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
else {
currentMessage = messageHandle
try {
@ -1005,8 +993,7 @@ class LocalActorRef private[akka] (
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
finally {
} finally {
currentMessage = null //TODO: Don't reset this, we might want to resend the message
}
}
@ -1032,8 +1019,7 @@ class LocalActorRef private[akka] (
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
false
}
else if (withinTimeRange.isEmpty) { // restrict number of restarts
} else if (withinTimeRange.isEmpty) { // restrict number of restarts
maxNrOfRetriesCount += 1 //Increment number of retries
maxNrOfRetriesCount > maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
@ -1042,10 +1028,8 @@ class LocalActorRef private[akka] (
val now = System.currentTimeMillis
val retries = maxNrOfRetriesCount
//We are within the time window if it isn't the first restart, or if the window hasn't closed
val insideWindow = if (windowStart == 0)
false
else
(now - windowStart) <= withinTimeRange.get
val insideWindow = if (windowStart == 0) false
else (now - windowStart) <= withinTimeRange.get
//The actor is dead if it dies X times within the window of restart
val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1)

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import scala.collection.mutable.{ListBuffer, Map}
import scala.reflect.Manifest
@ -11,8 +11,8 @@ import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import annotation.tailrec
import se.scalablesolutions.akka.util.ReflectiveAccess._
import se.scalablesolutions.akka.util.{ReadWriteGuard, Address, ListenerManagement}
import akka.util.ReflectiveAccess._
import akka.util.{ReadWriteGuard, Address, ListenerManagement}
import java.net.InetSocketAddress
/**

View file

@ -2,13 +2,14 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.stm.Ref
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.japi.{ Function => JFunc, Procedure => JProc }
import akka.stm.Ref
import akka.AkkaException
import akka.japi.{ Function => JFunc, Procedure => JProc }
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
import akka.config.RemoteAddress
class AgentException private[akka](message: String) extends AkkaException(message)
@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag
* @author Viktor Klang
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Agent[T] private (initialValue: T) {
sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) {
import Agent._
import Actor._
private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
val dispatcher = remote match {
case Some(address) =>
val d = actorOf(new AgentDispatcher[T]())
d.makeRemote(remote.get.hostname,remote.get.port)
d.start
d ! Value(initialValue)
d
case None =>
actorOf(new AgentDispatcher(initialValue)).start
}
/**
* Submits a request to read the internal state.
@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) {
if (dispatcher.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction."+
"\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
sendProc((v: T) => {ref.set(v); latch.countDown})
latch.await
ref.get
val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await
if (f.exception.isDefined) throw f.exception.get
else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out"))
}
/**
@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) {
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def map[B](f: (T) => B): Agent[B] = Agent(f(get))
final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote)
/**
* Applies function with type 'T => B' to the agent's internal state.
@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) {
* Does not change the value of the agent (this).
* Java API
*/
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get))
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
* Java API
*/
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)())
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote)
/**
* Applies procedure with type T to the agent's internal state.
@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Agent {
import Actor._
/*
* The internal messages for passing around requests.
*/
private[akka] case class Value[T](value: T)
private[akka] case class Function[T](fun: ((T) => T))
private[akka] case class Procedure[T](fun: ((T) => Unit))
private[akka] case object Read
/**
* Creates a new Agent of type T with the initial value of value.
*/
def apply[T](value: T): Agent[T] = new Agent(value)
def apply[T](value: T): Agent[T] =
apply(value,None)
/**
* Creates an Agent backed by a client managed Actor if Some(remoteAddress)
* or a local agent if None
*/
def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] =
new Agent[T](value,remoteAddress)
/**
* Creates an Agent backed by a client managed Actor
*/
def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] =
apply(value,Some(remoteAddress))
}
/**
@ -254,12 +277,15 @@ object Agent {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor {
final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor {
import Agent._
import Actor._
log.debug("Starting up Agent [%s]", self.uuid)
private val value = Ref[T](initialValue)
private[akka] def this(initialValue: T) = this(Ref(initialValue))
private[akka] def this() = this(Ref[T]())
private val value = ref
log.debug("Starting up Agent [%s]", self.uuid)
/**
* Periodically handles incoming messages.
@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
def receive = {
case Value(v: T) =>
swap(v)
case Read => self.reply_?(value.get())
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>

View file

@ -2,15 +2,15 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
import java.util.Enumeration
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.config.Config._
import akka.util.{Bootable, Logging}
import akka.config.Config._
class AkkaDeployClassLoader(urls : List[URL], parent : ClassLoader) extends URLClassLoader(urls.toArray.asInstanceOf[Array[URL]],parent)
{

View file

@ -2,60 +2,145 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.stm.Ref
import se.scalablesolutions.akka.stm.local._
package akka.actor
import scala.collection.mutable
import java.util.concurrent.{ScheduledFuture, TimeUnit}
trait FSM[S] { this: Actor =>
trait FSM[S, D] {
this: Actor =>
type StateFunction = scala.PartialFunction[Event, State]
var currentState: State = initialState
var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
/** DSL */
protected final def notifying(transitionHandler: PartialFunction[Transition, Unit]) = {
transitionEvent = transitionHandler
}
def initialState: State
protected final def when(stateName: S)(stateFunction: StateFunction) = {
register(stateName, stateFunction)
}
def handleEvent: StateFunction = {
case event@Event(value, stateData) =>
log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id)
State(NextState, currentState.stateFunction, stateData, currentState.timeout)
protected final def startWith(stateName: S, stateData: D, timeout: Option[Long] = None) = {
setState(State(stateName, stateData, timeout))
}
protected final def goto(nextStateName: S): State = {
State(nextStateName, currentState.stateData)
}
protected final def stay(): State = {
goto(currentState.stateName)
}
protected final def stop(): State = {
stop(Normal)
}
protected final def stop(reason: Reason): State = {
stop(reason, currentState.stateData)
}
protected final def stop(reason: Reason, stateData: D): State = {
self ! Stop(reason, stateData)
stay
}
def whenUnhandled(stateFunction: StateFunction) = {
handleEvent = stateFunction
}
def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = {
terminateEvent = terminationHandler
}
/** FSM State data and default handlers */
private var currentState: State = _
private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
private val transitions = mutable.Map[S, StateFunction]()
private def register(name: S, function: StateFunction) {
if (transitions contains name) {
transitions(name) = transitions(name) orElse function
} else {
transitions(name) = function
}
}
private var handleEvent: StateFunction = {
case Event(value, stateData) =>
log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName)
stay
}
private var terminateEvent: PartialFunction[Reason, Unit] = {
case failure@Failure(_) => log.error("Stopping because of a %s", failure)
case reason => log.info("Stopping because of reason: %s", reason)
}
private var transitionEvent: PartialFunction[Transition, Unit] = {
case Transition(from, to) => log.debug("Transitioning from state %s to %s", from, to)
}
override final protected def receive: Receive = {
case Stop(reason, stateData) =>
terminateEvent.apply(reason)
self.stop
case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) =>
log.trace("Ignoring StateTimeout - ")
// state timeout when new message in queue, skip this timeout
case value => {
timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None}
val event = Event(value, currentState.stateData)
val newState = (currentState.stateFunction orElse handleEvent).apply(event)
val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event)
setState(nextState)
}
}
currentState = newState
newState match {
case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue)
case _ => () // ignore for now
private def setState(nextState: State) = {
if (!transitions.contains(nextState.stateName)) {
stop(Failure("Next state %s does not exist".format(nextState.stateName)))
} else {
if (currentState != null && currentState.stateName != nextState.stateName) {
transitionEvent.apply(Transition(currentState.stateName, nextState.stateName))
}
newState.timeout.foreach {
timeout =>
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS))
currentState = nextState
currentState.timeout.foreach {
t =>
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))
}
}
}
case class State(stateEvent: StateEvent,
stateFunction: StateFunction,
stateData: S,
timeout: Option[Int] = None,
replyValue: Option[Any] = None)
case class Event(event: Any, stateData: D)
case class Event(event: Any, stateData: S)
case class State(stateName: S, stateData: D, timeout: Option[Long] = None) {
sealed trait StateEvent
object NextState extends StateEvent
object Reply extends StateEvent
def until(timeout: Long): State = {
copy(timeout = Some(timeout))
}
object StateTimeout
def replying(replyValue:Any): State = {
self.sender match {
case Some(sender) => sender ! replyValue
case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue)
}
this
}
def using(nextStateDate: D): State = {
copy(stateData = nextStateDate)
}
}
sealed trait Reason
case object Normal extends Reason
case object Shutdown extends Reason
case class Failure(cause: Any) extends Reason
case object StateTimeout
case class Transition(from: S, to: S)
private case class Stop(reason: Reason, stateData: D)
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka
package akka
import actor.{ScalaActorRef, ActorRef}

View file

@ -13,21 +13,21 @@
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/
package se.scalablesolutions.akka.actor
package akka.actor
import scala.collection.JavaConversions
import java.util.concurrent._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.AkkaException
import akka.util.Logging
import akka.AkkaException
object Scheduler extends Logging {
import Actor._
case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
@volatile private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
log.info("Starting up Scheduler")
@ -108,12 +108,12 @@ object Scheduler extends Logging {
}
}
def shutdown = {
def shutdown: Unit = synchronized {
log.info("Shutting down Scheduler")
service.shutdown
}
def restart = {
def restart: Unit = synchronized {
log.info("Restarting Scheduler")
shutdown
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)

View file

@ -2,17 +2,17 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util._
import akka.config.Supervision._
import akka.AkkaException
import akka.util._
import ReflectiveAccess._
import Actor._
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.net.InetSocketAddress
import se.scalablesolutions.akka.config.Supervision._
import akka.config.Supervision._
class SupervisorException private[akka](message: String) extends AkkaException(message)

View file

@ -2,15 +2,16 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.Supervision._
import akka.dispatch._
import akka.stm.global._
import akka.config.Supervision._
import java.net.InetSocketAddress
import scala.reflect.BeanProperty
import akka.japi.Procedure
/**
* Subclass this abstract class to create a MDB-style untyped actor.
@ -67,6 +68,11 @@ abstract class UntypedActor extends Actor {
case msg => onReceive(msg)
}
/**
* Java API for become
*/
def become(behavior: Procedure[Any]): Unit = super.become { case msg => behavior.apply(msg) }
@throws(classOf[Exception])
def onReceive(message: Any): Unit
}

View file

@ -2,12 +2,12 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.config
package akka.config
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.dispatch.CompletableFuture
import akka.AkkaException
import akka.util.Logging
import akka.actor.{ActorRef, IllegalActorStateException}
import akka.dispatch.CompletableFuture
import net.lag.configgy.{Config => CConfig, Configgy, ParseException}
@ -27,10 +27,6 @@ object ConfigLogger extends Logging
object Config {
val VERSION = "1.0-SNAPSHOT"
// Set Multiverse options for max speed
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")
System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast")
val HOME = {
val envHome = System.getenv("AKKA_HOME") match {
case null | "" | "." => None

View file

@ -2,10 +2,10 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.config
package akka.config
/*
import se.scalablesolutions.akka.kernel.{TypedActor, TypedActorProxy}
import akka.kernel.{TypedActor, TypedActorProxy}
import com.google.inject.{AbstractModule}
import java.util.{List => JList, ArrayList}
import scala.reflect.BeanProperty
@ -18,43 +18,43 @@ import scala.reflect.BeanProperty
sealed abstract class Configuration
class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
def transform = se.scalablesolutions.akka.kernel.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
def transform = akka.kernel.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
def transform = se.scalablesolutions.akka.kernel.LifeCycle(scope.transform, shutdownTime)
def transform = akka.kernel.LifeCycle(scope.transform, shutdownTime)
}
abstract class Scope extends Configuration {
def transform: se.scalablesolutions.akka.kernel.Scope
def transform: akka.kernel.Scope
}
class Permanent extends Scope {
override def transform = se.scalablesolutions.akka.kernel.Permanent
override def transform = akka.kernel.Permanent
}
class Transient extends Scope {
override def transform = se.scalablesolutions.akka.kernel.Transient
override def transform = akka.kernel.Transient
}
class Temporary extends Scope {
override def transform = se.scalablesolutions.akka.kernel.Temporary
override def transform = akka.kernel.Temporary
}
abstract class FailOverScheme extends Configuration {
def transform: se.scalablesolutions.akka.kernel.FailOverScheme
def transform: akka.kernel.FailOverScheme
}
class AllForOne extends FailOverScheme {
override def transform = se.scalablesolutions.akka.kernel.AllForOne
override def transform = akka.kernel.AllForOne
}
class OneForOne extends FailOverScheme {
override def transform = se.scalablesolutions.akka.kernel.OneForOne
override def transform = akka.kernel.OneForOne
}
abstract class Server extends Configuration
//class kernelConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
// def transform = se.scalablesolutions.akka.kernel.kernelConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
// def transform = akka.kernel.kernelConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
//}
class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server {
def newWorker(proxy: TypedActorProxy) = se.scalablesolutions.akka.kernel.Supervise(proxy.server, lifeCycle.transform)
def newWorker(proxy: TypedActorProxy) = akka.kernel.Supervise(proxy.server, lifeCycle.transform)
}
*/

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.config
package akka.config
import se.scalablesolutions.akka.config.Supervision. {SuperviseTypedActor, FaultHandlingStrategy}
import akka.config.Supervision. {SuperviseTypedActor, FaultHandlingStrategy}
private[akka] trait TypedActorConfiguratorBase {
def getExternalDependency[T](clazz: Class[T]): T

View file

@ -2,10 +2,10 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.config
package akka.config
import se.scalablesolutions.akka.actor.{ActorRef}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import akka.actor.{ActorRef}
import akka.dispatch.MessageDispatcher
case class RemoteAddress(val hostname: String, val port: Int)

View file

@ -2,16 +2,16 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dataflow
package akka.dataflow
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.CompletableFuture
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.japi.{ Function, SideEffect }
import akka.actor.{Actor, ActorRef}
import akka.actor.Actor._
import akka.dispatch.CompletableFuture
import akka.AkkaException
import akka.japi.{ Function, SideEffect }
/**
* Implements Oz-style dataflow (single assignment) variables.

View file

@ -2,12 +2,12 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.{Duration, Logging}
import se.scalablesolutions.akka.actor.newUuid
import akka.actor.{Actor, ActorRef}
import akka.config.Config._
import akka.util.{Duration, Logging}
import akka.actor.newUuid
import net.lag.configgy.ConfigMap
@ -52,7 +52,7 @@ object Dispatchers extends Logging {
val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
val MAILBOX_TYPE = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox()
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox()
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
@ -60,20 +60,14 @@ object Dispatchers extends Logging {
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher(
"global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) {
override def register(actor: ActorRef) = {
if (isShutdown) init
super.register(actor)
}
}
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
* <p/>
* Can be beneficial to use the <code>HawtDispatcher.pin(self)</code> to "pin" an actor to a specific thread.
* <p/>
* See the ScalaDoc for the {@link se.scalablesolutions.akka.dispatch.HawtDispatcher} for details.
* See the ScalaDoc for the {@link akka.dispatch.HawtDispatcher} for details.
*/
def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate)
@ -83,11 +77,12 @@ object Dispatchers extends Logging {
* <p/>
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor, BoundedMailbox(true))
def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout
* If capacity is negative, it's Integer.MAX_VALUE
* <p/>
* E.g. each actor consumes its own thread.
*/
@ -95,6 +90,7 @@ object Dispatchers extends Logging {
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* If capacity is negative, it's Integer.MAX_VALUE
* <p/>
* E.g. each actor consumes its own thread.
*/
@ -106,7 +102,8 @@ object Dispatchers extends Logging {
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
def newExecutorBasedEventDrivenDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name,config),ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -114,7 +111,8 @@ object Dispatchers extends Logging {
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxType)
ThreadPoolConfigDispatcherBuilder(config =>
new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config),ThreadPoolConfig())
/**
@ -122,30 +120,32 @@ object Dispatchers extends Logging {
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType)
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config =>
new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name)
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String): ThreadPoolConfigDispatcherBuilder =
newExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_TYPE)
/**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) =
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType = mailboxType)
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig())
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
*/
def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
config.getConfigMap(key).flatMap(from).getOrElse(default)
config getConfigMap key flatMap from getOrElse default
/*
* Creates of obtains a dispatcher from a ConfigMap according to the format below
@ -172,23 +172,23 @@ object Dispatchers extends Logging {
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name", newUuid.toString)
def threadPoolConfig(b: ThreadPoolBuilder) {
b.configureIfPossible( builder => {
cfg.getInt("keep-alive-time").foreach(time => builder.setKeepAliveTimeInMillis(Duration(time, TIME_UNIT).toMillis.toInt))
cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_))
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_))
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
cfg.getInt("mailbox-capacity").foreach(builder.setMailboxCapacity(_))
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
cfg.getString("rejection-policy").map({
//Apply the following options to the config if they are present in the cfg
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(cfg getString "rejection-policy" map {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(builder.setRejectionPolicy(_))
})
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
})(policy => _.setRejectionPolicy(policy)))
}
lazy val mailboxType: MailboxType = {
@ -200,15 +200,17 @@ object Dispatchers extends Logging {
cfg.getString("type") map {
case "ExecutorBasedEventDriven" =>
new ExecutorBasedEventDrivenDispatcher(
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType, threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true))
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)

View file

@ -2,14 +2,15 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import akka.actor.{ActorRef, IllegalActorStateException}
import akka.util.ReflectiveAccess.EnterpriseModule
import java.util.Queue
import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.util.Switch
import akka.util.Switch
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
@ -56,7 +57,7 @@ import se.scalablesolutions.akka.util.Switch
* <p/>
*
* But the preferred way of creating dispatchers is to use
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
* the {@link akka.dispatch.Dispatchers} factory object.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
@ -69,11 +70,11 @@ class ExecutorBasedEventDrivenDispatcher(
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: (ThreadPoolBuilder) => Unit = _ => ())
extends MessageDispatcher with ThreadPoolBuilder {
val config: ThreadPoolConfig = ThreadPoolConfig())
extends MessageDispatcher {
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType, _ => ()) // Needed for Java API usage
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
@ -81,19 +82,19 @@ class ExecutorBasedEventDrivenDispatcher(
def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _config: ThreadPoolConfig) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
def this(_name: String) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
val name = "akka:event-driven:dispatcher:" + _name
val mailboxType = Some(_mailboxType)
private[akka] val active = new Switch(false)
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
val name = "akka:event-driven:dispatcher:" + _name
//Initialize
init
def dispatch(invocation: MessageInvocation) = {
private[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
registerForExecution(mbox)
@ -112,8 +113,7 @@ class ExecutorBasedEventDrivenDispatcher(
}
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity
new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox {
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
}
@ -131,25 +131,21 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
def start = active switchOn {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
private[akka] def start = log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
}
}
def shutdown = active switchOff {
log.debug("Shutting down %s", toString)
executor.shutdownNow
uuids.clear
}
def ensureNotActive(): Unit = if (active.isOn) {
throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
}
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
try {
executor execute mbox
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
mbox.dispatcherLock.unlock()
@ -171,13 +167,6 @@ class ExecutorBasedEventDrivenDispatcher(
mbox.suspended.switchOff
registerForExecution(mbox)
}
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}
/**
@ -189,7 +178,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
final def run = {
val reschedule = try {
processMailbox()
try { processMailbox() } catch { case ie: InterruptedException => true }
} finally {
dispatcherLock.unlock()
}

View file

@ -2,13 +2,14 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import java.util.concurrent.CopyOnWriteArrayList
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.util.Switch
import akka.actor.{Actor, ActorRef, IllegalActorStateException}
import akka.util.Switch
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
import java.util.concurrent.atomic.AtomicReference
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -23,39 +24,36 @@ import se.scalablesolutions.akka.util.Switch
* TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance ?!
* <p/>
* The preferred way of creating dispatchers is to use
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
* the {@link akka.dispatch.Dispatchers} factory object.
*
* @see se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
* @see se.scalablesolutions.akka.dispatch.Dispatchers
* @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
* @see akka.dispatch.Dispatchers
*
* @author Jan Van Besien
*/
class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher {
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType, _ => ())
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig())
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE, _ => ())
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig())
//implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor
val mailboxType = Some(_mailboxType)
private val active = new Switch(false)
implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
/** Type of the actors registered in this dispatcher. */
private var actorType: Option[Class[_]] = None
@volatile private var actorType: Option[Class[_]] = None
private val pooledActors = new CopyOnWriteArrayList[ActorRef]
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
/** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
init
/**
* @return the mailbox associated with the actor
*/
@ -63,11 +61,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def dispatch(invocation: MessageInvocation) = if (active.isOn) {
private[akka] def dispatch(invocation: MessageInvocation) {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
executor execute mbox
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
executorService.get() execute mbox
}
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
@ -97,7 +95,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
* Process the messages in the mailbox of the given actor.
* @return
*/
private def processMailbox(mailbox: MessageQueue): Boolean = {
private def processMailbox(mailbox: MessageQueue): Boolean = try {
if (mailbox.suspended.isOn)
return false
@ -109,6 +107,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
messageInvocation = mailbox.dequeue
}
true
} catch {
case ie: InterruptedException => false
}
private def findThief(receiver: ActorRef): Option[ActorRef] = {
@ -161,22 +161,22 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
val donated = getMailbox(receiver).pollLast
if (donated ne null) {
if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
donated.message, receiver.timeout, donated.sender, donated.senderFuture)
else if (donated.sender.isDefined) thief.self.postMessageToMailbox(donated.message, donated.sender)
else thief.self.postMessageToMailbox(donated.message, None)
else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender)
else thief.postMessageToMailbox(donated.message, None)
true
} else false
}
def start = active switchOn {
log.debug("Starting up %s",toString)
}
private[akka] def start = log.debug("Starting up %s",toString)
def shutdown = active switchOff {
log.debug("Shutting down %s", toString)
executor.shutdownNow
uuids.clear
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
}
}
@ -187,21 +187,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
executor execute mbox
executorService.get() execute mbox
}
def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque
new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
@ -214,9 +205,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
}
}
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity
new LinkedBlockingDeque[MessageInvocation](cap) with MessageQueue with Runnable {
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
def dequeue: MessageInvocation = this.poll()
@ -232,7 +222,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/**
* Creates and returns a durable mailbox for the given actor.
*/
protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
// FIXME make generic (work for TypedActor as well)
case FileBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("FileBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
@ -242,13 +232,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported for ExecutorBasedEventDrivenWorkStealingDispatcher")
}
override def register(actorRef: ActorRef) = {
private[akka] override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef)
pooledActors add actorRef
super.register(actorRef)
}
override def unregister(actorRef: ActorRef) = {
private[akka] override def unregister(actorRef: ActorRef) = {
pooledActors remove actorRef
super.unregister(actorRef)
}

View file

@ -2,13 +2,13 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.actor.Actor.spawn
import akka.AkkaException
import akka.actor.Actor.spawn
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.routing.Dispatcher
import akka.routing.Dispatcher
class FutureTimeoutException(message: String) extends AkkaException(message)
@ -37,10 +37,16 @@ object Futures {
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[Future[_]]): Future[_] = {
/**
* Returns the First Future that is completed
* if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans
*/
def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
if (sleepMs > 0 && future.isEmpty)
Thread.sleep(sleepMs)
} while (future.isEmpty)
future.get
}
@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] {
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private val TIME_UNIT = TimeUnit.MILLISECONDS
import TimeUnit.{MILLISECONDS => TIME_UNIT}
def this() = this(0)
val timeoutInNanos = TIME_UNIT.toNanos(timeout)

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import se.scalablesolutions.akka.actor.ActorRef
import akka.actor.ActorRef
import org.fusesource.hawtdispatch.DispatchQueue
import org.fusesource.hawtdispatch.ScalaDispatch._
@ -13,7 +13,7 @@ import org.fusesource.hawtdispatch.ListEventAggregator
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.util.Switch
import akka.util.Switch
/**
* Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher.
@ -142,20 +142,14 @@ object HawtDispatcher {
class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher {
import HawtDispatcher._
private val active = new Switch(false)
val mailboxType: Option[MailboxType] = None
def start = active switchOn { retainNonDaemon }
private[akka] def start { retainNonDaemon }
def shutdown = active switchOff { releaseNonDaemon }
private[akka] def shutdown { releaseNonDaemon }
def isShutdown = active.isOff
def dispatch(invocation: MessageInvocation) = if (active.isOn) {
private[akka] def dispatch(invocation: MessageInvocation){
mailbox(invocation.receiver).dispatch(invocation)
} else {
log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver)
}
// hawtdispatch does not have a way to get queue sizes, getting an accurate
@ -172,12 +166,12 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue =
def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend
def resume(actorRef:ActorRef) = mailbox(actorRef).resume
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef]
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef]
/**
* Creates and returns a durable mailbox for the given actor.
*/
protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = null.asInstanceOf[AnyRef]
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = null.asInstanceOf[AnyRef]
override def toString = "HawtDispatcher"
}

View file

@ -2,16 +2,16 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import se.scalablesolutions.akka.AkkaException
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.util.ReflectiveAccess.EnterpriseModule
import akka.AkkaException
import java.util.{Queue, List}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
import se.scalablesolutions.akka.util._
import akka.util._
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
@ -90,7 +90,7 @@ trait MailboxFactory {
/**
* Creates a MessageQueue (Mailbox) with the specified properties.
*/
protected def createMailbox(actorRef: ActorRef): AnyRef =
private[akka] def createMailbox(actorRef: ActorRef): AnyRef =
mailboxType.getOrElse(throw new IllegalStateException("No mailbox type defined")) match {
case mb: TransientMailboxType => createTransientMailbox(actorRef, mb)
case mb: DurableMailboxType => createDurableMailbox(actorRef, mb)
@ -99,10 +99,10 @@ trait MailboxFactory {
/**
* Creates and returns a transient mailbox for the given actor.
*/
protected def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef
/**
* Creates and returns a durable mailbox for the given actor.
*/
protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef
}

View file

@ -2,17 +2,14 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorRef, Uuid, ActorInitializationException}
import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import se.scalablesolutions.akka.AkkaException
package akka.dispatch
import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.{Queue, List}
import java.util.concurrent._
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
import akka.util. {Switch, ReentrantGuard, Logging, HashCode}
import akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -55,36 +52,125 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
object MessageDispatcher {
val UNSCHEDULED = 0
val SCHEDULED = 1
val RESCHEDULED = 2
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher extends MailboxFactory with Logging {
import MessageDispatcher._
protected val uuids = new ConcurrentSkipListSet[Uuid]
def dispatch(invocation: MessageInvocation): Unit
protected val guard = new ReentrantGuard
private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
protected val active = new Switch(false)
def start: Unit
/**
* Attaches the specified actorRef to this dispatcher
*/
final def attach(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef)
}
def shutdown: Unit
/**
* Detaches the specified actorRef from this dispatcher
*/
final def detach(actorRef: ActorRef): Unit = guard withGuard {
unregister(actorRef)
}
def register(actorRef: ActorRef) {
private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = if (active.isOn) {
dispatch(invocation)
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
private[akka] def register(actorRef: ActorRef) {
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
if (active.isOff) {
active.switchOn {
start
}
}
}
def unregister(actorRef: ActorRef) = {
uuids remove actorRef.uuid
actorRef.mailbox = null
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
private[akka] def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
actorRef.mailbox = null
if (uuids.isEmpty){
shutdownSchedule match {
case UNSCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
shutdownSchedule = RESCHEDULED
case RESCHEDULED => //Already marked for reschedule
}
}
}
}
def suspend(actorRef: ActorRef): Unit
def resume(actorRef: ActorRef): Unit
def canBeShutDown: Boolean = uuids.isEmpty
def isShutdown: Boolean
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
*/
def stopAllAttachedActors {
val i = uuids.iterator
while(i.hasNext()) {
val uuid = i.next()
ActorRegistry.actorFor(uuid) match {
case Some(actor) => actor.stop
case None =>
log.error("stopAllLinkedActors couldn't find linked actor: " + uuid)
}
}
}
private val shutdownAction = new Runnable {
def run = guard withGuard {
shutdownSchedule match {
case RESCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
if (uuids.isEmpty()) {
active switchOff {
shutdown // shut down in the dispatcher's references is zero
}
}
shutdownSchedule = UNSCHEDULED
case UNSCHEDULED => //Do nothing
}
}
}
private[akka] def timeoutMs: Long = 1000
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
def suspend(actorRef: ActorRef): Unit
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
def resume(actorRef: ActorRef): Unit
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
private[akka] def dispatch(invocation: MessageInvocation): Unit
/**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/
private[akka] def start: Unit
/**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
*/
private[akka] def shutdown: Unit
/**
* Returns the size of the mailbox for the specified actor

View file

@ -2,45 +2,51 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.util.Duration
import akka.actor.{Actor, ActorRef}
import akka.config.Config.config
import akka.util.Duration
import java.util.Queue
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
import akka.actor
import java.util.concurrent.atomic.AtomicReference
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxType)
class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType)
extends ExecutorBasedEventDrivenDispatcher(
actor.getClass.getName + ":" + actor.uuid,
Dispatchers.THROUGHPUT,
-1,
_mailboxType,
ThreadBasedDispatcher.oneThread) {
_actor.uuid.toString,Dispatchers.THROUGHPUT,-1,_mailboxType,ThreadBasedDispatcher.oneThread) {
def this(actor: ActorRef) = this(actor, BoundedMailbox(true)) // For Java API
private[akka] val owner = new AtomicReference[ActorRef](_actor)
def this(actor: ActorRef, capacity: Int) = this(actor, BoundedMailbox(true, capacity))
def this(actor: ActorRef) =
this(actor, UnboundedMailbox(true)) // For Java API
def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = this(actor, BoundedMailbox(true, capacity, pushTimeOut))
def this(actor: ActorRef, capacity: Int) =
this(actor, BoundedMailbox(true, capacity)) //For Java API
def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API
this(actor, BoundedMailbox(true, capacity, pushTimeOut))
override def register(actorRef: ActorRef) = {
if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
val actor = owner.get()
if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
owner.compareAndSet(null,actorRef) //Register if unregistered
super.register(actorRef)
}
override def unregister(actorRef: ActorRef) = {
super.unregister(actorRef)
owner.compareAndSet(actorRef,null) //Unregister (prevent memory leak)
}
}
object ThreadBasedDispatcher {
val oneThread: (ThreadPoolBuilder) => Unit = b => {
b setCorePoolSize 1
b setMaxPoolSize 1
b setAllowCoreThreadTimeout true
}
val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
}

View file

@ -2,306 +2,263 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import java.util.Collection
import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.util.{Logger, Logging}
import akka.util. {Duration, Logging}
trait ThreadPoolBuilder extends Logging {
val name: String
object ThreadPoolConfig {
type Bounds = Int
type FlowHandler = Either[RejectedExecutionHandler,Bounds]
type QueueFactory = () => BlockingQueue[Runnable]
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private val MILLISECONDS = TimeUnit.MILLISECONDS
val defaultAllowCoreThreadTimeout: Boolean = false
val defaultCorePoolSize: Int = 16
val defaultMaxPoolSize: Int = 128
val defaultTimeout: Duration = Duration(60000L,TimeUnit.MILLISECONDS)
def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy)
private var threadPoolBuilder: ThreadPoolExecutor = _
private var boundedExecutorBound = -1
protected var mailboxCapacity = -1
@volatile private var inProcessOfBuilding = false
private var blockingQueue: BlockingQueue[Runnable] = _
def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)
def flowHandler(bounds: Int): FlowHandler = Right(bounds)
private lazy val threadFactory = new MonitorableThreadFactory(name)
protected var executor: ExecutorService = _
def isShutdown = executor.isShutdown
def buildThreadPool(): Unit = synchronized {
ensureNotActive
inProcessOfBuilding = false
log.debug("Creating a %s with config [core-pool:%d,max-pool:%d,timeout:%d,allowCoreTimeout:%s,rejectPolicy:%s]",
getClass.getName,
threadPoolBuilder.getCorePoolSize,
threadPoolBuilder.getMaximumPoolSize,
threadPoolBuilder.getKeepAliveTime(MILLISECONDS),
threadPoolBuilder.allowsCoreThreadTimeOut,
threadPoolBuilder.getRejectedExecutionHandler.getClass.getSimpleName)
if (boundedExecutorBound > 0) {
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
boundedExecutorBound = -1 //Why is this here?
executor = boundedExecutor
} else {
executor = threadPoolBuilder
}
}
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = queue
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this
}
/**
* Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeded.
* <p/>
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
*/
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new LinkedBlockingQueue[Runnable]
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
boundedExecutorBound = bound
this
}
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new LinkedBlockingQueue[Runnable]
threadPoolBuilder = new ThreadPoolExecutor(
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
threadPoolBuilder = new ThreadPoolExecutor(
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new SynchronousQueue[Runnable](fair)
threadPoolBuilder = new ThreadPoolExecutor(
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair)
threadPoolBuilder = new ThreadPoolExecutor(
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def configureIfPossible(f: (ThreadPoolBuilder) => Unit): Boolean = synchronized {
if(inProcessOfBuilding) {
f(this)
true
}
else {
log.warning("Tried to configure an already started ThreadPoolBuilder of type [%s]",getClass.getName)
false
}
}
/**
* Default is 16.
*/
def setCorePoolSize(size: Int): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setCorePoolSize(size))
/**
* Default is 128.
*/
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
/**
* Sets the core pool size to (availableProcessors * multipliers).ceil.toInt
*/
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier)))
/**
* Sets the max pool size to (availableProcessors * multipliers).ceil.toInt
*/
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier)))
/**
* Sets the bound, -1 is unbounded
*/
def setExecutorBounds(bounds: Int): Unit = synchronized {
this.boundedExecutorBound = bounds
}
/**
* Sets the mailbox capacity, -1 is unbounded
*/
def setMailboxCapacity(capacity: Int): Unit = synchronized {
this.mailboxCapacity = capacity
}
protected def procs(multiplier: Double): Int =
def fixedPoolSize(size: Int): Int = size
def scaledPoolSize(multiplier: Double): Int =
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
/**
* Default is 60000 (one minute).
*/
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))
def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
() => new ArrayBlockingQueue[Runnable](capacity,fair)
/**
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
*/
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setRejectedExecutionHandler(policy))
def synchronousQueue(fair: Boolean): QueueFactory =
() => new SynchronousQueue[Runnable](fair)
/**
* Default false, set to true to conserve thread for potentially unused dispatchers
*/
def setAllowCoreThreadTimeout(allow: Boolean) =
setThreadPoolExecutorProperty(_.allowCoreThreadTimeOut(allow))
def linkedBlockingQueue(): QueueFactory =
() => new LinkedBlockingQueue[Runnable]()
/**
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
*/
protected def setThreadPoolExecutorProperty(f: (ThreadPoolExecutor) => Unit): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyInConstructionPhase
f(threadPoolBuilder)
this
def linkedBlockingQueue(capacity: Int): QueueFactory =
() => new LinkedBlockingQueue[Runnable](capacity)
def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
() => queue
def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
val queue = queueFactory()
() => queue
}
}
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
protected def verifyNotInConstructionPhase = {
if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool")
inProcessOfBuilding = true
}
final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService =
new LazyExecutorServiceWrapper(createExecutorService(threadFactory))
protected def verifyInConstructionPhase = {
if (!inProcessOfBuilding) throw new IllegalActorStateException(
"Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods")
}
def ensureNotActive(): Unit
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging {
protected val semaphore = new Semaphore(bound)
def execute(command: Runnable) = {
semaphore.acquire
try {
executor.execute(new Runnable() {
def run = {
try {
command.run
} finally {
semaphore.release
}
}
})
} catch {
case e: RejectedExecutionException =>
semaphore.release
case e =>
log.error(e,"Unexpected exception")
throw e
}
}
// Delegating methods for the ExecutorService interface
def shutdown = executor.shutdown
def shutdownNow = executor.shutdownNow
def isShutdown = executor.isShutdown
def isTerminated = executor.isTerminated
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
protected val counter = new AtomicLong
def newThread(runnable: Runnable) =
new MonitorableThread(runnable, name)
// new Thread(runnable, name + "-" + counter.getAndIncrement)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile var debugLifecycle = false
}
// FIXME fix the issues with using the monitoring in MonitorableThread
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThread(runnable: Runnable, name: String)
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging {
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) =
log.error(cause, "UNCAUGHT in thread [%s]", thread.getName)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
log.debug("Created thread %s", getName)
try {
MonitorableThread.alive.incrementAndGet
super.run
} finally {
MonitorableThread.alive.decrementAndGet
log.debug("Exiting thread %s", getName)
}
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
flowHandler match {
case Left(rejectHandler) =>
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
service
case Right(bounds) =>
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
new BoundedExecutorDecorator(service,bounds)
}
}
}
trait DispatcherBuilder {
def build: MessageDispatcher
}
object ThreadPoolConfigDispatcherBuilder {
def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder):
Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun
}
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
import ThreadPoolConfig._
def build = dispatcherFactory(config)
//TODO remove this, for backwards compat only
def buildThreadPool = build
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity,fair), flowHandler = defaultFlowHandler))
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(corePoolSize = size))
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(maxPoolSize = size))
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
setCorePoolSize(scaledPoolSize(multiplier))
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
setMaxPoolSize(scaledPoolSize(multiplier))
def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = flowHandler(bounds)))
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
setKeepAliveTime(Duration(time,TimeUnit.MILLISECONDS))
def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(threadTimeout = time))
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
setFlowHandler(flowHandler(policy))
def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = newFlowHandler))
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(allowCorePoolTimeout = allow))
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder,ThreadPoolConfigDispatcherBuilder]]*):
ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)( (c,f) => f.map( _(c) ).getOrElse(c))
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
protected val counter = new AtomicLong
def newThread(runnable: Runnable) = new MonitorableThread(runnable, name)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile var debugLifecycle = false
}
// FIXME fix the issues with using the monitoring in MonitorableThread
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThread(runnable: Runnable, name: String)
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging {
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) =
log.error(cause, "UNCAUGHT in thread [%s]", thread.getName)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
log.debug("Created thread %s", getName)
try {
MonitorableThread.alive.incrementAndGet
super.run
} finally {
MonitorableThread.alive.decrementAndGet
log.debug("Exiting thread %s", getName)
}
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
protected val semaphore = new Semaphore(bound)
override def execute(command: Runnable) = {
semaphore.acquire
try {
executor.execute(new Runnable() {
def run = {
try {
command.run
} finally {
semaphore.release
}
}
})
} catch {
case e: RejectedExecutionException =>
semaphore.release
case e =>
log.error(e,"Unexpected exception")
throw e
}
}
}
trait ExecutorServiceDelegate extends ExecutorService with Logging {
def executor: ExecutorService
def execute(command: Runnable) = executor.execute(command)
def shutdown = executor.shutdown
def shutdownNow = executor.shutdownNow
def isShutdown = executor.isShutdown
def isTerminated = executor.isTerminated
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
trait LazyExecutorService extends ExecutorServiceDelegate {
def createExecutor: ExecutorService
lazy val executor = {
log.info("Lazily initializing ExecutorService for ",this)
createExecutor
}
}
class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService {
def createExecutor = executorFactory
}

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.japi
package akka.japi
/**
* A Function interface. Used to create first-class-functions is Java (sort of).

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.routing
package akka.routing
import se.scalablesolutions.akka.actor.ActorRef
import akka.actor.ActorRef
import scala.collection.JavaConversions._
/**

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.routing
package akka.routing
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import akka.actor.{Actor, ActorRef}
import java.util.concurrent.ConcurrentSkipListSet
import scala.collection.JavaConversions._

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.routing
package akka.routing
import se.scalablesolutions.akka.actor.{UntypedActor, Actor, ActorRef}
import akka.actor.{UntypedActor, Actor, ActorRef}
/**
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.

View file

@ -2,10 +2,10 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.routing
package akka.routing
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import akka.actor.{Actor, ActorRef}
import akka.actor.Actor._
object Routing {

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import se.scalablesolutions.akka.actor.{newUuid, Uuid}
import akka.actor.{newUuid, Uuid}
import org.multiverse.transactional.refs.BasicRef

View file

@ -2,18 +2,18 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.util.ReflectiveAccess.JtaModule
import akka.util.ReflectiveAccess.JtaModule
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.AkkaException
import akka.util.Logging
import akka.config.Config._
import akka.AkkaException
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
@ -28,10 +28,10 @@ class StmConfigurationException(message: String) extends AkkaException(message)
object Transaction {
val idFactory = new AtomicLong(-1L)
@deprecated("Use the se.scalablesolutions.akka.stm.local package object instead.")
@deprecated("Use the akka.stm.local package object instead.")
object Local extends LocalStm
@deprecated("Use the se.scalablesolutions.akka.stm.global package object instead.")
@deprecated("Use the akka.stm.global package object instead.")
object Global extends GlobalStm
object Util extends StmUtil

View file

@ -2,12 +2,12 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import java.lang.{Boolean => JBoolean}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Duration
import akka.config.Config._
import akka.util.Duration
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.stms.alpha.AlphaStm

View file

@ -2,11 +2,11 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import java.lang.{Boolean => JBoolean}
import se.scalablesolutions.akka.util.Duration
import akka.util.Duration
import org.multiverse.api.TraceLevel
import org.multiverse.api.{PropagationLevel => Propagation}

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import se.scalablesolutions.akka.AkkaException
import akka.AkkaException
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
import org.multiverse.api.ThreadLocalTransaction._
@ -18,7 +18,7 @@ class TransactionSetAbortedException(msg: String) extends AkkaException(msg)
* Internal helper methods and properties for transaction management.
*/
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.config.Config._
import akka.config.Config._
// FIXME move to stm.global.fair?
val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
@ -126,16 +126,16 @@ trait StmUtil {
}
trait StmCommon {
type TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
val TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
type TransactionConfig = akka.stm.TransactionConfig
val TransactionConfig = akka.stm.TransactionConfig
type TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
val TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
type TransactionFactory = akka.stm.TransactionFactory
val TransactionFactory = akka.stm.TransactionFactory
val Propagation = se.scalablesolutions.akka.stm.Transaction.Propagation
val Propagation = akka.stm.Transaction.Propagation
val TraceLevel = se.scalablesolutions.akka.stm.Transaction.TraceLevel
val TraceLevel = akka.stm.Transaction.TraceLevel
type Ref[T] = se.scalablesolutions.akka.stm.Ref[T]
val Ref = se.scalablesolutions.akka.stm.Ref
type Ref[T] = akka.stm.Ref[T]
val Ref = akka.stm.Ref
}

View file

@ -2,11 +2,11 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import scala.collection.immutable.HashMap
import se.scalablesolutions.akka.actor.{newUuid}
import akka.actor.{newUuid}
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction

View file

@ -2,11 +2,11 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import scala.collection.immutable.Vector
import se.scalablesolutions.akka.actor.newUuid
import akka.actor.newUuid
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm.global
package akka.stm.global
/**
* Java-friendly atomic blocks.
@ -10,8 +10,8 @@ package se.scalablesolutions.akka.stm.global
* Example usage (in Java):
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.*;
* import se.scalablesolutions.akka.stm.global.Atomic;
* import akka.stm.*;
* import akka.stm.global.Atomic;
*
* final Ref<Integer> ref = new Ref<Integer>(0);
*

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import se.scalablesolutions.akka.util.Logging
import akka.util.Logging
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.templates.TransactionalCallable
@ -18,7 +18,7 @@ object GlobalStm extends Logging
* Example of atomic transaction management using the atomic block:
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.global._
* import akka.stm.global._
*
* atomic {
* // do something within a transaction

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
/**
* For easily importing global STM.

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm.local
package akka.stm.local
/**
* Java-friendly atomic blocks.
@ -10,8 +10,8 @@ package se.scalablesolutions.akka.stm.local
* Example usage (in Java):
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.*;
* import se.scalablesolutions.akka.stm.local.Atomic;
* import akka.stm.*;
* import akka.stm.local.Atomic;
*
* final Ref<Integer> ref = new Ref<Integer>(0);
*

View file

@ -2,9 +2,9 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
import se.scalablesolutions.akka.util.Logging
import akka.util.Logging
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.templates.TransactionalCallable
@ -19,7 +19,7 @@ object LocalStm extends Logging
* Example of atomic transaction management using the atomic block.
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.local._
* import akka.stm.local._
*
* atomic {
* // do something within a transaction

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
/**
* For easily importing local STM.

View file

@ -2,18 +2,18 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package akka.stm
/**
* For importing the transactional datastructures, including the primitive refs
* and transactional data structures from Multiverse.
*/
package object transactional {
type TransactionalMap[K,V] = se.scalablesolutions.akka.stm.TransactionalMap[K,V]
val TransactionalMap = se.scalablesolutions.akka.stm.TransactionalMap
type TransactionalMap[K,V] = akka.stm.TransactionalMap[K,V]
val TransactionalMap = akka.stm.TransactionalMap
type TransactionalVector[T] = se.scalablesolutions.akka.stm.TransactionalVector[T]
val TransactionalVector = se.scalablesolutions.akka.stm.TransactionalVector
type TransactionalVector[T] = akka.stm.TransactionalVector[T]
val TransactionalVector = akka.stm.TransactionalVector
type BooleanRef = org.multiverse.transactional.refs.BooleanRef
type ByteRef = org.multiverse.transactional.refs.ByteRef

View file

@ -1,7 +1,7 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
object Address {
def apply(hostname: String, port: Int) = new Address(hostname, port)

View file

@ -2,10 +2,10 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka
package akka
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.newUuid
import akka.util.Logging
import akka.actor.newUuid
import java.io.{StringWriter, PrintWriter}
import java.net.{InetAddress, UnknownHostException}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
trait Bootable {
def onLoad {}

View file

@ -0,0 +1,45 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.util
import java.security.{MessageDigest, SecureRandom}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Crypt extends Logging {
val hex = "0123456789ABCDEF"
val lineSeparator = System.getProperty("line.separator")
lazy val random = SecureRandom.getInstance("SHA1PRNG")
def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII"))
def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5"))
def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII"))
def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1"))
def generateSecureCookie: String = {
log.info("Generating secure cookie...")
val bytes = Array.fill(32)(0.byteValue)
random.nextBytes(bytes)
sha1(bytes)
}
def digest(bytes: Array[Byte], md: MessageDigest): String = {
md.update(bytes)
hexify(md.digest)
}
def hexify(bytes: Array[Byte]): String = {
val builder = new StringBuilder
bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) }
builder.toString
}
private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n")
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
import java.util.concurrent.TimeUnit
@ -24,7 +24,7 @@ object Duration {
* <p/>
* Examples of usage from Java:
* <pre>
* import se.scalablesolutions.akka.util.Duration;
* import akka.util.Duration;
* import java.util.concurrent.TimeUnit;
*
* Duration duration = new Duration(100, TimeUnit.MILLISECONDS);
@ -36,7 +36,7 @@ object Duration {
* <p/>
* Examples of usage from Scala:
* <pre>
* import se.scalablesolutions.akka.util.Duration
* import akka.util.Duration
* import java.util.concurrent.TimeUnit
*
* val duration = Duration(100, TimeUnit.MILLISECONDS)
@ -48,7 +48,7 @@ object Duration {
* <p/>
* Implicits are also provided for Int and Long. Example usage:
* <pre>
* import se.scalablesolutions.akka.util.duration._
* import akka.util.duration._
*
* val duration = 100.millis
* </pre>

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
import java.lang.reflect.{Array => JArray}
import java.lang.{Float => JFloat, Double => JDouble}

View file

@ -2,9 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import java.security.MessageDigest
package akka.util
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -22,18 +20,8 @@ object Helpers extends Logging {
bytes
}
def getMD5For(s: String) = {
val digest = MessageDigest.getInstance("MD5")
digest.update(s.getBytes("ASCII"))
val bytes = digest.digest
val sb = new StringBuilder
val hex = "0123456789ABCDEF"
bytes.foreach(b => {
val n = b.asInstanceOf[Int]
sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF))
})
sb.toString
def bytesToInt(bytes: Array[Byte], offset: Int): Int = {
(0 until 4).foldLeft(0)((value, index) => value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
}
/**
@ -57,4 +45,56 @@ object Helpers extends Logging {
log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName)
None
}
/**
* Reference that can hold either a typed value or an exception.
*
* Usage:
* <pre>
* scala> ResultOrError(1)
* res0: ResultOrError[Int] = ResultOrError@a96606
*
* scala> res0()
res1: Int = 1
*
* scala> res0() = 3
*
* scala> res0()
* res3: Int = 3
*
* scala> res0() = { println("Hello world"); 3}
* Hello world
*
* scala> res0()
* res5: Int = 3
*
* scala> res0() = error("Lets see what happens here...")
*
* scala> res0()
* java.lang.RuntimeException: Lets see what happens here...
* at ResultOrError.apply(Helper.scala:11)
* at .<init>(<console>:6)
* at .<clinit>(<console>)
* at Re...
* </pre>
*/
class ResultOrError[R](result: R){
private[this] var contents: Either[R, Throwable] = Left(result)
def update(value: => R) = {
contents = try {
Left(value)
} catch {
case (error : Throwable) => Right(error)
}
}
def apply() = contents match {
case Left(result) => result
case Right(error) => throw error.fillInStackTrace
}
}
object ResultOrError {
def apply[R](result: R) = new ResultOrError(result)
}
}

View file

@ -2,11 +2,11 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
import java.util.concurrent.ConcurrentSkipListSet
import se.scalablesolutions.akka.actor.ActorRef
import akka.actor.ActorRef
/**
* A manager for listener actors. Intended for mixin by observables.

View file

@ -2,10 +2,10 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic. {AtomicBoolean}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -119,8 +119,8 @@ class SimpleLock {
class Switch(startAsOn: Boolean = false) {
private val switch = new AtomicBoolean(startAsOn)
protected def transcend(from: Boolean,action: => Unit): Boolean = {
if (switch.compareAndSet(from,!from)) {
protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized {
if (switch.compareAndSet(from, !from)) {
try {
action
} catch {
@ -133,43 +133,35 @@ class Switch(startAsOn: Boolean = false) {
}
def switchOff(action: => Unit): Boolean = transcend(from = true, action)
def switchOn(action: => Unit): Boolean = transcend(from = false,action)
def switchOn(action: => Unit): Boolean = transcend(from = false, action)
def switchOff: Boolean = switch.compareAndSet(true,false)
def switchOn: Boolean = switch.compareAndSet(false,true)
def switchOff: Boolean = synchronized { switch.compareAndSet(true, false) }
def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) }
def ifOnYield[T](action: => T): Option[T] = {
if (switch.get)
Some(action)
else
None
if (switch.get) Some(action)
else None
}
def ifOffYield[T](action: => T): Option[T] = {
if (switch.get)
Some(action)
else
None
if (switch.get) Some(action)
else None
}
def ifOn(action: => Unit): Boolean = {
if (switch.get) {
action
true
}
else
false
} else false
}
def ifOff(action: => Unit): Boolean = {
if (!switch.get) {
action
true
}
else
false
} else false
}
def isOn = switch.get
def isOff = !isOn
}
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
import org.slf4j.{Logger => SLFLogger,LoggerFactory => SLFLoggerFactory}

View file

@ -2,13 +2,13 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
package akka.util
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException, ActorType, Uuid}
import se.scalablesolutions.akka.dispatch.{Future, CompletableFuture, MessageInvocation}
import se.scalablesolutions.akka.config.{Config, ModuleNotAvailableException}
import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.AkkaException
import akka.actor.{ActorRef, IllegalActorStateException, ActorType, Uuid}
import akka.dispatch.{Future, CompletableFuture, MessageInvocation}
import akka.config.{Config, ModuleNotAvailableException}
import akka.stm.Transaction
import akka.AkkaException
import java.net.InetSocketAddress
@ -65,7 +65,7 @@ object ReflectiveAccess extends Logging {
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] =
getObjectFor("se.scalablesolutions.akka.remote.RemoteClient$")
getObjectFor("akka.remote.RemoteClient$")
def register(address: InetSocketAddress, uuid: Uuid) = {
ensureRemotingEnabled
@ -123,10 +123,10 @@ object ReflectiveAccess extends Logging {
}
val remoteServerObjectInstance: Option[RemoteServerObject] =
getObjectFor("se.scalablesolutions.akka.remote.RemoteServer$")
getObjectFor("akka.remote.RemoteServer$")
val remoteNodeObjectInstance: Option[RemoteNodeObject] =
getObjectFor("se.scalablesolutions.akka.remote.RemoteNode$")
getObjectFor("akka.remote.RemoteNode$")
def registerActor(address: InetSocketAddress, uuid: Uuid, actorRef: ActorRef) = {
ensureRemotingEnabled
@ -165,7 +165,7 @@ object ReflectiveAccess extends Logging {
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
val typedActorObjectInstance: Option[TypedActorObject] =
getObjectFor("se.scalablesolutions.akka.actor.TypedActor$")
getObjectFor("akka.actor.TypedActor$")
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
ensureTypedActorEnabled
@ -194,7 +194,7 @@ object ReflectiveAccess extends Logging {
"Can't load the typed actor module, make sure that akka-jta.jar is on the classpath")
val transactionContainerObjectInstance: Option[TransactionContainerObject] =
getObjectFor("se.scalablesolutions.akka.jta.TransactionContainer$")
getObjectFor("akka.jta.TransactionContainer$")
def createTransactionContainer: TransactionContainer = {
ensureJtaEnabled
@ -217,21 +217,21 @@ object ReflectiveAccess extends Logging {
lazy val isEnterpriseEnabled = clusterObjectInstance.isDefined
val clusterObjectInstance: Option[AnyRef] =
getObjectFor("se.scalablesolutions.akka.cluster.Cluster$")
getObjectFor("akka.cluster.Cluster$")
val serializerClass: Option[Class[_]] =
getClassFor("se.scalablesolutions.akka.serialization.Serializer")
getClassFor("akka.serialization.Serializer")
def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException(
"Feature is only available in Akka Enterprise edition")
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.actor.mailbox.FileBasedMailbox", actorRef)
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.FileBasedMailbox", actorRef)
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef)
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef)
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.actor.mailbox.BeanstalkBasedMailbox", actorRef)
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.BeanstalkBasedMailbox", actorRef)
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.actor.mailbox.RedisBasedMailbox", actorRef)
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.RedisBasedMailbox", actorRef)
private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = {
ensureEnterpriseEnabled

View file

@ -1,12 +1,12 @@
package se.scalablesolutions.akka.config;
package akka.config;
import se.scalablesolutions.akka.actor.*;
import akka.actor.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static se.scalablesolutions.akka.config.Supervision.*;
import static akka.config.Supervision.*;
public class SupervisionConfig {
/*Just some sample code to demonstrate the declarative supervision configuration for Java */

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.japi;
package akka.japi;
import org.junit.Test;

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
public class Address {
private String location;

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import se.scalablesolutions.akka.stm.Ref;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.Ref;
import akka.stm.local.Atomic;
public class CounterExample {
final static Ref<Integer> ref = new Ref<Integer>(0);

View file

@ -1,11 +1,11 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.Before;
import se.scalablesolutions.akka.stm.*;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.*;
import akka.stm.local.Atomic;
import org.multiverse.api.ThreadLocalTransaction;
import org.multiverse.api.TransactionConfiguration;

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import se.scalablesolutions.akka.stm.Ref;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.Ref;
import akka.stm.local.Atomic;
public class RefExample {
public static void main(String[] args) {

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import se.scalablesolutions.akka.stm.Ref;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.Ref;
import akka.stm.local.Atomic;
public class StmExamples {
public static void main(String[] args) {

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import se.scalablesolutions.akka.stm.*;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.*;
import akka.stm.local.Atomic;
import org.multiverse.api.ThreadLocalTransaction;
import org.multiverse.api.TransactionConfiguration;

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import se.scalablesolutions.akka.stm.*;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.*;
import akka.stm.local.Atomic;
public class TransactionalMapExample {
public static void main(String[] args) {

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
import se.scalablesolutions.akka.stm.*;
import se.scalablesolutions.akka.stm.local.Atomic;
import akka.stm.*;
import akka.stm.local.Atomic;
public class TransactionalVectorExample {
public static void main(String[] args) {

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.stm;
package akka.stm;
public class User {
private String name;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka
package akka
abstract class TestMessage

View file

@ -1,11 +1,11 @@
package se.scalablesolutions.akka.actor
package akka.actor
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
import se.scalablesolutions.akka.config.Supervision._
import akka.config.Supervision._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import akka.dispatch.Dispatchers
import Actor._
object ActorFireForgetRequestReplySpec {

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
@ -10,7 +10,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor._
import akka.actor._
import java.util.concurrent.{CountDownLatch, TimeUnit}
object ActorRefSpec {

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.Suite
import org.scalatest.junit.JUnitRunner

View file

@ -3,9 +3,9 @@
contributed by Julien Gaugaz
inspired by the version contributed by Yura Taras and modified by Isaac Gouy
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.actor.Actor._
import akka.actor.Actor._
object Chameneos {

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@ -13,37 +13,63 @@ import java.util.concurrent.TimeUnit
object FSMActorSpec {
class Lock(code: String,
timeout: Int,
unlockedLatch: StandardLatch,
lockedLatch: StandardLatch) extends Actor with FSM[CodeState] {
val unlockedLatch = new StandardLatch
val lockedLatch = new StandardLatch
val unhandledLatch = new StandardLatch
val terminatedLatch = new StandardLatch
val transitionLatch = new StandardLatch
def initialState = State(NextState, locked, CodeState("", code))
sealed trait LockState
case object Locked extends LockState
case object Open extends LockState
def locked: StateFunction = {
class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] {
notifying {
case Transition(Locked, Open) => transitionLatch.open
case Transition(_, _) => ()
}
when(Locked) {
case Event(digit: Char, CodeState(soFar, code)) => {
soFar + digit match {
case incomplete if incomplete.length < code.length =>
State(NextState, locked, CodeState(incomplete, code))
stay using CodeState(incomplete, code)
case codeTry if (codeTry == code) => {
doUnlock
State(NextState, open, CodeState("", code), Some(timeout))
goto(Open) using CodeState("", code) until timeout
}
case wrong => {
log.error("Wrong code %s", wrong)
State(NextState, locked, CodeState("", code))
stay using CodeState("", code)
}
}
}
case Event("hello", _) => stay replying "world"
case Event("bye", _) => stop(Shutdown)
}
def open: StateFunction = {
when(Open) {
case Event(StateTimeout, stateData) => {
doLock
State(NextState, locked, stateData)
goto(Locked)
}
}
startWith(Locked, CodeState("", code))
whenUnhandled {
case Event(_, stateData) => {
log.info("Unhandled")
unhandledLatch.open
stay
}
}
onTermination {
case reason => terminatedLatch.open
}
private def doLock() {
log.info("Locked")
lockedLatch.open
@ -63,11 +89,9 @@ class FSMActorSpec extends JUnitSuite {
@Test
def unlockTheLock = {
val unlockedLatch = new StandardLatch
val lockedLatch = new StandardLatch
// lock that locked after being open for 1 sec
val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start
val lock = Actor.actorOf(new Lock("33221", 1000)).start
lock ! '3'
lock ! '3'
@ -76,7 +100,27 @@ class FSMActorSpec extends JUnitSuite {
lock ! '1'
assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
assert(transitionLatch.tryAwait(1, TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))
lock ! "not_handled"
assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS))
val answerLatch = new StandardLatch
object Hello
object Bye
val tester = Actor.actorOf(new Actor {
protected def receive = {
case Hello => lock ! "hello"
case "world" => answerLatch.open
case Bye => lock ! "bye"
}
}).start
tester ! Hello
assert(answerLatch.tryAwait(2, TimeUnit.SECONDS))
tester ! Bye
assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS))
}
}

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor
package akka.actor
import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test

View file

@ -1,10 +1,10 @@
package se.scalablesolutions.akka.actor
package akka.actor
import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector}
import akka.stm.{Ref, TransactionalMap, TransactionalVector}
import Actor._
object TransactorSpec {

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import java.lang.Thread.sleep
@ -11,7 +11,7 @@ import org.junit.Test
import Actor._
import java.util.concurrent.{TimeUnit, CountDownLatch}
import se.scalablesolutions.akka.config.Supervision.{Permanent, LifeCycle, OneForOneStrategy}
import akka.config.Supervision.{Permanent, LifeCycle, OneForOneStrategy}
import org.multiverse.api.latches.StandardLatch
class RestartStrategySpec extends JUnitSuite {

View file

@ -2,13 +2,13 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import se.scalablesolutions.akka.config.Supervision.OneForOneStrategy
import akka.config.Supervision.OneForOneStrategy
import java.util.concurrent.{TimeUnit, CountDownLatch}

View file

@ -1,12 +1,12 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, Permanent}
import akka.dispatch.Dispatchers
import akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, Permanent}
import java.util.concurrent.CountDownLatch
class SupervisorMiscSpec extends WordSpec with MustMatchers {
@ -36,7 +36,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
}).start
val actor3 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test")
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build
override def postRestart(cause: Throwable) {countDownLatch.countDown}
protected def receive = {
@ -70,10 +70,10 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
actor4 ! "kill"
countDownLatch.await()
assert(!actor1.dispatcher.isShutdown, "dispatcher1 is shutdown")
assert(!actor2.dispatcher.isShutdown, "dispatcher2 is shutdown")
assert(!actor3.dispatcher.isShutdown, "dispatcher3 is shutdown")
assert(!actor4.dispatcher.isShutdown, "dispatcher4 is shutdown")
assert(!actor1.isShutdown, "actor1 is shutdown")
assert(!actor2.isShutdown, "actor2 is shutdown")
assert(!actor3.isShutdown, "actor3 is shutdown")
assert(!actor4.isShutdown, "actor4 is shutdown")
}
}
}

View file

@ -2,10 +2,10 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
package akka.actor
import se.scalablesolutions.akka.config.Supervision._
import se.scalablesolutions.akka.{OneWay, Die, Ping}
import akka.config.Supervision._
import akka.{OneWay, Die, Ping}
import Actor._
import org.scalatest.junit.JUnitSuite

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dataflow
package akka.dataflow
import org.scalatest.Spec
import org.scalatest.Assertions
@ -11,11 +11,11 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture
import akka.dispatch.DefaultCompletableFuture
import java.util.concurrent.{TimeUnit, CountDownLatch}
import annotation.tailrec
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
import se.scalablesolutions.akka.actor.ActorRegistry
import akka.actor.ActorRegistry
@RunWith(classOf[JUnitRunner])
class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
@ -28,8 +28,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
val x, y, z = new DataFlowVariable[Int]
thread {
z << x() + y()
latch.countDown
result.set(z())
latch.countDown
}
thread { x << 40 }
thread { y << 2 }

View file

@ -0,0 +1,299 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.Assertions._
import akka.dispatch._
import akka.actor.{ActorRef, Actor}
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
object ActorModelSpec {
sealed trait ActorModelMessage
case class Reply_?(expect: Any) extends ActorModelMessage
case class Reply(expect: Any) extends ActorModelMessage
case class Forward(to: ActorRef,msg: Any) extends ActorModelMessage
case class CountDown(latch: CountDownLatch) extends ActorModelMessage
case class Increment(counter: AtomicLong) extends ActorModelMessage
case class Await(latch: CountDownLatch) extends ActorModelMessage
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
case object Restart extends ActorModelMessage
val Ping = "Ping"
val Pong = "Pong"
class DispatcherActor(dispatcher: MessageDispatcherInterceptor) extends Actor {
self.dispatcher = dispatcher.asInstanceOf[MessageDispatcher]
def ack { dispatcher.getStats(self).msgsProcessed.incrementAndGet() }
override def postRestart(reason: Throwable) {
dispatcher.getStats(self).restarts.incrementAndGet()
}
def receive = {
case Await(latch) => ack; latch.await()
case Meet(sign, wait) => ack; sign.countDown(); wait.await()
case Reply(msg) => ack; self.reply(msg)
case Reply_?(msg) => ack; self.reply_?(msg)
case Forward(to,msg) => ack; to.forward(msg)
case CountDown(latch) => ack; latch.countDown()
case Increment(count) => ack; count.incrementAndGet()
case CountDownNStop(l)=> ack; l.countDown; self.stop
case Restart => ack; throw new Exception("Restart requested")
}
}
class InterceptorStats {
val suspensions = new AtomicLong(0)
val resumes = new AtomicLong(0)
val registers = new AtomicLong(0)
val unregisters = new AtomicLong(0)
val msgsReceived = new AtomicLong(0)
val msgsProcessed = new AtomicLong(0)
val restarts = new AtomicLong(0)
}
trait MessageDispatcherInterceptor extends MessageDispatcher {
val stats = new ConcurrentHashMap[ActorRef,InterceptorStats]
val starts = new AtomicLong(0)
val stops = new AtomicLong(0)
def getStats(actorRef: ActorRef) = {
stats.putIfAbsent(actorRef,new InterceptorStats)
stats.get(actorRef)
}
abstract override def suspend(actorRef: ActorRef) {
super.suspend(actorRef)
getStats(actorRef).suspensions.incrementAndGet()
}
abstract override def resume(actorRef: ActorRef) {
super.resume(actorRef)
getStats(actorRef).resumes.incrementAndGet()
}
private[akka] abstract override def register(actorRef: ActorRef) {
super.register(actorRef)
getStats(actorRef).registers.incrementAndGet()
}
private[akka] abstract override def unregister(actorRef: ActorRef) {
super.unregister(actorRef)
getStats(actorRef).unregisters.incrementAndGet()
}
private[akka] abstract override def dispatch(invocation: MessageInvocation) {
super.dispatch(invocation)
getStats(invocation.receiver).msgsReceived.incrementAndGet()
}
private[akka] abstract override def start {
super.start
starts.incrementAndGet()
}
private[akka] abstract override def shutdown {
super.shutdown
stops.incrementAndGet()
}
}
def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
starts: Long = dispatcher.starts.get(),
stops: Long = dispatcher.stops.get()
) {
assert(starts === dispatcher.starts.get(), "Dispatcher starts")
assert(stops === dispatcher.stops.get(), "Dispatcher stops")
}
def assertCountDown(latch: CountDownLatch,wait: Long,hint: AnyRef){
assert(latch.await(wait,TimeUnit.MILLISECONDS) === true)
}
def assertNoCountDown(latch: CountDownLatch,wait: Long,hint: AnyRef){
assert(latch.await(wait,TimeUnit.MILLISECONDS) === false)
}
def statsFor(actorRef: ActorRef, dispatcher: MessageDispatcher = null) =
dispatcher.asInstanceOf[MessageDispatcherInterceptor].getStats(actorRef)
def assertRefDefaultZero(actorRef: ActorRef,dispatcher: MessageDispatcher = null)(
suspensions: Long = 0,
resumes: Long = 0,
registers: Long = 0,
unregisters: Long = 0,
msgsReceived: Long = 0,
msgsProcessed: Long = 0,
restarts: Long = 0) {
assertRef(actorRef,dispatcher)(
suspensions,
resumes,
registers,
unregisters,
msgsReceived,
msgsProcessed,
restarts
)
}
def assertRef(actorRef: ActorRef,dispatcher: MessageDispatcher = null)(
suspensions: Long = statsFor(actorRef).suspensions.get(),
resumes: Long = statsFor(actorRef).resumes.get(),
registers: Long = statsFor(actorRef).registers.get(),
unregisters: Long = statsFor(actorRef).unregisters.get(),
msgsReceived: Long = statsFor(actorRef).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(),
restarts: Long = statsFor(actorRef).restarts.get()
) {
val stats = statsFor(actorRef,if (dispatcher eq null) actorRef.dispatcher else dispatcher)
assert(stats.suspensions.get() === suspensions, "Suspensions")
assert(stats.resumes.get() === resumes, "Resumes")
assert(stats.registers.get() === registers, "Registers")
assert(stats.unregisters.get() === unregisters, "Unregisters")
assert(stats.msgsReceived.get() === msgsReceived, "Received")
assert(stats.msgsProcessed.get() === msgsProcessed, "Processed")
assert(stats.restarts.get() === restarts, "Restarts")
}
def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(new DispatcherActor(d))
}
abstract class ActorModelSpec extends JUnitSuite {
import ActorModelSpec._
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
@Test def dispatcherShouldDynamicallyHandleItsOwnLifeCycle {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor
assertDispatcher(dispatcher)(starts = 0, stops = 0)
a.start
assertDispatcher(dispatcher)(starts = 1, stops = 0)
a.stop
Thread.sleep(dispatcher.timeoutMs + 100)
assertDispatcher(dispatcher)(starts = 1, stops = 1)
assertRef(a,dispatcher)(
suspensions = 0,
resumes = 0,
registers = 1,
unregisters = 1,
msgsReceived = 0,
msgsProcessed = 0,
restarts = 0
)
}
@Test def dispatcherShouldProcessMessagesOneAtATime {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor
val start,step1,step2,oneAtATime = new CountDownLatch(1)
a.start
a ! CountDown(start)
assertCountDown(start,3000, "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
a ! Meet(step1,step2)
assertCountDown(step1,3000, "Didn't process the Meet message in 3 seocnds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 2, msgsProcessed = 2)
a ! CountDown(oneAtATime)
assertNoCountDown(oneAtATime,500,"Processed message when not allowed to")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 2)
step2.countDown()
assertCountDown(oneAtATime,500,"Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
a.stop
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
}
@Test def dispatcherShouldProcessMessagesInParallel: Unit = {
implicit val dispatcher = newInterceptedDispatcher
val a, b = newTestActor.start
val aStart,aStop,bParallel = new CountDownLatch(1)
a ! Meet(aStart,aStop)
assertCountDown(aStart,3000, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, 3000, "Should process other actors in parallel")
aStop.countDown()
a.stop
b.stop
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
@Test def dispatcherShouldSuspendAndResumeAFailingNonSupervisedPermanentActor {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, 3000, "Should be suspended+resumed and done with next message within 3 seconds")
a.stop
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
}
@Test def dispatcherShouldNotProcessMessagesForASuspendedActor {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start
val done = new CountDownLatch(1)
dispatcher.suspend(a)
a ! CountDown(done)
assertNoCountDown(done, 1000, "Should not process messages while suspended")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
dispatcher.resume(a)
assertCountDown(done, 3000, "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
a.stop
assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
}
@Test def dispatcherShouldHandleWavesOfActors {
implicit val dispatcher = newInterceptedDispatcher
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
(1 to num) foreach {
_ => newTestActor.start ! cachedMessage
}
assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns")
}
for(run <- 1 to 3) {
flood(10000)
Thread.sleep(dispatcher.timeoutMs * 2)
assertDispatcher(dispatcher)(starts = run, stops = run)
}
}
}
class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher =
new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor
}
class HawtDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new HawtDispatcher(false) with MessageDispatcherInterceptor
}
class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor
}

View file

@ -1,7 +1,7 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
@ -9,7 +9,7 @@ import org.junit.Test
import net.lag.configgy.Config
import scala.reflect.{Manifest}
import se.scalablesolutions.akka.dispatch._
import akka.dispatch._
object DispatchersSpec {
import Dispatchers._

View file

@ -1,16 +1,16 @@
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.{Dispatchers,ExecutorBasedEventDrivenDispatcher}
import se.scalablesolutions.akka.actor.Actor
import akka.dispatch.{Dispatchers,ExecutorBasedEventDrivenDispatcher}
import akka.actor.Actor
import Actor._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
object ExecutorBasedEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString)
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build
def receive = {
case "Hello" =>
self.reply("World")
@ -23,7 +23,7 @@ object ExecutorBasedEventDrivenDispatcherActorSpec {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString)
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}
@ -68,9 +68,10 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
}
@Test def shouldRespectThroughput {
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_TYPE, (e) => {
e.setCorePoolSize(1)
})
val throughputDispatcher = Dispatchers.
newExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_TYPE).
setCorePoolSize(1).
build
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(100)
@ -97,16 +98,15 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
val result = latch.await(3,TimeUnit.SECONDS)
fastOne.stop
slowOne.stop
throughputDispatcher.shutdown
assert(result === true)
}
@Test def shouldRespectThroughputDeadline {
val deadlineMs = 100
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_TYPE, (e) => {
e.setCorePoolSize(1)
})
val throughputDispatcher = Dispatchers.
newExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_TYPE).
setCorePoolSize(1).
build
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(1)
val start = new CountDownLatch(1)

View file

@ -1,10 +1,10 @@
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.actor.Actor
import akka.actor.Actor
import Actor._
/**

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
@ -6,14 +6,14 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.{TimeUnit, CountDownLatch}
import se.scalablesolutions.akka.actor.{IllegalActorStateException, Actor}
import akka.actor.{IllegalActorStateException, Actor}
import Actor._
import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers}
import akka.dispatch.{MessageQueue, Dispatchers}
object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
self.dispatcher = delayableActorDispatcher

View file

@ -1,8 +1,8 @@
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Futures
import akka.dispatch.Futures
import Actor._
import org.multiverse.api.latches.StandardLatch

View file

@ -1,12 +1,12 @@
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.{HawtDispatcher, Dispatchers}
import se.scalablesolutions.akka.actor.Actor
import akka.dispatch.{HawtDispatcher, Dispatchers}
import akka.actor.Actor
import Actor._
object HawtDispatcherActorSpec {

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import scala.collection.mutable.ListBuffer
@ -12,9 +12,9 @@ import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.HawtDispatcher
import akka.actor._
import akka.actor.Actor._
import akka.dispatch.HawtDispatcher
import org.fusesource.hawtdispatch.DispatchSource
import org.fusesource.hawtdispatch.ScalaDispatch._

View file

@ -1,12 +1,12 @@
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.util.Duration
import se.scalablesolutions.akka.dispatch._
import akka.actor.Actor
import akka.util.Duration
import akka.dispatch._
import Actor._
import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}

View file

@ -1,11 +1,11 @@
package se.scalablesolutions.akka.actor.dispatch
package akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.actor.Actor
import akka.dispatch.Dispatchers
import akka.actor.Actor
import Actor._
object ThreadBasedActorSpec {

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.dispatch
package akka.dispatch
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
@ -9,7 +9,7 @@ import java.util.concurrent.locks.ReentrantLock
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import se.scalablesolutions.akka.actor.Actor
import akka.actor.Actor
import Actor._
// FIXME use this test when we have removed the MessageInvoker classes

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.japi
package akka.japi
import org.scalatest.junit.JUnitSuite

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test

View file

@ -1,9 +1,9 @@
package se.scalablesolutions.akka.actor
package akka.actor
import org.scalatest.junit.JUnitSuite
import Actor._
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.config.Supervision._
import akka.config.Supervision._
import org.multiverse.api.latches.StandardLatch
import org.junit.Test

View file

@ -1,8 +1,8 @@
package se.scalablesolutions.akka.actor.routing
package akka.actor.routing
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util.Logging
import akka.actor.Actor
import akka.actor.Actor._
import akka.util.Logging
import org.scalatest.Suite
import org.junit.runner.RunWith
@ -12,7 +12,7 @@ import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.routing._
import akka.routing._
@RunWith(classOf[JUnitRunner])
class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers with Logging {
@ -134,7 +134,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
}
@Test def testIsDefinedAt = {
import se.scalablesolutions.akka.actor.ActorRef
import akka.actor.ActorRef
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")

View file

@ -1,5 +1,5 @@
package se.scalablesolutions.akka.stm
package akka.stm
import org.scalatest.junit.JUnitWrapperSuite
class JavaStmSpec extends JUnitWrapperSuite("se.scalablesolutions.akka.stm.JavaStmTests", Thread.currentThread.getContextClassLoader)
class JavaStmSpec extends JUnitWrapperSuite("akka.stm.JavaStmTests", Thread.currentThread.getContextClassLoader)

View file

@ -1,11 +1,11 @@
package se.scalablesolutions.akka.stm
package akka.stm
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
class RefSpec extends WordSpec with MustMatchers {
import se.scalablesolutions.akka.stm.local._
import akka.stm.local._
"A Ref" should {

View file

@ -1,6 +1,6 @@
package se.scalablesolutions.akka.stm
package akka.stm
import se.scalablesolutions.akka.actor.{Actor, Transactor}
import akka.actor.{Actor, Transactor}
import Actor._
import org.multiverse.api.exceptions.ReadonlyException
@ -12,7 +12,7 @@ class StmSpec extends WordSpec with MustMatchers {
"Local STM" should {
import se.scalablesolutions.akka.stm.local._
import akka.stm.local._
"be able to do multiple consecutive atomic {..} statements" in {
val ref = Ref(0)
@ -178,7 +178,7 @@ object GlobalTransactionVectorTestActor {
class GlobalTransactionVectorTestActor extends Actor {
import GlobalTransactionVectorTestActor._
import se.scalablesolutions.akka.stm.global._
import akka.stm.global._
private val vector: TransactionalVector[Int] = atomic { TransactionalVector(1) }

View file

@ -1,4 +1,4 @@
package se.scalablesolutions.akka.actor.ticket
package akka.actor.ticket
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers

View file

@ -1,17 +1,17 @@
package se.scalablesolutions.akka.amqp;
package akka.amqp;
import org.multiverse.api.latches.StandardLatch;
import scala.Option;
import se.scalablesolutions.akka.actor.ActorRef;
import se.scalablesolutions.akka.actor.ActorRegistry;
import se.scalablesolutions.akka.actor.UntypedActor;
import se.scalablesolutions.akka.actor.UntypedActorFactory;
import akka.actor.ActorRef;
import akka.actor.ActorRegistry;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import se.scalablesolutions.akka.amqp.rpc.RPC;
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol;
import akka.amqp.rpc.RPC;
import akka.remote.protocol.RemoteProtocol;
import se.scalablesolutions.akka.japi.Function;
import se.scalablesolutions.akka.japi.Procedure;
import akka.japi.Function;
import akka.japi.Procedure;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Some files were not shown because too many files have changed in this diff Show more