1. Fixed issues with remote message tx log.

2. Added trait for network failure testing that supports 'TCP RST', 'TCP DENY' and message throttling/delay.
3. Added test for the remote transaction log. Both for TCP RST and TCP DENY.
This commit is contained in:
Jonas Bonér 2011-03-25 16:06:55 +01:00
parent 331b5c7b74
commit 5a9bfe9053
4 changed files with 265 additions and 21 deletions

View file

@ -181,9 +181,15 @@ abstract class RemoteClient private[akka] (
/**
* Returns an array with the current pending messages not yet delivered.
*/
def pendingMessages: Array[Any] = pendingRequests
.toArray.asInstanceOf[Array[(Boolean, Uuid, RemoteMessageProtocol)]]
.map(req => MessageSerializer.deserialize(req._3.getMessage))
def pendingMessages: Array[Any] = {
var messages = Vector[Any]()
val iter = pendingRequests.iterator
while (iter.hasNext) {
val (_, _, message) = iter.next
messages = messages :+ MessageSerializer.deserialize(message.getMessage)
}
messages.toArray
}
/**
* Converts the message to the wireprotocol and sends the message across the wire
@ -222,16 +228,43 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
pendingRequests.add((true, null, request))
sendPendingMessages()
try {
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
}
} catch {
case e: Throwable =>
pendingRequests.add((true, null, request)) // add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress))
throw e
}
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
pendingRequests.add((false, futureUuid, request))
sendPendingMessages()
def handleRequestReplyError(future: ChannelFuture) = {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
var future: ChannelFuture = null
try {
// try to send the original one
future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) handleRequestReplyError(future)
} catch {
case e: Exception => handleRequestReplyError(future)
}
Some(futureResult)
}
} else {
@ -241,10 +274,12 @@ abstract class RemoteClient private[akka] (
}
}
private[remote] def sendPendingMessages() = {
var pendingMessage = pendingRequests.peek // try to grab first message
while (pendingMessage ne null) {
val (isOneWay, futureUuid, message) = pendingMessage
private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log
val nrOfMessages = pendingRequests.size
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
var pendingRequest = pendingRequests.peek
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) { // sendOneWay
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
@ -255,16 +290,15 @@ abstract class RemoteClient private[akka] (
} else { // sendRequestReply
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled) {
futures.remove(futureUuid) // Clean up future
} else if (!future.isSuccess) {
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
}
pendingRequests.remove(pendingMessage) // message delivered; remove from tx log
pendingMessage = pendingRequests.peek // try to grab next message
pendingRequests.remove(pendingRequest)
pendingRequest = pendingRequests.peek // try to grab next message
}
}
@ -474,9 +508,16 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
try {
client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
} catch {
case e: Throwable =>
EventHandler.error(e, this, e.getMessage)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
}
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {