Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Eugene Vigdorchik 2011-12-31 11:34:31 +04:00
commit 5ec128f83a
25 changed files with 1280 additions and 640 deletions

View file

@ -1,75 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testing
import akka.serialization.Serializer
import com.google.protobuf.Message
import org.codehaus.jackson.map.ObjectMapper
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
import akka.util.ClassLoaderObjectInputStream
import sjson.json._
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def identifier = 2: Byte
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
obj.asInstanceOf[Message].toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Need a protobuf message class to be able to serialize bytes using protobuf")
clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
}
}
object ProtobufSerializer extends ProtobufSerializer
class JavaJSONSerializer extends Serializer {
private val mapper = new ObjectMapper
def identifier = 3: Byte
def toBinary(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
mapper.writeValue(out, obj)
out.close
bos.toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Can't deserialize JSON to instance if no class is provided")
val in =
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes))
else new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef]
in.close
obj
}
}
object JavaJSONSerializer extends JavaJSONSerializer
class SJSONSerializer extends Serializer {
def identifier = 4: Byte
def toBinary(obj: AnyRef): Array[Byte] =
sjson.json.Serializer.SJSON.out(obj)
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], cl: Option[ClassLoader] = None): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Can't deserialize JSON to instance if no class is provided")
import sjson.json.Serializer._
val sj = new SJSON with DefaultConstructor { val classLoader = cl }
sj.in(bytes, clazz.get.getName)
}
}
object SJSONSerializer extends SJSONSerializer

View file

@ -4,8 +4,6 @@
package akka.serialization
import akka.serialization.Serialization._
import scala.reflect._
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import akka.actor._
@ -13,6 +11,26 @@ import java.io._
import akka.dispatch.Await
import akka.util.Timeout
import akka.util.duration._
import scala.reflect.BeanInfo
import com.google.protobuf.Message
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def includeManifest: Boolean = true
def identifier = 2
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
obj.asInstanceOf[Message].toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Need a protobuf message class to be able to serialize bytes using protobuf")
clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
}
}
object SerializeSpec {
@ -21,14 +39,10 @@ object SerializeSpec {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.testing.ProtobufSerializer"
sjson = "akka.testing.SJSONSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
sjson = ["akka.serialization.SerializeSpec$Person"]
java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
}
}
@ -57,7 +71,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
"have correct bindings" in {
ser.bindings(addr.getClass.getName) must be("java")
ser.bindings(person.getClass.getName) must be("sjson")
ser.bindings("akka.actor.ProtobufProtocol$MyMessage") must be("proto")
}
"serialize Address" in {
@ -143,14 +157,12 @@ object VerifySerializabilitySpec {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.testing.ProtobufSerializer"
sjson = "akka.testing.SJSONSerializer"
proto = "akka.serialization.ProtobufSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
sjson = ["akka.serialization.SerializeSpec$Person"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
}
}

View file

@ -197,8 +197,7 @@ akka {
# class is not found, then the default serializer (Java serialization) is used.
serializers {
# java = "akka.serialization.JavaSerializer"
# proto = "akka.testing.ProtobufSerializer"
# sjson = "akka.testing.SJSONSerializer"
# proto = "akka.serialization.ProtobufSerializer"
default = "akka.serialization.JavaSerializer"
}
@ -208,7 +207,6 @@ akka {
# "akka.serialization.MyJavaSerializableActor",
# "akka.serialization.MyStatelessActorWithMessagesInMailbox",
# "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
# sjson = ["akka.serialization.SerializeSpec$Person"]
# proto = ["com.google.protobuf.Message",
# "akka.actor.ProtobufProtocol$MyMessage"]
# }

View file

@ -560,9 +560,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() runnable.run())
}
def run(timeout: org.jboss.netty.akka.util.Timeout) { dispatcher.execute(runnable) }
}
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
@ -575,7 +573,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createSingleTask(f: Unit): TimerTask =
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() f)
dispatcher.execute(new Runnable { def run = f })
}
}
@ -598,7 +596,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createContinuousTask(delay: Duration, f: Unit): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() f)
dispatcher.execute(new Runnable { def run = f })
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
@ -609,7 +607,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() runnable.run())
dispatcher.execute(runnable)
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}

View file

@ -184,7 +184,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
* @throws the underlying exception if there's an InvocationTargetException thrown on the invocation
*/
def apply(instance: AnyRef): AnyRef = try {
parameters match { //TODO: We do not yet obey Actor.SERIALIZE_MESSAGES
parameters match {
case null method.invoke(instance)
case args if args.length == 0 method.invoke(instance)
case args method.invoke(instance, args: _*)
@ -193,7 +193,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
private def writeReplace(): AnyRef = parameters match {
case null SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null)
case ps if ps.length == 0 SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]())
case ps if ps.length == 0 SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Int](), Array[Array[Byte]]())
case ps
val serializers: Array[Serializer] = ps map SerializationExtension(Serialization.currentSystem.value).findSerializerFor
val serializedParameters: Array[Array[Byte]] = Array.ofDim[Array[Byte]](serializers.length)
@ -207,7 +207,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializerIdentifiers: Array[Serializer.Identifier], serializedParameters: Array[Array[Byte]]) {
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializerIdentifiers: Array[Int], serializedParameters: Array[Array[Byte]]) {
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
@ -390,6 +390,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
private[akka] class TypedActorInvocationHandler(val extension: TypedActorExtension, val actorVar: AtomVar[ActorRef], val timeout: Timeout) extends InvocationHandler {
def actor = actorVar.get
@throws(classOf[Throwable])
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" actor.toString
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean

View file

@ -74,10 +74,10 @@ case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to sup
case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch
case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch
final case class TaskInvocation(eventStream: EventStream, function: () Unit, cleanup: () Unit) extends Runnable {
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
def run() {
try {
function()
runnable.run()
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
@ -87,15 +87,48 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit,
}
}
object ExecutionContext {
implicit def defaultExecutionContext(implicit system: ActorSystem): ExecutionContext = system.dispatcher
/**
* Creates an ExecutionContext from the given ExecutorService
*/
def fromExecutorService(e: ExecutorService): ExecutionContext = new WrappedExecutorService(e)
/**
* Creates an ExecutionContext from the given Executor
*/
def fromExecutor(e: Executor): ExecutionContext = new WrappedExecutor(e)
private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext
private class WrappedExecutor(val executor: Executor) extends Executor with ExecutionContext {
override final def execute(runnable: Runnable): Unit = executor.execute(runnable)
}
}
/**
* An ExecutionContext is essentially the same thing as a java.util.concurrent.Executor
* This interface/trait exists to decouple the concept of execution from Actors & MessageDispatchers
* It is also needed to provide a fallback implicit default instance (in the companion object).
*/
trait ExecutionContext {
/**
* Submits the runnable for execution
*/
def execute(runnable: Runnable): Unit
}
object MessageDispatcher {
val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
val SCHEDULED = 1
val RESCHEDULED = 2
implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher
implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
}
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable {
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable with Executor with ExecutionContext {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater }
@ -131,8 +164,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
ifSensibleToDoSoThenScheduleShutdown()
}
protected[akka] final def dispatchTask(block: () Unit) {
val invocation = TaskInvocation(eventStream, block, taskCleanup)
final def execute(runnable: Runnable) {
val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
inhabitantsUpdater.incrementAndGet(this)
try {
executeTask(invocation)

View file

@ -21,8 +21,9 @@ import scala.annotation.tailrec
import scala.collection.mutable.Stack
import akka.util.{ Switch, Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable }
import akka.dispatch.Await.CanAwait
import java.util.concurrent._
import akka.actor.ActorSystem
object Await {
sealed trait CanAwait
@ -55,37 +56,37 @@ object Futures {
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher)
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)
/**
* Java API, equivalent to Promise.apply
*/
def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher)
def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()(executor)
/**
* Java API, creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable, dispatcher: MessageDispatcher): Promise[T] = Promise.failed(exception)(dispatcher)
def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)(executor)
/**
* Java API, Creates an already completed Promise with the specified result
*/
def successful[T](result: T, dispatcher: MessageDispatcher): Promise[T] = Promise.successful(result)(dispatcher)
def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)(executor)
/**
* Java API.
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = {
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_))
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = {
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(executor).map(JOption.fromScalaOption(_))
}
/**
* Java API.
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(dispatcher)
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(executor)
/**
* Java API
@ -94,23 +95,23 @@ object Futures {
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(executor)
/**
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher)
def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], executor: ExecutionContext): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(executor)
/**
* Java API.
* Simple version of Future.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = {
implicit val d = dispatcher
def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = {
implicit val d = executor
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
@ -124,8 +125,8 @@ object Futures {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = {
implicit val d = dispatcher
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], executor: ExecutionContext): Future[JIterable[B]] = {
implicit val d = executor
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a)
val fb = fn(a)
for (r fr; b fb) yield { r add b; r }
@ -139,18 +140,19 @@ object Future {
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T)(implicit dispatcher: MessageDispatcher): Future[T] = {
def apply[T](body: T)(implicit executor: ExecutionContext): Future[T] = {
val promise = Promise[T]()
dispatcher dispatchTask { ()
promise complete {
try {
Right(body)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e Left(e)
executor.execute(new Runnable {
def run =
promise complete {
try {
Right(body)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e Left(e)
}
}
}
}
})
promise
}
@ -161,13 +163,13 @@ object Future {
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] =
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = {
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val futureResult = Promise[T]()
val completeFirst: Either[Throwable, T] Unit = futureResult complete _
@ -179,7 +181,7 @@ object Future {
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](futures: Traversable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
def find[T](futures: Traversable[Future[T]])(predicate: T Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
if (futures.isEmpty) Promise.successful[Option[T]](None)
else {
val result = Promise[Option[T]]()
@ -210,7 +212,7 @@ object Future {
* val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
* </pre>
*/
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise.successful(zero)
else sequence(futures).map(_.foldLeft(zero)(foldFun))
}
@ -222,7 +224,7 @@ object Future {
* val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
* </pre>
*/
def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] = {
def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) T)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).map(_ reduce op)
}
@ -234,7 +236,7 @@ object Future {
* val myFutureList = Future.traverse(myList)(x Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
@ -256,7 +258,7 @@ object Future {
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = {
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val future = Promise[A]
dispatchTask({ ()
(reify(body) foreachFull (future success, future failure): Future[Any]) onFailure {
@ -290,7 +292,7 @@ object Future {
* }
* </pre>
*/
def blocking(implicit dispatcher: MessageDispatcher): Unit =
def blocking(implicit executor: ExecutionContext): Unit =
_taskStack.get match {
case Some(taskStack) if taskStack.nonEmpty
val tasks = taskStack.elems
@ -308,32 +310,38 @@ object Future {
/**
* Internal API, do not call
*/
private[akka] def dispatchTask(task: () Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit =
private[akka] def dispatchTask(task: () Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit =
_taskStack.get match {
case Some(taskStack) if !force taskStack push task
case _
dispatcher dispatchTask { ()
try {
val taskStack = Stack[() Unit](task)
_taskStack set Some(taskStack)
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case e
// FIXME catching all and continue isn't good for OOME, ticket #1418
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage))
case _ executor.execute(
new Runnable {
def run =
try {
val taskStack = Stack[() Unit](task)
_taskStack set Some(taskStack)
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case e
// FIXME catching all and continue isn't good for OOME, ticket #1418
executor match {
case m: MessageDispatcher
m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage))
case other
e.printStackTrace()
}
}
}
}
} finally { _taskStack set None }
}
} finally { _taskStack set None }
})
}
}
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
implicit def dispatcher: MessageDispatcher
implicit def executor: ExecutionContext
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X])
@ -462,8 +470,8 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
future complete (try {
Right(f(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
case e
logError("Future.map", e)
Left(e)
})
}
@ -511,19 +519,25 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
try {
p completeWith f(r)
} catch {
case e: Exception
case e
p complete Left(e)
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
logError("Future.flatMap", e)
}
}
p
}
/**
* Same as onSuccess { case r => f(r) } but is also used in for-comprehensions
*/
final def foreach(f: T Unit): Unit = onComplete {
case Right(r) f(r)
case _
}
/**
* Used by for-comprehensions
*/
final def withFilter(p: T Boolean) = new FutureWithFilter[T](this, p)
final class FutureWithFilter[+A](self: Future[A], p: A Boolean) {
@ -533,6 +547,11 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
}
/**
* Returns a new Future that will hold the successful result of this Future if it matches
* the given predicate, if it doesn't match, the resulting Future will be a failed Future
* with a MatchError, of if this Future fails, that failure will be propagated to the returned Future
*/
final def filter(pred: T Boolean): Future[T] = {
val p = Promise[T]()
onComplete {
@ -540,13 +559,20 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case r @ Right(res) p complete (try {
if (pred(res)) r else Left(new MatchError(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
case e
logError("Future.filter", e)
Left(e)
})
}
p
}
protected def logError(msg: String, problem: Throwable): Unit = {
executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage))
case other problem.printStackTrace()
}
}
}
object Promise {
@ -555,17 +581,17 @@ object Promise {
*
* Scala API
*/
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]()
def apply[A]()(implicit executor: ExecutionContext): Promise[A] = new DefaultPromise[A]()
/**
* Creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception))
def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Left(exception))
/**
* Creates an already completed Promise with the specified result
*/
def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result))
def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result))
}
/**
@ -621,8 +647,8 @@ trait Promise[T] extends Future[T] {
try {
fr completeWith cont(thisPromise)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
case e
logError("Promise.completeWith", e)
fr failure e
}
}
@ -636,8 +662,8 @@ trait Promise[T] extends Future[T] {
try {
fr completeWith cont(f)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
case e
logError("Promise.completeWith", e)
fr failure e
}
}
@ -669,7 +695,7 @@ private[dispatch] object DefaultPromise {
/**
* The default concrete Future implementation.
*/
class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
self
import DefaultPromise.{ FState, Success, Failure, Pending }
@ -759,7 +785,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
}
private final def notifyCompleted(func: Either[Throwable, T] Unit, result: Either[Throwable, T]) {
try { func(result) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
try { func(result) } catch { case e logError("Future onComplete-callback raised an exception", e) }
}
}
@ -767,7 +793,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
*/
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
val value = Some(resolve(suppliedValue))
def tryComplete(value: Either[Throwable, T]): Boolean = true

View file

@ -14,18 +14,26 @@ import akka.actor.{ Extension, ActorSystem, ActorSystemImpl }
case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization {
// TODO ensure that these are always set (i.e. withValue()) when doing deserialization
val currentSystem = new DynamicVariable[ActorSystemImpl](null)
/**
* This holds a reference to the current ActorSystem (the surrounding context)
* during serialization and deserialization.
*
* If you are using Serializers yourself, outside of SerializationExtension,
* you'll need to surround the serialization/deserialization with:
*
* currentSystem.withValue(system) {
* ...code...
* }
*/
val currentSystem = new DynamicVariable[ActorSystem](null)
class Settings(val config: Config) {
import scala.collection.JavaConverters._
import config._
val Serializers: Map[String, String] = {
toStringMap(getConfig("akka.actor.serializers"))
}
val Serializers: Map[String, String] =
getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
val SerializationBindings: Map[String, Seq[String]] = {
val configPath = "akka.actor.serialization-bindings"
@ -40,9 +48,6 @@ object Serialization {
}
}
private def toStringMap(mapConfig: Config): Map[String, String] =
mapConfig.root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
}
}
@ -55,27 +60,53 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
val settings = new Settings(system.settings.config)
//TODO document me
/**
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
* to either an Array of Bytes or an Exception if one was thrown.
*/
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception Left(e) }
//TODO document me
/**
* Deserializes the given array of bytes using the specified serializer id,
* using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(bytes: Array[Byte],
serializerId: Int,
clazz: Option[Class[_]],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) {
Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, classLoader))
}
} catch { case e: Exception Left(e) }
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) {
Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader))
}
currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) }
} catch { case e: Exception Left(e) }
/**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null,
* falls back to the Serializer named "default"
*/
def findSerializerFor(o: AnyRef): Serializer = o match {
case null NullSerializer
case other serializerFor(other.getClass)
}
//TODO document me
/**
* Returns the configured Serializer for the given Class, falls back to the Serializer named "default"
*/
def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups
serializerMap.get(clazz.getName).getOrElse(serializers("default"))
@ -85,6 +116,9 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs)
/**
* FIXME implement support for this
*/
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
if (bindings.isEmpty)
Left(NoSerializerFoundException("No mapping serializer found for " + cl))
@ -116,14 +150,11 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
*/
lazy val bindings: Map[String, String] = {
val configBindings = settings.SerializationBindings
configBindings.foldLeft(Map[String, String]()) {
case (result, (k: String, vs: Seq[_]))
//All keys which are lists, take the Strings from them and Map them
result ++ (vs collect { case v: String (v, k) })
case (result, x)
//For any other values, just skip them
result
settings.SerializationBindings.foldLeft(Map[String, String]()) {
//All keys which are lists, take the Strings from them and Map them
case (result, (k: String, vs: Seq[_])) result ++ (vs collect { case v: String (v, k) })
//For any other values, just skip them
case (result, _) result
}
}
@ -133,9 +164,9 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
lazy val serializerMap: Map[String, Serializer] = bindings mapValues serializers
/**
* Maps from a Serializer.Identifier (Byte) to a Serializer instance (optimization)
* Maps from a Serializer Identity (Int) to a Serializer instance (optimization)
*/
lazy val serializerByIdentity: Map[Serializer.Identifier, Serializer] =
lazy val serializerByIdentity: Map[Int, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
}

View file

@ -5,6 +5,10 @@ package akka.serialization
import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ActorSystemImpl }
/**
* SerializationExtension is an Akka Extension to interact with the Serialization
* that is built into Akka
*/
object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider {
override def get(system: ActorSystem): Serialization = super.get(system)
override def lookup = SerializationExtension

View file

@ -7,38 +7,67 @@ package akka.serialization
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
import akka.util.ClassLoaderObjectInputStream
object Serializer {
val defaultSerializerName = classOf[JavaSerializer].getName
type Identifier = Byte
}
/**
* A Serializer represents a bimap between an object and an array of bytes representing that object
*/
trait Serializer extends scala.Serializable {
/**
* Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic
* Completely unique value to identify this implementation of Serializer, used to optimize network traffic
* Values from 0 to 16 is reserved for Akka internal usage
*/
def identifier: Serializer.Identifier
def identifier: Int
/**
* Serializes the given object into an Array of Byte
*/
def toBinary(o: AnyRef): Array[Byte]
/**
* Returns whether this serializer needs a manifest in the fromBinary method
*/
def includeManifest: Boolean
/**
* Deserializes the given Array of Bytes into an AnyRef
*/
def fromBinary(bytes: Array[Byte]): AnyRef = fromBinary(bytes, None, None)
/**
* Deserializes the given Array of Bytes into an AnyRef with an optional type hint
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = fromBinary(bytes, manifest, None)
/**
* Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]], classLoader: Option[ClassLoader]): AnyRef
}
/**
* Java API for creating a Serializer
*/
abstract class JSerializer extends Serializer {
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef =
fromBinary(bytes, manifest.orNull, classLoader.orNull)
/**
* This method should be overridden,
* manifest and classLoader may be null.
*/
def fromBinary(bytes: Array[Byte], manifest: Class[_], classLoader: ClassLoader): AnyRef
}
object JavaSerializer extends JavaSerializer
object NullSerializer extends NullSerializer
/**
* This Serializer uses standard Java Serialization
*/
class JavaSerializer extends Serializer {
def identifier = 1: Byte
def includeManifest: Boolean = false
def identifier = 1
def toBinary(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
@ -59,11 +88,13 @@ class JavaSerializer extends Serializer {
}
}
/**
* This is a special Serializer that Serializes and deserializes nulls only
*/
class NullSerializer extends Serializer {
val nullAsBytes = Array[Byte]()
def identifier = 0: Byte
def includeManifest: Boolean = false
def identifier = 0
def toBinary(o: AnyRef) = nullAsBytes
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null
}

View file

@ -175,7 +175,7 @@ Looking up Actors by Concrete Path
In addition, actor references may be looked up using the
:meth:`ActorSystem.actorFor` method, which returns an (unverified) local,
remote or clustered actor reference. Sending messages to such a reference or
attempting to observe its livelyhood will traverse the actor hierarchy of the
attempting to observe its liveness will traverse the actor hierarchy of the
actor system from top to bottom by passing messages from parent to child until
either the target is reached or failure is certain, i.e. a name in the path
does not exist (in practice this process will be optimized using caches, but it

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.serialization
import org.scalatest.junit.JUnitSuite
class SerializationDocTest extends SerializationDocTestBase with JUnitSuite

View file

@ -0,0 +1,136 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.serialization;
import akka.serialization.JSerializer;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.serialization.Serializer;
import org.junit.Test;
import static org.junit.Assert.*;
//#imports
import akka.serialization.*;
import akka.actor.ActorSystem;
import com.typesafe.config.*;
//#imports
public class SerializationDocTestBase {
//#my-own-serializer
public static class MyOwnSerializer extends JSerializer {
// This is whether "fromBinary" requires a "clazz" or not
@Override public boolean includeManifest() {
return false;
}
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 16 is reserved by Akka itself
@Override public int identifier() {
return 1234567;
}
// "toBinary" serializes the given object to an Array of Bytes
@Override public byte[] toBinary(Object obj) {
// Put the code that serializes the object here
//#...
return new byte[0];
//#...
}
// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
// into the optionally provided classLoader.
@Override public Object fromBinary(byte[] bytes,
Class clazz,
ClassLoader classLoader) {
// Put your code that deserializes here
//#...
return null;
//#...
}
}
//#my-own-serializer
@Test public void haveExamples() {
/*
//#serialize-messages-config
akka {
actor {
serialize-messages = on
}
}
//#serialize-messages-config
//#serialize-creators-config
akka {
actor {
serialize-creators = on
}
}
//#serialize-creators-config
//#serialize-serializers-config
akka {
actor {
serializers {
default = "akka.serialization.JavaSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
}
}
//#serialize-serializers-config
//#serialization-bindings-config
akka {
actor {
serializers {
default = "akka.serialization.JavaSerializer"
java = "akka.serialization.JavaSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
serialization-bindings {
java = ["java.lang.String",
"app.my.Customer"]
myown = ["my.own.BusinessObject",
"something.equally.Awesome",
"java.lang.Boolean"]
}
}
}
//#serialization-bindings-config
*/
}
@Test public void demonstrateTheProgrammaticAPI() {
//#programmatic
ActorSystem system = ActorSystem.create("example");
// Get the Serialization Extension
Serialization serialization = SerializationExtension.get(system);
// Have something to serialize
String original = "woohoo";
// Find the Serializer for it
Serializer serializer = serialization.findSerializerFor(original);
// Turn it into bytes
byte[] bytes = serializer.toBinary(original);
// Turn it back into an object,
// the nulls are for the class manifest and for the classloader
String back = (String)serializer.fromBinary(bytes);
// Voilá!
assertEquals(original, back);
//#programmatic
system.shutdown();
}
}

View file

@ -5,8 +5,90 @@
Serialization (Java)
#####################
Serialization will soon be documented.
.. sidebar:: Contents
Until then we refer to the following section in the configuration file:
.. contents:: :local:
* `Serializers <https://github.com/jboner/akka/blob/master/akka-actor/src/main/resources/reference.conf#L180>`_
Akka has a built-in Extension for serialization,
and it is both possible to use the built-in serializers and to write your own.
The serialization mechanism is both used by Akka internally to serialize messages,
and available for ad-hoc serialization of whatever you might need it for.
Usage
=====
Configuration
-------------
For Akka to know which ``Serializer`` to use for what, you need edit your Akka Configuration,
in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer``
you wish to use, like this:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-serializers-config
.. note::
The name ``default`` is special in the sense that the ``Serializer``
mapped to it will be used as default.
After you've bound names to different implementations of ``Serializer`` you need to wire which classes
should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialization-bindings-config
.. note::
Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``,
this means that you'll need to list the specific classes.
Verification
------------
If you want to verify that your messages are serializable you can enable the following config option:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-messages-config
.. warning::
We only recommend using the config option turned on when you're running tests.
It is completely pointless to have it turned on in other scenarios.
If you want to verify that your ``Props`` are serializable you can enable the following config option:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-creators-config
.. warning::
We only recommend using the config option turned on when you're running tests.
It is completely pointless to have it turned on in other scenarios.
Programmatic
------------
If you want to programmatically serialize/deserialize using Akka Serialization,
here's some examples:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java
:include: imports,programmatic
For more information, have a look at the ``ScalaDoc`` for ``akka.serialization._``
Customization
=============
So, lets say that you want to create your own ``Serializer``,
you saw the ``akka.docs.serialization.MyOwnSerializer`` in the config example above?
Creating new Serializers
------------------------
First you need to create a class definition of your ``Serializer``,
which is done by extending ``akka.serialization.JSerializer``, like this:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java
:include: imports,my-own-serializer
:exclude: ...
Then you only need to fill in the blanks, bind it to a name in your Akka Configuration and then
list which classes that should be serialized using it.

View file

@ -70,8 +70,6 @@ class FutureDocSpec extends AkkaSpec {
import akka.dispatch.Future
import akka.util.duration._
implicit def dispatcher = system.dispatcher
val future = Future {
"Hello" + "World"
}

View file

@ -0,0 +1,157 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.serialization
import org.scalatest.matchers.MustMatchers
import akka.testkit._
//#imports
import akka.actor.ActorSystem
import akka.serialization._
import com.typesafe.config.ConfigFactory
//#imports
//#my-own-serializer
class MyOwnSerializer extends Serializer {
// This is whether "fromBinary" requires a "clazz" or not
def includeManifest: Boolean = false
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 16 is reserved by Akka itself
def identifier = 1234567
// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the code that serializes the object here
//#...
Array[Byte]()
//#...
}
// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
// into the optionally provided classLoader.
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]],
classLoader: Option[ClassLoader] = None): AnyRef = {
// Put your code that deserializes here
//#...
null
//#...
}
}
//#my-own-serializer
class SerializationDocSpec extends AkkaSpec {
"demonstrate configuration of serialize messages" in {
//#serialize-messages-config
val config = ConfigFactory.parseString("""
akka {
actor {
serialize-messages = on
}
}
""")
//#serialize-messages-config
val a = ActorSystem("system", config)
a.settings.SerializeAllMessages must be(true)
a.shutdown()
}
"demonstrate configuration of serialize creators" in {
//#serialize-creators-config
val config = ConfigFactory.parseString("""
akka {
actor {
serialize-creators = on
}
}
""")
//#serialize-creators-config
val a = ActorSystem("system", config)
a.settings.SerializeAllCreators must be(true)
a.shutdown()
}
"demonstrate configuration of serializers" in {
//#serialize-serializers-config
val config = ConfigFactory.parseString("""
akka {
actor {
serializers {
default = "akka.serialization.JavaSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
}
}
""")
//#serialize-serializers-config
val a = ActorSystem("system", config)
SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer")
SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer")
a.shutdown()
}
"demonstrate configuration of serialization-bindings" in {
//#serialization-bindings-config
val config = ConfigFactory.parseString("""
akka {
actor {
serializers {
default = "akka.serialization.JavaSerializer"
java = "akka.serialization.JavaSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
serialization-bindings {
java = ["java.lang.String",
"app.my.Customer"]
myown = ["my.own.BusinessObject",
"something.equally.Awesome",
"java.lang.Boolean"]
}
}
}
""")
//#serialization-bindings-config
val a = ActorSystem("system", config)
SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer")
SerializationExtension(a).serializers("java").getClass.getName must equal("akka.serialization.JavaSerializer")
SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer")
SerializationExtension(a).serializerFor(classOf[String]).getClass.getName must equal("akka.serialization.JavaSerializer")
SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass.getName must equal("akka.docs.serialization.MyOwnSerializer")
a.shutdown()
}
"demonstrate the programmatic API" in {
//#programmatic
val system = ActorSystem("example")
// Get the Serialization Extension
val serialization = SerializationExtension(system)
// Have something to serialize
val original = "woohoo"
// Find the Serializer for it
val serializer = serialization.findSerializerFor(original)
// Turn it into bytes
val bytes = serializer.toBinary(original)
// Turn it back into an object
val back = serializer.fromBinary(bytes,
manifest = None,
classLoader = None)
// Voilá!
back must equal(original)
//#programmatic
system.shutdown()
}
}

View file

@ -5,8 +5,89 @@
Serialization (Scala)
######################
Serialization will soon be documented.
.. sidebar:: Contents
Until then we refer to the following section in the configuration file:
.. contents:: :local:
* `Serializers <https://github.com/jboner/akka/blob/master/akka-actor/src/main/resources/reference.conf#L180>`_
Akka has a built-in Extension for serialization,
and it is both possible to use the built-in serializers and to write your own.
The serialization mechanism is both used by Akka internally to serialize messages,
and available for ad-hoc serialization of whatever you might need it for.
Usage
=====
Configuration
-------------
For Akka to know which ``Serializer`` to use for what, you need edit your Akka Configuration,
in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer``
you wish to use, like this:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config
.. note::
The name ``default`` is special in the sense that the ``Serializer``
mapped to it will be used as default.
After you've bound names to different implementations of ``Serializer`` you need to wire which classes
should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config
.. note::
Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``,
this means that you'll need to list the specific classes.
Verification
------------
If you want to verify that your messages are serializable you can enable the following config option:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-messages-config
.. warning::
We only recommend using the config option turned on when you're running tests.
It is completely pointless to have it turned on in other scenarios.
If you want to verify that your ``Props`` are serializable you can enable the following config option:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-creators-config
.. warning::
We only recommend using the config option turned on when you're running tests.
It is completely pointless to have it turned on in other scenarios.
Programmatic
------------
If you want to programmatically serialize/deserialize using Akka Serialization,
here's some examples:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala
:include: imports,programmatic
For more information, have a look at the ``ScalaDoc`` for ``akka.serialization._``
Customization
=============
So, lets say that you want to create your own ``Serializer``,
you saw the ``akka.docs.serialization.MyOwnSerializer`` in the config example above?
Creating new Serializers
------------------------
First you need to create a class definition of your ``Serializer`` like so:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala
:include: imports,my-own-serializer
:exclude: ...
Then you only need to fill in the blanks, bind it to a name in your Akka Configuration and then
list which classes that should be serialized using it.

File diff suppressed because it is too large Load diff

View file

@ -73,7 +73,8 @@ message ActorRefProtocol {
*/
message MessageProtocol {
required bytes message = 1;
optional bytes messageManifest = 2;
required int32 serializerId = 2;
optional bytes messageManifest = 3;
}
/**

View file

@ -37,8 +37,16 @@ akka {
}
remote {
# Which implementation of akka.remote.RemoteSupport to use
# default is a TCP-based remote transport based on Netty
transport = "akka.remote.netty.NettyRemoteSupport"
# In case of increased latency / overflow how long
# should we wait (blocking the sender) until we deem the send to be cancelled?
# 0 means "never backoff", any positive number will indicate time to block at most.
backoff-timeout = 0ms
use-compression = off
# Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'

View file

@ -5,29 +5,40 @@
package akka.remote
import akka.remote.RemoteProtocol._
import akka.serialization.Serialization
import com.google.protobuf.ByteString
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import akka.util.ReflectiveAccess
object MessageSerializer {
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
val clazz = loadManifest(classLoader, messageProtocol)
SerializationExtension(system).deserialize(messageProtocol.getMessage.toByteArray,
clazz, classLoader).fold(x throw x, identity)
val clazz = if (messageProtocol.hasMessageManifest) {
Option(ReflectiveAccess.getClassFor[AnyRef](
messageProtocol.getMessageManifest.toStringUtf8,
classLoader.getOrElse(ReflectiveAccess.loader)) match {
case Left(e) throw e
case Right(r) r
})
} else None
SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId,
clazz,
classLoader) match {
case Left(e) throw e
case Right(r) r
}
}
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
val s = SerializationExtension(system)
val serializer = s.findSerializerFor(message)
val builder = MessageProtocol.newBuilder
val bytes = SerializationExtension(system).serialize(message).fold(x throw x, identity)
builder.setMessage(ByteString.copyFrom(bytes))
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier)
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.build
}
private def loadManifest(classLoader: Option[ClassLoader], messageProtocol: MessageProtocol): Class[_] = {
val manifest = messageProtocol.getMessageManifest.toStringUtf8
classLoader map (_.loadClass(manifest)) getOrElse (Class.forName(manifest))
}
}

View file

@ -23,6 +23,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")

View file

@ -18,14 +18,14 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import scala.collection.mutable.HashMap
import java.net.InetSocketAddress
import java.util.concurrent._
import java.util.concurrent.atomic._
import akka.AkkaException
import akka.event.Logging
import locks.ReentrantReadWriteLock
import org.jboss.netty.channel._
import akka.actor.ActorSystemImpl
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
import java.util.concurrent._
import locks.ReentrantReadWriteLock
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
@ -73,16 +73,21 @@ abstract class RemoteClient private[akka] (
*/
private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = {
try {
currentChannel.write(request).addListener(
val channel = currentChannel
val f = channel.write(request)
f.addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
if (future.isCancelled || !future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(request, future.getCause, remoteSupport, remoteAddress))
}
}
})
// Check if we should back off
if (!channel.isWritable) {
val backoff = remoteSupport.remote.remoteSettings.BackoffTimeout
if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off
}
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import com.google.protobuf.Message
/**
* This Serializer serializes `com.google.protobuf.Message`s
*/
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def includeManifest: Boolean = true
def identifier = 2
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: Message m.toByteArray
case _ throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef =
clazz match {
case None throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
case Some(c) c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
}
}

View file

@ -15,6 +15,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport")
getString("akka.remote.secure-cookie") must equal("")
getBoolean("akka.remote.use-passive-connections") must equal(true)
getMilliseconds("akka.remote.backoff-timeout") must equal(0)
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
//akka.remote.server