Merge branch 'master' into jta

This commit is contained in:
Jonas Bonér 2010-04-17 18:24:48 +02:00
commit 27c54dedc1
33 changed files with 765 additions and 375 deletions

View file

@ -4,6 +4,7 @@
package se.scalablesolutions.akka.actor
import _root_.se.scalablesolutions.akka.config.FaultHandlingStrategy
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
@ -35,7 +36,7 @@ object Annotations {
*/
object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(false, None), None, timeout)
@ -199,6 +200,71 @@ object ActiveObject {
proxy.asInstanceOf[T]
}
/**
* Get the underlying dispatcher actor for the given active object.
*/
def actorFor(obj: AnyRef): Option[Actor] = {
ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj))
}
/**
* Links an other active object to this active object.
* @param supervisor the supervisor active object
* @param supervised the active object to link
*/
def link(supervisor: AnyRef, supervised: AnyRef) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object"))
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object"))
supervisorActor !! Link(supervisedActor)
}
/**
* Links an other active object to this active object and sets the fault handling for the supervisor.
* @param supervisor the supervisor active object
* @param supervised the active object to link
* @param handler fault handling strategy
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object"))
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object"))
supervisorActor.trapExit = trapExceptions.toList
supervisorActor.faultHandler = Some(handler)
supervisorActor !! Link(supervisedActor)
}
/**
* Unlink the supervised active object from the supervisor.
* @param supervisor the supervisor active object
* @param supervised the active object to unlink
*/
def unlink(supervisor: AnyRef, supervised: AnyRef) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object"))
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object"))
supervisorActor !! Unlink(supervisedActor)
}
/**
* Sets the trap exit for the given supervisor active object.
* @param supervisor the supervisor active object
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object"))
supervisorActor.trapExit = trapExceptions.toList
this
}
/**
* Sets the fault handling strategy for the given supervisor active object.
* @param supervisor the supervisor active object
* @param handler fault handling strategy
*/
def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object"))
supervisorActor.faultHandler = Some(handler)
this
}
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components))
@ -215,19 +281,19 @@ private[akka] object AspectInitRegistry {
val init = initializations.get(target)
initializations.remove(target)
init
}
}
def register(target: AnyRef, init: AspectInit) = initializations.put(target, init)
}
private[akka] sealed case class AspectInit(
val target: Class[_],
val actor: Dispatcher,
val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long) {
def this(target: Class[_],actor: Dispatcher, timeout: Long) = this(target, actor, None, timeout)
}
/**
* AspectWerkz Aspect that is turning POJOs into Active Object.
* Is deployed on a 'per-instance' basis.
@ -248,7 +314,7 @@ private[akka] sealed class ActiveObjectAspect {
if (!isInitialized) {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
target = init.target
actor = init.actor
actor = init.actor
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
@ -267,7 +333,7 @@ private[akka] sealed class ActiveObjectAspect {
(actor ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
}
else {
val result = actor !! Invocation(joinPoint, false, isVoid(rtti))
val result = actor !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
@ -307,7 +373,7 @@ private[akka] sealed class ActiveObjectAspect {
val (_, cause) = future.exception.get
throw cause
} else future.result.asInstanceOf[Option[T]]
private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway)
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
@ -353,9 +419,12 @@ private[akka] sealed class ActiveObjectAspect {
}
}
// Jan Kronquist: started work on issue 121
private[akka] case class Link(val actor: Actor)
object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()
}
/**
@ -393,7 +462,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
"Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any annotation defined restart callbacks
// See if we have any annotation defined restart callbacks
if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
@ -406,7 +475,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
// see if we have a method annotated with @inittransactionalstate, if so invoke it
initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition")
@ -418,6 +487,9 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else reply(joinPoint.proceed)
// Jan Kronquist: started work on issue 121
case Link(target) => link(target)
case Unlink(target) => unlink(target)
case unexpected =>
throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
@ -468,6 +540,6 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (!unserializable && hasMutableArgument) {
val copyOfArgs = Serializer.Java.deepClone(args)
joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
}
}
}
}

View file

@ -52,6 +52,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMe
case class Restart(reason: Throwable) extends LifeCycleMessage
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
case class Unlink(child: Actor) extends LifeCycleMessage
case class UnlinkAndStop(child: Actor) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
@ -151,29 +152,6 @@ object Actor extends Logging {
def receive = body
}
/**
* Use to create an anonymous event-driven remote actor.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = remoteActor("localhost", 9999) {
* case msg => ... // handle message
* }
* </pre>
*/
def remoteActor(hostname: String, port: Int)(body: PartialFunction[Any, Unit]): Actor = new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
makeRemote(hostname, port)
start
def receive = body
}
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
* <p/>
@ -244,7 +222,7 @@ object Actor extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Actor extends TransactionManagement with Logging {
implicit protected val self: Option[Actor] = Some(this)
implicit protected val self: Some[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
@ -255,7 +233,6 @@ trait Actor extends TransactionManagement with Logging {
@volatile private[this] var _isRunning = false
@volatile private[this] var _isSuspended = true
@volatile private[this] var _isShutDown = false
@volatile private[this] var _isEventBased: Boolean = false
@volatile private[akka] var _isKilled = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@ -275,7 +252,10 @@ trait Actor extends TransactionManagement with Logging {
// ====================================
/**
* TODO: Document replyTo
* Holds the reference to the sender of the currently processed message.
* Is None if no sender was specified
* Is Some(Left(Actor)) if sender is an actor
* Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/
protected var replyTo: Option[Either[Actor,CompletableFuture]] = None
@ -317,11 +297,7 @@ trait Actor extends TransactionManagement with Logging {
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
protected[akka] var messageDispatcher: MessageDispatcher = {
val dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
_isEventBased = dispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
dispatcher
}
protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
/**
* User overridable callback/setting.
@ -341,7 +317,7 @@ trait Actor extends TransactionManagement with Logging {
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
protected var trapExit: List[Class[_ <: Throwable]] = Nil
protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil
/**
* User overridable callback/setting.
@ -354,7 +330,7 @@ trait Actor extends TransactionManagement with Logging {
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* </pre>
*/
protected var faultHandler: Option[FaultHandlingStrategy] = None
protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None
/**
* User overridable callback/setting.
@ -536,13 +512,17 @@ trait Actor extends TransactionManagement with Logging {
if (isActiveObject) throw e
else None
}
getResultOrThrowException(future)
} else throw new IllegalStateException(
if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]]
}
else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Sends a message asynchronously and waits on a future for a reply message.
* Uses the time-out defined in the Actor.
* <p/>
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
@ -572,11 +552,10 @@ trait Actor extends TransactionManagement with Logging {
* <p/>
* Works with both '!' and '!!'.
*/
def forward(message: Any)(implicit sender: Option[Actor] = None) = {
def forward(message: Any)(implicit sender: Some[Actor]) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor"))
forwarder.replyTo match {
sender.get.replyTo match {
case Some(Left(actor)) => postMessageToMailbox(message, Some(actor))
case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future))
case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
@ -615,7 +594,6 @@ trait Actor extends TransactionManagement with Logging {
messageDispatcher.unregister(this)
messageDispatcher = md
messageDispatcher.register(this)
_isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@ -734,7 +712,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnRemote[T <: Actor : Manifest](hostname: String, port: Int): T = {
protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): T = {
val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port)
actor.start
@ -746,7 +724,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnLink[T <: Actor : Manifest] : T = {
protected[this] def spawnLink[T <: Actor: Manifest]: T = {
val actor = spawnButDoNotStart[T]
try {
actor.start
@ -838,7 +816,7 @@ trait Actor extends TransactionManagement with Logging {
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
} else {
val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get)
if (_isEventBased) {
if (messageDispatcher.usesActorMailbox) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
@ -871,10 +849,11 @@ trait Actor extends TransactionManagement with Logging {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get)
if (_isEventBased) {
if (messageDispatcher.usesActorMailbox)
_mailbox.add(invocation)
invocation.send
} else invocation.send
invocation.send
future
}
}
@ -980,18 +959,15 @@ trait Actor extends TransactionManagement with Logging {
}
}
private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]]
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
case Unlink(child) => unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
case Unlink(child) => unlink(child)
case UnlinkAndStop(child) => unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
@ -1024,7 +1000,7 @@ trait Actor extends TransactionManagement with Logging {
// if last temporary actor is gone, then unlink me from supervisor
if (getLinkedActors.isEmpty) {
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id)
_supervisor.foreach(_ ! Unlink(this))
_supervisor.foreach(_ ! UnlinkAndStop(this))
}
}
}

View file

@ -90,7 +90,7 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
*
* IMPORTANT:
* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach',
* 'map and 'flatMap' within an enclosing transaction since that would block
* 'map' and 'flatMap' within an enclosing transaction since that would block
* the transaction indefinitely. But all other operations are fine. The system
* will raise an error (e.g. *not* deadlock) if you try to do so, so as long as
* you test your application thoroughly you should be fine.
@ -99,11 +99,13 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Agent[T] private (initialValue: T) extends Transactor {
start
import Agent._
log.debug("Starting up Agent [%s]", _uuid)
private lazy val value = Ref[T]()
start
this !! Value(initialValue)
this ! Value(initialValue)
/**
* Periodically handles incoming messages.

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.actor.Actor
* <p/>
* Example usage:
* <pre/>
* val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
* val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@ -25,7 +25,7 @@ import se.scalablesolutions.akka.actor.Actor
* <p/>
* Example usage:
* <pre/>
* MessageDispatcher dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name");
* MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@ -40,9 +40,8 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor : Actor) = {
if (isShutdown)
init
override def register(actor: Actor) = {
if (isShutdown) init
super.register(actor)
}
}

View file

@ -94,6 +94,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
active = false
references.clear
}
def usesActorMailbox = true
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -168,7 +168,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
thief.forward(donated.message)(Some(donated.receiver))
thief ! donated.message
return Some(donated)
} else return None
}
@ -199,6 +199,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
pooledActors.remove(actor)
super.unregister(actor)
}
def usesActorMailbox = true
private def verifyActorsAreOfSameType(newActor: Actor) = {
actorType match {

View file

@ -68,6 +68,7 @@ trait MessageDispatcher extends Logging {
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
def usesActorMailbox : Boolean
}
trait MessageDemultiplexer {

View file

@ -37,6 +37,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
}
def isShutdown = !active
def usesActorMailbox = false
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {

View file

@ -134,6 +134,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
if (fair) true
else nrOfBusyMessages < 100
}
def usesActorMailbox = false
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -41,6 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
def isShutdown = !active
def usesActorMailbox = false
def shutdown = if (active) {
log.debug("Shutting down ThreadBasedDispatcher [%s]", name)
active = false

View file

@ -200,7 +200,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(connection.getCause))
listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(connection.getCause))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
@ -232,7 +232,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
}
} else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception))
listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
throw exception
}
@ -325,12 +325,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId)
} else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception))
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
throw exception
}
} catch {
case e: Exception =>
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(e))
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(e))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@ -345,7 +345,7 @@ class RemoteClientHandler(val name: String,
// Wait until the connection attempt succeeds or fails.
client.connection.awaitUninterruptibly
if (!client.connection.isSuccess) {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(client.connection.getCause))
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@ -353,17 +353,17 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientConnected(client.hostname, client.port))
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientDisconnected(client.hostname, client.port))
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(event.getCause))
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(event.getCause))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.remote
//import se.scalablesolutions.akka.serialization.Serializable.SBinary
import se.scalablesolutions.akka.serialization.Serializable.SBinary
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
@ -14,7 +14,7 @@ object RemoteProtocolBuilder {
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
//private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
@ -26,9 +26,9 @@ object RemoteProtocolBuilder {
def getMessage(request: RemoteRequest): Any = {
request.getProtocol match {
//case SerializationProtocol.SBINARY =>
// val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
// renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
@ -47,9 +47,9 @@ object RemoteProtocolBuilder {
def getMessage(reply: RemoteReply): Any = {
reply.getProtocol match {
//case SerializationProtocol.SBINARY =>
// val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
// renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
@ -67,12 +67,12 @@ object RemoteProtocolBuilder {
}
def setMessage(message: Any, builder: RemoteRequest.Builder) = {
/*if (message.isInstanceOf[Serializable.SBinary[_]]) {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else*/ if (message.isInstanceOf[Message]) {
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
@ -95,12 +95,12 @@ object RemoteProtocolBuilder {
}
def setMessage(message: Any, builder: RemoteReply.Builder) = {
/*if (message.isInstanceOf[Serializable.SBinary[_]]) {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else*/ if (message.isInstanceOf[Message]) {
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))

View file

@ -30,7 +30,7 @@
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
**/
*/
package se.scalablesolutions.akka.stm
@ -52,9 +52,7 @@ final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] with Pe
def get(key: K) = root(key, key.hashCode)
override def +[A >: V](pair: (K, A)) = pair match {
case (k, v) => update(k, v)
}
override def +[A >: V](pair: (K, A)) = update(pair._1, pair._2)
override def update[A >: V](key: K, value: A) = new HashTrie(root(0, key, key.hashCode) = value)
@ -68,7 +66,7 @@ final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] with Pe
}
object HashTrie {
def apply[K, V](pairs: (K, V)*) = pairs.foldLeft((new HashTrie[K, V]).asInstanceOf[Map[K,V]]) { _ + _ }
def apply[K, V](pairs: (K, V)*) = pairs.foldLeft(new HashTrie[K, V]) { _ + _ }
def unapplySeq[K, V](map: HashTrie[K, V]) = map.toSeq
}

View file

@ -22,6 +22,25 @@ import org.multiverse.stms.alpha.AlphaStm
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
/**
* FIXDOC: document AtomicTemplate
* AtomicTemplate can be used to create atomic blocks from Java code.
* <pre>
* User newUser = new AtomicTemplate[User]() {
* User atomic() {
* ... // create user atomically
* return user;
* }
* }.execute();
* </pre>
*/
trait AtomicTemplate[T] {
def atomic: T
def execute: T = Transaction.Local.atomic {
atomic
}
}
object Transaction {
val idFactory = new AtomicLong(-1L)

View file

@ -32,8 +32,13 @@ import org.multiverse.stms.alpha.AlphaRef
*/
object TransactionalState {
def newMap[K, V] = TransactionalMap[K, V]()
def newMap[K, V](pairs: (K, V)*) = TransactionalMap(pairs: _*)
def newVector[T] = TransactionalVector[T]()
def newVector[T](elems: T*) = TransactionalVector(elems :_*)
def newRef[T] = TransactionalRef[T]()
def newRef[T](initialValue: T) = TransactionalRef(initialValue)
}
/**
@ -57,7 +62,11 @@ trait Committable {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Ref {
type Ref[T] = TransactionalRef[T]
def apply[T]() = new Ref[T]
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
}
/**
@ -68,20 +77,14 @@ object Ref {
object TransactionalRef {
/**
* An implicit conversion that converts an Option to an Iterable value.
* An implicit conversion that converts a TransactionalRef to an Iterable value.
*/
implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList
def apply[T]() = new TransactionalRef[T]
}
/**
* Implements a transactional managed reference.
* Alias to TransactionalRef.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Ref[T] extends TransactionalRef[T]
def apply[T](initialValue: T) = new TransactionalRef[T](Some(initialValue))
}
/**
* Implements a transactional managed reference.
@ -89,19 +92,32 @@ class Ref[T] extends TransactionalRef[T]
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalRef[T] extends Transactional {
class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
self =>
import org.multiverse.api.ThreadLocalTransaction._
implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
private[this] lazy val ref: AlphaRef[T] = new AlphaRef
private[this] lazy val ref = {
val r = new AlphaRef[T]
initialOpt.foreach(r.set(_))
r
}
def swap(elem: T) = {
ensureIsInTransaction
ref.set(elem)
}
def alter(f: T => T): T = {
ensureIsInTransaction
ensureNotNull
ref.set(f(ref.get))
ref.get
}
def get: Option[T] = {
ensureIsInTransaction
if (ref.isNull) None
@ -129,24 +145,36 @@ class TransactionalRef[T] extends Transactional {
ref.isNull
}
def map[B](f: T => B): Option[B] = {
def map[B](f: T => B): TransactionalRef[B] = {
ensureIsInTransaction
if (isEmpty) None else Some(f(ref.get))
if (isEmpty) TransactionalRef[B] else TransactionalRef(f(ref.get))
}
def flatMap[B](f: T => Option[B]): Option[B] = {
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = {
ensureIsInTransaction
if (isEmpty) None else f(ref.get)
if (isEmpty) TransactionalRef[B] else f(ref.get)
}
def filter(p: T => Boolean): Option[T] = {
def filter(p: T => Boolean): TransactionalRef[T] = {
ensureIsInTransaction
if (isEmpty || p(ref.get)) Some(ref.get) else None
if (isDefined && p(ref.get)) TransactionalRef(ref.get) else TransactionalRef[T]
}
def foreach(f: T => Unit) {
/**
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
*/
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
class WithFilter(p: T => Boolean) {
def map[B](f: T => B): TransactionalRef[B] = self filter p map f
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
def foreach[U](f: T => U): Unit = self filter p foreach f
def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
}
def foreach[U](f: T => U): Unit = {
ensureIsInTransaction
if (!isEmpty) f(ref.get)
if (isDefined) f(ref.get)
}
def elements: Iterator[T] = {
@ -171,10 +199,15 @@ class TransactionalRef[T] extends Transactional {
private def ensureIsInTransaction =
if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
private def ensureNotNull =
if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")
}
object TransactionalMap {
def apply[K, V]() = new TransactionalMap[K, V]
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashTrie(pairs: _*)))
}
/**
@ -184,11 +217,10 @@ object TransactionalMap {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
protected[this] val ref = TransactionalRef[HashTrie[K, V]]
class TransactionalMap[K, V](initialOpt: Option[HashTrie[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
val uuid = UUID.newUuid.toString
ref.swap(new HashTrie[K, V])
protected[this] lazy val ref = new TransactionalRef(initialOpt.orElse(Some(new HashTrie[K, V])))
def -=(key: K) = {
remove(key)
@ -239,10 +271,17 @@ class TransactionalMap[K, V] extends Transactional with scala.collection.mutable
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalMap[_, _]] &&
other.hashCode == hashCode
override def toString = if (outsideTransaction) "<TransactionalMap>" else super.toString
def outsideTransaction =
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
}
object TransactionalVector {
def apply[T]() = new TransactionalVector[T]
def apply[T](elems: T*) = new TransactionalVector(Some(Vector(elems: _*)))
}
/**
@ -252,12 +291,10 @@ object TransactionalVector {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalVector[T] extends Transactional with IndexedSeq[T] {
class TransactionalVector[T](initialOpt: Option[Vector[T]] = None) extends Transactional with IndexedSeq[T] {
val uuid = UUID.newUuid.toString
private[this] val ref = TransactionalRef[Vector[T]]
ref.swap(EmptyVector)
private[this] lazy val ref = new TransactionalRef(initialOpt.orElse(Some(EmptyVector)))
def clear = ref.swap(EmptyVector)
@ -283,5 +320,10 @@ class TransactionalVector[T] extends Transactional with IndexedSeq[T] {
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalVector[_]] &&
other.hashCode == hashCode
override def toString = if (outsideTransaction) "<TransactionalVector>" else super.toString
def outsideTransaction =
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
}

View file

@ -1,6 +1,5 @@
package se.scalablesolutions.akka.actor
import _root_.java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.Actor.transactor
import se.scalablesolutions.akka.stm.Transaction.Global.atomic
import se.scalablesolutions.akka.util.Logging
@ -10,51 +9,40 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.runner.RunWith
import org.junit.{Test}
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{TimeUnit, CountDownLatch}
@RunWith(classOf[JUnitRunner])
class AgentSpec extends junit.framework.TestCase
with Suite with MustMatchers
with ActorTestUtil with Logging {
class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
@Test def testSendFun = verify(new TestActor {
def test = {
val agent = Agent(5)
handle(agent) {
agent send (_ + 1)
agent send (_ * 2)
val result = agent()
result must be(12)
}
}
})
@Test def testSendFun = {
val agent = Agent(5)
agent send (_ + 1)
agent send (_ * 2)
val result = agent()
result must be(12)
agent.stop
}
@Test def testSendValue = verify(new TestActor {
def test = {
val agent = Agent(5)
handle(agent) {
agent send 6
val result = agent()
result must be(6)
}
}
})
@Test def testSendValue = {
val agent = Agent(5)
agent send 6
val result = agent()
result must be(6)
agent.stop
}
@Test def testSendProc = verify(new TestActor {
def test = {
val agent = Agent(5)
var result = 0
val latch = new CountDownLatch(2)
handle(agent) {
agent sendProc { e => result += e; latch.countDown }
agent sendProc { e => result += e; latch.countDown }
assert(latch.await(1, TimeUnit.SECONDS))
result must be(10)
}
}
})
@Test def testSendProc = {
val agent = Agent(5)
var result = 0
val latch = new CountDownLatch(2)
agent sendProc { e => result += e; latch.countDown }
agent sendProc { e => result += e; latch.countDown }
assert(latch.await(5, TimeUnit.SECONDS))
result must be(10)
agent.stop
}
@Test def testOneAgentsendWithinEnlosingTransactionSuccess = {
case object Go
@ -64,7 +52,7 @@ with ActorTestUtil with Logging {
case Go => agent send { e => latch.countDown; e + 1 }
}
tx ! Go
assert(latch.await(1, TimeUnit.SECONDS))
assert(latch.await(5, TimeUnit.SECONDS))
val result = agent()
result must be(6)
agent.close
@ -84,46 +72,40 @@ with ActorTestUtil with Logging {
}
}
tx ! Go
assert(latch.await(1, TimeUnit.SECONDS))
assert(latch.await(5, TimeUnit.SECONDS))
agent.close
tx.stop
assert(true)
}
@Test def testAgentForeach = verify(new TestActor {
def test = {
val agent1 = Agent(3)
var result = 0
for (first <- agent1) {
result = first + 1
}
result must be(4)
agent1.close
@Test def testAgentForeach = {
val agent1 = Agent(3)
var result = 0
for (first <- agent1) {
result = first + 1
}
})
result must be(4)
agent1.close
}
@Test def testAgentMap = {
val agent1 = Agent(3)
val result = for (first <- agent1) yield first + 1
result() must be(4)
result.close
agent1.close
}
@Test def testAgentMap = verify(new TestActor {
def test = {
val agent1 = Agent(3)
val result = for (first <- agent1) yield first + 1
result() must be(4)
result.close
agent1.close
}
})
@Test def testAgentFlatMap = verify(new TestActor {
def test = {
val agent1 = Agent(3)
val agent2 = Agent(5)
val result = for {
first <- agent1
second <- agent2
} yield second + first
result() must be(8)
result.close
agent1.close
agent2.close
}
})
@Test def testAgentFlatMap = {
val agent1 = Agent(3)
val agent2 = Agent(5)
val result = for {
first <- agent1
second <- agent2
} yield second + first
result() must be(8)
result.close
agent1.close
agent2.close
}
}

View file

@ -9,16 +9,17 @@ import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
object Global {
val oneWay = new CountDownLatch(1)
val remoteReply = new CountDownLatch(1)
case class Send(actor: Actor)
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case "OneWay" =>
Global.oneWay.countDown
RemoteActorSpecActorUnidirectional.latch.countDown
}
}
@ -31,21 +32,6 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
}
case class Send(actor: Actor)
class RemoteActorSpecActorAsyncSender extends Actor {
def receive = {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
Global.remoteReply.countDown
}
def send(actor: Actor) {
this ! Send(actor)
}
}
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
@ -53,6 +39,9 @@ class SendOneWayAndReplyReceiverActor extends Actor {
}
}
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: Actor = _
@ -63,7 +52,7 @@ class SendOneWayAndReplySenderActor extends Actor {
def receive = {
case msg: AnyRef =>
state = Some(msg)
latch.countDown
SendOneWayAndReplySenderActor.latch.countDown
}
}
@ -104,7 +93,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor.makeRemote(HOSTNAME, PORT1)
actor.start
actor ! "OneWay"
assert(Global.oneWay.await(1, TimeUnit.SECONDS))
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
actor.stop
}
@ -113,14 +102,12 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
val actor = new SendOneWayAndReplyReceiverActor
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val latch = new CountDownLatch(1)
val sender = new SendOneWayAndReplySenderActor
sender.setReplyToAddress(HOSTNAME, PORT2)
sender.sendTo = actor
sender.latch = latch
sender.start
sender.sendOff
assert(latch.await(1, TimeUnit.SECONDS))
assert(SendOneWayAndReplySenderActor.latch.await(1, TimeUnit.SECONDS))
assert(sender.state.isDefined === true)
assert("World" === sender.state.get.asInstanceOf[String])
actor.stop
@ -128,7 +115,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
}
@Test
def shouldSendReplyAsync = {
def shouldSendBangBangMessageAndReceiveReply = {
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(HOSTNAME, PORT1)
actor.start
@ -138,23 +125,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
}
@Test
def shouldSendRemoteReply = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.setReplyToAddress(HOSTNAME, PORT2)
actor.makeRemote(HOSTNAME, PORT2)
actor.start
val sender = new RemoteActorSpecActorAsyncSender
sender.setReplyToAddress(HOSTNAME, PORT1)
sender.start
sender.send(actor)
assert(Global.remoteReply.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test
def shouldSendReceiveException = {
def shouldSendAndReceiveRemoteException = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(HOSTNAME, PORT1)

View file

@ -1,11 +1,14 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.CountDownLatch
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import java.util.concurrent.{TimeUnit, CountDownLatch}
/**
* @author Jan Van Besien
*/
@ -51,7 +54,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
slow ! i
}
finishedCounter.await
finishedCounter.await(5, TimeUnit.SECONDS)
fast.invocationCount must be > (slow.invocationCount)
}
}

View file

@ -57,9 +57,9 @@ class PerformanceSpec extends JUnitSuite {
}
protected def sender : Option[Actor] = replyTo match {
case Some(Left(actor)) => Some(actor)
case _ => None
}
case Some(Left(actor)) => Some(actor)
case _ => None
}
def receive = {
case MeetingCount(i) => {
@ -104,9 +104,9 @@ class PerformanceSpec extends JUnitSuite {
}
protected def sender : Option[Actor] = replyTo match {
case Some(Left(actor)) => Some(actor)
case _ => None
}
case Some(Left(actor)) => Some(actor)
case _ => None
}
override def receive: PartialFunction[Any, Unit] = {
case Meet(from, otherColour) =>

View file

@ -11,20 +11,18 @@ object ServerInitiatedRemoteActorSpec {
val HOSTNAME = "localhost"
val PORT = 9990
var server: RemoteServer = null
case class Send(actor: Actor)
object Global {
val oneWay = new CountDownLatch(1)
var remoteReply = new CountDownLatch(1)
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
start
def receive = {
case "OneWay" =>
println("================== ONEWAY")
Global.oneWay.countDown
RemoteActorSpecActorUnidirectional.latch.countDown
}
}
@ -38,15 +36,16 @@ object ServerInitiatedRemoteActorSpec {
}
}
case class Send(actor: Actor)
object RemoteActorSpecActorAsyncSender {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorAsyncSender extends Actor {
start
def receive = {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
Global.remoteReply.countDown
RemoteActorSpecActorAsyncSender.latch.countDown
}
def send(actor: Actor) {
@ -91,7 +90,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
5000L,
HOSTNAME, PORT)
val result = actor ! "OneWay"
assert(Global.oneWay.await(1, TimeUnit.SECONDS))
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
actor.stop
}
@ -113,12 +112,11 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
timeout,
HOSTNAME, PORT)
val sender = new RemoteActorSpecActorAsyncSender
sender.setReplyToAddress(HOSTNAME, PORT)
sender.start
sender.send(actor)
assert(Global.remoteReply.await(1, TimeUnit.SECONDS))
assert(RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS))
actor.stop
}

View file

@ -0,0 +1,137 @@
package se.scalablesolutions.akka.stm
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@RunWith(classOf[JUnitRunner])
class TransactionalRefSpec extends Spec with ShouldMatchers {
describe("A TransactionalRef") {
import Transaction.Local._
it("should optionally accept an initial value") {
val emptyRef = Ref[Int]
val empty = atomic { emptyRef.get }
empty should be(None)
val ref = Ref(3)
val value = atomic { ref.get.get }
value should be(3)
}
it("should be settable using swap") {
val ref = Ref[Int]
atomic { ref.swap(3) }
val value = atomic { ref.get.get }
value should be(3)
}
it("should be changeable using alter") {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
}
increment
increment
increment
val value = atomic { ref.get.get }
value should be(3)
}
it("should not be changeable using alter if no value has been set") {
val ref = Ref[Int]
def increment = atomic {
ref alter (_ + 1)
}
evaluating { increment } should produce [RuntimeException]
}
it("should be able to be mapped") {
val ref1 = Ref(1)
val ref2 = atomic {
ref1 map (_ + 1)
}
val value1 = atomic { ref1.get.get }
val value2 = atomic { ref2.get.get }
value1 should be(1)
value2 should be(2)
}
it("should be able to be used in a 'foreach' for comprehension") {
val ref = Ref(3)
var result = 0
atomic {
for (value <- ref) {
result += value
}
}
result should be(3)
}
it("should be able to be used in a 'map' for comprehension") {
val ref1 = Ref(1)
val ref2 = atomic {
for (value <- ref1) yield value + 2
}
val value2 = atomic { ref2.get.get }
value2 should be(3)
}
it("should be able to be used in a 'flatMap' for comprehension") {
val ref1 = Ref(1)
val ref2 = Ref(2)
val ref3 = atomic {
for {
value1 <- ref1
value2 <- ref2
} yield value1 + value2
}
val value3 = atomic { ref3.get.get }
value3 should be(3)
}
it("should be able to be used in a 'filter' for comprehension") {
val ref1 = Ref(1)
val refLess2 = atomic {
for (value <- ref1 if value < 2) yield value
}
val optLess2 = atomic { refLess2.get }
val refGreater2 = atomic {
for (value <- ref1 if value > 2) yield value
}
val optGreater2 = atomic { refGreater2.get }
optLess2 should be(Some(1))
optGreater2 should be(None)
}
}
}

View file

@ -0,0 +1,42 @@
package se.scalablesolutions.akka.persistence.redis
import se.scalablesolutions.akka.actor.Actor
import com.redis._
sealed trait Msg
case class Subscribe(channels: Array[String]) extends Msg
case class Register(callback: PubSubMessage => Any) extends Msg
case class Unsubscribe(channels: Array[String]) extends Msg
case object UnsubscribeAll extends Msg
case class Publish(channel: String, msg: String) extends Msg
class Subscriber(client: RedisClient) extends Actor {
var callback: PubSubMessage => Any = { m => }
def receive = {
case Subscribe(channels) =>
client.subscribe(channels.head, channels.tail: _*)(callback)
reply(true)
case Register(cb) =>
callback = cb
reply(true)
case Unsubscribe(channels) =>
client.unsubscribe(channels.head, channels.tail: _*)
reply(true)
case UnsubscribeAll =>
client.unsubscribe
reply(true)
}
}
class Publisher(client: RedisClient) extends Actor {
def receive = {
case Publish(channel, message) =>
client.publish(channel, message)
reply(true)
}
}

View file

@ -17,10 +17,10 @@ Then to run the sample:
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
4. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> import sample.chat._
- scala> ChatService.start
5. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
5. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run
6. See the chat simulation run.
7. Run it again to see full speed after first initialization.

View file

@ -33,10 +33,10 @@ Then to run the sample:
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> import sample.chat._
- scala> ChatService.start
3. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
3. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run
4. See the chat simulation run.
5. Run it again to see full speed after first initialization.

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>.
*/
package sample.pubsub
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import se.scalablesolutions.akka.persistence.redis._
/**
* Sample Akka application for Redis PubSub
*
* Prerequisite: Need Redis Server running (the version that supports pubsub)
*
* 1. Download redis from http://github.com/antirez/redis
* 2. build using "make"
* 3. Run server as ./redis-server
*
* For running this sample application :-
*
* 1. Open a shell and set AKKA_HOME to the distribution root
* 2. cd $AKKA_HOME
* 3. sbt console
* 4. import sample.pubsub._
* 5. Sub.sub("a", "b") // starts Subscription server & subscribes to channels "a" and "b"
*
* 6. Open up another shell similarly as the above and set AKKA_HOME
* 7. cd $AKKA_HOME
* 8. sbt console
* 9. import sample.pubsub._
* 10. Pub.publish("a", "hello") // the first shell should get the message
* 11. Pub.publish("c", "hi") // the first shell should NOT get this message
*
* 12. Open up a redis-client from where you installed redis and issue a publish command
* ./redis-cli publish a "hi there" ## the first shell should get the message
*
* 13. Go back to the first shell
* 14. Sub.unsub("a") // should unsubscribe the first shell from channel "a"
*
* 15. Study the callback function defined below. It supports many other message formats.
* In the second shell window do the following:
* scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c"
* scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d"
* scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c"
* scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels
*/
object Pub {
println("starting publishing service ..")
val r = new RedisClient("localhost", 6379)
val p = new Publisher(r)
p.start
def publish(channel: String, message: String) = {
p ! Publish(channel, message)
}
}
object Sub {
println("starting subscription service ..")
val r = new RedisClient("localhost", 6379)
val s = new Subscriber(r)
s.start
s ! Register(callback)
def sub(channels: String*) = {
s ! Subscribe(channels.toArray)
}
def unsub(channels: String*) = {
s ! Unsubscribe(channels.toArray)
}
def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}
// other message receive
case x =>
println("received message on channel " + channel + " as : " + x)
}
}
}

View file

@ -147,22 +147,22 @@
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-core_2.8.0.Beta1</artifactId>
<version>0.8</version>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-util_2.8.0.Beta1</artifactId>
<version>0.8</version>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-util-java_2.8.0.Beta1</artifactId>
<version>0.8</version>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-spring_2.8.0.Beta1</artifactId>
<version>0.8</version>
<version>0.8.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>

View file

@ -1,10 +1,16 @@
package se.scalablesolutions.akka.spring.foo;
import java.io.IOException;
public class Bar implements IBar {
@Override
public String getBar() {
return "bar";
}
public void throwsIOException() throws IOException {
throw new IOException("some IO went wrong");
}
}

View file

@ -15,7 +15,7 @@ public class StatefulPojo {
@inittransactionalstate
public void init() {
if (!isInitialized) {
mapState = TransactionalState.newMap();
mapState = TransactionalState.newMap();
vectorState = TransactionalState.newVector();
refState = TransactionalState.newRef();
isInitialized = true;

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import net.lag.configgy.Config;
import org.junit.Before;
import org.junit.Test;
@ -14,64 +15,121 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import se.scalablesolutions.akka.actor.ActiveObject;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import se.scalablesolutions.akka.config.JavaConfig.AllForOne;
import se.scalablesolutions.akka.config.JavaConfig.Component;
import se.scalablesolutions.akka.config.JavaConfig.LifeCycle;
import se.scalablesolutions.akka.config.JavaConfig.Permanent;
import se.scalablesolutions.akka.config.JavaConfig.RemoteAddress;
import se.scalablesolutions.akka.config.JavaConfig.RestartStrategy;
import se.scalablesolutions.akka.remote.RemoteNode;
import se.scalablesolutions.akka.spring.foo.Foo;
import se.scalablesolutions.akka.spring.foo.IBar;
import se.scalablesolutions.akka.spring.foo.MyPojo;
import se.scalablesolutions.akka.spring.foo.StatefulPojo;
/**
* Testclass for supervisor configuration.
*
* @author michaelkober
*
*
*/
public class SupervisorConfigurationTest {
private ApplicationContext context = null;
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/supervisor-config.xml");
}
@Test
public void testSupervision() {
// get ActiveObjectConfigurator bean from spring context
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision1");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
IBar bar = myConfigurator.getInstance(IBar.class);
assertNotNull(bar);
MyPojo pojo = myConfigurator.getInstance(MyPojo.class);
assertNotNull(pojo);
}
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext(
"se/scalablesolutions/akka/spring/foo/supervisor-config.xml");
}
@Test
public void testTransactionalState() {
ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context.getBean("supervision2");
StatefulPojo stateful = conf.getInstance(StatefulPojo.class);
stateful.setMapState("testTransactionalState", "some map state");
stateful.setVectorState("some vector state");
stateful.setRefState("some ref state");
assertEquals("some map state", stateful.getMapState("testTransactionalState"));
assertEquals("some vector state", stateful.getVectorState());
assertEquals("some ref state", stateful.getRefState());
}
@Test
public void testInitTransactionalState() {
StatefulPojo stateful = ActiveObject.newInstance(StatefulPojo.class, 1000, true);
assertTrue("should be inititalized", stateful.isInitialized());
}
public void testSupervision() {
// get ActiveObjectConfigurator bean from spring context
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context
.getBean("supervision1");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
IBar bar = myConfigurator.getInstance(IBar.class);
assertNotNull(bar);
MyPojo pojo = myConfigurator.getInstance(MyPojo.class);
assertNotNull(pojo);
}
@Test
public void testSupervisionWithDispatcher() {
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision-with-dispatcher");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
// TODO how to check dispatcher?
}
@Test
public void testTransactionalState() {
ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context
.getBean("supervision2");
StatefulPojo stateful = conf.getInstance(StatefulPojo.class);
stateful.setMapState("testTransactionalState", "some map state");
stateful.setVectorState("some vector state");
stateful.setRefState("some ref state");
assertEquals("some map state", stateful
.getMapState("testTransactionalState"));
assertEquals("some vector state", stateful.getVectorState());
assertEquals("some ref state", stateful.getRefState());
}
@Test
public void testInitTransactionalState() {
StatefulPojo stateful = ActiveObject.newInstance(StatefulPojo.class,
1000, true);
assertTrue("should be inititalized", stateful.isInitialized());
}
@Test
public void testSupervisionWithDispatcher() {
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context
.getBean("supervision-with-dispatcher");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
// TODO how to check dispatcher?
}
@Test
public void testRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
Foo instance = ActiveObject.newRemoteInstance(Foo.class, 2000, "localhost", 9999);
System.out.println(instance.foo());
}
@Test
public void testSupervisedRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 10000, new Class[] { Exception.class }),
new Component[] {
new Component(
Foo.class,
new LifeCycle(new Permanent()),
10000,
new RemoteAddress("localhost", 9999))
}).supervise();
Foo instance = conf.getInstance(Foo.class);
assertEquals("foo", instance.foo());
}
}

View file

@ -65,12 +65,12 @@
hostname = "localhost"
port = 9999
connection-timeout = 1000 # in millis (1 sec default)
<server>
</server>
<client>
reconnect-delay = 5000 # in millis (5 sec default)
read-timeout = 10000 # in millis (10 sec default)
<client>
</client>
</remote>
<storage>

View file

@ -1,42 +1,10 @@
/*-------------------------------------------------------------------------------
Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
----------------------------------------------------
-------- sbt buildfile for the Akka project --------
----------------------------------------------------
Akka implements a unique hybrid of:
* Actors , which gives you:
* Simple and high-level abstractions for concurrency and parallelism.
* Asynchronous, non-blocking and highly performant event-driven programming model.
* Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM).
* Supervision hierarchies with let-it-crash semantics. For writing highly
fault-tolerant systems that never stop, systems that self-heal.
* Software Transactional Memory (STM). (Distributed transactions coming soon).
* Transactors: combine actors and STM into transactional actors. Allows you to
compose atomic message flows with automatic rollback and retry.
* Remoting: highly performant distributed actors with remote supervision and
error management.
* Cluster membership management.
Akka also has a set of add-on modules:
* Persistence: A set of pluggable back-end storage modules that work in sync with the STM.
* Cassandra distributed and highly scalable database.
* MongoDB document database.
* Redis data structures database
* Camel: Expose Actors as Camel endpoints.
* REST (JAX-RS): Expose actors as REST services.
* Comet: Expose actors as Comet services.
* Security: Digest and Kerberos based security.
* Spring: Spring integration
* Guice: Guice integration
* Microkernel: Run Akka as a stand-alone kernel.
-------------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------\
| Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> |
\---------------------------------------------------------------------------*/
import sbt._
import sbt.CompileOrder._
import scala.Array
import java.util.jar.Attributes
import java.util.jar.Attributes.Name._
import java.io.File
@ -51,18 +19,12 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.0.1-for-scala-2.8.0.Beta1-with-test-interfaces-0.3-SNAPSHOT"
// ------------------------------------------------------------
lazy val akkaHome = {
val home = System.getenv("AKKA_HOME")
if (home == null) throw new Error(
"You need to set the $AKKA_HOME environment variable to the root of the Akka distribution")
Path.fromFile(home)
}
val encodingUtf8 = List("-encoding", "UTF-8")
override def parallelExecution = true
// ------------------------------------------------------------
lazy val deployPath = info.projectPath / "deploy"
lazy val distPath = info.projectPath / "dist"
lazy val deployPath = akkaHome / "deploy"
lazy val distPath = akkaHome / "dist"
override def compileOptions = super.compileOptions ++
Seq("-deprecation", "-Xmigration", "-Xcheckinit", "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8").map(x => CompileOption(x))
override def javaCompileOptions = JavaCompileOption("-Xlint:unchecked") :: super.javaCompileOptions.toList
@ -72,7 +34,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------
// repositories
val embeddedrepo = "embedded repo" at (akkaHome / "embedded-repo").asURL.toString
val embeddedrepo = "embedded repo" at (info.projectPath / "embedded-repo").asURL.toString
val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo"
val databinder = "DataBinder" at "http://databinder.net/repo"
// val configgy = "Configgy" at "http://www.lag.net/repo"
@ -277,13 +239,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaCassandraProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val cassandra = "org.apache.cassandra" % "cassandra" % CASSANDRA_VERSION % "compile"
val slf4j = "org.slf4j" % "slf4j-api" % "1.5.8" % "compile"
val slf4j_log4j = "org.slf4j" % "slf4j-log4j12" % "1.5.8" % "compile"
val log4j = "log4j" % "log4j" % "1.2.15" % "compile"
// testing
val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test"
val cassandra_clhm = "org.apache.cassandra" % "clhm-production" % CASSANDRA_VERSION % "test"
val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test"
val google_coll = "com.google.collections" % "google-collections" % "1.0" % "test"
val slf4j = "org.slf4j" % "slf4j-api" % "1.5.8" % "test"
val slf4j_log4j = "org.slf4j" % "slf4j-log4j12" % "1.5.8" % "test"
val log4j = "log4j" % "log4j" % "1.2.15" % "test"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
@ -350,6 +313,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleChatProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSamplePubSubProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSampleLiftProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {
val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile"
@ -384,6 +348,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) {
lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
new AkkaSampleChatProject(_), akka_kernel)
lazy val akka_sample_pubsub = project("akka-sample-pubsub", "akka-sample-pubsub",
new AkkaSamplePubSubProject(_), akka_kernel)
lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift",
new AkkaSampleLiftProject(_), akka_kernel)
lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",

View file

@ -0,0 +1,5 @@
import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
// val surefire = "bryanjswift" % "sbt-surefire-reporting" % "0.0.3-SNAPSHOT"
}