removed trailing whitespace
This commit is contained in:
parent
fc7068282d
commit
ee47eaee5b
13 changed files with 62 additions and 84 deletions
|
|
@ -506,7 +506,7 @@ private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
|
||||||
* Marker interface for proxyable actors (such as typed actor).
|
* Marker interface for proxyable actors (such as typed actor).
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
trait Proxyable {
|
trait Proxyable {
|
||||||
private[actor] def swapProxiedActor(newInstance: Actor)
|
private[actor] def swapProxiedActor(newInstance: Actor)
|
||||||
}
|
}
|
||||||
|
|
@ -515,9 +515,9 @@ trait Proxyable {
|
||||||
* Represents the different Actor types.
|
* Represents the different Actor types.
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
sealed trait ActorType
|
sealed trait ActorType
|
||||||
object ActorType {
|
object ActorType {
|
||||||
case object ScalaActor extends ActorType
|
case object ScalaActor extends ActorType
|
||||||
case object TypedActor extends ActorType
|
case object TypedActor extends ActorType
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,9 +62,9 @@ import scala.reflect.BeanProperty
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
trait ActorRef extends
|
trait ActorRef extends
|
||||||
ActorRefShared with
|
ActorRefShared with
|
||||||
TransactionManagement with
|
TransactionManagement with
|
||||||
Logging with
|
Logging with
|
||||||
java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
|
java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
|
||||||
|
|
||||||
|
|
@ -78,7 +78,7 @@ trait ActorRef extends
|
||||||
@volatile protected[akka] var startOnCreation = false
|
@volatile protected[akka] var startOnCreation = false
|
||||||
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
|
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
|
||||||
protected[akka] val guard = new ReentrantGuard
|
protected[akka] val guard = new ReentrantGuard
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* User overridable callback/setting.
|
* User overridable callback/setting.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
@ -746,7 +746,7 @@ class LocalActorRef private[akka](
|
||||||
ensureRemotingEnabled
|
ensureRemotingEnabled
|
||||||
if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port))
|
if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port))
|
||||||
else throw new ActorInitializationException(
|
else throw new ActorInitializationException(
|
||||||
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -759,7 +759,7 @@ class LocalActorRef private[akka](
|
||||||
RemoteClientModule.register(address, uuid)
|
RemoteClientModule.register(address, uuid)
|
||||||
homeAddress = (RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
|
homeAddress = (RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
|
||||||
} else throw new ActorInitializationException(
|
} else throw new ActorInitializationException(
|
||||||
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -830,10 +830,10 @@ class LocalActorRef private[akka](
|
||||||
actor.shutdown
|
actor.shutdown
|
||||||
ActorRegistry.unregister(this)
|
ActorRegistry.unregister(this)
|
||||||
if (isRemotingEnabled) {
|
if (isRemotingEnabled) {
|
||||||
remoteAddress.foreach { address =>
|
remoteAddress.foreach { address =>
|
||||||
RemoteClientModule.unregister(address, uuid)
|
RemoteClientModule.unregister(address, uuid)
|
||||||
}
|
}
|
||||||
RemoteServerModule.unregister(this)
|
RemoteServerModule.unregister(this)
|
||||||
}
|
}
|
||||||
nullOutActorRefReferencesFor(actorInstance.get)
|
nullOutActorRefReferencesFor(actorInstance.get)
|
||||||
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
|
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
|
||||||
|
|
@ -1137,7 +1137,7 @@ class LocalActorRef private[akka](
|
||||||
freshActor.init
|
freshActor.init
|
||||||
freshActor.initTransactionalState
|
freshActor.initTransactionalState
|
||||||
actorInstance.set(freshActor)
|
actorInstance.set(freshActor)
|
||||||
if (failedActor.isInstanceOf[Proxyable])
|
if (failedActor.isInstanceOf[Proxyable])
|
||||||
failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor)
|
failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor)
|
||||||
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
|
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
|
||||||
freshActor.postRestart(reason)
|
freshActor.postRestart(reason)
|
||||||
|
|
@ -1357,16 +1357,16 @@ object RemoteActorSystemMessage {
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
private[akka] case class RemoteActorRef private[akka] (
|
private[akka] case class RemoteActorRef private[akka] (
|
||||||
uuuid: String,
|
uuuid: String,
|
||||||
val className: String,
|
val className: String,
|
||||||
val hostname: String,
|
val hostname: String,
|
||||||
val port: Int,
|
val port: Int,
|
||||||
_timeout: Long,
|
_timeout: Long,
|
||||||
loader: Option[ClassLoader])
|
loader: Option[ClassLoader])
|
||||||
extends ActorRef with ScalaActorRef {
|
extends ActorRef with ScalaActorRef {
|
||||||
|
|
||||||
ensureRemotingEnabled
|
ensureRemotingEnabled
|
||||||
|
|
||||||
_uuid = uuuid
|
_uuid = uuuid
|
||||||
timeout = _timeout
|
timeout = _timeout
|
||||||
|
|
||||||
|
|
@ -1480,7 +1480,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
|
||||||
* upon restart, remote restart etc.
|
* upon restart, remote restart etc.
|
||||||
*/
|
*/
|
||||||
def id: String
|
def id: String
|
||||||
|
|
||||||
def id_=(id: String): Unit
|
def id_=(id: String): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ import se.scalablesolutions.akka.util.ListenerManagement
|
||||||
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
|
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
sealed trait ActorRegistryEvent
|
sealed trait ActorRegistryEvent
|
||||||
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
|
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
|
||||||
case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
||||||
|
|
|
||||||
|
|
@ -162,7 +162,7 @@ sealed class Supervisor private[akka] (
|
||||||
_childActors.put(className, actorRef :: currentActors)
|
_childActors.put(className, actorRef :: currentActors)
|
||||||
actorRef.lifeCycle = Some(lifeCycle)
|
actorRef.lifeCycle = Some(lifeCycle)
|
||||||
supervisor.link(actorRef)
|
supervisor.link(actorRef)
|
||||||
remoteAddress.foreach { address =>
|
remoteAddress.foreach { address =>
|
||||||
RemoteServerModule.registerActor(
|
RemoteServerModule.registerActor(
|
||||||
new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)
|
new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ object ConfigLogger extends Logging
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object Config {
|
object Config {
|
||||||
val VERSION = "1.0-SNAPSHOT"
|
val VERSION = "1.0-SNAPSHOT"
|
||||||
|
|
||||||
// Set Multiverse options for max speed
|
// Set Multiverse options for max speed
|
||||||
|
|
|
||||||
|
|
@ -137,4 +137,4 @@ object DataFlow {
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class DataFlowVariableException(msg: String) extends AkkaException(msg)
|
class DataFlowVariableException(msg: String) extends AkkaException(msg)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ class GlobalStm extends TransactionManagement {
|
||||||
val DefaultGlobalTransactionFactory = TransactionFactory(
|
val DefaultGlobalTransactionFactory = TransactionFactory(
|
||||||
DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
|
DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
|
||||||
|
|
||||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T =
|
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T =
|
||||||
atomic(factory)(body)
|
atomic(factory)(body)
|
||||||
|
|
||||||
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ class LocalStm extends TransactionManagement with Logging {
|
||||||
val DefaultLocalTransactionFactory = TransactionFactory(
|
val DefaultLocalTransactionFactory = TransactionFactory(
|
||||||
DefaultLocalTransactionConfig, "DefaultLocalTransaction")
|
DefaultLocalTransactionConfig, "DefaultLocalTransaction")
|
||||||
|
|
||||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T =
|
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T =
|
||||||
atomic(factory)(body)
|
atomic(factory)(body)
|
||||||
|
|
||||||
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import java.net.InetSocketAddress
|
||||||
* Helper class for reflective access to different modules in order to allow optional loading of modules.
|
* Helper class for reflective access to different modules in order to allow optional loading of modules.
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object ReflectiveAccess {
|
object ReflectiveAccess {
|
||||||
|
|
||||||
val loader = getClass.getClassLoader
|
val loader = getClass.getClassLoader
|
||||||
|
|
@ -26,15 +26,15 @@ object ReflectiveAccess {
|
||||||
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
|
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reflective access to the RemoteClient module.
|
* Reflective access to the RemoteClient module.
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object RemoteClientModule {
|
object RemoteClientModule {
|
||||||
|
|
||||||
type RemoteClient = {
|
type RemoteClient = {
|
||||||
def send[T](
|
def send[T](
|
||||||
message: Any,
|
message: Any,
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
senderFuture: Option[CompletableFuture[_]],
|
senderFuture: Option[CompletableFuture[_]],
|
||||||
remoteAddress: InetSocketAddress,
|
remoteAddress: InetSocketAddress,
|
||||||
|
|
@ -43,18 +43,18 @@ object ReflectiveAccess {
|
||||||
actorRef: ActorRef,
|
actorRef: ActorRef,
|
||||||
typedActorInfo: Option[Tuple2[String, String]],
|
typedActorInfo: Option[Tuple2[String, String]],
|
||||||
actorType: ActorType): Option[CompletableFuture[T]]
|
actorType: ActorType): Option[CompletableFuture[T]]
|
||||||
def registerSupervisorForActor(actorRef: ActorRef)
|
def registerSupervisorForActor(actorRef: ActorRef)
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteClientObject = {
|
type RemoteClientObject = {
|
||||||
def register(hostname: String, port: Int, uuid: String): Unit
|
def register(hostname: String, port: Int, uuid: String): Unit
|
||||||
def unregister(hostname: String, port: Int, uuid: String): Unit
|
def unregister(hostname: String, port: Int, uuid: String): Unit
|
||||||
def clientFor(address: InetSocketAddress): RemoteClient
|
def clientFor(address: InetSocketAddress): RemoteClient
|
||||||
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
|
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined
|
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined
|
||||||
|
|
||||||
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException(
|
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException(
|
||||||
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
|
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
|
||||||
|
|
||||||
|
|
@ -87,9 +87,9 @@ object ReflectiveAccess {
|
||||||
ensureRemotingEnabled
|
ensureRemotingEnabled
|
||||||
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
|
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
|
||||||
}
|
}
|
||||||
|
|
||||||
def send[T](
|
def send[T](
|
||||||
message: Any,
|
message: Any,
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
senderFuture: Option[CompletableFuture[_]],
|
senderFuture: Option[CompletableFuture[_]],
|
||||||
remoteAddress: InetSocketAddress,
|
remoteAddress: InetSocketAddress,
|
||||||
|
|
@ -101,11 +101,11 @@ object ReflectiveAccess {
|
||||||
ensureRemotingEnabled
|
ensureRemotingEnabled
|
||||||
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
|
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
|
||||||
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
|
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reflective access to the RemoteServer module.
|
* Reflective access to the RemoteServer module.
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
|
|
@ -113,15 +113,15 @@ object ReflectiveAccess {
|
||||||
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
|
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
|
||||||
val PORT = Config.config.getInt("akka.remote.server.port", 9999)
|
val PORT = Config.config.getInt("akka.remote.server.port", 9999)
|
||||||
|
|
||||||
type RemoteServerObject = {
|
type RemoteServerObject = {
|
||||||
def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef): Unit
|
def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef): Unit
|
||||||
def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
|
def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteNodeObject = {
|
type RemoteNodeObject = {
|
||||||
def unregister(actorRef: ActorRef): Unit
|
def unregister(actorRef: ActorRef): Unit
|
||||||
}
|
}
|
||||||
|
|
||||||
val remoteServerObjectInstance: Option[RemoteServerObject] = {
|
val remoteServerObjectInstance: Option[RemoteServerObject] = {
|
||||||
try {
|
try {
|
||||||
val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteServer$")
|
val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteServer$")
|
||||||
|
|
@ -157,7 +157,7 @@ object ReflectiveAccess {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reflective access to the TypedActors module.
|
* Reflective access to the TypedActors module.
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
|
|
@ -167,7 +167,7 @@ object ReflectiveAccess {
|
||||||
def isJoinPoint(message: Any): Boolean
|
def isJoinPoint(message: Any): Boolean
|
||||||
def isJoinPointAndOneWay(message: Any): Boolean
|
def isJoinPointAndOneWay(message: Any): Boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined
|
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined
|
||||||
|
|
||||||
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
|
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
|
||||||
|
|
@ -185,7 +185,7 @@ object ReflectiveAccess {
|
||||||
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
|
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
|
||||||
ensureTypedActorEnabled
|
ensureTypedActorEnabled
|
||||||
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
|
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
|
||||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||||
}
|
}
|
||||||
typedActorObjectInstance.get.isJoinPoint(message)
|
typedActorObjectInstance.get.isJoinPoint(message)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||||
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
|
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
|
|
||||||
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
||||||
result.get should equal (332833500)
|
result.get should equal (332833500)
|
||||||
ActorRegistry.shutdownAll
|
ActorRegistry.shutdownAll
|
||||||
|
|
@ -136,7 +136,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||||
thread { ints(0, 1000, producer) }
|
thread { ints(0, 1000, producer) }
|
||||||
thread { sum(0, producer, consumer) }
|
thread { sum(0, producer, consumer) }
|
||||||
thread { recurseSum(consumer) }
|
thread { recurseSum(consumer) }
|
||||||
|
|
||||||
latch.await(15,TimeUnit.SECONDS) should equal (true)
|
latch.await(15,TimeUnit.SECONDS) should equal (true)
|
||||||
ActorRegistry.shutdownAll
|
ActorRegistry.shutdownAll
|
||||||
}*/
|
}*/
|
||||||
|
|
|
||||||
|
|
@ -248,7 +248,7 @@ class RemoteClient private[akka] (
|
||||||
protected override def manageLifeCycleOfListeners = false
|
protected override def manageLifeCycleOfListeners = false
|
||||||
|
|
||||||
def send[T](
|
def send[T](
|
||||||
message: Any,
|
message: Any,
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
senderFuture: Option[CompletableFuture[T]],
|
senderFuture: Option[CompletableFuture[T]],
|
||||||
remoteAddress: InetSocketAddress,
|
remoteAddress: InetSocketAddress,
|
||||||
|
|
@ -262,8 +262,8 @@ class RemoteClient private[akka] (
|
||||||
}
|
}
|
||||||
|
|
||||||
def send[T](
|
def send[T](
|
||||||
request: RemoteRequestProtocol,
|
request: RemoteRequestProtocol,
|
||||||
senderFuture: Option[CompletableFuture[T]]):
|
senderFuture: Option[CompletableFuture[T]]):
|
||||||
Option[CompletableFuture[T]] = if (isRunning) {
|
Option[CompletableFuture[T]] = if (isRunning) {
|
||||||
if (request.getIsOneWay) {
|
if (request.getIsOneWay) {
|
||||||
connection.getChannel.write(request)
|
connection.getChannel.write(request)
|
||||||
|
|
@ -464,25 +464,3 @@ class RemoteClientHandler(
|
||||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object RemoteDisconnectTest {
|
|
||||||
import se.scalablesolutions.akka.actor.{Actor,ActorRef}
|
|
||||||
|
|
||||||
class TestClientActor extends Actor {
|
|
||||||
def receive = {
|
|
||||||
case ("send ping",akt:ActorRef) => akt ! "ping"
|
|
||||||
case "pong" => {
|
|
||||||
log.debug("got pong")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class TestServerActor extends Actor {
|
|
||||||
def receive = {
|
|
||||||
case "ping" => {
|
|
||||||
log.debug("got ping")
|
|
||||||
self reply "pong"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,7 @@ object ActorSerialization {
|
||||||
else None
|
else None
|
||||||
|
|
||||||
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
||||||
|
|
||||||
val factory = () => {
|
val factory = () => {
|
||||||
val actorClass = classLoader.loadClass(protocol.getActorClassname)
|
val actorClass = classLoader.loadClass(protocol.getActorClassname)
|
||||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||||
|
|
@ -180,7 +180,7 @@ object ActorSerialization {
|
||||||
|
|
||||||
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]
|
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]
|
||||||
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
|
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
|
||||||
|
|
||||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
||||||
format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
||||||
ar
|
ar
|
||||||
|
|
@ -252,7 +252,7 @@ object RemoteActorSerialization {
|
||||||
.setTarget(actorClassName)
|
.setTarget(actorClassName)
|
||||||
.setTimeout(timeout)
|
.setTimeout(timeout)
|
||||||
|
|
||||||
typedActorInfo.foreach { typedActor =>
|
typedActorInfo.foreach { typedActor =>
|
||||||
actorInfoBuilder.setTypedActorInfo(
|
actorInfoBuilder.setTypedActorInfo(
|
||||||
TypedActorInfoProtocol.newBuilder
|
TypedActorInfoProtocol.newBuilder
|
||||||
.setInterface(typedActor._1)
|
.setInterface(typedActor._1)
|
||||||
|
|
|
||||||
|
|
@ -224,7 +224,7 @@ abstract class TypedActor extends Actor with Proxyable {
|
||||||
if (arg.getClass.getName.contains(TypedActor.AW_PROXY_PREFIX)) unserializable = true
|
if (arg.getClass.getName.contains(TypedActor.AW_PROXY_PREFIX)) unserializable = true
|
||||||
}
|
}
|
||||||
if (!unserializable && hasMutableArgument) {
|
if (!unserializable && hasMutableArgument) {
|
||||||
|
|
||||||
//FIXME serializeArguments
|
//FIXME serializeArguments
|
||||||
// val copyOfArgs = Serializer.Java.deepClone(args)
|
// val copyOfArgs = Serializer.Java.deepClone(args)
|
||||||
// joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
|
// joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
|
||||||
|
|
@ -539,11 +539,11 @@ object TypedActor extends Logging {
|
||||||
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
|
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
|
||||||
Supervisor(SupervisorConfig(restartStrategy, components))
|
Supervisor(SupervisorConfig(restartStrategy, components))
|
||||||
|
|
||||||
def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message))
|
def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message))
|
||||||
isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti])
|
isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti])
|
||||||
else false
|
else false
|
||||||
|
|
||||||
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -607,11 +607,11 @@ private[akka] sealed class TypedActorAspect {
|
||||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||||
|
|
||||||
val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues)
|
val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues)
|
||||||
|
|
||||||
val future = RemoteClientModule.send[AnyRef](
|
val future = RemoteClientModule.send[AnyRef](
|
||||||
message, None, None, remoteAddress.get,
|
message, None, None, remoteAddress.get,
|
||||||
timeout, isOneWay, actorRef,
|
timeout, isOneWay, actorRef,
|
||||||
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
||||||
ActorType.TypedActor)
|
ActorType.TypedActor)
|
||||||
|
|
||||||
if (isOneWay) null // for void methods
|
if (isOneWay) null // for void methods
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue