Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2010-11-04 12:51:27 +01:00
commit b15fe0ff5d
12 changed files with 963 additions and 1588 deletions

View file

@ -19,6 +19,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.api.exceptions.DeadTransactionException
import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap }
@ -26,12 +27,12 @@ import java.lang.reflect.Field
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import annotation.tailrec
import scala.annotation.tailrec
private[akka] object ActorRefInternals extends Logging {
/** LifeCycles for ActorRefs
/**
* LifeCycles for ActorRefs.
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType

View file

@ -5,11 +5,12 @@
package akka.actor
import akka.stm.Ref
import akka.AkkaException
import akka.config.RemoteAddress
import akka.japi.{Function => JFunc, Procedure => JProc}
import akka.AkkaException
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
import akka.config.RemoteAddress
class AgentException private[akka](message: String) extends AkkaException(message)
@ -105,6 +106,7 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =
import Agent._
import Actor._
val dispatcher = remote match {
case Some(address) =>
val d = actorOf(new AgentDispatcher[T]())
@ -244,7 +246,8 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =
*/
object Agent {
import Actor._
/*
/**
* The internal messages for passing around requests.
*/
private[akka] case class Value[T](value: T)
@ -291,13 +294,11 @@ final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor {
* Periodically handles incoming messages.
*/
def receive = {
case Value(v: T) =>
swap(v)
case Value(v: T) => swap(v)
case Read => self.reply_?(value.get())
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
proc(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))
case Function(fun: (T => T)) => swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) => proc(value.getOrElse(
throw new AgentException("Could not read Agent's value; value is null")))
}
/**

View file

@ -7,11 +7,11 @@ package akka.actor
import akka.dispatch._
import akka.stm.global._
import akka.config.Supervision._
import akka.japi.Procedure
import java.net.InetSocketAddress
import scala.reflect.BeanProperty
import akka.japi.Procedure
/**
* Subclass this abstract class to create a MDB-style untyped actor.
@ -62,6 +62,7 @@ import akka.japi.Procedure
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class UntypedActor extends Actor {
def getContext(): ActorRef = self
final protected def receive = {
@ -123,6 +124,7 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object UntypedActor {
/**
* Creates an ActorRef out of the Actor type represented by the class provided.
* Example in Java:

View file

@ -6,9 +6,10 @@ package akka.dispatch
import akka.AkkaException
import akka.actor.Actor.spawn
import akka.routing.Dispatcher
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import akka.routing.Dispatcher
class FutureTimeoutException(message: String) extends AkkaException(message)
@ -26,12 +27,10 @@ object Futures {
dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
(body: => T): Future[T] = {
val f = new DefaultCompletableFuture[T](timeout)
spawn({
try { f completeWithResult body }
catch { case e => f completeWithException e}
})(dispatcher)
f
}
@ -45,8 +44,7 @@ object Futures {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
if (sleepMs > 0 && future.isEmpty)
Thread.sleep(sleepMs)
if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs)
} while (future.isEmpty)
future.get
}
@ -89,12 +87,19 @@ object Futures {
sealed trait Future[T] {
def await : Future[T]
def awaitBlocking : Future[T]
def isCompleted: Boolean
def isExpired: Boolean
def timeoutInNanos: Long
def result: Option[T]
def exception: Option[Throwable]
def map[O](f: (T) => O): Future[O] = {
val wrapped = this
new Future[O] {
@ -111,12 +116,14 @@ sealed trait Future[T] {
trait CompletableFuture[T] extends Future[T] {
def completeWithResult(result: T)
def completeWithException(exception: Throwable)
}
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
import TimeUnit.{MILLISECONDS => TIME_UNIT}
def this() = this(0)
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
@ -207,7 +214,9 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
_lock.unlock
}
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
protected def onComplete(result: T) {}
protected def onCompleteException(exception: Throwable) {}
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
}

View file

@ -49,7 +49,7 @@ message SerializedActorRefProtocol {
optional LifeCycleProtocol lifeCycle = 10;
optional RemoteActorRefProtocol supervisor = 11;
optional bytes hotswapStack = 12;
repeated RemoteRequestProtocol messages = 13;
repeated RemoteMessageProtocol messages = 13;
}
/**
@ -92,38 +92,18 @@ message TypedActorInfoProtocol {
}
/**
* Defines a remote connection handshake.
* Defines a remote message.
*/
//message HandshakeProtocol {
// required string cookie = 1;
//}
/**
* Defines a remote message request.
*/
message RemoteRequestProtocol {
message RemoteMessageProtocol {
required UuidProtocol uuid = 1;
required MessageProtocol message = 2;
required ActorInfoProtocol actorInfo = 3;
required bool isOneWay = 4;
optional UuidProtocol supervisorUuid = 5;
optional RemoteActorRefProtocol sender = 6;
repeated MetadataEntryProtocol metadata = 7;
optional string cookie = 8;
}
/**
* Defines a remote message reply.
*/
message RemoteReplyProtocol {
required UuidProtocol uuid = 1;
optional MessageProtocol message = 2;
optional ExceptionProtocol exception = 3;
optional UuidProtocol supervisorUuid = 4;
required bool isActor = 5;
required bool isSuccessful = 6;
repeated MetadataEntryProtocol metadata = 7;
optional string cookie = 8;
required ActorInfoProtocol actorInfo = 2;
required bool oneWay = 3;
optional MessageProtocol message = 4;
optional ExceptionProtocol exception = 5;
optional UuidProtocol supervisorUuid = 6;
optional RemoteActorRefProtocol sender = 7;
repeated MetadataEntryProtocol metadata = 8;
optional string cookie = 9;
}
/**

View file

@ -71,6 +71,7 @@ object RemoteClient extends Logging {
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
@ -286,15 +287,26 @@ class RemoteClient private[akka] (
actorType: ActorType): Option[CompletableFuture[T]] = {
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
else None
send(createRemoteRequestProtocolBuilder(
actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, cookie).build, senderFuture)
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
cookie
).build, senderFuture)
}
def send[T](
request: RemoteRequestProtocol,
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getIsOneWay) {
if (request.getOneWay) {
connection.getChannel.write(request)
None
} else {
@ -364,9 +376,9 @@ class RemoteClientPipelineFactory(
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
@ -404,12 +416,13 @@ class RemoteClientHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
val result = event.getMessage
if (result.isInstanceOf[RemoteReplyProtocol]) {
val reply = result.asInstanceOf[RemoteReplyProtocol]
if (result.isInstanceOf[RemoteMessageProtocol]) {
val reply = result.asInstanceOf[RemoteMessageProtocol]
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow)
log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString)
log.debug("Remote client received RemoteMessageProtocol[\n%s]", reply.toString)
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) {
if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
@ -422,7 +435,8 @@ class RemoteClientHandler(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
}
future.completeWithException(parseException(reply, client.loader))
val exception = parseException(reply, client.loader)
future.completeWithException(exception)
}
futures remove replyUuid
} else {
@ -485,7 +499,7 @@ class RemoteClientHandler(
event.getChannel.close
}
private def parseException(reply: RemoteReplyProtocol, loader: Option[ClassLoader]): Throwable = {
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
val exception = reply.getException
val classname = exception.getClassname
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)

View file

@ -9,13 +9,13 @@ import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import akka.actor.{
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage}
import akka.actor.Actor._
import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType}
import akka.util._
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.Config._
import akka.config.ConfigurationException
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
@ -31,7 +31,6 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.config.ConfigurationException
/**
* Use this object if you need a single remote server on a specific node.
@ -69,11 +68,7 @@ object RemoteNode extends RemoteServer
object RemoteServer {
val UUID_PREFIX = "uuid:"
val SECURE_COOKIE: Option[String] = {
val cookie = config.getString("akka.remote.secure-cookie", "")
if (cookie == "") None
else Some(cookie)
}
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie")
val REQUIRE_COOKIE = {
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
@ -407,7 +402,7 @@ class RemoteServerPipelineFactory(
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
@ -482,10 +477,10 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequestProtocol]) {
val requestProtocol = message.asInstanceOf[RemoteRequestProtocol]
if (message.isInstanceOf[RemoteMessageProtocol]) {
val requestProtocol = message.asInstanceOf[RemoteMessageProtocol]
if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
handleRemoteRequestProtocol(requestProtocol, event.getChannel)
handleRemoteMessageProtocol(requestProtocol, event.getChannel)
}
}
@ -501,8 +496,8 @@ class RemoteServerHandler(
else None
}
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
log.debug("Received RemoteMessageProtocol[\n%s]", request.toString)
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
@ -511,7 +506,7 @@ class RemoteServerHandler(
}
}
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel): Unit = {
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid)
@ -520,7 +515,7 @@ class RemoteServerHandler(
createActor(actorInfo).start
} catch {
case e: SecurityException =>
channel.write(createErrorReplyMessage(e, request, true))
channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
server.notifyListeners(RemoteServerError(e, server))
return
}
@ -538,7 +533,7 @@ class RemoteServerHandler(
throw new SecurityException("Remote server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor")
case _ => // then match on user defined messages
if (request.getIsOneWay) actorRef.!(message)(sender)
if (request.getOneWay) actorRef.!(message)(sender)
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(
message,
request.getActorInfo.getTimeout,
@ -546,16 +541,24 @@ class RemoteServerHandler(
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
override def onComplete(result: AnyRef) {
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setUuid(request.getUuid)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(true)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result),
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
try {
channel.write(replyBuilder.build)
channel.write(messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
@ -563,7 +566,7 @@ class RemoteServerHandler(
override def onCompleteException(exception: Throwable) {
try {
channel.write(createErrorReplyMessage(exception, request, true))
channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor))
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
@ -573,7 +576,7 @@ class RemoteServerHandler(
}
}
private def dispatchToTypedActor(request: RemoteRequestProtocol, channel: Channel) = {
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
val actorInfo = request.getActorInfo
val typedActorInfo = actorInfo.getTypedActorInfo
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
@ -584,24 +587,32 @@ class RemoteServerHandler(
try {
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
if (request.getIsOneWay) messageReceiver.invoke(typedActor, args: _*)
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
else {
val result = messageReceiver.invoke(typedActor, args: _*)
log.debug("Returning result from remote typed actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setUuid(request.getUuid)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result),
true,
None,
None,
AkkaActorType.TypedActor,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(messageBuilder.build)
}
} catch {
case e: InvocationTargetException =>
channel.write(createErrorReplyMessage(e.getCause, request, false))
channel.write(createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, false))
channel.write(createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server))
}
}
@ -711,19 +722,26 @@ class RemoteServerHandler(
} else typedActorOrNull
}
private def createErrorReplyMessage(e: Throwable, request: RemoteRequestProtocol, isActor: Boolean): RemoteReplyProtocol = {
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
val actorInfo = request.getActorInfo
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setUuid(request.getUuid)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(isActor)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
replyBuilder.build
log.error(exception, "Could not invoke remote actor [%s]", actorInfo.getTarget)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Right(exception),
true,
None,
None,
actorType,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
messageBuilder.build
}
private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = {
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {
val attachment = ctx.getAttachment
if ((attachment ne null) &&
attachment.isInstanceOf[String] &&

View file

@ -127,9 +127,13 @@ object ActorSerialization {
val requestProtocols =
messages.map(m =>
RemoteActorSerialization.createRemoteRequestProtocolBuilder(
actorRef,
m.message,
RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(m.message),
false,
actorRef.getSender,
None,
@ -201,7 +205,7 @@ object ActorSerialization {
hotswap,
factory)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
@ -257,19 +261,27 @@ object RemoteActorSerialization {
.build
}
def createRemoteRequestProtocolBuilder(
actorRef: ActorRef,
message: Any,
def createRemoteMessageProtocolBuilder(
actorRef: Option[ActorRef],
uuid: Either[Uuid, UuidProtocol],
actorId: String,
actorClassName: String,
timeout: Long,
message: Either[Any, Throwable],
isOneWay: Boolean,
senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType,
secureCookie: Option[String]): RemoteRequestProtocol.Builder = {
import actorRef._
secureCookie: Option[String]): RemoteMessageProtocol.Builder = {
val uuidProtocol = uuid match {
case Left(uid) => UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build
case Right(protocol) => protocol
}
val actorInfoBuilder = ActorInfoProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(uuid.getTime).setLow(uuid.getClockSeqAndNode).build)
.setId(actorRef.id)
.setUuid(uuidProtocol)
.setId(actorId)
.setTarget(actorClassName)
.setTimeout(timeout)
@ -286,28 +298,39 @@ object RemoteActorSerialization {
case ActorType.TypedActor => actorInfoBuilder.setActorType(TYPED_ACTOR)
}
val actorInfo = actorInfoBuilder.build
val requestUuid = newUuid
val requestBuilder = RemoteRequestProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(requestUuid.getTime).setLow(requestUuid.getClockSeqAndNode).build)
.setMessage(MessageSerializer.serialize(message))
val messageBuilder = RemoteMessageProtocol.newBuilder
.setUuid(uuidProtocol)
.setActorInfo(actorInfo)
.setIsOneWay(isOneWay)
.setOneWay(isOneWay)
secureCookie.foreach(requestBuilder.setCookie(_))
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(
UuidProtocol.newBuilder
.setHigh(id.get.getTime)
.setLow(id.get.getClockSeqAndNode)
message match {
case Left(message) =>
messageBuilder.setMessage(MessageSerializer.serialize(message))
case Right(exception) =>
messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName)
.setMessage(exception.getMessage)
.build)
}
secureCookie.foreach(messageBuilder.setCookie(_))
actorRef.foreach { ref =>
ref.registerSupervisorAsRemoteActor.foreach { id =>
messageBuilder.setSupervisorUuid(
UuidProtocol.newBuilder
.setHigh(id.getTime)
.setLow(id.getClockSeqAndNode)
.build)
}
}
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender)
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
messageBuilder.setSender(toRemoteActorRefProtocol(sender))
}
requestBuilder
messageBuilder
}
}

View file

@ -140,27 +140,10 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
a.makeRemote(HOSTNAME, PORT1)
a.start
}).toList
actors.map(_ !!! "Hello").
foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get))
actors.map(_ !!! "Hello").foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get))
actors.foreach(_.stop)
}
@Test
def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L
val actor = actorOf[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
@Test
def shouldRegisterActorByUuid {
val actor1 = actorOf[MyActorCustomConstructor]
@ -180,5 +163,21 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor1.stop
actor2.stop
}
@Test
def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L
val actor = actorOf[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
}

View file

@ -83,7 +83,7 @@ class RemoteTypedActorSpec extends
}
describe("Remote Typed Actor ") {
/*
it("should receive one-way message") {
clearMessageLogs
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
@ -102,7 +102,7 @@ class RemoteTypedActorSpec extends
ta.requestReply("ping")
}
}
*/
it("should be restarted on failure") {
clearMessageLogs
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
@ -112,7 +112,7 @@ class RemoteTypedActorSpec extends
}
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
}
/*
it("should restart linked friends on failure") {
clearMessageLogs
val ta1 = conf.getInstance(classOf[RemoteTypedActorOne])
@ -124,5 +124,5 @@ class RemoteTypedActorSpec extends
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
}
}
*/ }
}

View file

@ -143,6 +143,7 @@ akka {
client {
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
}