From 39a4f72ad1f10429a993521aac6843cda9256579 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 2 May 2010 21:25:34 +0200 Subject: [PATCH 1/7] Added start utility method --- akka-core/src/main/scala/actor/Actor.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 7968bd2ce1..f726721384 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -206,6 +206,19 @@ object Actor extends Logging { } } } + + /** Starts the specified actor and returns it, useful for: + *
val actor = new FooActor
+   *  actor.start
+   *  //Gets replaced by
+   *  val actor = start(new FooActor)
+   *  
+ */ + def start[T <: Actor](actor : T) : T = { + actor.start + actor + } + } /** From b93a893722804b2df28ec13bd85f57f33b6dd51c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 3 May 2010 19:32:40 +0200 Subject: [PATCH 2/7] Split up Patterns.scala in different files --- .../src/main/scala/routing/Iterators.scala | 27 ++++++++ .../src/main/scala/routing/Listeners.scala | 25 +++++++ .../src/main/scala/routing/Patterns.scala | 69 ++----------------- .../src/main/scala/routing/Routers.scala | 28 ++++++++ 4 files changed, 84 insertions(+), 65 deletions(-) create mode 100644 akka-core/src/main/scala/routing/Iterators.scala create mode 100644 akka-core/src/main/scala/routing/Listeners.scala create mode 100644 akka-core/src/main/scala/routing/Routers.scala diff --git a/akka-core/src/main/scala/routing/Iterators.scala b/akka-core/src/main/scala/routing/Iterators.scala new file mode 100644 index 0000000000..7d06bd74ee --- /dev/null +++ b/akka-core/src/main/scala/routing/Iterators.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.Actor + +trait InfiniteIterator[T] extends Iterator[T] + +class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { + @volatile private[this] var current: List[T] = items + + def hasNext = items != Nil + + def next = { + val nc = if (current == Nil) items else current + current = nc.tail + nc.head + } +} + +class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] { + def hasNext = items != Nil + + def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) +} \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala new file mode 100644 index 0000000000..495aab9ee3 --- /dev/null +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.Actor + +sealed trait ListenerMessage +case class Listen(listener : Actor) extends ListenerMessage +case class Deafen(listener : Actor) extends ListenerMessage +case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage + +trait Listeners { self : Actor => + import se.scalablesolutions.akka.actor.Agent + private lazy val listeners = Agent(Set[Actor]()) + + protected def listenerManagement : PartialFunction[Any,Unit] = { + case Listen(l) => listeners( _ + l) + case Deafen(l) => listeners( _ - l ) + case WithListeners(f) => listeners foreach f + } + + protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) ) +} \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala index d8a49c74e3..a27c69b846 100644 --- a/akka-core/src/main/scala/routing/Patterns.scala +++ b/akka-core/src/main/scala/routing/Patterns.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.Actor @@ -21,7 +25,6 @@ object Patterns { def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) - //FIXME 2.8, use default params with CyclicIterator def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer { val seq = actors } @@ -38,68 +41,4 @@ object Patterns { def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor({case _ => actorToLog}, logger) -} - -trait Dispatcher { self: Actor => - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, Actor] - - protected def dispatch: PartialFunction[Any, Unit] = { - case a if routes.isDefinedAt(a) => - if (self.replyTo.isDefined) routes(a) forward transform(a) - else routes(a) ! transform(a) - } - - def receive = dispatch -} - -trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[Actor] - - protected def routes = { case x if seq.hasNext => seq.next } -} - -trait InfiniteIterator[T] extends Iterator[T] - -class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { - @volatile private[this] var current: List[T] = items - - def hasNext = items != Nil - - def next = { - val nc = if (current == Nil) items else current - current = nc.tail - nc.head - } -} - -class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] { - def hasNext = items != Nil - - def next = { - def actorWithSmallestMailbox(a1: Actor, a2: Actor) = { - if (a1.mailboxSize < a2.mailboxSize) a1 else a2 - } - items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2)) - } -} - -sealed trait ListenerMessage -case class Listen(listener : Actor) extends ListenerMessage -case class Deafen(listener : Actor) extends ListenerMessage -case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage - -trait Listeners { self : Actor => - import se.scalablesolutions.akka.actor.Agent - private lazy val listeners = Agent(Set[Actor]()) - - protected def listenerManagement : PartialFunction[Any,Unit] = { - case Listen(l) => listeners( _ + l) - case Deafen(l) => listeners( _ - l ) - case WithListeners(f) => listeners foreach f - } - - protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) ) } \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala new file mode 100644 index 0000000000..3749f94437 --- /dev/null +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.Actor + +trait Dispatcher { self: Actor => + + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, Actor] + + protected def dispatch: PartialFunction[Any, Unit] = { + case a if routes.isDefinedAt(a) => + if (self.replyTo.isDefined) routes(a) forward transform(a) + else routes(a) ! transform(a) + } + + def receive = dispatch +} + +trait LoadBalancer extends Dispatcher { self: Actor => + protected def seq: InfiniteIterator[Actor] + + protected def routes = { case x if seq.hasNext => seq.next } +} \ No newline at end of file From 29c20bf62e695b63b61e741710251656be8dc309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 4 May 2010 11:00:09 +0200 Subject: [PATCH 3/7] minor edits --- .../src/main/scala/CassandraStorageBackend.scala | 9 ++++++++- .../src/main/scala/RedisStorageBackend.scala | 13 ++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala index fb57761037..f9273f05dd 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala @@ -117,7 +117,14 @@ private[akka] object CassandraStorageBackend extends else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") } - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + /** + * if start and finish both are defined, ignore count and + * report the range [start, finish) + * if start is not defined, assume start = 0 + * if start == 0 and finish == 0, return an empty collection + */ + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): + List[Array[Byte]] = { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val columns: List[ColumnOrSuperColumn] = sessions.withSession { diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 8e2adaa5c3..b1973c3c7b 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -226,18 +226,17 @@ private [akka] object RedisStorageBackend extends } } + /** + * if start and finish both are defined, ignore count and + * report the range [start, finish) + * if start is not defined, assume start = 0 + * if start == 0 and finish == 0, return an empty collection + */ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling { - /** - * if start and finish both are defined, ignore count and - * report the range [start, finish) - * if start is not defined, assume start = 0 - * if start == 0 and finish == 0, return an empty collection - */ val s = if (start.isDefined) start.get else 0 val cnt = if (finish.isDefined) { val f = finish.get - // if (f >= s) Math.min(count, (f - s)) else count if (f >= s) (f - s) else count } else count From 3f38822afba35d23d60b05cb2192c58c99a2e095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 4 May 2010 22:15:33 +0200 Subject: [PATCH 4/7] Changed suffix on source JAR from -src to -sources --- project/build/AkkaProject.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5623f110bb..388af31334 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -24,7 +24,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val distPath = info.projectPath / "dist" override def compileOptions = super.compileOptions ++ - Seq("-deprecation", "-Xmigration", "-Xcheckinit", "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8").map(x => CompileOption(x)) + Seq("-deprecation", "-Xmigration", "-Xcheckinit", + "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8") + .map(x => CompileOption(x)) override def javaCompileOptions = JavaCompileOption("-Xlint:unchecked") :: super.javaCompileOptions.toList @@ -110,14 +112,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { //override def defaultPublishRepository = Some(Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile)) val publishTo = Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile) - val sourceArtifact = Artifact(artifactID, "src", "jar", Some("src"), Nil, None) - val docsArtifact = Artifact(artifactID, "docs", "jar", Some("doc"), Nil, None) + val sourceArtifact = Artifact(artifactID, "source", "jar", Some("source"), Nil, None) + val docsArtifact = Artifact(artifactID, "docs", "jar", Some("docs"), Nil, None) // Credentials(Path.userHome / ".akka_publish_credentials", log) //override def documentOptions = encodingUtf8.map(SimpleDocOption(_)) - override def packageDocsJar = defaultJarPath("-doc.jar") - override def packageSrcJar= defaultJarPath("-src.jar") + override def packageDocsJar = defaultJarPath("-docs.jar") + override def packageSrcJar= defaultJarPath("-source.jar") override def packageToPublishActions = super.packageToPublishActions ++ Seq(packageDocs, packageSrc) override def pomExtra = @@ -361,8 +363,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { def deployPath: Path lazy val dist = distAction - def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn(`package`, packageDocs, packageSrc) describedAs("Deploying") - def deployTask(jar: Path, docs: Path, src: Path, toDir: Path, genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task { + def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn( + `package`, packageDocs, packageSrc) describedAs("Deploying") + def deployTask(jar: Path, docs: Path, src: Path, toDir: Path, + genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task { gen(jar, toDir, genJar, "Deploying bits") orElse gen(docs, toDir, genDocs, "Deploying docs") orElse gen(src, toDir, genSource, "Deploying sources") From f168a360907078602a6154d64e957d8861e2f798 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 5 May 2010 12:13:25 +0200 Subject: [PATCH 5/7] Fixed remote actor protobuf message serialization problem + added tests --- .../scala/remote/RemoteProtocolBuilder.scala | 35 +- .../scala/serialization/Serializable.scala | 4 +- .../src/test/java/ProtobufProtocol.proto | 17 + .../akka/actor/ProtobufProtocol.java | 402 ++++++++++++++++++ ...rotobufActorMessageSerializationSpec.scala | 72 ++++ 5 files changed, 517 insertions(+), 13 deletions(-) create mode 100644 akka-core/src/test/java/ProtobufProtocol.proto create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java create mode 100644 akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index 65558dd997..cae7f151b4 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -17,7 +17,6 @@ object RemoteProtocolBuilder { private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf - def setClassLoader(cl: ClassLoader) = { SERIALIZER_JAVA.classLoader = Some(cl) SERIALIZER_JAVA_JSON.classLoader = Some(cl) @@ -26,6 +25,8 @@ object RemoteProtocolBuilder { def getMessage(request: RemoteRequest): Any = { request.getProtocol match { + case SerializationProtocol.JAVA => + unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(request.getMessage.toByteArray) @@ -36,17 +37,19 @@ object RemoteProtocolBuilder { val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest))) case SerializationProtocol.PROTOBUF => + val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] + val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]] + protobufMessage.fromBytes(request.getMessage.toByteArray) + case SerializationProtocol.PROTOBUF_RAW => val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass)) - case SerializationProtocol.JAVA => - unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) - case SerializationProtocol.AVRO => - throw new UnsupportedOperationException("Avro protocol is not yet supported") } } def getMessage(reply: RemoteReply): Any = { reply.getProtocol match { + case SerializationProtocol.JAVA => + unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(reply.getMessage.toByteArray) @@ -57,12 +60,12 @@ object RemoteProtocolBuilder { val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String] SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest))) case SerializationProtocol.PROTOBUF => + val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] + val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]] + protobufMessage.fromBytes(reply.getMessage.toByteArray) + case SerializationProtocol.PROTOBUF_RAW => val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass)) - case SerializationProtocol.JAVA => - unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) - case SerializationProtocol.AVRO => - throw new UnsupportedOperationException("Avro protocol is not yet supported") } } @@ -72,9 +75,14 @@ object RemoteProtocolBuilder { builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) + } else if (message.isInstanceOf[Serializable.Protobuf[_]]) { + val serializable = message.asInstanceOf[Serializable.Protobuf[_]] + builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray)) + builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Message]) { val serializable = message.asInstanceOf[Message] - builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setProtocol(SerializationProtocol.PROTOBUF_RAW) builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Serializable.ScalaJSON]) { @@ -100,9 +108,14 @@ object RemoteProtocolBuilder { builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) + } else if (message.isInstanceOf[Serializable.Protobuf[_]]) { + val serializable = message.asInstanceOf[Serializable.Protobuf[_]] + builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray)) + builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Message]) { val serializable = message.asInstanceOf[Message] - builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setProtocol(SerializationProtocol.PROTOBUF_RAW) builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Serializable.ScalaJSON]) { diff --git a/akka-core/src/main/scala/serialization/Serializable.scala b/akka-core/src/main/scala/serialization/Serializable.scala index d0a199f67b..0f9bcc4f75 100644 --- a/akka-core/src/main/scala/serialization/Serializable.scala +++ b/akka-core/src/main/scala/serialization/Serializable.scala @@ -16,12 +16,12 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream} import sjson.json.{Serializer=>SJSONSerializer} object SerializationProtocol { + val JAVA = 0 val SBINARY = 1 val SCALA_JSON = 2 val JAVA_JSON = 3 val PROTOBUF = 4 - val JAVA = 5 - val AVRO = 6 + val PROTOBUF_RAW = 5 } /** diff --git a/akka-core/src/test/java/ProtobufProtocol.proto b/akka-core/src/test/java/ProtobufProtocol.proto new file mode 100644 index 0000000000..f4b146506c --- /dev/null +++ b/akka-core/src/test/java/ProtobufProtocol.proto @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor; + +/* + Compile with: + cd ./akka-core/src/test/java + protoc ProtobufProtocol.proto --java_out . +*/ + +message ProtobufPOJO { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java new file mode 100644 index 0000000000..9995225cf5 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java @@ -0,0 +1,402 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! + +package se.scalablesolutions.akka.actor; + +public final class ProtobufProtocol { + private ProtobufProtocol() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public static final class ProtobufPOJO extends + com.google.protobuf.GeneratedMessage { + // Use ProtobufPOJO.newBuilder() to construct. + private ProtobufPOJO() {} + + private static final ProtobufPOJO defaultInstance = new ProtobufPOJO(); + public static ProtobufPOJO getDefaultInstance() { + return defaultInstance; + } + + public ProtobufPOJO getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable; + } + + // required uint64 id = 1; + public static final int ID_FIELD_NUMBER = 1; + private boolean hasId; + private long id_ = 0L; + public boolean hasId() { return hasId; } + public long getId() { return id_; } + + // required string name = 2; + public static final int NAME_FIELD_NUMBER = 2; + private boolean hasName; + private java.lang.String name_ = ""; + public boolean hasName() { return hasName; } + public java.lang.String getName() { return name_; } + + // required bool status = 3; + public static final int STATUS_FIELD_NUMBER = 3; + private boolean hasStatus; + private boolean status_ = false; + public boolean hasStatus() { return hasStatus; } + public boolean getStatus() { return status_; } + + public final boolean isInitialized() { + if (!hasId) return false; + if (!hasName) return false; + if (!hasStatus) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (hasId()) { + output.writeUInt64(1, getId()); + } + if (hasName()) { + output.writeString(2, getName()); + } + if (hasStatus()) { + output.writeBool(3, getStatus()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasId()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, getId()); + } + if (hasName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getName()); + } + if (hasStatus()) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, getStatus()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO result; + + // Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO(); + return builder; + } + + protected se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDescriptor(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO) { + return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO other) { + if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasName()) { + setName(other.getName()); + } + if (other.hasStatus()) { + setStatus(other.getStatus()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 8: { + setId(input.readUInt64()); + break; + } + case 18: { + setName(input.readString()); + break; + } + case 24: { + setStatus(input.readBool()); + break; + } + } + } + } + + + // required uint64 id = 1; + public boolean hasId() { + return result.hasId(); + } + public long getId() { + return result.getId(); + } + public Builder setId(long value) { + result.hasId = true; + result.id_ = value; + return this; + } + public Builder clearId() { + result.hasId = false; + result.id_ = 0L; + return this; + } + + // required string name = 2; + public boolean hasName() { + return result.hasName(); + } + public java.lang.String getName() { + return result.getName(); + } + public Builder setName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasName = true; + result.name_ = value; + return this; + } + public Builder clearName() { + result.hasName = false; + result.name_ = getDefaultInstance().getName(); + return this; + } + + // required bool status = 3; + public boolean hasStatus() { + return result.hasStatus(); + } + public boolean getStatus() { + return result.getStatus(); + } + public Builder setStatus(boolean value) { + result.hasStatus = true; + result.status_ = value; + return this; + } + public Builder clearStatus() { + result.hasStatus = false; + result.status_ = false; + return this; + } + } + + static { + se.scalablesolutions.akka.actor.ProtobufProtocol.getDescriptor(); + } + + static { + se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit(); + } + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\026ProtobufProtocol.proto\022\037se.scalablesol" + + "utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" + + "\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor, + new java.lang.String[] { "Id", "Name", "Status", }, + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.class, + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + public static void internalForceInit() {} +} diff --git a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala new file mode 100644 index 0000000000..cde44b3016 --- /dev/null +++ b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala @@ -0,0 +1,72 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.dispatch.Dispatchers +import se.scalablesolutions.akka.serialization.Serializable.Protobuf + +import ProtobufProtocol.ProtobufPOJO + +/* --------------------------- +Uses this Protobuf message: + +message ProtobufPOJO { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} +--------------------------- */ + +object ProtobufActorMessageSerializationSpec { + val unit = TimeUnit.MILLISECONDS + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + class RemoteActorSpecActorBidirectional extends Actor { + start + def receive = { + case pojo: ProtobufPOJO => + val id = pojo.getId + reply(id + 1) + case msg => + throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg) + } + } +} + +class ProtobufActorMessageSerializationSpec extends JUnitSuite { + import ProtobufActorMessageSerializationSpec._ + + @Before + def init() { + server = new RemoteServer + server.start(HOSTNAME, PORT) + server.register("RemoteActorSpecActorBidirectional", new RemoteActorSpecActorBidirectional) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + @After + def finished() { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } + + @Test + def shouldSendReplyAsync = { + val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT) + val result = actor !! ProtobufPOJO.newBuilder + .setId(11) + .setStatus(true) + .setName("Coltrane") + .build + assert(12L === result.get.asInstanceOf[Long]) + actor.stop + } +} + \ No newline at end of file From b0dd4b5d6243047fa21b035c5f65641f7c8b49d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 5 May 2010 12:24:18 +0200 Subject: [PATCH 6/7] Renamed Reactor.scala to MessageHandling.scala --- .../main/scala/dispatch/{Reactor.scala => MessageHandling.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename akka-core/src/main/scala/dispatch/{Reactor.scala => MessageHandling.scala} (100%) diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala similarity index 100% rename from akka-core/src/main/scala/dispatch/Reactor.scala rename to akka-core/src/main/scala/dispatch/MessageHandling.scala From c1d81d3953e0a3be0dacd792dd8a3fa48943ee0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 5 May 2010 12:45:11 +0200 Subject: [PATCH 7/7] Removed Serializable.Protobuf since it did not work, use direct Protobuf messages for remote messages instead --- .../scala/remote/RemoteProtocolBuilder.scala | 22 ++----------------- .../scala/serialization/Serializable.scala | 10 --------- ...rotobufActorMessageSerializationSpec.scala | 1 - 3 files changed, 2 insertions(+), 31 deletions(-) diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index cae7f151b4..b95ac210f5 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -37,10 +37,6 @@ object RemoteProtocolBuilder { val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest))) case SerializationProtocol.PROTOBUF => - val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] - val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]] - protobufMessage.fromBytes(request.getMessage.toByteArray) - case SerializationProtocol.PROTOBUF_RAW => val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass)) } @@ -60,10 +56,6 @@ object RemoteProtocolBuilder { val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String] SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest))) case SerializationProtocol.PROTOBUF => - val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] - val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]] - protobufMessage.fromBytes(reply.getMessage.toByteArray) - case SerializationProtocol.PROTOBUF_RAW => val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass)) } @@ -75,14 +67,9 @@ object RemoteProtocolBuilder { builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) - } else if (message.isInstanceOf[Serializable.Protobuf[_]]) { - val serializable = message.asInstanceOf[Serializable.Protobuf[_]] - builder.setProtocol(SerializationProtocol.PROTOBUF) - builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray)) - builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Message]) { val serializable = message.asInstanceOf[Message] - builder.setProtocol(SerializationProtocol.PROTOBUF_RAW) + builder.setProtocol(SerializationProtocol.PROTOBUF) builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Serializable.ScalaJSON]) { @@ -108,14 +95,9 @@ object RemoteProtocolBuilder { builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) - } else if (message.isInstanceOf[Serializable.Protobuf[_]]) { - val serializable = message.asInstanceOf[Serializable.Protobuf[_]] - builder.setProtocol(SerializationProtocol.PROTOBUF) - builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray)) - builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Message]) { val serializable = message.asInstanceOf[Message] - builder.setProtocol(SerializationProtocol.PROTOBUF_RAW) + builder.setProtocol(SerializationProtocol.PROTOBUF) builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Serializable.ScalaJSON]) { diff --git a/akka-core/src/main/scala/serialization/Serializable.scala b/akka-core/src/main/scala/serialization/Serializable.scala index 0f9bcc4f75..e302ff7fb8 100644 --- a/akka-core/src/main/scala/serialization/Serializable.scala +++ b/akka-core/src/main/scala/serialization/Serializable.scala @@ -21,7 +21,6 @@ object SerializationProtocol { val SCALA_JSON = 2 val JAVA_JSON = 3 val PROTOBUF = 4 - val PROTOBUF_RAW = 5 } /** @@ -106,13 +105,4 @@ object Serializable { def toJSON: String = new String(toBytes, "UTF-8") def toBytes: Array[Byte] = SJSONSerializer.SJSON.out(this) } - - /** - * @author Jonas Bonér - */ - trait Protobuf[T] extends Serializable { - def fromBytes(bytes: Array[Byte]): T = getMessage.toBuilder.mergeFrom(bytes).asInstanceOf[T] - def toBytes: Array[Byte] = getMessage.toByteArray - def getMessage: Message - } } diff --git a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala index cde44b3016..275bdb45e2 100644 --- a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala +++ b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala @@ -6,7 +6,6 @@ import org.junit.{Test, Before, After} import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.dispatch.Dispatchers -import se.scalablesolutions.akka.serialization.Serializable.Protobuf import ProtobufProtocol.ProtobufPOJO