diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 2f21432eb8..c645afef42 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -227,6 +227,19 @@ object Actor extends Logging { } } } + + /** Starts the specified actor and returns it, useful for: + *
val actor = new FooActor
+   *  actor.start
+   *  //Gets replaced by
+   *  val actor = start(new FooActor)
+   *  
+ */ + def start[T <: Actor](actor : T) : T = { + actor.start + actor + } + } /** diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala similarity index 100% rename from akka-core/src/main/scala/dispatch/Reactor.scala rename to akka-core/src/main/scala/dispatch/MessageHandling.scala diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index 65558dd997..b95ac210f5 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -17,7 +17,6 @@ object RemoteProtocolBuilder { private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf - def setClassLoader(cl: ClassLoader) = { SERIALIZER_JAVA.classLoader = Some(cl) SERIALIZER_JAVA_JSON.classLoader = Some(cl) @@ -26,6 +25,8 @@ object RemoteProtocolBuilder { def getMessage(request: RemoteRequest): Any = { request.getProtocol match { + case SerializationProtocol.JAVA => + unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(request.getMessage.toByteArray) @@ -38,15 +39,13 @@ object RemoteProtocolBuilder { case SerializationProtocol.PROTOBUF => val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass)) - case SerializationProtocol.JAVA => - unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) - case SerializationProtocol.AVRO => - throw new UnsupportedOperationException("Avro protocol is not yet supported") } } def getMessage(reply: RemoteReply): Any = { reply.getProtocol match { + case SerializationProtocol.JAVA => + unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(reply.getMessage.toByteArray) @@ -59,10 +58,6 @@ object RemoteProtocolBuilder { case SerializationProtocol.PROTOBUF => val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass)) - case SerializationProtocol.JAVA => - unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) - case SerializationProtocol.AVRO => - throw new UnsupportedOperationException("Avro protocol is not yet supported") } } diff --git a/akka-core/src/main/scala/routing/Iterators.scala b/akka-core/src/main/scala/routing/Iterators.scala new file mode 100644 index 0000000000..77159767af --- /dev/null +++ b/akka-core/src/main/scala/routing/Iterators.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.ActorID + +trait InfiniteIterator[T] extends Iterator[T] + +class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { + @volatile private[this] var current: List[T] = items + + def hasNext = items != Nil + + def next = { + val nc = if (current == Nil) items else current + current = nc.tail + nc.head + } +} + +class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { + def hasNext = items != Nil + + def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) +} \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala new file mode 100644 index 0000000000..d2fcc1cc73 --- /dev/null +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.{Actor, ActorID} + +sealed trait ListenerMessage +case class Listen(listener: ActorID) extends ListenerMessage +case class Deafen(listener: ActorID) extends ListenerMessage +case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage + +trait Listeners { self : Actor => + import se.scalablesolutions.akka.actor.Agent + private lazy val listeners = Agent(Set[ActorID]()) + + protected def listenerManagement : PartialFunction[Any,Unit] = { + case Listen(l) => listeners( _ + l) + case Deafen(l) => listeners( _ - l ) + case WithListeners(f) => listeners foreach f + } + + protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) ) +} \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala index 1d3e307d27..c8ac39fc72 100644 --- a/akka-core/src/main/scala/routing/Patterns.scala +++ b/akka-core/src/main/scala/routing/Patterns.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.{Actor, ActorID} @@ -23,16 +27,18 @@ object Patterns { filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) //FIXME 2.8, use default params with CyclicIterator - def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = newActor(() => new Actor with LoadBalancer { - start - val seq = actors - }) + def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = + newActor(() => new Actor with LoadBalancer { + start + val seq = actors + }) - def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = newActor(() => new Actor with Dispatcher { - start - override def transform(msg: Any) = msgTransformer(msg) - def routes = routing - }) + def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = + newActor(() => new Actor with Dispatcher { + start + override def transform(msg: Any) = msgTransformer(msg) + def routes = routing + }) def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher { start @@ -41,68 +47,4 @@ object Patterns { def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID = dispatcherActor({case _ => actorToLog}, logger) -} - -trait Dispatcher { self: Actor => - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, ActorID] - - protected def dispatch: PartialFunction[Any, Unit] = { - case a if routes.isDefinedAt(a) => - if (self.replyTo.isDefined) routes(a) forward transform(a) - else routes(a) ! transform(a) - } - - def receive = dispatch -} - -trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[ActorID] - - protected def routes = { case x if seq.hasNext => seq.next } -} - -trait InfiniteIterator[T] extends Iterator[T] - -class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { - @volatile private[this] var current: List[T] = items - - def hasNext = items != Nil - - def next = { - val nc = if (current == Nil) items else current - current = nc.tail - nc.head - } -} - -class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { - def hasNext = items != Nil - - def next = { - def actorWithSmallestMailbox(a1: ActorID, a2: ActorID) = { - if (a1.mailboxSize < a2.mailboxSize) a1 else a2 - } - items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2)) - } -} - -sealed trait ListenerMessage -case class Listen(listener : ActorID) extends ListenerMessage -case class Deafen(listener : ActorID) extends ListenerMessage -case class WithListeners(f : Set[ActorID] => Unit) extends ListenerMessage - -trait Listeners { self : Actor => - import se.scalablesolutions.akka.actor.Agent - private lazy val listeners = Agent(Set[ActorID]()) - - protected def listenerManagement : PartialFunction[Any,Unit] = { - case Listen(l) => listeners( _ + l) - case Deafen(l) => listeners( _ - l ) - case WithListeners(f) => listeners foreach f - } - - protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) ) } \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala new file mode 100644 index 0000000000..ce3c7c311c --- /dev/null +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.{Actor, ActorID} + +trait Dispatcher { self: Actor => + + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, ActorID] + + protected def dispatch: PartialFunction[Any, Unit] = { + case a if routes.isDefinedAt(a) => + if (self.replyTo.isDefined) routes(a) forward transform(a) + else routes(a) ! transform(a) + } + + def receive = dispatch +} + +trait LoadBalancer extends Dispatcher { self: Actor => + protected def seq: InfiniteIterator[ActorID] + + protected def routes = { case x if seq.hasNext => seq.next } +} \ No newline at end of file diff --git a/akka-core/src/main/scala/serialization/Serializable.scala b/akka-core/src/main/scala/serialization/Serializable.scala index d0a199f67b..e302ff7fb8 100644 --- a/akka-core/src/main/scala/serialization/Serializable.scala +++ b/akka-core/src/main/scala/serialization/Serializable.scala @@ -16,12 +16,11 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream} import sjson.json.{Serializer=>SJSONSerializer} object SerializationProtocol { + val JAVA = 0 val SBINARY = 1 val SCALA_JSON = 2 val JAVA_JSON = 3 val PROTOBUF = 4 - val JAVA = 5 - val AVRO = 6 } /** @@ -106,13 +105,4 @@ object Serializable { def toJSON: String = new String(toBytes, "UTF-8") def toBytes: Array[Byte] = SJSONSerializer.SJSON.out(this) } - - /** - * @author Jonas Bonér - */ - trait Protobuf[T] extends Serializable { - def fromBytes(bytes: Array[Byte]): T = getMessage.toBuilder.mergeFrom(bytes).asInstanceOf[T] - def toBytes: Array[Byte] = getMessage.toByteArray - def getMessage: Message - } } diff --git a/akka-core/src/test/java/ProtobufProtocol.proto b/akka-core/src/test/java/ProtobufProtocol.proto new file mode 100644 index 0000000000..f4b146506c --- /dev/null +++ b/akka-core/src/test/java/ProtobufProtocol.proto @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor; + +/* + Compile with: + cd ./akka-core/src/test/java + protoc ProtobufProtocol.proto --java_out . +*/ + +message ProtobufPOJO { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java new file mode 100644 index 0000000000..9995225cf5 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java @@ -0,0 +1,402 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! + +package se.scalablesolutions.akka.actor; + +public final class ProtobufProtocol { + private ProtobufProtocol() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public static final class ProtobufPOJO extends + com.google.protobuf.GeneratedMessage { + // Use ProtobufPOJO.newBuilder() to construct. + private ProtobufPOJO() {} + + private static final ProtobufPOJO defaultInstance = new ProtobufPOJO(); + public static ProtobufPOJO getDefaultInstance() { + return defaultInstance; + } + + public ProtobufPOJO getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable; + } + + // required uint64 id = 1; + public static final int ID_FIELD_NUMBER = 1; + private boolean hasId; + private long id_ = 0L; + public boolean hasId() { return hasId; } + public long getId() { return id_; } + + // required string name = 2; + public static final int NAME_FIELD_NUMBER = 2; + private boolean hasName; + private java.lang.String name_ = ""; + public boolean hasName() { return hasName; } + public java.lang.String getName() { return name_; } + + // required bool status = 3; + public static final int STATUS_FIELD_NUMBER = 3; + private boolean hasStatus; + private boolean status_ = false; + public boolean hasStatus() { return hasStatus; } + public boolean getStatus() { return status_; } + + public final boolean isInitialized() { + if (!hasId) return false; + if (!hasName) return false; + if (!hasStatus) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (hasId()) { + output.writeUInt64(1, getId()); + } + if (hasName()) { + output.writeString(2, getName()); + } + if (hasStatus()) { + output.writeBool(3, getStatus()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasId()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, getId()); + } + if (hasName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getName()); + } + if (hasStatus()) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, getStatus()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO result; + + // Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO(); + return builder; + } + + protected se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDescriptor(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO) { + return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO other) { + if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasName()) { + setName(other.getName()); + } + if (other.hasStatus()) { + setStatus(other.getStatus()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 8: { + setId(input.readUInt64()); + break; + } + case 18: { + setName(input.readString()); + break; + } + case 24: { + setStatus(input.readBool()); + break; + } + } + } + } + + + // required uint64 id = 1; + public boolean hasId() { + return result.hasId(); + } + public long getId() { + return result.getId(); + } + public Builder setId(long value) { + result.hasId = true; + result.id_ = value; + return this; + } + public Builder clearId() { + result.hasId = false; + result.id_ = 0L; + return this; + } + + // required string name = 2; + public boolean hasName() { + return result.hasName(); + } + public java.lang.String getName() { + return result.getName(); + } + public Builder setName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasName = true; + result.name_ = value; + return this; + } + public Builder clearName() { + result.hasName = false; + result.name_ = getDefaultInstance().getName(); + return this; + } + + // required bool status = 3; + public boolean hasStatus() { + return result.hasStatus(); + } + public boolean getStatus() { + return result.getStatus(); + } + public Builder setStatus(boolean value) { + result.hasStatus = true; + result.status_ = value; + return this; + } + public Builder clearStatus() { + result.hasStatus = false; + result.status_ = false; + return this; + } + } + + static { + se.scalablesolutions.akka.actor.ProtobufProtocol.getDescriptor(); + } + + static { + se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit(); + } + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\026ProtobufProtocol.proto\022\037se.scalablesol" + + "utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" + + "\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor, + new java.lang.String[] { "Id", "Name", "Status", }, + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.class, + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + public static void internalForceInit() {} +} diff --git a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala new file mode 100644 index 0000000000..1af37c7a26 --- /dev/null +++ b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala @@ -0,0 +1,72 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.dispatch.Dispatchers + +import ProtobufProtocol.ProtobufPOJO +import Actor._ + +/* --------------------------- +Uses this Protobuf message: + +message ProtobufPOJO { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} +--------------------------- */ + +object ProtobufActorMessageSerializationSpec { + val unit = TimeUnit.MILLISECONDS + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + class RemoteActorSpecActorBidirectional extends Actor { + start + def receive = { + case pojo: ProtobufPOJO => + val id = pojo.getId + reply(id + 1) + case msg => + throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg) + } + } +} + +class ProtobufActorMessageSerializationSpec extends JUnitSuite { + import ProtobufActorMessageSerializationSpec._ + + @Before + def init() { + server = new RemoteServer + server.start(HOSTNAME, PORT) + server.register("RemoteActorSpecActorBidirectional", newActor[RemoteActorSpecActorBidirectional]) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + @After + def finished() { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } + + @Test + def shouldSendReplyAsync = { + val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT) + val result = actor !! ProtobufPOJO.newBuilder + .setId(11) + .setStatus(true) + .setName("Coltrane") + .build + assert(12L === result.get.asInstanceOf[Long]) + actor.stop + } +} + \ No newline at end of file diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala index fb57761037..f9273f05dd 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala @@ -117,7 +117,14 @@ private[akka] object CassandraStorageBackend extends else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") } - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + /** + * if start and finish both are defined, ignore count and + * report the range [start, finish) + * if start is not defined, assume start = 0 + * if start == 0 and finish == 0, return an empty collection + */ + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): + List[Array[Byte]] = { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val columns: List[ColumnOrSuperColumn] = sessions.withSession { diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 8e2adaa5c3..b1973c3c7b 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -226,18 +226,17 @@ private [akka] object RedisStorageBackend extends } } + /** + * if start and finish both are defined, ignore count and + * report the range [start, finish) + * if start is not defined, assume start = 0 + * if start == 0 and finish == 0, return an empty collection + */ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling { - /** - * if start and finish both are defined, ignore count and - * report the range [start, finish) - * if start is not defined, assume start = 0 - * if start == 0 and finish == 0, return an empty collection - */ val s = if (start.isDefined) start.get else 0 val cnt = if (finish.isDefined) { val f = finish.get - // if (f >= s) Math.min(count, (f - s)) else count if (f >= s) (f - s) else count } else count diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5623f110bb..388af31334 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -24,7 +24,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val distPath = info.projectPath / "dist" override def compileOptions = super.compileOptions ++ - Seq("-deprecation", "-Xmigration", "-Xcheckinit", "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8").map(x => CompileOption(x)) + Seq("-deprecation", "-Xmigration", "-Xcheckinit", + "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8") + .map(x => CompileOption(x)) override def javaCompileOptions = JavaCompileOption("-Xlint:unchecked") :: super.javaCompileOptions.toList @@ -110,14 +112,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { //override def defaultPublishRepository = Some(Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile)) val publishTo = Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile) - val sourceArtifact = Artifact(artifactID, "src", "jar", Some("src"), Nil, None) - val docsArtifact = Artifact(artifactID, "docs", "jar", Some("doc"), Nil, None) + val sourceArtifact = Artifact(artifactID, "source", "jar", Some("source"), Nil, None) + val docsArtifact = Artifact(artifactID, "docs", "jar", Some("docs"), Nil, None) // Credentials(Path.userHome / ".akka_publish_credentials", log) //override def documentOptions = encodingUtf8.map(SimpleDocOption(_)) - override def packageDocsJar = defaultJarPath("-doc.jar") - override def packageSrcJar= defaultJarPath("-src.jar") + override def packageDocsJar = defaultJarPath("-docs.jar") + override def packageSrcJar= defaultJarPath("-source.jar") override def packageToPublishActions = super.packageToPublishActions ++ Seq(packageDocs, packageSrc) override def pomExtra = @@ -361,8 +363,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { def deployPath: Path lazy val dist = distAction - def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn(`package`, packageDocs, packageSrc) describedAs("Deploying") - def deployTask(jar: Path, docs: Path, src: Path, toDir: Path, genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task { + def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn( + `package`, packageDocs, packageSrc) describedAs("Deploying") + def deployTask(jar: Path, docs: Path, src: Path, toDir: Path, + genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task { gen(jar, toDir, genJar, "Deploying bits") orElse gen(docs, toDir, genDocs, "Deploying docs") orElse gen(src, toDir, genSource, "Deploying sources")