diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index 24a04c3eb3..6a5365f3fb 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -506,7 +506,7 @@ private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) { * Marker interface for proxyable actors (such as typed actor). * * @author Jonas Bonér - */ + */ trait Proxyable { private[actor] def swapProxiedActor(newInstance: Actor) } @@ -515,9 +515,9 @@ trait Proxyable { * Represents the different Actor types. * * @author Jonas Bonér - */ + */ sealed trait ActorType object ActorType { case object ScalaActor extends ActorType - case object TypedActor extends ActorType + case object TypedActor extends ActorType } diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index cec8d18d84..7f2c5c4c01 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -62,9 +62,9 @@ import scala.reflect.BeanProperty * * @author Jonas Bonér */ -trait ActorRef extends - ActorRefShared with - TransactionManagement with +trait ActorRef extends + ActorRefShared with + TransactionManagement with Logging with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => @@ -78,7 +78,7 @@ trait ActorRef extends @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[akka] val guard = new ReentrantGuard - + /** * User overridable callback/setting. *

@@ -746,7 +746,7 @@ class LocalActorRef private[akka]( ensureRemotingEnabled if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port)) else throw new ActorInitializationException( - "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") } /** @@ -759,7 +759,7 @@ class LocalActorRef private[akka]( RemoteClientModule.register(address, uuid) homeAddress = (RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) } else throw new ActorInitializationException( - "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") } /** @@ -830,10 +830,10 @@ class LocalActorRef private[akka]( actor.shutdown ActorRegistry.unregister(this) if (isRemotingEnabled) { - remoteAddress.foreach { address => + remoteAddress.foreach { address => RemoteClientModule.unregister(address, uuid) } - RemoteServerModule.unregister(this) + RemoteServerModule.unregister(this) } nullOutActorRefReferencesFor(actorInstance.get) } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") @@ -1137,7 +1137,7 @@ class LocalActorRef private[akka]( freshActor.init freshActor.initTransactionalState actorInstance.set(freshActor) - if (failedActor.isInstanceOf[Proxyable]) + if (failedActor.isInstanceOf[Proxyable]) failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor) Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) freshActor.postRestart(reason) @@ -1357,16 +1357,16 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - uuuid: String, - val className: String, - val hostname: String, - val port: Int, - _timeout: Long, + uuuid: String, + val className: String, + val hostname: String, + val port: Int, + _timeout: Long, loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { ensureRemotingEnabled - + _uuid = uuuid timeout = _timeout @@ -1480,7 +1480,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * upon restart, remote restart etc. */ def id: String - + def id_=(id: String): Unit /** diff --git a/akka-actor/src/main/scala/actor/ActorRegistry.scala b/akka-actor/src/main/scala/actor/ActorRegistry.scala index b14ff45f48..e8c38f2b76 100644 --- a/akka-actor/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/actor/ActorRegistry.scala @@ -16,7 +16,7 @@ import se.scalablesolutions.akka.util.ListenerManagement * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. * * @author Jonas Bonér - */ + */ sealed trait ActorRegistryEvent case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent diff --git a/akka-actor/src/main/scala/actor/Supervisor.scala b/akka-actor/src/main/scala/actor/Supervisor.scala index 5493f35c56..1af351a33d 100644 --- a/akka-actor/src/main/scala/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/actor/Supervisor.scala @@ -162,7 +162,7 @@ sealed class Supervisor private[akka] ( _childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = Some(lifeCycle) supervisor.link(actorRef) - remoteAddress.foreach { address => + remoteAddress.foreach { address => RemoteServerModule.registerActor( new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef) } diff --git a/akka-actor/src/main/scala/config/Config.scala b/akka-actor/src/main/scala/config/Config.scala index bc16ce59c8..3b50d613c1 100644 --- a/akka-actor/src/main/scala/config/Config.scala +++ b/akka-actor/src/main/scala/config/Config.scala @@ -24,7 +24,7 @@ object ConfigLogger extends Logging * * @author Jonas Bonér */ -object Config { +object Config { val VERSION = "1.0-SNAPSHOT" // Set Multiverse options for max speed diff --git a/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala b/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala index f35d0f898e..bddfd8597f 100644 --- a/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala @@ -137,4 +137,4 @@ object DataFlow { * @author Jonas Bonér */ class DataFlowVariableException(msg: String) extends AkkaException(msg) -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/stm/global/GlobalStm.scala b/akka-actor/src/main/scala/stm/global/GlobalStm.scala index 76de9d5f57..f2dfce8a96 100644 --- a/akka-actor/src/main/scala/stm/global/GlobalStm.scala +++ b/akka-actor/src/main/scala/stm/global/GlobalStm.scala @@ -31,7 +31,7 @@ class GlobalStm extends TransactionManagement { val DefaultGlobalTransactionFactory = TransactionFactory( DefaultGlobalTransactionConfig, "DefaultGlobalTransaction") - def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body) def atomic[T](factory: TransactionFactory)(body: => T): T = { diff --git a/akka-actor/src/main/scala/stm/local/LocalStm.scala b/akka-actor/src/main/scala/stm/local/LocalStm.scala index c24097f9e5..f0e60206f6 100644 --- a/akka-actor/src/main/scala/stm/local/LocalStm.scala +++ b/akka-actor/src/main/scala/stm/local/LocalStm.scala @@ -32,7 +32,7 @@ class LocalStm extends TransactionManagement with Logging { val DefaultLocalTransactionFactory = TransactionFactory( DefaultLocalTransactionConfig, "DefaultLocalTransaction") - def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body) def atomic[T](factory: TransactionFactory)(body: => T): T = { diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 72909457b0..da9fb2f3c6 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -14,7 +14,7 @@ import java.net.InetSocketAddress * Helper class for reflective access to different modules in order to allow optional loading of modules. * * @author Jonas Bonér - */ + */ object ReflectiveAccess { val loader = getClass.getClassLoader @@ -26,15 +26,15 @@ object ReflectiveAccess { def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled /** - * Reflective access to the RemoteClient module. + * Reflective access to the RemoteClient module. * * @author Jonas Bonér */ object RemoteClientModule { - type RemoteClient = { + type RemoteClient = { def send[T]( - message: Any, + message: Any, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[_]], remoteAddress: InetSocketAddress, @@ -43,18 +43,18 @@ object ReflectiveAccess { actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType): Option[CompletableFuture[T]] - def registerSupervisorForActor(actorRef: ActorRef) + def registerSupervisorForActor(actorRef: ActorRef) } - - type RemoteClientObject = { - def register(hostname: String, port: Int, uuid: String): Unit + + type RemoteClientObject = { + def register(hostname: String, port: Int, uuid: String): Unit def unregister(hostname: String, port: Int, uuid: String): Unit def clientFor(address: InetSocketAddress): RemoteClient def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient } - + lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined - + def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException( "Can't load the remoting module, make sure that akka-remote.jar is on the classpath") @@ -87,9 +87,9 @@ object ReflectiveAccess { ensureRemotingEnabled remoteClientObjectInstance.get.clientFor(hostname, port, loader) } - + def send[T]( - message: Any, + message: Any, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[_]], remoteAddress: InetSocketAddress, @@ -101,11 +101,11 @@ object ReflectiveAccess { ensureRemotingEnabled clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T]( message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) - } + } } /** - * Reflective access to the RemoteServer module. + * Reflective access to the RemoteServer module. * * @author Jonas Bonér */ @@ -113,15 +113,15 @@ object ReflectiveAccess { val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost") val PORT = Config.config.getInt("akka.remote.server.port", 9999) - type RemoteServerObject = { + type RemoteServerObject = { def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef): Unit def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit } - - type RemoteNodeObject = { - def unregister(actorRef: ActorRef): Unit + + type RemoteNodeObject = { + def unregister(actorRef: ActorRef): Unit } - + val remoteServerObjectInstance: Option[RemoteServerObject] = { try { val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteServer$") @@ -157,7 +157,7 @@ object ReflectiveAccess { } /** - * Reflective access to the TypedActors module. + * Reflective access to the TypedActors module. * * @author Jonas Bonér */ @@ -167,7 +167,7 @@ object ReflectiveAccess { def isJoinPoint(message: Any): Boolean def isJoinPointAndOneWay(message: Any): Boolean } - + lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( @@ -185,7 +185,7 @@ object ReflectiveAccess { def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = { ensureTypedActorEnabled if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) { - future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) + future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) } typedActorObjectInstance.get.isJoinPoint(message) } diff --git a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala index c5d3c32e63..518d50fc36 100644 --- a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala +++ b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala @@ -99,7 +99,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) latch.countDown } - + latch.await(3,TimeUnit.SECONDS) should equal (true) result.get should equal (332833500) ActorRegistry.shutdownAll @@ -136,7 +136,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { thread { ints(0, 1000, producer) } thread { sum(0, producer, consumer) } thread { recurseSum(consumer) } - + latch.await(15,TimeUnit.SECONDS) should equal (true) ActorRegistry.shutdownAll }*/ diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index 8a555bf5a1..2b3a3a41ce 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -248,7 +248,7 @@ class RemoteClient private[akka] ( protected override def manageLifeCycleOfListeners = false def send[T]( - message: Any, + message: Any, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]], remoteAddress: InetSocketAddress, @@ -262,8 +262,8 @@ class RemoteClient private[akka] ( } def send[T]( - request: RemoteRequestProtocol, - senderFuture: Option[CompletableFuture[T]]): + request: RemoteRequestProtocol, + senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { if (request.getIsOneWay) { connection.getChannel.write(request) @@ -464,25 +464,3 @@ class RemoteClientHandler( .newInstance(exception.getMessage).asInstanceOf[Throwable] } } - -object RemoteDisconnectTest { -import se.scalablesolutions.akka.actor.{Actor,ActorRef} - - class TestClientActor extends Actor { - def receive = { - case ("send ping",akt:ActorRef) => akt ! "ping" - case "pong" => { - log.debug("got pong") - } - } - } - - class TestServerActor extends Actor { - def receive = { - case "ping" => { - log.debug("got ping") - self reply "pong" - } - } - } -} diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index e7bee8e9b9..3f54f8e921 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -155,7 +155,7 @@ object ActorSerialization { else None val classLoader = loader.getOrElse(getClass.getClassLoader) - + val factory = () => { val actorClass = classLoader.loadClass(protocol.getActorClassname) if (format.isInstanceOf[SerializerBasedActorFormat[_]]) @@ -180,7 +180,7 @@ object ActorSerialization { val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) - + if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false) format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T]) ar @@ -252,7 +252,7 @@ object RemoteActorSerialization { .setTarget(actorClassName) .setTimeout(timeout) - typedActorInfo.foreach { typedActor => + typedActorInfo.foreach { typedActor => actorInfoBuilder.setTypedActorInfo( TypedActorInfoProtocol.newBuilder .setInterface(typedActor._1) diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 96790b590b..b27f5b4b4d 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -224,7 +224,7 @@ abstract class TypedActor extends Actor with Proxyable { if (arg.getClass.getName.contains(TypedActor.AW_PROXY_PREFIX)) unserializable = true } if (!unserializable && hasMutableArgument) { - + //FIXME serializeArguments // val copyOfArgs = Serializer.Java.deepClone(args) // joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]]) @@ -539,11 +539,11 @@ object TypedActor extends Logging { private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = Supervisor(SupervisorConfig(restartStrategy, components)) - def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message)) + def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message)) isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti]) else false - private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] + private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint] } /** @@ -607,11 +607,11 @@ private[akka] sealed class TypedActorAspect { val isOneWay = TypedActor.isOneWay(methodRtti) val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) - + val future = RemoteClientModule.send[AnyRef]( - message, None, None, remoteAddress.get, - timeout, isOneWay, actorRef, - Some((interfaceClass.getName, methodRtti.getMethod.getName)), + message, None, None, remoteAddress.get, + timeout, isOneWay, actorRef, + Some((interfaceClass.getName, methodRtti.getMethod.getName)), ActorType.TypedActor) if (isOneWay) null // for void methods