diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index ed021c1e16..61c70ba10b 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -17,11 +17,10 @@ import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.Helpers.ReadWriteLock -import se.scalablesolutions.akka.util.Logging - import org.codehaus.aspectwerkz.proxy.Uuid import org.multiverse.api.ThreadLocalTransaction._ +import se.scalablesolutions.akka.util.{HashCode, Logging} /** * Mix in this trait to give an actor TransactionRequired semantics. @@ -116,7 +115,7 @@ object Actor extends Logging { * */ def actor[A](body: => Unit) = { - def handler[A](body: Unit) = new { + def handler[A](body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start body @@ -215,6 +214,7 @@ trait Actor extends TransactionManagement { implicit protected val self: Actor = this // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait + // Only mutable for RemoteServer in order to maintain identity across nodes private[akka] var _uuid = Uuid.newUuid.toString def uuid = _uuid @@ -418,7 +418,7 @@ trait Actor extends TransactionManagement { _isRunning = true //if (isTransactional) this !! TransactionalInit } - Actor.log.info("[%s] has started", toString) + Actor.log.debug("[%s] has started", toString) this } @@ -471,7 +471,7 @@ trait Actor extends TransactionManagement { * actor.send(message) * */ - def !(message: AnyRef)(implicit sender: AnyRef) = { + def !(message: Any)(implicit sender: AnyRef) = { val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) else None if (_isRunning) postMessageToMailbox(message, from) @@ -482,7 +482,7 @@ trait Actor extends TransactionManagement { /** * Same as the '!' method but does not take an implicit sender as second parameter. */ - def send(message: AnyRef) = { + def send(message: Any) = { if (_isRunning) postMessageToMailbox(message, None) else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") @@ -500,7 +500,7 @@ trait Actor extends TransactionManagement { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: AnyRef, timeout: Long): Option[T] = if (_isRunning) { + def !![T](message: Any, timeout: Long): Option[T] = if (_isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) val isActiveObject = message.isInstanceOf[Invocation] if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None) @@ -527,19 +527,19 @@ trait Actor extends TransactionManagement { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: AnyRef): Option[T] = !![T](message, timeout) + def !![T](message: Any): Option[T] = !![T](message, timeout) /** * This method is evil and has been removed. Use '!!' with a timeout instead. */ - def !?[T](message: AnyRef): T = throw new UnsupportedOperationException( + def !?[T](message: Any): T = throw new UnsupportedOperationException( "'!?' is evil and has been removed. Use '!!' with a timeout instead") /** * Use reply(..) to reply with a message to the original sender of the message currently * being processed. */ - protected[this] def reply(message: AnyRef) = { + protected[this] def reply(message: Any) = { sender match { case Some(senderActor) => senderActor ! message @@ -648,8 +648,11 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def startLink(actor: Actor) = { - actor.start - link(actor) + try { + actor.start + } finally { + link(actor) + } } /** @@ -658,9 +661,12 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def startLinkRemote(actor: Actor, hostname: String, port: Int) = { - actor.makeRemote(hostname, port) - actor.start - link(actor) + try { + actor.makeRemote(hostname, port) + actor.start + } finally { + link(actor) + } } /** @@ -669,11 +675,7 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def spawn[T <: Actor](actorClass: Class[T]): T = { - val actor = actorClass.newInstance.asInstanceOf[T] - if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { - actor.dispatcher = dispatcher - actor._mailbox = _mailbox - } + val actor = spawnButDoNotStart(actorClass) actor.start actor } @@ -684,12 +686,8 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def spawnRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = { - val actor = actorClass.newInstance.asInstanceOf[T] + val actor = spawnButDoNotStart(actorClass) actor.makeRemote(hostname, port) - if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { - actor.dispatcher = dispatcher - actor._mailbox = _mailbox - } actor.start actor } @@ -700,8 +698,12 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def spawnLink[T <: Actor](actorClass: Class[T]): T = { - val actor = spawn[T](actorClass) - link(actor) + val actor = spawnButDoNotStart(actorClass) + try { + actor.start + } finally { + link(actor) + } actor } @@ -711,17 +713,30 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def spawnLinkRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = { - val actor = spawn[T](actorClass) - actor.makeRemote(hostname, port) - link(actor) + val actor = spawnButDoNotStart(actorClass) + try { + actor.makeRemote(hostname, port) + actor.start + } finally { + link(actor) + } actor } - // ================================ - // ==== IMPLEMENTATION DETAILS ==== - // ================================ + // ========================================= + // ==== INTERNAL IMPLEMENTATION DETAILS ==== + // ========================================= - private def postMessageToMailbox(message: AnyRef, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime + private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = { + val actor = actorClass.newInstance.asInstanceOf[T] + if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { + actor.dispatcher = dispatcher + actor._mailbox = _mailbox + } + actor + } + + private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -741,7 +756,7 @@ trait Actor extends TransactionManagement { } } - private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): + private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder @@ -833,7 +848,7 @@ trait Actor extends TransactionManagement { } else proceed } catch { case e => - Actor.log.error(e, "Could not invoke actor [%s]", this) + Actor.log.error(e, "Exception when invoking actor [%s] with message [%s]", this, message) if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) clearTransaction // need to clear currentTransaction before call to supervisor // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client @@ -850,7 +865,7 @@ trait Actor extends TransactionManagement { private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) private val lifeCycles: PartialFunction[Any, Unit] = { - case Init(config) => init(config) + case Init(config) => _config = Some(config); init(config) case HotSwap(code) => _hotswap = code case Restart(reason) => restart(reason) case Exit(dead, reason) => handleTrapExit(dead, reason) @@ -882,8 +897,9 @@ trait Actor extends TransactionManagement { case Permanent => actor.restart(reason) case Temporary => - Actor.log.info("Actor [%s] configured as TEMPORARY will not be restarted.", actor.id) + Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id) getLinkedActors.remove(actor) // remove the temporary actor + actor.stop } } } @@ -943,5 +959,17 @@ trait Actor extends TransactionManagement { } else message } else message + override def hashCode(): Int = { + var result = HashCode.SEED + result = HashCode.hash(result, _uuid) + result + } + + override def equals(that: Any): Boolean = { + that != null && + that.isInstanceOf[Actor] && + that.asInstanceOf[Actor]._uuid == _uuid + } + override def toString(): String = "Actor[" + id + ":" + uuid + "]" } diff --git a/akka-actors/src/main/scala/actor/ActorRegistry.scala b/akka-actors/src/main/scala/actor/ActorRegistry.scala index 327a7e6395..33ead73448 100755 --- a/akka-actors/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actors/src/main/scala/actor/ActorRegistry.scala @@ -13,7 +13,7 @@ import scala.collection.mutable.HashMap * * @author Jonas Bonér */ -object ActorRegistry { +object ActorRegistry extends Logging { private val actorsByClassName = new HashMap[String, List[Actor]] private val actorsById = new HashMap[String, List[Actor]] @@ -48,4 +48,11 @@ object ActorRegistry { actorsByClassName - actor.getClass.getName actorsById - actor.getClass.getName } + + // TODO: document ActorRegistry.shutdownAll + def shutdownAll = { + log.info("Shutting down all actors in the system...") + actorsById.foreach(entry => entry._2.map(_.stop)) + log.info("All actors have been shut down") + } } diff --git a/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala index da5f6cce1d..297c1f7087 100644 --- a/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala +++ b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala @@ -126,14 +126,13 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B val iterator = invocations.iterator while (iterator.hasNext) { val invocation = iterator.next + if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (concurrentMode) { val invoker = messageHandlers.get(invocation.receiver) - if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") - result.put(invocation, invoker) + result.put(invocation, invoker) } else if (!busyInvokers.contains(invocation.receiver)) { val invoker = messageHandlers.get(invocation.receiver) - if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") result.put(invocation, invoker) busyInvokers.add(invocation.receiver) diff --git a/akka-actors/src/main/scala/dispatch/Future.scala b/akka-actors/src/main/scala/dispatch/Future.scala index 1f9a1a4726..5b9e90c604 100644 --- a/akka-actors/src/main/scala/dispatch/Future.scala +++ b/akka-actors/src/main/scala/dispatch/Future.scala @@ -18,12 +18,12 @@ sealed trait FutureResult { def isCompleted: Boolean def isExpired: Boolean def timeoutInNanos: Long - def result: Option[AnyRef] + def result: Option[Any] def exception: Option[Tuple2[AnyRef, Throwable]] } trait CompletableFutureResult extends FutureResult { - def completeWithResult(result: AnyRef) + def completeWithResult(result: Any) def completeWithException(toBlame: AnyRef, exception: Throwable) } @@ -36,7 +36,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes private val _lock = new ReentrantLock private val _signal = _lock.newCondition private var _completed: Boolean = _ - private var _result: Option[AnyRef] = None + private var _result: Option[Any] = None private var _exception: Option[Tuple2[AnyRef, Throwable]] = None def await = try { @@ -79,7 +79,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes _lock.unlock } - def result: Option[AnyRef] = try { + def result: Option[Any] = try { _lock.lock _result } finally { @@ -93,7 +93,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes _lock.unlock } - def completeWithResult(result: AnyRef) = try { + def completeWithResult(result: Any) = try { _lock.lock if (!_completed) { _completed = true diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala index 36a1dfb989..befa25e807 100644 --- a/akka-actors/src/main/scala/dispatch/Reactor.scala +++ b/akka-actors/src/main/scala/dispatch/Reactor.scala @@ -38,7 +38,7 @@ trait MessageDemultiplexer { } class MessageInvocation(val receiver: Actor, - val message: AnyRef, + val message: Any, val future: Option[CompletableFutureResult], val sender: Option[Actor], val tx: Option[Transaction]) { @@ -67,10 +67,12 @@ class MessageInvocation(val receiver: Actor, } override def toString(): String = synchronized { - "MessageInvocation[message = " + message + - ", receiver = " + receiver + - ", sender = " + sender + - ", future = " + future + - ", tx = " + tx + "]" + "MessageInvocation[" + + "\n\tmessage = " + message + + "\n\treceiver = " + receiver + + "\n\tsender = " + sender + + "\n\tfuture = " + future + + "\n\ttx = " + tx + + "\n]" } } diff --git a/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala b/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala index 1c846a42cd..18e2e62d9b 100644 --- a/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala +++ b/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala @@ -4,14 +4,14 @@ package se.scalablesolutions.akka.nio -import akka.serialization.Serializable.SBinary +import se.scalablesolutions.akka.serialization.Serializable.SBinary +import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol} +import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} + import com.google.protobuf.{Message, ByteString} -import serialization.{Serializer, Serializable, SerializationProtocol} -import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} - object RemoteProtocolBuilder { - def getMessage(request: RemoteRequest): AnyRef = { + def getMessage(request: RemoteRequest): Any = { request.getProtocol match { case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] @@ -26,13 +26,13 @@ object RemoteProtocolBuilder { val messageClass = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] Serializer.Protobuf.in(request.getMessage.toByteArray, Some(messageClass)) case SerializationProtocol.JAVA => - Serializer.Java.in(request.getMessage.toByteArray, None) + unbox(Serializer.Java.in(request.getMessage.toByteArray, None)) case SerializationProtocol.AVRO => throw new UnsupportedOperationException("Avro protocol is not yet supported") } } - def getMessage(reply: RemoteReply): AnyRef = { + def getMessage(reply: RemoteReply): Any = { reply.getProtocol match { case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] @@ -47,15 +47,15 @@ object RemoteProtocolBuilder { val messageClass = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] Serializer.Protobuf.in(reply.getMessage.toByteArray, Some(messageClass)) case SerializationProtocol.JAVA => - Serializer.Java.in(reply.getMessage.toByteArray, None) + unbox(Serializer.Java.in(reply.getMessage.toByteArray, None)) case SerializationProtocol.AVRO => throw new UnsupportedOperationException("Avro protocol is not yet supported") } } - def setMessage(message: AnyRef, builder: RemoteRequest.Builder) = { + def setMessage(message: Any, builder: RemoteRequest.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { - val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]] + val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) @@ -68,22 +68,22 @@ object RemoteProtocolBuilder { val serializable = message.asInstanceOf[Serializable.ScalaJSON] builder.setProtocol(SerializationProtocol.SCALA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else if (message.isInstanceOf[Serializable.JavaJSON]) { val serializable = message.asInstanceOf[Serializable.JavaJSON] builder.setProtocol(SerializationProtocol.JAVA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else { // default, e.g. if no protocol used explicitly then use Java serialization builder.setProtocol(SerializationProtocol.JAVA) - builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message))) + builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message)))) } } - def setMessage(message: AnyRef, builder: RemoteReply.Builder) = { + def setMessage(message: Any, builder: RemoteReply.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { - val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]] + val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) @@ -96,16 +96,41 @@ object RemoteProtocolBuilder { val serializable = message.asInstanceOf[Serializable.ScalaJSON] builder.setProtocol(SerializationProtocol.SCALA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else if (message.isInstanceOf[Serializable.JavaJSON]) { val serializable = message.asInstanceOf[Serializable.JavaJSON] builder.setProtocol(SerializationProtocol.JAVA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else { // default, e.g. if no protocol used explicitly then use Java serialization builder.setProtocol(SerializationProtocol.JAVA) - builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message))) + builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message)))) } } + + private def box(value: Any): AnyRef = value match { + case value: Boolean => new java.lang.Boolean(value) + case value: Char => new java.lang.Character(value) + case value: Short => new java.lang.Short(value) + case value: Int => new java.lang.Integer(value) + case value: Long => new java.lang.Long(value) + case value: Float => new java.lang.Float(value) + case value: Double => new java.lang.Double(value) + case value: Byte => new java.lang.Byte(value) + case value => value.asInstanceOf[AnyRef] + } + + private def unbox(value: AnyRef): Any = value match { + case value: java.lang.Boolean => value.booleanValue + case value: java.lang.Character => value.charValue + case value: java.lang.Short => value.shortValue + case value: java.lang.Integer => value.intValue + case value: java.lang.Long => value.longValue + case value: java.lang.Float => value.floatValue + case value: java.lang.Double => value.doubleValue + case value: java.lang.Byte => value.byteValue + case value => value + } + } diff --git a/akka-actors/src/main/scala/serialization/Serializer.scala b/akka-actors/src/main/scala/serialization/Serializer.scala index 4e596a4bcb..643855a141 100644 --- a/akka-actors/src/main/scala/serialization/Serializer.scala +++ b/akka-actors/src/main/scala/serialization/Serializer.scala @@ -4,11 +4,12 @@ package se.scalablesolutions.akka.serialization -import com.google.protobuf.Message import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream} -import reflect.{BeanProperty, Manifest} -import sbinary.DefaultProtocol + +import com.google.protobuf.Message + import org.codehaus.jackson.map.ObjectMapper + import sjson.json.{Serializer =>SJSONSerializer} /** diff --git a/akka-actors/src/main/scala/stm/DataFlowVariable.scala b/akka-actors/src/main/scala/stm/DataFlowVariable.scala index 44a40f50af..2a2bcf8e0e 100644 --- a/akka-actors/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-actors/src/main/scala/stm/DataFlowVariable.scala @@ -4,61 +4,59 @@ package se.scalablesolutions.akka.state -import scala.actors.Actor -import scala.actors.OutputChannel -import scala.actors.Future -import scala.actors.Actor._ - import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} +import se.scalablesolutions.akka.actor.Actor + /** * Implements Oz-style dataflow (single assignment) variables. - * + * * @author Jonas Bonér */ -object DataFlow { - def thread(body: => Unit) = { +object DataFlow { + case object Start + case object Exit + + def thread(body: => Unit) = { val thread = new IsolatedEventBasedThread(body).start - thread ! 'start + thread send Start thread } - def thread[MessageType, ReturnType](body: MessageType => ReturnType) = - new ReactiveEventBasedThread(body).start - private class IsolatedEventBasedThread(body: => Unit) extends Actor { - def act = loop { - react { - case 'start => body - case 'exit => exit() - } + def receive = { + case Start => body + case Exit => exit } } - private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor { - def act = loop { - react { - case 'exit => exit() - case message => sender ! body(message.asInstanceOf[MessageType]) - } + def thread[A <: AnyRef, R <: AnyRef](body: A => R) = + new ReactiveEventBasedThread(body).start + + private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) + extends Actor { + def receive = { + case Exit => exit + case message => reply(body(message.asInstanceOf[A])) } } /** * @author Jonas Bonér */ - sealed class DataFlowVariable[T] { - - private sealed abstract class DataFlowVariableMessage - private case class Set[T](value: T) extends DataFlowVariableMessage - private case object Get extends DataFlowVariableMessage - - private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[Actor] + sealed class DataFlowVariable[T <: Any] { + val TIME_OUT = 10000 - private class In[T](dataFlow: DataFlowVariable[T]) extends Actor { - def act = loop { react { + private sealed abstract class DataFlowVariableMessage + private case class Set[T <: Any](value: T) extends DataFlowVariableMessage + private case object Get extends DataFlowVariableMessage + + private val value = new AtomicReference[Option[T]](None) + private val blockedReaders = new ConcurrentLinkedQueue[Actor] + + private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { + def receive = { case Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { val iterator = dataFlow.blockedReaders.iterator @@ -66,73 +64,75 @@ object DataFlow { dataFlow.blockedReaders.clear } else throw new DataFlowVariableException( "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case 'exit => exit() - }} + case Exit => exit + } } - - private class Out[T](dataFlow: DataFlowVariable[T]) extends Actor { - var reader: Option[OutputChannel[Any]] = None - def act = loop { react { + + private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { + var reader: Option[Actor] = None + def receive = { case Get => val ref = dataFlow.value.get - if (ref.isDefined) reply(ref.get) else reader = Some(sender) + if (ref.isDefined) reply(ref.get) + else reader = Some(sender.getOrElse(throw new IllegalStateException("No reader to DataFlowVariable is in scope"))) case Set(v) => if (reader.isDefined) reader.get ! v - case 'exit => exit() - }} + case Exit => exit + } } - + private[this] val in = { val in = new In(this); in.start; in } - def <<(ref: DataFlowVariable[T]) = in ! Set(ref()) + def <<(ref: DataFlowVariable[T]) = in send Set(ref()) - def <<(value: T) = in ! Set(value) - - def apply(): T = { + def <<(value: T) = in send Set(value) + + def apply(): T = { val ref = value.get if (ref.isDefined) ref.get else { val out = { val out = new Out(this); out.start; out } blockedReaders.offer(out) - val future: Future[T] = out !! (Get, {case t: T => t}) - val result = future() - out ! 'exit - result + val result = out !! (Get, TIME_OUT) + out send Exit + result.getOrElse(throw new DataFlowVariableException( + "Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")) } } - - def shutdown = in ! 'exit + + def shutdown = in send Exit } /** * @author Jonas Bonér */ - class DataFlowStream[T] extends Seq[T] { + class DataFlowStream[T <: Any] extends Seq[T] { private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) - - def <<<(value: T) = { + + def <<<(value: T) = { val ref = new DataFlowVariable[T] ref << value queue.offer(ref) - } - + } + def apply(): T = { val ref = queue.take ref() } - + def take: DataFlowVariable[T] = queue.take //==== For Seq ==== - + def length: Int = queue.size def apply(i: Int): T = { if (i == 0) apply() - else throw new UnsupportedOperationException("Access by index other than '0' is not supported by DataFlowSream") - } - + else throw new UnsupportedOperationException( + "Access by index other than '0' is not supported by DataFlowStream") + } + override def elements: Iterator[T] = new Iterator[T] { private val iter = queue.iterator def hasNext: Boolean = iter.hasNext @@ -141,7 +141,7 @@ object DataFlow { override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] } - + /** * @author Jonas Bonér */ @@ -158,8 +158,8 @@ object Test1 extends Application { // ======================================= // This example is rom Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language) - /* - thread + /* + thread Z = X+Y % will wait until both X and Y are bound to a value. {Browse Z} % shows the value of Z. end @@ -183,7 +183,7 @@ object Test2 extends Application { /* fun {Ints N Max} if N == Max then nil - else + else {Delay 1000} N|{Ints N+1 Max} end @@ -224,11 +224,11 @@ object Test2 extends Application { object Test3 extends Application { // Using DataFlowStream and foldLeft to calculate sum - + /* fun {Ints N Max} if N == Max then nil - else + else {Delay 1000} N|{Ints N+1 Max} end @@ -248,20 +248,20 @@ object Test3 extends Application { import DataFlow._ - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { println("Generating int: " + n) stream <<< n ints(n + 1, max, stream) } - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { println("Calculating: " + s) out <<< s sum(in() + s, in, out) } def printSum(stream: DataFlowStream[Int]): Unit = { - println("Result: " + stream()) + println("Result: " + stream()) printSum(stream) } @@ -269,22 +269,22 @@ object Test3 extends Application { val consumer = new DataFlowStream[Int] thread { ints(0, 1000, producer) } - thread { + thread { Thread.sleep(1000) - println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _)) + println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _)) } } // ======================================= -object Test4 extends Application { +object Test4 extends Application { // Using DataFlowStream and recursive function to calculate sum - + /* fun {Ints N Max} if N == Max then nil - else + else {Delay 1000} N|{Ints N+1 Max} end @@ -304,20 +304,20 @@ object Test4 extends Application { import DataFlow._ - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { println("Generating int: " + n) stream <<< n ints(n + 1, max, stream) } - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { println("Calculating: " + s) out <<< s sum(in() + s, in, out) } def printSum(stream: DataFlowStream[Int]): Unit = { - println("Result: " + stream()) + println("Result: " + stream()) printSum(stream) } @@ -332,6 +332,7 @@ object Test4 extends Application { // ======================================= object Test5 extends Application { + import Actor.Sender.Self import DataFlow._ // create four 'Int' data flow variables @@ -339,20 +340,20 @@ object Test5 extends Application { val main = thread { println("Thread 'main'") - + x << 1 println("'x' set to: " + x()) - + println("Waiting for 'y' to be set...") - - if (x() > y()) { + + if (x() > y()) { z << x println("'z' set to 'x': " + z()) - } else { + } else { z << y println("'z' set to 'y': " + z()) } - + // main completed, shut down the data flow variables x.shutdown y.shutdown @@ -365,18 +366,20 @@ object Test5 extends Application { Thread.sleep(5000) y << 2 println("'y' set to: " + y()) - } + } val setV = thread { println("Thread 'setV'") v << y - println("'v' set to 'y': " + v()) + println("'v' set to 'y': " + v()) } - // shut down the threads - main ! 'exit - setY ! 'exit - setV ! 'exit + // shut down the threads + main ! Exit + setY ! Exit + setV ! Exit //System.gc } + + diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index 09222366bb..606fa33fb4 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger import se.scalablesolutions.akka.state.Committable import se.scalablesolutions.akka.util.Logging -import org.multiverse.api.{Stm, Transaction => MultiverseTransaction} +import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.templates.OrElseTemplate diff --git a/akka-actors/src/test/scala/MemoryTest.scala b/akka-actors/src/test/scala/MemoryTest.scala index 083b964bc7..bfdb5b8d37 100644 --- a/akka-actors/src/test/scala/MemoryTest.scala +++ b/akka-actors/src/test/scala/MemoryTest.scala @@ -1,10 +1,7 @@ package se.scalablesolutions.akka.actor -import junit.framework.TestCase - import org.scalatest.junit.JUnitSuite import org.junit.Test -import scala.collection.mutable.HashSet class MemoryFootprintTest extends JUnitSuite { class Mem extends Actor { @@ -13,20 +10,24 @@ class MemoryFootprintTest extends JUnitSuite { } } + val NR_OF_ACTORS = 100000 + val MAX_MEMORY_FOOTPRINT_PER_ACTOR = 600 + @Test - def shouldCreateManyActors = { - /* println("============== MEMORY TEST ==============") - val actors = new HashSet[Actor] - println("Total memory: " + Runtime.getRuntime.totalMemory) - (1 until 1000000).foreach {i => - val mem = new Mem - actors += mem - if ((i % 100000) == 0) { - println("Nr actors: " + i) - println("Total memory: " + (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory)) - } - } - */ - assert(true) + def actorsShouldHaveLessMemoryFootprintThan630Bytes = { + println("============== MEMORY FOOTPRINT TEST ==============") + // warm up + (1 until 10000).foreach(i => new Mem) + + // Actors are put in AspectRegistry when created so they won't be GCd here + + val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory + (1 until NR_OF_ACTORS).foreach(i => new Mem) + + val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory + val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS + + println("Memory footprint per actor is : " + memPerActor) + assert(memPerActor < MAX_MEMORY_FOOTPRINT_PER_ACTOR) // memory per actor should be less than 630 bytes } } diff --git a/akka-fun-test-java/pom.xml b/akka-fun-test-java/pom.xml index f6e6a727b7..8f8a6b41fc 100644 --- a/akka-fun-test-java/pom.xml +++ b/akka-fun-test-java/pom.xml @@ -23,7 +23,7 @@ com.sun.grizzly grizzly-servlet-webserver - 1.8.6.3 + ${grizzly.version} test diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java index 8ec14fcb91..059b81c1e8 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java @@ -8,7 +8,7 @@ public class PersistentClasher { @inittransactionalstate public void init() { - state = PersistentState.newMap(new CassandraStorageConfig()); + state = CassandraStorage.newMap(); } public String getState(String key) { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 02a091c1f6..3cac0ae062 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -12,18 +12,19 @@ public class PersistentStateful { @inittransactionalstate public void init() { - mapState = PersistentState.newMap(new CassandraStorageConfig()); - vectorState = PersistentState.newVector(new CassandraStorageConfig()); - refState = PersistentState.newRef(new CassandraStorageConfig()); + mapState = CassandraStorage.newMap(); + vectorState = CassandraStorage.newVector(); + refState = CassandraStorage.newRef(); } public String getMapState(String key) { - return (String) mapState.get(key).get(); + byte[] bytes = (byte[]) mapState.get(key.getBytes()).get(); + return new String(bytes, 0, bytes.length); } - public String getVectorState(int index) { - return (String) vectorState.get(index); + byte[] bytes = (byte[]) vectorState.get(index); + return new String(bytes, 0, bytes.length); } public int getVectorLength() { @@ -32,62 +33,51 @@ public class PersistentStateful { public String getRefState() { if (refState.isDefined()) { - return (String) refState.get().get(); + byte[] bytes = (byte[]) refState.get().get(); + return new String(bytes, 0, bytes.length); } else throw new IllegalStateException("No such element"); } - public void setMapState(String key, String msg) { - mapState.put(key, msg); + mapState.put(key.getBytes(), msg.getBytes()); } - public void setVectorState(String msg) { - vectorState.add(msg); + vectorState.add(msg.getBytes()); } - public void setRefState(String msg) { - refState.swap(msg); + refState.swap(msg.getBytes()); } - public void success(String key, String msg) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); } - public String failure(String key, String msg, PersistentFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); failer.fail(); return msg; } public String success(String key, String msg, PersistentStatefulNested nested) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); nested.success(key, msg); return msg; } - public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); nested.failure(key, msg, failer); return msg; } - - - - public void thisMethodHangs(String key, String msg, PersistentFailer failer) { - setMapState(key, msg); - } } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java index 3251184789..50e9b7ae1d 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java @@ -12,18 +12,20 @@ public class PersistentStatefulNested { @inittransactionalstate public void init() { - mapState = PersistentState.newMap(new CassandraStorageConfig()); - vectorState = PersistentState.newVector(new CassandraStorageConfig()); - refState = PersistentState.newRef(new CassandraStorageConfig()); + mapState = CassandraStorage.newMap(); + vectorState = CassandraStorage.newVector(); + refState = CassandraStorage.newRef(); } public String getMapState(String key) { - return (String) mapState.get(key).get(); + byte[] bytes = (byte[]) mapState.get(key.getBytes()).get(); + return new String(bytes, 0, bytes.length); } public String getVectorState(int index) { - return (String) vectorState.get(index); + byte[] bytes = (byte[]) vectorState.get(index); + return new String(bytes, 0, bytes.length); } public int getVectorLength() { @@ -32,45 +34,36 @@ public class PersistentStatefulNested { public String getRefState() { if (refState.isDefined()) { - return (String) refState.get().get(); + byte[] bytes = (byte[]) refState.get().get(); + return new String(bytes, 0, bytes.length); } else throw new IllegalStateException("No such element"); } - public void setMapState(String key, String msg) { - mapState.put(key, msg); + mapState.put(key.getBytes(), msg.getBytes()); } - public void setVectorState(String msg) { - vectorState.add(msg); + vectorState.add(msg.getBytes()); } - public void setRefState(String msg) { - refState.swap(msg); + refState.swap(msg.getBytes()); } - public String success(String key, String msg) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); return msg; } - public String failure(String key, String msg, PersistentFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); failer.fail(); return msg; } - - - public void thisMethodHangs(String key, String msg, PersistentFailer failer) { - setMapState(key, msg); - } } diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index 3d0f0f9313..f021c59e67 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -165,7 +165,7 @@ com.sun.grizzly grizzly-comet-webserver - 1.8.6.3 + ${grizzly.version} com.sun.jersey diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 70fc709f0f..79f256874c 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -14,6 +14,7 @@ import java.net.URLClassLoader import se.scalablesolutions.akka.nio.RemoteNode import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.actor.ActorRegistry /** * The Akka Kernel. @@ -32,10 +33,14 @@ object Kernel extends Logging { // FIXME add API to shut server down gracefully @volatile private var hasBooted = false - private var jerseySelectorThread: SelectorThread = _ + private var jerseySelectorThread: Option[SelectorThread] = None private val startTime = System.currentTimeMillis private var applicationLoader: Option[ClassLoader] = None + private lazy val remoteServerThread = new Thread(new Runnable() { + def run = RemoteNode.start(applicationLoader) + }, "Akka Remote Service") + def main(args: Array[String]) = boot /** @@ -61,20 +66,33 @@ object Kernel extends Logging { hasBooted = true } } - - def startRemoteService = { - // FIXME manage remote serve thread for graceful shutdown - val remoteServerThread = new Thread(new Runnable() { - def run = RemoteNode.start(applicationLoader) - }, "Akka Remote Service") - remoteServerThread.start + + // TODO document Kernel.shutdown + def shutdown = synchronized { + if (hasBooted) { + log.info("Shutting down Akka...") + ActorRegistry.shutdownAll + if (jerseySelectorThread.isDefined) { + log.info("Shutting down REST service (Jersey)") + jerseySelectorThread.get.stopEndpoint + } + if (remoteServerThread.isAlive) { + log.info("Shutting down remote service") + RemoteNode.shutdown + remoteServerThread.join(1000) + } + log.info("Akka succesfully shut down") + } } + def startRemoteService = remoteServerThread.start + def startREST = { val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() val scheme = uri.getScheme - if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException("The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") + if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( + "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") val adapter = new ServletAdapter adapter.setHandleStaticResources(true) @@ -83,19 +101,19 @@ object Kernel extends Logging { //Using autodetection for now //adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") - log.info("REST service root path: [" + adapter.getRootFolder + "] and context path [" + adapter.getContextPath + "] ") + log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) val ah = new com.sun.grizzly.arp.DefaultAsyncHandler ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter) - jerseySelectorThread = new SelectorThread - jerseySelectorThread.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName) - jerseySelectorThread.setPort(REST_PORT) - jerseySelectorThread.setAdapter(adapter) - jerseySelectorThread.setEnableAsyncExecution(true) - jerseySelectorThread.setAsyncHandler(ah) - jerseySelectorThread.listen + jerseySelectorThread = Some(new SelectorThread) + jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName) + jerseySelectorThread.get.setPort(REST_PORT) + jerseySelectorThread.get.setAdapter(adapter) + jerseySelectorThread.get.setEnableAsyncExecution(true) + jerseySelectorThread.get.setAsyncHandler(ah) + jerseySelectorThread.get.listen - log.info("REST service started successfully. Listening to port [" + REST_PORT + "]") + log.info("REST service started successfully. Listening to port [%s]", REST_PORT) } private def runApplicationBootClasses = { @@ -113,7 +131,8 @@ object Kernel extends Logging { new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { getClass.getClassLoader - } else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") + } else throw new IllegalStateException( + "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") for (clazz <- BOOT_CLASSES) { log.info("Loading boot class [%s]", clazz) loader.loadClass(clazz).newInstance @@ -132,7 +151,7 @@ object Kernel extends Logging { (____ /__|_ \__|_ \(____ / \/ \/ \/ \/ """) - log.info(" Running version " + VERSION) + log.info(" Running version %s", VERSION) log.info("==============================") } } \ No newline at end of file diff --git a/akka-persistence/pom.xml b/akka-persistence/pom.xml index 91a0079611..10f90035b9 100644 --- a/akka-persistence/pom.xml +++ b/akka-persistence/pom.xml @@ -30,7 +30,7 @@ com.mongodb mongo - 0.6 + 1.0 @@ -49,7 +49,12 @@ commons-pool 1.5.1 - + + log4j + log4j + 1.2.13 + + org.scalatest diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorageBackend.scala similarity index 61% rename from akka-persistence/src/main/scala/CassandraStorage.scala rename to akka-persistence/src/main/scala/CassandraStorageBackend.scala index 74f06d3a9d..f5719625c7 100644 --- a/akka-persistence/src/main/scala/CassandraStorage.scala +++ b/akka-persistence/src/main/scala/CassandraStorageBackend.scala @@ -6,7 +6,6 @@ package se.scalablesolutions.akka.state import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Helpers._ -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.Config.config import org.apache.cassandra.service._ @@ -14,8 +13,14 @@ import org.apache.cassandra.service._ /** * @author Jonas Bonér */ -object CassandraStorage extends MapStorage - with VectorStorage with RefStorage with Logging { +private[akka] object CassandraStorageBackend extends + MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + Logging { + + type ElementType = Array[Byte] + val KEYSPACE = "akka" val MAP_COLUMN_PARENT = new ColumnParent("map", null) val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null) @@ -31,35 +36,14 @@ object CassandraStorage extends MapStorage case "ONE" => 1 case "QUORUM" => 2 case "ALL" => 3 - case unknown => throw new IllegalArgumentException("Consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]") + case unknown => throw new IllegalArgumentException( + "Cassandra consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]") } } val IS_ASCENDING = true @volatile private[this] var isRunning = false private[this] val protocol: Protocol = Protocol.Binary -/* { - config.getString("akka.storage.cassandra.procotol", "binary") match { - case "binary" => Protocol.Binary - case "json" => Protocol.JSON - case "simple-json" => Protocol.SimpleJSON - case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") - } - } -*/ - - private[this] val serializer: Serializer = { - config.getString("akka.storage.cassandra.storage-format", "manual") match { - case "scala-json" => Serializer.ScalaJSON - case "java-json" => Serializer.JavaJSON - case "protobuf" => Serializer.Protobuf - case "java" => Serializer.Java - case "manual" => Serializer.NOOP - case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage") - case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage") - case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") - } - } private[this] val sessions = new CassandraSessionPool( KEYSPACE, @@ -71,22 +55,22 @@ object CassandraStorage extends MapStorage // For Ref // =============================================================== - def insertRefStorageFor(name: String, element: AnyRef) = { + def insertRefStorageFor(name: String, element: Array[Byte]) = { sessions.withSession { _ ++| (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY), - serializer.out(element), + element, System.currentTimeMillis, CONSISTENCY_LEVEL) } } - def getRefStorageFor(name: String): Option[AnyRef] = { + def getRefStorageFor(name: String): Option[Array[Byte]] = { try { val column: Option[ColumnOrSuperColumn] = sessions.withSession { _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY)) } - if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None)) + if (column.isDefined) Some(column.get.getColumn.value) else None } catch { case e => @@ -99,40 +83,40 @@ object CassandraStorage extends MapStorage // For Vector // =============================================================== - def insertVectorStorageEntryFor(name: String, element: AnyRef) = { + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { sessions.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))), - serializer.out(element), + element, System.currentTimeMillis, CONSISTENCY_LEVEL) } } // FIXME implement insertVectorStorageEntriesFor - def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { - throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet") + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorageBackend is not implemented yet") } - def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = { + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { sessions.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)), - serializer.out(elem), + elem, System.currentTimeMillis, CONSISTENCY_LEVEL) } } - def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { val column: Option[ColumnOrSuperColumn] = sessions.withSession { _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index))) } - if (column.isDefined) serializer.in(column.get.column.value, None) + if (column.isDefined) column.get.column.value else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") } - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val columns: List[ColumnOrSuperColumn] = sessions.withSession { @@ -143,7 +127,7 @@ object CassandraStorage extends MapStorage count, CONSISTENCY_LEVEL) } - columns.map(column => serializer.in(column.getColumn.value, None)) + columns.map(column => column.getColumn.value) } def getVectorStorageSizeFor(name: String): Int = { @@ -156,21 +140,21 @@ object CassandraStorage extends MapStorage // For Map // =============================================================== - def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = { + def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = { sessions.withSession { _ ++| (name, - new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)), - serializer.out(element), + new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key), + element, System.currentTimeMillis, CONSISTENCY_LEVEL) } } - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = { val batch = new scala.collection.mutable.HashMap[String, List[ColumnOrSuperColumn]] for (entry <- entries) { val columnOrSuperColumn = new ColumnOrSuperColumn - columnOrSuperColumn.setColumn(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis)) + columnOrSuperColumn.setColumn(new Column(entry._1, entry._2, System.currentTimeMillis)) batch + (MAP_COLUMN_PARENT.getColumn_family -> List(columnOrSuperColumn)) } sessions.withSession { @@ -178,12 +162,12 @@ object CassandraStorage extends MapStorage } } - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { try { val column: Option[ColumnOrSuperColumn] = sessions.withSession { - _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))) + _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key)) } - if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None)) + if (column.isDefined) Some(column.get.getColumn.value) else None } catch { case e => @@ -192,13 +176,16 @@ object CassandraStorage extends MapStorage } } - def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { + def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = { val size = getMapStorageSizeFor(name) sessions.withSession { session => - val columns = session / (name, MAP_COLUMN_PARENT, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, true, size, CONSISTENCY_LEVEL) + val columns = session / + (name, MAP_COLUMN_PARENT, + EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, + true, size, CONSISTENCY_LEVEL) for { columnOrSuperColumn <- columns - entry = (serializer.in(columnOrSuperColumn.column.name, None), serializer.in(columnOrSuperColumn.column.value, None)) + entry = (columnOrSuperColumn.column.name, columnOrSuperColumn.column.value) } yield entry } } @@ -209,8 +196,8 @@ object CassandraStorage extends MapStorage def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) - def removeMapStorageFor(name: String, key: AnyRef): Unit = { - val keyBytes = if (key == null) null else serializer.out(key) + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + val keyBytes = if (key == null) null else key sessions.withSession { _ -- (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes), @@ -219,13 +206,13 @@ object CassandraStorage extends MapStorage } } - def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): - List[Tuple2[AnyRef, AnyRef]] = { - val startBytes = if (start.isDefined) serializer.out(start.get) else null - val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): + List[Tuple2[Array[Byte], Array[Byte]]] = { + val startBytes = if (start.isDefined) start.get else null + val finishBytes = if (finish.isDefined) finish.get else null val columns: List[ColumnOrSuperColumn] = sessions.withSession { _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL) } - columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None))) + columns.map(column => (column.getColumn.name, column.getColumn.value)) } } diff --git a/akka-persistence/src/main/scala/MongoStorage.scala b/akka-persistence/src/main/scala/MongoStorageBackend.scala similarity index 88% rename from akka-persistence/src/main/scala/MongoStorage.scala rename to akka-persistence/src/main/scala/MongoStorageBackend.scala index 8fd7a0c4b5..ae8d1ecb7a 100644 --- a/akka-persistence/src/main/scala/MongoStorage.scala +++ b/akka-persistence/src/main/scala/MongoStorageBackend.scala @@ -4,8 +4,8 @@ package se.scalablesolutions.akka.state -import util.Logging -import Config.config +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.Config.config import sjson.json.Serializer._ @@ -23,8 +23,12 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} *

* @author Debasish Ghosh */ -object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging { - +private[akka] object MongoStorageBackend extends + MapStorageBackend[AnyRef, AnyRef] with + VectorStorageBackend[AnyRef] with + RefStorageBackend[AnyRef] with + Logging { + // enrich with null safe findOne class RichDBCollection(value: DBCollection) { def findOneNS(o: DBObject): Option[DBObject] = { @@ -34,43 +38,43 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } } } - + implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c) - + val KEY = "key" val VALUE = "value" val COLLECTION = "akka_coll" - + val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1") val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb") val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017) - val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME) - val coll = db.getCollection(COLLECTION) + val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT) + val coll = db.getDB(MONGODB_SERVER_DBNAME).getCollection(COLLECTION) // FIXME: make this pluggable private[this] val serializer = SJSON - + def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) { insertMapStorageEntriesFor(name, List((key, value))) } def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) { import java.util.{Map, HashMap} - + val m: Map[AnyRef, AnyRef] = new HashMap for ((k, v) <- entries) { m.put(k, serializer.out(v)) } - + nullSafeFindOne(name) match { - case None => + case None => coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m)) case Some(dbo) => { // collate the maps val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]] o.putAll(m) - + // remove existing reference removeMapStorageFor(name) // and insert @@ -78,16 +82,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } } } - - def removeMapStorageFor(name: String) = { + + def removeMapStorageFor(name: String): Unit = { val q = new BasicDBObject q.put(KEY, name) coll.remove(q) } - def removeMapStorageFor(name: String, key: AnyRef) = { + def removeMapStorageFor(name: String, key: AnyRef): Unit = { nullSafeFindOne(name) match { - case None => + case None => case Some(dbo) => { val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap if (key.isInstanceOf[List[_]]) { @@ -104,10 +108,10 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } } } - - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = + + def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = getValueForKey(name, key.asInstanceOf[String]) - + def getMapStorageSizeFor(name: String): Int = { nullSafeFindOne(name) match { case None => 0 @@ -115,55 +119,55 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size } } - + def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { - val m = + val m = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] } - val n = + val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]] - val vals = - for(s <- n) + val vals = + for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) vals.asInstanceOf[List[Tuple2[String, AnyRef]]] } - - def getMapStorageRangeFor(name: String, start: Option[AnyRef], - finish: Option[AnyRef], + + def getMapStorageRangeFor(name: String, start: Option[AnyRef], + finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = { - val m = + val m = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] } /** - * count is the max number of results to return. Start with + * count is the max number of results to return. Start with * start or 0 (if start is not defined) and go until * you hit finish or count. */ val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0 - val cnt = + val cnt = if (finish.isDefined) { val f = finish.get.asInstanceOf[Int] if (f >= s) Math.min(count, (f - s)) else count } else count - val n = + val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt) - val vals = - for(s <- n) + val vals = + for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) vals.asInstanceOf[List[Tuple2[String, AnyRef]]] } - + private def getValueForKey(name: String, key: String): Option[AnyRef] = { try { nullSafeFindOne(name) match { @@ -179,16 +183,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L throw new Predef.NoSuchElementException(e.getMessage) } } - + def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { val q = new BasicDBObject q.put(KEY, name) - + val currentList = coll.findOneNS(q) match { - case None => + case None => new JArrayList[AnyRef] - case Some(dbo) => + case Some(dbo) => dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]] } if (!currentList.isEmpty) { @@ -196,26 +200,26 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L // remove before adding coll.remove(q) } - + // add to the current list elements.map(serializer.out(_)).foreach(currentList.add(_)) - + coll.insert( new BasicDBObject() .append(KEY, name) .append(VALUE, currentList) ) } - + def insertVectorStorageEntryFor(name: String, element: AnyRef) = { insertVectorStorageEntriesFor(name, List(element)) } - + def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { try { val o = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => @@ -224,17 +228,17 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L serializer.in[AnyRef]( o.get(index).asInstanceOf[Array[Byte]]) } catch { - case e => + case e => throw new Predef.NoSuchElementException(e.getMessage) } } - - def getVectorStorageRangeFor(name: String, + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { try { val o = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => @@ -242,24 +246,24 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } // pick the subrange and make a Scala list - val l = + val l = List(o.subList(start.get, start.get + count).toArray: _*) - for(e <- l) + for(e <- l) yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]) } catch { - case e => + case e => throw new Predef.NoSuchElementException(e.getMessage) } } - + // FIXME implement updateVectorStorageEntryFor def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException - + def getVectorStorageSizeFor(name: String): Int = { nullSafeFindOne(name) match { case None => 0 - case Some(dbo) => + case Some(dbo) => dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size } } diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala deleted file mode 100644 index f08d2cd925..0000000000 --- a/akka-persistence/src/main/scala/PersistentState.scala +++ /dev/null @@ -1,352 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.state - -import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction -import se.scalablesolutions.akka.collection._ -import se.scalablesolutions.akka.util.Logging - -import org.codehaus.aspectwerkz.proxy.Uuid - -class NoTransactionInScopeException extends RuntimeException - -sealed abstract class PersistentStateConfig -abstract class PersistentStorageConfig extends PersistentStateConfig -case class CassandraStorageConfig() extends PersistentStorageConfig -case class TerracottaStorageConfig() extends PersistentStorageConfig -case class TokyoCabinetStorageConfig() extends PersistentStorageConfig -case class MongoStorageConfig() extends PersistentStorageConfig - -/** - * Example Scala usage. - *

- * New map with generated id. - *

- * val myMap = PersistentState.newMap(CassandraStorageConfig)
- * 
- * - * New map with user-defined id. - *
- * val myMap = PersistentState.newMap(CassandraStorageConfig, id)
- * 
- * - * Get map by user-defined id. - *
- * val myMap = PersistentState.getMap(CassandraStorageConfig, id)
- * 
- * - * Example Java usage: - *
- * TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
- * 
- * - * @author Jonas Bonér - */ -object PersistentState { - def newMap(config: PersistentStorageConfig): PersistentMap = - // FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/] - newMap(config, Uuid.newUuid.toString) - - def newVector(config: PersistentStorageConfig): PersistentVector = - newVector(config, Uuid.newUuid.toString) - - def newRef(config: PersistentStorageConfig): PersistentRef = - newRef(config, Uuid.newUuid.toString) - - def getMap(config: PersistentStorageConfig, id: String): PersistentMap = - newMap(config, id) - - def getVector(config: PersistentStorageConfig, id: String): PersistentVector = - newVector(config, id) - - def getRef(config: PersistentStorageConfig, id: String): PersistentRef = - newRef(config, id) - - def newMap(config: PersistentStorageConfig, id: String): PersistentMap = config match { - case CassandraStorageConfig() => new CassandraPersistentMap(id) - case MongoStorageConfig() => new MongoPersistentMap(id) - case TerracottaStorageConfig() => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException - } - - def newVector(config: PersistentStorageConfig, id: String): PersistentVector = config match { - case CassandraStorageConfig() => new CassandraPersistentVector(id) - case MongoStorageConfig() => new MongoPersistentVector(id) - case TerracottaStorageConfig() => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException - } - - def newRef(config: PersistentStorageConfig, id: String): PersistentRef = config match { - case CassandraStorageConfig() => new CassandraPersistentRef(id) - case MongoStorageConfig() => new MongoPersistentRef(id) - case TerracottaStorageConfig() => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException - } -} - -/** - * Implementation of PersistentMap for every concrete - * storage will have the same workflow. This abstracts the workflow. - * - * Subclasses just need to provide the actual concrete instance for the - * abstract val storage. - * - * @author Jonas Bonér - */ -trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] - with Transactional with Committable with Logging { - protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef] - protected val removedEntries = TransactionalState.newVector[AnyRef] - protected val shouldClearOnCommit = TransactionalRef[Boolean]() - - // to be concretized in subclasses - val storage: MapStorage - - def commit = { - storage.removeMapStorageFor(uuid, removedEntries.toList) - storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) - if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) - storage.removeMapStorageFor(uuid) - newAndUpdatedEntries.clear - removedEntries.clear - } - - def -=(key: AnyRef) = remove(key) - - def +=(key: AnyRef, value: AnyRef) = put(key, value) - - override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = { - register - newAndUpdatedEntries.put(key, value) - } - - override def update(key: AnyRef, value: AnyRef) = { - register - newAndUpdatedEntries.update(key, value) - } - - def remove(key: AnyRef) = { - register - removedEntries.add(key) - } - - def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = - slice(start, None, count) - - def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): - List[Tuple2[AnyRef, AnyRef]] = try { - storage.getMapStorageRangeFor(uuid, start, finish, count) - } catch { case e: Exception => Nil } - - override def clear = { - register - shouldClearOnCommit.swap(true) - } - - override def contains(key: AnyRef): Boolean = try { - newAndUpdatedEntries.contains(key) || - storage.getMapStorageEntryFor(uuid, key).isDefined - } catch { case e: Exception => false } - - override def size: Int = try { - storage.getMapStorageSizeFor(uuid) - } catch { case e: Exception => 0 } - - override def get(key: AnyRef): Option[AnyRef] = { - if (newAndUpdatedEntries.contains(key)) { - newAndUpdatedEntries.get(key) - } - else try { - storage.getMapStorageEntryFor(uuid, key) - } catch { case e: Exception => None } - } - - override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = { - new Iterator[Tuple2[AnyRef, AnyRef]] { - private val originalList: List[Tuple2[AnyRef, AnyRef]] = try { - storage.getMapStorageFor(uuid) - } catch { - case e: Throwable => Nil - } - // FIXME how to deal with updated entries, these should be replaced in the originalList not just added - private var elements = newAndUpdatedEntries.toList ::: originalList.reverse - override def next: Tuple2[AnyRef, AnyRef]= synchronized { - val element = elements.head - elements = elements.tail - element - } - override def hasNext: Boolean = synchronized { !elements.isEmpty } - } - } - - private def register = { - if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException - currentTransaction.get.get.register(uuid, this) - } -} - -/** - * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. - * - * @author Jonas Bonér - */ -class CassandraPersistentMap(id: String) extends PersistentMap { - val uuid = id - val storage = CassandraStorage -} - -/** - * Implements a persistent transactional map based on the MongoDB document storage. - * - * @author Debasish Ghosh - */ -class MongoPersistentMap(id: String) extends PersistentMap { - val uuid = id - val storage = MongoStorage -} - -/** - * Implements a template for a concrete persistent transactional vector based storage. - * - * @author Jonas Bonér - */ -trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with Committable { - protected val newElems = TransactionalState.newVector[AnyRef] - protected val updatedElems = TransactionalState.newMap[Int, AnyRef] - protected val removedElems = TransactionalState.newVector[AnyRef] - protected val shouldClearOnCommit = TransactionalRef[Boolean]() - - val storage: VectorStorage - - def commit = { - // FIXME: should use batch function once the bug is resolved - for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) - for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) - newElems.clear - updatedElems.clear - } - - def +(elem: AnyRef) = add(elem) - - def add(elem: AnyRef) = { - register - newElems + elem - } - - def apply(index: Int): AnyRef = get(index) - - def get(index: Int): AnyRef = { - if (newElems.size > index) newElems(index) - else storage.getVectorStorageEntryFor(uuid, index) - } - - override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count) - - def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = { - val buffer = new scala.collection.mutable.ArrayBuffer[AnyRef] - storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_)) - buffer - } - - /** - * Removes the tail element of this vector. - */ - // FIXME: implement persistent vector pop - def pop: AnyRef = { - register - throw new UnsupportedOperationException("need to implement persistent vector pop") - } - - def update(index: Int, newElem: AnyRef) = { - register - storage.updateVectorStorageEntryFor(uuid, index, newElem) - } - - override def first: AnyRef = get(0) - - override def last: AnyRef = { - if (newElems.length != 0) newElems.last - else { - val len = length - if (len == 0) throw new NoSuchElementException("Vector is empty") - get(len - 1) - } - } - - def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length - - private def register = { - if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException - currentTransaction.get.get.register(uuid, this) - } -} - -/** - * Implements a persistent transactional vector based on the Cassandra - * distributed P2P key-value storage. - * - * @author Jonas Bonér - */ -class CassandraPersistentVector(id: String) extends PersistentVector { - val uuid = id - val storage = CassandraStorage -} - -/** - * Implements a persistent transactional vector based on the MongoDB - * document storage. - * - * @author Debaissh Ghosh - */ -class MongoPersistentVector(id: String) extends PersistentVector { - val uuid = id - val storage = MongoStorage -} - -/** - * Implements a persistent reference with abstract storage. - * - * @author Jonas Bonér - */ -trait PersistentRef extends Transactional with Committable { - protected val ref = new TransactionalRef[AnyRef] - - val storage: RefStorage - - def commit = if (ref.isDefined) { - storage.insertRefStorageFor(uuid, ref.get.get) - ref.swap(null) - } - - def swap(elem: AnyRef) = { - register - ref.swap(elem) - } - - def get: Option[AnyRef] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) - - def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined - - def getOrElse(default: => AnyRef): AnyRef = { - val current = get - if (current.isDefined) current.get - else default - } - - private def register = { - if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException - currentTransaction.get.get.register(uuid, this) - } -} - -class CassandraPersistentRef(id: String) extends PersistentRef { - val uuid = id - val storage = CassandraStorage -} - -class MongoPersistentRef(id: String) extends PersistentRef { - val uuid = id - val storage = MongoStorage -} diff --git a/akka-persistence/src/main/scala/Storage.scala b/akka-persistence/src/main/scala/Storage.scala index 52dc45afa7..5f30cc3319 100644 --- a/akka-persistence/src/main/scala/Storage.scala +++ b/akka-persistence/src/main/scala/Storage.scala @@ -4,33 +4,352 @@ package se.scalablesolutions.akka.state -// abstracts persistence storage -trait Storage +import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction +import se.scalablesolutions.akka.collection._ +import se.scalablesolutions.akka.util.Logging -// for Maps -trait MapStorage extends Storage { - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) - def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) - def removeMapStorageFor(name: String) - def removeMapStorageFor(name: String, key: AnyRef) - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] - def getMapStorageSizeFor(name: String): Int - def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] - def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] +import org.codehaus.aspectwerkz.proxy.Uuid + +class NoTransactionInScopeException extends RuntimeException + +/** + * Example Scala usage. + *

+ * New map with generated id. + *

+ * val myMap = CassandraStorage.newMap
+ * 
+ * + * New map with user-defined id. + *
+ * val myMap = MongoStorage.newMap(id)
+ * 
+ * + * Get map by user-defined id. + *
+ * val myMap = CassandraStorage.getMap(id)
+ * 
+ * + * Example Java usage: + *
+ * PersistentMap myMap = MongoStorage.newMap();
+ * 
+ * Or: + *
+ * MongoPersistentMap myMap = MongoStorage.getMap(id);
+ * 
+ * + * @author Jonas Bonér + */ +trait Storage { + // FIXME: The UUID won't work across the remote machines, use [http://johannburkard.de/software/uuid/] + type ElementType + + def newMap: PersistentMap[ElementType, ElementType] + def newVector: PersistentVector[ElementType] + def newRef: PersistentRef[ElementType] + + def getMap(id: String): PersistentMap[ElementType, ElementType] + def getVector(id: String): PersistentVector[ElementType] + def getRef(id: String): PersistentRef[ElementType] + + def newMap(id: String): PersistentMap[ElementType, ElementType] + def newVector(id: String): PersistentVector[ElementType] + def newRef(id: String): PersistentRef[ElementType] } -// for Vectors -trait VectorStorage extends Storage { - def insertVectorStorageEntryFor(name: String, element: AnyRef) - def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) - def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) - def getVectorStorageEntryFor(name: String, index: Int): AnyRef - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] - def getVectorStorageSizeFor(name: String): Int +object CassandraStorage extends Storage { + type ElementType = Array[Byte] + + def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new CassandraPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new CassandraPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new CassandraPersistentRef(id) } -// for Ref -trait RefStorage extends Storage { - def insertRefStorageFor(name: String, element: AnyRef) - def getRefStorageFor(name: String): Option[AnyRef] +object MongoStorage extends Storage { + type ElementType = AnyRef + + def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new MongoPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new MongoPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new MongoPersistentRef(id) +} + +/** + * Implementation of PersistentMap for every concrete + * storage will have the same workflow. This abstracts the workflow. + * + * Subclasses just need to provide the actual concrete instance for the + * abstract val storage. + * + * @author Jonas Bonér + */ +trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] + with Transactional with Committable with Logging { + protected val newAndUpdatedEntries = TransactionalState.newMap[K, V] + protected val removedEntries = TransactionalState.newVector[K] + protected val shouldClearOnCommit = TransactionalRef[Boolean]() + + // to be concretized in subclasses + val storage: MapStorageBackend[K, V] + + def commit = { + removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key)) + storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) + if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) + storage.removeMapStorageFor(uuid) + newAndUpdatedEntries.clear + removedEntries.clear + } + + def -=(key: K) = remove(key) + + def +=(key: K, value: V) = put(key, value) + + override def put(key: K, value: V): Option[V] = { + register + newAndUpdatedEntries.put(key, value) + } + + override def update(key: K, value: V) = { + register + newAndUpdatedEntries.update(key, value) + } + + def remove(key: K) = { + register + removedEntries.add(key) + } + + def slice(start: Option[K], count: Int): List[Tuple2[K, V]] = + slice(start, None, count) + + def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try { + storage.getMapStorageRangeFor(uuid, start, finish, count) + } catch { case e: Exception => Nil } + + override def clear = { + register + shouldClearOnCommit.swap(true) + } + + override def contains(key: K): Boolean = try { + newAndUpdatedEntries.contains(key) || + storage.getMapStorageEntryFor(uuid, key).isDefined + } catch { case e: Exception => false } + + override def size: Int = try { + storage.getMapStorageSizeFor(uuid) + } catch { case e: Exception => 0 } + + override def get(key: K): Option[V] = { + if (newAndUpdatedEntries.contains(key)) { + newAndUpdatedEntries.get(key) + } + else try { + storage.getMapStorageEntryFor(uuid, key) + } catch { case e: Exception => None } + } + + override def elements: Iterator[Tuple2[K, V]] = { + new Iterator[Tuple2[K, V]] { + private val originalList: List[Tuple2[K, V]] = try { + storage.getMapStorageFor(uuid) + } catch { + case e: Throwable => Nil + } + // FIXME how to deal with updated entries, these should be replaced in the originalList not just added + private var elements = newAndUpdatedEntries.toList ::: originalList.reverse + override def next: Tuple2[K, V]= synchronized { + val element = elements.head + elements = elements.tail + element + } + override def hasNext: Boolean = synchronized { !elements.isEmpty } + } + } + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } +} + +/** + * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. + * + * @author Jonas Bonér + */ +class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] { + val uuid = id + val storage = CassandraStorageBackend +} + +/** + * Implements a persistent transactional map based on the MongoDB document storage. + * + * @author Debasish Ghosh + */ +class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] { + val uuid = id + val storage = MongoStorageBackend +} + +/** + * Implements a template for a concrete persistent transactional vector based storage. + * + * @author Jonas Bonér + */ +trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Committable { + protected val newElems = TransactionalState.newVector[T] + protected val updatedElems = TransactionalState.newMap[Int, T] + protected val removedElems = TransactionalState.newVector[T] + protected val shouldClearOnCommit = TransactionalRef[Boolean]() + + val storage: VectorStorageBackend[T] + + def commit = { + // FIXME: should use batch function once the bug is resolved + for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) + for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) + newElems.clear + updatedElems.clear + } + + def +(elem: T) = add(elem) + + def add(elem: T) = { + register + newElems + elem + } + + def apply(index: Int): T = get(index) + + def get(index: Int): T = { + if (newElems.size > index) newElems(index) + else storage.getVectorStorageEntryFor(uuid, index) + } + + override def slice(start: Int, count: Int): RandomAccessSeq[T] = slice(Some(start), None, count) + + def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[T] = { + val buffer = new scala.collection.mutable.ArrayBuffer[T] + storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_)) + buffer + } + + /** + * Removes the tail element of this vector. + */ + // FIXME: implement persistent vector pop + def pop: T = { + register + throw new UnsupportedOperationException("need to implement persistent vector pop") + } + + def update(index: Int, newElem: T) = { + register + storage.updateVectorStorageEntryFor(uuid, index, newElem) + } + + override def first: T = get(0) + + override def last: T = { + if (newElems.length != 0) newElems.last + else { + val len = length + if (len == 0) throw new NoSuchElementException("Vector is empty") + get(len - 1) + } + } + + def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } +} + +/** + * Implements a persistent transactional vector based on the Cassandra + * distributed P2P key-value storage. + * + * @author Jonas Bonér + */ +class CassandraPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = CassandraStorageBackend +} + +/** + * Implements a persistent transactional vector based on the MongoDB + * document storage. + * + * @author Debaissh Ghosh + */ +class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] { + val uuid = id + val storage = MongoStorageBackend +} + +/** + * Implements a persistent reference with abstract storage. + * + * @author Jonas Bonér + */ +trait PersistentRef[T] extends Transactional with Committable { + protected val ref = new TransactionalRef[T] + + val storage: RefStorageBackend[T] + + def commit = if (ref.isDefined) { + storage.insertRefStorageFor(uuid, ref.get.get) + ref.swap(null.asInstanceOf[T]) + } + + def swap(elem: T) = { + register + ref.swap(elem) + } + + def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) + + def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined + + def getOrElse(default: => T): T = { + val current = get + if (current.isDefined) current.get + else default + } + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } +} + +class CassandraPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = CassandraStorageBackend +} + +class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] { + val uuid = id + val storage = MongoStorageBackend } diff --git a/akka-persistence/src/main/scala/StorageBackend.scala b/akka-persistence/src/main/scala/StorageBackend.scala new file mode 100644 index 0000000000..76a7ccdfdf --- /dev/null +++ b/akka-persistence/src/main/scala/StorageBackend.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.state + +// abstracts persistence storage +trait StorageBackend + +// for Maps +trait MapStorageBackend[K, V] extends StorageBackend { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[K, V]]) + def insertMapStorageEntryFor(name: String, key: K, value: V) + def removeMapStorageFor(name: String) + def removeMapStorageFor(name: String, key: K) + def getMapStorageEntryFor(name: String, key: K): Option[V] + def getMapStorageSizeFor(name: String): Int + def getMapStorageFor(name: String): List[Tuple2[K, V]] + def getMapStorageRangeFor(name: String, start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] +} + +// for Vectors +trait VectorStorageBackend[T] extends StorageBackend { + def insertVectorStorageEntryFor(name: String, element: T) + def insertVectorStorageEntriesFor(name: String, elements: List[T]) + def updateVectorStorageEntryFor(name: String, index: Int, elem: T) + def getVectorStorageEntryFor(name: String, index: Int): T + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T] + def getVectorStorageSizeFor(name: String): Int +} + +// for Ref +trait RefStorageBackend[T] extends StorageBackend { + def insertRefStorageFor(name: String, element: T) + def getRefStorageFor(name: String): Option[T] +} diff --git a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala index 305763eba3..2142311f76 100644 --- a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala @@ -1,13 +1,10 @@ package se.scalablesolutions.akka.state -import akka.actor.Actor -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.Actor import junit.framework.TestCase -import dispatch._ -import org.junit.{Test, Before} +import org.junit.Test import org.junit.Assert._ case class GetMapState(key: String) @@ -31,35 +28,35 @@ class CassandraPersistentActor extends Actor { timeout = 100000 makeTransactionRequired - private lazy val mapState: PersistentMap = PersistentState.newMap(CassandraStorageConfig()) - private lazy val vectorState: PersistentVector = PersistentState.newVector(CassandraStorageConfig()) - private lazy val refState: PersistentRef = PersistentState.newRef(CassandraStorageConfig()) + private lazy val mapState = CassandraStorage.newMap + private lazy val vectorState = CassandraStorage.newVector + private lazy val refState = CassandraStorage.newRef def receive = { case GetMapState(key) => - reply(mapState.get(key).get) + reply(mapState.get(key.getBytes("UTF-8")).get) case GetVectorSize => reply(vectorState.length.asInstanceOf[AnyRef]) case GetRefState => reply(refState.get.get) case SetMapState(key, msg) => - mapState.put(key, msg) + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) reply(msg) case SetVectorState(msg) => - vectorState.add(msg) + vectorState.add(msg.getBytes("UTF-8")) reply(msg) case SetRefState(msg) => - refState.swap(msg) + refState.swap(msg.getBytes("UTF-8")) reply(msg) case Success(key, msg) => - mapState.put(key, msg) - vectorState.add(msg) - refState.swap(msg) + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) reply(msg) case Failure(key, msg, failer) => - mapState.put(key, msg) - vectorState.add(msg) - refState.swap(msg) + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) failer !! "Failure" reply(msg) } @@ -74,14 +71,15 @@ class CassandraPersistentActor extends Actor { } class CassandraPersistentActorSpec extends TestCase { - + @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = new CassandraPersistentActor stateful.start stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get) + val result: Array[Byte] = (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) } @Test @@ -95,7 +93,8 @@ class CassandraPersistentActorSpec extends TestCase { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => {}} - assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state + val result: Array[Byte] = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state } @Test @@ -127,7 +126,8 @@ class CassandraPersistentActorSpec extends TestCase { stateful.start stateful !! SetRefState("init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - assertEquals("new state", (stateful !! GetRefState).get) + val result: Array[Byte] = (stateful !! GetRefState).get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) } @Test @@ -141,6 +141,7 @@ class CassandraPersistentActorSpec extends TestCase { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => {}} - assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state + val result: Array[Byte] = (stateful !! GetRefState).get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state } } diff --git a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala index 10bf943dbb..051cfbfa12 100644 --- a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala @@ -31,10 +31,8 @@ case object LogSize class BankAccountActor extends Actor { makeTransactionRequired - private val accountState = - PersistentState.newMap(MongoStorageConfig()) - private val txnLog = - PersistentState.newVector(MongoStorageConfig()) + private val accountState = MongoStorage.newMap + private val txnLog = MongoStorage.newVector def receive: PartialFunction[Any, Unit] = { // check balance diff --git a/akka-persistence/src/test/scala/MongoStorageSpec.scala b/akka-persistence/src/test/scala/MongoStorageSpec.scala index 4adf61ced2..fae6d7f00d 100644 --- a/akka-persistence/src/test/scala/MongoStorageSpec.scala +++ b/akka-persistence/src/test/scala/MongoStorageSpec.scala @@ -14,7 +14,7 @@ class MongoStorageSpec extends TestCase { val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef] override def setUp = { - MongoStorage.coll.drop + MongoStorageBackend.coll.drop } @Test @@ -22,40 +22,40 @@ class MongoStorageSpec extends TestCase { changeSetV += "debasish" // string changeSetV += List(1, 2, 3) // Scala List changeSetV += List(100, 200) - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 3, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) changeSetV.clear // changeSetV should be reinitialized changeSetV += List(12, 23, 45) changeSetV += "maulindu" - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 5, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) // add more to the same changeSetV changeSetV += "ramanendu" changeSetV += Map(1 -> "dg", 2 -> "mc") // add for a diff transaction - MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList) assertEquals( 4, - MongoStorage.getVectorStorageSizeFor("U-A2")) + MongoStorageBackend.getVectorStorageSizeFor("U-A2")) // previous transaction change set should remain same assertEquals( 5, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) // test single element entry - MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9)) + MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9)) assertEquals( 6, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) } @Test @@ -64,25 +64,25 @@ class MongoStorageSpec extends TestCase { // initially everything 0 assertEquals( 0, - MongoStorage.getVectorStorageSizeFor("U-A2")) + MongoStorageBackend.getVectorStorageSizeFor("U-A2")) assertEquals( 0, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) // get some stuff changeSetV += "debasish" changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14)) - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 2, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) - val JsString(str) = MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString] + val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString] assertEquals("debasish", str) - val l = MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue] + val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue] val num_list = list ! num val num_list(l0) = l assertEquals(List(12, 13, 14), l0) @@ -91,14 +91,14 @@ class MongoStorageSpec extends TestCase { changeSetV += Map(1->1, 2->4, 3->9) changeSetV += BigInt(2310) changeSetV += List(100, 200, 300) - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 5, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) val r = - MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3) + MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3) assertEquals(3, r.size) val lr = r(0).asInstanceOf[JsValue] @@ -109,12 +109,12 @@ class MongoStorageSpec extends TestCase { @Test def testVectorFetchForNonExistentKeys = { try { - MongoStorage.getVectorStorageEntryFor("U-A1", 1) + MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1) fail("should throw an exception") } catch {case e: Predef.NoSuchElementException => {}} try { - MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12) + MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12) fail("should throw an exception") } catch {case e: Predef.NoSuchElementException => {}} } @@ -128,43 +128,43 @@ class MongoStorageSpec extends TestCase { changeSetM += "6" -> java.util.Calendar.getInstance.getTime // insert all into Mongo - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) assertEquals( 6, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // individual insert api - MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka") - MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25)) + MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka") + MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25)) assertEquals( 8, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // add the same changeSet for another transaction - MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList) assertEquals( 6, - MongoStorage.getMapStorageSizeFor("U-M2")) + MongoStorageBackend.getMapStorageSizeFor("U-M2")) // the first transaction should remain the same assertEquals( 8, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) changeSetM.clear } @Test def testMapContents = { fillMap - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) - MongoStorage.getMapStorageEntryFor("U-M1", "2") match { + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match { case Some(x) => { val JsString(str) = x.asInstanceOf[JsValue] assertEquals("peter", str) } case None => fail("should fetch peter") } - MongoStorage.getMapStorageEntryFor("U-M1", "4") match { + MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match { case Some(x) => { val num_list = list ! num val num_list(l0) = x.asInstanceOf[JsValue] @@ -172,7 +172,7 @@ class MongoStorageSpec extends TestCase { } case None => fail("should fetch list") } - MongoStorage.getMapStorageEntryFor("U-M1", "3") match { + MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match { case Some(x) => { val num_list = list ! num val num_list(l0) = x.asInstanceOf[JsValue] @@ -183,7 +183,7 @@ class MongoStorageSpec extends TestCase { // get the entire map val l: List[Tuple2[AnyRef, AnyRef]] = - MongoStorage.getMapStorageFor("U-M1") + MongoStorageBackend.getMapStorageFor("U-M1") assertEquals(4, l.size) assertTrue(l.map(_._1).contains("1")) @@ -196,7 +196,7 @@ class MongoStorageSpec extends TestCase { // trying to fetch for a non-existent transaction will throw try { - MongoStorage.getMapStorageFor("U-M2") + MongoStorageBackend.getMapStorageFor("U-M2") fail("should throw an exception") } catch {case e: Predef.NoSuchElementException => {}} @@ -207,11 +207,11 @@ class MongoStorageSpec extends TestCase { def testMapContentsByRange = { fillMap changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc") - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) // specify start and count val l: List[Tuple2[AnyRef, AnyRef]] = - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), None, 3) assertEquals(3, l.size) @@ -227,27 +227,27 @@ class MongoStorageSpec extends TestCase { // specify start, finish and count where finish - start == count assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size) // specify start, finish and count where finish - start > count assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size) // do not specify start or finish assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", None, None, 3).size) // specify finish and count assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", None, Some(Integer.valueOf(3)), 3).size) // specify start, finish and count where finish < start assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size) changeSetM.clear @@ -258,35 +258,35 @@ class MongoStorageSpec extends TestCase { fillMap changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc") - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) assertEquals(5, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // remove key "3" - MongoStorage.removeMapStorageFor("U-M1", "3") + MongoStorageBackend.removeMapStorageFor("U-M1", "3") assertEquals(4, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) try { - MongoStorage.getMapStorageEntryFor("U-M1", "3") + MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") fail("should throw exception") } catch { case e => {}} // remove key "4" - MongoStorage.removeMapStorageFor("U-M1", "4") + MongoStorageBackend.removeMapStorageFor("U-M1", "4") assertEquals(3, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // remove key "2" - MongoStorage.removeMapStorageFor("U-M1", "2") + MongoStorageBackend.removeMapStorageFor("U-M1", "2") assertEquals(2, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // remove the whole stuff - MongoStorage.removeMapStorageFor("U-M1") + MongoStorageBackend.removeMapStorageFor("U-M1") try { - MongoStorage.getMapStorageFor("U-M1") + MongoStorageBackend.getMapStorageFor("U-M1") fail("should throw exception") } catch { case e: NoSuchElementException => {}} @@ -303,14 +303,14 @@ class MongoStorageSpec extends TestCase { @Test def testRefStorage = { - MongoStorage.getRefStorageFor("U-R1") match { + MongoStorageBackend.getRefStorageFor("U-R1") match { case None => case Some(o) => fail("should be None") } val m = Map("1"->1, "2"->4, "3"->9) - MongoStorage.insertRefStorageFor("U-R1", m) - MongoStorage.getRefStorageFor("U-R1") match { + MongoStorageBackend.insertRefStorageFor("U-R1", m) + MongoStorageBackend.getRefStorageFor("U-R1") match { case None => fail("should not be empty") case Some(r) => { val a = r.asInstanceOf[JsValue] @@ -331,8 +331,8 @@ class MongoStorageSpec extends TestCase { // insert another one // the previous one should be replaced val b = List("100", "jonas") - MongoStorage.insertRefStorageFor("U-R1", b) - MongoStorage.getRefStorageFor("U-R1") match { + MongoStorageBackend.insertRefStorageFor("U-R1", b) + MongoStorageBackend.getRefStorageFor("U-R1") match { case None => fail("should not be empty") case Some(r) => { val a = r.asInstanceOf[JsValue] diff --git a/akka-rest/pom.xml b/akka-rest/pom.xml index 9dc47f6db8..85f42b4aba 100644 --- a/akka-rest/pom.xml +++ b/akka-rest/pom.xml @@ -32,7 +32,7 @@ com.sun.grizzly grizzly-comet-webserver - 1.8.6.3 + ${grizzly.version} com.sun.jersey diff --git a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java index 09676cb26e..7e0e43b6bd 100644 --- a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java @@ -12,9 +12,9 @@ import se.scalablesolutions.akka.annotation.transactionrequired; import se.scalablesolutions.akka.annotation.prerestart; import se.scalablesolutions.akka.annotation.postrestart; import se.scalablesolutions.akka.state.PersistentMap; -import se.scalablesolutions.akka.state.PersistentState; -import se.scalablesolutions.akka.state.PersistentMap; -import se.scalablesolutions.akka.state.CassandraStorageConfig; +import se.scalablesolutions.akka.state.CassandraStorage; + +import java.nio.ByteBuffer; /** * Try service out by invoking (multiple times): @@ -26,21 +26,22 @@ import se.scalablesolutions.akka.state.CassandraStorageConfig; @Path("/persistentjavacount") @transactionrequired public class PersistentSimpleService { - private Object KEY = "COUNTER"; + private String KEY = "COUNTER"; private boolean hasStartedTicking = false; - private PersistentMap storage = PersistentState.newMap(new CassandraStorageConfig()); + private PersistentMap storage = CassandraStorage.newMap(); @GET @Produces({"application/html"}) public String count() { if (!hasStartedTicking) { - storage.put(KEY, 0); + storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array()); hasStartedTicking = true; return "Tick: 0\n"; } else { - int counter = (Integer)storage.get(KEY).get() + 1; - storage.put(KEY, counter); + byte[] bytes = (byte[])storage.get(KEY.getBytes()).get(); + int counter = ByteBuffer.wrap(bytes).getInt(); + storage.put(KEY.getBytes(), ByteBuffer.allocate(4).putInt(counter + 1).array()); return "Tick: " + counter + "\n"; } } diff --git a/akka-samples-java/src/main/java/sample/java/SimpleService.java b/akka-samples-java/src/main/java/sample/java/SimpleService.java index 7702396375..7126621e60 100644 --- a/akka-samples-java/src/main/java/sample/java/SimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/SimpleService.java @@ -13,7 +13,6 @@ import se.scalablesolutions.akka.annotation.prerestart; import se.scalablesolutions.akka.annotation.postrestart; import se.scalablesolutions.akka.state.TransactionalState; import se.scalablesolutions.akka.state.TransactionalMap; -import se.scalablesolutions.akka.state.CassandraStorageConfig; /** * Try service out by invoking (multiple times): diff --git a/akka-samples-lift/src/main/scala/akka/SimpleService.scala b/akka-samples-lift/src/main/scala/akka/SimpleService.scala index b4bbd52157..8bec513bb9 100644 --- a/akka-samples-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples-lift/src/main/scala/akka/SimpleService.scala @@ -1,13 +1,13 @@ package sample.lift -import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig} -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor} +import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} import java.lang.Integer -import javax.ws.rs.core.MultivaluedMap -import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes} +import javax.ws.rs.{GET, Path, Produces} +import java.nio.ByteBuffer + /** * Try service out by invoking (multiple times): @@ -56,7 +56,7 @@ class PersistentSimpleService extends Actor { case object Tick private val KEY = "COUNTER" private var hasStartedTicking = false - private val storage = PersistentState.newMap(CassandraStorageConfig()) + private val storage = CassandraStorage.newMap @GET @Produces(Array("text/html")) @@ -64,13 +64,14 @@ class PersistentSimpleService extends Actor { def receive = { case Tick => if (hasStartedTicking) { - val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue - storage.put(KEY, new Integer(counter + 1)) - reply(

Tick: {counter + 1}

) + val bytes = storage.get(KEY.getBytes).get + val counter = ByteBuffer.wrap(bytes).getInt + storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) + reply(Tick:{counter + 1}) } else { - storage.put(KEY, new Integer(0)) + storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array) hasStartedTicking = true - reply(

Tick: 0

) + reply(Tick: 0) } } } diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 7e4fffb00b..98441378b8 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -4,18 +4,19 @@ package sample.scala -import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig} import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor} +import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import java.lang.Integer +import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} import org.atmosphere.annotation.{Broadcast, Suspend} import org.atmosphere.util.XSSHtmlFilter -import org.atmosphere.cpr.{BroadcastFilter,Broadcaster} +import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} import org.atmosphere.jersey.Broadcastable class Boot { @@ -104,7 +105,7 @@ class PersistentSimpleService extends Actor { case object Tick private val KEY = "COUNTER" private var hasStartedTicking = false - private val storage = PersistentState.newMap(CassandraStorageConfig()) + private val storage = CassandraStorage.newMap @GET @Produces(Array("text/html")) @@ -112,11 +113,12 @@ class PersistentSimpleService extends Actor { def receive = { case Tick => if (hasStartedTicking) { - val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue - storage.put(KEY, new Integer(counter + 1)) + val bytes = storage.get(KEY.getBytes).get + val counter = ByteBuffer.wrap(bytes).getInt + storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) reply(Tick:{counter + 1}) } else { - storage.put(KEY, new Integer(0)) + storage.put(KEY.getBytes, Array(0.toByte)) hasStartedTicking = true reply(Tick: 0) } @@ -155,7 +157,10 @@ class Chat extends Actor with Logging { @Consumes(Array("application/x-www-form-urlencoded")) @POST @Produces(Array("text/html")) - def publishMessage(form: MultivaluedMap[String, String]) = (this !! Chat(form.getFirst("name"), form.getFirst("action"), form.getFirst("message"))).getOrElse("System__error") + def publishMessage(form: MultivaluedMap[String, String]) = + (this !! Chat(form.getFirst("name"), + form.getFirst("action"), + form.getFirst("message"))).getOrElse("System__error") } @@ -171,8 +176,6 @@ class JsonpFilter extends BroadcastFilter[String] with Logging { } ("\n") + name + "\", message: \"" + message + "\" }); \n\n") } } diff --git a/akka-samples-security/src/main/scala/SimpleService.scala b/akka-samples-security/src/main/scala/SimpleService.scala index fc8b18367a..e9468ec75c 100644 --- a/akka-samples-security/src/main/scala/SimpleService.scala +++ b/akka-samples-security/src/main/scala/SimpleService.scala @@ -44,7 +44,7 @@ class DigestAuthenticationService extends DigestAuthenticationActor { //don't forget to configure your standalone Cassandra instance // //makeTransactionRequired - //override def mkNonceMap = PersistentState.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]] + //override def mkNonceMap = Storage.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]] //Use an in-memory nonce-map as default override def mkNonceMap = new scala.collection.mutable.HashMap[String, Long] diff --git a/akka-util/src/main/scala/HashCode.scala b/akka-util/src/main/scala/HashCode.scala index 0ce21f6a92..fd9e46a682 100755 --- a/akka-util/src/main/scala/HashCode.scala +++ b/akka-util/src/main/scala/HashCode.scala @@ -27,19 +27,28 @@ import java.lang.{Float => JFloat, Double => JDouble} object HashCode { val SEED = 23 + def hash(seed: Int, any: Any): Int = any match { + case value: Boolean => hash(seed, value) + case value: Char => hash(seed, value) + case value: Short => hash(seed, value) + case value: Int => hash(seed, value) + case value: Long => hash(seed, value) + case value: Float => hash(seed, value) + case value: Double => hash(seed, value) + case value: Byte => hash(seed, value) + case value: AnyRef => + var result = seed + if (value == null) result = hash(result, 0) + else if (!isArray(value)) result = hash(result, value.hashCode()) + else for (id <- 0 until JArray.getLength(value)) result = hash(result, JArray.get(value, id)) // is an array + result + } def hash(seed: Int, value: Boolean): Int = firstTerm(seed) + (if (value) 1 else 0) def hash(seed: Int, value: Char): Int = firstTerm(seed) + value.asInstanceOf[Int] def hash(seed: Int, value: Int): Int = firstTerm(seed) + value def hash(seed: Int, value: Long): Int = firstTerm(seed) + (value ^ (value >>> 32) ).asInstanceOf[Int] def hash(seed: Int, value: Float): Int = hash(seed, JFloat.floatToIntBits(value)) def hash(seed: Int, value: Double): Int = hash(seed, JDouble.doubleToLongBits(value)) - def hash(seed: Int, anyRef: AnyRef): Int = { - var result = seed - if (anyRef == null) result = hash(result, 0) - else if (!isArray(anyRef)) result = hash(result, anyRef.hashCode()) - else for (id <- 0 until JArray.getLength(anyRef)) result = hash(result, JArray.get(anyRef, id)) // is an array - result - } private def firstTerm(seed: Int): Int = PRIME * seed private def isArray(anyRef: AnyRef): Boolean = anyRef.getClass.isArray diff --git a/embedded-repo/com/mongodb/mongo/1.0/mongo-1.0.jar b/embedded-repo/com/mongodb/mongo/1.0/mongo-1.0.jar new file mode 100644 index 0000000000..81402ed075 Binary files /dev/null and b/embedded-repo/com/mongodb/mongo/1.0/mongo-1.0.jar differ diff --git a/pom.xml b/pom.xml index dae9d62afb..2ff2ad0ac5 100755 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ ${project.build.sourceEncoding} 0.5-SNAPSHOT 1.1.4 + 1.9.18-i