based remoting protocol Protobuf + added SBinary, Scala-JSON, Java-JSON and Protobuf serialization traits and protocols for remote and storage usage

This commit is contained in:
jboner 2009-07-23 20:01:37 +02:00
parent a3fac4338f
commit 4878c9fa9b
27 changed files with 1012 additions and 1174 deletions

View file

@ -1434,15 +1434,26 @@
<root url="jar://$MAVEN_REPOSITORY$/com/google/protobuf/protobuf-java/2.1.0/protobuf-java-2.1.0-sources.jar!/" /> <root url="jar://$MAVEN_REPOSITORY$/com/google/protobuf/protobuf-java/2.1.0/protobuf-java-2.1.0-sources.jar!/" />
</SOURCES> </SOURCES>
</library> </library>
<library name="Maven: sbinary:sbinary:0.3-alpha"> <library name="Maven: sbinary:sbinary:0.3">
<CLASSES> <CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3-alpha/sbinary-0.3-alpha.jar!/" /> <root url="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3/sbinary-0.3.jar!/" />
</CLASSES> </CLASSES>
<JAVADOC> <JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3-alpha/sbinary-0.3-alpha-javadoc.jar!/" /> <root url="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3/sbinary-0.3-javadoc.jar!/" />
</JAVADOC> </JAVADOC>
<SOURCES> <SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3-alpha/sbinary-0.3-alpha-sources.jar!/" /> <root url="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3/sbinary-0.3-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: com.twitter:scala-json:1.0">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/com/twitter/scala-json/1.0/scala-json-1.0.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/com/twitter/scala-json/1.0/scala-json-1.0-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/com/twitter/scala-json/1.0/scala-json-1.0-sources.jar!/" />
</SOURCES> </SOURCES>
</library> </library>
</component> </component>

830
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -37,7 +37,6 @@
<remote> <remote>
service = on service = on
protocol = "serialization" # Options: serialization (coming: json, avro, thrift, protobuf)
hostname = "localhost" hostname = "localhost"
port = 9999 port = 9999
connection-timeout = 1000 # in millis connection-timeout = 1000 # in millis
@ -54,7 +53,7 @@
<cassandra> <cassandra>
service = on service = on
storage-format = "serialization" # Options: serialization (coming: json, avro, thrift, protobuf) storage-format = "serialization" # Options: serialization, scala-json, java-json
blocking = false # inserts and queries should be blocking or not blocking = false # inserts and queries should be blocking or not
<thrift-server> <thrift-server>

View file

@ -41,7 +41,8 @@
<orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3-alpha" level="project" /> <orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.twitter:scala-json:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />

View file

@ -1,6 +1,6 @@
package se.scalablesolutions.akka.api; package se.scalablesolutions.akka.api;
public class InMemFailer { public class InMemFailer implements Serializable {
public int fail() { public int fail() {
throw new RuntimeException("expected"); throw new RuntimeException("expected");
} }

View file

@ -1,6 +1,6 @@
package se.scalablesolutions.akka.api; package se.scalablesolutions.akka.api;
public class PersistentFailer { public class PersistentFailer implements Serializable {
public int fail() { public int fail() {
throw new RuntimeException("expected"); throw new RuntimeException("expected");
} }

View file

@ -57,7 +57,8 @@
<orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3-alpha" level="project" /> <orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.twitter:scala-json:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />

View file

@ -81,6 +81,11 @@
<artifactId>sbinary</artifactId> <artifactId>sbinary</artifactId>
<version>0.3</version> <version>0.3</version>
</dependency> </dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scala-json</artifactId>
<version>1.0</version>
</dependency>
<!-- For Cassandra --> <!-- For Cassandra -->
<dependency> <dependency>

View file

@ -53,13 +53,12 @@ object Kernel extends Logging {
if (RUN_REMOTE_SERVICE) startRemoteService if (RUN_REMOTE_SERVICE) startRemoteService
STORAGE_SYSTEM match { STORAGE_SYSTEM match {
case "cassandra" => startCassandra case "cassandra" => startCassandra
case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported") case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported") case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported") case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported") case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
case "tokyo-tyrant" => throw new UnsupportedOperationException("tokyo-tyrart storage backend is not yet supported") case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]")
case "hazelcast" => throw new UnsupportedOperationException("hazelcast storage backend is not yet supported")
} }
if (RUN_REST_SERVICE) startJersey if (RUN_REST_SERVICE) startJersey
@ -115,7 +114,7 @@ object Kernel extends Logging {
remoteServerThread.start remoteServerThread.start
} }
private[akka] def startCassandra = if (kernel.Kernel.config.getBool("akka.storage.cassandra.service", true)) { private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "") System.setProperty("cassandra", "")
System.setProperty("storage-config", akka.Boot.CONFIG + "/") System.setProperty("storage-config", akka.Boot.CONFIG + "/")
CassandraStorage.start CassandraStorage.start

View file

@ -4,16 +4,15 @@
package se.scalablesolutions.akka.kernel.actor package se.scalablesolutions.akka.kernel.actor
import com.google.protobuf.ByteString
import java.io.File
import java.lang.reflect.{InvocationTargetException, Method} import java.lang.reflect.{InvocationTargetException, Method}
import java.net.InetSocketAddress import java.net.InetSocketAddress
import kernel.config.ScalaConfig._
import kernel.reactor.{MessageDispatcher, FutureResult} import kernel.reactor.{MessageDispatcher, FutureResult}
import kernel.util.{HashCode, Serializer, JavaJSONSerializer}
import kernel.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import kernel.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import kernel.nio.{RemoteClient, RemoteServer, RemoteRequestIdFactory} import kernel.nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
import kernel.config.ScalaConfig._
import kernel.util._
import serialization.Serializer
import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice} import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
@ -255,8 +254,6 @@ sealed class ActorAroundAdvice(val target: Class[_],
val actor: Dispatcher, val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress], val remoteAddress: Option[InetSocketAddress],
val timeout: Long) extends AroundAdvice { val timeout: Long) extends AroundAdvice {
private val serializer: Serializer = JavaJSONSerializer
val id = target.getName val id = target.getName
actor.timeout = timeout actor.timeout = timeout
actor.start actor.start
@ -281,16 +278,14 @@ sealed class ActorAroundAdvice(val target: Class[_],
private def remoteDispatch(joinpoint: JoinPoint): AnyRef = { private def remoteDispatch(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val oneWay = isOneWay(rtti) val oneWay = isOneWay(rtti)
val (message: AnyRef, isEscaped) = escapeArguments(rtti.getParameterValues) val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
val supervisorId = { val supervisorId = {
val id = actor.registerSupervisorAsRemoteActor val id = actor.registerSupervisorAsRemoteActor
if (id.isDefined) id.get if (id.isDefined) id.get
else null else null
} }
val request = RemoteRequest.newBuilder val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId) .setId(RemoteRequestIdFactory.nextId)
.setMessage(ByteString.copyFrom(serializer.out(message)))
.setMessageType(message.getClass.getName)
.setMethod(rtti.getMethod.getName) .setMethod(rtti.getMethod.getName)
.setTarget(target.getName) .setTarget(target.getName)
.setTimeout(timeout) .setTimeout(timeout)
@ -298,8 +293,9 @@ sealed class ActorAroundAdvice(val target: Class[_],
.setIsActor(false) .setIsActor(false)
.setIsOneWay(oneWay) .setIsOneWay(oneWay)
.setIsEscaped(false) .setIsEscaped(false)
.build RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = RemoteClient.clientFor(remoteAddress.get).send(request) val remoteMessage = requestBuilder.build
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage)
if (oneWay) null // for void methods if (oneWay) null // for void methods
else { else {
if (future.isDefined) { if (future.isDefined) {
@ -321,7 +317,7 @@ sealed class ActorAroundAdvice(val target: Class[_],
rtti.getMethod.getReturnType == java.lang.Void.TYPE || rtti.getMethod.getReturnType == java.lang.Void.TYPE ||
rtti.getMethod.isAnnotationPresent(Annotations.oneway) rtti.getMethod.isAnnotationPresent(Annotations.oneway)
private def escapeArguments(args: Array[Object]): Tuple2[Array[Object], Boolean] = { private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false var isEscaped = false
val escapedArgs = for (arg <- args) yield { val escapedArgs = for (arg <- args) yield {
val clazz = arg.getClass val clazz = arg.getClass
@ -451,7 +447,8 @@ private[kernel] class Dispatcher(val callbacks: Option[RestartCallbacks]) extend
if (arg.getClass.getName.contains("$$ProxiedByAWSubclassing$$")) unserializable = true if (arg.getClass.getName.contains("$$ProxiedByAWSubclassing$$")) unserializable = true
} }
if (!unserializable && hasMutableArgument) { if (!unserializable && hasMutableArgument) {
val copyOfArgs = serializer.deepClone(args) // FIXME: can we have another default deep cloner?
val copyOfArgs = Serializer.Java.deepClone(args)
joinpoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs) joinpoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs)
} }
} }

View file

@ -12,12 +12,11 @@ import kernel.reactor._
import kernel.config.ScalaConfig._ import kernel.config.ScalaConfig._
import kernel.stm.TransactionManagement import kernel.stm.TransactionManagement
import kernel.util.Helpers.ReadWriteLock import kernel.util.Helpers.ReadWriteLock
import kernel.util.{Serializer, ScalaJSONSerializer, Logging} import kernel.nio.protobuf.RemoteProtocol.RemoteRequest
import kernel.nio.protobuf._ import kernel.util.Logging
import kernel.nio.{RemoteServer, RemoteClient, RemoteRequestIdFactory} import serialization.{Serializer, Serializable, SerializationProtocol}
import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
import nio.protobuf.RemoteProtocol.RemoteRequest
sealed abstract class LifecycleMessage sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage
@ -32,20 +31,25 @@ object DispatcherType {
case object ThreadBasedDispatcher extends DispatcherType case object ThreadBasedDispatcher extends DispatcherType
} }
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
def invoke(handle: MessageInvocation) = actor.invoke(handle) def invoke(handle: MessageInvocation) = actor.invoke(handle)
} }
def deserialize(array : Array[Byte]) : MediaContent = fromByteArray[MediaContent](array) /**
def serialize(content : MediaContent) : Array[Byte] = toByteArray(content) * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor { object Actor {
val TIMEOUT = kernel.Kernel.config.getInt("akka.actor.timeout", 5000) val TIMEOUT = kernel.Kernel.config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = kernel.Kernel.config.getBool("akka.actor.serialize-messages", false) val SERIALIZE_MESSAGES = kernel.Kernel.config.getBool("akka.actor.serialize-messages", false)
} }
trait Actor extends Logging with TransactionManagement { /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable trait Actor extends Logging with TransactionManagement {
@volatile private[this] var isRunning: Boolean = false @volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock private[this] val transactionalFlagLock = new ReadWriteLock
@ -61,8 +65,6 @@ trait Actor extends Logging with TransactionManagement {
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor] protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
protected[this] val serializer: Serializer = ScalaJSONSerializer
// ==================================== // ====================================
// ==== USER CALLBACKS TO OVERRIDE ==== // ==== USER CALLBACKS TO OVERRIDE ====
// ==================================== // ====================================
@ -403,8 +405,6 @@ trait Actor extends Logging with TransactionManagement {
if (remoteAddress.isDefined) { if (remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId) .setId(RemoteRequestIdFactory.nextId)
.setMessage(ByteString.copyFrom(serializer.out(message)))
.setMessageType(message.getClass.getName)
.setTarget(this.getClass.getName) .setTarget(this.getClass.getName)
.setTimeout(timeout) .setTimeout(timeout)
.setIsActor(true) .setIsActor(true)
@ -412,6 +412,7 @@ trait Actor extends Logging with TransactionManagement {
.setIsEscaped(false) .setIsEscaped(false)
val id = registerSupervisorAsRemoteActor val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build) RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
} else { } else {
val handle = new MessageInvocation(this, message, None, TransactionManagement.threadBoundTx.get) val handle = new MessageInvocation(this, message, None, TransactionManagement.threadBoundTx.get)
@ -424,13 +425,12 @@ trait Actor extends Logging with TransactionManagement {
if (remoteAddress.isDefined) { if (remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId) .setId(RemoteRequestIdFactory.nextId)
.setMessage(ByteString.copyFrom(serializer.out(message)))
.setMessageType(message.getClass.getName)
.setTarget(this.getClass.getName) .setTarget(this.getClass.getName)
.setTimeout(timeout) .setTimeout(timeout)
.setIsActor(true) .setIsActor(true)
.setIsOneWay(false) .setIsOneWay(false)
.setIsEscaped(false) .setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build) val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
@ -455,7 +455,7 @@ trait Actor extends Logging with TransactionManagement {
private def dispatch[T](messageHandle: MessageInvocation) = { private def dispatch[T](messageHandle: MessageInvocation) = {
if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx) if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
val message = messageHandle.message//serializeMessage(messageHandle.message) val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future val future = messageHandle.future
try { try {
senderFuture = future senderFuture = future
@ -474,7 +474,7 @@ trait Actor extends Logging with TransactionManagement {
private def transactionalDispatch[T](messageHandle: MessageInvocation) = { private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx) if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
val message = messageHandle.message//serializeMessage(messageHandle.message) val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future val future = messageHandle.future
try { try {
if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision
@ -607,7 +607,7 @@ trait Actor extends Logging with TransactionManagement {
!message.isInstanceOf[scala.collection.immutable.Set[_]] && !message.isInstanceOf[scala.collection.immutable.Set[_]] &&
!message.isInstanceOf[scala.collection.immutable.Tree[_,_]] && !message.isInstanceOf[scala.collection.immutable.Tree[_,_]] &&
!message.getClass.isAnnotationPresent(Annotations.immutable)) { !message.getClass.isAnnotationPresent(Annotations.immutable)) {
serializer.deepClone(message) Serializer.Java.deepClone(message)
} else message } else message
} else message } else message

View file

@ -7,10 +7,11 @@ package se.scalablesolutions.akka.kernel.nio
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap}
import kernel.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import kernel.actor.{Exit, Actor} import kernel.actor.{Exit, Actor}
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
import kernel.util.{Serializer, ScalaJSONSerializer, JavaJSONSerializer, Logging} import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.util.Logging
import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._ import org.jboss.netty.channel._
@ -18,9 +19,11 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import protobuf.RemoteProtocol
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteClient extends Logging { object RemoteClient extends Logging {
private val clients = new HashMap[String, RemoteClient] private val clients = new HashMap[String, RemoteClient]
def clientFor(address: InetSocketAddress): RemoteClient = synchronized { def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
@ -37,6 +40,9 @@ object RemoteClient extends Logging {
} }
} }
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClient(hostname: String, port: Int) extends Logging { class RemoteClient(hostname: String, port: Int) extends Logging {
@volatile private var isRunning = false @volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
@ -107,7 +113,7 @@ class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFuture
def getPipeline: ChannelPipeline = { def getPipeline: ChannelPipeline = {
val p = Channels.pipeline() val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteReply.getDefaultInstance)); p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance));
p.addLast("frameEncoder", new LengthFieldPrepender(4)); p.addLast("frameEncoder", new LengthFieldPrepender(4));
p.addLast("protobufEncoder", new ProtobufEncoder()); p.addLast("protobufEncoder", new ProtobufEncoder());
p.addLast("handler", new RemoteClientHandler(futures, supervisors)) p.addLast("handler", new RemoteClientHandler(futures, supervisors))
@ -135,12 +141,7 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
log.debug("Received RemoteReply[\n%s]", reply.toString) log.debug("Received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId) val future = futures.get(reply.getId)
if (reply.getIsSuccessful) { if (reply.getIsSuccessful) {
val messageBytes = reply.getMessage.toByteArray val message = RemoteProtocolBuilder.getMessage(reply)
val messageType = reply.getMessageType
val messageClass = Class.forName(messageType)
val message =
if (reply.isActor) ScalaJSONSerializer.in(messageBytes, Some(messageClass))
else JavaJSONSerializer.in(messageBytes, Some(messageClass))
future.completeWithResult(message) future.completeWithResult(message)
} else { } else {
if (reply.hasSupervisorUuid) { if (reply.hasSupervisorUuid) {
@ -148,15 +149,9 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid) val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, new RuntimeException(reply.getException)) else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply))
} }
val exception = reply.getException future.completeWithException(null, parseException(reply))
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
val exceptionInstance = exceptionType
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exceptionMessage).asInstanceOf[Throwable]
future.completeWithException(null, exceptionInstance)
} }
futures.remove(reply.getId) futures.remove(reply.getId)
} else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result) } else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result)
@ -172,4 +167,13 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
event.getCause.printStackTrace event.getCause.printStackTrace
event.getChannel.close event.getChannel.close
} }
private def parseException(reply: RemoteReply) = {
val exception = reply.getException
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
exceptionType
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exceptionMessage).asInstanceOf[Throwable]
}
} }

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.nio
import akka.serialization.Serializable.SBinary
import com.google.protobuf.{Message, ByteString}
import serialization.{Serializer, Serializable, SerializationProtocol}
import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
object RemoteProtocolBuilder {
def getMessage(request: RemoteRequest): AnyRef = {
request.getProtocol match {
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.ScalaJSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.JAVA_JSON =>
val manifest = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.JavaJSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val manifest = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Message]
Serializer.Protobuf.in(request.getMessage.toByteArray, manifest)
case SerializationProtocol.JAVA =>
Serializer.Java.in(request.getMessage.toByteArray, None)
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
}
def getMessage(reply: RemoteReply): AnyRef = {
reply.getProtocol match {
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary]
renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.ScalaJSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.JAVA_JSON =>
val manifest = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
Serializer.JavaJSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val manifest = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Message]
Serializer.Protobuf.in(reply.getMessage.toByteArray, manifest)
case SerializationProtocol.JAVA =>
Serializer.Java.in(reply.getMessage.toByteArray, None)
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
}
def setMessage(message: AnyRef, builder: RemoteRequest.Builder) = {
if (message.isInstanceOf[Serializable.SBinary]) {
val serializable = message.asInstanceOf[Serializable.SBinary]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.Protobuf]) {
val serializable = message.asInstanceOf[Serializable.Protobuf]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(Serializer.Java.out(serializable.getSchema)))
} else if (message.isInstanceOf[Serializable.ScalaJSON[_]]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON[_]]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.body.asInstanceOf[AnyRef].getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON[_]]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON[_]]
builder.setProtocol(SerializationProtocol.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.body.asInstanceOf[AnyRef].getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message)))
}
}
def setMessage(message: AnyRef, builder: RemoteReply.Builder) = {
if (message.isInstanceOf[Serializable.SBinary]) {
val serializable = message.asInstanceOf[Serializable.SBinary]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.Protobuf]) {
val serializable = message.asInstanceOf[Serializable.Protobuf]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(Serializer.Java.out(serializable.getSchema)))
} else if (message.isInstanceOf[Serializable.ScalaJSON[_]]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON[_]]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.body.asInstanceOf[AnyRef].getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON[_]]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON[_]]
builder.setProtocol(SerializationProtocol.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.body.asInstanceOf[AnyRef].getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message)))
}
}
}

View file

@ -9,9 +9,10 @@ import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.concurrent.{ConcurrentHashMap, Executors}
import kernel.actor._ import kernel.actor._
import kernel.util.{Serializer, ScalaJSONSerializer, JavaJSONSerializer, Logging} import kernel.util._
import protobuf.RemoteProtocol import protobuf.RemoteProtocol
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import serialization.{Serializer, Serializable, SerializationProtocol}
import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._ import org.jboss.netty.channel._
@ -19,12 +20,16 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import com.google.protobuf.ByteString /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServer extends Logging { class RemoteServer extends Logging {
def start = RemoteServer.start def start = RemoteServer.start
} }
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer extends Logging { object RemoteServer extends Logging {
val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost") val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost")
val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999) val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999)
@ -104,21 +109,18 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
log.debug("Dispatching to remote actor [%s]", request.getTarget) log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getTimeout) val actor = createActor(request.getTarget, request.getTimeout)
actor.start actor.start
val messageClass = Class.forName(request.getMessageType) val message = RemoteProtocolBuilder.getMessage(request)
val message = ScalaJSONSerializer.in(request.getMessage.toByteArray, Some(messageClass))
if (request.getIsOneWay) actor ! message if (request.getIsOneWay) actor ! message
else { else {
try { try {
val resultOrNone = actor !! message val resultOrNone = actor !! message
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result) log.debug("Returning result from actor invocation [%s]", result)
val replyMessage = ScalaJSONSerializer.out(result)
val replyBuilder = RemoteReply.newBuilder val replyBuilder = RemoteReply.newBuilder
.setId(request.getId) .setId(request.getId)
.setMessage(ByteString.copyFrom(replyMessage))
.setMessageType(result.getClass.getName)
.setIsSuccessful(true) .setIsSuccessful(true)
.setIsActor(true) .setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build) channel.write(replyBuilder.build)
} catch { } catch {
@ -140,7 +142,7 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget) log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
val activeObject = createActiveObject(request.getTarget, request.getTimeout) val activeObject = createActiveObject(request.getTarget, request.getTimeout)
val args: scala.List[AnyRef] = JavaJSONSerializer.in(request.getMessage.toByteArray, Some(classOf[scala.List[AnyRef]])) val args = RemoteProtocolBuilder.getMessage(request).asInstanceOf[scala.List[AnyRef]]
val argClasses = args.map(_.getClass) val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout) val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
@ -151,13 +153,11 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
else { else {
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*) val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result) log.debug("Returning result from remote active object invocation [%s]", result)
val replyMessage = JavaJSONSerializer.out(result)
val replyBuilder = RemoteReply.newBuilder val replyBuilder = RemoteReply.newBuilder
.setId(request.getId) .setId(request.getId)
.setMessage(ByteString.copyFrom(replyMessage))
.setMessageType(result.getClass.getName)
.setIsSuccessful(true) .setIsSuccessful(true)
.setIsActor(false) .setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build) channel.write(replyBuilder.build)
} }

View file

@ -4,28 +4,51 @@
package se.scalablesolutions.akka.serialization package se.scalablesolutions.akka.serialization
import org.codehaus.jackson.map.ObjectMapper
import com.google.protobuf.Message
import com.twitter.commons.Json import com.twitter.commons.Json
import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream}
import reflect.Manifest import reflect.Manifest
import sbinary.DefaultProtocol import sbinary.DefaultProtocol
import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream}
object SerializationProtocol {
val SBINARY = 1
val SCALA_JSON = 2
val JAVA_JSON = 3
val PROTOBUF = 4
val JAVA = 5
val AVRO = 6
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Serializable {
def toBytes: Array[Byte]
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Serializable { object Serializable {
trait Protobuf {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JSON[T] extends Serializable {
def body: T
def toJSON: String
} }
trait SBinary[T] extends DefaultProtocol { /**
def toBytes: Array[Byte] = toByteArray(this) * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
def getManifest: Manifest[T] = Manifest.singleType(this.asInstanceOf[T]) */
} trait JavaJSON[T] extends JSON[T]{
private val mapper = new ObjectMapper
trait JavaJSON {
private val mapper = new org.codehaus.jackson.map.ObjectMapper
def toJSON: String = { def toJSON: String = {
val out = new StringWriter val out = new StringWriter
mapper.writeValue(out, obj) mapper.writeValue(out, body)
out.close out.close
out.toString out.toString
} }
@ -33,19 +56,38 @@ object Serializable {
def toBytes: Array[Byte] = { def toBytes: Array[Byte] = {
val bos = new ByteArrayOutputStream val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos) val out = new ObjectOutputStream(bos)
mapper.writeValue(out, obj) mapper.writeValue(out, body)
out.close out.close
bos.toByteArray bos.toByteArray
} }
} }
trait ScalaJSON { /**
def toJSON: String = { * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
Json.build(obj).toString.getBytes("UTF-8") */
} trait ScalaJSON[T] extends JSON[T] {
def toJSON: String = Json.build(body).toString
def toBytes: Array[Byte] = Json.build(body).toString.getBytes
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Protobuf extends Serializable {
def toBytes: Array[Byte]
def getSchema: Message
}
def toBytes: Array[Byte] = { /**
Json.build(obj).toString.getBytes("UTF-8") * <pre>
} * import sbinary.DefaultProtocol._
* def fromBytes(bytes: Array[Byte]) = fromByteArray[String](bytes)
* def toBytes: Array[Byte] = toByteArray(body)
* </pre>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait SBinary extends Serializable {
def fromBytes(bytes: Array[Byte])
def toBytes: Array[Byte]
} }
} }

View file

@ -0,0 +1,113 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.serialization
import com.google.protobuf.Message
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
import reflect.Manifest
import sbinary.DefaultProtocol
import org.codehaus.jackson.map.ObjectMapper
import com.twitter.commons.Json
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Serializer {
def deepClone[T <: AnyRef](obj: T): T
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Serializer {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Java extends Serializer {
def deepClone[T <: AnyRef](obj: T): T = in(out(obj), None).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(obj)
out.close
bos.toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
in.close
obj
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Protobuf {
//def deepClone[T <: AnyRef](obj: T): T = in(out(obj), None).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = {
throw new UnsupportedOperationException
}
def in(bytes: Array[Byte], schema: Message): AnyRef = {
throw new UnsupportedOperationException
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object JavaJSON extends Serializer {
private val mapper = new ObjectMapper
def deepClone[T <: AnyRef](obj: T): T = in(out(obj), Some(obj.getClass)).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
mapper.writeValue(out, obj)
out.close
bos.toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided")
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef]
in.close
obj
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ScalaJSON extends Serializer {
def deepClone[T <: AnyRef](obj: T): T = in(out(obj), None).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = Json.build(obj).toString.getBytes("UTF-8")
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = Json.parse(new String(bytes, "UTF-8")).asInstanceOf[AnyRef]
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object SBinary {
import sbinary.DefaultProtocol._
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None)
def out[T](t : T)(implicit bin : Writes[T]): Array[Byte] = toByteArray[T](t)
def in[T](array : Array[Byte], clazz: Option[Class[T]])(implicit bin : Reads[T]): T = fromByteArray[T](array)
}
}

View file

@ -5,7 +5,9 @@
package se.scalablesolutions.akka.kernel.state package se.scalablesolutions.akka.kernel.state
import java.io.File import java.io.File
import kernel.util.{Serializer, JavaSerializationSerializer, Logging}
import kernel.util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
import org.apache.cassandra.config.DatabaseDescriptor import org.apache.cassandra.config.DatabaseDescriptor
import org.apache.cassandra.service._ import org.apache.cassandra.service._
@ -23,7 +25,7 @@ import org.apache.thrift.TProcessorFactory
* <p/> * <p/>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
final object CassandraStorage extends Logging { object CassandraStorage extends Logging {
val TABLE_NAME = "akka" val TABLE_NAME = "akka"
val MAP_COLUMN_FAMILY = "map" val MAP_COLUMN_FAMILY = "map"
val VECTOR_COLUMN_FAMILY = "vector" val VECTOR_COLUMN_FAMILY = "vector"
@ -38,12 +40,13 @@ final object CassandraStorage extends Logging {
@volatile private[this] var isRunning = false @volatile private[this] var isRunning = false
private[this] val serializer: Serializer = { private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "serialization") match { kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "serialization" => JavaSerializationSerializer case "scala-json" => Serializer.ScalaJSON
case "json" => throw new UnsupportedOperationException("json storage protocol is not yet supported") case "java-json" => Serializer.JavaJSON
case "avro" => throw new UnsupportedOperationException("avro storage protocol is not yet supported") //case "sbinary" => Serializer.SBinary
case "thrift" => throw new UnsupportedOperationException("thrift storage protocol is not yet supported") case "java" => Serializer.Java
case "protobuf" => throw new UnsupportedOperationException("protobuf storage protocol is not yet supported") case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported")
case unknown => throw new UnsupportedOperationException("unknwon storage protocol [" + unknown + "]")
} }
} }

View file

@ -4,6 +4,8 @@
package se.scalablesolutions.akka.kernel.util package se.scalablesolutions.akka.kernel.util
import java.io.UnsupportedEncodingException
import java.security.{NoSuchAlgorithmException, MessageDigest}
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.actors._ import scala.actors._
@ -18,6 +20,20 @@ class SystemFailure(cause: Throwable) extends RuntimeException(cause)
*/ */
object Helpers extends Logging { object Helpers extends Logging {
def getDigestFor(s: String) = {
val digest = MessageDigest.getInstance("MD5")
digest.update(s.getBytes("ASCII"))
val bytes = digest.digest
val sb = new StringBuilder
val hex = "0123456789ABCDEF"
bytes.foreach(b => {
val n = b.asInstanceOf[Int]
sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF))
})
sb.toString
}
// ================================================ // ================================================
@serializable @serializable
class ReadWriteLock { class ReadWriteLock {

View file

@ -1,95 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.util
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
import reflect.Manifest
import sbinary.DefaultProtocol
import org.codehaus.jackson.map.ObjectMapper
import com.twitter.commons.Json
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Serializer {
def deepClone[T <: AnyRef](obj: T): T
def out(obj: AnyRef): Array[Byte]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object JavaSerializationSerializer extends Serializer {
def deepClone[T <: AnyRef](obj: T): T = in(out(obj), None).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(obj)
out.close
bos.toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
in.close
obj
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object JavaJSONSerializer extends Serializer {
private val json = new org.codehaus.jackson.map.ObjectMapper
def deepClone[T <: AnyRef](obj: T): T = in(out(obj), Some(obj.getClass)).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
json.writeValue(out, obj)
out.close
bos.toByteArray
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided")
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = json.readValue(in, clazz.get).asInstanceOf[AnyRef]
in.close
obj
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ScalaJSONSerializer extends Serializer {
def deepClone[T <: AnyRef](obj: T): T = in(out(obj), None).asInstanceOf[T]
def out(obj: AnyRef): Array[Byte] = {
Json.build(obj).toString.getBytes("UTF-8")
}
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
Json.parse(new String(bytes, "UTF-8")).asInstanceOf[AnyRef]
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object SBinarySerializer extends SBinarySerializer
trait SBinarySerializer extends DefaultProtocol {
def in[T](t : T)(implicit bin : Writes[T], m: Manifest[T]): Pair[Array[Byte], Manifest[T]] =
Pair(toByteArray(t), m)
def out[T](array : Array[Byte], m: Manifest[T])(implicit bin : Reads[T]) =
read[T](new ByteArrayInputStream(array))
}

View file

@ -76,6 +76,7 @@ class InMemStatefulActor extends Actor {
} }
} }
@serializable
class InMemFailerActor extends Actor { class InMemFailerActor extends Actor {
makeTransactionRequired makeTransactionRequired
def receive: PartialFunction[Any, Unit] = { def receive: PartialFunction[Any, Unit] = {

View file

@ -5,7 +5,6 @@ import java.util.concurrent.TimeUnit
import junit.framework.TestCase import junit.framework.TestCase
import kernel.Kernel import kernel.Kernel
import kernel.reactor._ import kernel.reactor._
import kernel.state.{CassandraStorageConfig, TransactionalState} import kernel.state.{CassandraStorageConfig, TransactionalState}
@ -49,7 +48,7 @@ class PersistentActor extends Actor {
} }
} }
class PersistentFailerActor extends Actor { @serializable class PersistentFailerActor extends Actor {
makeTransactionRequired makeTransactionRequired
def receive: PartialFunction[Any, Unit] = { def receive: PartialFunction[Any, Unit] = {
case "Failure" => case "Failure" =>
@ -61,7 +60,7 @@ object PersistenceManager {
@volatile var isRunning = false @volatile var isRunning = false
def init = if (!isRunning) { def init = if (!isRunning) {
System.setProperty("storage-config", "config") System.setProperty("storage-config", "config")
Kernel.startCassandra Kernel.boot
isRunning = true isRunning = true
} }
} }

View file

@ -591,7 +591,8 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
} }
} }
class RemotePingPong1Actor extends Actor {
@serializable class RemotePingPong1Actor extends Actor {
override def receive: PartialFunction[Any, Unit] = { override def receive: PartialFunction[Any, Unit] = {
case Ping => case Ping =>
Log.messageLog += "ping" Log.messageLog += "ping"
@ -608,7 +609,7 @@ class RemotePingPong1Actor extends Actor {
} }
} }
class RemotePingPong2Actor extends Actor { @serializable class RemotePingPong2Actor extends Actor {
override def receive: PartialFunction[Any, Unit] = { override def receive: PartialFunction[Any, Unit] = {
case Ping => case Ping =>
Log.messageLog += "ping" Log.messageLog += "ping"
@ -621,7 +622,7 @@ class RemotePingPong2Actor extends Actor {
} }
} }
class RemotePingPong3Actor extends Actor { @serializable class RemotePingPong3Actor extends Actor {
override def receive: PartialFunction[Any, Unit] = { override def receive: PartialFunction[Any, Unit] = {
case Ping => case Ping =>
Log.messageLog += "ping" Log.messageLog += "ping"

Binary file not shown.

View file

@ -41,7 +41,8 @@
<orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3-alpha" level="project" /> <orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.twitter:scala-json:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />

View file

@ -1,5 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> <module relativePaths="true" MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="FacetManager">
<facet type="Scala" name="Scala">
<configuration />
</facet>
</component>
<component name="NewModuleRootManager" inherit-compiler-output="false"> <component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" /> <output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" /> <output-test url="file://$MODULE_DIR$/target/test-classes" />
@ -41,7 +46,8 @@
<orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache:zookeeper:3.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-core-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3-alpha" level="project" /> <orderEntry type="library" exported="" name="Maven: sbinary:sbinary:0.3" level="project" />
<orderEntry type="library" exported="" name="Maven: com.twitter:scala-json:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" /> <orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" /> <orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />

View file

@ -12,45 +12,27 @@ package se.scalablesolutions.akka.kernel.nio.protobuf;
option optimize_for = SPEED; option optimize_for = SPEED;
/*
val message: Array[Byte],
val method: String,
val target: String,
val timeout: Long,
val supervisorUuid: String
val isOneWay: Boolean,
val isActor: Boolean,
val isEscaped: Boolean,
val id: Long,
val messageType: String,
*/
message RemoteRequest { message RemoteRequest {
required bytes message = 1; required uint64 id = 1;
optional string method = 2; required uint32 protocol = 2;
required string target = 3; required bytes message = 3;
required uint64 timeout = 4; optional bytes messageManifest = 4;
optional string supervisorUuid = 5; optional string method = 5;
required bool isActor = 6; required string target = 6;
required bool isOneWay = 7; required uint64 timeout = 7;
required bool isEscaped = 8; optional string supervisorUuid = 8;
required uint64 id = 9; required bool isActor = 9;
required string messageType = 10; required bool isOneWay = 10;
required bool isEscaped = 11;
} }
/*
val isSuccessful: Boolean,
val id: Long,
val message: Array[Byte],
val exception: Throwable,
val supervisorUuid: String
val isActor: Boolean,
*/
message RemoteReply { message RemoteReply {
required bool isSuccessful = 1; required uint64 id = 1;
required uint64 id = 2; optional uint32 protocol = 2;
optional bytes message = 3; optional bytes message = 3;
optional string exception = 4; optional bytes messageManifest = 4;
optional string supervisorUuid = 5; optional string exception = 5;
optional string messageType = 6; optional string supervisorUuid = 6;
required bool isActor = 7; required bool isActor = 7;
required bool isSuccessful = 8;
} }