Merge pull request #1338 from drewhk/wip-1478-sysmsg-guaranteed-delivery-drewhk

Guaranteed delivery of system messages (and now praying...)
This commit is contained in:
drewhk 2013-04-18 12:38:28 -07:00
commit 98d4bcc9dd
30 changed files with 4749 additions and 2770 deletions

View file

@ -201,7 +201,7 @@ object FSMTimingSpec {
case Initial -> TestSingleTimerResubmit setTimer("blah", Tick, 500.millis.dilated, false) case Initial -> TestSingleTimerResubmit setTimer("blah", Tick, 500.millis.dilated, false)
} }
when(TestSingleTimerResubmit) { when(TestSingleTimerResubmit) {
case Event(Tick, _) tester ! Tick; setTimer("blah", Tock, 500.millis.dilated, false) case Event(Tick, _) tester ! Tick; setTimer("blah", Tock, 500.millis.dilated, false); stay()
case Event(Tock, _) tester ! Tock; goto(Initial) case Event(Tock, _) tester ! Tock; goto(Initial)
} }
when(TestCancelTimer) { when(TestCancelTimer) {

View file

@ -342,7 +342,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
* @param repeat send once if false, scheduleAtFixedRate if true * @param repeat send once if false, scheduleAtFixedRate if true
* @return current state descriptor * @return current state descriptor
*/ */
final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): State = { final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): Unit = {
if (debugEvent) if (debugEvent)
log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg)
if (timers contains name) { if (timers contains name) {
@ -351,7 +351,6 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
val timer = Timer(name, msg, repeat, timerGen.next)(context) val timer = Timer(name, msg, repeat, timerGen.next)(context)
timer.schedule(self, timeout) timer.schedule(self, timeout)
timers(name) = timer timers(name) = timer
stay
} }
/** /**

View file

@ -29,7 +29,7 @@ object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
""" """
# this setting is here to limit the number of retries and failures while the # this setting is here to limit the number of retries and failures while the
# node is being blackholed # node is being blackholed
akka.remote.failure-detector.retry-gate-closed-for = 500 ms akka.remote.retry-gate-closed-for = 500 ms
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off
akka.cluster.publish-stats-interval = 0s akka.cluster.publish-stats-interval = 0s
@ -169,6 +169,7 @@ abstract class UnreachableNodeJoinsAgainSpec
val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
system.shutdown() system.shutdown()
system.awaitTermination(10 seconds) system.awaitTermination(10 seconds)
Thread.sleep(5000)
// create new ActorSystem with same host:port // create new ActorSystem with same host:port
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.netty.tcp { akka.remote.netty.tcp {

View file

@ -498,6 +498,7 @@ class ClusterSingletonManager(
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption) logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption)
previousLeaderOption foreach { peer(_) ! HandOverToMe } previousLeaderOption foreach { peer(_) ! HandOverToMe }
setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false) setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false)
stay()
} else if (previousLeaderOption forall removed.contains) { } else if (previousLeaderOption forall removed.contains) {
// can't send HandOverToMe, previousLeader unknown for new node (or restart) // can't send HandOverToMe, previousLeader unknown for new node (or restart)
// previous leader might be down or removed, so no TakeOverFromMe message is received // previous leader might be down or removed, so no TakeOverFromMe message is received

View file

@ -5,7 +5,7 @@ package akka.actor.mailbox
import akka.dispatch.{ Envelope, MessageQueue } import akka.dispatch.{ Envelope, MessageQueue }
import akka.remote.MessageSerializer import akka.remote.MessageSerializer
import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } import akka.remote.WireFormats.{ ActorRefData, RemoteEnvelope }
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor._ import akka.actor._
@ -43,10 +43,10 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
// It's alright to use ref.path.toString here // It's alright to use ref.path.toString here
// When the sender is a LocalActorRef it should be local when deserialized also. // When the sender is a LocalActorRef it should be local when deserialized also.
// When the sender is a RemoteActorRef the path.toString already contains remote address information. // When the sender is a RemoteActorRef the path.toString already contains remote address information.
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build def serializeActorRef(ref: ActorRef): ActorRefData = ActorRefData.newBuilder.setPath(ref.path.toString).build
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef]) val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])
val builder = RemoteMessageProtocol.newBuilder val builder = RemoteEnvelope.newBuilder
.setMessage(message) .setMessage(message)
.setRecipient(serializeActorRef(owner)) .setRecipient(serializeActorRef(owner))
.setSender(serializeActorRef(durableMessage.sender)) .setSender(serializeActorRef(durableMessage.sender))
@ -60,10 +60,10 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
*/ */
def deserialize(bytes: Array[Byte]): Envelope = { def deserialize(bytes: Array[Byte]): Envelope = {
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = def deserializeActorRef(refProtocol: ActorRefData): ActorRef =
system.provider.resolveActorRef(refProtocol.getPath) system.provider.resolveActorRef(refProtocol.getPath)
val durableMessage = RemoteMessageProtocol.parseFrom(bytes) val durableMessage = RemoteEnvelope.parseFrom(bytes)
val message = MessageSerializer.deserialize(system, durableMessage.getMessage) val message = MessageSerializer.deserialize(system, durableMessage.getMessage)
val sender = deserializeActorRef(durableMessage.getSender) val sender = deserializeActorRef(durableMessage.getSender)

View file

@ -8,94 +8,68 @@ option optimize_for = SPEED;
/****************************************** /******************************************
Compile with: Compile with:
cd ./akka-remote/src/main/protocol cd ./akka-remote/src/main/protocol
protoc RemoteProtocol.proto --java_out ../java protoc WireFormats.proto --java_out ../java
cd ../../../.. cd ../../../..
./scripts/fix-protobuf.sh ./scripts/fix-protobuf.sh
*******************************************/ *******************************************/
message AkkaRemoteProtocol { /******************************************
optional bytes payload = 1; * Remoting message formats
optional RemoteControlProtocol instruction = 2; ******************************************/
message AckAndEnvelopeContainer {
optional AcknowledgementInfo ack = 1;
optional RemoteEnvelope envelope = 2;
} }
/** /**
* Defines a remote message. * Defines a remote message.
*/ */
message RemoteMessageProtocol { message RemoteEnvelope {
required ActorRefProtocol recipient = 1; required ActorRefData recipient = 1;
required MessageProtocol message = 2; required SerializedMessage message = 2;
optional ActorRefProtocol sender = 4; optional ActorRefData sender = 4;
repeated MetadataEntryProtocol metadata = 5; optional fixed64 seq = 5;
} }
/** message AcknowledgementInfo {
* Defines some control messages for the remoting required fixed64 cumulativeAck = 1;
*/ repeated fixed64 nacks = 2;
message RemoteControlProtocol {
required CommandType commandType = 1;
optional string cookie = 2;
optional AddressProtocol origin = 3;
}
/**
* Defines the type of the RemoteControlProtocol command type
*/
enum CommandType {
CONNECT = 1;
SHUTDOWN = 2;
HEARTBEAT = 3;
} }
/** /**
* Defines a remote ActorRef that "remembers" and uses its original Actor instance * Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node. * on the original node.
*/ */
message ActorRefProtocol { message ActorRefData {
required string path = 1; required string path = 1;
} }
/** /**
* Defines a message. * Defines a message.
*/ */
message MessageProtocol { message SerializedMessage {
required bytes message = 1; required bytes message = 1;
required int32 serializerId = 2; required int32 serializerId = 2;
optional bytes messageManifest = 3; optional bytes messageManifest = 3;
} }
/**
* Defines a meta data entry.
*/
message MetadataEntryProtocol {
required string key = 1;
required bytes value = 2;
}
/**
* Defines a remote address.
*/
message AddressProtocol {
required string system = 1;
required string hostname = 2;
required uint32 port = 3;
optional string protocol = 4;
}
/** /**
* Defines akka.remote.DaemonMsgCreate * Defines akka.remote.DaemonMsgCreate
*/ */
message DaemonMsgCreateProtocol { message DaemonMsgCreateData {
required PropsProtocol props = 1; required PropsData props = 1;
required DeployProtocol deploy = 2; required DeployData deploy = 2;
required string path = 3; required string path = 3;
required ActorRefProtocol supervisor = 4; required ActorRefData supervisor = 4;
} }
/** /**
* Serialization of akka.actor.Props * Serialization of akka.actor.Props
*/ */
message PropsProtocol { message PropsData {
required DeployProtocol deploy = 2; required DeployData deploy = 2;
required string clazz = 3; required string clazz = 3;
repeated bytes args = 4; repeated bytes args = 4;
repeated string classes = 5; repeated string classes = 5;
@ -104,10 +78,59 @@ message PropsProtocol {
/** /**
* Serialization of akka.actor.Deploy * Serialization of akka.actor.Deploy
*/ */
message DeployProtocol { message DeployData {
required string path = 1; required string path = 1;
optional bytes config = 2; optional bytes config = 2;
optional bytes routerConfig = 3; optional bytes routerConfig = 3;
optional bytes scope = 4; optional bytes scope = 4;
optional string dispatcher = 5; optional string dispatcher = 5;
} }
/******************************************
* Akka Protocol message formats
******************************************/
/**
* Message format of Akka Protocol.
* Message contains either a payload or an instruction.
*/
message AkkaProtocolMessage {
optional bytes payload = 1;
optional AkkaControlMessage instruction = 2;
}
/**
* Defines some control messages for the remoting
*/
message AkkaControlMessage {
required CommandType commandType = 1;
optional AkkaHandshakeInfo handshakeInfo = 2;
}
message AkkaHandshakeInfo {
required AddressData origin = 1;
required fixed64 uid = 2;
optional string cookie = 3;
}
/**
* Defines the type of the AkkaControlMessage command type
*/
enum CommandType {
CONNECT = 1;
SHUTDOWN = 2;
HEARTBEAT = 3;
}
/**
* Defines a remote address.
*/
message AddressData {
required string system = 1;
required string hostname = 2;
required uint32 port = 3;
optional string protocol = 4;
}

View file

@ -82,12 +82,6 @@ akka {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
} }
# If enabled, an inbound connection is only considered to be live after the remote
# system sent an explicit acknowledgement.
# It is recommended to leave this setting on when connectionless transports (e.g. UDP)
# are used.
wait-activity-enabled = on
# Controls the backoff interval after a refused write is reattempted. (Transports may # Controls the backoff interval after a refused write is reattempted. (Transports may
# refuse writes if their internal buffer is full) # refuse writes if their internal buffer is full)
backoff-interval = 0.01 s backoff-interval = 0.01 s
@ -245,6 +239,25 @@ akka {
# order of seconds or minutes. # order of seconds or minutes.
gate-unknown-addresses-for = 60 s gate-unknown-addresses-for = 60 s
# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.
system-message-buffer-size = 1000
# This setting defines the maximum idle time after an individual
# acknowledgement for system messages is sent. System message delivery
# is guaranteed by explicit acknowledgement messages. These acks are
# piggybacked on ordinary traffic messages. If no traffic is detected
# during the time period configured here, the remoting will send out
# an individual ack.
system-message-ack-piggyback-timeout = 0.3 s
# This setting defines the time after messages that have not been
# explicitly acknowledged or negatively acknowledged are resent.
# Messages that were negatively acknowledged are always immediately
# resent.
resend-interval = 1 s
### Transports and adapters ### Transports and adapters
# List of the transport drivers that will be loaded by the remoting. # List of the transport drivers that will be loaded by the remoting.

View file

@ -0,0 +1,191 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import scala.collection.immutable._
import akka.AkkaException
object SeqNo {
implicit val ord: Ordering[SeqNo] = new Ordering[SeqNo] {
override def compare(x: SeqNo, y: SeqNo): Int = {
val sgn = if (x.rawValue < y.rawValue) -1 else if (x.rawValue > y.rawValue) 1 else 0
if (((x.rawValue - y.rawValue) * sgn) < 0L) -sgn else sgn
}
}
}
/**
* Implements a 64 bit sequence number with proper wrap-around ordering.
*/
case class SeqNo(rawValue: Long) extends Ordered[SeqNo] {
/**
* Checks if this sequence number is an immediate successor of the provided one.
*
* @param that The second sequence number that has to be exactly one less
* @return true if this sequence number is the successor of the provided one
*/
def isSuccessor(that: SeqNo): Boolean = (this.rawValue - that.rawValue) == 1
/**
* Increments the sequence number. Wraps-around if 64 bit limit is reached.
* @return the incremented sequence number
*/
def inc: SeqNo = new SeqNo(this.rawValue + 1L)
override def compare(that: SeqNo) = SeqNo.ord.compare(this, that)
override def toString = String.valueOf(rawValue)
}
object HasSequenceNumber {
implicit def seqOrdering[T <: HasSequenceNumber]: Ordering[T] = new Ordering[T] {
def compare(x: T, y: T) = x.seq.compare(y.seq)
}
}
/**
* Messages that are to be buffered in [[akka.remote.AckedSendBuffer]] or [[akka.remote.AckedReceiveBuffer]] has
* to implement this interface to provide the sequence needed by the buffers.
*/
trait HasSequenceNumber {
/**
* Sequence number of the message
*/
def seq: SeqNo
}
/**
* Class representing an acknowledgement with selective negative acknowledgements.
*
* @param cumulativeAck Represents the highest sequence number received.
* @param nacks Set of sequence numbers between the last delivered one and cumulativeAck that has been not yet received.
*/
case class Ack(cumulativeAck: SeqNo, nacks: Set[SeqNo] = Set.empty) {
override def toString = s"ACK[$cumulativeAck, ${nacks.mkString("{", ", ", "}")}]"
}
class ResendBufferCapacityReachedException(c: Int)
extends AkkaException(s"Resend buffer capacity of [$c] has been reached.")
/**
* Implements an immutable resend buffer that buffers messages until they have been acknowledged. Properly removes messages
* when an ack is received. This buffer works together with [[akka.remote.AckedReceiveBuffer]] on the receiving end.
*
* @param capacity Maximum number of messages the buffer is willing to accept. If reached [[akka.remote.ResendBufferCapacityReachedException]]
* is thrown.
* @param nonAcked Sequence of messages that has not yet been acknowledged.
* @param nacked Sequence of messages that has been explicitly negative acknowledged.
* @param maxSeq The maximum sequence number that has been stored in this buffer. Messages having lower sequence number
* will be not stored but rejected with [[java.lang.IllegalArgumentException]]
*/
case class AckedSendBuffer[T <: HasSequenceNumber](
capacity: Int,
nonAcked: IndexedSeq[T] = Vector.empty[T],
nacked: IndexedSeq[T] = Vector.empty[T],
maxSeq: SeqNo = SeqNo(-1)) {
/**
* Processes an incoming acknowledgement and returns a new buffer with only unacknowledged elements remaining.
* @param ack The received acknowledgement
* @return An updated buffer containing the remaining unacknowledged messages
*/
def acknowledge(ack: Ack): AckedSendBuffer[T] = this.copy(
nonAcked = nonAcked.filter { m m.seq > ack.cumulativeAck },
nacked = (nacked ++ nonAcked) filter { m ack.nacks(m.seq) })
/**
* Puts a new message in the buffer. Throws [[java.lang.IllegalArgumentException]] if an out-of-sequence message
* is attempted to be stored.
* @param msg The message to be stored for possible future retransmission.
* @return The updated buffer
*/
def buffer(msg: T): AckedSendBuffer[T] = {
if (msg.seq <= maxSeq) throw new IllegalArgumentException(s"Sequence number must be monotonic. Received [${msg.seq}] " +
s"which is smaller than [$maxSeq]")
if (nonAcked.size == capacity) throw new ResendBufferCapacityReachedException(capacity)
this.copy(nonAcked = this.nonAcked :+ msg, maxSeq = msg.seq)
}
override def toString = nonAcked.map(_.seq).mkString("[", ", ", "]")
}
/**
* Implements an immutable receive buffer that buffers incoming messages until they can be safely delivered. This
* buffer works together with a [[akka.remote.AckedSendBuffer]] on the sender side.
*
* @param lastDelivered Sequence number of the last message that has been delivered.
* @param cumulativeAck The highest sequence number received so far.
* @param buf Buffer of messages that are waiting for delivery
*/
case class AckedReceiveBuffer[T <: HasSequenceNumber](
lastDelivered: SeqNo = SeqNo(-1),
cumulativeAck: SeqNo = SeqNo(-1),
buf: SortedSet[T] = TreeSet.empty[T])(implicit val seqOrdering: Ordering[T]) {
import SeqNo.ord.max
/**
* Puts a sequenced message in the receive buffer returning a new buffer.
* @param arrivedMsg message to be put into the buffer.
* @return The updated buffer containing the message.
*/
def receive(arrivedMsg: T): AckedReceiveBuffer[T] = {
this.copy(
cumulativeAck = max(arrivedMsg.seq, cumulativeAck),
buf = if (arrivedMsg.seq > lastDelivered && !buf.contains(arrivedMsg)) buf + arrivedMsg else buf)
}
/**
* Extract all messages that could be safely delivered, an updated ack to be sent to the sender, and an updated
* buffer that has the messages removed that can be delivered.
* @return Triplet of the updated buffer, messages that can be delivered and the updated acknowledgement.
*/
def extractDeliverable: (AckedReceiveBuffer[T], Seq[T], Ack) = {
var deliver = Vector.empty[T]
var ack = Ack(cumulativeAck = cumulativeAck)
var updatedLastDelivered = lastDelivered
var prev = lastDelivered
for (bufferedMsg buf) {
if (bufferedMsg.seq.isSuccessor(updatedLastDelivered)) {
deliver :+= bufferedMsg
updatedLastDelivered = updatedLastDelivered.inc
} else if (!bufferedMsg.seq.isSuccessor(prev)) {
var diff = bufferedMsg.seq.rawValue - prev.rawValue - 1
var nacks = Set.empty[SeqNo]
// Collect all missing sequence numbers (gaps)
while (diff > 0) {
nacks += SeqNo(prev.rawValue + diff)
diff -= 1
}
ack = ack.copy(nacks = ack.nacks ++ nacks)
}
prev = bufferedMsg.seq
}
(this.copy(buf = buf filterNot deliver.contains, lastDelivered = updatedLastDelivered), deliver, ack)
}
/**
* Merges two receive buffers. Merging preserves sequencing of messages, and drops all messages that has been
* safely acknowledged by any of the participating buffers. Also updates the expected sequence numbers.
* @param that The receive buffer to merge with
* @return The merged receive buffer.
*/
def mergeFrom(that: AckedReceiveBuffer[T]): AckedReceiveBuffer[T] = {
this.copy(
lastDelivered = max(this.lastDelivered, that.lastDelivered),
cumulativeAck = max(this.cumulativeAck, that.cumulativeAck),
buf = (this.buf union that.buf).filter { _.seq > lastDelivered })
}
override def toString = buf.map { _.seq }.mkString("[", ", ", "]")
}

View file

@ -3,21 +3,27 @@
*/ */
package akka.remote package akka.remote
import akka.{ OnlyCauseStackTrace, AkkaException }
import akka.actor._ import akka.actor._
import akka.dispatch.sysmsg.SystemMessage import akka.dispatch.sysmsg.SystemMessage
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.pattern.pipe import akka.pattern.pipe
import akka.remote.EndpointManager.Send import akka.remote.EndpointManager.Send
import akka.remote.RemoteProtocol.MessageProtocol import akka.remote.WireFormats.SerializedMessage
import akka.remote.transport.AkkaPduCodec._ import akka.remote.transport.AkkaPduCodec._
import akka.remote.transport.AssociationHandle._ import akka.remote.transport.AssociationHandle._
import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle } import akka.remote.transport.Transport.InvalidAssociationException
import akka.remote.transport.{ AkkaPduProtobufCodec, AkkaPduCodec, Transport, AkkaProtocolHandle }
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.util.ByteString import akka.util.ByteString
import scala.util.control.NonFatal import akka.{ OnlyCauseStackTrace, AkkaException }
import akka.remote.transport.Transport.InvalidAssociationException
import java.io.NotSerializableException import java.io.NotSerializableException
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration.{ Duration, Deadline }
import scala.util.control.NonFatal
import scala.annotation.tailrec
import akka.remote.EndpointWriter.FlushAndStop
import akka.actor.SupervisorStrategy._
import akka.remote.EndpointManager.Link
/** /**
* INTERNAL API * INTERNAL API
@ -25,7 +31,7 @@ import java.io.NotSerializableException
private[remote] trait InboundMessageDispatcher { private[remote] trait InboundMessageDispatcher {
def dispatch(recipient: InternalActorRef, def dispatch(recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
serializedMessage: MessageProtocol, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]): Unit senderOption: Option[ActorRef]): Unit
} }
@ -40,7 +46,7 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor
override def dispatch(recipient: InternalActorRef, override def dispatch(recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
serializedMessage: MessageProtocol, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]): Unit = { senderOption: Option[ActorRef]): Unit = {
import provider.remoteSettings._ import provider.remoteSettings._
@ -87,29 +93,6 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor
} }
/**
* INTERNAL API
*/
private[remote] object EndpointWriter {
/**
* This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
* to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the
* same remote endpoint: when a parallel inbound association is detected, the old one is removed and the new one is
* used instead.
* @param handle Handle of the new inbound association.
*/
case class TakeOver(handle: AssociationHandle)
case object BackoffTimer
case object FlushAndStop
sealed trait State
case object Initializing extends State
case object Buffering extends State
case object Writing extends State
case object Handoff extends State
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -125,6 +108,13 @@ private[remote] class EndpointException(msg: String, cause: Throwable) extends A
private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
extends EndpointException("Invalid address: " + remoteAddress, cause) extends EndpointException("Invalid address: " + remoteAddress, cause)
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[remote] case class HopelessAssociation(localAddress: Address, remoteAddress: Address, uid: Option[Int], cause: Throwable)
extends EndpointException("Catastrophic association error.")
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -143,6 +133,182 @@ private[remote] class EndpointAssociationException(msg: String, cause: Throwable
@SerialVersionUID(1L) @SerialVersionUID(1L)
private[remote] class OversizedPayloadException(msg: String) extends EndpointException(msg) private[remote] class OversizedPayloadException(msg: String) extends EndpointException(msg)
/**
* INTERNAL API
*/
private[remote] object ReliableDeliverySupervisor {
case object Ungate
case class GotUid(uid: Int)
def apply(
handleOrActive: Option[AkkaProtocolHandle],
localAddress: Address,
remoteAddress: Address,
transport: Transport,
settings: RemoteSettings,
codec: AkkaPduCodec,
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings,
codec, receiveBuffers)
}
/**
* INTERNAL API
*/
private[remote] class ReliableDeliverySupervisor(
handleOrActive: Option[AkkaProtocolHandle],
val localAddress: Address,
val remoteAddress: Address,
val transport: Transport,
val settings: RemoteSettings,
val codec: AkkaPduCodec,
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor {
import ReliableDeliverySupervisor._
def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
case e @ (_: InvalidAssociation | _: HopelessAssociation) Escalate
case NonFatal(e)
if (retryGateEnabled) {
import context.dispatcher
context.become(gated)
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
context.unwatch(writer)
currentHandle = None
Stop
} else {
Restart
}
}
var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
var resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
var resendDeadline = Deadline.now + settings.SysResendTimeout
var lastCumulativeAck = SeqNo(-1)
val nextSeq = {
var seqCounter: Long = 0L
() {
val tmp = seqCounter
seqCounter += 1
SeqNo(tmp)
}
}
var writer: ActorRef = createWriter()
var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
override def postStop(): Unit = {
// All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence
// number otherwise deadLetters will ignore it to avoid reporting system messages as dead letters while they are
// still possibly retransmitted.
// Such a situation may arise when the EndpointWriter is shut down, and all of its mailbox contents are delivered
// to dead letters. These messages should be ignored, as they still live in resendBuffer and might be delivered to
// the remote system later.
(resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s context.system.deadLetters ! s.copy(seqOpt = None) }
receiveBuffers.remove(Link(localAddress, remoteAddress))
}
override def postRestart(reason: Throwable): Unit = {
throw new IllegalStateException(
"BUG: ReliableDeliverySupervisor has been attempted to be restarted. This must not happen.")
}
override def receive: Receive = {
case FlushAndStop
// Trying to serve until our last breath
resendAll()
writer ! FlushAndStop
context.become(flushWait)
case s: Send
handleSend(s)
case ack: Ack
resendBuffer = resendBuffer.acknowledge(ack)
if (lastCumulativeAck < ack.cumulativeAck) {
resendDeadline = Deadline.now + settings.SysResendTimeout
lastCumulativeAck = ack.cumulativeAck
} else if (resendDeadline.isOverdue()) {
resendAll()
resendDeadline = Deadline.now + settings.SysResendTimeout
}
resendNacked()
case Terminated(_)
currentHandle = None
context.become(idle)
case GotUid(u) uid = Some(u)
}
def gated: Receive = {
case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
writer = createWriter()
resendAll()
context.become(receive)
} else context.become(idle)
case s @ Send(msg: SystemMessage, _, _, _) tryBuffer(s.copy(seqOpt = Some(nextSeq())))
case s: Send context.system.deadLetters ! s
case FlushAndStop context.stop(self)
case _ // Ignore
}
def idle: Receive = {
case s: Send
writer = createWriter()
resendAll()
handleSend(s)
context.become(receive)
case FlushAndStop context.stop(self)
}
def flushWait: Receive = {
case Terminated(_)
// Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
// and don't really know if they were properly delivered or not.
resendBuffer = new AckedSendBuffer[Send](0)
context.stop(self)
case _ // Ignore
}
private def handleSend(send: Send): Unit =
if (send.message.isInstanceOf[SystemMessage]) {
val sequencedSend = send.copy(seqOpt = Some(nextSeq()))
tryBuffer(sequencedSend)
writer ! sequencedSend
} else writer ! send
private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ }
private def resendAll(): Unit = {
resendNacked()
resendBuffer.nonAcked foreach { writer ! _ }
}
private def tryBuffer(s: Send): Unit =
try {
resendBuffer = resendBuffer buffer s
} catch {
case NonFatal(e) throw new HopelessAssociation(localAddress, remoteAddress, uid, e)
}
private def createWriter(): ActorRef = {
context.watch(context.actorOf(EndpointWriter(
handleOrActive = currentHandle,
localAddress = localAddress,
remoteAddress = remoteAddress,
transport = transport,
settings = settings,
AkkaPduProtobufCodec,
receiveBuffers = receiveBuffers,
reliableDeliverySupervisor = Some(self))
.withDispatcher("akka.remote.writer-dispatcher"),
"endpointWriter"))
}
}
/**
* INTERNAL API
*/
private[remote] abstract class EndpointActor( private[remote] abstract class EndpointActor(
val localAddress: Address, val localAddress: Address,
val remoteAddress: Address, val remoteAddress: Address,
@ -161,16 +327,58 @@ private[remote] abstract class EndpointActor(
} }
} }
/**
* INTERNAL API
*/
private[remote] object EndpointWriter {
def apply(
handleOrActive: Option[AkkaProtocolHandle],
localAddress: Address,
remoteAddress: Address,
transport: Transport,
settings: RemoteSettings,
codec: AkkaPduCodec,
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
reliableDeliverySupervisor: Option[ActorRef]): Props =
Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec,
receiveBuffers, reliableDeliverySupervisor)
/**
* This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
* to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the
* same remote endpoint: when a parallel inbound association is detected, the old one is removed and the new one is
* used instead.
* @param handle Handle of the new inbound association.
*/
case class TakeOver(handle: AkkaProtocolHandle)
case object BackoffTimer
case object FlushAndStop
case object AckIdleCheckTimer
case class OutboundAck(ack: Ack)
sealed trait State
case object Initializing extends State
case object Buffering extends State
case object Writing extends State
case object Handoff extends State
val AckIdleTimerName = "AckIdleTimer"
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[remote] class EndpointWriter( private[remote] class EndpointWriter(
handleOrActive: Option[AssociationHandle], handleOrActive: Option[AkkaProtocolHandle],
localAddress: Address, localAddress: Address,
remoteAddress: Address, remoteAddress: Address,
transport: Transport, transport: Transport,
settings: RemoteSettings, settings: RemoteSettings,
codec: AkkaPduCodec) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with Stash with FSM[EndpointWriter.State, Unit] { codec: AkkaPduCodec,
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
val reliableDeliverySupervisor: Option[ActorRef]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with Stash with FSM[EndpointWriter.State, Unit] {
import EndpointWriter._ import EndpointWriter._
import context.dispatcher import context.dispatcher
@ -178,9 +386,14 @@ private[remote] class EndpointWriter(
val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem] val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]
var reader: Option[ActorRef] = None var reader: Option[ActorRef] = None
var handle: Option[AssociationHandle] = handleOrActive // FIXME: refactor into state data var handle: Option[AkkaProtocolHandle] = handleOrActive // FIXME: refactor into state data
val readerId = Iterator from 0 val readerId = Iterator from 0
def newAckDeadline: Deadline = Deadline.now + settings.SysMsgAckTimeout
var ackDeadline: Deadline = newAckDeadline
var lastAck: Option[Ack] = None
override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) publishAndThrow(e) } override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) publishAndThrow(e) }
val msgDispatch = new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log) val msgDispatch = new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log)
@ -203,7 +416,10 @@ private[remote] class EndpointWriter(
preStart() preStart()
} }
override def preStart(): Unit = override def preStart(): Unit = {
setTimer(AckIdleTimerName, AckIdleCheckTimer, settings.SysMsgAckTimeout / 2, repeat = true)
startWith( startWith(
handle match { handle match {
case Some(h) case Some(h)
@ -213,18 +429,20 @@ private[remote] class EndpointWriter(
transport.associate(remoteAddress) pipeTo self transport.associate(remoteAddress) pipeTo self
Initializing Initializing
}, },
()) stateData = ())
}
when(Initializing) { when(Initializing) {
case Event(Send(msg, senderOption, recipient), _) case Event(Send(msg, senderOption, recipient, _), _)
stash() stash()
stay() stay()
case Event(Status.Failure(e: InvalidAssociationException), _) case Event(Status.Failure(e: InvalidAssociationException), _)
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
case Event(Status.Failure(e), _) case Event(Status.Failure(e), _)
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e)) publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
case Event(inboundHandle: AssociationHandle, _) case Event(inboundHandle: AkkaProtocolHandle, _)
// Assert handle == None? // Assert handle == None?
context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid)
handle = Some(inboundHandle) handle = Some(inboundHandle)
reader = startReadEndpoint(inboundHandle) reader = startReadEndpoint(inboundHandle)
goto(Writing) goto(Writing)
@ -232,7 +450,7 @@ private[remote] class EndpointWriter(
} }
when(Buffering) { when(Buffering) {
case Event(Send(msg, senderOption, recipient), _) case Event(_: Send, _)
stash() stash()
stay() stay()
@ -244,17 +462,26 @@ private[remote] class EndpointWriter(
} }
when(Writing) { when(Writing) {
case Event(Send(msg, senderOption, recipient), _) case Event(s @ Send(msg, senderOption, recipient, seqOption), _)
try { try {
handle match { handle match {
case Some(h) case Some(h)
val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) ackDeadline = newAckDeadline
val pdu = codec.constructMessage(
recipient.localAddressToUse,
recipient,
serializeMessage(msg),
senderOption,
seqOption = seqOption,
ackOption = lastAck)
if (pdu.size > transport.maximumPayloadBytes) { if (pdu.size > transport.maximumPayloadBytes) {
publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes.")) publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes."))
} else if (h.write(pdu)) { } else if (h.write(pdu)) {
stay() stay()
} else { } else {
stash() if (seqOption.isEmpty) stash()
goto(Buffering) goto(Buffering)
} }
case None case None
@ -267,7 +494,14 @@ private[remote] class EndpointWriter(
} }
// We are in Writing state, so stash is empty, safe to stop here // We are in Writing state, so stash is empty, safe to stop here
case Event(FlushAndStop, _) stop() case Event(FlushAndStop, _)
// Try to send a last Ack message
trySendPureAck()
stop()
case Event(AckIdleCheckTimer, _) if ackDeadline.isOverdue()
trySendPureAck()
stay()
} }
when(Handoff) { when(Handoff) {
@ -276,16 +510,15 @@ private[remote] class EndpointWriter(
unstashAll() unstashAll()
goto(Writing) goto(Writing)
case Event(Send(msg, senderOption, recipient), _) case Event(Send(msg, senderOption, recipient, _), _)
stash() stash()
stay() stay()
// TakeOver messages are not handled here but inside the whenUnhandled block, because the procedure is exactly the
// same. Any outstanding
} }
whenUnhandled { whenUnhandled {
case Event(Terminated(r), _) if r == reader.orNull publishAndThrow(new EndpointDisassociatedException("Disassociated")) case Event(Terminated(r), _) if r == reader.orNull
publishAndThrow(new EndpointDisassociatedException("Disassociated"))
case Event(TakeOver(newHandle), _) case Event(TakeOver(newHandle), _)
// Shutdown old reader // Shutdown old reader
handle foreach { _.disassociate() } handle foreach { _.disassociate() }
@ -294,6 +527,11 @@ private[remote] class EndpointWriter(
goto(Handoff) goto(Handoff)
case Event(FlushAndStop, _) case Event(FlushAndStop, _)
stop() stop()
case Event(OutboundAck(ack), _)
lastAck = Some(ack)
trySendPureAck()
stay()
case Event(AckIdleCheckTimer, _) stay() // Ignore
} }
onTransition { onTransition {
@ -309,7 +547,7 @@ private[remote] class EndpointWriter(
onTermination { onTermination {
case StopEvent(_, _, _) case StopEvent(_, _, _)
// FIXME: Add a test case for this cancelTimer(AckIdleTimerName)
// It is important to call unstashAll() for the stash to work properly and maintain messages during restart. // It is important to call unstashAll() for the stash to work properly and maintain messages during restart.
// As the FSM trait does not call super.postStop(), this call is needed // As the FSM trait does not call super.postStop(), this call is needed
unstashAll() unstashAll()
@ -317,16 +555,20 @@ private[remote] class EndpointWriter(
eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound)) eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
} }
private def startReadEndpoint(handle: AssociationHandle): Some[ActorRef] = { private def trySendPureAck(): Unit = for (h handle; ack lastAck)
if (h.write(codec.constructPureAck(ack))) ackDeadline = newAckDeadline
private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
val newReader = val newReader =
context.watch(context.actorOf( context.watch(context.actorOf(
Props(new EndpointReader(localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound)), EndpointReader(localAddress, remoteAddress, transport, settings, codec,
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers),
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())) "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
handle.readHandlerPromise.success(ActorHandleEventListener(newReader)) handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
Some(newReader) Some(newReader)
} }
private def serializeMessage(msg: Any): MessageProtocol = handle match { private def serializeMessage(msg: Any): SerializedMessage = handle match {
case Some(h) case Some(h)
Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) { Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) {
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
@ -337,6 +579,26 @@ private[remote] class EndpointWriter(
} }
/**
* INTERNAL API
*/
private[remote] object EndpointReader {
def apply(
localAddress: Address,
remoteAddress: Address,
transport: Transport,
settings: RemoteSettings,
codec: AkkaPduCodec,
msgDispatch: InboundMessageDispatcher,
inbound: Boolean,
reliableDeliverySupervisor: Option[ActorRef],
receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
Props(classOf[EndpointReader], localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound,
reliableDeliverySupervisor, receiveBuffers)
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -347,23 +609,73 @@ private[remote] class EndpointReader(
settings: RemoteSettings, settings: RemoteSettings,
codec: AkkaPduCodec, codec: AkkaPduCodec,
msgDispatch: InboundMessageDispatcher, msgDispatch: InboundMessageDispatcher,
val inbound: Boolean) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) { val inbound: Boolean,
val reliableDeliverySupervisor: Option[ActorRef],
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) {
import EndpointWriter.OutboundAck
val provider = RARP(context.system).provider val provider = RARP(context.system).provider
var ackedReceiveBuffer = new AckedReceiveBuffer[Message]
override def preStart(): Unit = {
receiveBuffers.get(Link(localAddress, remoteAddress)) match {
case null
case buf
ackedReceiveBuffer = buf
deliverAndAck()
}
}
override def postStop(): Unit = {
@tailrec
def updateSavedState(key: Link, expectedState: AckedReceiveBuffer[Message]): Unit = {
if (expectedState eq null) {
if (receiveBuffers.putIfAbsent(key, ackedReceiveBuffer) ne null) updateSavedState(key, receiveBuffers.get(key))
} else if (!receiveBuffers.replace(key, expectedState, expectedState.mergeFrom(ackedReceiveBuffer)))
updateSavedState(key, receiveBuffers.get(key))
}
val key = Link(localAddress, remoteAddress)
updateSavedState(key, receiveBuffers.get(key))
}
override def receive: Receive = { override def receive: Receive = {
case Disassociated context.stop(self) case Disassociated context.stop(self)
case InboundPayload(p) case InboundPayload(p) if p.size <= transport.maximumPayloadBytes
if (p.size > transport.maximumPayloadBytes) { val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
publishError(new OversizedPayloadException(s"Discarding oversized payload received: max allowed size ${transport.maximumPayloadBytes} bytes, actual size ${p.size} bytes."))
} else { for (ack ackOption; reliableDelivery reliableDeliverySupervisor) reliableDelivery ! ack
val msg = decodePdu(p)
msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption) msgOption match {
case Some(msg)
if (msg.reliableDeliveryEnabled) {
ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
deliverAndAck()
} else msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
case None
} }
case InboundPayload(oversized)
publishError(new OversizedPayloadException(s"Discarding oversized payload received: " +
s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."))
} }
private def decodePdu(pdu: ByteString): Message = try { private def deliverAndAck(): Unit = {
val (updatedBuffer, deliver, ack) = ackedReceiveBuffer.extractDeliverable
ackedReceiveBuffer = updatedBuffer
// Notify writer that some messages can be acked
context.parent ! OutboundAck(ack)
deliver foreach { m
msgDispatch.dispatch(m.recipient, m.recipientAddress, m.serializedMessage, m.senderOption)
}
}
private def tryDecodeMessageAndAck(pdu: ByteString): (Option[Ack], Option[Message]) = try {
codec.decodeMessage(pdu, provider, localAddress) codec.decodeMessage(pdu, provider, localAddress)
} catch { } catch {
case NonFatal(e) throw new EndpointException("Error while decoding incoming Akka PDU", e) case NonFatal(e) throw new EndpointException("Error while decoding incoming Akka PDU", e)

View file

@ -4,7 +4,7 @@
package akka.remote package akka.remote
import akka.remote.RemoteProtocol._ import akka.remote.WireFormats._
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
@ -19,7 +19,7 @@ private[akka] object MessageSerializer {
/** /**
* Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
*/ */
def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = { def deserialize(system: ExtendedActorSystem, messageProtocol: SerializedMessage): AnyRef = {
SerializationExtension(system).deserialize( SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray, messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId, messageProtocol.getSerializerId,
@ -29,10 +29,10 @@ private[akka] object MessageSerializer {
/** /**
* Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol
*/ */
def serialize(system: ExtendedActorSystem, message: AnyRef): MessageProtocol = { def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
val s = SerializationExtension(system) val s = SerializationExtension(system)
val serializer = s.findSerializerFor(message) val serializer = s.findSerializerFor(message)
val builder = MessageProtocol.newBuilder val builder = SerializedMessage.newBuilder
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier) builder.setSerializerId(serializer.identifier)
if (serializer.includeManifest) if (serializer.includeManifest)

View file

@ -14,6 +14,7 @@ import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook }
import scala.util.control.Exception.Catcher import scala.util.control.Exception.Catcher
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.forkjoin.ThreadLocalRandom
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.ConfigurationException import akka.ConfigurationException
@ -74,16 +75,18 @@ private[akka] object RemoteActorRefProvider {
private class RemoteDeadLetterActorRef(_provider: ActorRefProvider, private class RemoteDeadLetterActorRef(_provider: ActorRefProvider,
_path: ActorPath, _path: ActorPath,
_eventStream: EventStream) extends DeadLetterActorRef(_provider, _path, _eventStream) { _eventStream: EventStream) extends DeadLetterActorRef(_provider, _path, _eventStream) {
import EndpointManager.Send
override def !(message: Any)(implicit sender: ActorRef): Unit = message match { override def !(message: Any)(implicit sender: ActorRef): Unit = message match {
case EndpointManager.Send(m, senderOption, _) super.!(m)(senderOption.orNull) case Send(m, senderOption, _, seqOpt)
case _ super.!(message)(sender) // else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved
} // the dead letter status
if (seqOpt.isEmpty) super.!(m)(senderOption.orNull)
override def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match { case DeadLetter(Send(m, senderOption, recipient, seqOpt), _, _)
// unwrap again in case the original message was DeadLetter(EndpointManager.Send(m)) // else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved
case EndpointManager.Send(m, _, _) super.specialHandle(m, sender) // the dead letter status
case _ super.specialHandle(msg, sender) if (seqOpt.isEmpty) super.!(m)(senderOption.orNull)
case _ super.!(message)(sender)
} }
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])

View file

@ -42,6 +42,12 @@ class RemoteSettings(val config: Config) {
val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remote.backoff-interval"), MILLISECONDS) val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remote.backoff-interval"), MILLISECONDS)
val SysMsgAckTimeout: FiniteDuration = Duration(getMilliseconds("akka.remote.system-message-ack-piggyback-timeout"), MILLISECONDS)
val SysResendTimeout: FiniteDuration = Duration(getMilliseconds("akka.remote.resend-interval"), MILLISECONDS)
val SysMsgBufferSize: Int = getInt("akka.remote.system-message-buffer-size")
val CommandAckTimeout: Timeout = val CommandAckTimeout: Timeout =
Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS)) Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS))

View file

@ -7,7 +7,7 @@ import akka.actor.SupervisorStrategy._
import akka.actor._ import akka.actor._
import akka.event.{ Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.pattern.{ gracefulStop, pipe, ask } import akka.pattern.{ AskTimeoutException, gracefulStop, pipe, ask }
import akka.remote.EndpointManager._ import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation } import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation }
@ -21,6 +21,8 @@ import scala.concurrent.duration._
import scala.concurrent.{ Promise, Await, Future } import scala.concurrent.{ Promise, Await, Future }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
/** /**
* INTERNAL API * INTERNAL API
@ -223,8 +225,13 @@ private[remote] object EndpointManager {
case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand
case object StartupFinished extends RemotingCommand case object StartupFinished extends RemotingCommand
case object ShutdownAndFlush extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand
case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand { case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None)
extends RemotingCommand with HasSequenceNumber {
override def toString = s"Remote message $senderOption -> $recipient" override def toString = s"Remote message $senderOption -> $recipient"
// This MUST throw an exception to indicate that we attempted to put a nonsequenced message in one of the
// acknowledged delivery buffers
def seq = seqOpt.get
} }
case class ManagementCommand(cmd: Any) extends RemotingCommand case class ManagementCommand(cmd: Any) extends RemotingCommand
case class ManagementCommandAck(status: Boolean) case class ManagementCommandAck(status: Boolean)
@ -234,6 +241,9 @@ private[remote] object EndpointManager {
case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]], case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]],
results: Seq[(Transport, Address, Promise[AssociationEventListener])]) results: Seq[(Transport, Address, Promise[AssociationEventListener])])
// Helper class to store address pairs
case class Link(localAddress: Address, remoteAddress: Address)
sealed trait EndpointPolicy { sealed trait EndpointPolicy {
/** /**
@ -358,29 +368,29 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
log.error("Tried to associate with invalid remote address [{}]. " + log.error("Tried to associate with invalid remote address [{}]. " +
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
context.system.eventStream.publish(AddressTerminated(remoteAddress))
Stop
case e @ HopelessAssociation(localAddress, remoteAddress, uid, _)
log.error("Association with [{}] having uid [{}] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover" +
"from this situation.", remoteAddress, uid)
endpoints.markAsQuarantined(remoteAddress, e) // TODO: quarantine uid
context.system.eventStream.publish(AddressTerminated(remoteAddress))
Stop Stop
case NonFatal(e) case NonFatal(e)
// logging // logging
e match { e match {
case _: EndpointDisassociatedException | _: EndpointAssociationException // no logging case _: EndpointDisassociatedException | _: EndpointAssociationException // no logging
case _ log.error(e, e.getMessage) case _ log.error(e, e.getMessage)
} }
Stop
// Retrying immediately if the retry gate is disabled, and it is an endpoint used for writing.
if (!retryGateEnabled && endpoints.isWritable(sender)) {
// This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue
// to the restarted endpoint -- thus no messages are lost
Restart
} else {
// This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure,
// keeps throwing away messages until the retry gate becomes open (time specified in RetryGateClosedFor)
endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor)
Stop
}
} }
// Structure for saving reliable delivery state across restarts of Endpoints
val receiveBuffers = new ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]()
def receive = { def receive = {
case Listen(addressesPromise) case Listen(addressesPromise)
listens map { ListensResult(addressesPromise, _) } pipeTo self listens map { ListensResult(addressesPromise, _) } pipeTo self
@ -415,7 +425,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender
case s @ Send(message, senderOption, recipientRef) case s @ Send(message, senderOption, recipientRef, _)
val recipientAddress = recipientRef.path.address val recipientAddress = recipientRef.path.address
def createAndRegisterWritingEndpoint(): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint( def createAndRegisterWritingEndpoint(): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint(
@ -423,34 +433,37 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
recipientRef.localAddressToUse, recipientRef.localAddressToUse,
transportMapping(recipientRef.localAddressToUse), transportMapping(recipientRef.localAddressToUse),
settings, settings,
None)) None,
writing = true))
endpoints.writableEndpointWithPolicyFor(recipientAddress) match { endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
case Some(Pass(endpoint)) case Some(Pass(endpoint))
endpoint ! s endpoint ! s
case Some(Gated(timeOfRelease)) case Some(Gated(timeOfRelease))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
else forwardToDeadLetters(s) else extendedSystem.deadLetters ! s
case Some(Quarantined(_)) case Some(Quarantined(_))
forwardToDeadLetters(s) extendedSystem.deadLetters ! s
case None case None
createAndRegisterWritingEndpoint() ! s createAndRegisterWritingEndpoint() ! s
} }
case InboundAssociation(handle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match { case InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint) endpoint ! EndpointWriter.TakeOver(handle) case Some(endpoint) endpoint ! EndpointWriter.TakeOver(handle)
case None case None
if (endpoints.isQuarantined(handle.remoteAddress)) handle.disassociate() if (endpoints.isQuarantined(handle.remoteAddress)) handle.disassociate()
else { else {
val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, true)) eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, true))
val endpoint = createEndpoint( val endpoint = createEndpoint(
handle.remoteAddress, handle.remoteAddress,
handle.localAddress, handle.localAddress,
transportMapping(handle.localAddress), transportMapping(handle.localAddress),
settings, settings,
Some(handle)) Some(handle),
if (settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)) writing)
if (writing)
endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint) endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint)
else else
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
@ -465,22 +478,19 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
val sys = context.system // Avoid closing over context val sys = context.system // Avoid closing over context
Future sequence endpoints.allEndpoints.map { Future sequence endpoints.allEndpoints.map {
gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop) gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop)
} map { _.foldLeft(true) { _ && _ } } pipeTo sender } map { _.foldLeft(true) { _ && _ } } recover {
case _: AskTimeoutException false
} pipeTo sender
// Ignore all other writes // Ignore all other writes
context.become(flushing) context.become(flushing)
} }
def flushing: Receive = { def flushing: Receive = {
case s: Send forwardToDeadLetters(s) case s: Send extendedSystem.deadLetters ! s
case InboundAssociation(h) h.disassociate() case InboundAssociation(h) h.disassociate()
case Terminated(_) // why should we care now? case Terminated(_) // why should we care now?
} }
private def forwardToDeadLetters(s: Send): Unit = {
val sender = s.senderOption.getOrElse(extendedSystem.deadLetters)
extendedSystem.deadLetters.tell(DeadLetter(s.message, sender, s.recipient), sender)
}
private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = { private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = {
/* /*
* Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks * Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
@ -532,18 +542,28 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
localAddress: Address, localAddress: Address,
transport: Transport, transport: Transport,
endpointSettings: RemoteSettings, endpointSettings: RemoteSettings,
handleOption: Option[AssociationHandle]): ActorRef = { handleOption: Option[AkkaProtocolHandle],
writing: Boolean): ActorRef = {
assert(transportMapping contains localAddress) assert(transportMapping contains localAddress)
context.watch(context.actorOf(Props( if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor(
new EndpointWriter( handleOption,
handleOption, localAddress,
localAddress, remoteAddress,
remoteAddress, transport,
transport, endpointSettings,
endpointSettings, AkkaPduProtobufCodec,
AkkaPduProtobufCodec)) receiveBuffers).withDispatcher("akka.remote.writer-dispatcher"),
.withDispatcher("akka.remote.writer-dispatcher"), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
else context.watch(context.actorOf(EndpointWriter(
handleOption,
localAddress,
remoteAddress,
transport,
endpointSettings,
AkkaPduProtobufCodec,
receiveBuffers,
reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher"),
"endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
} }

View file

@ -10,7 +10,7 @@ import com.google.protobuf.ByteString
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.{ Actor, ActorRef, Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope } import akka.actor.{ Actor, ActorRef, Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope }
import akka.remote.DaemonMsgCreate import akka.remote.DaemonMsgCreate
import akka.remote.RemoteProtocol.{ DaemonMsgCreateProtocol, DeployProtocol, PropsProtocol } import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData }
import akka.routing.{ NoRouter, RouterConfig } import akka.routing.{ NoRouter, RouterConfig }
import scala.reflect.ClassTag import scala.reflect.ClassTag
import util.{ Failure, Success } import util.{ Failure, Success }
@ -36,8 +36,8 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
def toBinary(obj: AnyRef): Array[Byte] = obj match { def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgCreate(props, deploy, path, supervisor) case DaemonMsgCreate(props, deploy, path, supervisor)
def deployProto(d: Deploy): DeployProtocol = { def deployProto(d: Deploy): DeployData = {
val builder = DeployProtocol.newBuilder.setPath(d.path) val builder = DeployData.newBuilder.setPath(d.path)
if (d.config != ConfigFactory.empty) if (d.config != ConfigFactory.empty)
builder.setConfig(serialize(d.config)) builder.setConfig(serialize(d.config))
if (d.routerConfig != NoRouter) if (d.routerConfig != NoRouter)
@ -50,7 +50,7 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
} }
def propsProto = { def propsProto = {
val builder = PropsProtocol.newBuilder val builder = PropsData.newBuilder
.setClazz(props.clazz.getName) .setClazz(props.clazz.getName)
.setDeploy(deployProto(props.deploy)) .setDeploy(deployProto(props.deploy))
props.args map serialize foreach builder.addArgs props.args map serialize foreach builder.addArgs
@ -58,7 +58,7 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
builder.build builder.build
} }
DaemonMsgCreateProtocol.newBuilder. DaemonMsgCreateData.newBuilder.
setProps(propsProto). setProps(propsProto).
setDeploy(deployProto(deploy)). setDeploy(deployProto(deploy)).
setPath(path). setPath(path).
@ -71,9 +71,9 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
} }
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgCreateProtocol.parseFrom(bytes) val proto = DaemonMsgCreateData.parseFrom(bytes)
def deploy(protoDeploy: DeployProtocol): Deploy = { def deploy(protoDeploy: DeployData): Deploy = {
val config = val config =
if (protoDeploy.hasConfig) deserialize(protoDeploy.getConfig, classOf[Config]) if (protoDeploy.hasConfig) deserialize(protoDeploy.getConfig, classOf[Config])
else ConfigFactory.empty else ConfigFactory.empty

View file

@ -4,10 +4,10 @@
package akka.remote.serialization package akka.remote.serialization
import akka.actor.{ ExtendedActorSystem, ActorRef }
import akka.remote.WireFormats.ActorRefData
import akka.serialization.{ Serializer, Serialization } import akka.serialization.{ Serializer, Serialization }
import com.google.protobuf.Message import com.google.protobuf.Message
import akka.actor.{ ActorSystem, ActorRef, ExtendedActorSystem }
import akka.remote.RemoteProtocol.ActorRefProtocol
object ProtobufSerializer { object ProtobufSerializer {
@ -15,8 +15,8 @@ object ProtobufSerializer {
* Helper to serialize an [[akka.actor.ActorRef]] to Akka's * Helper to serialize an [[akka.actor.ActorRef]] to Akka's
* protobuf representation. * protobuf representation.
*/ */
def serializeActorRef(ref: ActorRef): ActorRefProtocol = { def serializeActorRef(ref: ActorRef): ActorRefData = {
ActorRefProtocol.newBuilder.setPath(Serialization.serializedActorPath(ref)).build ActorRefData.newBuilder.setPath(Serialization.serializedActorPath(ref)).build
} }
/** /**
@ -24,7 +24,7 @@ object ProtobufSerializer {
* from Akka's protobuf representation in the supplied * from Akka's protobuf representation in the supplied
* [[akka.actor.ActorSystem]]. * [[akka.actor.ActorSystem]].
*/ */
def deserializeActorRef(system: ExtendedActorSystem, refProtocol: ActorRefProtocol): ActorRef = def deserializeActorRef(system: ExtendedActorSystem, refProtocol: ActorRefData): ActorRef =
system.provider.resolveActorRef(refProtocol.getPath) system.provider.resolveActorRef(refProtocol.getPath)
} }

View file

@ -5,12 +5,16 @@ package akka.remote.transport
import akka.AkkaException import akka.AkkaException
import akka.actor.{ AddressFromURIString, InternalActorRef, Address, ActorRef } import akka.actor.{ AddressFromURIString, InternalActorRef, Address, ActorRef }
import akka.remote.RemoteProtocol._ import akka.remote.WireFormats._
import akka.remote.transport.AkkaPduCodec._ import akka.remote.transport.AkkaPduCodec._
import akka.remote.{ RemoteActorRefProvider, RemoteProtocol } import akka.remote._
import akka.util.ByteString import akka.util.ByteString
import com.google.protobuf.InvalidProtocolBufferException import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.{ ByteString PByteString } import com.google.protobuf.{ ByteString PByteString }
import akka.remote.Ack
import akka.remote.transport.AkkaPduCodec.Payload
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AkkaPduCodec.Message
/** /**
* INTERNAL API * INTERNAL API
@ -30,16 +34,21 @@ private[remote] object AkkaPduCodec {
* Trait that represents decoded Akka PDUs (Protocol Data Units) * Trait that represents decoded Akka PDUs (Protocol Data Units)
*/ */
sealed trait AkkaPdu sealed trait AkkaPdu
case class Associate(info: HandshakeInfo) extends AkkaPdu
case class Associate(cookie: Option[String], origin: Address) extends AkkaPdu
case object Disassociate extends AkkaPdu case object Disassociate extends AkkaPdu
case object Heartbeat extends AkkaPdu case object Heartbeat extends AkkaPdu
case class Payload(bytes: ByteString) extends AkkaPdu case class Payload(bytes: ByteString) extends AkkaPdu
case class Message(recipient: InternalActorRef, case class Message(recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
serializedMessage: MessageProtocol, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]) senderOption: Option[ActorRef],
seqOption: Option[SeqNo]) extends HasSequenceNumber {
def reliableDeliveryEnabled = seqOption.isDefined
override def seq: SeqNo = seqOption.get
}
} }
/** /**
@ -72,27 +81,31 @@ private[remote] trait AkkaPduCodec {
* Encoded form as raw bytes * Encoded form as raw bytes
*/ */
def encodePdu(pdu: AkkaPdu): ByteString = pdu match { def encodePdu(pdu: AkkaPdu): ByteString = pdu match {
case Associate(cookie, origin) constructAssociate(cookie, origin) case Associate(info) constructAssociate(info)
case Payload(bytes) constructPayload(bytes) case Payload(bytes) constructPayload(bytes)
case Disassociate constructDisassociate case Disassociate constructDisassociate
case Heartbeat constructHeartbeat case Heartbeat constructHeartbeat
} }
def constructPayload(payload: ByteString): ByteString def constructPayload(payload: ByteString): ByteString
def constructAssociate(cookie: Option[String], origin: Address): ByteString def constructAssociate(info: HandshakeInfo): ByteString
def constructDisassociate: ByteString def constructDisassociate: ByteString
def constructHeartbeat: ByteString def constructHeartbeat: ByteString
def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Message def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message])
def constructMessage( def constructMessage(
localAddress: Address, localAddress: Address,
recipient: ActorRef, recipient: ActorRef,
serializedMessage: MessageProtocol, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]): ByteString senderOption: Option[ActorRef],
seqOption: Option[SeqNo] = None,
ackOption: Option[Ack] = None): ByteString
def constructPureAck(ack: Ack): ByteString
} }
/** /**
@ -100,36 +113,56 @@ private[remote] trait AkkaPduCodec {
*/ */
private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
val ackBuilder = AcknowledgementInfo.newBuilder()
ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue)
ack.nacks foreach { nack ackBuilder.addNacks(nack.rawValue) }
ackBuilder
}
override def constructMessage( override def constructMessage(
localAddress: Address, localAddress: Address,
recipient: ActorRef, recipient: ActorRef,
serializedMessage: MessageProtocol, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]): ByteString = { senderOption: Option[ActorRef],
seqOption: Option[SeqNo] = None,
ackOption: Option[Ack] = None): ByteString = {
val messageBuilder = RemoteMessageProtocol.newBuilder val ackAndEnvelopeBuilder = AckAndEnvelopeContainer.newBuilder
messageBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) val envelopeBuilder = RemoteEnvelope.newBuilder
senderOption foreach { ref messageBuilder.setSender(serializeActorRef(localAddress, ref)) }
messageBuilder.setMessage(serializedMessage)
ByteString(messageBuilder.build.toByteArray) envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
senderOption foreach { ref envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) }
seqOption foreach { seq envelopeBuilder.setSeq(seq.rawValue) }
ackOption foreach { ack ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }
envelopeBuilder.setMessage(serializedMessage)
ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder)
ByteString(ackAndEnvelopeBuilder.build.toByteArray)
} }
override def constructPayload(payload: ByteString): ByteString = override def constructPureAck(ack: Ack): ByteString =
ByteString(AkkaRemoteProtocol.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build.toByteArray) ByteString(AckAndEnvelopeContainer.newBuilder.setAck(ackBuilder(ack)).build().toByteArray)
override def constructAssociate(cookie: Option[String], origin: Address): ByteString = override def constructPayload(payload: ByteString): ByteString =
constructControlMessagePdu(RemoteProtocol.CommandType.CONNECT, cookie, Some(origin)) ByteString(AkkaProtocolMessage.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build.toByteArray)
override def constructAssociate(info: HandshakeInfo): ByteString = {
val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid)
info.cookie foreach handshakeInfo.setCookie
constructControlMessagePdu(WireFormats.CommandType.CONNECT, Some(handshakeInfo))
}
override val constructDisassociate: ByteString = override val constructDisassociate: ByteString =
constructControlMessagePdu(RemoteProtocol.CommandType.SHUTDOWN, None, None) constructControlMessagePdu(WireFormats.CommandType.SHUTDOWN, None)
override val constructHeartbeat: ByteString = override val constructHeartbeat: ByteString =
constructControlMessagePdu(RemoteProtocol.CommandType.HEARTBEAT, None, None) constructControlMessagePdu(WireFormats.CommandType.HEARTBEAT, None)
override def decodePdu(raw: ByteString): AkkaPdu = { override def decodePdu(raw: ByteString): AkkaPdu = {
try { try {
val pdu = AkkaRemoteProtocol.parseFrom(raw.toArray) val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer())) if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction) else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null) else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
@ -141,57 +174,75 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
override def decodeMessage( override def decodeMessage(
raw: ByteString, raw: ByteString,
provider: RemoteActorRefProvider, provider: RemoteActorRefProvider,
localAddress: Address): Message = { localAddress: Address): (Option[Ack], Option[Message]) = {
val msgPdu = RemoteMessageProtocol.parseFrom(raw.toArray) val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray)
Message(
recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress), val ackOption = if (ackAndEnvelope.hasAck) {
recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath), import scala.collection.JavaConverters._
serializedMessage = msgPdu.getMessage, Some(Ack(SeqNo(ackAndEnvelope.getAck.getCumulativeAck), ackAndEnvelope.getAck.getNacksList.asScala.map(SeqNo(_)).toSet))
senderOption = if (!msgPdu.hasSender) None } else None
else Some(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress)))
val messageOption = if (ackAndEnvelope.hasEnvelope) {
val msgPdu = ackAndEnvelope.getEnvelope
Some(Message(
recipient = provider.resolveActorRefWithLocalAddress(msgPdu.getRecipient.getPath, localAddress),
recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath),
serializedMessage = msgPdu.getMessage,
senderOption =
if (msgPdu.hasSender) Some(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))
else None,
seqOption =
if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None))
} else None
(ackOption, messageOption)
} }
private def decodeControlPdu(controlPdu: RemoteControlProtocol): AkkaPdu = { private def decodeControlPdu(controlPdu: AkkaControlMessage): AkkaPdu = {
val cookie = if (controlPdu.hasCookie) Some(controlPdu.getCookie) else None
controlPdu.getCommandType match { controlPdu.getCommandType match {
case CommandType.CONNECT if controlPdu.hasOrigin Associate(cookie, decodeAddress(controlPdu.getOrigin)) case CommandType.CONNECT if controlPdu.hasHandshakeInfo
case CommandType.SHUTDOWN Disassociate val handshakeInfo = controlPdu.getHandshakeInfo
val cookie = if (handshakeInfo.hasCookie) Some(handshakeInfo.getCookie) else None
Associate(
HandshakeInfo(
decodeAddress(handshakeInfo.getOrigin),
handshakeInfo.getUid.toInt, // 64 bits are allocated in the wire formats, but we use only 32 for now
cookie))
case CommandType.SHUTDOWN Disassociate
case CommandType.HEARTBEAT Heartbeat case CommandType.HEARTBEAT Heartbeat
case _ throw new PduCodecException("Decoding of control PDU failed: format invalid", null) case _ throw new PduCodecException("Decoding of control PDU failed: format invalid", null)
} }
} }
private def decodeAddress(encodedAddress: AddressProtocol): Address = private def decodeAddress(encodedAddress: AddressData): Address =
Address(encodedAddress.getProtocol, encodedAddress.getSystem, encodedAddress.getHostname, encodedAddress.getPort) Address(encodedAddress.getProtocol, encodedAddress.getSystem, encodedAddress.getHostname, encodedAddress.getPort)
private def constructControlMessagePdu( private def constructControlMessagePdu(
code: RemoteProtocol.CommandType, code: WireFormats.CommandType,
cookie: Option[String], handshakeInfo: Option[AkkaHandshakeInfo.Builder]): ByteString = {
origin: Option[Address]): ByteString = {
val controlMessageBuilder = RemoteControlProtocol.newBuilder()
val controlMessageBuilder = AkkaControlMessage.newBuilder()
controlMessageBuilder.setCommandType(code) controlMessageBuilder.setCommandType(code)
cookie foreach controlMessageBuilder.setCookie handshakeInfo foreach controlMessageBuilder.setHandshakeInfo
for (originAddress origin; serialized serializeAddress(originAddress))
controlMessageBuilder.setOrigin(serialized)
ByteString(AkkaRemoteProtocol.newBuilder().setInstruction(controlMessageBuilder.build).build.toByteArray) ByteString(AkkaProtocolMessage.newBuilder().setInstruction(controlMessageBuilder.build).build.toByteArray)
} }
private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = { private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = {
ActorRefProtocol.newBuilder.setPath( ActorRefData.newBuilder.setPath(
if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()
} }
private def serializeAddress(address: Address): Option[AddressProtocol] = { private def serializeAddress(address: Address): AddressData = address match {
for (host address.host; port address.port) yield AddressProtocol.newBuilder case Address(protocol, system, Some(host), Some(port))
.setHostname(host) AddressData.newBuilder
.setPort(port) .setHostname(host)
.setSystem(address.system) .setPort(port)
.setProtocol(address.protocol) .setSystem(system)
.build() .setProtocol(protocol)
.build()
case _ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
} }
} }

View file

@ -38,11 +38,9 @@ private[remote] class AkkaProtocolSettings(config: Config) {
Duration(TransportFailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS) Duration(TransportFailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
} requiring (_ > Duration.Zero, "transport-failure-detector.heartbeat-interval must be > 0") } requiring (_ > Duration.Zero, "transport-failure-detector.heartbeat-interval must be > 0")
val WaitActivityEnabled: Boolean = getBoolean("akka.remote.wait-activity-enabled")
val RequireCookie: Boolean = getBoolean("akka.remote.require-cookie") val RequireCookie: Boolean = getBoolean("akka.remote.require-cookie")
val SecureCookie: String = getString("akka.remote.secure-cookie") val SecureCookie: Option[String] = if (RequireCookie) Some(getString("akka.remote.secure-cookie")) else None
} }
private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead? private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead?
@ -52,6 +50,8 @@ private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remo
} }
case class HandshakeInfo(origin: Address, uid: Int, cookie: Option[String])
/** /**
* Implementation of the Akka protocol as a Transport that wraps an underlying Transport instance. * Implementation of the Akka protocol as a Transport that wraps an underlying Transport instance.
* *
@ -59,7 +59,6 @@ private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remo
* - Soft-state associations via the use of heartbeats and failure detectors * - Soft-state associations via the use of heartbeats and failure detectors
* - Secure-cookie handling * - Secure-cookie handling
* - Transparent origin address handling * - Transparent origin address handling
* - Fire-And-Forget vs. implicit ack based handshake (controllable via wait-activity-enabled configuration option)
* - pluggable codecs to encode and decode Akka PDUs * - pluggable codecs to encode and decode Akka PDUs
* *
* It is not possible to load this transport dynamically using the configuration of remoting, because it does not * It is not possible to load this transport dynamically using the configuration of remoting, because it does not
@ -116,12 +115,12 @@ private[transport] class AkkaProtocolManager(
val stateActorSettings = settings val stateActorSettings = settings
val failureDetector = createTransportFailureDetector() val failureDetector = createTransportFailureDetector()
context.actorOf(Props(new ProtocolStateActor( context.actorOf(Props(new ProtocolStateActor(
stateActorLocalAddress, HandshakeInfo(stateActorLocalAddress, AddressUidExtension(context.system).addressUid, stateActorSettings.SecureCookie),
handle, handle,
stateActorAssociationHandler, stateActorAssociationHandler,
stateActorSettings, stateActorSettings,
AkkaPduProtobufCodec, AkkaPduProtobufCodec,
failureDetector)), actorNameFor(handle.remoteAddress)) // Why don't we watch this one? failureDetector)), actorNameFor(handle.remoteAddress))
case AssociateUnderlying(remoteAddress, statusPromise) case AssociateUnderlying(remoteAddress, statusPromise)
val stateActorLocalAddress = localAddress val stateActorLocalAddress = localAddress
@ -129,13 +128,13 @@ private[transport] class AkkaProtocolManager(
val stateActorWrappedTransport = wrappedTransport val stateActorWrappedTransport = wrappedTransport
val failureDetector = createTransportFailureDetector() val failureDetector = createTransportFailureDetector()
context.actorOf(Props(new ProtocolStateActor( context.actorOf(Props(new ProtocolStateActor(
stateActorLocalAddress, HandshakeInfo(stateActorLocalAddress, AddressUidExtension(context.system).addressUid, stateActorSettings.SecureCookie),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
stateActorWrappedTransport, stateActorWrappedTransport,
stateActorSettings, stateActorSettings,
AkkaPduProtobufCodec, AkkaPduProtobufCodec,
failureDetector)), actorNameFor(remoteAddress)) // Why don't we watch this one? failureDetector)), actorNameFor(remoteAddress))
} }
private def createTransportFailureDetector(): FailureDetector = { private def createTransportFailureDetector(): FailureDetector = {
@ -153,11 +152,12 @@ private[transport] class AkkaProtocolManager(
} }
private[transport] class AkkaProtocolHandle( private[remote] class AkkaProtocolHandle(
_localAddress: Address, _localAddress: Address,
_remoteAddress: Address, _remoteAddress: Address,
val readHandlerPromise: Promise[HandleEventListener], val readHandlerPromise: Promise[HandleEventListener],
_wrappedHandle: AssociationHandle, _wrappedHandle: AssociationHandle,
val handshakeInfo: HandshakeInfo,
private val stateActor: ActorRef, private val stateActor: ActorRef,
private val codec: AkkaPduCodec) private val codec: AkkaPduCodec)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, AkkaScheme) { extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, AkkaScheme) {
@ -179,12 +179,11 @@ private[transport] object ProtocolStateActor {
/* /*
* State when the underlying transport is initialized, there is an association present, and we are waiting * State when the underlying transport is initialized, there is an association present, and we are waiting
* for the first message. Outbound connections can skip this phase if WaitActivity configuration parameter * for the first message (has to be CONNECT if inbound).
* is turned off.
* State data can be OutboundUnderlyingAssociated (for outbound associations) or InboundUnassociated (for inbound * State data can be OutboundUnderlyingAssociated (for outbound associations) or InboundUnassociated (for inbound
* when upper layer is not notified yet) * when upper layer is not notified yet)
*/ */
case object WaitActivity extends AssociationState case object WaitHandshake extends AssociationState
/* /*
* State when the underlying transport is initialized and the handshake succeeded. * State when the underlying transport is initialized and the handshake succeeded.
@ -224,7 +223,7 @@ private[transport] object ProtocolStateActor {
} }
private[transport] class ProtocolStateActor(initialData: InitialProtocolStateData, private[transport] class ProtocolStateActor(initialData: InitialProtocolStateData,
private val localAddress: Address, private val localHandshakeInfo: HandshakeInfo,
private val settings: AkkaProtocolSettings, private val settings: AkkaProtocolSettings,
private val codec: AkkaPduCodec, private val codec: AkkaPduCodec,
private val failureDetector: FailureDetector) private val failureDetector: FailureDetector)
@ -234,26 +233,28 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
import context.dispatcher import context.dispatcher
// Outbound case // Outbound case
def this(localAddress: Address, def this(handshakeInfo: HandshakeInfo,
remoteAddress: Address, remoteAddress: Address,
statusPromise: Promise[AssociationHandle], statusPromise: Promise[AssociationHandle],
transport: Transport, transport: Transport,
settings: AkkaProtocolSettings, settings: AkkaProtocolSettings,
codec: AkkaPduCodec, codec: AkkaPduCodec,
failureDetector: FailureDetector) = { failureDetector: FailureDetector) = {
this(OutboundUnassociated(remoteAddress, statusPromise, transport), localAddress, settings, codec, failureDetector) this(OutboundUnassociated(remoteAddress, statusPromise, transport), handshakeInfo, settings, codec, failureDetector)
} }
// Inbound case // Inbound case
def this(localAddress: Address, def this(handshakeInfo: HandshakeInfo,
wrappedHandle: AssociationHandle, wrappedHandle: AssociationHandle,
associationListener: AssociationEventListener, associationListener: AssociationEventListener,
settings: AkkaProtocolSettings, settings: AkkaProtocolSettings,
codec: AkkaPduCodec, codec: AkkaPduCodec,
failureDetector: FailureDetector) = { failureDetector: FailureDetector) = {
this(InboundUnassociated(associationListener, wrappedHandle), localAddress, settings, codec, failureDetector) this(InboundUnassociated(associationListener, wrappedHandle), handshakeInfo, settings, codec, failureDetector)
} }
val localAddress = localHandshakeInfo.origin
initialData match { initialData match {
case d: OutboundUnassociated case d: OutboundUnassociated
d.transport.associate(d.remoteAddress) pipeTo self d.transport.associate(d.remoteAddress) pipeTo self
@ -261,7 +262,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case d: InboundUnassociated case d: InboundUnassociated
d.wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self)) d.wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self))
startWith(WaitActivity, d) startWith(WaitHandshake, d)
} }
when(Closed) { when(Closed) {
@ -273,14 +274,11 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Event(wrappedHandle: AssociationHandle, OutboundUnassociated(_, statusPromise, _)) case Event(wrappedHandle: AssociationHandle, OutboundUnassociated(_, statusPromise, _))
wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self)) wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self))
if (sendAssociate(wrappedHandle)) { if (sendAssociate(wrappedHandle, localHandshakeInfo)) {
failureDetector.heartbeat() failureDetector.heartbeat()
initTimers() initTimers()
goto(WaitHandshake) using OutboundUnderlyingAssociated(statusPromise, wrappedHandle)
if (settings.WaitActivityEnabled)
goto(WaitActivity) using OutboundUnderlyingAssociated(statusPromise, wrappedHandle)
else
goto(Open) using AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue.empty)
} else { } else {
// Underlying transport was busy -- Associate could not be sent // Underlying transport was busy -- Associate could not be sent
setTimer("associate-retry", wrappedHandle, RARP(context.system).provider.remoteSettings.BackoffPeriod, repeat = false) setTimer("associate-retry", wrappedHandle, RARP(context.system).provider.remoteSettings.BackoffPeriod, repeat = false)
@ -295,29 +293,28 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
} }
// Timeout of this state is implicitly handled by the failure detector // Timeout of this state is implicitly handled by the failure detector
when(WaitActivity) { when(WaitHandshake) {
case Event(Disassociated, _) case Event(Disassociated, _)
stop() stop()
case Event(InboundPayload(p), OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) case Event(InboundPayload(p), OutboundUnderlyingAssociated(statusPromise, wrappedHandle))
decodePdu(p) match { decodePdu(p) match {
case Associate(handshakeInfo)
failureDetector.heartbeat()
goto(Open) using AssociatedWaitHandler(
notifyOutboundHandler(wrappedHandle, handshakeInfo, statusPromise),
wrappedHandle,
immutable.Queue.empty)
case Disassociate case Disassociate
// After receiving Disassociate we MUST NOT send back a Disassociate (loop)
stop() stop()
// Any other activity is considered an implicit acknowledgement of the association case _
case Payload(payload) // Expected handshake to be finished, dropping connection
sendHeartbeat(wrappedHandle) sendDisassociate(wrappedHandle)
goto(Open) using stop()
AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue(payload))
case Heartbeat
sendHeartbeat(wrappedHandle)
failureDetector.heartbeat()
goto(Open) using
AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue.empty)
case _ goto(Open) using
AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue.empty)
} }
case Event(HeartbeatTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) handleTimers(wrappedHandle) case Event(HeartbeatTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) handleTimers(wrappedHandle)
@ -329,14 +326,18 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Disassociate stop() case Disassociate stop()
// Incoming association -- implicitly ACK by a heartbeat // Incoming association -- implicitly ACK by a heartbeat
case Associate(cookieOption, origin) case Associate(info)
if (!settings.RequireCookie || cookieOption.getOrElse("") == settings.SecureCookie) { if (!settings.RequireCookie || info.cookie == settings.SecureCookie) {
sendHeartbeat(wrappedHandle) sendAssociate(wrappedHandle, localHandshakeInfo)
failureDetector.heartbeat() failureDetector.heartbeat()
initTimers() initTimers()
goto(Open) using AssociatedWaitHandler(notifyInboundHandler(wrappedHandle, origin, associationHandler), wrappedHandle, immutable.Queue.empty) goto(Open) using AssociatedWaitHandler(
notifyInboundHandler(wrappedHandle, info, associationHandler),
wrappedHandle,
immutable.Queue.empty)
} else { } else {
log.warning(s"Association attempt with mismatching cookie from [{}]. Expected [{}] but received [{}].",
info.origin, localHandshakeInfo.cookie.getOrElse(""), info.cookie.getOrElse(""))
stop() stop()
} }
@ -441,6 +442,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
private def notifyOutboundHandler(wrappedHandle: AssociationHandle, private def notifyOutboundHandler(wrappedHandle: AssociationHandle,
handshakeInfo: HandshakeInfo,
statusPromise: Promise[AssociationHandle]): Future[HandleEventListener] = { statusPromise: Promise[AssociationHandle]): Future[HandleEventListener] = {
val readHandlerPromise = Promise[HandleEventListener]() val readHandlerPromise = Promise[HandleEventListener]()
listenForListenerRegistration(readHandlerPromise) listenForListenerRegistration(readHandlerPromise)
@ -451,13 +453,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
wrappedHandle.remoteAddress, wrappedHandle.remoteAddress,
readHandlerPromise, readHandlerPromise,
wrappedHandle, wrappedHandle,
handshakeInfo,
self, self,
codec)) codec))
readHandlerPromise.future readHandlerPromise.future
} }
private def notifyInboundHandler(wrappedHandle: AssociationHandle, private def notifyInboundHandler(wrappedHandle: AssociationHandle,
originAddress: Address, handshakeInfo: HandshakeInfo,
associationListener: AssociationEventListener): Future[HandleEventListener] = { associationListener: AssociationEventListener): Future[HandleEventListener] = {
val readHandlerPromise = Promise[HandleEventListener]() val readHandlerPromise = Promise[HandleEventListener]()
listenForListenerRegistration(readHandlerPromise) listenForListenerRegistration(readHandlerPromise)
@ -465,9 +468,10 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
associationListener notify InboundAssociation( associationListener notify InboundAssociation(
new AkkaProtocolHandle( new AkkaProtocolHandle(
localAddress, localAddress,
originAddress, handshakeInfo.origin,
readHandlerPromise, readHandlerPromise,
wrappedHandle, wrappedHandle,
handshakeInfo,
self, self,
codec)) codec))
readHandlerPromise.future readHandlerPromise.future
@ -488,9 +492,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case NonFatal(e) throw new AkkaProtocolException("Error writing DISASSOCIATE to transport", e) case NonFatal(e) throw new AkkaProtocolException("Error writing DISASSOCIATE to transport", e)
} }
private def sendAssociate(wrappedHandle: AssociationHandle): Boolean = try { private def sendAssociate(wrappedHandle: AssociationHandle, info: HandshakeInfo): Boolean = try {
val cookie = if (settings.RequireCookie) Some(settings.SecureCookie) else None wrappedHandle.write(codec.constructAssociate(info))
wrappedHandle.write(codec.constructAssociate(cookie, localAddress))
} catch { } catch {
case NonFatal(e) throw new AkkaProtocolException("Error writing ASSOCIATE to transport", e) case NonFatal(e) throw new AkkaProtocolException("Error writing ASSOCIATE to transport", e)
} }

View file

@ -75,7 +75,7 @@ class TestTransport(
remoteHandlerFuture.map { _ localHandle } remoteHandlerFuture.map { _ localHandle }
case None case None
Future.failed(new IllegalArgumentException(s"No registered transport: $remoteAddress")) Future.failed(new InvalidAssociationException(s"No registered transport: $remoteAddress", null))
} }
} }

View file

@ -449,8 +449,8 @@ private[transport] class ThrottledAssociation(
private def peekOrigin(b: ByteString): Option[Address] = { private def peekOrigin(b: ByteString): Option[Address] = {
try { try {
AkkaPduProtobufCodec.decodePdu(b) match { AkkaPduProtobufCodec.decodePdu(b) match {
case Associate(_, origin) Some(origin) case Associate(info) Some(info.origin)
case _ None case _ None
} }
} catch { } catch {
// This layer should not care about malformed packets. Also, this also useful for testing, because // This layer should not care about malformed packets. Also, this also useful for testing, because

View file

@ -0,0 +1,334 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit.AkkaSpec
import scala.annotation.tailrec
import scala.concurrent.forkjoin.ThreadLocalRandom
object AckedDeliverySpec {
case class Sequenced(seq: SeqNo, body: String) extends HasSequenceNumber {
override def toString = s"MSG[${seq.rawValue}]"
}
}
class AckedDeliverySpec extends AkkaSpec {
import AckedDeliverySpec._
def msg(seq: Long) = Sequenced(SeqNo(seq), "msg" + seq)
"SeqNo" must {
"implement simple ordering" in {
val sm1 = SeqNo(-1)
val s0 = SeqNo(0)
val s1 = SeqNo(1)
val s2 = SeqNo(2)
val s0b = SeqNo(0)
sm1 < s0 must be(true)
sm1 > s0 must be(false)
s0 < s1 must be(true)
s0 > s1 must be(false)
s1 < s2 must be(true)
s1 > s2 must be(false)
s0b == s0 must be(true)
}
"correctly handle wrapping over" in {
val s1 = SeqNo(Long.MaxValue - 1)
val s2 = SeqNo(Long.MaxValue)
val s3 = SeqNo(Long.MinValue)
val s4 = SeqNo(Long.MinValue + 1)
s1 < s2 must be(true)
s1 > s2 must be(false)
s2 < s3 must be(true)
s2 > s3 must be(false)
s3 < s4 must be(true)
s3 > s4 must be(false)
}
"correctly handle large gaps" in {
val smin = SeqNo(Long.MinValue)
val smin2 = SeqNo(Long.MinValue + 1)
val s0 = SeqNo(0)
s0 < smin must be(true)
s0 > smin must be(false)
smin2 < s0 must be(true)
smin2 > s0 must be(false)
}
}
"SendBuffer" must {
"aggregate unacked messages in order" in {
val b0 = new AckedSendBuffer[Sequenced](10)
val msg0 = msg(0)
val msg1 = msg(1)
val msg2 = msg(2)
val b1 = b0.buffer(msg0)
b1.nonAcked must be === Vector(msg0)
val b2 = b1.buffer(msg1)
b2.nonAcked must be === Vector(msg0, msg1)
val b3 = b2.buffer(msg2)
b3.nonAcked must be === Vector(msg0, msg1, msg2)
}
"refuse buffering new messages if capacity reached" in {
val buffer = new AckedSendBuffer[Sequenced](4).buffer(msg(0)).buffer(msg(1)).buffer(msg(2)).buffer(msg(3))
intercept[ResendBufferCapacityReachedException] {
buffer buffer msg(4)
}
}
"remove messages from buffer when cumulative ack received" in {
val b0 = new AckedSendBuffer[Sequenced](10)
val msg0 = msg(0)
val msg1 = msg(1)
val msg2 = msg(2)
val msg3 = msg(3)
val msg4 = msg(4)
val b1 = b0.buffer(msg0)
b1.nonAcked must be === Vector(msg0)
val b2 = b1.buffer(msg1)
b2.nonAcked must be === Vector(msg0, msg1)
val b3 = b2.buffer(msg2)
b3.nonAcked must be === Vector(msg0, msg1, msg2)
val b4 = b3.acknowledge(Ack(SeqNo(1)))
b4.nonAcked must be === Vector(msg2)
val b5 = b4.buffer(msg3)
b5.nonAcked must be === Vector(msg2, msg3)
val b6 = b5.buffer(msg4)
b6.nonAcked must be === Vector(msg2, msg3, msg4)
val b7 = b6.acknowledge(Ack(SeqNo(1)))
b7.nonAcked must be === Vector(msg2, msg3, msg4)
val b8 = b7.acknowledge(Ack(SeqNo(2)))
b8.nonAcked must be === Vector(msg3, msg4)
val b9 = b8.acknowledge(Ack(SeqNo(5)))
b9.nonAcked must be === Vector.empty
}
"keep NACKed messages in buffer if selective nacks are received" in {
val b0 = new AckedSendBuffer[Sequenced](10)
val msg0 = msg(0)
val msg1 = msg(1)
val msg2 = msg(2)
val msg3 = msg(3)
val msg4 = msg(4)
val b1 = b0.buffer(msg0)
b1.nonAcked must be === Vector(msg0)
val b2 = b1.buffer(msg1)
b2.nonAcked must be === Vector(msg0, msg1)
val b3 = b2.buffer(msg2)
b3.nonAcked must be === Vector(msg0, msg1, msg2)
val b4 = b3.acknowledge(Ack(SeqNo(1), nacks = Set(SeqNo(0))))
b4.nonAcked must be === Vector(msg2)
b4.nacked must be === Vector(msg0)
val b5 = b4.buffer(msg3).buffer(msg4)
b5.nonAcked must be === Vector(msg2, msg3, msg4)
b5.nacked must be === Vector(msg0)
val b6 = b5.acknowledge(Ack(SeqNo(4), nacks = Set(SeqNo(2), SeqNo(3))))
b6.nonAcked must be === Vector()
b6.nacked must be === Vector(msg2, msg3)
val b7 = b6.acknowledge(Ack(SeqNo(5)))
b7.nonAcked must be === Vector.empty
b7.nacked must be === Vector.empty
}
}
"ReceiveBuffer" must {
"enqueue message in buffer if needed, return the list of deliverable messages and acks" in {
val b0 = new AckedReceiveBuffer[Sequenced]
val msg0 = msg(0)
val msg1 = msg(1)
val msg2 = msg(2)
val msg3 = msg(3)
val msg4 = msg(4)
val msg5 = msg(5)
val (b1, deliver1, ack1) = b0.receive(msg1).extractDeliverable
deliver1 must be === Vector.empty
ack1 must be === Ack(SeqNo(1), nacks = Set(SeqNo(0)))
val (b2, deliver2, ack2) = b1.receive(msg0).extractDeliverable
deliver2 must be === Vector(msg0, msg1)
ack2 must be === Ack(SeqNo(1))
val (b3, deliver3, ack3) = b2.receive(msg4).extractDeliverable
deliver3 must be === Vector.empty
ack3 must be === Ack(SeqNo(4), nacks = Set(SeqNo(2), SeqNo(3)))
val (b4, deliver4, ack4) = b3.receive(msg2).extractDeliverable
deliver4 must be === Vector(msg2)
ack4 must be === Ack(SeqNo(4), nacks = Set(SeqNo(3)))
val (b5, deliver5, ack5) = b4.receive(msg5).extractDeliverable
deliver5 must be === Vector.empty
ack5 must be === Ack(SeqNo(5), nacks = Set(SeqNo(3)))
val (_, deliver6, ack6) = b5.receive(msg3).extractDeliverable
deliver6 must be === Vector(msg3, msg4, msg5)
ack6 must be === Ack(SeqNo(5))
}
"handle duplicate arrivals correctly" in {
val buf = new AckedReceiveBuffer[Sequenced]
val msg0 = msg(0)
val msg1 = msg(1)
val msg2 = msg(2)
val (buf2, _, _) = buf
.receive(msg0)
.receive(msg1)
.receive(msg2)
.extractDeliverable
val buf3 = buf2.receive(msg0)
.receive(msg1)
.receive(msg2)
val (_, deliver, ack) = buf3.extractDeliverable
deliver must be === Vector.empty
ack must be === Ack(SeqNo(2))
}
"be able to correctly merge with another receive buffer" in {
val buf1 = new AckedReceiveBuffer[Sequenced]
val buf2 = new AckedReceiveBuffer[Sequenced]
val msg0 = msg(0)
val msg1a = msg(1)
val msg1b = msg(1)
val msg2 = msg(2)
val msg3 = msg(3)
val buf = buf1.receive(msg1a).receive(msg2).mergeFrom(
buf2.receive(msg1b).receive(msg3))
val (_, deliver, ack) = buf.receive(msg0).extractDeliverable
deliver must be === Vector(msg0, msg1a, msg2, msg3)
ack must be === Ack(SeqNo(3))
}
}
"SendBuffer and ReceiveBuffer" must {
def happened(p: Double) = ThreadLocalRandom.current().nextDouble() < p
@tailrec def geom(p: Double, limit: Int = 5, acc: Int = 0): Int =
if (acc == limit) acc
else if (happened(p)) acc
else geom(p, limit, acc + 1)
"correctly cooperate with each other" in {
val MsgCount = 1000
val DeliveryProbability = 0.5
val referenceList: Seq[Sequenced] = (0 until MsgCount).toSeq map { i msg(i.toLong) }
var toSend = referenceList
var received = Seq.empty[Sequenced]
var sndBuf = new AckedSendBuffer[Sequenced](10)
var rcvBuf = new AckedReceiveBuffer[Sequenced]
var log = Vector.empty[String]
var lastAck: Ack = Ack(SeqNo(-1))
def dbgLog(message: String): Unit = log :+= message
def senderSteps(steps: Int, p: Double = 1.0) = {
val resends = (sndBuf.nacked ++ sndBuf.nonAcked).take(steps)
val sends = if (steps - resends.size > 0) {
val tmp = toSend.take(steps - resends.size)
toSend = toSend.drop(steps - resends.size)
tmp
} else Seq.empty[Sequenced]
(resends ++ sends) foreach { msg
if (sends.contains(msg)) sndBuf = sndBuf.buffer(msg)
if (happened(p)) {
val (updatedRcvBuf, delivers, ack) = rcvBuf.receive(msg).extractDeliverable
rcvBuf = updatedRcvBuf
dbgLog(s"$sndBuf -- $msg --> $rcvBuf")
lastAck = ack
received ++= delivers
dbgLog(s"R: ${received.mkString(", ")}")
} else dbgLog(s"$sndBuf -- $msg --X $rcvBuf")
}
}
def receiverStep(p: Double = 1.0) = {
if (happened(p)) {
sndBuf = sndBuf.acknowledge(lastAck)
dbgLog(s"$sndBuf <-- $lastAck -- $rcvBuf")
} else dbgLog(s"$sndBuf X-- $lastAck -- $rcvBuf")
}
// Dropping phase
info(s"Starting unreliable delivery for $MsgCount messages, with delivery probability P = $DeliveryProbability")
var steps = MsgCount * 2
while (steps > 0) {
val s = geom(0.3, limit = 5)
senderSteps(s, DeliveryProbability)
receiverStep(DeliveryProbability)
steps -= s
}
info(s"Successfully delivered ${received.size} messages from ${MsgCount}")
info("Entering reliable phase")
// Finalizing phase
for (_ 1 to MsgCount) {
senderSteps(1, 1.0)
receiverStep(1.0)
}
if (received != referenceList) {
println(log.mkString("\n"))
println("Received:")
println(received)
fail("Not all messages were received")
}
info("All messages have been successfully delivered")
}
}
}

View file

@ -18,7 +18,6 @@ class RemoteConfigSpec extends AkkaSpec(
akka.remote.netty.tcp.port = 0 akka.remote.netty.tcp.port = 0
""") { """) {
// FIXME: These tests are ignored as it tests configuration specific to the old remoting.
"Remoting" must { "Remoting" must {
"contain correct configuration values in reference.conf" in { "contain correct configuration values in reference.conf" in {
@ -38,6 +37,9 @@ class RemoteConfigSpec extends AkkaSpec(
MaximumRetriesInWindow must be(5) MaximumRetriesInWindow must be(5)
RetryWindow must be(3 seconds) RetryWindow must be(3 seconds)
BackoffPeriod must be(10 millis) BackoffPeriod must be(10 millis)
SysMsgAckTimeout must be(0.3 seconds)
SysResendTimeout must be(1 seconds)
SysMsgBufferSize must be(1000)
CommandAckTimeout.duration must be(30 seconds) CommandAckTimeout.duration must be(30 seconds)
Transports.size must be(1) Transports.size must be(1)
Transports.head._1 must be(classOf[akka.remote.transport.netty.NettyTransport].getName) Transports.head._1 must be(classOf[akka.remote.transport.netty.NettyTransport].getName)
@ -62,9 +64,8 @@ class RemoteConfigSpec extends AkkaSpec(
val settings = new AkkaProtocolSettings(RARP(system).provider.remoteSettings.config) val settings = new AkkaProtocolSettings(RARP(system).provider.remoteSettings.config)
import settings._ import settings._
WaitActivityEnabled must be(true)
RequireCookie must be(false) RequireCookie must be(false)
SecureCookie must be === "" SecureCookie must be === None
TransportFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) TransportFailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName)
TransportHeartBeatInterval must be === 1.seconds TransportHeartBeatInterval must be === 1.seconds

View file

@ -50,7 +50,7 @@ akka {
} }
"receive ActorIdentity(None) when identified node is unknown host" in { "receive ActorIdentity(None) when identified node is unknown host" in {
val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject" val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost2", 2552)) / "user" / "subject"
system.actorSelection(path) ! Identify(path) system.actorSelection(path) ! Identify(path)
expectMsg(60.seconds, ActorIdentity(path, None)) expectMsg(60.seconds, ActorIdentity(path, None))
} }

View file

@ -214,7 +214,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"send error message for wrong address" in { "send error message for wrong address" in {
filterEvents(EventFilter.error(start = "AssociationError", occurrences = 1), filterEvents(EventFilter.error(start = "AssociationError", occurrences = 1),
EventFilter.warning(pattern = ".*dead letter.*echo.*", occurrences = 1)) { EventFilter.error(pattern = "Address is now quarantined", occurrences = 1)) {
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping" system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
} }
} }

View file

@ -6,7 +6,7 @@ package akka.remote.serialization
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.remote.RemoteProtocol.MessageProtocol import akka.remote.WireFormats.SerializedMessage
import akka.remote.ProtobufProtocol.MyMessage import akka.remote.ProtobufProtocol.MyMessage
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -17,7 +17,7 @@ class ProtobufSerializerSpec extends AkkaSpec {
"Serialization" must { "Serialization" must {
"resolve protobuf serializer" in { "resolve protobuf serializer" in {
ser.serializerFor(classOf[MessageProtocol]).getClass must be(classOf[ProtobufSerializer]) ser.serializerFor(classOf[SerializedMessage]).getClass must be(classOf[ProtobufSerializer])
ser.serializerFor(classOf[MyMessage]).getClass must be(classOf[ProtobufSerializer]) ser.serializerFor(classOf[MyMessage]).getClass must be(classOf[ProtobufSerializer])
} }

View file

@ -6,7 +6,7 @@ import akka.remote.transport.AkkaProtocolSpec.TestFailureDetector
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload } import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.TestTransport._ import akka.remote.transport.TestTransport._
import akka.remote.transport.Transport._ import akka.remote.transport.Transport._
import akka.remote.{ RemoteProtocol, RemoteActorRefProvider, FailureDetector } import akka.remote.{ SeqNo, WireFormats, RemoteActorRefProvider, FailureDetector }
import akka.testkit.{ ImplicitSender, AkkaSpec } import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.util.ByteString import akka.util.ByteString
import com.google.protobuf.{ ByteString PByteString } import com.google.protobuf.{ ByteString PByteString }
@ -44,8 +44,6 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
heartbeat-interval = 0.1 s heartbeat-interval = 0.1 s
} }
wait-activity-enabled = on
backoff-interval = 1 s backoff-interval = 1 s
require-cookie = off require-cookie = off
@ -70,7 +68,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val codec = AkkaPduProtobufCodec val codec = AkkaPduProtobufCodec
val testMsg = RemoteProtocol.MessageProtocol.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build val testMsg = WireFormats.SerializedMessage.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build
val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, None) val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, None)
val testMsgPdu: ByteString = codec.constructPayload(testEnvelope) val testMsgPdu: ByteString = codec.constructPayload(testEnvelope)
@ -78,7 +76,8 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
def testPayload = InboundPayload(testMsgPdu) def testPayload = InboundPayload(testMsgPdu)
def testDisassociate = InboundPayload(codec.constructDisassociate) def testDisassociate = InboundPayload(codec.constructDisassociate)
def testAssociate(cookie: Option[String]) = InboundPayload(codec.constructAssociate(cookie, remoteAkkaAddress)) def testAssociate(uid: Int, cookie: Option[String]) =
InboundPayload(codec.constructAssociate(HandshakeInfo(remoteAkkaAddress, uid, cookie)))
def collaborators = { def collaborators = {
val registry = new AssociationRegistry val registry = new AssociationRegistry
@ -100,11 +99,12 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case _ false case _ false
} }
def lastActivityIsAssociate(registry: AssociationRegistry, cookie: Option[String]) = def lastActivityIsAssociate(registry: AssociationRegistry, uid: Long, cookie: Option[String]) =
if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match { codec.decodePdu(payload) match {
case Associate(c, origin) if c == cookie && origin == localAddress true case Associate(info)
info.cookie == cookie && info.origin == localAddress && info.uid == uid
case _ false case _ false
} }
case _ false case _ false
@ -126,7 +126,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val (failureDetector, _, _, handle) = collaborators val (failureDetector, _, _, handle) = collaborators
system.actorOf(Props(new ProtocolStateActor( system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
handle, handle,
ActorAssociationEventListener(testActor), ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf), new AkkaProtocolSettings(conf),
@ -140,19 +140,21 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val (failureDetector, registry, _, handle) = collaborators val (failureDetector, registry, _, handle) = collaborators
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
handle, handle,
ActorAssociationEventListener(testActor), ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf), new AkkaProtocolSettings(conf),
codec, codec,
failureDetector))) failureDetector)))
reader ! testAssociate(None) reader ! testAssociate(uid = 33, cookie = None)
awaitCond(failureDetector.called) awaitCond(failureDetector.called)
val wrappedHandle = expectMsgPF() { val wrappedHandle = expectMsgPF() {
case InboundAssociation(h) h case InboundAssociation(h: AkkaProtocolHandle)
h.handshakeInfo.uid must be === 33
h
} }
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
@ -173,7 +175,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val (failureDetector, registry, _, handle) = collaborators val (failureDetector, registry, _, handle) = collaborators
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
handle, handle,
ActorAssociationEventListener(testActor), ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf), new AkkaProtocolSettings(conf),
@ -184,7 +186,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
reader ! testHeartbeat reader ! testHeartbeat
// this associate will now be ignored // this associate will now be ignored
reader ! testAssociate(None) reader ! testAssociate(uid = 33, cookie = None)
awaitCond(registry.logSnapshot.exists { awaitCond(registry.logSnapshot.exists {
case DisassociateAttempt(requester, remote) true case DisassociateAttempt(requester, remote) true
@ -192,42 +194,14 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
}) })
} }
"serve the handle as soon as possible if WaitActivity is turned off" in { "in outbound mode delay readiness until hadnshake finished" in {
val (failureDetector, registry, transport, handle) = collaborators
transport.associateBehavior.pushConstant(handle)
val statusPromise: Promise[AssociationHandle] = Promise()
system.actorOf(Props(new ProtocolStateActor(
localAddress,
remoteAddress,
statusPromise,
transport,
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)),
codec,
failureDetector)))
Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
case _ fail()
}
lastActivityIsAssociate(registry, None) must be(true)
failureDetector.called must be(true)
}
"in outbound mode with WaitActivity delay readiness until activity detected" in {
val (failureDetector, registry, transport, handle) = collaborators val (failureDetector, registry, transport, handle) = collaborators
transport.associateBehavior.pushConstant(handle) transport.associateBehavior.pushConstant(handle)
val statusPromise: Promise[AssociationHandle] = Promise() val statusPromise: Promise[AssociationHandle] = Promise()
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
transport, transport,
@ -235,7 +209,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec, codec,
failureDetector))) failureDetector)))
awaitCond(lastActivityIsAssociate(registry, None)) awaitCond(lastActivityIsAssociate(registry, 42, None))
failureDetector.called must be(true) failureDetector.called must be(true)
// keeps sending heartbeats // keeps sending heartbeats
@ -243,13 +217,14 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
statusPromise.isCompleted must be(false) statusPromise.isCompleted must be(false)
// finish connection by sending back a payload // finish connection by sending back an associate message
reader ! testPayload reader ! testAssociate(33, None)
Await.result(statusPromise.future, 3.seconds) match { Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle case h: AkkaProtocolHandle
h.remoteAddress must be === remoteAkkaAddress h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress h.localAddress must be === localAkkaAddress
h.handshakeInfo.uid must be === 33
case _ fail() case _ fail()
} }
@ -260,14 +235,14 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val (failureDetector, registry, _, handle) = collaborators val (failureDetector, registry, _, handle) = collaborators
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = Some("abcde")),
handle, handle,
ActorAssociationEventListener(testActor), ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)),
codec, codec,
failureDetector))) failureDetector)))
reader ! testAssociate(Some("xyzzy")) reader ! testAssociate(uid = 33, Some("xyzzy"))
awaitCond(registry.logSnapshot.exists { awaitCond(registry.logSnapshot.exists {
case DisassociateAttempt(requester, remote) true case DisassociateAttempt(requester, remote) true
@ -279,7 +254,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val (failureDetector, registry, _, handle) = collaborators val (failureDetector, registry, _, handle) = collaborators
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = Some("abcde")),
handle, handle,
ActorAssociationEventListener(testActor), ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)),
@ -287,10 +262,13 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
failureDetector))) failureDetector)))
// Send the correct cookie // Send the correct cookie
reader ! testAssociate(Some("abcde")) reader ! testAssociate(uid = 33, Some("abcde"))
val wrappedHandle = expectMsgPF() { val wrappedHandle = expectMsgPF() {
case InboundAssociation(h) h case InboundAssociation(h: AkkaProtocolHandle)
h.handshakeInfo.uid must be === 33
h.handshakeInfo.cookie must be === Some("abcde")
h
} }
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
@ -308,27 +286,15 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val statusPromise: Promise[AssociationHandle] = Promise() val statusPromise: Promise[AssociationHandle] = Promise()
system.actorOf(Props(new ProtocolStateActor( system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = Some("abcde")),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
transport, transport,
new AkkaProtocolSettings(ConfigFactory.parseString( new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.require-cookie = on").withFallback(conf)),
"""
akka.remote.require-cookie = on
akka.remote.wait-activity-enabled = off
""").withFallback(conf)),
codec, codec,
failureDetector))) failureDetector)))
Await.result(statusPromise.future, 3.seconds) match { awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = Some("abcde")))
case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress
h.localAddress must be === localAkkaAddress
case _ fail()
}
lastActivityIsAssociate(registry, Some("abcde")) must be(true)
} }
"handle explicit disassociate messages" in { "handle explicit disassociate messages" in {
@ -338,14 +304,18 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val statusPromise: Promise[AssociationHandle] = Promise() val statusPromise: Promise[AssociationHandle] = Promise()
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
transport, transport,
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), new AkkaProtocolSettings(conf),
codec, codec,
failureDetector))) failureDetector)))
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
reader ! testAssociate(uid = 33, cookie = None)
val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress h.remoteAddress must be === remoteAkkaAddress
@ -357,8 +327,6 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
lastActivityIsAssociate(registry, None) must be(true)
reader ! testDisassociate reader ! testDisassociate
expectMsg(Disassociated) expectMsg(Disassociated)
@ -371,7 +339,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val statusPromise: Promise[AssociationHandle] = Promise() val statusPromise: Promise[AssociationHandle] = Promise()
val reader = system.actorOf(Props(new ProtocolStateActor( val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
transport, transport,
@ -379,10 +347,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
codec, codec,
failureDetector))) failureDetector)))
awaitCond(lastActivityIsAssociate(registry, None)) awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
// Finish association with a heartbeat -- pushes state out of WaitActivity reader ! testAssociate(uid = 33, cookie = None)
reader ! testHeartbeat
val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle case h: AssociationHandle
@ -406,15 +373,19 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val statusPromise: Promise[AssociationHandle] = Promise() val statusPromise: Promise[AssociationHandle] = Promise()
system.actorOf(Props(new ProtocolStateActor( val stateActor = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
transport, transport,
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), new AkkaProtocolSettings(conf),
codec, codec,
failureDetector))) failureDetector)))
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
stateActor ! testAssociate(uid = 33, cookie = None)
val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress h.remoteAddress must be === remoteAkkaAddress
@ -426,8 +397,6 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
lastActivityIsAssociate(registry, None) must be(true)
//wait for one heartbeat //wait for one heartbeat
awaitCond(lastActivityIsHeartbeat(registry)) awaitCond(lastActivityIsHeartbeat(registry))
@ -437,20 +406,24 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
} }
"handle correctly when the handler is registered only after the association is already closed" in { "handle correctly when the handler is registered only after the association is already closed" in {
val (failureDetector, _, transport, handle) = collaborators val (failureDetector, registry, transport, handle) = collaborators
transport.associateBehavior.pushConstant(handle) transport.associateBehavior.pushConstant(handle)
val statusPromise: Promise[AssociationHandle] = Promise() val statusPromise: Promise[AssociationHandle] = Promise()
val stateActor = system.actorOf(Props(new ProtocolStateActor( val stateActor = system.actorOf(Props(new ProtocolStateActor(
localAddress, HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
remoteAddress, remoteAddress,
statusPromise, statusPromise,
transport, transport,
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remote.wait-activity-enabled = off").withFallback(conf)), new AkkaProtocolSettings(conf),
codec, codec,
failureDetector))) failureDetector)))
awaitCond(lastActivityIsAssociate(registry, uid = 42, cookie = None))
stateActor ! testAssociate(uid = 33, cookie = None)
val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match { val wrappedHandle = Await.result(statusPromise.future, 3.seconds) match {
case h: AssociationHandle case h: AssociationHandle
h.remoteAddress must be === remoteAkkaAddress h.remoteAddress must be === remoteAkkaAddress

View file

@ -80,8 +80,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressATest)) awaitCond(registry.transportsReady(addressATest))
// TestTransport throws IllegalArgumentException when trying to associate with non-existing system // TestTransport throws InvalidAssociationException when trying to associate with non-existing system
intercept[IllegalArgumentException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) } intercept[InvalidAssociationException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) }
} }
"successfully send PDUs" in { "successfully send PDUs" in {

View file

@ -0,0 +1,153 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.testkit.TimingTest
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
import com.typesafe.config.{ Config, ConfigFactory }
import AkkaProtocolStressTest._
import akka.actor._
import scala.concurrent.duration._
import akka.testkit._
import akka.remote.EndpointException
import akka.remote.{ RARP, EndpointException }
import akka.remote.transport.FailureInjectorTransportAdapter.{ One, All, Drop }
import scala.concurrent.Await
import akka.actor.ActorRef
import akka.actor.Actor
import akka.testkit.AkkaSpec
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.ExtendedActorSystem
import akka.actor.RootActorPath
import akka.remote.transport.FailureInjectorTransportAdapter.One
import akka.remote.transport.FailureInjectorTransportAdapter.Drop
import akka.testkit.TestEvent
import akka.testkit.EventFilter
import akka.event.Logging
import akka.dispatch.sysmsg.{ Failed, SystemMessage }
import akka.pattern.pipe
object SystemMessageDeliveryStressTest {
val baseConfig: Config = ConfigFactory parseString ("""
akka {
#loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.retry-gate-closed-for = 0 s
remote.log-remote-lifecycle-events = on
remote.failure-detector {
threshold = 1.0
max-sample-size = 2
min-std-deviation = 1 ms
acceptable-heartbeat-pause = 0.01 s
}
remote.retry-window = 1 s
remote.maximum-retries-in-window = 1000
remote.use-passive-connections = on
remote.netty.tcp {
applied-adapters = ["gremlin"]
port = 0
}
}
""")
class SystemMessageSequenceVerifier(system: ActorSystem, testActor: ActorRef) extends MinimalActorRef {
val provider = RARP(system).provider
val path = provider.tempPath()
RARP(system).provider.registerTempActor(this, path)
override def getParent = provider.tempContainer
override def sendSystemMessage(message: SystemMessage): Unit = {
message match {
case Failed(_, _, seq) testActor ! seq
case _
}
}
}
class SystemMessageSender(val msgCount: Int, val target: ActorRef) extends Actor {
var counter = 0
val targetRef = target.asInstanceOf[InternalActorRef]
override def preStart(): Unit = self ! "sendnext"
override def receive = {
case "sendnext"
targetRef.sendSystemMessage(Failed(null, null, counter))
counter += 1
if (counter < msgCount) self ! "sendnext"
}
}
}
abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(configA)) with ImplicitSender with DefaultTimeout {
import SystemMessageDeliveryStressTest._
val systemB = ActorSystem("systemB", system.settings.config)
val sysMsgVerifier = new SystemMessageSequenceVerifier(system, testActor)
val MsgCount = 100
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val root = RootActorPath(address)
// We test internals here (system message delivery) so we are allowed to cheat
val there = RARP(systemB).provider.resolveActorRef(root / "temp" / sysMsgVerifier.path.name).asInstanceOf[InternalActorRef]
override def atStartup() = {
system.eventStream.publish(TestEvent.Mute(
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*")))
systemB.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*")))
}
"Remoting " + msg must {
"guaranteed delivery and message ordering despite packet loss " taggedAs TimingTest in {
Await.result(RARP(systemB).provider.transport.managementCommand(One(address, Drop(0.3, 0.3))), 3.seconds.dilated)
systemB.actorOf(Props(new SystemMessageSender(MsgCount, there)))
val toSend = (0 until MsgCount).toList
val received = expectMsgAllOf(45.seconds, toSend: _*)
received must be === toSend
}
}
override def beforeTermination() {
system.eventStream.publish(TestEvent.Mute(
EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
systemB.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
}
override def afterTermination(): Unit = systemB.shutdown()
}
class SystemMessageDeliveryDefault extends SystemMessageDeliveryStressTest("retry gate off, passive connections on", "")
class SystemMessageDeliveryRetryGate extends SystemMessageDeliveryStressTest("retry gate on, passive connections on",
"akka.remote.retry-gate-closed-for = 0.5 s")
class SystemMessageDeliveryNoPassive extends SystemMessageDeliveryStressTest("retry gate off, passive connections off",
"akka.remote.use-passive-connections = off")
class SystemMessageDeliveryNoPassiveRetryGate extends SystemMessageDeliveryStressTest("retry gate on, passive connections off",
"""
akka.remote.use-passive-connections = off
akka.remote.retry-gate-closed-for = 0.5 s
""")

View file

@ -56,8 +56,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
// TestTransport throws IllegalArgumentException when trying to associate with non-existing system // TestTransport throws IllegalAssociationException when trying to associate with non-existing system
intercept[IllegalArgumentException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) } intercept[InvalidAssociationException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) }
} }

View file

@ -15,12 +15,11 @@ import akka.remote.EndpointException
object ThrottlerTransportAdapterSpec { object ThrottlerTransportAdapterSpec {
val configA: Config = ConfigFactory parseString (""" val configA: Config = ConfigFactory parseString ("""
akka { akka {
#loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider" actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp.hostname = "localhost" remote.netty.tcp.hostname = "localhost"
remote.retry-gate-closed-for = 0 s remote.retry-gate-closed-for = 0 s
remote.log-remote-lifecycle-events = on remote.log-remote-lifecycle-events = off
remote.netty.tcp.applied-adapters = ["trttl"] remote.netty.tcp.applied-adapters = ["trttl"]
remote.netty.tcp.port = 0 remote.netty.tcp.port = 0