changed actor message type from AnyRef to Any
This commit is contained in:
parent
7a8b70cc0f
commit
7f4d19b900
6 changed files with 77 additions and 42 deletions
|
|
@ -471,7 +471,7 @@ trait Actor extends TransactionManagement {
|
|||
* actor.send(message)
|
||||
* </pre>
|
||||
*/
|
||||
def !(message: AnyRef)(implicit sender: AnyRef) = {
|
||||
def !(message: Any)(implicit sender: AnyRef) = {
|
||||
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
|
||||
else None
|
||||
if (_isRunning) postMessageToMailbox(message, from)
|
||||
|
|
@ -482,7 +482,7 @@ trait Actor extends TransactionManagement {
|
|||
/**
|
||||
* Same as the '!' method but does not take an implicit sender as second parameter.
|
||||
*/
|
||||
def send(message: AnyRef) = {
|
||||
def send(message: Any) = {
|
||||
if (_isRunning) postMessageToMailbox(message, None)
|
||||
else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
|
@ -500,7 +500,7 @@ trait Actor extends TransactionManagement {
|
|||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !: Option[T] = if (_isRunning) {
|
||||
def !: Option[T] = if (_isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
|
||||
val isActiveObject = message.isInstanceOf[Invocation]
|
||||
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
|
||||
|
|
@ -527,19 +527,19 @@ trait Actor extends TransactionManagement {
|
|||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !: Option[T] = !
|
||||
def !: Option[T] = !
|
||||
|
||||
/**
|
||||
* This method is evil and has been removed. Use '!!' with a timeout instead.
|
||||
*/
|
||||
def !?[T](message: AnyRef): T = throw new UnsupportedOperationException(
|
||||
def !?[T](message: Any): T = throw new UnsupportedOperationException(
|
||||
"'!?' is evil and has been removed. Use '!!' with a timeout instead")
|
||||
|
||||
/**
|
||||
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
*/
|
||||
protected[this] def reply(message: AnyRef) = {
|
||||
protected[this] def reply(message: Any) = {
|
||||
sender match {
|
||||
case Some(senderActor) =>
|
||||
senderActor ! message
|
||||
|
|
@ -736,7 +736,7 @@ trait Actor extends TransactionManagement {
|
|||
actor
|
||||
}
|
||||
|
||||
private def postMessageToMailbox(message: AnyRef, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
|
|
@ -756,7 +756,7 @@ trait Actor extends TransactionManagement {
|
|||
}
|
||||
}
|
||||
|
||||
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long):
|
||||
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long):
|
||||
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
|
|
|
|||
|
|
@ -18,12 +18,12 @@ sealed trait FutureResult {
|
|||
def isCompleted: Boolean
|
||||
def isExpired: Boolean
|
||||
def timeoutInNanos: Long
|
||||
def result: Option[AnyRef]
|
||||
def result: Option[Any]
|
||||
def exception: Option[Tuple2[AnyRef, Throwable]]
|
||||
}
|
||||
|
||||
trait CompletableFutureResult extends FutureResult {
|
||||
def completeWithResult(result: AnyRef)
|
||||
def completeWithResult(result: Any)
|
||||
def completeWithException(toBlame: AnyRef, exception: Throwable)
|
||||
}
|
||||
|
||||
|
|
@ -36,7 +36,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
|
|||
private val _lock = new ReentrantLock
|
||||
private val _signal = _lock.newCondition
|
||||
private var _completed: Boolean = _
|
||||
private var _result: Option[AnyRef] = None
|
||||
private var _result: Option[Any] = None
|
||||
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
|
||||
|
||||
def await = try {
|
||||
|
|
@ -79,7 +79,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
|
|||
_lock.unlock
|
||||
}
|
||||
|
||||
def result: Option[AnyRef] = try {
|
||||
def result: Option[Any] = try {
|
||||
_lock.lock
|
||||
_result
|
||||
} finally {
|
||||
|
|
@ -93,7 +93,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
|
|||
_lock.unlock
|
||||
}
|
||||
|
||||
def completeWithResult(result: AnyRef) = try {
|
||||
def completeWithResult(result: Any) = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ trait MessageDemultiplexer {
|
|||
}
|
||||
|
||||
class MessageInvocation(val receiver: Actor,
|
||||
val message: AnyRef,
|
||||
val message: Any,
|
||||
val future: Option[CompletableFutureResult],
|
||||
val sender: Option[Actor],
|
||||
val tx: Option[Transaction]) {
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@
|
|||
|
||||
package se.scalablesolutions.akka.nio
|
||||
|
||||
import akka.serialization.Serializable.SBinary
|
||||
import se.scalablesolutions.akka.serialization.Serializable.SBinary
|
||||
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
|
||||
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||
|
||||
import com.google.protobuf.{Message, ByteString}
|
||||
|
||||
import serialization.{Serializer, Serializable, SerializationProtocol}
|
||||
import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||
|
||||
object RemoteProtocolBuilder {
|
||||
def getMessage(request: RemoteRequest): AnyRef = {
|
||||
def getMessage(request: RemoteRequest): Any = {
|
||||
request.getProtocol match {
|
||||
case SerializationProtocol.SBINARY =>
|
||||
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
|
||||
|
|
@ -26,13 +26,13 @@ object RemoteProtocolBuilder {
|
|||
val messageClass = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
|
||||
Serializer.Protobuf.in(request.getMessage.toByteArray, Some(messageClass))
|
||||
case SerializationProtocol.JAVA =>
|
||||
Serializer.Java.in(request.getMessage.toByteArray, None)
|
||||
unbox(Serializer.Java.in(request.getMessage.toByteArray, None))
|
||||
case SerializationProtocol.AVRO =>
|
||||
throw new UnsupportedOperationException("Avro protocol is not yet supported")
|
||||
}
|
||||
}
|
||||
|
||||
def getMessage(reply: RemoteReply): AnyRef = {
|
||||
def getMessage(reply: RemoteReply): Any = {
|
||||
reply.getProtocol match {
|
||||
case SerializationProtocol.SBINARY =>
|
||||
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
|
||||
|
|
@ -47,15 +47,15 @@ object RemoteProtocolBuilder {
|
|||
val messageClass = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
|
||||
Serializer.Protobuf.in(reply.getMessage.toByteArray, Some(messageClass))
|
||||
case SerializationProtocol.JAVA =>
|
||||
Serializer.Java.in(reply.getMessage.toByteArray, None)
|
||||
unbox(Serializer.Java.in(reply.getMessage.toByteArray, None))
|
||||
case SerializationProtocol.AVRO =>
|
||||
throw new UnsupportedOperationException("Avro protocol is not yet supported")
|
||||
}
|
||||
}
|
||||
|
||||
def setMessage(message: AnyRef, builder: RemoteRequest.Builder) = {
|
||||
def setMessage(message: Any, builder: RemoteRequest.Builder) = {
|
||||
if (message.isInstanceOf[Serializable.SBinary[_]]) {
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
|
||||
builder.setProtocol(SerializationProtocol.SBINARY)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
|
|
@ -68,22 +68,22 @@ object RemoteProtocolBuilder {
|
|||
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
|
||||
builder.setProtocol(SerializationProtocol.SCALA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
|
||||
val serializable = message.asInstanceOf[Serializable.JavaJSON]
|
||||
builder.setProtocol(SerializationProtocol.JAVA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else {
|
||||
// default, e.g. if no protocol used explicitly then use Java serialization
|
||||
builder.setProtocol(SerializationProtocol.JAVA)
|
||||
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message)))
|
||||
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message))))
|
||||
}
|
||||
}
|
||||
|
||||
def setMessage(message: AnyRef, builder: RemoteReply.Builder) = {
|
||||
def setMessage(message: Any, builder: RemoteReply.Builder) = {
|
||||
if (message.isInstanceOf[Serializable.SBinary[_]]) {
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
|
||||
builder.setProtocol(SerializationProtocol.SBINARY)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
|
|
@ -96,16 +96,41 @@ object RemoteProtocolBuilder {
|
|||
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
|
||||
builder.setProtocol(SerializationProtocol.SCALA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
|
||||
val serializable = message.asInstanceOf[Serializable.JavaJSON]
|
||||
builder.setProtocol(SerializationProtocol.JAVA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else {
|
||||
// default, e.g. if no protocol used explicitly then use Java serialization
|
||||
builder.setProtocol(SerializationProtocol.JAVA)
|
||||
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message)))
|
||||
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message))))
|
||||
}
|
||||
}
|
||||
|
||||
private def box(value: Any): AnyRef = value match {
|
||||
case value: Boolean => new java.lang.Boolean(value)
|
||||
case value: Char => new java.lang.Character(value)
|
||||
case value: Short => new java.lang.Short(value)
|
||||
case value: Int => new java.lang.Integer(value)
|
||||
case value: Long => new java.lang.Long(value)
|
||||
case value: Float => new java.lang.Float(value)
|
||||
case value: Double => new java.lang.Double(value)
|
||||
case value: Byte => new java.lang.Byte(value)
|
||||
case value => value.asInstanceOf[AnyRef]
|
||||
}
|
||||
|
||||
private def unbox(value: AnyRef): Any = value match {
|
||||
case value: java.lang.Boolean => value.booleanValue
|
||||
case value: java.lang.Character => value.charValue
|
||||
case value: java.lang.Short => value.shortValue
|
||||
case value: java.lang.Integer => value.intValue
|
||||
case value: java.lang.Long => value.longValue
|
||||
case value: java.lang.Float => value.floatValue
|
||||
case value: java.lang.Double => value.doubleValue
|
||||
case value: java.lang.Byte => value.byteValue
|
||||
case value => value
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,11 +4,12 @@
|
|||
|
||||
package se.scalablesolutions.akka.serialization
|
||||
|
||||
import com.google.protobuf.Message
|
||||
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
|
||||
import reflect.{BeanProperty, Manifest}
|
||||
import sbinary.DefaultProtocol
|
||||
|
||||
import com.google.protobuf.Message
|
||||
|
||||
import org.codehaus.jackson.map.ObjectMapper
|
||||
|
||||
import sjson.json.{Serializer =>SJSONSerializer}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -27,19 +27,28 @@ import java.lang.{Float => JFloat, Double => JDouble}
|
|||
object HashCode {
|
||||
val SEED = 23
|
||||
|
||||
def hash(seed: Int, any: Any): Int = any match {
|
||||
case value: Boolean => hash(seed, value)
|
||||
case value: Char => hash(seed, value)
|
||||
case value: Short => hash(seed, value)
|
||||
case value: Int => hash(seed, value)
|
||||
case value: Long => hash(seed, value)
|
||||
case value: Float => hash(seed, value)
|
||||
case value: Double => hash(seed, value)
|
||||
case value: Byte => hash(seed, value)
|
||||
case value: AnyRef =>
|
||||
var result = seed
|
||||
if (value == null) result = hash(result, 0)
|
||||
else if (!isArray(value)) result = hash(result, value.hashCode())
|
||||
else for (id <- 0 until JArray.getLength(value)) result = hash(result, JArray.get(value, id)) // is an array
|
||||
result
|
||||
}
|
||||
def hash(seed: Int, value: Boolean): Int = firstTerm(seed) + (if (value) 1 else 0)
|
||||
def hash(seed: Int, value: Char): Int = firstTerm(seed) + value.asInstanceOf[Int]
|
||||
def hash(seed: Int, value: Int): Int = firstTerm(seed) + value
|
||||
def hash(seed: Int, value: Long): Int = firstTerm(seed) + (value ^ (value >>> 32) ).asInstanceOf[Int]
|
||||
def hash(seed: Int, value: Float): Int = hash(seed, JFloat.floatToIntBits(value))
|
||||
def hash(seed: Int, value: Double): Int = hash(seed, JDouble.doubleToLongBits(value))
|
||||
def hash(seed: Int, anyRef: AnyRef): Int = {
|
||||
var result = seed
|
||||
if (anyRef == null) result = hash(result, 0)
|
||||
else if (!isArray(anyRef)) result = hash(result, anyRef.hashCode())
|
||||
else for (id <- 0 until JArray.getLength(anyRef)) result = hash(result, JArray.get(anyRef, id)) // is an array
|
||||
result
|
||||
}
|
||||
|
||||
private def firstTerm(seed: Int): Int = PRIME * seed
|
||||
private def isArray(anyRef: AnyRef): Boolean = anyRef.getClass.isArray
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue