Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-05-07 11:22:18 +02:00
commit 8bf320c246
26 changed files with 63 additions and 29 deletions

View file

@ -6,8 +6,12 @@ package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.ActorRef import se.scalablesolutions.akka.actor.ActorRef
/** An Iterator that is either always empty or yields an infinite number of Ts
*/
trait InfiniteIterator[T] extends Iterator[T] trait InfiniteIterator[T] extends Iterator[T]
/** CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List
*/
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
@volatile private[this] var current: List[T] = items @volatile private[this] var current: List[T] = items
@ -20,6 +24,10 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
} }
} }
/**
* This InfiniteIterator always returns the Actor that has the currently smallest mailbox
* useful for work-stealing.
*/
class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] { class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] {
def hasNext = items != Nil def hasNext = items != Nil

View file

@ -11,6 +11,12 @@ case class Listen(listener: ActorRef) extends ListenerMessage
case class Deafen(listener: ActorRef) extends ListenerMessage case class Deafen(listener: ActorRef) extends ListenerMessage
case class WithListeners(f: Set[ActorRef] => Unit) extends ListenerMessage case class WithListeners(f: Set[ActorRef] => Unit) extends ListenerMessage
/** Listeners is a generic trait to implement listening capability on an Actor
* Use the <code>gossip(msg)</code> method to have it sent to the listenees
* Send <code>Listen(self)</code> to start listening
* Send <code>Deafen(self)</code> to stop listening
* Send <code>WithListeners(fun)</code> to traverse the current listeners
*/
trait Listeners { self : Actor => trait Listeners { self : Actor =>
import se.scalablesolutions.akka.actor.Agent import se.scalablesolutions.akka.actor.Agent
private lazy val listeners = Agent(Set[ActorRef]()) private lazy val listeners = Agent(Set[ActorRef]())

View file

@ -10,8 +10,7 @@ import se.scalablesolutions.akka.actor.Actor._
object Patterns { object Patterns {
type PF[A, B] = PartialFunction[A, B] type PF[A, B] = PartialFunction[A, B]
/** /** Creates a new PartialFunction whose isDefinedAt is a combination
* Creates a new PartialFunction whose isDefinedAt is a combination
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply * of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
*/ */
def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
@ -20,19 +19,21 @@ object Patterns {
filtered(a) filtered(a)
} }
/** /** Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
*/ */
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
//FIXME 2.8, use default params with CyclicIterator /** Creates a LoadBalancer from the thunk-supplied InfiniteIterator
*/
def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef = def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
newActor(() => new Actor with LoadBalancer { newActor(() => new Actor with LoadBalancer {
start start
val seq = actors val seq = actors
}) })
/** Creates a Dispatcher given a routing and a message-transforming function
*/
def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef = def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
newActor(() => new Actor with Dispatcher { newActor(() => new Actor with Dispatcher {
start start
@ -40,11 +41,16 @@ object Patterns {
def routes = routing def routes = routing
}) })
/** Creates a Dispatcher given a routing
*/
def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = newActor(() => new Actor with Dispatcher { def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = newActor(() => new Actor with Dispatcher {
start start
def routes = routing def routes = routing
}) })
/** Creates an actor that pipes all incoming messages to
* both another actor and through the supplied function
*/
def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef = def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef =
dispatcherActor({case _ => actorToLog}, logger) dispatcherActor({case _ => actorToLog}, logger)
} }

View file

@ -6,6 +6,8 @@ package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/** A Dispatcher is a trait whose purpose is to route incoming messages to actors
*/
trait Dispatcher { self: Actor => trait Dispatcher { self: Actor =>
protected def transform(msg: Any): Any = msg protected def transform(msg: Any): Any = msg
@ -15,12 +17,16 @@ trait Dispatcher { self: Actor =>
protected def dispatch: PartialFunction[Any, Unit] = { protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) => case a if routes.isDefinedAt(a) =>
if (self.replyTo.isDefined) routes(a) forward transform(a) if (self.replyTo.isDefined) routes(a) forward transform(a)
else routes(a) ! transform(a) else routes(a).!(transform(a))(None)
} }
def receive = dispatch def receive = dispatch
} }
/** A LoadBalancer is a specialized kind of Dispatcher,
* that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to
*/
trait LoadBalancer extends Dispatcher { self: Actor => trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[ActorRef] protected def seq: InfiniteIterator[ActorRef]

View file

@ -15,11 +15,12 @@ import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.Config._
import org.multiverse.api.{Transaction => MultiverseTransaction, TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.templates.{TransactionTemplate, OrElseTemplate} import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
import org.multiverse.utils.backoff.ExponentialBackoffPolicy import org.multiverse.api.backoff.ExponentialBackoffPolicy
import org.multiverse.stms.alpha.AlphaStm import org.multiverse.stms.alpha.AlphaStm
class NoTransactionInScopeException extends RuntimeException class NoTransactionInScopeException extends RuntimeException

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.util.UUID import se.scalablesolutions.akka.util.UUID
import org.multiverse.stms.alpha.AlphaRef import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
/** /**
* Example Scala usage: * Example Scala usage:
@ -56,6 +56,14 @@ trait Committable {
def commit: Unit def commit: Unit
} }
object RefFactory {
private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build
def createRef[T] = factory.atomicCreateReference[T]()
def createRef[T](value: T) = factory.atomicCreateReference(value)
}
/** /**
* Alias to TransactionalRef. * Alias to TransactionalRef.
* *
@ -101,8 +109,8 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
val uuid = UUID.newUuid.toString val uuid = UUID.newUuid.toString
private[this] val ref = { private[this] val ref = {
if (initialOpt.isDefined) new AlphaRef(initialOpt.get) if (initialOpt.isDefined) RefFactory.createRef(initialOpt.get)
else new AlphaRef[T] else RefFactory.createRef[T]
} }
def swap(elem: T) = { def swap(elem: T) = {

View file

@ -51,6 +51,14 @@
port = 9998 port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#IF you are using a KerberosAuthenticationActor
# <kerberos>
# servicePrincipal = "HTTP/localhost@EXAMPLE.COM"
# keyTabLocation = "URL to keytab"
# kerberosDebug = "true"
# realm = "EXAMPLE.COM"
# </kerberos>
</rest> </rest>
<remote> <remote>

View file

@ -51,9 +51,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------ // ------------------------------------------------------------
// project defintions // project defintions
lazy val akka_java_util = project("akka-util-java", "akka-util-java", new AkkaJavaUtilProject(_)) lazy val akka_core = project("akka-core", "akka-core", new AkkaCoreProject(_))
lazy val akka_util = project("akka-util", "akka-util", new AkkaUtilProject(_))
lazy val akka_core = project("akka-core", "akka-core", new AkkaCoreProject(_), akka_util, akka_java_util)
lazy val akka_amqp = project("akka-amqp", "akka-amqp", new AkkaAMQPProject(_), akka_core) lazy val akka_amqp = project("akka-amqp", "akka-amqp", new AkkaAMQPProject(_), akka_core)
lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_core, akka_camel) lazy val akka_http = project("akka-http", "akka-http", new AkkaHttpProject(_), akka_core, akka_camel)
lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_core) lazy val akka_camel = project("akka-camel", "akka-camel", new AkkaCamelProject(_), akka_core)
@ -90,8 +88,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
.map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName) .map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName)
.mkString(" ") + .mkString(" ") +
" scala-library.jar" + " scala-library.jar" +
" dist/akka-util_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-util-java_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-core_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-core_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-cluster%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-cluster%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-http_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-http_%s-%s.jar".format(buildScalaVersion, version) +
@ -151,21 +147,16 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile"
val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile"
val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
}
class AkkaUtilProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % "2.1" % "compile" val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % "2.1" % "compile"
val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % "2.1" % "compile" val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % "2.1" % "compile"
val configgy = "net.lag" % "configgy" % "2.8.0.Beta1-1.5-SNAPSHOT" % "compile" val configgy = "net.lag" % "configgy" % "2.8.0.Beta1-1.5-SNAPSHOT" % "compile"
}
class AkkaJavaUtilProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val guicey = "org.guiceyfruit" % "guice-core" % "2.0-beta-4" % "compile" val guicey = "org.guiceyfruit" % "guice-core" % "2.0-beta-4" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.2.0" % "compile" val protobuf = "com.google.protobuf" % "protobuf-java" % "2.2.0" % "compile"
val multiverse = "org.multiverse" % "multiverse-alpha" % "0.4" % "compile" val multiverse = "org.multiverse" % "multiverse-alpha" % "0.5" % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
} }
class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {