diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index a3f282becd..c242a2318f 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -6,15 +6,15 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently} -import se.scalablesolutions.akka.util.{Logging, Duration} import se.scalablesolutions.akka.AkkaException import java.util.concurrent.TimeUnit import java.net.InetSocketAddress import scala.reflect.BeanProperty +import se.scalablesolutions.akka.util. {ReflectiveAccess, Logging, Duration} /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -107,6 +107,7 @@ object Actor extends Logging { def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** +<<<<<<< HEAD:akka-actor/src/main/scala/actor/Actor.scala * Creates an ActorRef out of the Actor with type T. *
    *   import Actor._
@@ -120,7 +121,15 @@ object Actor extends Logging {
    *   val actor = actorOf[MyActor].start
    * 
*/ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(clazz) + def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { + import ReflectiveAccess.{ createInstance, noParams, noArgs } + createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( + throw new ActorInitializationException( + "Could not instantiate Actor" + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) + }) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 5548d030ff..7a6e493f43 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -6,8 +6,7 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.config.{NoFaultHandlingStrategy, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.{ TransactionManagement, TransactionSetAbortedException } @@ -28,6 +27,7 @@ import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} +import annotation.tailrec private[akka] object ActorRefInternals { @@ -635,7 +635,7 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi * @author Jonas Bonér */ class LocalActorRef private[akka] ( - private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) + private[this] val actorFactory: () => Actor) extends ActorRef with ScalaActorRef { @volatile @@ -653,16 +653,9 @@ class LocalActorRef private[akka] ( protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } - // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor - // instance elegible for garbage collection - private val actorSelfFields = findActorSelfField(actor.getClass) - //If it was started inside "newActor", initialize it if (isRunning) initializeActorInstance - private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) - private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) - // used only for deserialization private[akka] def this(__uuid: Uuid, __id: String, @@ -685,8 +678,7 @@ class LocalActorRef private[akka] ( lifeCycle = __lifeCycle _supervisor = __supervisor hotswap = __hotswap - actorSelfFields._1.set(actor, this) - actorSelfFields._2.set(actor, Some(this)) + setActorSelfFields(actor,this) start ActorRegistry.register(this) } @@ -821,7 +813,7 @@ class LocalActorRef private[akka] ( RemoteClientModule.unregister(remoteAddress.get, uuid) RemoteServerModule.unregister(this) } - nullOutActorRefReferencesFor(actorInstance.get) + setActorSelfFields(actorInstance.get,null) } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } @@ -890,7 +882,7 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. */ def spawn(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard { - spawnButDoNotStart(clazz).start + Actor.actorOf(clazz).start } /** @@ -900,7 +892,7 @@ class LocalActorRef private[akka] ( */ def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { ensureRemotingEnabled - val actor = spawnButDoNotStart(clazz) + val actor = Actor.actorOf(clazz) actor.makeRemote(hostname, port) actor.start actor @@ -912,7 +904,7 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. */ def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard { - val actor = spawnButDoNotStart(clazz) + val actor = Actor.actorOf(clazz) try { link(actor) } finally { @@ -928,7 +920,7 @@ class LocalActorRef private[akka] ( */ def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { ensureRemotingEnabled - val actor = spawnButDoNotStart(clazz) + val actor = Actor.actorOf(clazz) try { actor.makeRemote(hostname, port) link(actor) @@ -1142,7 +1134,7 @@ class LocalActorRef private[akka] ( private def restartActor(failedActor: Actor, reason: Throwable) = { failedActor.preRestart(reason) - nullOutActorRefReferencesFor(failedActor) + setActorSelfFields(failedActor,null) val freshActor = newActor freshActor.preStart actorInstance.set(freshActor) @@ -1152,25 +1144,9 @@ class LocalActorRef private[akka] ( freshActor.postRestart(reason) } - private def spawnButDoNotStart(clazz: Class[_ <: Actor]): ActorRef = Actor.actorOf(clazz.newInstance) - private[this] def newActor: Actor = { Actor.actorRefInCreation.withValue(Some(this)) { - val actor = actorFactory match { - case Left(Some(clazz)) => - import ReflectiveAccess.{ createInstance, noParams, noArgs } - createInstance(clazz.asInstanceOf[Class[_]], noParams, noArgs). - getOrElse(throw new ActorInitializationException( - "Could not instantiate Actor" + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - case Right(Some(factory)) => - factory() - case _ => - throw new ActorInitializationException( - "Can't create Actor, no Actor class or factory function in scope") - } + val actor = actorFactory() if (actor eq null) throw new ActorInitializationException( "Actor instance passed to ActorRef can not be 'null'") actor @@ -1289,25 +1265,33 @@ class LocalActorRef private[akka] ( } } - private def nullOutActorRefReferencesFor(actor: Actor) = { - actorSelfFields._1.set(actor, null) - actorSelfFields._2.set(actor, null) - } + private def setActorSelfFields(actor: Actor, value: ActorRef) { - private def findActorSelfField(clazz: Class[_]): Tuple2[Field, Field] = { - try { - val selfField = clazz.getDeclaredField("self") - val someSelfField = clazz.getDeclaredField("someSelf") - selfField.setAccessible(true) - someSelfField.setAccessible(true) - (selfField, someSelfField) - } catch { - case e: NoSuchFieldException => + @tailrec def lookupAndSetSelfFields(clazz: Class[_],actor: Actor, value: ActorRef): Boolean = { + val success = try { + val selfField = clazz.getDeclaredField("self") + val someSelfField = clazz.getDeclaredField("someSelf") + selfField.setAccessible(true) + someSelfField.setAccessible(true) + selfField.set(actor,value) + someSelfField.set(actor, if (value ne null) Some(value) else null) + true + } catch { + case e: NoSuchFieldException => false + } + + if (success) { + true + } + else { val parent = clazz.getSuperclass - if (parent ne null) findActorSelfField(parent) - else throw new IllegalActorStateException( - toString + " is not an Actor since it have not mixed in the 'Actor' trait") + if (parent eq null) + throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") + lookupAndSetSelfFields(parent,actor,value) + } } + + lookupAndSetSelfFields(actor.getClass,actor,value) } private def initializeActorInstance = { diff --git a/akka-actor/src/main/scala/actor/Supervisor.scala b/akka-actor/src/main/scala/actor/Supervisor.scala index ba559e6945..74eb2fcda8 100644 --- a/akka-actor/src/main/scala/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/actor/Supervisor.scala @@ -4,8 +4,7 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.AkkaException import se.scalablesolutions.akka.util._ import ReflectiveAccess._ @@ -13,6 +12,7 @@ import Actor._ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} import java.net.InetSocketAddress +import se.scalablesolutions.akka.config.Supervision._ class SupervisorException private[akka](message: String) extends AkkaException(message) @@ -81,11 +81,7 @@ object SupervisorFactory { private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig): FaultHandlingStrategy = config match { - case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) => - scheme match { - case AllForOne => AllForOneStrategy(trapExceptions,maxNrOfRetries, timeRange) - case OneForOne => OneForOneStrategy(trapExceptions,maxNrOfRetries, timeRange) - } + case SupervisorConfig(faultHandler, _) => faultHandler } } diff --git a/akka-actor/src/main/scala/actor/UntypedActor.scala b/akka-actor/src/main/scala/actor/UntypedActor.scala index cf16b7f064..a1d724a566 100644 --- a/akka-actor/src/main/scala/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/actor/UntypedActor.scala @@ -6,8 +6,7 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import java.net.InetSocketAddress diff --git a/akka-actor/src/main/scala/config/Configurator.scala b/akka-actor/src/main/scala/config/Configurator.scala index ba7e1f35f2..8560649018 100644 --- a/akka-actor/src/main/scala/config/Configurator.scala +++ b/akka-actor/src/main/scala/config/Configurator.scala @@ -4,12 +4,12 @@ package se.scalablesolutions.akka.config -import ScalaConfig.{RestartStrategy, Component} +import se.scalablesolutions.akka.config.Supervision. {SuperviseTypedActor, FaultHandlingStrategy} private[akka] trait TypedActorConfiguratorBase { def getExternalDependency[T](clazz: Class[T]): T - def configure(restartStrategy: RestartStrategy, components: List[Component]): TypedActorConfiguratorBase + def configure(restartStrategy: FaultHandlingStrategy, components: List[SuperviseTypedActor]): TypedActorConfiguratorBase def inject: TypedActorConfiguratorBase diff --git a/akka-actor/src/main/scala/config/SupervisionConfig.scala b/akka-actor/src/main/scala/config/SupervisionConfig.scala index c74820818b..83ac8f1fee 100644 --- a/akka-actor/src/main/scala/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/config/SupervisionConfig.scala @@ -7,89 +7,101 @@ package se.scalablesolutions.akka.config import se.scalablesolutions.akka.actor.{ActorRef} import se.scalablesolutions.akka.dispatch.MessageDispatcher -sealed abstract class FaultHandlingStrategy { - def trapExit: List[Class[_ <: Throwable]] -} - -object AllForOneStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - new AllForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange) -} - -case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { - def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toList,maxNrOfRetries,withinTimeRange) -} - -object OneForOneStrategy { - def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - new OneForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange) -} - -case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { - def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) - def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toList,maxNrOfRetries,withinTimeRange) -} - -case object NoFaultHandlingStrategy extends FaultHandlingStrategy { - def trapExit: List[Class[_ <: Throwable]] = Nil -} +case class RemoteAddress(val hostname: String, val port: Int) /** * Configuration classes - not to be used as messages. * * @author Jonas Bonér */ -object ScalaConfig { +object Supervision { sealed abstract class ConfigElement abstract class Server extends ConfigElement - abstract class FailOverScheme extends ConfigElement - abstract class LifeCycle extends ConfigElement + sealed abstract class LifeCycle extends ConfigElement + sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement - case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server { - //Java API - def this(restartStrategy: RestartStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList) + case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server]) extends Server { + //Java API + def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList) } - class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { - val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) + class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val remoteAddress: Option[RemoteAddress]) extends Server { + //Java API + def this(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = + this(actorRef, lifeCycle, Option(remoteAddress)) + + //Java API + def this(actorRef: ActorRef, lifeCycle: LifeCycle) = + this(actorRef, lifeCycle, None) } object Supervise { def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) - def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, None) def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } - case class RestartStrategy( - scheme: FailOverScheme, - maxNrOfRetries: Int, - withinTimeRange: Int, - trapExceptions: List[Class[_ <: Throwable]]) extends ConfigElement + object AllForOneStrategy { + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = + new AllForOneStrategy(trapExit, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + } - case object AllForOne extends FailOverScheme - case object OneForOne extends FailOverScheme + case class AllForOneStrategy(override val trapExit: List[Class[_ <: Throwable]], + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) { + def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toList, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + } + + object OneForOneStrategy { + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = + new OneForOneStrategy(trapExit, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + } + + case class OneForOneStrategy(override val trapExit: List[Class[_ <: Throwable]], + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) { + def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toList, + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + } + + case object NoFaultHandlingStrategy extends FaultHandlingStrategy(Nil) + + //Scala API case object Permanent extends LifeCycle case object Temporary extends LifeCycle case object UndefinedLifeCycle extends LifeCycle - case class RemoteAddress(val hostname: String, val port: Int) extends ConfigElement + //Java API (& Scala if you fancy) + def permanent() = Permanent + def temporary() = Temporary + def undefinedLifeCycle = UndefinedLifeCycle - class Component(_intf: Class[_], + //Java API + def noFaultHandlingStrategy = NoFaultHandlingStrategy + + case class SuperviseTypedActor(_intf: Class[_], val target: Class[_], val lifeCycle: LifeCycle, val timeout: Long, @@ -97,165 +109,53 @@ object ScalaConfig { _dispatcher: MessageDispatcher, // optional _remoteAddress: RemoteAddress // optional ) extends Server { - val intf: Option[Class[_]] = if (_intf eq null) None else Some(_intf) - val dispatcher: Option[MessageDispatcher] = if (_dispatcher eq null) None else Some(_dispatcher) - val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) - } - object Component { - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - new Component(intf, target, lifeCycle, timeout, false, null, null) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - new Component(null, target, lifeCycle, timeout, false, null, null) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - new Component(intf, target, lifeCycle, timeout, false, dispatcher, null) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - new Component(null, target, lifeCycle, timeout, false, dispatcher, null) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - new Component(intf, target, lifeCycle, timeout, false, null, remoteAddress) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - new Component(null, target, lifeCycle, timeout, false, null, remoteAddress) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - new Component(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - new Component(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) = - new Component(intf, target, lifeCycle, timeout, transactionRequired, null, null) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) = - new Component(null, target, lifeCycle, timeout, transactionRequired, null, null) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) = - new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) = - new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) = - new Component(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) = - new Component(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress) - - def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress) - - def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress) - } -} - -/** - * @author Jonas Bonér - */ -object JavaConfig { - import scala.reflect.BeanProperty - - sealed abstract class ConfigElement - - class RestartStrategy( - @BeanProperty val scheme: FailOverScheme, - @BeanProperty val maxNrOfRetries: Int, - @BeanProperty val withinTimeRange: Int, - @BeanProperty val trapExceptions: Array[Class[_ <: Throwable]]) extends ConfigElement { - def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartStrategy( - scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList) - } - - abstract class LifeCycle extends ConfigElement { - def transform: se.scalablesolutions.akka.config.ScalaConfig.LifeCycle - } - - class Permanent extends LifeCycle { - override def transform = se.scalablesolutions.akka.config.ScalaConfig.Permanent - } - - class Temporary extends LifeCycle { - override def transform = se.scalablesolutions.akka.config.ScalaConfig.Temporary - } - - class UndefinedLifeCycle extends LifeCycle { - override def transform = se.scalablesolutions.akka.config.ScalaConfig.UndefinedLifeCycle - } - - abstract class FailOverScheme extends ConfigElement { - def transform: se.scalablesolutions.akka.config.ScalaConfig.FailOverScheme - } - class AllForOne extends FailOverScheme { - override def transform = se.scalablesolutions.akka.config.ScalaConfig.AllForOne - } - class OneForOne extends FailOverScheme { - override def transform = se.scalablesolutions.akka.config.ScalaConfig.OneForOne - } - - class RemoteAddress(@BeanProperty val hostname: String, @BeanProperty val port: Int) - - abstract class Server extends ConfigElement - class Component(@BeanProperty val intf: Class[_], - @BeanProperty val target: Class[_], - @BeanProperty val lifeCycle: LifeCycle, - @BeanProperty val timeout: Long, - @BeanProperty val transactionRequired: Boolean, // optional - @BeanProperty val dispatcher: MessageDispatcher, // optional - @BeanProperty val remoteAddress: RemoteAddress // optional - ) extends Server { - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(intf, target, lifeCycle, timeout, false, null, null) + val intf: Option[Class[_]] = Option(_intf) + val dispatcher: Option[MessageDispatcher] = Option(_dispatcher) + val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) = - this(null, target, lifeCycle, timeout, false, null, null) + this(null: Class[_], target, lifeCycle, timeout, false, null.asInstanceOf[MessageDispatcher], null: RemoteAddress) - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(intf, target, lifeCycle, timeout, false, null, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = - this(null, target, lifeCycle, timeout, false, null, remoteAddress) + def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = + this(intf, target, lifeCycle, timeout, false, null.asInstanceOf[MessageDispatcher], null: RemoteAddress) def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = this(intf, target, lifeCycle, timeout, false, dispatcher, null) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = - this(null, target, lifeCycle, timeout, false, dispatcher, null) + this(null: Class[_], target, lifeCycle, timeout, false, dispatcher, null:RemoteAddress) + + def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = + this(intf, target, lifeCycle, timeout, false, null: MessageDispatcher, remoteAddress) + + def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) = + this(null: Class[_], target, lifeCycle, timeout, false, null, remoteAddress) + + def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = + this(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - this(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress) + this(null: Class[_], target, lifeCycle, timeout, false, dispatcher, remoteAddress) def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) = - this(intf, target, lifeCycle, timeout, transactionRequired, null, null) + this(intf, target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, null: RemoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) = - this(null, target, lifeCycle, timeout, transactionRequired, null, null) - - def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) = - this(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress) - - def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) = - this(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress) + this(null: Class[_], target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, null: RemoteAddress) def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) = - this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null) + this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null: RemoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) = - this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null) + this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, null: RemoteAddress) + + def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) = + this(intf, target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, remoteAddress) + + def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) = + this(null: Class[_], target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, remoteAddress) def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) = - this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress) - - def transform = - se.scalablesolutions.akka.config.ScalaConfig.Component( - intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, - if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) - - def newSupervised(actorRef: ActorRef) = - se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) + this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress) } - -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 3fd1c708e4..ea06ebb4ec 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -45,6 +45,12 @@ object Futures { future.get } + /** + * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + */ + def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = + in map { f => fun(f.await) } + /* def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = { import Actor.Sender.Self @@ -83,6 +89,18 @@ sealed trait Future[T] { def timeoutInNanos: Long def result: Option[T] def exception: Option[Throwable] + def map[O](f: (T) => O): Future[O] = { + val wrapped = this + new Future[O] { + def await = { wrapped.await; this } + def awaitBlocking = { wrapped.awaitBlocking; this } + def isCompleted = wrapped.isCompleted + def isExpired = wrapped.isExpired + def timeoutInNanos = wrapped.timeoutInNanos + def result: Option[O] = { wrapped.result map f } + def exception: Option[Throwable] = wrapped.exception + } + } } trait CompletableFuture[T] extends Future[T] { diff --git a/akka-actor/src/test/java/se/scalablesolutions/akka/config/SupervisionConfig.java b/akka-actor/src/test/java/se/scalablesolutions/akka/config/SupervisionConfig.java new file mode 100644 index 0000000000..dcd2001d27 --- /dev/null +++ b/akka-actor/src/test/java/se/scalablesolutions/akka/config/SupervisionConfig.java @@ -0,0 +1,21 @@ +package se.scalablesolutions.akka.config; + +import se.scalablesolutions.akka.actor.ActorRef; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static se.scalablesolutions.akka.config.Supervision.*; + +public class SupervisionConfig { + /*Just some sample code to demonstrate the declarative supervision configuration for Java */ + public SupervisorConfig createSupervisorConfig(List toSupervise) { + ArrayList targets = new ArrayList(toSupervise.size()); + for(ActorRef ref : toSupervise) { + targets.add(new Supervise(ref, permanent(), new RemoteAddress("localhost",9999))); + } + + return new SupervisorConfig(new OneForOneStrategy(new Class[] { Exception.class },50,1000), targets.toArray(new Server[0])); + } +} diff --git a/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala index 7741b79cea..0bff02a1a9 100644 --- a/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -1,7 +1,7 @@ package se.scalablesolutions.akka.actor import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException} -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import org.scalatest.junit.JUnitSuite import org.junit.Test diff --git a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala index 887785f568..ba88ccd842 100644 --- a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala @@ -10,9 +10,8 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import se.scalablesolutions.akka.config.OneForOneStrategy import java.util.concurrent.{TimeUnit, CountDownLatch} -import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import se.scalablesolutions.akka.config.Supervision.{Permanent, LifeCycle, OneForOneStrategy} import org.multiverse.api.latches.StandardLatch class RestartStrategySpec extends JUnitSuite { diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorHierarchySpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorHierarchySpec.scala index b1f8af27c0..4091215571 100644 --- a/akka-actor/src/test/scala/actor/supervisor/SupervisorHierarchySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorHierarchySpec.scala @@ -8,7 +8,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.OneForOneStrategy import java.util.concurrent.{TimeUnit, CountDownLatch} diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala index 2805a8675d..5fcaf13173 100644 --- a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.dispatch.Dispatchers -import se.scalablesolutions.akka.config.ScalaConfig.{RestartStrategy, SupervisorConfig, LifeCycle, Permanent, OneForOne, Supervise} +import se.scalablesolutions.akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, Permanent} import java.util.concurrent.CountDownLatch class SupervisorMiscSpec extends WordSpec with MustMatchers { @@ -57,7 +57,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { val sup = Supervisor( SupervisorConfig( - RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]),3, 5000), Supervise(actor1, Permanent) :: Supervise(actor2, Permanent) :: Supervise(actor3, Permanent) :: diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala index d7390a0d43..5f73c4ce8e 100644 --- a/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala @@ -4,8 +4,7 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.{OneWay, Die, Ping} import Actor._ @@ -502,7 +501,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise( temporaryActor, Temporary) @@ -514,7 +513,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise( pingpong1, Permanent) @@ -526,7 +525,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise( pingpong1, Permanent) @@ -540,7 +539,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise( pingpong1, Permanent) @@ -562,7 +561,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise( pingpong1, Permanent) @@ -584,13 +583,13 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 5000), Supervise( pingpong1, Permanent) :: SupervisorConfig( - RestartStrategy(AllForOne, 3, 5000, Nil), + AllForOneStrategy(Nil, 3, 5000), Supervise( pingpong2, Permanent) diff --git a/akka-actor/src/test/scala/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/dispatch/FutureSpec.scala index f740763fdf..04316f8a3d 100644 --- a/akka-actor/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/dispatch/FutureSpec.scala @@ -4,6 +4,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import se.scalablesolutions.akka.dispatch.Futures import Actor._ +import org.multiverse.api.latches.StandardLatch object FutureSpec { class TestActor extends Actor { @@ -15,6 +16,18 @@ object FutureSpec { throw new RuntimeException("Expected exception; to test fault-tolerance") } } + + class TestDelayActor(await: StandardLatch) extends Actor { + def receive = { + case "Hello" => + await.await + self.reply("World") + case "NoReply" => { await.await } + case "Failure" => + await.await + throw new RuntimeException("Expected exception; to test fault-tolerance") + } + } } class FutureSpec extends JUnitSuite { @@ -103,4 +116,31 @@ class FutureSpec extends JUnitSuite { actor1.stop actor2.stop } + + @Test def shouldFutureMapBeDeferred { + val latch = new StandardLatch + val actor1 = actorOf(new TestDelayActor(latch)).start + + val mappedFuture = (actor1.!!![String]("Hello")).map(x => 5) + assert(mappedFuture.isCompleted === false) + assert(mappedFuture.isExpired === false) + latch.open + mappedFuture.await + assert(mappedFuture.isCompleted === true) + assert(mappedFuture.isExpired === false) + assert(mappedFuture.result === Some(5)) + } + + @Test def shouldFuturesAwaitMapHandleEmptySequence { + assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) + } + + @Test def shouldFuturesAwaitMapHandleNonEmptySequence { + val latches = (1 to 3) map (_ => new StandardLatch) + val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start) + val futures = actors map (actor => (actor.!!![String]("Hello"))) + latches foreach { _.open } + + assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length)) + } } diff --git a/akka-actor/src/test/scala/misc/SchedulerSpec.scala b/akka-actor/src/test/scala/misc/SchedulerSpec.scala index 2c7d43896c..2278536c1d 100644 --- a/akka-actor/src/test/scala/misc/SchedulerSpec.scala +++ b/akka-actor/src/test/scala/misc/SchedulerSpec.scala @@ -3,7 +3,7 @@ package se.scalablesolutions.akka.actor import org.scalatest.junit.JUnitSuite import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import org.multiverse.api.latches.StandardLatch import org.junit.Test @@ -107,10 +107,10 @@ class SchedulerSpec extends JUnitSuite { override def postRestart(reason: Throwable) = restartLatch.open }) + Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 1000, - List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise( actor, Permanent) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 9553aebf20..662f2b6a24 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import ConnectionFactory._ import com.rabbitmq.client.AMQP.BasicProperties diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index f560acd807..9bda7f54f4 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -69,10 +69,9 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct) - val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => - log.info("@george_bush received message from: %s", new String(payload)) } - }).start, None, Some(exchangeParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actorOf(new Actor { def receive = { + case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + }}), None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing") @@ -85,15 +84,13 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => - log.info("@george_bush received message from: %s", new String(payload)) } - }).start, None, Some(exchangeParameters))) + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf(new Actor { def receive = { + case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + }}), None, Some(exchangeParameters))) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => - log.info("@barack_obama received message from: %s", new String(payload)) } - }).start, None, Some(exchangeParameters))) + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf(new Actor { def receive = { + case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) + }}), None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: I'm going surfing".getBytes, "") @@ -106,15 +103,13 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => - log.info("@george_bush received message from: %s", new String(payload)) } - }).start, None, Some(exchangeParameters))) + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actorOf(new Actor { def receive = { + case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + }}), None, Some(exchangeParameters))) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf( new Actor { - def receive = { case Delivery(payload, _, _, _, _) => - log.info("@barack_obama received message from: %s", new String(payload)) } - }).start, None, Some(exchangeParameters))) + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actorOf(new Actor { def receive = { + case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) + }}), None, Some(exchangeParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush") @@ -125,31 +120,27 @@ object ExampleSession { val channelCountdown = new CountDownLatch(2) - val connectionCallback = actorOf( new Actor { - def receive = { - case Connected => log.info("Connection callback: Connected!") - case Reconnecting => () // not used, sent when connection fails and initiates a reconnect - case Disconnected => log.info("Connection callback: Disconnected!") - } - }).start + val connectionCallback = actorOf(new Actor { def receive = { + case Connected => log.info("Connection callback: Connected!") + case Reconnecting => () // not used, sent when connection fails and initiates a reconnect + case Disconnected => log.info("Connection callback: Disconnected!") + }}) val connection = AMQP.newConnection(new ConnectionParameters(connectionCallback = Some(connectionCallback))) - val channelCallback = actorOf( new Actor { - def receive = { - case Started => { - log.info("Channel callback: Started") - channelCountdown.countDown - } - case Restarting => // not used, sent when channel or connection fails and initiates a restart - case Stopped => log.info("Channel callback: Stopped") + val channelCallback = actorOf(new Actor { def receive = { + case Started => { + log.info("Channel callback: Started") + channelCountdown.countDown } - }).start + case Restarting => // not used, sent when channel or connection fails and initiates a restart + case Stopped => log.info("Channel callback: Stopped") + }}) val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", - actorOf( new Actor { def receive = { case _ => } }).start, - None, Some(exchangeParameters), channelParameters = Some(channelParameters))) + val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actorOf(new Actor { def receive = { + case _ => () // not used + }}), None, Some(exchangeParameters), channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 16ec8db389..066075f26c 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -8,8 +8,7 @@ import java.util.{TimerTask, Timer} import java.io.IOException import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters -import se.scalablesolutions.akka.config.ScalaConfig.{Permanent} -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{ Permanent, OneForOneStrategy } import se.scalablesolutions.akka.actor.{Exit, Actor} private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { diff --git a/akka-http/src/test/scala/SecuritySpec.scala b/akka-http/src/test/scala/SecuritySpec.scala index 6a3cf4f803..5931df69d0 100644 --- a/akka-http/src/test/scala/SecuritySpec.scala +++ b/akka-http/src/test/scala/SecuritySpec.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.security -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.Suite diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala index 14eba7d4e3..1884621aaa 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala @@ -11,10 +11,9 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy, Permanent} import Actor._ import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import StorageObj._ diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala index 930a3b25a7..be5429e134 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala @@ -7,10 +7,9 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import HbaseStorageBackend._ diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala index a614fbc78d..cd90620cef 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala @@ -7,10 +7,9 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import MongoStorageBackend._ diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala index 53205a029d..afde9ebb1b 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala @@ -16,19 +16,19 @@ class Subscriber(client: RedisClient) extends Actor { def receive = { case Subscribe(channels) => client.subscribe(channels.head, channels.tail: _*)(callback) - self.reply(true) + self.reply_?(true) case Register(cb) => callback = cb - self.reply(true) + self.reply_?(true) case Unsubscribe(channels) => client.unsubscribe(channels.head, channels.tail: _*) - self.reply(true) + self.reply_?(true) case UnsubscribeAll => client.unsubscribe - self.reply(true) + self.reply_?(true) } } @@ -36,7 +36,7 @@ class Publisher(client: RedisClient) extends Actor { def receive = { case Publish(channel, message) => client.publish(channel, message) - self.reply(true) + self.reply_?(true) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala index 1bd2c34d86..78c21f2082 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala @@ -5,11 +5,10 @@ import sbinary.Operations._ import sbinary.DefaultProtocol._ import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy, Permanent} import Actor._ import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import java.util.{Calendar, Date} diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala index f46aa9f224..f1167ef84d 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala @@ -7,11 +7,10 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import se.scalablesolutions.akka.actor.{Actor} -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import RedisStorageBackend._ diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorage.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorage.scala new file mode 100644 index 0000000000..446311f715 --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorage.scala @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.riak + +import se.scalablesolutions.akka.actor.{newUuid} +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ + + +object RiakStorage extends Storage { + + type ElementType = Array[Byte] + def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(newUuid.toString) + override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new RiakPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new RiakPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new RiakPersistentRef(id) + override def newQueue(id:String): PersistentQueue[ElementType] = new RiakPersistentQueue(id) +} + + +class RiakPersistentMap(id: String) extends PersistentMapBinary { + val uuid = id + val storage = RiakStorageBackend +} + + +class RiakPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = RiakStorageBackend +} + +class RiakPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = RiakStorageBackend +} + +class RiakPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { + val uuid = id + val storage = RiakStorageBackend +} diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala new file mode 100644 index 0000000000..dbe2ce9548 --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala @@ -0,0 +1,596 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.riak + +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.Helpers._ +import se.scalablesolutions.akka.config.Config.config + +import java.lang.String +import collection.JavaConversions +import java.nio.ByteBuffer +import collection.Map +import collection.mutable.ArrayBuffer +import java.util.{Properties, Map => JMap} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ +import collection.immutable._ +import com.google.protobuf.ByteString +import com.google.protobuf.ByteString._ +import com.trifork.riak.{RequestMeta, RiakObject, RiakClient} + + +private[akka] object RiakStorageBackend extends +MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + QueueStorageBackend[Array[Byte]] with + Logging { + val refBucket = config.getString("akka.storage.riak.bucket.ref", "Refs") + val mapBucket = config.getString("akka.storage.riak.bucket.map", "Maps") + val vectorBucket = config.getString("akka.storage.riak.bucket.vector", "Vectors") + val queueBucket = config.getString("akka.storage.riak.bucket.queue", "Queues") + val clientHost = config.getString("akka.storage.riak.client.host", "localhost") + val clientPort = config.getInt("akka.storage.riak.client.port", 8087) + val riakClient: RiakClient = new RiakClient(clientHost, clientPort); + + val nullMapValueHeader = 0x00.byteValue + val nullMapValue: Array[Byte] = Array(nullMapValueHeader) + val notNullMapValueHeader: Byte = 0xff.byteValue + val underscoreBytesUTF8 = "_".getBytes("UTF-8") + val mapKeysIndex = getIndexedBytes(-1) + val vectorSizeIndex = getIndexedBytes(-1) + val queueHeadIndex = getIndexedBytes(-1) + val queueTailIndex = getIndexedBytes(-2) + //explicit implicit :) + implicit val ordering = ArrayOrdering + import RiakAccess._ + + def getRefStorageFor(name: String): Option[Array[Byte]] = { + val result: Array[Byte] = RefClient.getValue(name) + Option(result) + } + + def insertRefStorageFor(name: String, element: Array[Byte]) = { + element match { + case null => RefClient.delete(name) + case _ => RefClient.put(name, element) + } + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + val allkeys: SortedSet[Array[Byte]] = getMapKeys(name) + val range = allkeys.rangeImpl(start, finish).take(count) + getKeyValues(name, range) + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + val keys = getMapKeys(name) + getKeyValues(name, keys) + } + + private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { + val all: Map[Array[Byte], Array[Byte]] = + MapClient.getAll(keys.map { + mapKey => getKey(name, mapKey) + }) + + var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) + all.foreach { + (entry) => { + entry match { + case (namePlusKey: Array[Byte], value: Array[Byte]) => { + returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(value) + } + } + } + } + returned.toList + } + + def getMapStorageSizeFor(name: String): Int = { + val keys = getMapKeys(name) + keys.size + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + val result: Array[Byte] = MapClient.getValue(getKey(name, key)) + result match { + case null => None + case _ => Some(getMapValueFromStored(result)) + } + } + + def removeMapStorageFor(name: String, key: Array[Byte]) = { + var keys = getMapKeys(name) + keys -= key + putMapKeys(name, keys) + MapClient.delete(getKey(name, key)) + } + + + def removeMapStorageFor(name: String) = { + val keys = getMapKeys(name) + keys.foreach { + key => + MapClient.delete(getKey(name, key)) + log.debug("deleted key %s for %s", key, name) + } + MapClient.delete(getKey(name, mapKeysIndex)) + } + + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { + MapClient.put(getKey(name, key), getStoredMapValue(value)) + var keys = getMapKeys(name) + keys += key + putMapKeys(name, keys) + } + + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { + val newKeys = entries.map { + case (key, value) => { + MapClient.put(getKey(name, key), getStoredMapValue(value)) + key + } + } + var keys = getMapKeys(name) + keys ++= newKeys + putMapKeys(name, keys) + } + + def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = { + MapClient.put(getKey(name, mapKeysIndex), SortedSetSerializer.toBytes(keys)) + } + + def getMapKeys(name: String): SortedSet[Array[Byte]] = { + SortedSetSerializer.fromBytes(MapClient.getValue(getKey(name, mapKeysIndex), Array.empty[Byte])) + } + + + def getVectorStorageSizeFor(name: String): Int = { + IntSerializer.fromBytes(VectorClient.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) + } + + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + val size = getVectorStorageSizeFor(name) + val st = start.getOrElse(0) + var cnt = + if (finish.isDefined) { + val f = finish.get + if (f >= st) (f - st) else count + } else { + count + } + if (cnt > (size - st)) { + cnt = size - st + } + + + val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { + index => getIndexedKey(name, (size - 1) - index) + } //read backwards + + val all: Map[Array[Byte], Array[Byte]] = VectorClient.getAll(seq) + + var storage = new ArrayBuffer[Array[Byte]](seq.size) + storage = storage.padTo(seq.size, Array.empty[Byte]) + var idx = 0; + seq.foreach { + key => { + if (all.isDefinedAt(key)) { + storage.update(idx, all.get(key).get) + } + idx += 1 + } + } + + storage.toList + } + + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + val size = getVectorStorageSizeFor(name) + if (size > 0 && index < size) { + VectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) + } + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { + val size = getVectorStorageSizeFor(name) + if (size > 0 && index < size) { + elem match { + case null => VectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + case _ => VectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem) + } + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) + } + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + var size = getVectorStorageSizeFor(name) + elements.foreach { + element => + if (element != null) { + VectorClient.put(getIndexedKey(name, size), element) + } + size += 1 + } + VectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) + } + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + insertVectorStorageEntriesFor(name, List(element)) + } + + + def remove(name: String): Boolean = { + val mdata = getQueueMetadata(name) + mdata.getActiveIndexes foreach { + index => + QueueClient.delete(getIndexedKey(name, index)) + } + QueueClient.delete(getKey(name, queueHeadIndex)) + QueueClient.delete(getKey(name, queueTailIndex)) + true + } + + def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { + val mdata = getQueueMetadata(name) + val ret = mdata.getPeekIndexes(start, count).toList map { + index: Int => { + log.debug("peeking:" + index) + QueueClient.getValue(getIndexedKey(name, index)) + } + } + ret + } + + def size(name: String): Int = { + getQueueMetadata(name).size + } + + def dequeue(name: String): Option[Array[Byte]] = { + val mdata = getQueueMetadata(name) + if (mdata.canDequeue) { + val key = getIndexedKey(name, mdata.head) + try { + val dequeued = QueueClient.getValue(key) + QueueClient.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue)) + Some(dequeued) + } + finally { + try { + QueueClient.delete(key) + } catch { + //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around + case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") + } + } + } else { + None + } + } + + def enqueue(name: String, item: Array[Byte]): Option[Int] = { + val mdata = getQueueMetadata(name) + if (mdata.canEnqueue) { + val key = getIndexedKey(name, mdata.tail) + item match { + case null => QueueClient.delete(key) + case _ => QueueClient.put(key, item) + } + QueueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue)) + Some(mdata.size + 1) + } else { + None + } + } + + + def getQueueMetadata(name: String): QueueMetadata = { + val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex)) + val qdata = QueueClient.getAll(keys) + val values = keys.map { + qdata.get(_) match { + case Some(value) => IntSerializer.fromBytes(value) + case None => 0 + } + } + QueueMetadata(values.head, values.tail.head) + } + + /** + * Concat the ownerlenght+owner+key+ of owner so owned data will be colocated + * Store the length of owner as first byte to work around the rare case + * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 + */ + + + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { + val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") + val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) + val theKey = new Array[Byte](ownerLenghtBytes.length + ownerBytes.length + key.length) + System.arraycopy(ownerLenghtBytes, 0, theKey, 0, ownerLenghtBytes.length) + System.arraycopy(ownerBytes, 0, theKey, ownerLenghtBytes.length, ownerBytes.length) + System.arraycopy(key, 0, theKey, ownerLenghtBytes.length + ownerBytes.length, key.length) + theKey + } + + def getIndexedBytes(index: Int): Array[Byte] = { + val indexbytes = IntSerializer.toBytes(index) + val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length) + System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length) + System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length) + theIndexKey + } + + def getIndexedKey(owner: String, index: Int): Array[Byte] = { + getKey(owner, getIndexedBytes(index)) + } + + def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = { + val indexBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(key, key.length - IntSerializer.bytesPerInt, indexBytes, 0, IntSerializer.bytesPerInt) + IntSerializer.fromBytes(indexBytes) + } + + def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = { + val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length + val mapkey = new Array[Byte](mapKeyLength) + System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength) + mapkey + } + + //wrapper for null + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { + value match { + case null => nullMapValue + case value => { + val stored = new Array[Byte](value.length + 1) + stored(0) = notNullMapValueHeader + System.arraycopy(value, 0, stored, 1, value.length) + stored + } + } + } + + def getMapValueFromStored(value: Array[Byte]): Array[Byte] = { + + if (value(0) == nullMapValueHeader) { + null + } else if (value(0) == notNullMapValueHeader) { + val returned = new Array[Byte](value.length - 1) + System.arraycopy(value, 1, returned, 0, value.length - 1) + returned + } else { + throw new StorageException("unknown header byte on map value:" + value(0)) + } + } + + + def getClientConfig(configMap: Map[String, String]): Properties = { + val properites = new Properties + configMap.foreach { + keyval => keyval match { + case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String]) + } + } + properites + } + + + + + case class QueueMetadata(head: Int, tail: Int) { + //queue is an sequence with indexes from 0 to Int.MAX_VALUE + //wraps around when one pointer gets to max value + //head has an element in it. + //tail is the next slot to write to. + def size = { + if (tail >= head) { + tail - head + } else { + //queue has wrapped + (Integer.MAX_VALUE - head) + (tail + 1) + } + } + + def canEnqueue = { + //the -1 stops the tail from catching the head on a wrap around + size < Integer.MAX_VALUE - 1 + } + + def canDequeue = {size > 0} + + def getActiveIndexes(): IndexedSeq[Int] = { + if (tail >= head) { + Range(head, tail) + } else { + //queue has wrapped + val headRange = Range.inclusive(head, Integer.MAX_VALUE) + (if (tail > 0) {headRange ++ Range(0, tail)} else {headRange}) + } + } + + def getPeekIndexes(start: Int, count: Int): IndexedSeq[Int] = { + val indexes = getActiveIndexes + if (indexes.size < start) + {IndexedSeq.empty[Int]} else + {indexes.drop(start).take(count)} + } + + def nextEnqueue = { + tail match { + case Integer.MAX_VALUE => 0 + case _ => tail + 1 + } + } + + def nextDequeue = { + head match { + case Integer.MAX_VALUE => 0 + case _ => head + 1 + } + } + } + + + + object RiakAccess { + implicit def byteArrayToByteString(ary: Array[Byte]): ByteString = { + ByteString.copyFrom(ary) + } + + implicit def byteStringToByteArray(bs: ByteString): Array[Byte] = { + bs.toByteArray + } + + implicit def stringToByteString(bucket: String): ByteString = { + ByteString.copyFromUtf8(bucket) + } + + implicit def stringToByteArray(st: String): Array[Byte] = { + st.getBytes("UTF-8") + } + } + + trait RiakAccess { + def bucket: String + //http://www.mail-archive.com/riak-users@lists.basho.com/msg01013.html + val quorum: Int = 0xfffffffd + val one: Int = 0xfffffffe + val all: Int = 0xfffffffc + val default: Int = 0xfffffffb + + def put(key: Array[Byte], value: Array[Byte]) = { + val objs: Array[RiakObject] = riakClient.fetch(bucket, key, quorum) + objs.size match { + case 0 => riakClient.store(new RiakObject(bucket, key, value), new RequestMeta().w(quorum).dw(quorum)) + case _ => riakClient.store(new RiakObject(objs(0).getVclock, bucket, key, value),new RequestMeta().w(quorum).dw(quorum)) + } + } + + def getValue(key: Array[Byte]): Array[Byte] = { + val objs = riakClient.fetch(bucket, key, quorum) + objs.size match { + case 0 => null; + case _ => objs(0).getValue.isEmpty match { + case true => null + case false => objs(0).getValue + } + } + } + + def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + Option(getValue(key)) match { + case Some(value) => value + case None => default + } + } + + def getAll(keys: Traversable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + var result = new HashMap[Array[Byte], Array[Byte]] + keys.foreach { + key => + val value = getValue(key) + Option(value) match { + case Some(value) => result += key -> value + case None => () + } + } + result + } + + def delete(key: Array[Byte]) = { + riakClient.delete(bucket, key, quorum) + } + + def drop() { + val keys = riakClient.listKeys(bucket) + JavaConversions.asIterable(keys) foreach { + delete(_) + } + keys.close + } + } + + object RefClient extends RiakAccess { + def bucket: String = refBucket + } + + object MapClient extends RiakAccess { + def bucket = mapBucket + } + + object VectorClient extends RiakAccess { + def bucket = vectorBucket + } + + object QueueClient extends RiakAccess { + def bucket = queueBucket + } + + + + object IntSerializer { + val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE + + def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array() + + def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt() + + def toString(obj: Int) = obj.toString + + def fromString(str: String) = str.toInt + } + + object SortedSetSerializer { + def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { + val length = set.foldLeft(0) { + (total, bytes) => { + total + bytes.length + IntSerializer.bytesPerInt + } + } + val allBytes = new Array[Byte](length) + val written = set.foldLeft(0) { + (total, bytes) => { + val sizeBytes = IntSerializer.toBytes(bytes.length) + System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) + System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) + total + sizeBytes.length + bytes.length + } + } + require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length)) + allBytes + } + + def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + + var set = new TreeSet[Array[Byte]] + if (bytes.length > IntSerializer.bytesPerInt) { + var pos = 0 + while (pos < bytes.length) { + val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt) + pos += IntSerializer.bytesPerInt + val length = IntSerializer.fromBytes(lengthBytes) + val item = new Array[Byte](length) + System.arraycopy(bytes, pos, item, 0, length) + set = set + item + pos += length + } + } + set + } + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..473931104a --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendCompatibilityTest.scala @@ -0,0 +1,49 @@ +package se.scalablesolutions.akka.persistence.riak + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class RiakRefStorageBackendTestIntegration extends RefStorageBackendTest { + def dropRefs = { + RiakStorageBackend.RefClient.drop + } + + + def storage = RiakStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class RiakMapStorageBackendTestIntegration extends MapStorageBackendTest { + def dropMaps = { + RiakStorageBackend.MapClient.drop + } + + + def storage = RiakStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class RiakVectorStorageBackendTestIntegration extends VectorStorageBackendTest { + def dropVectors = { + RiakStorageBackend.VectorClient.drop + } + + + def storage = RiakStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class RiakQueueStorageBackendTestIntegration extends QueueStorageBackendTest { + def dropQueues = { + RiakStorageBackend.QueueClient.drop + } + + + def storage = RiakStorageBackend +} + + diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala new file mode 100644 index 0000000000..5fe32e9424 --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.riak + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class RiakTicket343TestIntegration extends Ticket343Test { + def dropMapsAndVectors: Unit = { + RiakStorageBackend.VectorClient.drop + RiakStorageBackend.MapClient.drop + } + + def getVector: (String) => PersistentVector[Array[Byte]] = RiakStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = RiakStorage.getMap +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/remote/Cluster.scala b/akka-remote/src/main/scala/remote/Cluster.scala index c668228291..f7ef779289 100644 --- a/akka-remote/src/main/scala/remote/Cluster.scala +++ b/akka-remote/src/main/scala/remote/Cluster.scala @@ -5,11 +5,13 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.config.Config.config -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRef, ActorRegistry} import se.scalablesolutions.akka.util.Logging import scala.collection.immutable.{Map, HashMap} +import se.scalablesolutions.akka.config.Supervision.{Permanent} +import se.scalablesolutions.akka.config.{RemoteAddress} /** * Interface for interacting with the Cluster Membership API. @@ -239,8 +241,7 @@ object Cluster extends Cluster with Logging { private[akka] def createSupervisor(actor: ActorRef): Option[Supervisor] = Some(Supervisor( - SupervisorConfig( - RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), + SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 5, 1000), Supervise(actor, Permanent) :: Nil))) private[this] def clusterActor = if (clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor]) diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 7e5f13e078..922db92ad4 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -11,8 +11,7 @@ import se.scalablesolutions.akka.dispatch.MessageInvocation import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import ActorTypeProtocol._ -import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.actor.{uuidFrom,newUuid} import se.scalablesolutions.akka.actor._ diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index 40f0d27640..9834b035af 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor.remote import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import se.scalablesolutions.akka.serialization.BinaryString -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.OneWay import org.scalatest.junit.JUnitSuite @@ -480,7 +480,7 @@ class RemoteSupervisorSpec extends JUnitSuite { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( pingpong1, Permanent) @@ -496,7 +496,7 @@ class RemoteSupervisorSpec extends JUnitSuite { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( pingpong1, Permanent) @@ -517,7 +517,7 @@ class RemoteSupervisorSpec extends JUnitSuite { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( pingpong1, Permanent) @@ -548,7 +548,7 @@ class RemoteSupervisorSpec extends JUnitSuite { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( pingpong1, Permanent) @@ -577,13 +577,13 @@ class RemoteSupervisorSpec extends JUnitSuite { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( pingpong1, Permanent) :: SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + AllForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( pingpong2, Permanent) diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 5a3a5bc2c4..cd8f09a615 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -8,15 +8,13 @@ import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.Config -import se.scalablesolutions.akka.config._ -import se.scalablesolutions.akka.config.TypedActorConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll} +import se.scalablesolutions.akka.config.{Config, TypedActorConfigurator, RemoteAddress} object RemoteTypedActorSpec { val HOSTNAME = "localhost" @@ -50,18 +48,18 @@ class RemoteTypedActorSpec extends server.start("localhost", 9995) Config.config conf.configure( - new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), + new AllForOneStrategy(List(classOf[Exception]), 3, 5000), List( - new Component( + new SuperviseTypedActor( classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], - new Permanent, + Permanent, 10000, new RemoteAddress("localhost", 9995)), - new Component( + new SuperviseTypedActor( classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwoImpl], - new Permanent, + Permanent, 10000, new RemoteAddress("localhost", 9995)) ).toArray).supervise diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index fc6afc856a..3892583026 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -9,7 +9,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.{TypedActor, Supervisor} import se.scalablesolutions.akka.camel.CamelContextManager -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ /** * @author Martin Krasser diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index b65f833763..aa70cabeb9 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -11,8 +11,7 @@ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient} import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.persistence.redis.RedisStorage import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.config.OneForOneStrategy +import se.scalablesolutions.akka.config.Supervision.{OneForOneStrategy,Permanent} import se.scalablesolutions.akka.util.Logging import Actor._ diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java index 4702eead02..d9b33cf559 100644 --- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java @@ -5,23 +5,23 @@ package sample.rest.java; import se.scalablesolutions.akka.config.TypedActorConfigurator; -import static se.scalablesolutions.akka.config.JavaConfig.*; +import static se.scalablesolutions.akka.config.Supervision.*; public class Boot { public final static TypedActorConfigurator configurator = new TypedActorConfigurator(); static { configurator.configure( - new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}), - new Component[] { - new Component( + new OneForOneStrategy(new Class[]{Exception.class}, 3, 5000), + new SuperviseTypedActor[] { + new SuperviseTypedActor( SimpleService.class, SimpleServiceImpl.class, - new Permanent(), + permanent(), 1000), - new Component( + new SuperviseTypedActor( PersistentSimpleService.class, PersistentSimpleServiceImpl.class, - new Permanent(), + permanent(), 1000) }).supervise(); } diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index fb8bd7c381..a316c54fcc 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.stm.TransactionalMap import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter import scala.xml.NodeSeq @@ -25,7 +25,7 @@ import org.atmosphere.jersey.Broadcastable class Boot { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), 3, 100), Supervise( actorOf[SimpleServiceActor], Permanent) :: diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index 3f2b76a359..0f2dd0e59a 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -6,7 +6,7 @@ package sample.security import se.scalablesolutions.akka.actor.{SupervisorFactory, Transactor, Actor} import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo} import se.scalablesolutions.akka.stm.TransactionalMap @@ -15,7 +15,7 @@ import se.scalablesolutions.akka.actor.ActorRegistry.actorFor class Boot { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + OneForOneStrategy(List(classOf[Exception]), 3, 100), // Dummy implementations of all authentication actors // see akka.conf to enable one of these for the AkkaSecurityFilterFactory Supervise( diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala index 34a3a012ea..893b44e24d 100644 --- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.spring import org.springframework.beans.factory.config.AbstractFactoryBean -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import AkkaSpringConfigurationTags._ import reflect.BeanProperty import se.scalablesolutions.akka.actor.ActorRef diff --git a/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala b/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala index 164018f588..8eb2d6ef37 100644 --- a/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala +++ b/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.spring import se.scalablesolutions.akka.util.Logging import org.springframework.beans.factory.support.BeanDefinitionBuilder import org.springframework.beans.factory.xml.{ParserContext, AbstractSingleBeanDefinitionParser} -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import AkkaSpringConfigurationTags._ @@ -47,12 +47,17 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser } private[akka] def parseRestartStrategy(element: Element, builder: BeanDefinitionBuilder) { - val failover = if (mandatory(element, FAILOVER) == "AllForOne") new AllForOne() else new OneForOne() + val failover = mandatory(element, FAILOVER) val timeRange = mandatory(element, TIME_RANGE).toInt val retries = mandatory(element, RETRIES).toInt val trapExitsElement = mandatoryElement(element, TRAP_EXISTS_TAG) val trapExceptions = parseTrapExits(trapExitsElement) - val restartStrategy = new RestartStrategy(failover, retries, timeRange, trapExceptions) + + val restartStrategy = failover match { + case "AllForOne" => new AllForOneStrategy(trapExceptions, retries, timeRange) + case "OneForOne" => new OneForOneStrategy(trapExceptions, retries, timeRange) + case _ => new OneForOneStrategy(trapExceptions, retries, timeRange) //Default to OneForOne + } builder.addPropertyValue("restartStrategy", restartStrategy) } @@ -71,7 +76,7 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser private def parseTrapExits(element: Element): Array[Class[_ <: Throwable]] = { import StringReflect._ val trapExits = DomUtils.getChildElementsByTagName(element, TRAP_EXIT_TAG).toArray.toList.asInstanceOf[List[Element]] - trapExits.map(DomUtils.getTextValue(_).toClass).toArray + trapExits.map(DomUtils.getTextValue(_).toClass.asInstanceOf[Class[_ <: Throwable]]).toArray } /* diff --git a/akka-spring/src/main/scala/SupervisionFactoryBean.scala b/akka-spring/src/main/scala/SupervisionFactoryBean.scala index c6d1e7ddc0..b4559304b5 100644 --- a/akka-spring/src/main/scala/SupervisionFactoryBean.scala +++ b/akka-spring/src/main/scala/SupervisionFactoryBean.scala @@ -4,20 +4,18 @@ package se.scalablesolutions.akka.spring import org.springframework.beans.factory.config.AbstractFactoryBean -import se.scalablesolutions.akka.config.TypedActorConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ -import se.scalablesolutions.akka.config.ScalaConfig.{Supervise, Server, SupervisorConfig, RemoteAddress => SRemoteAddress} +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor} import AkkaSpringConfigurationTags._ import reflect.BeanProperty - +import se.scalablesolutions.akka.config.{TypedActorConfigurator, RemoteAddress} /** * Factory bean for supervisor configuration. * @author michaelkober */ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { - @BeanProperty var restartStrategy: RestartStrategy = _ + @BeanProperty var restartStrategy: FaultHandlingStrategy = _ @BeanProperty var supervised: List[ActorProperties] = _ @BeanProperty var typed: String = "" @@ -46,7 +44,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { private def createInstanceForUntypedActors() : Supervisor = { val factory = new SupervisorFactory( new SupervisorConfig( - restartStrategy.transform, + restartStrategy, supervised.map(createSupervise(_)))) factory.newInstance } @@ -54,24 +52,24 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { /** * Create configuration for TypedActor */ - private[akka] def createComponent(props: ActorProperties): Component = { + private[akka] def createComponent(props: ActorProperties): SuperviseTypedActor = { import StringReflect._ - val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new Temporary() else new Permanent() + val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) Temporary else Permanent val isRemote = (props.host ne null) && (!props.host.isEmpty) val withInterface = (props.interface ne null) && (!props.interface.isEmpty) if (isRemote) { //val remote = new RemoteAddress(props.host, props.port) val remote = new RemoteAddress(props.host, props.port.toInt) if (withInterface) { - new Component(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional, remote) + new SuperviseTypedActor(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional, remote) } else { - new Component(props.target.toClass, lifeCycle, props.timeout, props.transactional, remote) + new SuperviseTypedActor(props.target.toClass, lifeCycle, props.timeout, props.transactional, remote) } } else { if (withInterface) { - new Component(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional) + new SuperviseTypedActor(props.interface.toClass, props.target.toClass, lifeCycle, props.timeout, props.transactional) } else { - new Component(props.target.toClass, lifeCycle, props.timeout, props.transactional) + new SuperviseTypedActor(props.target.toClass, lifeCycle, props.timeout, props.transactional) } } } @@ -81,7 +79,7 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { */ private[akka] def createSupervise(props: ActorProperties): Server = { import StringReflect._ - val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) new Temporary() else new Permanent() + val lifeCycle = if (!props.lifecycle.isEmpty && props.lifecycle.equalsIgnoreCase(VAL_LIFECYCYLE_TEMPORARY)) Temporary else Permanent val isRemote = (props.host ne null) && (!props.host.isEmpty) val actorRef = Actor.actorOf(props.target.toClass) if (props.timeout > 0) { @@ -92,10 +90,10 @@ class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] { } val supervise = if (isRemote) { - val remote = new SRemoteAddress(props.host, props.port.toInt) - Supervise(actorRef, lifeCycle.transform, remote) + val remote = new RemoteAddress(props.host, props.port.toInt) + Supervise(actorRef, lifeCycle, remote) } else { - Supervise(actorRef, lifeCycle.transform) + Supervise(actorRef, lifeCycle) } supervise } diff --git a/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala b/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala index 3453fb5200..f4e9f640a4 100644 --- a/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala @@ -7,7 +7,7 @@ import org.scalatest.Spec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.dispatch.MessageDispatcher @RunWith(classOf[JUnitRunner]) diff --git a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala index 15734fc9fa..85c28b0c0e 100644 --- a/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/SupervisionBeanDefinitionParserTest.scala @@ -9,10 +9,9 @@ import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import ScalaDom._ -import se.scalablesolutions.akka.config.JavaConfig._ - import org.w3c.dom.Element import org.springframework.beans.factory.support.BeanDefinitionBuilder +import se.scalablesolutions.akka.config.Supervision. {FaultHandlingStrategy, AllForOneStrategy} /** * Test for SupervisionBeanDefinitionParser @@ -36,13 +35,11 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers { it("should parse the supervisor restart strategy") { parser.parseSupervisor(createSupervisorElement, builder); - val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[RestartStrategy] + val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[FaultHandlingStrategy] assert(strategy ne null) - assert(strategy.scheme match { - case x:AllForOne => true - case _ => false }) - expect(3) { strategy.maxNrOfRetries } - expect(1000) { strategy.withinTimeRange } + assert(strategy.isInstanceOf[AllForOneStrategy]) + expect(3) { strategy.asInstanceOf[AllForOneStrategy].maxNrOfRetries.get } + expect(1000) { strategy.asInstanceOf[AllForOneStrategy].withinTimeRange.get } } it("should parse the supervised typed actors") { diff --git a/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala b/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala index 79872b18d4..bb2fbb0b27 100644 --- a/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/SupervisionFactoryBeanTest.scala @@ -7,7 +7,7 @@ import org.scalatest.Spec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.config.TypedActorConfigurator private[akka] class Foo @@ -15,7 +15,7 @@ private[akka] class Foo @RunWith(classOf[JUnitRunner]) class SupervisionFactoryBeanTest extends Spec with ShouldMatchers { - val restartStrategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Throwable])) + val faultHandlingStrategy = new AllForOneStrategy(List(classOf[Exception]), 3, 1000) val typedActors = List(createTypedActorProperties("se.scalablesolutions.akka.spring.Foo", "1000")) private def createTypedActorProperties(target: String, timeout: String) : ActorProperties = { @@ -28,8 +28,8 @@ class SupervisionFactoryBeanTest extends Spec with ShouldMatchers { describe("A SupervisionFactoryBean") { val bean = new SupervisionFactoryBean it("should have java getters and setters for all properties") { - bean.setRestartStrategy(restartStrategy) - assert(bean.getRestartStrategy == restartStrategy) + bean.setRestartStrategy(faultHandlingStrategy) + assert(bean.getRestartStrategy == faultHandlingStrategy) bean.setSupervised(typedActors) assert(bean.getSupervised == typedActors) } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 8b9cc2034a..0aeb127fdc 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -5,9 +5,8 @@ package se.scalablesolutions.akka.actor import Actor._ -import se.scalablesolutions.akka.config.FaultHandlingStrategy import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture, Dispatchers} -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.util._ import ReflectiveAccess._ @@ -732,8 +731,8 @@ object TypedActor extends Logging { private[akka] def returnsFuture_?(methodRtti: MethodRtti): Boolean = classOf[Future[_]].isAssignableFrom(methodRtti.getMethod.getReturnType) - private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = - Supervisor(SupervisorConfig(restartStrategy, components)) + private[akka] def supervise(faultHandlingStrategy: FaultHandlingStrategy, components: List[Supervise]): Supervisor = + Supervisor(SupervisorConfig(faultHandlingStrategy, components)) def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message)) isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti]) diff --git a/akka-typed-actor/src/main/scala/config/TypedActorConfigurator.scala b/akka-typed-actor/src/main/scala/config/TypedActorConfigurator.scala index d639d21f5f..f23cb6b8ec 100644 --- a/akka-typed-actor/src/main/scala/config/TypedActorConfigurator.scala +++ b/akka-typed-actor/src/main/scala/config/TypedActorConfigurator.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.config -import JavaConfig._ +import Supervision._ import java.util.{List => JList} import java.util.{ArrayList} @@ -43,10 +43,10 @@ class TypedActorConfigurator { */ def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head - def configure(restartStrategy: RestartStrategy, components: Array[Component]): TypedActorConfigurator = { + def configure(faultHandlingStrategy: FaultHandlingStrategy, components: Array[SuperviseTypedActor]): TypedActorConfigurator = { INSTANCE.configure( - restartStrategy.transform, - components.toList.asInstanceOf[scala.List[Component]].map(_.transform)) + faultHandlingStrategy, + components.toList.asInstanceOf[scala.List[SuperviseTypedActor]]) this } diff --git a/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala b/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala index 5ca249a3ec..38378f5e8b 100644 --- a/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala +++ b/akka-typed-actor/src/main/scala/config/TypedActorGuiceConfigurator.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.config import se.scalablesolutions.akka.actor._ -import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.util._ import ReflectiveAccess._ @@ -27,12 +27,12 @@ import com.google.inject._ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBase with Logging { private var injector: Injector = _ private var supervisor: Option[Supervisor] = None - private var restartStrategy: RestartStrategy = _ - private var components: List[Component] = _ + private var faultHandlingStrategy: FaultHandlingStrategy = NoFaultHandlingStrategy + private var components: List[SuperviseTypedActor] = _ private var supervised: List[Supervise] = Nil private var bindings: List[DependencyBinding] = Nil - private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? - private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]] + private var configRegistry = new HashMap[Class[_], SuperviseTypedActor] // TODO is configRegistry needed? + private var typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]] private var modules = new java.util.ArrayList[Module] private var methodToUriRegistry = new HashMap[Method, String] @@ -68,10 +68,10 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa else c.target } - override def configure(restartStrategy: RestartStrategy, components: List[Component]): + override def configure(faultHandlingStrategy: FaultHandlingStrategy, components: List[SuperviseTypedActor]): TypedActorConfiguratorBase = synchronized { - this.restartStrategy = restartStrategy - this.components = components.toArray.toList.asInstanceOf[List[Component]] + this.faultHandlingStrategy = faultHandlingStrategy + this.components = components.toArray.toList.asInstanceOf[List[SuperviseTypedActor]] bindings = for (component <- this.components) yield { newDelegatingProxy(component) // if (component.intf.isDefined) newDelegatingProxy(component) @@ -84,7 +84,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa } /* - private def newSubclassingProxy(component: Component): DependencyBinding = { + private def newSubclassingProxy(component: SuperviseTypedActor): DependencyBinding = { val targetClass = if (component.target.isInstanceOf[Class[_ <: TypedActor]]) component.target.asInstanceOf[Class[_ <: TypedActor]] else throw new IllegalArgumentException("TypedActor [" + component.target.getName + "] must be a subclass of TypedActor") @@ -101,7 +101,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa new DependencyBinding(targetClass, proxy) } */ - private def newDelegatingProxy(component: Component): DependencyBinding = { + private def newDelegatingProxy(component: SuperviseTypedActor): DependencyBinding = { component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) val interfaceClass = if (component.intf.isDefined) component.intf.get else throw new IllegalActorStateException("No interface for TypedActor specified") @@ -144,7 +144,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa override def supervise: TypedActorConfiguratorBase = synchronized { if (injector eq null) inject - supervisor = Some(TypedActor.supervise(restartStrategy, supervised)) + supervisor = Some(TypedActor.supervise(faultHandlingStrategy, supervised)) this } @@ -169,11 +169,11 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa def reset = synchronized { modules = new java.util.ArrayList[Module] - configRegistry = new HashMap[Class[_], Component] - typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]] + configRegistry = new HashMap[Class[_], SuperviseTypedActor] + typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]] methodToUriRegistry = new HashMap[Method, String] injector = null - restartStrategy = null + faultHandlingStrategy = NoFaultHandlingStrategy } def stop = synchronized { diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala index ea5db11531..63c8856075 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartNestedTransactionalTypedActorSpec.scala @@ -14,7 +14,7 @@ import org.junit.runner.RunWith import se.scalablesolutions.akka.config.Config import se.scalablesolutions.akka.config._ import se.scalablesolutions.akka.config.TypedActorConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.actor._ @RunWith(classOf[JUnitRunner]) diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala index 8f80fbcd1b..968379a003 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/RestartTransactionalTypedActorSpec.scala @@ -11,11 +11,9 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.Config -import se.scalablesolutions.akka.config._ -import se.scalablesolutions.akka.config.TypedActorConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.actor._ +import se.scalablesolutions.akka.config.{Config, TypedActorConfigurator} @RunWith(classOf[JUnitRunner]) class RestartTransactionalTypedActorSpec extends @@ -29,15 +27,15 @@ class RestartTransactionalTypedActorSpec extends def before { Config.config conf.configure( - new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), + AllForOneStrategy(List(classOf[Exception]), 3, 5000), List( - new Component( + new SuperviseTypedActor( classOf[TransactionalTypedActor], - new Temporary, + Temporary, 10000), - new Component( + new SuperviseTypedActor( classOf[TypedActorFailer], - new Temporary, + Temporary, 10000) ).toArray).supervise } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala index 814cd299d9..ea6e939386 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala @@ -14,11 +14,10 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.Config -import se.scalablesolutions.akka.config.TypedActorConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.dispatch.FutureTimeoutException +import se.scalablesolutions.akka.config.{Config, TypedActorConfigurator} @RunWith(classOf[JUnitRunner]) class TypedActorGuiceConfiguratorSpec extends @@ -36,21 +35,21 @@ class TypedActorGuiceConfiguratorSpec extends conf.addExternalGuiceModule(new AbstractModule { def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON) }).configure( - new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), - List( - new Component( - classOf[Foo], - classOf[FooImpl], - new Permanent, - 1000, - dispatcher), - new Component( - classOf[Bar], - classOf[BarImpl], - new Permanent, - 1000, - dispatcher) - ).toArray).inject.supervise + AllForOneStrategy(classOf[Exception] :: Nil, 3, 5000), + List( + new SuperviseTypedActor( + classOf[Foo], + classOf[FooImpl], + Permanent, + 1000, + dispatcher), + new SuperviseTypedActor( + classOf[Bar], + classOf[BarImpl], + Permanent, + 1000, + dispatcher) + ).toArray).inject.supervise } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index f2903adf03..7bc521fe73 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -7,10 +7,10 @@ import org.scalatest.matchers.ShouldMatchers import se.scalablesolutions.akka.actor.TypedActor._ -import se.scalablesolutions.akka.config.{OneForOneStrategy, TypedActorConfigurator} -import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.config.Supervision._ import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.config.TypedActorConfigurator /** * @author Martin Krasser @@ -21,9 +21,9 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft var conf2: TypedActorConfigurator = _ override protected def beforeAll() = { - val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception])) - val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new Permanent(), 1000) - val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new Temporary(), 1000) + val strategy = AllForOneStrategy(classOf[Exception] :: Nil, 3, 1000) + val comp3 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], permanent(), 1000) + val comp4 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], temporary(), 1000) conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise conf2 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise } @@ -87,7 +87,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft SamplePojoImpl.reset val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) - link(supervisor, pojo, OneForOneStrategy(Array(classOf[Throwable]), 3, 2000)) + link(supervisor, pojo, OneForOneStrategy(classOf[Throwable] :: Nil, 3, 2000)) pojo.throwException Thread.sleep(500) SimpleJavaPojoImpl._pre should be(true) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 5d3ff387f0..0ca834b8d4 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -186,6 +186,20 @@ akka { bootstrap_urls = "tcp://localhost:6666" # All Valid Voldemort Client properties are valid here, in string form } } + + riak { + bucket { + ref = "Refs" + maps = "Maps" + vector = "Vectors" + queue = "Queues" + } + + client{ + host = "localhost" + port = 8087 #Default Riak Protobuf port + } + } } camel { diff --git a/embedded-repo/com/redis/redisclient/2.8.0-2.0.2/redisclient-2.8.0-2.0.2.jar b/embedded-repo/com/redis/redisclient/2.8.0-2.0.2/redisclient-2.8.0-2.0.2.jar new file mode 100644 index 0000000000..cbaf69ad80 Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0-2.0.2/redisclient-2.8.0-2.0.2.jar differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0-2.0.2/redisclient-2.8.0-2.0.2.pom b/embedded-repo/com/redis/redisclient/2.8.0-2.0.2/redisclient-2.8.0-2.0.2.pom new file mode 100644 index 0000000000..32e34d89a0 --- /dev/null +++ b/embedded-repo/com/redis/redisclient/2.8.0-2.0.2/redisclient-2.8.0-2.0.2.pom @@ -0,0 +1,8 @@ + + + 4.0.0 + com.redis + redisclient + 2.8.0-2.0.2 + jar + diff --git a/embedded-repo/com/redis/redisclient/2.8.0-2.0.3/redisclient-2.8.0-2.0.3.jar b/embedded-repo/com/redis/redisclient/2.8.0-2.0.3/redisclient-2.8.0-2.0.3.jar new file mode 100644 index 0000000000..be75baa5aa Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0-2.0.3/redisclient-2.8.0-2.0.3.jar differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0-2.0.3/redisclient-2.8.0-2.0.3.pom b/embedded-repo/com/redis/redisclient/2.8.0-2.0.3/redisclient-2.8.0-2.0.3.pom new file mode 100644 index 0000000000..7b1878f070 --- /dev/null +++ b/embedded-repo/com/redis/redisclient/2.8.0-2.0.3/redisclient-2.8.0-2.0.3.pom @@ -0,0 +1,8 @@ + + + 4.0.0 + com.redis + redisclient + 2.8.0-2.0.3 + jar + diff --git a/embedded-repo/com/trifork/riak-java-pb-client/1.0-for-akka-by-ticktock/riak-java-pb-client-1.0-for-akka-by-ticktock.jar b/embedded-repo/com/trifork/riak-java-pb-client/1.0-for-akka-by-ticktock/riak-java-pb-client-1.0-for-akka-by-ticktock.jar new file mode 100644 index 0000000000..053eb397c7 Binary files /dev/null and b/embedded-repo/com/trifork/riak-java-pb-client/1.0-for-akka-by-ticktock/riak-java-pb-client-1.0-for-akka-by-ticktock.jar differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index d8353a4bfd..23240a9d87 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -217,7 +217,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" - lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.1" % "compile" + lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.3" % "compile" lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" @@ -249,6 +249,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val hbase_core = "org.apache.hbase" % "hbase-core" % "0.20.6" % "compile" + //Riak PB Client + lazy val riak_pb_client = "com.trifork" % "riak-java-pb-client" % "1.0-for-akka-by-ticktock" % "compile" + // Test lazy val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test" @@ -329,6 +332,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-persistence-redis_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-persistence-mongo_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-persistence-cassandra_%s-%s.jar".format(buildScalaVersion, version) + + " dist/akka-persistence-voldemort_%s-%s.jar".format(buildScalaVersion, version) + + " dist/akka-persistence-riak_%s-%s.jar".format(buildScalaVersion, version) + + " dist/akka-persistence-hbase_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-kernel_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version) @@ -529,6 +535,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaHbaseProject(_), akka_persistence_common) lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort", new AkkaVoldemortProject(_), akka_persistence_common) + lazy val akka_persistence_riak = project("akka-persistence-riak", "akka-persistence-riak", + new AkkaRiakProject(_), akka_persistence_common) lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb", new AkkaCouchDBProject(_), akka_persistence_common) } @@ -631,6 +639,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = createTestFilter({ s:String=> s.endsWith("Suite") || s.endsWith("Test")}) } +// akka-persistence-riak subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaRiakProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val riak_pb = Dependencies.riak_pb_client + val protobuf = Dependencies.protobuf + //testing + val scalatest = Dependencies.scalatest + + + override def testOptions = createTestFilter(_.endsWith("Test")) + } + class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val couch = Dependencies.commonsHttpClient val spec = Dependencies.specs