Merged with master

Jonas Bonér 2010-05-05 13:26:31 +02:00
commit 573ba890b5
14 changed files with 629 additions and 108 deletions

View file

@@ -227,6 +227,19 @@ object Actor extends Logging {
}
}
}
/**
 * Starts the specified actor and returns it, so that
 * <pre>
 * val actor = new FooActor
 * actor.start
 * </pre>
 * can be replaced by
 * <pre>
 * val actor = start(new FooActor)
 * </pre>
 */
def start[T <: Actor](actor: T): T = {
  actor.start
  actor
}
}
/**
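For context, a minimal sketch of the new helper at a call site (FooActor is hypothetical, as in the scaladoc above; assumes the Actor API in this file):

import se.scalablesolutions.akka.actor.Actor._

// Hypothetical actor, for illustration only.
class FooActor extends Actor {
  def receive = {
    case msg => println("FooActor got: " + msg)
  }
}

// Construction and start-up collapse into a single expression.
val fooActor = start(new FooActor)
fooActor ! "ping"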

View file

@@ -17,7 +17,6 @@ object RemoteProtocolBuilder {
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(cl: ClassLoader) = {
  SERIALIZER_JAVA.classLoader = Some(cl)
  SERIALIZER_JAVA_JSON.classLoader = Some(cl)
@@ -26,6 +25,8 @@ object RemoteProtocolBuilder {
def getMessage(request: RemoteRequest): Any = {
  request.getProtocol match {
    case SerializationProtocol.JAVA =>
      unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
    case SerializationProtocol.SBINARY =>
      val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
      renderer.fromBytes(request.getMessage.toByteArray)
@@ -38,15 +39,13 @@ object RemoteProtocolBuilder {
    case SerializationProtocol.PROTOBUF =>
      val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
      SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass))
    case SerializationProtocol.JAVA =>
      unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
    case SerializationProtocol.AVRO =>
      throw new UnsupportedOperationException("Avro protocol is not yet supported")
  }
}
def getMessage(reply: RemoteReply): Any = {
  reply.getProtocol match {
    case SerializationProtocol.JAVA =>
      unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
    case SerializationProtocol.SBINARY =>
      val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
      renderer.fromBytes(reply.getMessage.toByteArray)
@@ -59,10 +58,6 @@ object RemoteProtocolBuilder {
    case SerializationProtocol.PROTOBUF =>
      val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
      SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass))
    case SerializationProtocol.JAVA =>
      unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
    case SerializationProtocol.AVRO =>
      throw new UnsupportedOperationException("Avro protocol is not yet supported")
  }
}

View file

@@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.ActorID
trait InfiniteIterator[T] extends Iterator[T]
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
  @volatile private[this] var current: List[T] = items

  def hasNext = items != Nil

  def next = {
    val nc = if (current == Nil) items else current
    current = nc.tail
    nc.head
  }
}

class SmallestMailboxFirstIterator(items: List[ActorID]) extends InfiniteIterator[ActorID] {
  def hasNext = items != Nil
  def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
}
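For context, a quick sketch of the cyclic behaviour (plain strings stand in for the usual ActorID elements):

val ring = new CyclicIterator(List("a", "b", "c"))
// next cycles through the list indefinitely: a, b, c, a, b, ...
val firstFive = (1 to 5).map(_ => ring.next)  // a, b, c, a, b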

View file

@@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID}
sealed trait ListenerMessage
case class Listen(listener: ActorID) extends ListenerMessage
case class Deafen(listener: ActorID) extends ListenerMessage
case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage
trait Listeners { self: Actor =>
  import se.scalablesolutions.akka.actor.Agent

  private lazy val listeners = Agent(Set[ActorID]())

  protected def listenerManagement: PartialFunction[Any, Unit] = {
    case Listen(l) => listeners(_ + l)
    case Deafen(l) => listeners(_ - l)
    case WithListeners(f) => listeners foreach f
  }

  protected def gossip(msg: Any) = listeners foreach (_ foreach (_ ! msg))
}
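A minimal usage sketch, assuming the Actor API used elsewhere in this commit (NewsPublisher is a hypothetical example):

class NewsPublisher extends Actor with Listeners {
  def receive = listenerManagement orElse {
    case article: String => gossip(article)  // fan the message out to every listener
  }
}

// Listeners register and deregister themselves by message:
//   publisher ! Listen(subscriber)
//   publisher ! Deafen(subscriber)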

View file

@@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID}
@@ -23,12 +27,14 @@ object Patterns {
  filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)

//FIXME 2.8, use default params with CyclicIterator
def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID =
  newActor(() => new Actor with LoadBalancer {
    start
    val seq = actors
  })

def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID =
  newActor(() => new Actor with Dispatcher {
    start
    override def transform(msg: Any) = msgTransformer(msg)
    def routes = routing
@@ -42,67 +48,3 @@ object Patterns {
def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID =
  dispatcherActor({case _ => actorToLog}, logger)
}
trait Dispatcher { self: Actor =>
  protected def transform(msg: Any): Any = msg

  protected def routes: PartialFunction[Any, ActorID]

  protected def dispatch: PartialFunction[Any, Unit] = {
    case a if routes.isDefinedAt(a) =>
      if (self.replyTo.isDefined) routes(a) forward transform(a)
      else routes(a) ! transform(a)
  }

  def receive = dispatch
}

trait LoadBalancer extends Dispatcher { self: Actor =>
  protected def seq: InfiniteIterator[ActorID]

  protected def routes = { case x if seq.hasNext => seq.next }
}

trait InfiniteIterator[T] extends Iterator[T]

class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
  @volatile private[this] var current: List[T] = items

  def hasNext = items != Nil

  def next = {
    val nc = if (current == Nil) items else current
    current = nc.tail
    nc.head
  }
}

class SmallestMailboxFirstIterator(items: List[ActorID]) extends InfiniteIterator[ActorID] {
  def hasNext = items != Nil

  def next = {
    def actorWithSmallestMailbox(a1: ActorID, a2: ActorID) = {
      if (a1.mailboxSize < a2.mailboxSize) a1 else a2
    }
    items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1, actor2))
  }
}

sealed trait ListenerMessage
case class Listen(listener: ActorID) extends ListenerMessage
case class Deafen(listener: ActorID) extends ListenerMessage
case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage

trait Listeners { self: Actor =>
  import se.scalablesolutions.akka.actor.Agent

  private lazy val listeners = Agent(Set[ActorID]())

  protected def listenerManagement: PartialFunction[Any, Unit] = {
    case Listen(l) => listeners(_ + l)
    case Deafen(l) => listeners(_ - l)
    case WithListeners(f) => listeners foreach f
  }

  protected def gossip(msg: Any) = listeners foreach (_ foreach (_ ! msg))
}

View file

@@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID}
trait Dispatcher { self: Actor =>
  protected def transform(msg: Any): Any = msg

  protected def routes: PartialFunction[Any, ActorID]

  protected def dispatch: PartialFunction[Any, Unit] = {
    case a if routes.isDefinedAt(a) =>
      if (self.replyTo.isDefined) routes(a) forward transform(a)
      else routes(a) ! transform(a)
  }

  def receive = dispatch
}

trait LoadBalancer extends Dispatcher { self: Actor =>
  protected def seq: InfiniteIterator[ActorID]

  protected def routes = { case x if seq.hasNext => seq.next }
}
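A minimal sketch of wiring these traits together through Patterns.loadBalancerActor (Worker is hypothetical; newActor and CyclicIterator are the ones used elsewhere in this commit):

// Hypothetical worker, started on construction like the actors in this commit's tests.
class Worker extends Actor {
  start
  def receive = { case job => println("processing " + job) }
}

// Round-robin over three workers; SmallestMailboxFirstIterator could be
// swapped in to prefer the least-loaded worker instead.
val balancer = Patterns.loadBalancerActor(
  new CyclicIterator(List(newActor[Worker], newActor[Worker], newActor[Worker])))
balancer ! "job-1"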

View file

@@ -16,12 +16,11 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream}
import sjson.json.{Serializer=>SJSONSerializer}
object SerializationProtocol {
  val JAVA = 0
  val SBINARY = 1
  val SCALA_JSON = 2
  val JAVA_JSON = 3
  val PROTOBUF = 4
  val JAVA = 5
  val AVRO = 6
}
/**
@@ -106,13 +105,4 @@ object Serializable {
  def toJSON: String = new String(toBytes, "UTF-8")
  def toBytes: Array[Byte] = SJSONSerializer.SJSON.out(this)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Protobuf[T] extends Serializable {
  def fromBytes(bytes: Array[Byte]): T = getMessage.toBuilder.mergeFrom(bytes).asInstanceOf[T]
  def toBytes: Array[Byte] = getMessage.toByteArray
  def getMessage: Message
}
}

View file

@@ -0,0 +1,17 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor;
/*
Compile with:
cd ./akka-core/src/test/java
protoc ProtobufProtocol.proto --java_out .
*/
message ProtobufPOJO {
  required uint64 id = 1;
  required string name = 2;
  required bool status = 3;
}
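For reference, the generated builder API ends up being exercised like this (values are illustrative; the spec further down in this commit does the same):

val pojo = ProtobufPOJO.newBuilder
  .setId(11)
  .setName("Coltrane")
  .setStatus(true)
  .build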

View file

@@ -0,0 +1,402 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
package se.scalablesolutions.akka.actor;
public final class ProtobufProtocol {
private ProtobufProtocol() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public static final class ProtobufPOJO extends
com.google.protobuf.GeneratedMessage {
// Use ProtobufPOJO.newBuilder() to construct.
private ProtobufPOJO() {}
private static final ProtobufPOJO defaultInstance = new ProtobufPOJO();
public static ProtobufPOJO getDefaultInstance() {
return defaultInstance;
}
public ProtobufPOJO getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable;
}
// required uint64 id = 1;
public static final int ID_FIELD_NUMBER = 1;
private boolean hasId;
private long id_ = 0L;
public boolean hasId() { return hasId; }
public long getId() { return id_; }
// required string name = 2;
public static final int NAME_FIELD_NUMBER = 2;
private boolean hasName;
private java.lang.String name_ = "";
public boolean hasName() { return hasName; }
public java.lang.String getName() { return name_; }
// required bool status = 3;
public static final int STATUS_FIELD_NUMBER = 3;
private boolean hasStatus;
private boolean status_ = false;
public boolean hasStatus() { return hasStatus; }
public boolean getStatus() { return status_; }
public final boolean isInitialized() {
if (!hasId) return false;
if (!hasName) return false;
if (!hasStatus) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (hasId()) {
output.writeUInt64(1, getId());
}
if (hasName()) {
output.writeString(2, getName());
}
if (hasStatus()) {
output.writeBool(3, getStatus());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasId()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(1, getId());
}
if (hasName()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getName());
}
if (hasStatus()) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, getStatus());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO result;
// Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO();
return builder;
}
protected se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDescriptor();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO) {
return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO other) {
if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this;
if (other.hasId()) {
setId(other.getId());
}
if (other.hasName()) {
setName(other.getName());
}
if (other.hasStatus()) {
setStatus(other.getStatus());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 8: {
setId(input.readUInt64());
break;
}
case 18: {
setName(input.readString());
break;
}
case 24: {
setStatus(input.readBool());
break;
}
}
}
}
// required uint64 id = 1;
public boolean hasId() {
return result.hasId();
}
public long getId() {
return result.getId();
}
public Builder setId(long value) {
result.hasId = true;
result.id_ = value;
return this;
}
public Builder clearId() {
result.hasId = false;
result.id_ = 0L;
return this;
}
// required string name = 2;
public boolean hasName() {
return result.hasName();
}
public java.lang.String getName() {
return result.getName();
}
public Builder setName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasName = true;
result.name_ = value;
return this;
}
public Builder clearName() {
result.hasName = false;
result.name_ = getDefaultInstance().getName();
return this;
}
// required bool status = 3;
public boolean hasStatus() {
return result.hasStatus();
}
public boolean getStatus() {
return result.getStatus();
}
public Builder setStatus(boolean value) {
result.hasStatus = true;
result.status_ = value;
return this;
}
public Builder clearStatus() {
result.hasStatus = false;
result.status_ = false;
return this;
}
}
static {
se.scalablesolutions.akka.actor.ProtobufProtocol.getDescriptor();
}
static {
se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit();
}
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\026ProtobufProtocol.proto\022\037se.scalablesol" +
"utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" +
"\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor,
new java.lang.String[] { "Id", "Name", "Status", },
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.class,
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
public static void internalForceInit() {}
}

View file

@@ -0,0 +1,72 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
import ProtobufProtocol.ProtobufPOJO
import Actor._
/* ---------------------------
Uses this Protobuf message:
message ProtobufPOJO {
  required uint64 id = 1;
  required string name = 2;
  required bool status = 3;
}
--------------------------- */
object ProtobufActorMessageSerializationSpec {
  val unit = TimeUnit.MILLISECONDS
  val HOSTNAME = "localhost"
  val PORT = 9990
  var server: RemoteServer = null

  class RemoteActorSpecActorBidirectional extends Actor {
    start
    def receive = {
      case pojo: ProtobufPOJO =>
        val id = pojo.getId
        reply(id + 1)
      case msg =>
        throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg)
    }
  }
}
class ProtobufActorMessageSerializationSpec extends JUnitSuite {
  import ProtobufActorMessageSerializationSpec._

  @Before
  def init() {
    server = new RemoteServer
    server.start(HOSTNAME, PORT)
    server.register("RemoteActorSpecActorBidirectional", newActor[RemoteActorSpecActorBidirectional])
    Thread.sleep(1000)
  }

  // make sure the servers shut down cleanly after the test has finished
  @After
  def finished() {
    server.shutdown
    RemoteClient.shutdownAll
    Thread.sleep(1000)
  }

  @Test
  def shouldSendReplyAsync = {
    val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT)
    val result = actor !! ProtobufPOJO.newBuilder
      .setId(11)
      .setStatus(true)
      .setName("Coltrane")
      .build
    assert(12L === result.get.asInstanceOf[Long])
    actor.stop
  }
}

View file

@@ -117,7 +117,14 @@ private[akka] object CassandraStorageBackend extends
    else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
  }

  /**
   * if <tt>start</tt> and <tt>finish</tt> both are defined, ignore <tt>count</tt> and
   * report the range [start, finish)
   * if <tt>start</tt> is not defined, assume <tt>start</tt> = 0
   * if <tt>start</tt> == 0 and <tt>finish</tt> == 0, return an empty collection
   */
  def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int):
    List[Array[Byte]] = {
    val startBytes = if (start.isDefined) intToBytes(start.get) else null
    val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
    val columns: List[ColumnOrSuperColumn] = sessions.withSession {

View file

@@ -226,18 +226,17 @@ private [akka] object RedisStorageBackend extends
}
}
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling {
/**
 * if <tt>start</tt> and <tt>finish</tt> both are defined, ignore <tt>count</tt> and
 * report the range [start, finish)
 * if <tt>start</tt> is not defined, assume <tt>start</tt> = 0
 * if <tt>start</tt> == 0 and <tt>finish</tt> == 0, return an empty collection
 */
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling {
  val s = if (start.isDefined) start.get else 0
  val cnt =
    if (finish.isDefined) {
      val f = finish.get
// if (f >= s) Math.min(count, (f - s)) else count
      if (f >= s) (f - s) else count
    }
    else count
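A small sketch of the documented range rules (effectiveCount is a hypothetical helper that mirrors the logic above):

def effectiveCount(start: Option[Int], finish: Option[Int], count: Int): Int = {
  val s = start.getOrElse(0)
  finish match {
    case Some(f) if f >= s => f - s  // [start, finish) takes precedence over count
    case _ => count
  }
}

effectiveCount(Some(2), Some(5), 100)  // 3 -> indices 2, 3, 4
effectiveCount(None, None, 10)         // 10, starting at index 0
effectiveCount(Some(0), Some(0), 10)   // 0 -> an empty collection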

View file

@@ -24,7 +24,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val distPath = info.projectPath / "dist"
override def compileOptions = super.compileOptions ++
  Seq("-deprecation", "-Xmigration", "-Xcheckinit",
      "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8")
    .map(x => CompileOption(x))
override def javaCompileOptions = JavaCompileOption("-Xlint:unchecked") :: super.javaCompileOptions.toList
@@ -110,14 +112,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
//override def defaultPublishRepository = Some(Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile))
val publishTo = Resolver.file("maven-local", Path.userHome / ".m2" / "repository" asFile)
val sourceArtifact = Artifact(artifactID, "src", "jar", Some("src"), Nil, None)
val sourceArtifact = Artifact(artifactID, "source", "jar", Some("source"), Nil, None)
val docsArtifact = Artifact(artifactID, "docs", "jar", Some("doc"), Nil, None)
val docsArtifact = Artifact(artifactID, "docs", "jar", Some("docs"), Nil, None)
// Credentials(Path.userHome / ".akka_publish_credentials", log)
//override def documentOptions = encodingUtf8.map(SimpleDocOption(_))
override def packageDocsJar = defaultJarPath("-doc.jar")
override def packageDocsJar = defaultJarPath("-docs.jar")
override def packageSrcJar = defaultJarPath("-src.jar")
override def packageSrcJar = defaultJarPath("-source.jar")
override def packageToPublishActions = super.packageToPublishActions ++ Seq(packageDocs, packageSrc)
override def pomExtra =
@@ -361,8 +363,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
def deployPath: Path
lazy val dist = distAction
def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn(
  `package`, packageDocs, packageSrc) describedAs("Deploying")
def deployTask(jar: Path, docs: Path, src: Path, toDir: Path,
               genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task {
  gen(jar, toDir, genJar, "Deploying bits") orElse
  gen(docs, toDir, genDocs, "Deploying docs") orElse
  gen(src, toDir, genSource, "Deploying sources")