2012-01-20 14:29:50 +01:00
/* *
* Copyright ( C ) 2009 - 2010 Typesafe Inc . < http : //www.typesafe.com>
*/
package akka.remote
import scala.reflect.BeanProperty
2012-02-27 10:28:20 +01:00
import akka.actor. { Terminated , LocalRef , InternalActorRef , AutoReceivedMessage , AddressFromURIString , Address , ActorSystemImpl , ActorSystem , ActorRef }
2012-01-20 14:29:50 +01:00
import akka.dispatch.SystemMessage
import akka.event. { LoggingAdapter , Logging }
import akka.AkkaException
2012-01-27 12:14:28 +01:00
import akka.serialization.Serialization
2012-02-01 16:06:30 +01:00
import akka.remote.RemoteProtocol._
2012-02-28 15:48:02 +01:00
import akka.dispatch.ChildTerminated
2012-01-20 14:29:50 +01:00
/**
 * Remote life-cycle events.
 *
 * The hierarchy is sealed and serializable; every event declares the log
 * level it is associated with.
 */
sealed trait RemoteLifeCycleEvent extends Serializable {

  /** Log level associated with this event. */
  def logLevel: Logging.LogLevel
}
/**
 * Life-cycle events for RemoteClient.
 *
 * Each client event exposes the remote address it refers to.
 */
trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent {

  /** Remote address this client event refers to. */
  def remoteAddress: Address
}
/**
 * Client life-cycle event carrying an error, associated with the error log level.
 *
 * @param cause         the throwable that is rendered in the event's string form
 * @param remote        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address this event refers to
 */
case class RemoteClientError(
  @BeanProperty cause: Throwable,
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.ErrorLevel

  /** Renders as `RemoteClientError@<remoteAddress>: Error[<cause>]`. */
  override def toString: String = {
    val prefix = "RemoteClientError@" + remoteAddress
    prefix + ": Error[" + cause + "]"
  }
}
/**
 * Client life-cycle event associated with the debug log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address this event refers to
 */
case class RemoteClientDisconnected(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.DebugLevel

  /** Renders as `RemoteClientDisconnected@<remoteAddress>`. */
  override def toString: String = "RemoteClientDisconnected@" + remoteAddress
}
/**
 * Client life-cycle event associated with the debug log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address this event refers to
 */
case class RemoteClientConnected(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.DebugLevel

  /** Renders as `RemoteClientConnected@<remoteAddress>`. */
  override def toString: String = "RemoteClientConnected@" + remoteAddress
}
/**
 * Client life-cycle event associated with the info log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address this event refers to
 */
case class RemoteClientStarted(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.InfoLevel

  /** Renders as `RemoteClientStarted@<remoteAddress>`. */
  override def toString: String = "RemoteClientStarted@" + remoteAddress
}
/**
 * Client life-cycle event associated with the info log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address this event refers to
 */
case class RemoteClientShutdown(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.InfoLevel

  /** Renders as `RemoteClientShutdown@<remoteAddress>`. */
  override def toString: String = "RemoteClientShutdown@" + remoteAddress
}
/**
 * Client life-cycle event carrying a request and an error, associated with
 * the warning log level.
 *
 * @param request       the request object; may be null, in which case the
 *                      string form shows "no message" instead of a class name
 * @param cause         the throwable that is rendered in the event's string form
 * @param remote        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address this event refers to
 */
case class RemoteClientWriteFailed(
  @BeanProperty request: AnyRef,
  @BeanProperty cause: Throwable,
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.WarningLevel

  /** Renders as `RemoteClientWriteFailed@<remoteAddress>: MessageClass[<class>] Error[<cause>]`. */
  override def toString: String = {
    // Only the class name of the request is shown, never its content.
    val messageClass = request match {
      case null    ⇒ "no message"
      case present ⇒ present.getClass.getName
    }
    "RemoteClientWriteFailed@" + remoteAddress +
      ": MessageClass[" + messageClass +
      "] Error[" + cause + "]"
  }
}
/**
 * Life-cycle events for RemoteServer.
 *
 * Marker trait: it adds no members beyond those of RemoteLifeCycleEvent.
 */
trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent
/**
 * Server life-cycle event associated with the info log level.
 *
 * @param remote the transport instance (marked `@transient`); its string form
 *               is part of this event's string form
 */
case class RemoteServerStarted(
  @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.InfoLevel

  /** Renders as `RemoteServerStarted@<remote>`. */
  override def toString: String = "RemoteServerStarted@" + remote
}
/**
 * Server life-cycle event associated with the info log level.
 *
 * @param remote the transport instance (marked `@transient`); its string form
 *               is part of this event's string form
 */
case class RemoteServerShutdown(
  @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.InfoLevel

  /** Renders as `RemoteServerShutdown@<remote>`. */
  override def toString: String = "RemoteServerShutdown@" + remote
}
/**
 * Server life-cycle event carrying an error, associated with the error log level.
 *
 * @param cause  the throwable that is rendered in the event's string form
 * @param remote the transport instance (marked `@transient`); its string form
 *               is part of this event's string form
 */
case class RemoteServerError(
  @BeanProperty cause: Throwable,
  @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.ErrorLevel

  /**
   * Renders as `RemoteServerError@<remote>: Error[<cause>]`.
   *
   * The separator used to be "] Error[", which produced a closing bracket
   * without a matching opening one; it now follows the same
   * "<name>@<origin>: Error[<cause>]" layout as RemoteClientError.
   */
  override def toString: String =
    "RemoteServerError@" + remote + ": Error[" + cause + "]"
}
/**
 * Server life-cycle event associated with the debug log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param clientAddress the client's address, if any; rendered as "no address"
 *                      when empty
 */
case class RemoteServerClientConnected(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.DebugLevel

  /** Renders as `RemoteServerClientConnected@<remote>: Client[<address or "no address">]`. */
  override def toString: String = {
    val client = clientAddress.getOrElse("no address")
    "RemoteServerClientConnected@" + remote + ": Client[" + client + "]"
  }
}
/**
 * Server life-cycle event associated with the debug log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param clientAddress the client's address, if any; rendered as "no address"
 *                      when empty
 */
case class RemoteServerClientDisconnected(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.DebugLevel

  /** Renders as `RemoteServerClientDisconnected@<remote>: Client[<address or "no address">]`. */
  override def toString: String = {
    val client = clientAddress.getOrElse("no address")
    "RemoteServerClientDisconnected@" + remote + ": Client[" + client + "]"
  }
}
/**
 * Server life-cycle event associated with the debug log level.
 *
 * @param remote        the transport instance (marked `@transient`)
 * @param clientAddress the client's address, if any; rendered as "no address"
 *                      when empty
 */
case class RemoteServerClientClosed(
  @transient @BeanProperty remote: RemoteTransport,
  @BeanProperty clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {

  override def logLevel: Logging.LogLevel = Logging.DebugLevel

  /** Renders as `RemoteServerClientClosed@<remote>: Client[<address or "no address">]`. */
  override def toString: String = {
    val client = clientAddress.getOrElse("no address")
    "RemoteServerClientClosed@" + remote + ": Client[" + client + "]"
  }
}
/**
 * Thrown, for example, when trying to send a message using a RemoteClient
 * that is either not started or already shut down.
 *
 * The constructor is restricted to code inside the `akka` package.
 *
 * @param message       exception message, passed on to AkkaException
 * @param client        the transport instance (marked `@transient`)
 * @param remoteAddress the remote address involved
 * @param cause         optional underlying cause; defaults to null
 */
class RemoteClientException private[akka] (
  message: String,
  @transient @BeanProperty val client: RemoteTransport,
  val remoteAddress: Address,
  cause: Throwable = null)
  extends AkkaException(message, cause)
/**
 * AkkaException subtype carrying a message and an underlying cause.
 *
 * NOTE(review): presumably raised for transport-level failures; no usage is
 * visible in this file, so confirm against the throwing sites.
 *
 * @param message exception message, passed on to AkkaException
 * @param cause   underlying cause, passed on to AkkaException
 */
class RemoteTransportException(message: String, cause: Throwable)
  extends AkkaException(message, cause)
2012-01-27 15:21:05 +01:00
/**
 * The remote transport is responsible for sending and receiving messages.
 *
 * Each transport has an address, which it should provide in
 * Serialization.currentTransportAddress (thread-local) while serializing
 * actor references (which might also be part of messages). This address must
 * be available (i.e. fully initialized) by the time the first message is
 * received or when the start() method returns, whichever happens first.
 */
abstract class RemoteTransport {

  /**
   * Shuts down the remoting.
   */
  def shutdown(): Unit

  /**
   * Address to be used in RootActorPath of refs generated for this transport.
   */
  def address: Address

  /**
   * The actor system for which this transport is instantiated. Life-cycle
   * events are published to its eventStream.
   */
  def system: ActorSystem

  /**
   * Starts up the transport, i.e. enables incoming connections.
   */
  def start(): Unit

  /**
   * Shuts down a specific client connected to the supplied remote address;
   * returns true if successful.
   */
  def shutdownClientConnection(address: Address): Boolean

  /**
   * Restarts a specific client connected to the supplied remote address,
   * but only if the client is not shut down.
   */
  def restartClientConnection(address: Address): Boolean

  /**
   * Sends `message` to `recipient`; to be implemented by the concrete transport.
   *
   * @param message      the message to deliver
   * @param senderOption the sending actor, if there is one
   * @param recipient    the remote actor reference to deliver to
   */
  def send(
    message: Any,
    senderOption: Option[ActorRef],
    recipient: RemoteActorRef): Unit

  /**
   * Publishes the event on the system's event stream and then writes it to
   * the system log at the event's own log level.
   */
  def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
    system.eventStream.publish(message)
    val level = message.logLevel
    system.log.log(level, "{}", message)
  }

  /** A transport is rendered as the string form of its address. */
  override def toString: String = address.toString
}
2012-01-27 13:30:43 +01:00
/**
 * View over an incoming RemoteMessageProtocol. Sender, recipient and payload
 * are resolved lazily, on first access, and cached afterwards.
 *
 * @param input  the protocol message as received
 * @param system the actor system used to look up actor references and to
 *               deserialize the payload
 */
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) {

  /** Recipient path exactly as carried by the protocol message. */
  def originalReceiver: String = input.getRecipient.getPath

  /**
   * The sending actor, looked up from the root guardian by the sender path,
   * or the system's dead letters when the protocol message has no sender.
   */
  lazy val sender: ActorRef = {
    if (!input.hasSender) system.deadLetters
    else system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath)
  }

  /** The receiving actor, looked up from the root guardian by `originalReceiver`. */
  lazy val recipient: InternalActorRef =
    system.provider.actorFor(system.provider.rootGuardian, originalReceiver)

  /** The message content, deserialized through MessageSerializer. */
  lazy val payload: AnyRef =
    MessageSerializer.deserialize(system, input.getMessage)

  override def toString: String =
    "RemoteMessage: " + payload +
      " to " + recipient +
      "<+{" + originalReceiver +
      "} from " + sender
}
/**
 * Helpers for wrapping outgoing messages into protocol envelopes and for
 * dispatching incoming RemoteMessage instances to their recipients.
 */
trait RemoteMarshallingOps {

  /** Logger used for receive tracing and for reporting dropped or illegal messages. */
  def log: LoggingAdapter

  /** Actor system handed to MessageSerializer when serializing payloads. */
  def system: ActorSystemImpl

  /** Provider giving access to the remote daemon, the remote settings and the transport. */
  def provider: RemoteActorRefProvider

  /** Address written into serialized actor paths and set as current transport address during serialization. */
  def address: Address

  /** When true, incoming system messages and auto-received messages for local actors are rejected. */
  protected def useUntrustedMode: Boolean

  /**
   * Wraps a message protocol instance into an AkkaRemoteProtocol envelope.
   */
  def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol =
    AkkaRemoteProtocol.newBuilder.setMessage(rmp).build

  /**
   * Wraps a control protocol instance into an AkkaRemoteProtocol envelope.
   */
  def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol =
    AkkaRemoteProtocol.newBuilder.setInstruction(rcp).build

  /**
   * Serializes the ActorRef instance into a Protocol Buffers (protobuf) message,
   * using the actor's path rendered with this transport's address.
   */
  def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = {
    val pathWithAddress = actor.path.toStringWithAddress(address)
    ActorRefProtocol.newBuilder.setPath(pathWithAddress).build
  }

  /**
   * Creates a builder pre-populated with recipient, optional sender and the
   * serialized message.
   *
   * @param recipient    the actor the message is addressed to
   * @param message      the message to serialize
   * @param senderOption the sending actor; the sender field is left unset when empty
   * @return the populated, not yet built, builder
   */
  def createRemoteMessageProtocolBuilder(
    recipient: ActorRef,
    message: Any,
    senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = {
    val builder = RemoteMessageProtocol.newBuilder
    builder.setRecipient(toRemoteActorRefProtocol(recipient))
    senderOption foreach { origin ⇒ builder.setSender(toRemoteActorRefProtocol(origin)) }

    // The message is serialized while currentTransportAddress holds this transport's address.
    Serialization.currentTransportAddress.withValue(address) {
      builder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef]))
    }

    builder
  }

  /**
   * Dispatches an incoming message according to the kind of its recipient:
   *
   *  - the remote daemon: only DaemonMsg and Terminated payloads are passed
   *    on, anything else is logged as a warning;
   *  - a local actor: system messages go through sendSystemMessage, all other
   *    messages through a normal tell with the original sender; in untrusted
   *    mode system messages and auto-received messages raise a SecurityException;
   *  - a remote actor: the message is passed on only when the original
   *    receiver address equals the transport's own address, otherwise it is
   *    dropped with an error log entry;
   *  - anything else: dropped with an error log entry.
   */
  def receiveMessage(remoteMessage: RemoteMessage): Unit = {
    val remoteDaemon = provider.remoteDaemon

    // Debug trace of the received message, emitted only when LogReceive is enabled.
    def trace(template: String): Unit =
      if (provider.remoteSettings.LogReceive) log.debug(template, remoteMessage)

    // Error log entry for a message that cannot be delivered from here.
    def drop(target: Any): Unit =
      log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, target, address, provider.transport.address)

    remoteMessage.recipient match {
      case `remoteDaemon` ⇒
        trace("received daemon message {}")
        remoteMessage.payload match {
          case command @ (_: DaemonMsg | _: Terminated) ⇒
            try {
              remoteDaemon ! command
            } catch {
              case e: Exception ⇒
                log.error(e, "exception while processing remote command {} from {}", command, remoteMessage.sender)
            }
          case other ⇒
            log.warning("remoteDaemon received illegal message {} from {}", other, remoteMessage.sender)
        }

      case local: LocalRef ⇒
        trace("received local message {}")
        remoteMessage.payload match {
          case _: SystemMessage if useUntrustedMode ⇒
            throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message")
          case sys: SystemMessage ⇒
            local.sendSystemMessage(sys)
          case _: AutoReceivedMessage if useUntrustedMode ⇒
            throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
          case msg ⇒
            local.!(msg)(remoteMessage.sender)
        }

      case remote: RemoteRef ⇒
        trace("received remote-destined message {}")
        remoteMessage.originalReceiver match {
          case AddressFromURIString(origin) if origin == provider.transport.address ⇒
            // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
            remote.!(remoteMessage.payload)(remoteMessage.sender)
          case receiverPath ⇒
            drop(receiverPath)
        }

      case unexpected ⇒
        drop(unexpected)
    }
  }
}