Merging of RemoteRequest and RemoteReply protocols completed

This commit is contained in:
Jonas Bonér 2010-11-02 18:11:58 +01:00
parent 19d86c8ada
commit ade112348e
7 changed files with 127 additions and 114 deletions

View file

@ -19,6 +19,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.api.exceptions.DeadTransactionException import org.multiverse.api.exceptions.DeadTransactionException
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap } import java.util.{ Map => JMap }
@ -26,12 +27,12 @@ import java.lang.reflect.Field
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import scala.collection.immutable.Stack import scala.collection.immutable.Stack
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.annotation.tailrec
import annotation.tailrec
private[akka] object ActorRefInternals { private[akka] object ActorRefInternals {
/** LifeCycles for ActorRefs /**
* LifeCycles for ActorRefs.
*/ */
private[akka] sealed trait StatusType private[akka] sealed trait StatusType
object UNSTARTED extends StatusType object UNSTARTED extends StatusType
@ -77,7 +78,10 @@ private[akka] object ActorRefInternals {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait ActorRef extends ActorRefShared with TransactionManagement with Logging with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => trait ActorRef extends ActorRefShared
with TransactionManagement
with Logging
with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
// Only mutable for RemoteServer in order to maintain identity across nodes // Only mutable for RemoteServer in order to maintain identity across nodes
@volatile @volatile

View file

@ -106,35 +106,6 @@ message RemoteMessageProtocol {
optional string cookie = 9; optional string cookie = 9;
} }
/**
* Defines a remote message request.
*
message RemoteRequestProtocol {
required UuidProtocol uuid = 1;
required MessageProtocol message = 2;
required ActorInfoProtocol actorInfo = 3;
required bool isOneWay = 4;
optional UuidProtocol supervisorUuid = 5;
optional RemoteActorRefProtocol sender = 6;
repeated MetadataEntryProtocol metadata = 7;
optional string cookie = 8;
}
*/
/**
* Defines a remote message reply.
*
message RemoteReplyProtocol {
required UuidProtocol uuid = 1;
optional MessageProtocol message = 2;
optional ExceptionProtocol exception = 3;
optional UuidProtocol supervisorUuid = 4;
required bool isActor = 5;
required bool isSuccessful = 6;
repeated MetadataEntryProtocol metadata = 7;
optional string cookie = 8;
}
*/
/** /**
* Defines a UUID. * Defines a UUID.
*/ */

View file

@ -284,7 +284,18 @@ class RemoteClient private[akka] (
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
else None else None
send(createRemoteMessageProtocolBuilder( send(createRemoteMessageProtocolBuilder(
actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, cookie).build, senderFuture) Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
cookie
).build, senderFuture)
} }
def send[T]( def send[T](
@ -407,6 +418,7 @@ class RemoteClientHandler(
log.debug("Remote client received RemoteMessageProtocol[\n%s]", reply.toString) log.debug("Remote client received RemoteMessageProtocol[\n%s]", reply.toString)
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) { if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage) val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message) future.completeWithResult(message)
} else { } else {
@ -419,7 +431,8 @@ class RemoteClientHandler(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
} }
future.completeWithException(parseException(reply, client.loader)) val exception = parseException(reply, client.loader)
future.completeWithException(exception)
} }
futures remove replyUuid futures remove replyUuid
} else { } else {

View file

@ -10,7 +10,8 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap} import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor.{ import se.scalablesolutions.akka.actor.{
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry} Actor, TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, ActorType => AkkaActorType}
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -495,18 +496,23 @@ class RemoteServerHandler(
override def onComplete(result: AnyRef) { override def onComplete(result: AnyRef) {
log.debug("Returning result from actor invocation [%s]", result) log.debug("Returning result from actor invocation [%s]", result)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
actorRef, Left(reply), isOneWay, senderOption, typedActorInfo, actorType, cookie) Some(actorRef),
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Left(result),
true,
Some(actorRef),
None,
AkkaActorType.ScalaActor,
None)
/* // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
val replyBuilder = RemoteMessageProtocol.newBuilder if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
.setUuid(request.getUuid)
.setMessage(MessageSerializer.serialize(result))
.setIsActor(true)
*/
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
try { try {
channel.write(replyBuilder.build) channel.write(messageBuilder.build)
} catch { } catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
} }
@ -514,7 +520,7 @@ class RemoteServerHandler(
override def onCompleteException(exception: Throwable) { override def onCompleteException(exception: Throwable) {
try { try {
channel.write(createErrorReplyMessage(exception, request, true)) channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor))
} catch { } catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
} }
@ -539,19 +545,28 @@ class RemoteServerHandler(
else { else {
val result = messageReceiver.invoke(typedActor, args: _*) val result = messageReceiver.invoke(typedActor, args: _*)
log.debug("Returning result from remote typed actor invocation [%s]", result) log.debug("Returning result from remote typed actor invocation [%s]", result)
val replyBuilder = RemoteMessageProtocol.newBuilder
.setUuid(request.getUuid) val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
.setMessage(MessageSerializer.serialize(result)) None,
.setIsActor(false) Right(request.getUuid),
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) actorInfo.getId,
channel.write(replyBuilder.build) actorInfo.getTarget,
actorInfo.getTimeout,
Left(result),
true,
None,
None,
AkkaActorType.TypedActor,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(messageBuilder.build)
} }
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>
channel.write(createErrorReplyMessage(e.getCause, request, false)) channel.write(createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, false)) channel.write(createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
} }
@ -655,15 +670,23 @@ class RemoteServerHandler(
} else typedActorOrNull } else typedActorOrNull
} }
private def createErrorReplyMessage(e: Throwable, request: RemoteMessageProtocol, isActor: Boolean): RemoteMessageProtocol = { private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
val actorInfo = request.getActorInfo val actorInfo = request.getActorInfo
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget) log.error(exception, "Could not invoke remote actor [%s]", actorInfo.getTarget)
val replyBuilder = RemoteMessageProtocol.newBuilder val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
.setUuid(request.getUuid) None,
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build) Right(request.getUuid),
.setIsActor(isActor) actorInfo.getId,
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) actorInfo.getTarget,
replyBuilder.build actorInfo.getTimeout,
Right(exception),
true,
None,
None,
actorType,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
messageBuilder.build
} }
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = { private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {

View file

@ -127,8 +127,12 @@ object ActorSerialization {
val requestProtocols = val requestProtocols =
messages.map(m => messages.map(m =>
RemoteActorSerialization.createRemoteMessageProtocolBuilder( RemoteActorSerialization.createRemoteMessageProtocolBuilder(
actorRef, Some(actorRef),
m.message, Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(m.message),
false, false,
actorRef.getSender, actorRef.getSender,
None, None,
@ -256,29 +260,27 @@ object RemoteActorSerialization {
.build .build
} }
required UuidProtocol uuid = 1;
required ActorInfoProtocol actorInfo = 2;
required bool oneWay = 3;
optional MessageProtocol message = 4;
optional ExceptionProtocol exception = 5;
optional UuidProtocol supervisorUuid = 6;
optional RemoteActorRefProtocol sender = 7;
repeated MetadataEntryProtocol metadata = 8;
optional string cookie = 9;
def createRemoteMessageProtocolBuilder( def createRemoteMessageProtocolBuilder(
ctorRef: ActorRef, actorRef: Option[ActorRef],
message: Either[Any, Exception], uuid: Either[Uuid, UuidProtocol],
actorId: String,
actorClassName: String,
timeout: Long,
message: Either[Any, Throwable],
isOneWay: Boolean, isOneWay: Boolean,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]], typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType, actorType: ActorType,
secureCookie: Option[String]): RemoteMessageProtocol.Builder = { secureCookie: Option[String]): RemoteMessageProtocol.Builder = {
import actorRef._
val uuidProtocol = uuid match {
case Left(uid) => UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build
case Right(protocol) => protocol
}
val actorInfoBuilder = ActorInfoProtocol.newBuilder val actorInfoBuilder = ActorInfoProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(uuid.getTime).setLow(uuid.getClockSeqAndNode).build) .setUuid(uuidProtocol)
.setId(actorRef.id) .setId(actorId)
.setTarget(actorClassName) .setTarget(actorClassName)
.setTimeout(timeout) .setTimeout(timeout)
@ -295,9 +297,8 @@ object RemoteActorSerialization {
case ActorType.TypedActor => actorInfoBuilder.setActorType(TYPED_ACTOR) case ActorType.TypedActor => actorInfoBuilder.setActorType(TYPED_ACTOR)
} }
val actorInfo = actorInfoBuilder.build val actorInfo = actorInfoBuilder.build
val requestUuid = newUuid
val messageBuilder = RemoteMessageProtocol.newBuilder val messageBuilder = RemoteMessageProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(requestUuid.getTime).setLow(requestUuid.getClockSeqAndNode).build) .setUuid(uuidProtocol)
.setActorInfo(actorInfo) .setActorInfo(actorInfo)
.setOneWay(isOneWay) .setOneWay(isOneWay)
@ -305,21 +306,23 @@ object RemoteActorSerialization {
case Left(message) => case Left(message) =>
messageBuilder.setMessage(MessageSerializer.serialize(message)) messageBuilder.setMessage(MessageSerializer.serialize(message))
case Right(exception) => case Right(exception) =>
val exceptionProtocol = ExceptionProtocol.newBuilder messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass) .setClassname(exception.getClass.getName)
.setMessage(exception.getMessage) .setMessage(exception.getMessage)
.build .build)
messageBuilder.setException(exceptionProtocol)
} }
secureCookie.foreach(messageBuilder.setCookie(_)) secureCookie.foreach(messageBuilder.setCookie(_))
val id = registerSupervisorAsRemoteActor actorRef.foreach { ref =>
if (id.isDefined) messageBuilder.setSupervisorUuid( ref.registerSupervisorAsRemoteActor.foreach { id =>
messageBuilder.setSupervisorUuid(
UuidProtocol.newBuilder UuidProtocol.newBuilder
.setHigh(id.get.getTime) .setHigh(id.getTime)
.setLow(id.get.getClockSeqAndNode) .setLow(id.getClockSeqAndNode)
.build) .build)
}
}
senderOption.foreach { sender => senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender) RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender)

View file

@ -140,27 +140,10 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
a.makeRemote(HOSTNAME, PORT1) a.makeRemote(HOSTNAME, PORT1)
a.start a.start
}).toList }).toList
actors.map(_ !!! "Hello"). actors.map(_ !!! "Hello").foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get))
foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get))
actors.foreach(_.stop) actors.foreach(_.stop)
} }
@Test
def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L
val actor = actorOf[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
@Test @Test
def shouldRegisterActorByUuid { def shouldRegisterActorByUuid {
val actor1 = actorOf[MyActorCustomConstructor] val actor1 = actorOf[MyActorCustomConstructor]
@ -180,5 +163,21 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor1.stop actor1.stop
actor2.stop actor2.stop
} }
@Test
def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L
val actor = actorOf[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
} }

View file

@ -83,7 +83,7 @@ class RemoteTypedActorSpec extends
} }
describe("Remote Typed Actor ") { describe("Remote Typed Actor ") {
/*
it("should receive one-way message") { it("should receive one-way message") {
clearMessageLogs clearMessageLogs
val ta = conf.getInstance(classOf[RemoteTypedActorOne]) val ta = conf.getInstance(classOf[RemoteTypedActorOne])
@ -102,7 +102,7 @@ class RemoteTypedActorSpec extends
ta.requestReply("ping") ta.requestReply("ping")
} }
} }
*/
it("should be restarted on failure") { it("should be restarted on failure") {
clearMessageLogs clearMessageLogs
val ta = conf.getInstance(classOf[RemoteTypedActorOne]) val ta = conf.getInstance(classOf[RemoteTypedActorOne])
@ -112,7 +112,7 @@ class RemoteTypedActorSpec extends
} }
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
} }
/*
it("should restart linked friends on failure") { it("should restart linked friends on failure") {
clearMessageLogs clearMessageLogs
val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) val ta1 = conf.getInstance(classOf[RemoteTypedActorOne])
@ -124,5 +124,5 @@ class RemoteTypedActorSpec extends
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
} }
} */ }
} }