make everything except tutorial-second compile

someone should look at remoting vs. timeout (i.e. which is sent around),
because I removed that in some places. It might simply be irrelevant
once we remove the Future special-casing.
This commit is contained in:
Roland 2011-10-12 09:10:05 +02:00
parent 93b1ef3703
commit 14751f7d29
22 changed files with 300 additions and 254 deletions

View file

@ -6,16 +6,10 @@ package akka.remote.netty
import akka.actor.{ ActorRef, Uuid, newUuid, uuidFrom, IllegalActorStateException, RemoteActorRef, PoisonPill, RemoteActorSystemMessage, AutoReceivedMessage }
import akka.dispatch.{ ActorPromise, DefaultPromise, Promise }
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.remote._
import RemoteProtocol._
import akka.actor.Actor._
import akka.config.Config
import akka.config.Config._
import akka.util._
import akka.event.EventHandler
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@ -27,14 +21,14 @@ import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.util.concurrent._
import java.util.concurrent.atomic._
import akka.AkkaException
import akka.AkkaApplication
import akka.serialization.RemoteActorSerialization
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
@ -55,21 +49,23 @@ object RemoteEncoder {
}
trait NettyRemoteClientModule extends RemoteClientModule {
self: ListenerManagement
self: RemoteSupport
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val remoteActors = new Index[RemoteAddress, Uuid]
private val lock = new ReadWriteGuard
def app: AkkaApplication
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
loader: Option[ClassLoader]): Option[Promise[T]] =
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress,
isOneWay: Boolean,
actorRef: ActorRef,
loader: Option[ClassLoader]): Option[Promise[T]] =
withClientFor(remoteAddress, loader) { client
client.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef)
client.send[T](message, senderOption, senderFuture, remoteAddress, isOneWay, actorRef)
}
private[akka] def withClientFor[T](
@ -89,7 +85,7 @@ trait NettyRemoteClientModule extends RemoteClientModule {
//Recheck for addition, race between upgrades
case Some(client) client //If already populated by other writer
case None //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
val client = new ActiveRemoteClient(app, this, address, loader, self.notifyListeners _)
client.connect()
remoteClients += key -> client
client
@ -144,9 +140,14 @@ trait NettyRemoteClientModule extends RemoteClientModule {
* reuses an already established connection.
*/
abstract class RemoteClient private[akka] (
val app: AkkaApplication,
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
import app.config
implicit def _app = app
val serialization = new RemoteActorSerialization(app)
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", false)
val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1)
@ -157,7 +158,7 @@ abstract class RemoteClient private[akka] (
protected val futures = new ConcurrentHashMap[Uuid, Promise[_]]
protected val pendingRequests = {
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
}
private[remote] val runSwitch = new Switch()
@ -180,7 +181,7 @@ abstract class RemoteClient private[akka] (
val iter = pendingRequests.iterator
while (iter.hasNext) {
val (_, _, message) = iter.next
messages = messages :+ MessageSerializer.deserialize(message.getMessage)
messages = messages :+ MessageSerializer.deserialize(app, message.getMessage)
}
messages.toArray
}
@ -193,11 +194,10 @@ abstract class RemoteClient private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef): Option[Promise[T]] = {
val messageProtocol = createRemoteMessageProtocolBuilder(
Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build
val messageProtocol = serialization.createRemoteMessageProtocolBuilder(
Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.TimeoutMillis, Right(message), isOneWay, senderOption).build
send(messageProtocol, senderFuture)
}
@ -318,13 +318,15 @@ abstract class RemoteClient private[akka] (
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveRemoteClient private[akka] (
_app: AkkaApplication,
module: NettyRemoteClientModule,
remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None,
notifyListenersFun: ( Any) Unit)
extends RemoteClient(module, remoteAddress) {
extends RemoteClient(_app, module, remoteAddress) {
import RemoteClientSettings._
val settings = new RemoteClientSettings(app)
import settings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile
@ -381,7 +383,7 @@ class ActiveRemoteClient private[akka] (
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, futures, bootstrap, remoteAddress, timer, this))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(app, settings, name, futures, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -412,7 +414,7 @@ class ActiveRemoteClient private[akka] (
}
}
}
}, RemoteClientSettings.REAP_FUTURES_DELAY.length, RemoteClientSettings.REAP_FUTURES_DELAY.unit)
}, REAP_FUTURES_DELAY.length, REAP_FUTURES_DELAY.unit)
notifyListeners(RemoteClientStarted(module, remoteAddress))
true
}
@ -465,20 +467,24 @@ class ActiveRemoteClient private[akka] (
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveRemoteClientPipelineFactory(
app: AkkaApplication,
val settings: RemoteClientSettings,
name: String,
futures: ConcurrentMap[Uuid, Promise[_]],
bootstrap: ClientBootstrap,
remoteAddress: InetSocketAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
import settings._
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val timeout = new ReadTimeoutHandler(timer, READ_TIMEOUT.length, READ_TIMEOUT.unit)
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client)
val remoteClient = new ActiveRemoteClientHandler(app, settings, name, futures, bootstrap, remoteAddress, timer, client)
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)
}
@ -489,6 +495,8 @@ class ActiveRemoteClientPipelineFactory(
*/
@ChannelHandler.Sharable
class ActiveRemoteClientHandler(
val app: AkkaApplication,
val settings: RemoteClientSettings,
val name: String,
val futures: ConcurrentMap[Uuid, Promise[_]],
val bootstrap: ClientBootstrap,
@ -497,13 +505,15 @@ class ActiveRemoteClientHandler(
val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler {
implicit def _app = app
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
event.getMessage match {
case arp: AkkaRemoteProtocol if arp.hasInstruction
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN spawn {
case CommandType.SHUTDOWN akka.dispatch.Future {
client.module.shutdownClientConnection(remoteAddress)
}
}
@ -521,7 +531,7 @@ class ActiveRemoteClientHandler(
case future
if (reply.hasMessage) {
val message = MessageSerializer.deserialize(reply.getMessage)
val message = MessageSerializer.deserialize(app, reply.getMessage)
future.completeWithResult(message)
} else {
future.completeWithException(parseException(reply, client.loader))
@ -547,8 +557,8 @@ class ActiveRemoteClientHandler(
client.connect(reconnectIfAlreadyConnected = true)
}
}
}, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn {
}, settings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else akka.dispatch.Future {
client.module.shutdownClientConnection(remoteAddress) // spawn in another thread
}
}
@ -578,7 +588,7 @@ class ActiveRemoteClientHandler(
cause match {
case e: ReadTimeoutException
spawn {
akka.dispatch.Future {
client.module.shutdownClientConnection(remoteAddress) // spawn in another thread
}
case e: Exception
@ -610,7 +620,7 @@ class ActiveRemoteClientHandler(
/**
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with NettyRemoteServerModule with NettyRemoteClientModule {
// Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
@ -639,13 +649,16 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
EventHandler.debug(this,
"Creating RemoteActorRef with address [%s] connected to [%s]"
.format(actorAddress, remoteInetSocketAddress))
RemoteActorRef(remoteInetSocketAddress, actorAddress, timeout, loader)
RemoteActorRef(app, app.remote, remoteInetSocketAddress, actorAddress, loader)
}
}
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
import RemoteServerSettings._
class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
val settings = new RemoteServerSettings(app)
import settings._
val serialization = new RemoteActorSerialization(app)
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host, port)
@ -664,13 +677,13 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, serverModule)
val pipelineFactory = new RemoteServerPipelineFactory(settings, serialization, name, openChannels, executor, loader, serverModule)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("backlog", RemoteServerSettings.BACKLOG)
bootstrap.setOption("backlog", BACKLOG)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT.toMillis)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT.toMillis)
openChannels.add(bootstrap.bind(address))
serverModule.notifyListeners(RemoteServerStarted(serverModule))
@ -680,8 +693,8 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
if (RemoteClientSettings.SECURE_COOKIE.nonEmpty)
b.setCookie(RemoteClientSettings.SECURE_COOKIE.get)
if (SECURE_COOKIE.nonEmpty)
b.setCookie(SECURE_COOKIE.get)
b.build
}
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
@ -698,19 +711,21 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
}
trait NettyRemoteServerModule extends RemoteServerModule {
self: RemoteModule
self: RemoteSupport
def app: AkkaApplication
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match {
case Some(server) server.address
case None ReflectiveAccess.RemoteModule.configDefaultAddress
case None app.reflective.RemoteModule.configDefaultAddress
}
def name = currentServer.get match {
case Some(server) server.name
case None
val a = ReflectiveAccess.RemoteModule.configDefaultAddress
val a = app.reflective.RemoteModule.configDefaultAddress
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort
}
@ -723,7 +738,7 @@ trait NettyRemoteServerModule extends RemoteServerModule {
_isRunning switchOn {
EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port))
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader)))
}
} catch {
case e: Exception
@ -826,13 +841,15 @@ trait NettyRemoteServerModule extends RemoteServerModule {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(
val settings: RemoteServerSettings,
val serialization: RemoteActorSerialization,
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
import settings._
def getPipeline: ChannelPipeline = {
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
@ -841,7 +858,7 @@ class RemoteServerPipelineFactory(
val protobufEnc = new ProtobufEncoder
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val remoteServer = new RemoteServerHandler(settings, serialization, name, openChannels, loader, server)
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
@ -878,12 +895,16 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si
*/
@ChannelHandler.Sharable
class RemoteServerHandler(
val settings: RemoteServerSettings,
val serialization: RemoteActorSerialization,
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
import settings._
implicit def app = server.app
// applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
@ -1000,9 +1021,9 @@ class RemoteServerHandler(
return
}
val message = MessageSerializer.deserialize(request.getMessage)
val message = MessageSerializer.deserialize(app, request.getMessage)
val sender =
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
if (request.hasSender) Some(serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
message match {
@ -1023,7 +1044,7 @@ class RemoteServerHandler(
onComplete(_.value.get match {
case Left(exception) write(channel, createErrorReplyMessage(exception, request))
case r: Right[_, _]
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
val messageBuilder = serialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getAddress,
@ -1059,7 +1080,7 @@ class RemoteServerHandler(
EventHandler.debug(this,
"Looking up a remotely available actor for address [%s] on node [%s]"
.format(address, Config.nodename))
.format(address, app.nodename))
val byAddress = server.actors.get(address) // try actor-by-address
if (byAddress eq null) {
@ -1102,7 +1123,7 @@ class RemoteServerHandler(
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
val actorInfo = request.getActorInfo
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
val messageBuilder = serialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getAddress,