diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 0f9cb05979..ca5b9e225a 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -471,7 +471,7 @@ trait Actor extends TransactionManagement { * actor.send(message) * */ - def !(message: AnyRef)(implicit sender: AnyRef) = { + def !(message: Any)(implicit sender: AnyRef) = { val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) else None if (_isRunning) postMessageToMailbox(message, from) @@ -482,7 +482,7 @@ trait Actor extends TransactionManagement { /** * Same as the '!' method but does not take an implicit sender as second parameter. */ - def send(message: AnyRef) = { + def send(message: Any) = { if (_isRunning) postMessageToMailbox(message, None) else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") @@ -500,7 +500,7 @@ trait Actor extends TransactionManagement { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: AnyRef, timeout: Long): Option[T] = if (_isRunning) { + def !![T](message: Any, timeout: Long): Option[T] = if (_isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) val isActiveObject = message.isInstanceOf[Invocation] if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None) @@ -527,19 +527,19 @@ trait Actor extends TransactionManagement { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: AnyRef): Option[T] = !![T](message, timeout) + def !![T](message: Any): Option[T] = !![T](message, timeout) /** * This method is evil and has been removed. Use '!!' with a timeout instead. */ - def !?[T](message: AnyRef): T = throw new UnsupportedOperationException( + def !?[T](message: Any): T = throw new UnsupportedOperationException( "'!?' is evil and has been removed. Use '!!' with a timeout instead") /** * Use reply(..) to reply with a message to the original sender of the message currently * being processed. */ - protected[this] def reply(message: AnyRef) = { + protected[this] def reply(message: Any) = { sender match { case Some(senderActor) => senderActor ! message @@ -736,7 +736,7 @@ trait Actor extends TransactionManagement { actor } - private def postMessageToMailbox(message: AnyRef, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime + private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -756,7 +756,7 @@ trait Actor extends TransactionManagement { } } - private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): + private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder diff --git a/akka-actors/src/main/scala/dispatch/Future.scala b/akka-actors/src/main/scala/dispatch/Future.scala index 1f9a1a4726..5b9e90c604 100644 --- a/akka-actors/src/main/scala/dispatch/Future.scala +++ b/akka-actors/src/main/scala/dispatch/Future.scala @@ -18,12 +18,12 @@ sealed trait FutureResult { def isCompleted: Boolean def isExpired: Boolean def timeoutInNanos: Long - def result: Option[AnyRef] + def result: Option[Any] def exception: Option[Tuple2[AnyRef, Throwable]] } trait CompletableFutureResult extends FutureResult { - def completeWithResult(result: AnyRef) + def completeWithResult(result: Any) def completeWithException(toBlame: AnyRef, exception: Throwable) } @@ -36,7 +36,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes private val _lock = new ReentrantLock private val _signal = _lock.newCondition private var _completed: Boolean = _ - private var _result: Option[AnyRef] = None + private var _result: Option[Any] = None private var _exception: Option[Tuple2[AnyRef, Throwable]] = None def await = try { @@ -79,7 +79,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes _lock.unlock } - def result: Option[AnyRef] = try { + def result: Option[Any] = try { _lock.lock _result } finally { @@ -93,7 +93,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes _lock.unlock } - def completeWithResult(result: AnyRef) = try { + def completeWithResult(result: Any) = try { _lock.lock if (!_completed) { _completed = true diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala index 36a1dfb989..c790928ae5 100644 --- a/akka-actors/src/main/scala/dispatch/Reactor.scala +++ b/akka-actors/src/main/scala/dispatch/Reactor.scala @@ -38,7 +38,7 @@ trait MessageDemultiplexer { } class MessageInvocation(val receiver: Actor, - val message: AnyRef, + val message: Any, val future: Option[CompletableFutureResult], val sender: Option[Actor], val tx: Option[Transaction]) { diff --git a/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala b/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala index 1c846a42cd..18e2e62d9b 100644 --- a/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala +++ b/akka-actors/src/main/scala/nio/RemoteProtocolBuilder.scala @@ -4,14 +4,14 @@ package se.scalablesolutions.akka.nio -import akka.serialization.Serializable.SBinary +import se.scalablesolutions.akka.serialization.Serializable.SBinary +import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol} +import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} + import com.google.protobuf.{Message, ByteString} -import serialization.{Serializer, Serializable, SerializationProtocol} -import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} - object RemoteProtocolBuilder { - def getMessage(request: RemoteRequest): AnyRef = { + def getMessage(request: RemoteRequest): Any = { request.getProtocol match { case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] @@ -26,13 +26,13 @@ object RemoteProtocolBuilder { val messageClass = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] Serializer.Protobuf.in(request.getMessage.toByteArray, Some(messageClass)) case SerializationProtocol.JAVA => - Serializer.Java.in(request.getMessage.toByteArray, None) + unbox(Serializer.Java.in(request.getMessage.toByteArray, None)) case SerializationProtocol.AVRO => throw new UnsupportedOperationException("Avro protocol is not yet supported") } } - def getMessage(reply: RemoteReply): AnyRef = { + def getMessage(reply: RemoteReply): Any = { reply.getProtocol match { case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] @@ -47,15 +47,15 @@ object RemoteProtocolBuilder { val messageClass = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] Serializer.Protobuf.in(reply.getMessage.toByteArray, Some(messageClass)) case SerializationProtocol.JAVA => - Serializer.Java.in(reply.getMessage.toByteArray, None) + unbox(Serializer.Java.in(reply.getMessage.toByteArray, None)) case SerializationProtocol.AVRO => throw new UnsupportedOperationException("Avro protocol is not yet supported") } } - def setMessage(message: AnyRef, builder: RemoteRequest.Builder) = { + def setMessage(message: Any, builder: RemoteRequest.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { - val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]] + val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) @@ -68,22 +68,22 @@ object RemoteProtocolBuilder { val serializable = message.asInstanceOf[Serializable.ScalaJSON] builder.setProtocol(SerializationProtocol.SCALA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else if (message.isInstanceOf[Serializable.JavaJSON]) { val serializable = message.asInstanceOf[Serializable.JavaJSON] builder.setProtocol(SerializationProtocol.JAVA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else { // default, e.g. if no protocol used explicitly then use Java serialization builder.setProtocol(SerializationProtocol.JAVA) - builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message))) + builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message)))) } } - def setMessage(message: AnyRef, builder: RemoteReply.Builder) = { + def setMessage(message: Any, builder: RemoteReply.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { - val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]] + val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) @@ -96,16 +96,41 @@ object RemoteProtocolBuilder { val serializable = message.asInstanceOf[Serializable.ScalaJSON] builder.setProtocol(SerializationProtocol.SCALA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else if (message.isInstanceOf[Serializable.JavaJSON]) { val serializable = message.asInstanceOf[Serializable.JavaJSON] builder.setProtocol(SerializationProtocol.JAVA_JSON) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes)) + builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) } else { // default, e.g. if no protocol used explicitly then use Java serialization builder.setProtocol(SerializationProtocol.JAVA) - builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message))) + builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message)))) } } + + private def box(value: Any): AnyRef = value match { + case value: Boolean => new java.lang.Boolean(value) + case value: Char => new java.lang.Character(value) + case value: Short => new java.lang.Short(value) + case value: Int => new java.lang.Integer(value) + case value: Long => new java.lang.Long(value) + case value: Float => new java.lang.Float(value) + case value: Double => new java.lang.Double(value) + case value: Byte => new java.lang.Byte(value) + case value => value.asInstanceOf[AnyRef] + } + + private def unbox(value: AnyRef): Any = value match { + case value: java.lang.Boolean => value.booleanValue + case value: java.lang.Character => value.charValue + case value: java.lang.Short => value.shortValue + case value: java.lang.Integer => value.intValue + case value: java.lang.Long => value.longValue + case value: java.lang.Float => value.floatValue + case value: java.lang.Double => value.doubleValue + case value: java.lang.Byte => value.byteValue + case value => value + } + } diff --git a/akka-actors/src/main/scala/serialization/Serializer.scala b/akka-actors/src/main/scala/serialization/Serializer.scala index 4e596a4bcb..643855a141 100644 --- a/akka-actors/src/main/scala/serialization/Serializer.scala +++ b/akka-actors/src/main/scala/serialization/Serializer.scala @@ -4,11 +4,12 @@ package se.scalablesolutions.akka.serialization -import com.google.protobuf.Message import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream} -import reflect.{BeanProperty, Manifest} -import sbinary.DefaultProtocol + +import com.google.protobuf.Message + import org.codehaus.jackson.map.ObjectMapper + import sjson.json.{Serializer =>SJSONSerializer} /** diff --git a/akka-util/src/main/scala/HashCode.scala b/akka-util/src/main/scala/HashCode.scala index 0ce21f6a92..fd9e46a682 100755 --- a/akka-util/src/main/scala/HashCode.scala +++ b/akka-util/src/main/scala/HashCode.scala @@ -27,19 +27,28 @@ import java.lang.{Float => JFloat, Double => JDouble} object HashCode { val SEED = 23 + def hash(seed: Int, any: Any): Int = any match { + case value: Boolean => hash(seed, value) + case value: Char => hash(seed, value) + case value: Short => hash(seed, value) + case value: Int => hash(seed, value) + case value: Long => hash(seed, value) + case value: Float => hash(seed, value) + case value: Double => hash(seed, value) + case value: Byte => hash(seed, value) + case value: AnyRef => + var result = seed + if (value == null) result = hash(result, 0) + else if (!isArray(value)) result = hash(result, value.hashCode()) + else for (id <- 0 until JArray.getLength(value)) result = hash(result, JArray.get(value, id)) // is an array + result + } def hash(seed: Int, value: Boolean): Int = firstTerm(seed) + (if (value) 1 else 0) def hash(seed: Int, value: Char): Int = firstTerm(seed) + value.asInstanceOf[Int] def hash(seed: Int, value: Int): Int = firstTerm(seed) + value def hash(seed: Int, value: Long): Int = firstTerm(seed) + (value ^ (value >>> 32) ).asInstanceOf[Int] def hash(seed: Int, value: Float): Int = hash(seed, JFloat.floatToIntBits(value)) def hash(seed: Int, value: Double): Int = hash(seed, JDouble.doubleToLongBits(value)) - def hash(seed: Int, anyRef: AnyRef): Int = { - var result = seed - if (anyRef == null) result = hash(result, 0) - else if (!isArray(anyRef)) result = hash(result, anyRef.hashCode()) - else for (id <- 0 until JArray.getLength(anyRef)) result = hash(result, JArray.get(anyRef, id)) // is an array - result - } private def firstTerm(seed: Int): Int = PRIME * seed private def isArray(anyRef: AnyRef): Boolean = anyRef.getClass.isArray