removed trailing whitespace
This commit is contained in:
parent
0d5862a332
commit
24494a3766
13 changed files with 62 additions and 84 deletions
|
|
@ -506,7 +506,7 @@ private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
|
|||
* Marker interface for proxyable actors (such as typed actor).
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
*/
|
||||
trait Proxyable {
|
||||
private[actor] def swapProxiedActor(newInstance: Actor)
|
||||
}
|
||||
|
|
@ -515,9 +515,9 @@ trait Proxyable {
|
|||
* Represents the different Actor types.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
*/
|
||||
sealed trait ActorType
|
||||
object ActorType {
|
||||
case object ScalaActor extends ActorType
|
||||
case object TypedActor extends ActorType
|
||||
case object TypedActor extends ActorType
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,9 +62,9 @@ import scala.reflect.BeanProperty
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait ActorRef extends
|
||||
ActorRefShared with
|
||||
TransactionManagement with
|
||||
trait ActorRef extends
|
||||
ActorRefShared with
|
||||
TransactionManagement with
|
||||
Logging with
|
||||
java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
|
||||
|
||||
|
|
@ -78,7 +78,7 @@ trait ActorRef extends
|
|||
@volatile protected[akka] var startOnCreation = false
|
||||
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
|
||||
protected[akka] val guard = new ReentrantGuard
|
||||
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
|
|
@ -746,7 +746,7 @@ class LocalActorRef private[akka](
|
|||
ensureRemotingEnabled
|
||||
if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port))
|
||||
else throw new ActorInitializationException(
|
||||
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -759,7 +759,7 @@ class LocalActorRef private[akka](
|
|||
RemoteClientModule.register(address, uuid)
|
||||
homeAddress = (RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
|
||||
} else throw new ActorInitializationException(
|
||||
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -830,10 +830,10 @@ class LocalActorRef private[akka](
|
|||
actor.shutdown
|
||||
ActorRegistry.unregister(this)
|
||||
if (isRemotingEnabled) {
|
||||
remoteAddress.foreach { address =>
|
||||
remoteAddress.foreach { address =>
|
||||
RemoteClientModule.unregister(address, uuid)
|
||||
}
|
||||
RemoteServerModule.unregister(this)
|
||||
RemoteServerModule.unregister(this)
|
||||
}
|
||||
nullOutActorRefReferencesFor(actorInstance.get)
|
||||
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
|
||||
|
|
@ -1137,7 +1137,7 @@ class LocalActorRef private[akka](
|
|||
freshActor.init
|
||||
freshActor.initTransactionalState
|
||||
actorInstance.set(freshActor)
|
||||
if (failedActor.isInstanceOf[Proxyable])
|
||||
if (failedActor.isInstanceOf[Proxyable])
|
||||
failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor)
|
||||
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
|
||||
freshActor.postRestart(reason)
|
||||
|
|
@ -1357,16 +1357,16 @@ object RemoteActorSystemMessage {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] case class RemoteActorRef private[akka] (
|
||||
uuuid: String,
|
||||
val className: String,
|
||||
val hostname: String,
|
||||
val port: Int,
|
||||
_timeout: Long,
|
||||
uuuid: String,
|
||||
val className: String,
|
||||
val hostname: String,
|
||||
val port: Int,
|
||||
_timeout: Long,
|
||||
loader: Option[ClassLoader])
|
||||
extends ActorRef with ScalaActorRef {
|
||||
|
||||
ensureRemotingEnabled
|
||||
|
||||
|
||||
_uuid = uuuid
|
||||
timeout = _timeout
|
||||
|
||||
|
|
@ -1480,7 +1480,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
|
|||
* upon restart, remote restart etc.
|
||||
*/
|
||||
def id: String
|
||||
|
||||
|
||||
def id_=(id: String): Unit
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import se.scalablesolutions.akka.util.ListenerManagement
|
|||
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
*/
|
||||
sealed trait ActorRegistryEvent
|
||||
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
|
||||
case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ sealed class Supervisor private[akka] (
|
|||
_childActors.put(className, actorRef :: currentActors)
|
||||
actorRef.lifeCycle = Some(lifeCycle)
|
||||
supervisor.link(actorRef)
|
||||
remoteAddress.foreach { address =>
|
||||
remoteAddress.foreach { address =>
|
||||
RemoteServerModule.registerActor(
|
||||
new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ object ConfigLogger extends Logging
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Config {
|
||||
object Config {
|
||||
val VERSION = "1.0-SNAPSHOT"
|
||||
|
||||
// Set Multiverse options for max speed
|
||||
|
|
|
|||
|
|
@ -137,4 +137,4 @@ object DataFlow {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class DataFlowVariableException(msg: String) extends AkkaException(msg)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class GlobalStm extends TransactionManagement {
|
|||
val DefaultGlobalTransactionFactory = TransactionFactory(
|
||||
DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
|
||||
|
||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T =
|
||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T =
|
||||
atomic(factory)(body)
|
||||
|
||||
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class LocalStm extends TransactionManagement with Logging {
|
|||
val DefaultLocalTransactionFactory = TransactionFactory(
|
||||
DefaultLocalTransactionConfig, "DefaultLocalTransaction")
|
||||
|
||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T =
|
||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T =
|
||||
atomic(factory)(body)
|
||||
|
||||
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import java.net.InetSocketAddress
|
|||
* Helper class for reflective access to different modules in order to allow optional loading of modules.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
*/
|
||||
object ReflectiveAccess {
|
||||
|
||||
val loader = getClass.getClassLoader
|
||||
|
|
@ -26,15 +26,15 @@ object ReflectiveAccess {
|
|||
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
|
||||
|
||||
/**
|
||||
* Reflective access to the RemoteClient module.
|
||||
* Reflective access to the RemoteClient module.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object RemoteClientModule {
|
||||
|
||||
type RemoteClient = {
|
||||
type RemoteClient = {
|
||||
def send[T](
|
||||
message: Any,
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[_]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
|
|
@ -43,18 +43,18 @@ object ReflectiveAccess {
|
|||
actorRef: ActorRef,
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType): Option[CompletableFuture[T]]
|
||||
def registerSupervisorForActor(actorRef: ActorRef)
|
||||
def registerSupervisorForActor(actorRef: ActorRef)
|
||||
}
|
||||
|
||||
type RemoteClientObject = {
|
||||
def register(hostname: String, port: Int, uuid: String): Unit
|
||||
|
||||
type RemoteClientObject = {
|
||||
def register(hostname: String, port: Int, uuid: String): Unit
|
||||
def unregister(hostname: String, port: Int, uuid: String): Unit
|
||||
def clientFor(address: InetSocketAddress): RemoteClient
|
||||
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
|
||||
}
|
||||
|
||||
|
||||
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined
|
||||
|
||||
|
||||
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException(
|
||||
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
|
||||
|
||||
|
|
@ -87,9 +87,9 @@ object ReflectiveAccess {
|
|||
ensureRemotingEnabled
|
||||
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
|
||||
}
|
||||
|
||||
|
||||
def send[T](
|
||||
message: Any,
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[_]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
|
|
@ -101,11 +101,11 @@ object ReflectiveAccess {
|
|||
ensureRemotingEnabled
|
||||
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
|
||||
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reflective access to the RemoteServer module.
|
||||
* Reflective access to the RemoteServer module.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -113,15 +113,15 @@ object ReflectiveAccess {
|
|||
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
|
||||
val PORT = Config.config.getInt("akka.remote.server.port", 9999)
|
||||
|
||||
type RemoteServerObject = {
|
||||
type RemoteServerObject = {
|
||||
def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef): Unit
|
||||
def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
|
||||
}
|
||||
|
||||
type RemoteNodeObject = {
|
||||
def unregister(actorRef: ActorRef): Unit
|
||||
|
||||
type RemoteNodeObject = {
|
||||
def unregister(actorRef: ActorRef): Unit
|
||||
}
|
||||
|
||||
|
||||
val remoteServerObjectInstance: Option[RemoteServerObject] = {
|
||||
try {
|
||||
val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteServer$")
|
||||
|
|
@ -157,7 +157,7 @@ object ReflectiveAccess {
|
|||
}
|
||||
|
||||
/**
|
||||
* Reflective access to the TypedActors module.
|
||||
* Reflective access to the TypedActors module.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -167,7 +167,7 @@ object ReflectiveAccess {
|
|||
def isJoinPoint(message: Any): Boolean
|
||||
def isJoinPointAndOneWay(message: Any): Boolean
|
||||
}
|
||||
|
||||
|
||||
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined
|
||||
|
||||
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
|
||||
|
|
@ -185,7 +185,7 @@ object ReflectiveAccess {
|
|||
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
|
||||
ensureTypedActorEnabled
|
||||
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
|
||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||
}
|
||||
typedActorObjectInstance.get.isJoinPoint(message)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
|||
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
|
||||
latch.countDown
|
||||
}
|
||||
|
||||
|
||||
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
||||
result.get should equal (332833500)
|
||||
ActorRegistry.shutdownAll
|
||||
|
|
@ -136,7 +136,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
|||
thread { ints(0, 1000, producer) }
|
||||
thread { sum(0, producer, consumer) }
|
||||
thread { recurseSum(consumer) }
|
||||
|
||||
|
||||
latch.await(15,TimeUnit.SECONDS) should equal (true)
|
||||
ActorRegistry.shutdownAll
|
||||
}*/
|
||||
|
|
|
|||
|
|
@ -248,7 +248,7 @@ class RemoteClient private[akka] (
|
|||
protected override def manageLifeCycleOfListeners = false
|
||||
|
||||
def send[T](
|
||||
message: Any,
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
|
|
@ -262,8 +262,8 @@ class RemoteClient private[akka] (
|
|||
}
|
||||
|
||||
def send[T](
|
||||
request: RemoteRequestProtocol,
|
||||
senderFuture: Option[CompletableFuture[T]]):
|
||||
request: RemoteRequestProtocol,
|
||||
senderFuture: Option[CompletableFuture[T]]):
|
||||
Option[CompletableFuture[T]] = if (isRunning) {
|
||||
if (request.getIsOneWay) {
|
||||
connection.getChannel.write(request)
|
||||
|
|
@ -464,25 +464,3 @@ class RemoteClientHandler(
|
|||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
}
|
||||
}
|
||||
|
||||
object RemoteDisconnectTest {
|
||||
import se.scalablesolutions.akka.actor.{Actor,ActorRef}
|
||||
|
||||
class TestClientActor extends Actor {
|
||||
def receive = {
|
||||
case ("send ping",akt:ActorRef) => akt ! "ping"
|
||||
case "pong" => {
|
||||
log.debug("got pong")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TestServerActor extends Actor {
|
||||
def receive = {
|
||||
case "ping" => {
|
||||
log.debug("got ping")
|
||||
self reply "pong"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ object ActorSerialization {
|
|||
else None
|
||||
|
||||
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
||||
|
||||
|
||||
val factory = () => {
|
||||
val actorClass = classLoader.loadClass(protocol.getActorClassname)
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||
|
|
@ -180,7 +180,7 @@ object ActorSerialization {
|
|||
|
||||
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]
|
||||
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
|
||||
|
||||
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
||||
format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
||||
ar
|
||||
|
|
@ -252,7 +252,7 @@ object RemoteActorSerialization {
|
|||
.setTarget(actorClassName)
|
||||
.setTimeout(timeout)
|
||||
|
||||
typedActorInfo.foreach { typedActor =>
|
||||
typedActorInfo.foreach { typedActor =>
|
||||
actorInfoBuilder.setTypedActorInfo(
|
||||
TypedActorInfoProtocol.newBuilder
|
||||
.setInterface(typedActor._1)
|
||||
|
|
|
|||
|
|
@ -224,7 +224,7 @@ abstract class TypedActor extends Actor with Proxyable {
|
|||
if (arg.getClass.getName.contains(TypedActor.AW_PROXY_PREFIX)) unserializable = true
|
||||
}
|
||||
if (!unserializable && hasMutableArgument) {
|
||||
|
||||
|
||||
//FIXME serializeArguments
|
||||
// val copyOfArgs = Serializer.Java.deepClone(args)
|
||||
// joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
|
||||
|
|
@ -539,11 +539,11 @@ object TypedActor extends Logging {
|
|||
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
|
||||
Supervisor(SupervisorConfig(restartStrategy, components))
|
||||
|
||||
def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message))
|
||||
def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message))
|
||||
isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti])
|
||||
else false
|
||||
|
||||
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
||||
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -607,11 +607,11 @@ private[akka] sealed class TypedActorAspect {
|
|||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
|
||||
val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues)
|
||||
|
||||
|
||||
val future = RemoteClientModule.send[AnyRef](
|
||||
message, None, None, remoteAddress.get,
|
||||
timeout, isOneWay, actorRef,
|
||||
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
||||
message, None, None, remoteAddress.get,
|
||||
timeout, isOneWay, actorRef,
|
||||
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
||||
ActorType.TypedActor)
|
||||
|
||||
if (isOneWay) null // for void methods
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue