merged with upstream

This commit is contained in:
Jonas Bonér 2010-03-01 22:07:17 +01:00
commit 16fe4bc164
29 changed files with 971 additions and 132 deletions

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.util._
@ -299,7 +299,7 @@ private[akka] sealed class ActiveObjectAspect {
} }
} }
private def getResultOrThrowException[T](future: FutureResult): Option[T] = private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) { if (future.exception.isDefined) {
val (_, cause) = future.exception.get val (_, cause) = future.exception.get
throw cause throw cause

View file

@ -17,11 +17,11 @@ import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.{Queue, HashSet} import java.util.{Queue, HashSet}
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.net.InetSocketAddress import java.net.InetSocketAddress
import org.multiverse.commitbarriers.CountDownCommitBarrier
/** /**
* Implements the Transactor abstraction. E.g. a transactional actor. * Implements the Transactor abstraction. E.g. a transactional actor.
@ -91,7 +91,7 @@ object Actor extends Logging {
*/ */
def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() {
start start
def receive = body def receive: PartialFunction[Any, Unit] = body
} }
/** /**
@ -134,11 +134,14 @@ object Actor extends Logging {
* } * }
* </pre> * </pre>
*/ */
def spawn(body: => Unit): Actor = new Actor() { def spawn(body: => Unit): Actor = {
start case object Spawn
body new Actor() {
def receive = { start
case _ => throw new IllegalArgumentException("Actors created with 'actor(body: => Unit)' do not respond to messages.") send(Spawn)
def receive = {
case Spawn => body; stop
}
} }
} }
@ -199,8 +202,6 @@ trait Actor extends TransactionManagement {
// Only mutable for RemoteServer in order to maintain identity across nodes // Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString private[akka] var _uuid = UUID.newUuid.toString
def uuid = _uuid
// ==================================== // ====================================
// private fields // private fields
// ==================================== // ====================================
@ -239,7 +240,7 @@ trait Actor extends TransactionManagement {
* But it can be used for advanced use-cases when one might want to store away the future and * But it can be used for advanced use-cases when one might want to store away the future and
* resolve it later and/or somewhere else. * resolve it later and/or somewhere else.
*/ */
protected var senderFuture: Option[CompletableFutureResult] = None protected var senderFuture: Option[CompletableFuture] = None
// ==================================== // ====================================
// ==== USER CALLBACKS TO OVERRIDE ==== // ==== USER CALLBACKS TO OVERRIDE ====
@ -256,7 +257,7 @@ trait Actor extends TransactionManagement {
* use a custom name to be able to retrieve the "correct" persisted state * use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc. * upon restart, remote restart etc.
*/ */
protected[akka] var id: String = this.getClass.getName protected var id: String = this.getClass.getName
/** /**
* User overridable callback/setting. * User overridable callback/setting.
@ -266,8 +267,6 @@ trait Actor extends TransactionManagement {
*/ */
@volatile var timeout: Long = Actor.TIMEOUT @volatile var timeout: Long = Actor.TIMEOUT
ActorRegistry.register(this)
/** /**
* User overridable callback/setting. * User overridable callback/setting.
* <p/> * <p/>
@ -415,6 +414,7 @@ trait Actor extends TransactionManagement {
init init
} }
Actor.log.debug("[%s] has started", toString) Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
this this
} }
@ -533,26 +533,17 @@ trait Actor extends TransactionManagement {
*/ */
def !![T](message: Any): Option[T] = !![T](message, timeout) def !![T](message: Any): Option[T] = !![T](message, timeout)
/**
/* * FIXME document !!!
//FIXME 2.8 def !!!(message: Any)(implicit sender: AnyRef = None): FutureResult = { */
def !!!(message: Any)(implicit sender: AnyRef): FutureResult = { def !!!(message: Any): Future = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) { if (_isRunning) {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
else None
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, from)
} else throw new IllegalStateException( } else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it") "Actor has not been started, you need to invoke 'actor.start' before using it")
} }
*/
/**
* This method is evil and has been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: Any): T = throw new UnsupportedOperationException(
"'!?' is evil and has been removed. Use '!!' with a timeout instead")
/** /**
* Forwards the message and passes the original sender actor as the sender. * Forwards the message and passes the original sender actor as the sender.
* <p/> * <p/>
@ -763,6 +754,16 @@ trait Actor extends TransactionManagement {
actor actor
} }
/**
* Returns the id for the actor.
*/
def getId = id
/**
* Returns the uuid for the actor.
*/
def uuid = _uuid
// ========================================= // =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ==== // ==== INTERNAL IMPLEMENTATION DETAILS ====
// ========================================= // =========================================
@ -804,12 +805,12 @@ trait Actor extends TransactionManagement {
// set the source fields used to reply back to the original sender // set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor) // (i.e. not the remote proxy actor)
if (sender.isDefined) { if(sender.isDefined) {
val s = sender.get val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName) requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid) requestBuilder.setSourceUuid(s.uuid)
val (host, port) = s._replyToAddress.map(actor => (actor.getHostName, actor.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
@ -823,7 +824,9 @@ trait Actor extends TransactionManagement {
if (_isEventBased) { if (_isEventBased) {
_mailbox.add(invocation) _mailbox.add(invocation)
if (_isSuspended) invocation.send if (_isSuspended) invocation.send
} else invocation.send }
else
invocation.send
} }
clearTransactionSet clearTransactionSet
} }
@ -831,7 +834,7 @@ trait Actor extends TransactionManagement {
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, message: Any,
timeout: Long, timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { senderFuture: Option[CompletableFuture]): CompletableFuture = {
if (isTransactionSetInScope) { if (isTransactionSetInScope) {
log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message) log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
getTransactionSetInScope.incParties getTransactionSetInScope.incParties
@ -855,7 +858,7 @@ trait Actor extends TransactionManagement {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else { } else {
val future = if (senderFuture.isDefined) senderFuture.get val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(timeout) else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get) val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get)
if (_isEventBased) { if (_isEventBased) {
_mailbox.add(invocation) _mailbox.add(invocation)
@ -956,7 +959,7 @@ trait Actor extends TransactionManagement {
} }
} }
private def getResultOrThrowException[T](future: FutureResult): Option[T] = private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) throw future.exception.get._2 if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]] else future.result.asInstanceOf[Option[T]]
@ -1064,6 +1067,5 @@ trait Actor extends TransactionManagement {
that.asInstanceOf[Actor]._uuid == _uuid that.asInstanceOf[Actor]._uuid == _uuid
} }
override def toString(): String = "Actor[" + id + ":" + uuid + "]" override def toString = "Actor[" + id + ":" + uuid + "]"
} }

View file

@ -6,81 +6,123 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.{ListBuffer, HashMap} import scala.collection.mutable.ListBuffer
import scala.reflect.Manifest import scala.reflect.Manifest
import java.util.concurrent.ConcurrentHashMap
/** /**
* Registry holding all actor instances, mapped by class and the actor's id field (which can be set by user-code). * Registry holding all Actor instances in the whole system.
* Mapped by:
* <ul>
* <li>the Actor's UUID</li>
* <li>the Actor's id field (which can be set by user-code)</li>
* <li>the Actor's class</li>
* <ul>
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ActorRegistry extends Logging { object ActorRegistry extends Logging {
private val actorsByClassName = new HashMap[String, List[Actor]] private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new HashMap[String, List[Actor]] private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
/** /**
* Returns all actors in the system. * Returns all actors in the system.
*/ */
def actors: List[Actor] = synchronized { def actors: List[Actor] = {
val all = new ListBuffer[Actor] val all = new ListBuffer[Actor]
actorsById.values.foreach(all ++= _) val elements = actorsByUUID.elements
while (elements.hasMoreElements) all += elements.nextElement
all.toList all.toList
} }
/** /**
* Invokes a function for all actors. * Invokes a function for all actors.
*/ */
def foreach(f: (Actor) => Unit) = actors.foreach(f) def foreach(f: (Actor) => Unit) = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) f(elements.nextElement)
}
/** /**
* Finds all actors that are subtypes of the class passed in as the Manifest argument. * Finds all actors that are subtypes of the class passed in as the Manifest argument.
*/ */
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = synchronized { def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = {
for (actor <- actors; if manifest.erasure.isAssignableFrom(actor.getClass)) yield actor.asInstanceOf[T] val all = new ListBuffer[T]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actor = elements.nextElement
if (manifest.erasure.isAssignableFrom(actor.getClass)) {
all += actor.asInstanceOf[T]
}
}
all.toList
} }
/** /**
* Finds all actors of the exact type specified by the class passed in as the Class argument. * Finds all actors of the exact type specified by the class passed in as the Class argument.
*/ */
def actorsFor[T <: Actor](clazz: Class[T]): List[T] = synchronized { def actorsFor[T <: Actor](clazz: Class[T]): List[T] = {
actorsByClassName.get(clazz.getName) match { if (actorsByClassName.containsKey(clazz.getName)) {
case None => Nil actorsByClassName.get(clazz.getName).asInstanceOf[List[T]]
case Some(instances) => instances.asInstanceOf[List[T]] } else Nil
}
} }
/** /**
* Finds all actors that has a specific id. * Finds all actors that has a specific id.
*/ */
def actorsFor(id : String): List[Actor] = synchronized { def actorsFor(id: String): List[Actor] = {
actorsById.get(id) match { if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[Actor]]
case None => Nil else Nil
case Some(instances) => instances
}
} }
def register(actor: Actor) = synchronized { /**
val className = actor.getClass.getName * Finds the actor that has a specific UUID.
actorsByClassName.get(className) match { */
case Some(instances) => actorsByClassName + (className -> (actor :: instances)) def actorFor(uuid: String): Option[Actor] = {
case None => actorsByClassName + (className -> (actor :: Nil)) if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
} else None
val id = actor.id }
/**
* Registers an actor in the ActorRegistry.
*/
def register(actor: Actor) = {
// UUID
actorsByUUID.put(actor.uuid, actor)
// ID
val id = actor.getId
if (id eq null) throw new IllegalStateException("Actor.id is null " + actor) if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
actorsById.get(id) match { if (actorsById.containsKey(id)) actorsById.put(id, actor :: actorsById.get(id))
case Some(instances) => actorsById + (id -> (actor :: instances)) else actorsById.put(id, actor :: Nil)
case None => actorsById + (id -> (actor :: Nil))
} // Class name
val className = actor.getClass.getName
if (actorsByClassName.containsKey(className)) {
actorsByClassName.put(className, actor :: actorsByClassName.get(className))
} else actorsByClassName.put(className, actor :: Nil)
} }
def unregister(actor: Actor) = synchronized { /**
actorsByClassName - actor.getClass.getName * Unregisters an actor in the ActorRegistry.
actorsById - actor.getClass.getName */
def unregister(actor: Actor) = {
actorsByUUID remove actor.uuid
actorsById remove actor.getId
actorsByClassName remove actor.getClass.getName
} }
/**
* Shuts down and unregisters all actors in the system.
*/
def shutdownAll = { def shutdownAll = {
log.info("Shutting down all actors in the system...") log.info("Shutting down all actors in the system...")
actorsById.foreach(entry => entry._2.map(_.stop)) foreach(_.stop)
log.info("All actors have been shut down") actorsByUUID.clear
actorsById.clear
actorsByClassName.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
} }
} }

View file

@ -117,7 +117,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
actors.put(actor.getClass.getName, actor) actors.put(actor.getClass.getName, actor)
actor.lifeCycle = Some(lifeCycle) actor.lifeCycle = Some(lifeCycle)
startLink(actor) startLink(actor)
remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.id, actor)) remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.getId, actor))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start val supervisor = factory.newInstanceFor(supervisorConfig).start

View file

@ -62,15 +62,22 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def dispatch(invocation: MessageInvocation) = if (active) { def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
def run = { def run = {
invocation.receiver.synchronized { var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
}
}
/* invocation.receiver.synchronized {
val messages = invocation.receiver._mailbox.iterator val messages = invocation.receiver._mailbox.iterator
while (messages.hasNext) { while (messages.hasNext) {
messages.next.asInstanceOf[MessageInvocation].invoke messages.next.invoke
messages.remove messages.remove
} }
} }
} }
}) */
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
def start = if (!active) { def start = if (!active) {

View file

@ -2,17 +2,75 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/ */
/**
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.dispatch package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
class FutureTimeoutException(message: String) extends RuntimeException(message) class FutureTimeoutException(message: String) extends RuntimeException(message)
sealed trait FutureResult { object Futures {
/**
* FIXME document
* <pre>
* val future = Futures.future(1000) {
* ... // do stuff
* }
* </pre>
*/
def future(timeout: Long)(body: => Any): Future = {
val promise = new DefaultCompletableFuture(timeout)
try {
promise completeWithResult body
} catch {
case e => promise completeWithException (None, e)
}
promise
}
def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[Future]): Future = {
var future: Option[Future] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
future.get
}
/*
def awaitEither(f1: Future, f2: Future): Option[Any] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
case class Result(res: Option[Any])
val handOff = new SynchronousQueue[Option[Any]]
spawn {
try {
println("f1 await")
f1.await
println("f1 offer")
handOff.offer(f1.result)
} catch {case _ => {}}
}
spawn {
try {
println("f2 await")
f2.await
println("f2 offer")
println("f2 offer: " + f2.result)
handOff.offer(f2.result)
} catch {case _ => {}}
}
Thread.sleep(100)
handOff.take
}
*/
}
sealed trait Future {
def await def await
def awaitBlocking def awaitBlocking
def isCompleted: Boolean def isCompleted: Boolean
@ -22,12 +80,13 @@ sealed trait FutureResult {
def exception: Option[Tuple2[AnyRef, Throwable]] def exception: Option[Tuple2[AnyRef, Throwable]]
} }
trait CompletableFutureResult extends FutureResult { trait CompletableFuture extends Future {
def completeWithResult(result: Any) def completeWithResult(result: Any)
def completeWithException(toBlame: AnyRef, exception: Throwable) def completeWithException(toBlame: AnyRef, exception: Throwable)
} }
class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult { // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val TIME_UNIT = TimeUnit.MILLISECONDS private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0) def this() = this(0)
@ -46,7 +105,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
var start = currentTimeInNanos var start = currentTimeInNanos
try { try {
wait = _signal.awaitNanos(wait) wait = _signal.awaitNanos(wait)
if (wait <= 0) throw new FutureTimeoutException("Future timed out after [" + timeout + "] milliseconds") if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
} catch { } catch {
case e: InterruptedException => case e: InterruptedException =>
wait = wait - (currentTimeInNanos - start) wait = wait - (currentTimeInNanos - start)

View file

@ -15,7 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: Actor, final class MessageInvocation(val receiver: Actor,
val message: Any, val message: Any,
val future: Option[CompletableFutureResult], val future: Option[CompletableFuture],
val sender: Option[Actor], val sender: Option[Actor],
val transactionSet: Option[CountDownCommitBarrier]) { val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null") if (receiver eq null) throw new IllegalArgumentException("receiver is null")

View file

@ -4,11 +4,11 @@
package se.scalablesolutions.akka.dispatch package se.scalablesolutions.akka.dispatch
import java.util.Collection
import java.util.concurrent._ import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger} import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy import ThreadPoolExecutor.CallerRunsPolicy
import java.util.Collection
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder { trait ThreadPoolBuilder {

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config import se.scalablesolutions.akka.Config.config
@ -86,7 +86,7 @@ object RemoteClient extends Logging {
override def postMessageToMailboxAndCreateFutureResultWithTimeout( override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, message: Any,
timeout: Long, timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = { senderFuture: Option[CompletableFuture]): CompletableFuture = {
val requestBuilder = RemoteRequest.newBuilder val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId) .setId(RemoteRequestIdFactory.nextId)
.setTarget(className) .setTarget(className)
@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false @volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] private val futures = new ConcurrentHashMap[Long, CompletableFuture]
private val supervisors = new ConcurrentHashMap[String, Actor] private val supervisors = new ConcurrentHashMap[String, Actor]
private val channelFactory = new NioClientSocketChannelFactory( private val channelFactory = new NioClientSocketChannelFactory(
@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
} }
} }
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) { def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
if (request.getIsOneWay) { if (request.getIsOneWay) {
connection.getChannel.write(request) connection.getChannel.write(request)
None None
} else { } else {
futures.synchronized { futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(request.getTimeout) else new DefaultCompletableFuture(request.getTimeout)
futures.put(request.getId, futureResult) futures.put(request.getId, futureResult)
connection.getChannel.write(request) connection.getChannel.write(request)
Some(futureResult) Some(futureResult)
@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class RemoteClientPipelineFactory(name: String, class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult], futures: ConcurrentMap[Long, CompletableFuture],
supervisors: ConcurrentMap[String, Actor], supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap, bootstrap: ClientBootstrap,
remoteAddress: SocketAddress, remoteAddress: SocketAddress,
@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String,
*/ */
@ChannelPipelineCoverage {val value = "all"} @ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String, class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult], val futures: ConcurrentMap[Long, CompletableFuture],
val supervisors: ConcurrentMap[String, Actor], val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap, val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress, val remoteAddress: SocketAddress,

View file

@ -196,8 +196,8 @@ class RemoteServer extends Logging {
* Register Remote Actor by the Actor's 'id' field. * Register Remote Actor by the Actor's 'id' field.
*/ */
def register(actor: Actor) = if (isRunning) { def register(actor: Actor) = if (isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
} }
/** /**

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFutureResult import se.scalablesolutions.akka.dispatch.CompletableFuture
/** /**
* Implements Oz-style dataflow (single assignment) variables. * Implements Oz-style dataflow (single assignment) variables.
@ -74,7 +74,7 @@ object DataFlow {
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT timeout = TIME_OUT
start start
private var readerFuture: Option[CompletableFutureResult] = None private var readerFuture: Option[CompletableFuture] = None
def receive = { def receive = {
case Get => case Get =>
val ref = dataFlow.value.get val ref = dataFlow.value.get

View file

@ -0,0 +1,160 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class ActorRegistryTest extends JUnitSuite {
var record = ""
class TestActor extends Actor {
id = "MyID"
def receive = {
case "ping" =>
record = "pong" + record
reply("got ping")
}
}
@Test def shouldGetActorByIdFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
actor.start
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId == "MyID")
actor.stop
}
@Test def shouldGetActorByUUIDFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
val uuid = actor.uuid
actor.start
val actorOrNone = ActorRegistry.actorFor(uuid)
assert(actorOrNone.isDefined)
assert(actorOrNone.get.uuid === uuid)
actor.stop
}
@Test def shouldGetActorByClassFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
actor.start
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
actor.stop
}
@Test def shouldGetActorByManifestFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
actor.start
val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
actor.stop
}
@Test def shouldGetActorsByIdFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetActorsByClassFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetActorsByManifestFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetAllActorsFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val actors = ActorRegistry.actors
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
record = ""
ActorRegistry.foreach(actor => actor !! "ping")
assert(record === "pongpong")
actor1.stop
actor2.stop
}
@Test def shouldShutdownAllActorsInActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
ActorRegistry.shutdownAll
assert(ActorRegistry.actors.size === 0)
}
@Test def shouldRemoveUnregisterActorInActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
assert(ActorRegistry.actors.size === 2)
ActorRegistry.unregister(actor1)
assert(ActorRegistry.actors.size === 1)
ActorRegistry.unregister(actor2)
assert(ActorRegistry.actors.size === 0)
}
}

View file

@ -0,0 +1,111 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Futures
class FutureTest extends JUnitSuite {
class TestActor extends Actor {
def receive = {
case "Hello" =>
reply("World")
case "NoReply" => {}
case "Failure" =>
throw new RuntimeException("expected")
}
}
@Test def shouldActorReplyResultThroughExplicitFuture = {
val actor = new TestActor
actor.start
val future = actor !!! "Hello"
future.await
assert(future.result.isDefined)
assert("World" === future.result.get)
actor.stop
}
@Test def shouldActorReplyExceptionThroughExplicitFuture = {
val actor = new TestActor
actor.start
val future = actor !!! "Failure"
future.await
assert(future.exception.isDefined)
assert("expected" === future.exception.get._2.getMessage)
actor.stop
}
/*
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitEitherRight = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop
actor2.stop
}
*/
@Test def shouldFutureAwaitOneLeft = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitOneRight = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitAll = {
val actor1 = new TestActor
actor1.start
val actor2 = new TestActor
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "Hello"
Futures.awaitAll(List(future1, future2))
assert(future1.result.isDefined)
assert("World" === future1.result.get)
assert(future2.result.isDefined)
assert("World" === future2.result.get)
actor1.stop
actor2.stop
}
}

View file

@ -28,11 +28,6 @@ object Patterns {
val seq = actors val seq = actors
} }
//FIXME 2.8, use default params with CyclicIterator
/*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor(
new CyclicIterator(actors())
) */
def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher { def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
override def transform(msg : Any) = msgTransformer(msg) override def transform(msg : Any) = msgTransformer(msg)
def routes = routing def routes = routing
@ -81,21 +76,4 @@ class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
current = nc.tail current = nc.tail
nc.head nc.head
} }
}
//Agent
/*
val a = agent(startValue)
a.set(_ + 5)
a.get
a.foreach println(_)
*/
object Agent {
sealed trait AgentMessage
case class FunMessage[T](f : (T) => T) extends AgentMessage
case class ProcMessage[T](f : (T) => Unit) extends AgentMessage
case class ValMessage[T](t : T) extends AgentMessage
}
sealed private[akka] class Agent[T] {
} }

View file

@ -18,6 +18,12 @@
<artifactId>akka-persistence-common</artifactId> <artifactId>akka-persistence-common</artifactId>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<version>${project.version}</version> <version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.google-collections</groupId>
<artifactId>google-collect</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<!-- For Cassandra --> <!-- For Cassandra -->
@ -26,6 +32,50 @@
<artifactId>cassandra</artifactId> <artifactId>cassandra</artifactId>
<version>0.5.0</version> <version>0.5.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>high-scale-lib</artifactId>
<version>0.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>clhm-production</artifactId>
<version>0.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0-rc1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.5.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.8</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>

View file

@ -104,6 +104,42 @@ trait CassandraSession extends Closeable with Flushable {
// ==================================== // ====================================
// ====== Java-style API names // ====== Java-style API names
// ==================================== // ====================================
def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel)
def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel)
def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate) = / (key, columnParent, slicePredicate)
def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate, consistencyLevel: Int) = / (key, columnParent, slicePredicate, consistencyLevel)
def get(key: String, colPath: ColumnPath) = |(key, colPath)
def get(key: String, colPath: ColumnPath, consistencyLevel: Int) = |(key, colPath, consistencyLevel)
def getCount(key: String, columnParent: ColumnParent)= |#(key, columnParent)
def getCount(key: String, columnParent: ColumnParent, consistencyLevel: Int) = |#(key, columnParent, consistencyLevel)
def insert(key: String, colPath: ColumnPath, value: Array[Byte]): Unit = ++|(key, colPath, value)
def insert(key: String, colPath: ColumnPath, value: Array[Byte], consistencyLevel: Int): Unit = ++|(key, colPath, value, consistencyLevel)
def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit = ++|(key, colPath, value, timestamp)
def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) = ++|(key, colPath, value, timestamp, consistencyLevel)
def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]]): Unit = ++|(key, batch)
def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]], consistencyLevel: Int): Unit = ++|(key, batch, consistencyLevel)
def remove(key: String, columnPath: ColumnPath, timestamp: Long): Unit = --(key, columnPath, timestamp)
def remove(key: String, columnPath: ColumnPath, timestamp: Long, consistencyLevel: Int): Unit = --(key, columnPath, timestamp, consistencyLevel)
} }
class CassandraSessionPool[T <: TTransport]( class CassandraSessionPool[T <: TTransport](

View file

@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=DEBUG,R
# rolling log file ("system.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
log4j.appender.R.File=target/logs/system.log

View file

@ -0,0 +1,337 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<Storage>
<!--======================================================================-->
<!-- Basic Configuration -->
<!--======================================================================-->
<!--
~ The name of this cluster. This is mainly used to prevent machines in
~ one logical cluster from joining another.
-->
<ClusterName>akka</ClusterName>
<!--
~ Turn on to make new [non-seed] nodes automatically migrate the right data
~ to themselves. (If no InitialToken is specified, they will pick one
~ such that they will get half the range of the most-loaded node.)
~ If a node starts up without bootstrapping, it will mark itself bootstrapped
~ so that you can't subsequently accidently bootstrap a node with
~ data on it. (You can reset this by wiping your data and commitlog
~ directories.)
~
~ Off by default so that new clusters and upgraders from 0.4 don't
~ bootstrap immediately. You should turn this on when you start adding
~ new nodes to a cluster that already has data on it. (If you are upgrading
~ from 0.4, start your cluster with it off once before changing it to true.
~ Otherwise, no data will be lost but you will incur a lot of unnecessary
~ I/O before your cluster starts up.)
-->
<AutoBootstrap>false</AutoBootstrap>
<!--
~ Keyspaces and ColumnFamilies:
~ A ColumnFamily is the Cassandra concept closest to a relational
~ table. Keyspaces are separate groups of ColumnFamilies. Except in
~ very unusual circumstances you will have one Keyspace per application.
~ There is an implicit keyspace named 'system' for Cassandra internals.
-->
<Keyspaces>
<Keyspace Name="akka">
<!-- The fraction of keys per sstable whose locations we
keep in memory in "mostly LRU" order. (JUST the key
locations, NOT any column values.)
The amount of memory used by the default setting of
0.01 is comparable to the amount used by the internal
per-sstable key index. Consider increasing this is
fine if you have fewer, wider rows. Set to 0 to
disable entirely.
-->
<KeysCachedFraction>0.01</KeysCachedFraction>
<!--
The CompareWith attribute tells Cassandra how to sort the columns
for slicing operations. For backwards compatibility, the default
is to use AsciiType, which is probably NOT what you want.
Other options are BytesType, UTF8Type, LexicalUUIDType, TimeUUIDType, and LongType.
You can also specify the fully-qualified class name to a class
of your choice implementing org.apache.cassandra.db.marshal.IType.
SuperColumns have a similar CompareSubcolumnsWith attribute.
ByteType: simple sort by byte value. No validation is performed.
AsciiType: like BytesType, but validates that the input can be parsed as US-ASCII.
UTF8Type: A string encoded as UTF8
LongType: A 64bit long
LexicalUUIDType: a 128bit UUID, compared lexically (by byte value)
TimeUUIDType: a 128bit version 1 UUID, compared by timestamp
(To get the closest approximation to 0.3-style supercolumns,
you would use CompareWith=UTF8Type CompareSubcolumnsWith=LongType.)
if FlushPeriodInMinutes is configured and positive, it will be
flushed to disk with that period whether it is dirty or not.
This is intended for lightly-used columnfamilies so that they
do not prevent commitlog segments from being purged.
-->
<ColumnFamily CompareWith="UTF8Type" Name="map"/>
<!-- FIXME: change vector to a super column -->
<ColumnFamily CompareWith="UTF8Type" Name="vector"/>
<ColumnFamily CompareWith="UTF8Type" Name="ref"/>
<!--ColumnFamily CompareWith="UTF8Type" Name="Standard1" FlushPeriodInMinutes="60"/>
<ColumnFamily CompareWith="TimeUUIDType" Name="StandardByUUID1"/>
<ColumnFamily ColumnType="Super" CompareWith="UTF8Type" CompareSubcolumnsWith="UTF8Type" Name="Super1"/-->
</Keyspace>
</Keyspaces>
<!--
~ Partitioner: any IPartitioner may be used, including your own as long
~ as it is on the classpath. Out of the box, Cassandra provides
~ org.apache.cassandra.dht.RandomPartitioner,
~ org.apache.cassandra.dht.OrderPreservingPartitioner, and
~ org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
~ (CollatingOPP colates according to EN,US rules, not naive byte
~ ordering. Use this as an example if you need locale-aware collation.)
~ Range queries require using an order-preserving partitioner.
~
~ Achtung! Changing this parameter requires wiping your data
~ directories, since the partitioner can modify the sstable on-disk
~ format.
-->
<Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
<!--
~ If you are using an order-preserving partitioner and you know your key
~ distribution, you can specify the token for this node to use. (Keys
~ are sent to the node with the "closest" token, so distributing your
~ tokens equally along the key distribution space will spread keys
~ evenly across your cluster.) This setting is only checked the first
~ time a node is started.
~ This can also be useful with RandomPartitioner to force equal spacing
~ of tokens around the hash space, especially for clusters with a small
~ number of nodes.
-->
<InitialToken></InitialToken>
<!--
~ EndPointSnitch: Setting this to the class that implements
~ IEndPointSnitch which will see if two endpoints are in the same data
~ center or on the same rack. Out of the box, Cassandra provides
~ org.apache.cassandra.locator.EndPointSnitch
-->
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
<!--
~ Strategy: Setting this to the class that implements
~ IReplicaPlacementStrategy will change the way the node picker works.
~ Out of the box, Cassandra provides
~ org.apache.cassandra.locator.RackUnawareStrategy and
~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
~ a different datacenter, and the others on different racks in the same
~ one.)
-->
<ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
<!-- Number of replicas of the data -->
<ReplicationFactor>1</ReplicationFactor>
<!--
~ Directories: Specify where Cassandra should store different data on
~ disk. Keep the data disks and the CommitLog disks separate for best
~ performance
-->
<CommitLogDirectory>target/cassandra/commitlog</CommitLogDirectory>
<DataFileDirectories>
<DataFileDirectory>target/cassandra/data</DataFileDirectory>
</DataFileDirectories>
<CalloutLocation>target/cassandra/callouts</CalloutLocation>
<StagingFileDirectory>target/cassandra/staging</StagingFileDirectory>
<!--
~ Addresses of hosts that are deemed contact points. Cassandra nodes
~ use this list of hosts to find each other and learn the topology of
~ the ring. You must change this if you are running multiple nodes!
-->
<Seeds>
<Seed>127.0.0.1</Seed>
</Seeds>
<!-- Miscellaneous -->
<!-- Time to wait for a reply from other nodes before failing the command -->
<RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
<!-- Size to allow commitlog to grow to before creating a new segment -->
<CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
<!-- Local hosts and ports -->
<!--
~ Address to bind to and tell other nodes to connect to. You _must_
~ change this if you want multiple nodes to be able to communicate!
~
~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
~ will always do the Right Thing *if* the node is properly configured
~ (hostname, name resolution, etc), and the Right Thing is to use the
~ address associated with the hostname (it might not be).
-->
<ListenAddress>localhost</ListenAddress>
<!-- TCP port, for commands and data -->
<StoragePort>7000</StoragePort>
<!-- UDP port, for membership communications (gossip) -->
<ControlPort>7001</ControlPort>
<!--
~ The address to bind the Thrift RPC service to. Unlike ListenAddress
~ above, you *can* specify 0.0.0.0 here if you want Thrift to listen on
~ all interfaces.
~
~ Leaving this blank has the same effect it does for ListenAddress,
~ (i.e. it will be based on the configured hostname of the node).
-->
<ThriftAddress>localhost</ThriftAddress>
<!-- Thrift RPC port (the port clients connect to). -->
<ThriftPort>9160</ThriftPort>
<!--
~ Whether or not to use a framed transport for Thrift. If this option
~ is set to true then you must also use a framed transport on the
~ client-side, (framed and non-framed transports are not compatible).
-->
<ThriftFramedTransport>false</ThriftFramedTransport>
<!--======================================================================-->
<!-- Memory, Disk, and Performance -->
<!--======================================================================-->
<!--
~ Buffer size to use when performing contiguous column slices. Increase
~ this to the size of the column slices you typically perform.
~ (Name-based queries are performed with a buffer size of
~ ColumnIndexSizeInKB.)
-->
<SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
<!--
~ Buffer size to use when flushing memtables to disk. (Only one
~ memtable is ever flushed at a time.) Increase (decrease) the index
~ buffer size relative to the data buffer if you have few (many)
~ columns per key. Bigger is only better _if_ your memtables get large
~ enough to use the space. (Check in your data directory after your
~ app has been running long enough.) -->
<FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
<FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
<!--
~ Add column indexes to a row after its contents reach this size.
~ Increase if your column values are large, or if you have a very large
~ number of columns. The competing causes are, Cassandra has to
~ deserialize this much of the row to read a single column, so you want
~ it to be small - at least if you do many partial-row reads - but all
~ the index data is read for each access, so you don't want to generate
~ that wastefully either.
-->
<ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
<!--
~ The maximum amount of data to store in memory per ColumnFamily before
~ flushing to disk. Note: There is one memtable per column family, and
~ this threshold is based solely on the amount of data stored, not
~ actual heap memory usage (there is some overhead in indexing the
~ columns).
-->
<MemtableSizeInMB>64</MemtableSizeInMB>
<!--
~ The maximum number of columns in millions to store in memory per
~ ColumnFamily before flushing to disk. This is also a per-memtable
~ setting. Use with MemtableSizeInMB to tune memory usage.
-->
<MemtableObjectCountInMillions>0.1</MemtableObjectCountInMillions>
<!--
~ The maximum time to leave a dirty memtable unflushed.
~ (While any affected columnfamilies have unflushed data from a
~ commit log segment, that segment cannot be deleted.)
~ This needs to be large enough that it won't cause a flush storm
~ of all your memtables flushing at once because none has hit
~ the size or count thresholds yet. For production, a larger
~ value such as 1440 is recommended.
-->
<MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
<!--
~ Unlike most systems, in Cassandra writes are faster than reads, so
~ you can afford more of those in parallel. A good rule of thumb is 2
~ concurrent reads per processor core. Increase ConcurrentWrites to
~ the number of clients writing at once if you enable CommitLogSync +
~ CommitLogSyncDelay. -->
<ConcurrentReads>8</ConcurrentReads>
<ConcurrentWrites>32</ConcurrentWrites>
<!--
~ CommitLogSync may be either "periodic" or "batch." When in batch
~ mode, Cassandra won't ack writes until the commit log has been
~ fsynced to disk. It will wait up to CommitLogSyncBatchWindowInMS
~ milliseconds for other writes, before performing the sync.
~ This is less necessary in Cassandra than in traditional databases
~ since replication reduces the odds of losing data from a failure
~ after writing the log entry but before it actually reaches the disk.
~ So the other option is "timed," where writes may be acked immediately
~ and the CommitLog is simply synced every CommitLogSyncPeriodInMS
~ milliseconds.
-->
<CommitLogSync>periodic</CommitLogSync>
<!--
~ Interval at which to perform syncs of the CommitLog in periodic mode.
~ Usually the default of 10000ms is fine; increase it if your i/o
~ load is such that syncs are taking excessively long times.
-->
<CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
<!--
~ Delay (in milliseconds) during which additional commit log entries
~ may be written before fsync in batch mode. This will increase
~ latency slightly, but can vastly improve throughput where there are
~ many writers. Set to zero to disable (each entry will be synced
~ individually). Reasonable values range from a minimal 0.1 to 10 or
~ even more if throughput matters more than latency.
-->
<!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
<!--
~ Time to wait before garbage-collection deletion markers. Set this to
~ a large enough value that you are confident that the deletion marker
~ will be propagated to all replicas by the time this many seconds has
~ elapsed, even in the face of hardware failures. The default value is
~ ten days.
-->
<GCGraceSeconds>864000</GCGraceSeconds>
<!--
~ The threshold size in megabytes the binary memtable must grow to,
~ before it's submitted for flushing to disk.
-->
<BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>
</Storage>

View file

@ -6,6 +6,10 @@ import junit.framework.TestCase
import org.junit.Test import org.junit.Test
import org.junit.Assert._ import org.junit.Assert._
import org.apache.cassandra.service.CassandraDaemon
import org.junit.BeforeClass
import org.junit.Before
import org.scalatest.junit.JUnitSuite
case class GetMapState(key: String) case class GetMapState(key: String)
case object GetVectorState case object GetVectorState
@ -70,7 +74,10 @@ class CassandraPersistentActor extends Actor {
} }
} }
class CassandraPersistentActorSpec extends TestCase { class CassandraPersistentActorSpec extends JUnitSuite {
@Before
def startCassandra = EmbeddedCassandraService.start
@Test @Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
@ -144,4 +151,28 @@ class CassandraPersistentActorSpec extends TestCase {
val result: Array[Byte] = (stateful !! GetRefState).get val result: Array[Byte] = (stateful !! GetRefState).get
assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state
} }
} }
import org.apache.cassandra.service.CassandraDaemon
object EmbeddedCassandraService {
System.setProperty("storage-config", "src/test/resources");
val cassandra = new Runnable {
val cassandraDaemon = new CassandraDaemon
cassandraDaemon.init(null)
def run = cassandraDaemon.start
}
// spawn cassandra in a new thread
val t = new Thread(cassandra)
t.setDaemon(true)
t.start
def start: Unit = {}
}

View file

@ -27,8 +27,8 @@ case class Credit(accountNo: String, amount: BigInt)
case object LogSize case object LogSize
class AccountActor extends Transactor { class AccountActor extends Transactor {
private val accountState = RedisStorage.newMap private lazy val accountState = RedisStorage.newMap
private val txnLog = RedisStorage.newVector private lazy val txnLog = RedisStorage.newVector
def receive = { def receive = {
// check balance // check balance

View file

@ -20,7 +20,7 @@ case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: Actor)
case class SZ case class SZ
class QueueActor extends Transactor { class QueueActor extends Transactor {
private val accounts = RedisStorage.newQueue private lazy val accounts = RedisStorage.newQueue
def receive = { def receive = {
// enqueue // enqueue

View file

@ -10,9 +10,8 @@ import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.OneForOneStrategy import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.state.RedisStorage
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.state.{PersistentVector, RedisStorage}
/****************************************************************************** /******************************************************************************
To run the sample: To run the sample:
@ -77,7 +76,7 @@ trait ChatStorage extends Actor
class RedisChatStorage extends ChatStorage { class RedisChatStorage extends ChatStorage {
lifeCycle = Some(LifeCycle(Permanent)) lifeCycle = Some(LifeCycle(Permanent))
private var chatLog = RedisStorage.getVector("akka.chat.log") private var chatLog = atomic { RedisStorage.getVector("akka.chat.log") }
log.info("Redis-based chat storage is starting up...") log.info("Redis-based chat storage is starting up...")

View file

@ -20,7 +20,7 @@ class SimpleService extends Transactor {
case object Tick case object Tick
private val KEY = "COUNTER" private val KEY = "COUNTER"
private var hasStartedTicking = false private var hasStartedTicking = false
private val storage = TransactionalState.newMap[String, Integer] private lazy val storage = TransactionalState.newMap[String, Integer]
@GET @GET
@Produces(Array("text/html")) @Produces(Array("text/html"))
@ -52,7 +52,7 @@ class PersistentSimpleService extends Transactor {
case object Tick case object Tick
private val KEY = "COUNTER" private val KEY = "COUNTER"
private var hasStartedTicking = false private var hasStartedTicking = false
private val storage = CassandraStorage.newMap private lazy val storage = CassandraStorage.newMap
@GET @GET
@Produces(Array("text/html")) @Produces(Array("text/html"))

View file

@ -29,11 +29,12 @@ public class PersistentSimpleService {
private String KEY = "COUNTER"; private String KEY = "COUNTER";
private boolean hasStartedTicking = false; private boolean hasStartedTicking = false;
private PersistentMap<byte[], byte[]> storage = CassandraStorage.newMap(); private PersistentMap<byte[], byte[]> storage;
@GET @GET
@Produces({"application/html"}) @Produces({"application/html"})
public String count() { public String count() {
if (storage == null) storage = CassandraStorage.newMap();
if (!hasStartedTicking) { if (!hasStartedTicking) {
storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array()); storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array());
hasStartedTicking = true; hasStartedTicking = true;

View file

@ -27,11 +27,12 @@ public class SimpleService {
private String KEY = "COUNTER"; private String KEY = "COUNTER";
private boolean hasStartedTicking = false; private boolean hasStartedTicking = false;
private TransactionalMap storage = TransactionalState.newMap(); private TransactionalMap<String, Integer> storage;
@GET @GET
@Produces({"application/json"}) @Produces({"application/json"})
public String count() { public String count() {
if (storage == null) storage = TransactionalState.newMap();
if (!hasStartedTicking) { if (!hasStartedTicking) {
storage.put(KEY, 0); storage.put(KEY, 0);
hasStartedTicking = true; hasStartedTicking = true;

View file

@ -53,7 +53,7 @@ class SimpleService extends Transactor {
case object Tick case object Tick
private val KEY = "COUNTER" private val KEY = "COUNTER"
private var hasStartedTicking = false private var hasStartedTicking = false
private val storage = TransactionalState.newMap[String, Integer] private lazy val storage = TransactionalState.newMap[String, Integer]
@GET @GET
@Produces(Array("text/html")) @Produces(Array("text/html"))
@ -105,7 +105,7 @@ class PersistentSimpleService extends Transactor {
case object Tick case object Tick
private val KEY = "COUNTER" private val KEY = "COUNTER"
private var hasStartedTicking = false private var hasStartedTicking = false
private val storage = CassandraStorage.newMap private lazy val storage = CassandraStorage.newMap
@GET @GET
@Produces(Array("text/html")) @Produces(Array("text/html"))

View file

@ -96,7 +96,7 @@ class SecureTickActor extends Actor with Logging {
case object Tick case object Tick
private val KEY = "COUNTER" private val KEY = "COUNTER"
private var hasStartedTicking = false private var hasStartedTicking = false
private val storage = TransactionalState.newMap[String, Integer] private lazy val storage = TransactionalState.newMap[String, Integer]
/** /**
* allow access for any user to "/secureticker/public" * allow access for any user to "/secureticker/public"