Merge branch 'master' into 281-hseeberger

Conflicts:
	akka-core/src/main/scala/remote/RemoteServer.scala
Heiko Seeberger 2010-06-19 15:54:51 +02:00
commit 516cad81ba
28 changed files with 5015 additions and 2190 deletions

View file

@ -1,46 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.remote.protobuf;
/*
Compile with:
cd ./akka-core/src/main/java
protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out .
*/
message ActorRefProtocol {
required string uuid = 1;
required string actorClassName = 2;
required string sourceHostname = 3;
required uint32 sourcePort = 4;
required uint64 timeout = 5;
}
message RemoteRequestProtocol {
required uint64 id = 1;
required uint32 protocol = 2;
required bytes message = 3;
optional bytes messageManifest = 4;
optional string method = 5;
required string target = 6;
required string uuid = 7;
required uint64 timeout = 8;
optional string supervisorUuid = 9;
required bool isActor = 10;
required bool isOneWay = 11;
required bool isEscaped = 12;
optional ActorRefProtocol sender = 13;
}
message RemoteReplyProtocol {
required uint64 id = 1;
optional uint32 protocol = 2;
optional bytes message = 3;
optional bytes messageManifest = 4;
optional string exception = 5;
optional string supervisorUuid = 6;
required bool isActor = 7;
required bool isSuccessful = 8;
}

View file

@ -0,0 +1,129 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
option java_package = "se.scalablesolutions.akka.remote.protocol";
option optimize_for = SPEED;
/******************************************
Compile with:
cd ./akka-core/src/main/protocol
protoc RemoteProtocol.proto --java_out ../java
*******************************************/
/**
* Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node.
*/
message RemoteActorRefProtocol {
required string uuid = 1;
required string actorClassname = 2;
required AddressProtocol homeAddress = 3;
optional uint64 timeout = 4;
}
/**
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedActorRefProtocol {
required string uuid = 1;
required string id = 2;
required string actorClassname = 3;
required bytes actorInstance = 4;
required string serializerClassname = 5;
required AddressProtocol originalAddress = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
optional LifeCycleProtocol lifeCycle = 9;
optional RemoteActorRefProtocol supervisor = 10;
optional bytes hotswapStack = 11;
}
/**
* Defines a remote message request.
*/
message RemoteRequestProtocol {
required uint64 id = 1;
required SerializationSchemeType serializationScheme = 2;
required bytes message = 3;
optional bytes messageManifest = 4;
optional string method = 5;
required string target = 6;
required string uuid = 7;
required uint64 timeout = 8;
optional string supervisorUuid = 9;
required bool isActor = 10;
required bool isOneWay = 11;
required bool isEscaped = 12;
optional RemoteActorRefProtocol sender = 13;
}
/**
* Defines a remote message reply.
*/
message RemoteReplyProtocol {
required uint64 id = 1;
optional SerializationSchemeType serializationScheme = 2;
optional bytes message = 3;
optional bytes messageManifest = 4;
optional ExceptionProtocol exception = 5;
optional string supervisorUuid = 6;
required bool isActor = 7;
required bool isSuccessful = 8;
}
/**
* Defines the serialization scheme used to serialize the message and/or Actor instance.
*/
enum SerializationSchemeType {
JAVA = 1;
SBINARY = 2;
SCALA_JSON = 3;
JAVA_JSON = 4;
PROTOBUF = 5;
}
/**
* Defines the type of the life-cycle of a supervised Actor.
*/
enum LifeCycleType {
PERMANENT = 1;
TEMPORARY = 2;
}
/*
enum DispatcherType {
GLOBAL_EVENT_EXECUTOR_BASED = 1;
GLOBAL_REACTOR_SINGLE_THREAD_BASED = 2;
GLOBAL_REACTOR_THREAD_POOL_BASED = 3;
EVENT_EXECUTOR_BASED = 4;
THREAD_BASED = 5;
}
*/
/**
* Defines the life-cycle of a supervised Actor.
*/
message LifeCycleProtocol {
required LifeCycleType lifeCycle = 1;
optional string preRestart = 2;
optional string postRestart = 3;
}
/**
* Defines a remote address.
*/
message AddressProtocol {
required string hostname = 1;
required uint32 port = 2;
}
/**
* Defines an exception.
*/
message ExceptionProtocol {
required string classname = 1;
required string message = 2;
}
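To make the intended use of these messages concrete, here is a minimal sketch of building and round-tripping a RemoteActorRefProtocol with the protoc-generated builders; the hostname, port, uuid and actor class name are placeholder values and not taken from this commit.

import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._

// Placeholder home address of the actor's original node.
val address = AddressProtocol.newBuilder
  .setHostname("localhost")
  .setPort(9999)
  .build
// Placeholder reference; "com.example.MyActor" is a hypothetical actor class.
val remoteRef = RemoteActorRefProtocol.newBuilder
  .setUuid("some-uuid")
  .setActorClassname("com.example.MyActor")
  .setHomeAddress(address)
  .setTimeout(5000L)
  .build
// Round-trip through the wire format, as the remote layer does.
val restored = RemoteActorRefProtocol.parseFrom(remoteRef.toByteArray)
assert(restored.getUuid == "some-uuid")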

View file

@ -5,7 +5,7 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.FaultHandlingStrategy
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture}
import se.scalablesolutions.akka.config.ScalaConfig._

View file

@ -8,24 +8,9 @@ import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.serialization.Serializer
/*
// FIXME add support for ActorWithNestedReceive
trait ActorWithNestedReceive extends Actor {
import Actor.actor
private var nestedReactsProcessors: List[ActorRef] = Nil
private val processNestedReacts: Receive = {
case message if !nestedReactsProcessors.isEmpty =>
val processors = nestedReactsProcessors.reverse
processors.head forward message
nestedReactsProcessors = processors.tail.reverse
}
protected def react: Receive
protected def reactAgain(pf: Receive) = nestedReactsProcessors ::= actor(pf)
protected def receive = processNestedReacts orElse react
}
*/
import com.google.protobuf.Message
/**
* Implements the Transactor abstraction, i.e. a transactional actor.
@ -49,7 +34,69 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
self.makeRemote(hostname, port)
}
// Life-cycle messages for the Actors
/**
* Mix in this trait to create a serializable actor, serializable through
* a custom serialization protocol.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait SerializableActor extends Actor {
val serializer: Serializer
def toBinary: Array[Byte]
}
/**
* Mix in this trait to create a serializable actor, serializable through
* Protobuf.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ProtobufSerializableActor[T <: Message] extends SerializableActor {
val serializer = Serializer.Protobuf
def toBinary: Array[Byte] = toProtobuf.toByteArray
def fromBinary(bytes: Array[Byte]) = fromProtobuf(serializer.fromBinary(bytes, Some(clazz)).asInstanceOf[T])
val clazz: Class[T]
def toProtobuf: T
def fromProtobuf(message: T): Unit
}
/**
* Mix in this trait to create a serializable actor, serializable through
* Java serialization.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaSerializableActor extends SerializableActor {
@transient val serializer = Serializer.Java
def toBinary: Array[Byte] = serializer.toBinary(this)
}
/**
* Mix in this trait to create a serializable actor, serializable through
* a Java JSON parser (Jackson).
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaJSONSerializableActor extends SerializableActor {
val serializer = Serializer.JavaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}
/**
* Mix in this trait to create a serializable actor, serializable through
* a Scala JSON parser (SJSON).
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ScalaJSONSerializableActor extends SerializableActor {
val serializer = Serializer.ScalaJSON
def toBinary: Array[Byte] = serializer.toBinary(this)
}
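A minimal sketch of mixing one of these traits into an actor, assuming the Actor API visible elsewhere in this commit (actorOf, self.reply); the CounterActor name and its message protocol are illustrative only.

import se.scalablesolutions.akka.actor.{Actor, JavaSerializableActor}

// Hypothetical stateful actor made serializable via plain Java serialization.
class CounterActor extends Actor with JavaSerializableActor {
  private var count = 0
  def receive = {
    case "hello" =>
      count += 1
      self.reply("world " + count) // same reply style as the spec at the end of this diff
  }
}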
/**
* Life-cycle messages for the Actors
*/
@serializable sealed trait LifeCycleMessage
case class HotSwap(code: Option[Actor.Receive]) extends LifeCycleMessage
case class Restart(reason: Throwable) extends LifeCycleMessage
@ -299,7 +346,7 @@ trait Actor extends Logging {
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
implicit val optionSelf: Option[ActorRef] = {
@transient implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
@ -308,7 +355,7 @@ trait Actor extends Logging {
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))'" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
else ref
}
@ -319,7 +366,7 @@ trait Actor extends Logging {
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
@transient implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
/**
* The 'self' field holds the ActorRef for this actor.
@ -348,7 +395,7 @@ trait Actor extends Logging {
* self.stop(..)
* </pre>
*/
val self: ActorRef = {
@transient val self: ActorRef = {
val zelf = optionSelf.get
zelf.id = getClass.getName
zelf
@ -448,10 +495,4 @@ trait Actor extends Logging {
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
override def hashCode: Int = self.hashCode
override def equals(that: Any): Boolean = self.equals(that)
override def toString = self.toString
}

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
@ -28,45 +28,119 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
import java.lang.reflect.Field
import com.google.protobuf.ByteString
/**
* The ActorRef object can be used to deserialize ActorRef instances from their binary representation
* or their Protocol Buffers (protobuf) Message representation into ActorRef instances.
*
* <p/>
* Binary -> ActorRef:
* <pre>
* val actorRef = ActorRef.fromBinary(bytes)
* actorRef ! message // send message to remote actor through its reference
* </pre>
*
* <p/>
* Protobuf Message -> ActorRef:
* Binary -> RemoteActorRef:
* <pre>
* val actorRef = ActorRef.fromProtobuf(protobufMessage)
* val actorRef = ActorRef.fromBinaryToRemoteActorRef(bytes)
* actorRef ! message // send message to remote actor through its reference
* </pre>
*
* <p/>
* Binary -> LocalActorRef:
* <pre>
* val actorRef = ActorRef.fromBinaryToLocalActorRef(bytes)
* actorRef ! message // send message to local actor through its reference
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRef {
/**
* Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance.
* Deserializes a byte array (Array[Byte]) into a RemoteActorRef instance.
*/
def fromBinary(bytes: Array[Byte]): ActorRef =
fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance.
* Deserializes a byte array (Array[Byte]) into a RemoteActorRef instance.
*/
private[akka] def fromProtobuf(protocol: ActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into a RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(
protocol.getUuid,
protocol.getActorClassName,
protocol.getSourceHostname,
protocol.getSourcePort,
protocol.getActorClassname,
protocol.getHomeAddress.getHostname,
protocol.getHomeAddress.getPort,
protocol.getTimeout,
loader)
/**
* Deserializes a byte array (Array[Byte]) into a LocalActorRef instance.
*/
def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes a byte array (Array[Byte]) into a LocalActorRef instance.
*/
def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into a LocalActorRef instance.
*/
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
val serializerClass =
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
else Class.forName(protocol.getSerializerClassname)
val serializer = serializerClass.newInstance.asInstanceOf[Serializer]
val lifeCycle =
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
val restartCallbacks =
if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart)
Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart))
else None
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks)
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks)
else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
} else None
val supervisor =
if (protocol.hasSupervisor)
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
val hotswap =
if (protocol.hasHotswapStack) Some(serializer
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
.asInstanceOf[PartialFunction[Any, Unit]])
else None
new LocalActorRef(
protocol.getUuid,
protocol.getId,
protocol.getActorClassname,
protocol.getActorInstance.toByteArray,
protocol.getOriginalAddress.getHostname,
protocol.getOriginalAddress.getPort,
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
lifeCycle,
supervisor,
hotswap,
loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
serializer)
}
}
/**
@ -136,19 +210,25 @@ trait ActorRef extends TransactionManagement {
/**
* User overridable callback/setting.
*
* <p/>
* Set trapExit to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
*
* Trap no exceptions:
* <pre>
* // trap no exceptions
* trapExit = Nil
* </pre>
*
* // trap all exceptions
* Trap all exceptions:
* <pre>
* trapExit = List(classOf[Throwable])
* </pre>
*
* // trap specific exceptions only
* Trap specific exceptions only:
* <pre>
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
@ -158,10 +238,13 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
* <p/>
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
* <p/>
* Can be one of:
* <pre/>
* <pre>
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
*
* </pre>
* Or:
* <pre>
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* </pre>
*/
@ -211,6 +294,11 @@ trait ActorRef extends TransactionManagement {
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
/**
* Returns the uuid for the actor.
*/
def uuid = _uuid
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
@ -239,15 +327,24 @@ trait ActorRef extends TransactionManagement {
def isShutdown: Boolean = _isShutDown
/**
* Returns the uuid for the actor.
*/
def uuid = _uuid
/**
* Tests if the actor is able to handle the message passed in as arguments.
* Is the actor able to handle the message passed in as arguments?
*/
def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message)
/**
* Is the actor serializable?
*/
def isSerializable: Boolean = actor.isInstanceOf[SerializableActor]
/**
* Returns the 'Serializer' instance for the Actor as an Option.
* <p/>
* It returns 'Some(serializer)' if the Actor is serializable and 'None' if not.
*/
def serializer: Option[Serializer] =
if (isSerializable) Some(actor.asInstanceOf[SerializableActor].serializer)
else None
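A small usage sketch of the two new queries, assuming ref is any started ActorRef:

// 'serializer' is Some(...) only when the underlying actor mixes in SerializableActor.
if (ref.isSerializable)
  ref.serializer.foreach(s => println("serializable via " + s.getClass.getName))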
/**
* Only for internal use. UUID is effectively final.
*/
@ -517,7 +614,9 @@ trait ActorRef extends TransactionManagement {
*/
def shutdownLinkedActors: Unit
protected[akka] def toProtobuf: ActorRefProtocol
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
@ -562,13 +661,13 @@ trait ActorRef extends TransactionManagement {
protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
requestBuilder.setSender(sender.toProtobuf)
requestBuilder.setSender(sender.toRemoteActorRefProtocol)
}
}
}
/**
* Local ActorRef that is used when referencing the Actor on its "home" node.
* Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -576,9 +675,51 @@ sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
private var isDeserialized = false
private var loader: Option[ClassLoader] = None
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
// used only for deserialization
private[akka] def this(__uuid: String,
__id: String,
__actorClassName: String,
__actorBytes: Array[Byte],
__hostname: String,
__port: Int,
__isTransactor: Boolean,
__timeout: Long,
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__loader: ClassLoader,
__serializer: Serializer) = {
this(() => {
val actorClass = __loader.loadClass(__actorClassName)
val actorInstance = actorClass.newInstance
if (actorInstance.isInstanceOf[ProtobufSerializableActor[_]]) {
val instance = actorInstance.asInstanceOf[ProtobufSerializableActor[_]]
instance.fromBinary(__actorBytes)
instance
} else __serializer.fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor]
})
loader = Some(__loader)
isDeserialized = true
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
timeout = __timeout
lifeCycle = __lifeCycle
_supervisor = __supervisor
hotswap = __hotswap
actorSelfFields._1.set(actor, this)
actorSelfFields._2.set(actor, Some(this))
actorSelfFields._3.set(actor, Some(this))
ActorRegistry.register(this)
}
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@ -594,12 +735,12 @@ sealed class LocalActorRef private[akka](
// instance eligible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass)
if (runActorInitialization) initializeActorInstance
if (runActorInitialization && !isDeserialized) initializeActorInstance
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
protected[akka] def toProtobuf: ActorRefProtocol = guard.withGuard {
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = guard.withGuard {
val host = homeAddress.getHostName
val port = homeAddress.getPort
@ -610,15 +751,59 @@ sealed class LocalActorRef private[akka](
registeredInRemoteNodeDuringSerialization = true
}
ActorRefProtocol.newBuilder
RemoteActorRefProtocol.newBuilder
.setUuid(uuid)
.setActorClassName(actorClass.getName)
.setSourceHostname(host)
.setSourcePort(port)
.setActorClassname(actorClass.getName)
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
.setTimeout(timeout)
.build
}
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
if (!isSerializable) throw new IllegalStateException(
"Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
}
val builder = LifeCycleProtocol.newBuilder
lifeCycle match {
case Some(LifeCycle(scope, None)) =>
setScope(builder, scope)
Some(builder.build)
case Some(LifeCycle(scope, Some(callbacks))) =>
setScope(builder, scope)
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)
Some(builder.build)
case None => None
}
}
val serializerClassname = serializer
.getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined"))
.getClass.getName
val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(uuid)
.setId(id)
.setActorClassname(actorClass.getName)
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor].toBinary))
.setSerializerClassname(serializerClassname)
.setOriginalAddress(originalAddress)
.setIsTransactor(isTransactor)
.setTimeout(timeout)
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol))
// FIXME: how to serialize the hotswap PartialFunction ??
// hotswap.foreach(builder.setHotswapStack(_))
builder.build
}
/**
* Returns the mailbox.
*/
@ -627,7 +812,10 @@ sealed class LocalActorRef private[akka](
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
def toBinary: Array[Byte] = toProtobuf.toByteArray
def toBinary: Array[Byte] = {
if (isSerializable) toSerializedActorRefProtocol.toByteArray
else toRemoteActorRefProtocol.toByteArray
}
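A hedged sketch of the resulting round trip for a serializable actor, reusing the hypothetical CounterActor sketched earlier; a non-serializable actor would instead have to be decoded with fromBinaryToRemoteActorRef.

val ref = Actor.actorOf[CounterActor].start
val bytes = ref.toBinary // SerializedActorRefProtocol, because the actor mixes in SerializableActor
val restored = ActorRef.fromBinaryToLocalActorRef(bytes)
restored.start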
/**
* Returns the class for the Actor instance that is managed by the ActorRef.
@ -940,7 +1128,7 @@ sealed class LocalActorRef private[akka](
.setIsOneWay(false)
.setIsEscaped(false)
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtobuf))
//senderOption.foreach(sender => requestBuilder.setSender(sender.toRemoteActorRefProtocol))
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
@ -1279,7 +1467,8 @@ private[akka] case class RemoteActorRef private[akka] (
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def toProtobuf: ActorRefProtocol = unsupported
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = unsupported
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported

View file

@ -30,21 +30,22 @@ object Scheduler {
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ActorRef = {
try {
val future = service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
scheduler
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
def unschedule(actorRef: ActorRef) = {
actorRef ! UnSchedule
schedulers.remove(actorRef)
def unschedule(scheduleActor: ActorRef) = {
scheduleActor ! UnSchedule
schedulers.remove(scheduleActor)
}
def shutdown = {
@ -79,5 +80,3 @@ private object SchedulerThreadFactory extends ThreadFactory {
thread
}
}
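With this change schedule returns the internal ScheduleActor reference, and that reference is what unschedule now expects. A minimal usage sketch under that assumption; the worker actor and the "tick" message are illustrative only.

import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.Actor.actorOf

val worker = actorOf[CounterActor].start // hypothetical receiver
val handle = Scheduler.schedule(worker, "tick", 0, 1000, TimeUnit.MILLISECONDS)
// ... later, cancel the periodic send using the handle returned by schedule
Scheduler.unschedule(handle)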

View file

@ -86,8 +86,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
finishedBeforeMailboxEmpty = processMailbox(receiver)
} finally {
lock.unlock
if (finishedBeforeMailboxEmpty)
dispatch(receiver)
if (finishedBeforeMailboxEmpty) dispatch(receiver)
}
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))

View file

@ -119,7 +119,7 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
case m: Message[ADDR_T] => {
val (src, msg) = (m.sender, m.msg)
(serializer in (msg, None)) match {
(serializer fromBinary (msg, None)) match {
case PapersPlease => {
log debug ("Asked for papers by %s", src)
@ -169,7 +169,7 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* that's been set in the akka-conf
*/
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
lazy val m = serializer out msg
lazy val m = serializer toBinary msg
for (r <- recipients) toOneNode(r, m)
}
@ -178,7 +178,7 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* that's been set in the akka-conf
*/
protected def broadcast[T <: AnyRef](msg: T): Unit =
if (!remotes.isEmpty) toAllNodes(serializer out msg)
if (!remotes.isEmpty) toAllNodes(serializer toBinary msg)
/**
* Applies the given PartialFunction to all known RemoteAddresses

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
@ -359,12 +359,11 @@ class RemoteClientHandler(val name: String,
event.getChannel.close
}
private def parseException(reply: RemoteReplyProtocol) = {
private def parseException(reply: RemoteReplyProtocol): Throwable = {
val exception = reply.getException
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
exceptionType
val exceptionClass = Class.forName(exception.getClassname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exceptionMessage).asInstanceOf[Throwable]
.newInstance(exception.getMessage).asInstanceOf[Throwable]
}
}
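The exception is now carried as a structured ExceptionProtocol instead of a '$'-delimited string. A sketch of what the server puts on the wire and how parseException reconstructs it; the IllegalArgumentException is just an example.

val exc = ExceptionProtocol.newBuilder
  .setClassname(classOf[IllegalArgumentException].getName)
  .setMessage("boom")
  .build
// parseException rebuilds the Throwable from classname and message, roughly:
val throwable = Class.forName(exc.getClassname)
  .getConstructor(classOf[String])
  .newInstance(exc.getMessage)
  .asInstanceOf[Throwable]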

View file

@ -4,9 +4,8 @@
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.serialization.Serializable.SBinary
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
import se.scalablesolutions.akka.serialization.{Serializer, Serializable}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import com.google.protobuf.{Message, ByteString}
@ -25,102 +24,102 @@ object RemoteProtocolBuilder {
}
def getMessage(request: RemoteRequestProtocol): Any = {
request.getProtocol match {
case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
case SerializationProtocol.SBINARY =>
request.getSerializationScheme match {
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(request.getMessage.toByteArray, None))
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(request.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass))
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(request.getMessage.toByteArray, Some(messageClass))
}
}
def getMessage(reply: RemoteReplyProtocol): Any = {
reply.getProtocol match {
case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
case SerializationProtocol.SBINARY =>
reply.getSerializationScheme match {
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(reply.getMessage.toByteArray, None))
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(reply.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass))
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(reply.getMessage.toByteArray, Some(messageClass))
}
}
def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setProtocol(SerializationProtocol.JAVA_JSON)
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default: if no serialization scheme is specified explicitly, fall back to Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.out(box(message))))
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
}
def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setProtocol(SerializationProtocol.JAVA_JSON)
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default: if no serialization scheme is specified explicitly, fall back to Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.out(box(message))))
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
}
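As the branches above show, the serialization scheme is derived from the runtime type of the message, with Java serialization as the fallback. A fragmentary sketch; the request builder would still need its other required fields set before build could be called.

val requestBuilder = RemoteRequestProtocol.newBuilder
// A plain String matches none of the special cases, so the JAVA scheme is picked
// and the payload is written with the Java serializer.
RemoteProtocolBuilder.setMessage("just a String", requestBuilder)
// requestBuilder.getSerializationScheme is now SerializationSchemeType.JAVA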

View file

@ -12,7 +12,7 @@ import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.util.Helpers.narrow
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.config.Config.config
import org.jboss.netty.bootstrap.ServerBootstrap
@ -324,9 +324,8 @@ class RemoteServerHandler(
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**
* ChannelOpen overridden to store open channels for a clean shutdown
* of a RemoteServer. If a channel is closed before, it is
* automatically removed from the open channels group.
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
* If a channel is closed before, it is automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
openChannels.add(ctx.getChannel)
@ -365,7 +364,8 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
val sender =
if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
@ -386,7 +386,7 @@ class RemoteServerHandler(
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
@ -404,7 +404,6 @@ class RemoteServerHandler(
val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
//continueTransaction(request)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(
request.getMethod, unescapedArgClasses: _*)
@ -426,7 +425,7 @@ class RemoteServerHandler(
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
@ -436,7 +435,7 @@ class RemoteServerHandler(
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)

View file

@ -15,14 +15,6 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream}
import sjson.json.{Serializer=>SJSONSerializer}
object SerializationProtocol {
val JAVA = 0
val SBINARY = 1
val SCALA_JSON = 2
val JAVA_JSON = 3
val PROTOBUF = 4
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -19,12 +19,10 @@ import sjson.json.{Serializer => SJSONSerializer}
*/
trait Serializer {
var classLoader: Option[ClassLoader] = None
def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))
def deepClone(obj: AnyRef): AnyRef
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
def toBinary(obj: AnyRef): Array[Byte]
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
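The Serializer SPI is renamed from out/in to toBinary/fromBinary. A minimal round-trip sketch with the built-in Java serializer:

val bytes = Serializer.Java.toBinary("hello")
val back = Serializer.Java.fromBinary(bytes, None).asInstanceOf[String] // "hello"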
// For Java API
@ -46,9 +44,8 @@ object Serializer {
object NOOP extends NOOP
class NOOP extends Serializer {
def deepClone(obj: AnyRef): AnyRef = obj
def out(obj: AnyRef): Array[Byte] = obj.asInstanceOf[Array[Byte]]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes
def toBinary(obj: AnyRef): Array[Byte] = Array[Byte]()
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null.asInstanceOf[AnyRef]
}
/**
@ -56,9 +53,7 @@ object Serializer {
*/
object Java extends Java
trait Java extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = {
def toBinary(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(obj)
@ -66,7 +61,7 @@ object Serializer {
bos.toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in =
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
@ -81,15 +76,13 @@ object Serializer {
*/
object Protobuf extends Protobuf
trait Protobuf extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = {
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
obj.asInstanceOf[Message].toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Need a protobuf message class to be able to serialize bytes using protobuf")
// TODO: should we cache this method lookup?
@ -98,9 +91,9 @@ object Serializer {
message.toBuilder().mergeFrom(bytes).build
}
def in(bytes: Array[Byte], clazz: Class[_]): AnyRef = {
def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = {
if (clazz eq null) throw new IllegalArgumentException("Protobuf message can't be null")
in(bytes, Some(clazz))
fromBinary(bytes, Some(clazz))
}
}
@ -111,9 +104,7 @@ object Serializer {
trait JavaJSON extends Serializer {
private val mapper = new ObjectMapper
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = {
def toBinary(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
mapper.writeValue(out, obj)
@ -121,7 +112,7 @@ object Serializer {
bos.toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Can't deserialize JSON to instance if no class is provided")
val in =
@ -132,7 +123,7 @@ object Serializer {
obj
}
def in(json: String, clazz: Class[_]): AnyRef = {
def fromJSON(json: String, clazz: Class[_]): AnyRef = {
if (clazz eq null) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided")
mapper.readValue(json, clazz).asInstanceOf[AnyRef]
}
@ -143,19 +134,17 @@ object Serializer {
*/
object ScalaJSON extends ScalaJSON
trait ScalaJSON extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
def toBinary(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
// FIXME set ClassLoader on SJSONSerializer.SJSON
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = SJSONSerializer.SJSON.in(bytes)
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = SJSONSerializer.SJSON.in(bytes)
import scala.reflect.Manifest
def in[T](json: String)(implicit m: Manifest[T]): AnyRef = {
def fromJSON[T](json: String)(implicit m: Manifest[T]): AnyRef = {
SJSONSerializer.SJSON.in(json)(m)
}
def in[T](bytes: Array[Byte])(implicit m: Manifest[T]): AnyRef = {
def fromBinary[T](bytes: Array[Byte])(implicit m: Manifest[T]): AnyRef = {
SJSONSerializer.SJSON.in(bytes)(m)
}
}
@ -171,11 +160,11 @@ object Serializer {
var classLoader: Option[ClassLoader] = None
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None)
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = fromBinary[T](toBinary[T](obj), None)
def out[T](t : T)(implicit bin : Writes[T]): Array[Byte] = toByteArray[T](t)
def toBinary[T](t : T)(implicit bin : Writes[T]): Array[Byte] = toByteArray[T](t)
def in[T](array : Array[Byte], clazz: Option[Class[T]])(implicit bin : Reads[T]): T = fromByteArray[T](array)
def fromBinary[T](array : Array[Byte], clazz: Option[Class[T]])(implicit bin : Reads[T]): T = fromByteArray[T](array)
}
}

View file

@ -17,7 +17,7 @@ import java.net.UnknownHostException
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Logging {
@transient lazy val log = Logger.get(this.getClass.getName)
@sjson.json.JSONProperty(ignore = true) @transient lazy val log = Logger.get(this.getClass.getName)
}
/**

View file

@ -376,11 +376,301 @@ public final class ProtobufProtocol {
// @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.ProtobufPOJO)
}
public static final class Counter extends
com.google.protobuf.GeneratedMessage {
// Use Counter.newBuilder() to construct.
private Counter() {
initFields();
}
private Counter(boolean noInit) {}
private static final Counter defaultInstance;
public static Counter getDefaultInstance() {
return defaultInstance;
}
public Counter getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_Counter_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_Counter_fieldAccessorTable;
}
// required uint32 count = 1;
public static final int COUNT_FIELD_NUMBER = 1;
private boolean hasCount;
private int count_ = 0;
public boolean hasCount() { return hasCount; }
public int getCount() { return count_; }
private void initFields() {
}
public final boolean isInitialized() {
if (!hasCount) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (hasCount()) {
output.writeUInt32(1, getCount());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasCount()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(1, getCount());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.Counter parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.Counter prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.actor.ProtobufProtocol.Counter result;
// Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.Counter();
return builder;
}
protected se.scalablesolutions.akka.actor.ProtobufProtocol.Counter internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.actor.ProtobufProtocol.Counter();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.getDescriptor();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.Counter getDefaultInstanceForType() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.Counter build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.actor.ProtobufProtocol.Counter buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.Counter buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.Counter) {
return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.Counter)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.Counter other) {
if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.getDefaultInstance()) return this;
if (other.hasCount()) {
setCount(other.getCount());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 8: {
setCount(input.readUInt32());
break;
}
}
}
}
// required uint32 count = 1;
public boolean hasCount() {
return result.hasCount();
}
public int getCount() {
return result.getCount();
}
public Builder setCount(int value) {
result.hasCount = true;
result.count_ = value;
return this;
}
public Builder clearCount() {
result.hasCount = false;
result.count_ = 0;
return this;
}
// @@protoc_insertion_point(builder_scope:se.scalablesolutions.akka.actor.Counter)
}
static {
defaultInstance = new Counter(true);
se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit();
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:se.scalablesolutions.akka.actor.Counter)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_se_scalablesolutions_akka_actor_Counter_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_se_scalablesolutions_akka_actor_Counter_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -392,7 +682,8 @@ public final class ProtobufProtocol {
java.lang.String[] descriptorData = {
"\n\026ProtobufProtocol.proto\022\037se.scalablesol" +
"utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" +
"\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010"
"\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010\"\030\n\007C" +
"ounter\022\r\n\005count\030\001 \002(\r"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -407,6 +698,14 @@ public final class ProtobufProtocol {
new java.lang.String[] { "Id", "Name", "Status", },
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.class,
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.Builder.class);
internal_static_se_scalablesolutions_akka_actor_Counter_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_se_scalablesolutions_akka_actor_Counter_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_actor_Counter_descriptor,
new java.lang.String[] { "Count", },
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.class,
se.scalablesolutions.akka.actor.ProtobufProtocol.Counter.Builder.class);
return null;
}
};

View file

@ -6,8 +6,8 @@ package se.scalablesolutions.akka.actor;
/*
Compile with:
cd ./akka-core/src/test/java
protoc ProtobufProtocol.proto --java_out .
cd ./akka-core/src/test/protocol
protoc ProtobufProtocol.proto --java_out ../java
*/
message ProtobufPOJO {
@ -15,3 +15,7 @@ message ProtobufPOJO {
required string name = 2;
required bool status = 3;
}
message Counter {
required uint32 count = 1;
}
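
The new Counter message is used through the standard protobuf-java generated API. A minimal sketch of a round trip, assuming the file has been compiled with the protoc invocation shown above (the object name CounterRoundTrip is illustrative, not part of this change):

import se.scalablesolutions.akka.actor.ProtobufProtocol

object CounterRoundTrip {
  def main(args: Array[String]): Unit = {
    // build, serialize and re-parse a Counter via the generated builder
    val counter = ProtobufProtocol.Counter.newBuilder.setCount(42).build
    val bytes   = counter.toByteArray
    val parsed  = ProtobufProtocol.Counter.parseFrom(bytes)
    assert(parsed.getCount == 42)
  }
}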

View file

@ -0,0 +1,115 @@
package se.scalablesolutions.akka.actor
import Actor._
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import com.google.protobuf.Message
@RunWith(classOf[JUnitRunner])
class SerializableActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
describe("SerializableActor") {
it("should be able to serialize and deserialize a JavaSerializableActor") {
val actor1 = actorOf[JavaSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
val bytes = actor1.toBinary
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 2")
}
it("should be able to serialize and deserialize a ProtobufSerializableActor") {
val actor1 = actorOf[ProtobufSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
(actor1 !! "hello").getOrElse("_") should equal("world 2")
val bytes = actor1.toBinary
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 3")
}
/*
it("should be able to serialize and deserialize a JavaJSONSerializableActor") {
val actor1 = actorOf[JavaJSONSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
(actor1 !! "hello").getOrElse("_") should equal("world 2")
val bytes = actor1.toBinary
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 3")
}
it("should be able to serialize and deserialize a ScalaJSONSerializableActor") {
val actor1 = actorOf[ScalaJSONSerializableTestActor].start
val serializer = actor1.serializer.getOrElse(fail("Serializer not defined"))
(actor1 !! "hello").getOrElse("_") should equal("world 1")
val bytes = actor1.toBinary
val actor2 = ActorRef.fromBinaryToLocalActorRef(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 2")
}
*/
}
}
@serializable class JavaSerializableTestActor extends JavaSerializableActor {
private var count = 0
def receive = {
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}
class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufProtocol.Counter] {
val clazz = classOf[ProtobufProtocol.Counter]
private var count = 0
def toProtobuf = ProtobufProtocol.Counter.newBuilder.setCount(count).build
def fromProtobuf(message: ProtobufProtocol.Counter) = count = message.getCount
def receive = {
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}
class JavaJSONSerializableTestActor extends JavaJSONSerializableActor {
private var count = 0
def receive = {
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}
@scala.reflect.BeanInfo class ScalaJSONSerializableTestActor extends ScalaJSONSerializableActor {
private var count = 0
def receive = {
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}

View file

@ -22,20 +22,20 @@ class SerializerSpec extends JUnitSuite {
@Test
def shouldSerializeString = {
val f = Foo("debasish")
val json = Serializer.ScalaJSON.out(f)
val json = Serializer.ScalaJSON.toBinary(f)
assert(new String(json) == """{"foo":"debasish"}""")
val fo = Serializer.ScalaJSON.in[Foo](new String(json)).asInstanceOf[Foo]
val fo = Serializer.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo]
assert(fo == f)
}
@Test
def shouldSerializeTuple2 = {
val message = MyMessage("id", ("hello", 34))
val json = Serializer.ScalaJSON.out(message)
val json = Serializer.ScalaJSON.toBinary(message)
assert(new String(json) == """{"id":"id","value":{"hello":34}}""")
val f = Serializer.ScalaJSON.in[MyMessage](new String(json)).asInstanceOf[MyMessage]
val f = Serializer.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage]
assert(f == message)
val g = Serializer.ScalaJSON.in[MyMessage](json).asInstanceOf[MyMessage]
val g = Serializer.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage]
assert(f == message)
}
}

View file

@ -104,7 +104,7 @@ class StmSpec extends
}
describe("Transactor") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multipse times in a row") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
import GlobalTransactionVectorTestActor._
try {
val actor = actorOf[NestedTransactorLevelOneActor].start

View file

@ -10,17 +10,24 @@ import javax.ws.rs.ext.{MessageBodyWriter, Provider}
import javax.ws.rs.Produces
/**
* writes Lists of JSON serializable objects
* Writes Lists of JSON serializable objects.
*/
@Provider
@Produces(Array("application/json"))
class ListWriter extends MessageBodyWriter[List[_]] {
def isWriteable(aClass: Class[_], aType: java.lang.reflect.Type, annotations: Array[java.lang.annotation.Annotation], mediaType: MediaType) = {
def isWriteable(aClass: Class[_],
aType: java.lang.reflect.Type,
annotations: Array[java.lang.annotation.Annotation],
mediaType: MediaType) =
classOf[List[_]].isAssignableFrom(aClass) || aClass == ::.getClass
}
def getSize(list: List[_], aClass: Class[_], aType: java.lang.reflect.Type, annotations: Array[java.lang.annotation.Annotation], mediaType: MediaType) = -1L
def getSize(list: List[_],
aClass: Class[_],
aType: java.lang.reflect.Type,
annotations: Array[java.lang.annotation.Annotation],
mediaType: MediaType) =
-1L
def writeTo(list: List[_],
aClass: Class[_],
@ -28,11 +35,7 @@ class ListWriter extends MessageBodyWriter[List[_]] {
annotations: Array[java.lang.annotation.Annotation],
mediaType: MediaType,
stringObjectMultivaluedMap: MultivaluedMap[String, Object],
outputStream: OutputStream) : Unit = {
if (list.isEmpty)
outputStream.write(" ".getBytes)
else
outputStream.write(Serializer.ScalaJSON.out(list))
}
outputStream: OutputStream): Unit =
if (list.isEmpty) outputStream.write(" ".getBytes)
else outputStream.write(Serializer.ScalaJSON.toBinary(list))
}
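
For context, a hedged sketch of the kind of resource whose List result the provider above would serialize; the resource path and payload are illustrative and not part of this change:

import javax.ws.rs.{GET, Path, Produces}

@Path("/numbers")
class NumbersResource {
  @GET
  @Produces(Array("application/json"))
  // a Scala List return value matches ListWriter.isWriteable and is written out as JSON
  def numbers: List[Int] = List(1, 2, 3)
}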

View file

@ -150,7 +150,7 @@ class CassandraPersistentActorSpec extends JUnitSuite {
}
/*
import org.apache.cassandra.service.CassandraDaemon
object EmbeddedCassandraService {
object // EmbeddedCassandraService {
System.setProperty("storage-config", "src/test/resources");

View file

@ -67,12 +67,11 @@ class SimpleServiceActor extends Transactor {
def receive = {
case "Tick" => if (hasStartedTicking) {
val bytes = storage.get(KEY.getBytes).get
val counter = Integer.parseInt(new String(bytes, "UTF8"))
storage.put(KEY.getBytes, (counter + 1).toString.getBytes )
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY.getBytes, "0".getBytes)
storage.put(KEY, new Integer(0))
hasStartedTicking = true
self.reply(<success>Tick: 0</success>)
}
@ -124,11 +123,15 @@ class PersistentSimpleServiceActor extends Transactor {
def receive = {
case "Tick" => if (hasStartedTicking) {
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
val counter = Integer.parseInt(new String(bytes, "UTF8"))
storage.put(KEY.getBytes, (counter + 1).toString.getBytes )
// val bytes = storage.get(KEY.getBytes).get
// val counter = ByteBuffer.wrap(bytes).getInt
// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY.getBytes, Array(0.toByte))
storage.put(KEY.getBytes, "0".getBytes)
// storage.put(KEY.getBytes, Array(0.toByte))
hasStartedTicking = true
self.reply(<success>Tick: 0</success>)
}

View file

@ -0,0 +1,6 @@
project.name=Akka Plugin
project.organization=se.scalablesolutions.akka
# mirrors akka version
project.version=0.9.1
sbt.version=0.7.4
build.scala.versions=2.7.7

View file

@ -0,0 +1,3 @@
import sbt._
class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info)

View file

@ -0,0 +1,48 @@
import sbt._
object AkkaRepositories {
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
}
trait AkkaBaseProject extends BasicScalaProject {
import AkkaRepositories._
// Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases)
// is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", AkkaRepo)
val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo)
val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo)
val sjsonModuleConfig = ModuleConfiguration("sjson.json", AkkaRepo)
val voldemortModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo)
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
}
trait AkkaProject extends AkkaBaseProject {
val akkaVersion = "0.9.1"
// convenience method
def akkaModule(module: String) = "se.scalablesolutions.akka" %% ("akka-" + module) % akkaVersion
// akka core dependency by default
val akkaCore = akkaModule("core")
}
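
A minimal sketch of how a downstream sbt 0.7 project definition would use the plugin trait above; the project class and the extra module name are illustrative:

import sbt._

class MyProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject {
  // akka-core is already pulled in by AkkaProject; further modules are added by short name
  val akkaCamel = akkaModule("camel")
}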

View file

@ -337,7 +337,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaSampleSecurityProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {
val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1.1" % "compile"
val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.3" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
}
class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) {