format with new Scalariform version
* and fix mima issue
This commit is contained in:
parent
839ec5f167
commit
3465a221f0
24 changed files with 114 additions and 91 deletions
|
|
@ -139,8 +139,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
||||||
else {
|
else {
|
||||||
val cache = manifestCache.get
|
val cache = manifestCache.get
|
||||||
cache.get(manifest) match {
|
cache.get(manifest) match {
|
||||||
case Some(cachedClassManifest) => s1.fromBinary(bytes, cachedClassManifest)
|
case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest)
|
||||||
case None =>
|
case None ⇒
|
||||||
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
|
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
|
||||||
case Success(classManifest) ⇒
|
case Success(classManifest) ⇒
|
||||||
val classManifestOption: Option[Class[_]] = Some(classManifest)
|
val classManifestOption: Option[Class[_]] = Some(classManifest)
|
||||||
|
|
@ -167,7 +167,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
||||||
"akka.actor.serializers is not in synch between the two systems.")
|
"akka.actor.serializers is not in synch between the two systems.")
|
||||||
}
|
}
|
||||||
serializer match {
|
serializer match {
|
||||||
case ser: ByteBufferSerializer =>
|
case ser: ByteBufferSerializer ⇒
|
||||||
ser.fromBinary(buf, manifest)
|
ser.fromBinary(buf, manifest)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
val bytes = Array.ofDim[Byte](buf.remaining())
|
val bytes = Array.ofDim[Byte](buf.remaining())
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,8 @@ class CodecBenchmark {
|
||||||
remote.artery.hostname = localhost
|
remote.artery.hostname = localhost
|
||||||
remote.artery.port = 0
|
remote.artery.port = 0
|
||||||
}
|
}
|
||||||
""")
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
implicit val system = ActorSystem("CodecBenchmark", config)
|
implicit val system = ActorSystem("CodecBenchmark", config)
|
||||||
val systemB = ActorSystem("systemB", system.settings.config)
|
val systemB = ActorSystem("systemB", system.settings.config)
|
||||||
|
|
@ -56,8 +57,10 @@ class CodecBenchmark {
|
||||||
val headerIn = HeaderBuilder(compression)
|
val headerIn = HeaderBuilder(compression)
|
||||||
val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN)
|
val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN)
|
||||||
|
|
||||||
val uniqueLocalAddress = UniqueAddress(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
|
val uniqueLocalAddress = UniqueAddress(
|
||||||
AddressUidExtension(system).addressUid)
|
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
|
||||||
|
AddressUidExtension(system).addressUid
|
||||||
|
)
|
||||||
val payload = Array.ofDim[Byte](1000)
|
val payload = Array.ofDim[Byte](1000)
|
||||||
|
|
||||||
private var materializer: ActorMaterializer = _
|
private var materializer: ActorMaterializer = _
|
||||||
|
|
|
||||||
|
|
@ -65,11 +65,13 @@ abstract class QuickRestartSpec
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
restartingSystem =
|
restartingSystem =
|
||||||
if (restartingSystem == null)
|
if (restartingSystem == null)
|
||||||
ActorSystem(system.name,
|
ActorSystem(
|
||||||
|
system.name,
|
||||||
ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]")
|
ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
else
|
else
|
||||||
ActorSystem(system.name,
|
ActorSystem(
|
||||||
|
system.name,
|
||||||
ConfigFactory.parseString(s"""
|
ConfigFactory.parseString(s"""
|
||||||
akka.cluster.roles = [round-$n]
|
akka.cluster.roles = [round-$n]
|
||||||
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port
|
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port
|
||||||
|
|
|
||||||
|
|
@ -195,11 +195,11 @@ object MultiNodeSpec {
|
||||||
require(selfIndex >= 0 && selfIndex < maxNodes, "multinode.index is out of bounds: " + selfIndex)
|
require(selfIndex >= 0 && selfIndex < maxNodes, "multinode.index is out of bounds: " + selfIndex)
|
||||||
|
|
||||||
private[testkit] val nodeConfig = mapToConfig(Map(
|
private[testkit] val nodeConfig = mapToConfig(Map(
|
||||||
"akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
|
"akka.actor.provider" → "akka.remote.RemoteActorRefProvider",
|
||||||
"akka.remote.artery.hostname" -> selfName,
|
"akka.remote.artery.hostname" → selfName,
|
||||||
"akka.remote.netty.tcp.hostname" -> selfName,
|
"akka.remote.netty.tcp.hostname" → selfName,
|
||||||
"akka.remote.netty.tcp.port" -> selfPort,
|
"akka.remote.netty.tcp.port" → selfPort,
|
||||||
"akka.remote.artery.port" -> selfPort))
|
"akka.remote.artery.port" → selfPort))
|
||||||
|
|
||||||
private[testkit] val baseConfig: Config = ConfigFactory.parseString("""
|
private[testkit] val baseConfig: Config = ConfigFactory.parseString("""
|
||||||
akka {
|
akka {
|
||||||
|
|
|
||||||
|
|
@ -57,10 +57,10 @@ object AeronStreamLatencySpec extends MultiNodeConfig {
|
||||||
""")))
|
""")))
|
||||||
|
|
||||||
final case class TestSettings(
|
final case class TestSettings(
|
||||||
testName: String,
|
testName: String,
|
||||||
messageRate: Int, // msg/s
|
messageRate: Int, // msg/s
|
||||||
payloadSize: Int,
|
payloadSize: Int,
|
||||||
repeat: Int)
|
repeat: Int)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,9 +50,9 @@ object AeronStreamMaxThroughputSpec extends MultiNodeConfig {
|
||||||
""")))
|
""")))
|
||||||
|
|
||||||
final case class TestSettings(
|
final case class TestSettings(
|
||||||
testName: String,
|
testName: String,
|
||||||
totalMessages: Long,
|
totalMessages: Long,
|
||||||
payloadSize: Int)
|
payloadSize: Int)
|
||||||
|
|
||||||
def iterate(start: Long, end: Long): Iterator[Long] = new AbstractIterator[Long] {
|
def iterate(start: Long, end: Long): Iterator[Long] = new AbstractIterator[Long] {
|
||||||
private[this] var first = true
|
private[this] var first = true
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ object HandshakeRestartReceiverSpec extends MultiNodeConfig {
|
||||||
class Subject extends Actor {
|
class Subject extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "shutdown" ⇒ context.system.terminate()
|
case "shutdown" ⇒ context.system.terminate()
|
||||||
case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self)
|
case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -122,10 +122,10 @@ object LatencySpec extends MultiNodeConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class TestSettings(
|
final case class TestSettings(
|
||||||
testName: String,
|
testName: String,
|
||||||
messageRate: Int, // msg/s
|
messageRate: Int, // msg/s
|
||||||
payloadSize: Int,
|
payloadSize: Int,
|
||||||
repeat: Int)
|
repeat: Int)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -147,10 +147,10 @@ object MaxThroughputSpec extends MultiNodeConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class TestSettings(
|
final case class TestSettings(
|
||||||
testName: String,
|
testName: String,
|
||||||
totalMessages: Long,
|
totalMessages: Long,
|
||||||
burstSize: Int,
|
burstSize: Int,
|
||||||
payloadSize: Int,
|
payloadSize: Int,
|
||||||
senderReceiverPairs: Int)
|
senderReceiverPairs: Int)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -240,7 +240,8 @@ abstract class MaxThroughputSpec
|
||||||
val senders = for (n ← 1 to senderReceiverPairs) yield {
|
val senders = for (n ← 1 to senderReceiverPairs) yield {
|
||||||
val receiver = identifyReceiver(receiverName + n)
|
val receiver = identifyReceiver(receiverName + n)
|
||||||
val plotProbe = TestProbe()
|
val plotProbe = TestProbe()
|
||||||
val snd = system.actorOf(senderProps(receiver, testSettings, plotProbe.ref),
|
val snd = system.actorOf(
|
||||||
|
senderProps(receiver, testSettings, plotProbe.ref),
|
||||||
testName + "-snd" + n)
|
testName + "-snd" + n)
|
||||||
val terminationProbe = TestProbe()
|
val terminationProbe = TestProbe()
|
||||||
terminationProbe.watch(snd)
|
terminationProbe.watch(snd)
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.remote.artery
|
||||||
final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) {
|
final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) {
|
||||||
|
|
||||||
def add(key: String, value: Number): PlotResult =
|
def add(key: String, value: Number): PlotResult =
|
||||||
copy(values = values :+ (key -> value))
|
copy(values = values :+ (key → value))
|
||||||
|
|
||||||
def addAll(p: PlotResult): PlotResult =
|
def addAll(p: PlotResult): PlotResult =
|
||||||
copy(values ++ p.values)
|
copy(values ++ p.values)
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig {
|
||||||
class Subject extends Actor {
|
class Subject extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "shutdown" ⇒ context.system.terminate()
|
case "shutdown" ⇒ context.system.terminate()
|
||||||
case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self)
|
case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,8 @@ package akka.remote.artery
|
||||||
import java.util.concurrent.TimeUnit.SECONDS
|
import java.util.concurrent.TimeUnit.SECONDS
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
|
|
||||||
class TestRateReporter(name: String) extends RateReporter(SECONDS.toNanos(1),
|
class TestRateReporter(name: String) extends RateReporter(
|
||||||
|
SECONDS.toNanos(1),
|
||||||
new RateReporter.Reporter {
|
new RateReporter.Reporter {
|
||||||
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
|
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
|
||||||
println(name +
|
println(name +
|
||||||
|
|
|
||||||
|
|
@ -71,11 +71,11 @@ import akka.stream.ActorMaterializerSettings
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final case class InboundEnvelope(
|
private[akka] final case class InboundEnvelope(
|
||||||
recipient: InternalActorRef,
|
recipient: InternalActorRef,
|
||||||
recipientAddress: Address,
|
recipientAddress: Address,
|
||||||
message: AnyRef,
|
message: AnyRef,
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
originUid: Long)
|
originUid: Long)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -123,9 +123,9 @@ private[akka] object AssociationState {
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final class AssociationState(
|
private[akka] final class AssociationState(
|
||||||
val incarnation: Int,
|
val incarnation: Int,
|
||||||
val uniqueRemoteAddressPromise: Promise[UniqueAddress],
|
val uniqueRemoteAddressPromise: Promise[UniqueAddress],
|
||||||
val quarantined: Set[Long]) {
|
val quarantined: Set[Long]) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Full outbound address with UID for this association.
|
* Full outbound address with UID for this association.
|
||||||
|
|
@ -239,7 +239,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
||||||
private val systemMessageResendInterval: FiniteDuration = 1.second
|
private val systemMessageResendInterval: FiniteDuration = 1.second
|
||||||
private val handshakeRetryInterval: FiniteDuration = 1.second
|
private val handshakeRetryInterval: FiniteDuration = 1.second
|
||||||
private val handshakeTimeout: FiniteDuration =
|
private val handshakeTimeout: FiniteDuration =
|
||||||
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero,
|
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(
|
||||||
|
_ > Duration.Zero,
|
||||||
"handshake-timeout must be > 0")
|
"handshake-timeout must be > 0")
|
||||||
private val injectHandshakeInterval: FiniteDuration = 1.second
|
private val injectHandshakeInterval: FiniteDuration = 1.second
|
||||||
private val giveUpSendAfter: FiniteDuration = 60.seconds
|
private val giveUpSendAfter: FiniteDuration = 60.seconds
|
||||||
|
|
|
||||||
|
|
@ -44,11 +44,11 @@ import akka.util.{ Unsafe, WildcardTree }
|
||||||
* remote address.
|
* remote address.
|
||||||
*/
|
*/
|
||||||
private[akka] class Association(
|
private[akka] class Association(
|
||||||
val transport: ArteryTransport,
|
val transport: ArteryTransport,
|
||||||
val materializer: Materializer,
|
val materializer: Materializer,
|
||||||
override val remoteAddress: Address,
|
override val remoteAddress: Address,
|
||||||
override val controlSubject: ControlMessageSubject,
|
override val controlSubject: ControlMessageSubject,
|
||||||
largeMessageDestinations: WildcardTree[NotUsed])
|
largeMessageDestinations: WildcardTree[NotUsed])
|
||||||
extends AbstractAssociation with OutboundContext {
|
extends AbstractAssociation with OutboundContext {
|
||||||
|
|
||||||
private val log = Logging(transport.system, getClass.getName)
|
private val log = Logging(transport.system, getClass.getName)
|
||||||
|
|
@ -103,7 +103,8 @@ private[akka] class Association(
|
||||||
Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]
|
Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]
|
||||||
|
|
||||||
def completeHandshake(peer: UniqueAddress): Unit = {
|
def completeHandshake(peer: UniqueAddress): Unit = {
|
||||||
require(remoteAddress == peer.address,
|
require(
|
||||||
|
remoteAddress == peer.address,
|
||||||
s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}")
|
s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}")
|
||||||
val current = associationState
|
val current = associationState
|
||||||
current.uniqueRemoteAddressPromise.trySuccess(peer)
|
current.uniqueRemoteAddressPromise.trySuccess(peer)
|
||||||
|
|
@ -114,7 +115,8 @@ private[akka] class Association(
|
||||||
if (swapState(current, newState)) {
|
if (swapState(current, newState)) {
|
||||||
current.uniqueRemoteAddressValue() match {
|
current.uniqueRemoteAddressValue() match {
|
||||||
case Some(Success(old)) ⇒
|
case Some(Success(old)) ⇒
|
||||||
log.debug("Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
|
log.debug(
|
||||||
|
"Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
|
||||||
newState.incarnation, peer.address, peer.uid, old.uid)
|
newState.incarnation, peer.address, peer.uid, old.uid)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
// Failed, nothing to do
|
// Failed, nothing to do
|
||||||
|
|
@ -190,7 +192,8 @@ private[akka] class Association(
|
||||||
val newState = current.newQuarantined()
|
val newState = current.newQuarantined()
|
||||||
if (swapState(current, newState)) {
|
if (swapState(current, newState)) {
|
||||||
// quarantine state change was performed
|
// quarantine state change was performed
|
||||||
log.warning("Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
|
log.warning(
|
||||||
|
"Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
|
||||||
remoteAddress, u, reason)
|
remoteAddress, u, reason)
|
||||||
// end delivery of system messages to that incarnation after this point
|
// end delivery of system messages to that incarnation after this point
|
||||||
send(ClearSystemMessageDelivery, None, dummyRecipient)
|
send(ClearSystemMessageDelivery, None, dummyRecipient)
|
||||||
|
|
@ -200,10 +203,12 @@ private[akka] class Association(
|
||||||
quarantine(reason, uid) // recursive
|
quarantine(reason, uid) // recursive
|
||||||
}
|
}
|
||||||
case Some(Success(peer)) ⇒
|
case Some(Success(peer)) ⇒
|
||||||
log.debug("Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
|
log.debug(
|
||||||
|
"Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
|
||||||
remoteAddress, u, peer.uid, reason)
|
remoteAddress, u, peer.uid, reason)
|
||||||
case None ⇒
|
case None ⇒
|
||||||
log.debug("Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
|
log.debug(
|
||||||
|
"Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
|
||||||
remoteAddress, reason)
|
remoteAddress, reason)
|
||||||
}
|
}
|
||||||
case None ⇒
|
case None ⇒
|
||||||
|
|
|
||||||
|
|
@ -15,9 +15,9 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||||
// TODO: Long UID
|
// TODO: Long UID
|
||||||
class Encoder(
|
class Encoder(
|
||||||
uniqueLocalAddress: UniqueAddress,
|
uniqueLocalAddress: UniqueAddress,
|
||||||
system: ActorSystem,
|
system: ActorSystem,
|
||||||
compressionTable: LiteralCompressionTable,
|
compressionTable: LiteralCompressionTable,
|
||||||
pool: EnvelopeBufferPool)
|
pool: EnvelopeBufferPool)
|
||||||
extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {
|
extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {
|
||||||
|
|
||||||
val in: Inlet[Send] = Inlet("Artery.Encoder.in")
|
val in: Inlet[Send] = Inlet("Artery.Encoder.in")
|
||||||
|
|
@ -108,11 +108,11 @@ class Encoder(
|
||||||
}
|
}
|
||||||
|
|
||||||
class Decoder(
|
class Decoder(
|
||||||
uniqueLocalAddress: UniqueAddress,
|
uniqueLocalAddress: UniqueAddress,
|
||||||
system: ExtendedActorSystem,
|
system: ExtendedActorSystem,
|
||||||
resolveActorRefWithLocalAddress: String ⇒ InternalActorRef,
|
resolveActorRefWithLocalAddress: String ⇒ InternalActorRef,
|
||||||
compressionTable: LiteralCompressionTable,
|
compressionTable: LiteralCompressionTable,
|
||||||
pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
|
pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
|
||||||
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
|
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
|
||||||
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
|
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
|
||||||
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
|
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
|
||||||
|
|
@ -172,7 +172,8 @@ class Decoder(
|
||||||
push(out, decoded)
|
push(out, decoded)
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
log.warning("Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
|
log.warning(
|
||||||
|
"Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
|
||||||
headerBuilder.serializer, headerBuilder.manifest, e.getMessage)
|
headerBuilder.serializer, headerBuilder.manifest, e.getMessage)
|
||||||
pull(in)
|
pull(in)
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
||||||
|
|
@ -198,14 +198,16 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
|
||||||
push(out, env)
|
push(out, env)
|
||||||
else {
|
else {
|
||||||
// FIXME remove, only debug
|
// FIXME remove, only debug
|
||||||
log.warning(s"Dropping message [{}] from unknown system with UID [{}]. " +
|
log.warning(
|
||||||
"This system with UID [{}] was probably restarted. " +
|
s"Dropping message [{}] from unknown system with UID [{}]. " +
|
||||||
"Messages will be accepted when new handshake has been completed.",
|
|
||||||
env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
|
|
||||||
if (log.isDebugEnabled)
|
|
||||||
log.debug(s"Dropping message [{}] from unknown system with UID [{}]. " +
|
|
||||||
"This system with UID [{}] was probably restarted. " +
|
"This system with UID [{}] was probably restarted. " +
|
||||||
"Messages will be accepted when new handshake has been completed.",
|
"Messages will be accepted when new handshake has been completed.",
|
||||||
|
env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
|
||||||
|
if (log.isDebugEnabled)
|
||||||
|
log.debug(
|
||||||
|
s"Dropping message [{}] from unknown system with UID [{}]. " +
|
||||||
|
"This system with UID [{}] was probably restarted. " +
|
||||||
|
"Messages will be accepted when new handshake has been completed.",
|
||||||
env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
|
env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
|
||||||
pull(in)
|
pull(in)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,8 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
|
||||||
push(out, env)
|
push(out, env)
|
||||||
case association ⇒
|
case association ⇒
|
||||||
if (association.associationState.isQuarantined(env.originUid)) {
|
if (association.associationState.isQuarantined(env.originUid)) {
|
||||||
inboundContext.sendControl(association.remoteAddress,
|
inboundContext.sendControl(
|
||||||
|
association.remoteAddress,
|
||||||
Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
|
Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
|
||||||
pull(in)
|
pull(in)
|
||||||
} else
|
} else
|
||||||
|
|
|
||||||
|
|
@ -21,16 +21,17 @@ import akka.remote.RemoteRef
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class MessageDispatcher(
|
private[akka] class MessageDispatcher(
|
||||||
system: ExtendedActorSystem,
|
system: ExtendedActorSystem,
|
||||||
provider: RemoteActorRefProvider) {
|
provider: RemoteActorRefProvider) {
|
||||||
|
|
||||||
private val remoteDaemon = provider.remoteDaemon
|
private val remoteDaemon = provider.remoteDaemon
|
||||||
private val log = Logging(system, getClass.getName)
|
private val log = Logging(system, getClass.getName)
|
||||||
|
|
||||||
def dispatch(recipient: InternalActorRef,
|
def dispatch(
|
||||||
recipientAddress: Address,
|
recipient: InternalActorRef,
|
||||||
message: AnyRef,
|
recipientAddress: Address,
|
||||||
senderOption: Option[ActorRef]): Unit = {
|
message: AnyRef,
|
||||||
|
senderOption: Option[ActorRef]): Unit = {
|
||||||
|
|
||||||
import provider.remoteSettings._
|
import provider.remoteSettings._
|
||||||
|
|
||||||
|
|
@ -54,8 +55,9 @@ private[akka] class MessageDispatcher(
|
||||||
case sel: ActorSelectionMessage ⇒
|
case sel: ActorSelectionMessage ⇒
|
||||||
if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
|
if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
|
||||||
sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
|
sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
|
||||||
log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " +
|
log.debug(
|
||||||
"allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
|
"operating in UntrustedMode, dropping inbound actor selection to [{}], " +
|
||||||
|
"allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
|
||||||
sel.elements.mkString("/", "/", ""))
|
sel.elements.mkString("/", "/", ""))
|
||||||
else
|
else
|
||||||
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
|
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
|
||||||
|
|
@ -72,10 +74,12 @@ private[akka] class MessageDispatcher(
|
||||||
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
|
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
|
||||||
r.!(message)(sender)
|
r.!(message)(sender)
|
||||||
else
|
else
|
||||||
log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
|
log.error(
|
||||||
|
"dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
|
||||||
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
|
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
|
||||||
|
|
||||||
case r ⇒ log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
|
case r ⇒ log.error(
|
||||||
|
"dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
|
||||||
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
|
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,8 +44,8 @@ private[akka] object SystemMessageDelivery {
|
||||||
*/
|
*/
|
||||||
private[akka] class SystemMessageDelivery(
|
private[akka] class SystemMessageDelivery(
|
||||||
outboundContext: OutboundContext,
|
outboundContext: OutboundContext,
|
||||||
resendInterval: FiniteDuration,
|
resendInterval: FiniteDuration,
|
||||||
maxBufferSize: Int)
|
maxBufferSize: Int)
|
||||||
extends GraphStage[FlowShape[Send, Send]] {
|
extends GraphStage[FlowShape[Send, Send]] {
|
||||||
|
|
||||||
import SystemMessageDelivery._
|
import SystemMessageDelivery._
|
||||||
|
|
|
||||||
|
|
@ -9,19 +9,19 @@ class EnvelopeBufferSpec extends AkkaSpec {
|
||||||
|
|
||||||
object TestCompressor extends LiteralCompressionTable {
|
object TestCompressor extends LiteralCompressionTable {
|
||||||
val refToIdx = Map(
|
val refToIdx = Map(
|
||||||
"compressable0" -> 0,
|
"compressable0" → 0,
|
||||||
"compressable1" -> 1,
|
"compressable1" → 1,
|
||||||
"reallylongcompressablestring" -> 2)
|
"reallylongcompressablestring" → 2)
|
||||||
val idxToRef = refToIdx.map(_.swap)
|
val idxToRef = refToIdx.map(_.swap)
|
||||||
|
|
||||||
val serializerToIdx = Map(
|
val serializerToIdx = Map(
|
||||||
"serializer0" -> 0,
|
"serializer0" → 0,
|
||||||
"serializer1" -> 1)
|
"serializer1" → 1)
|
||||||
val idxToSer = serializerToIdx.map(_.swap)
|
val idxToSer = serializerToIdx.map(_.swap)
|
||||||
|
|
||||||
val manifestToIdx = Map(
|
val manifestToIdx = Map(
|
||||||
"manifest0" -> 0,
|
"manifest0" → 0,
|
||||||
"manifest1" -> 1)
|
"manifest1" → 1)
|
||||||
val idxToManifest = manifestToIdx.map(_.swap)
|
val idxToManifest = manifestToIdx.map(_.swap)
|
||||||
|
|
||||||
override def compressActorRef(ref: String): Int = refToIdx.getOrElse(ref, -1)
|
override def compressActorRef(ref: String): Int = refToIdx.getOrElse(ref, -1)
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
||||||
|
|
||||||
private def setupStream(
|
private def setupStream(
|
||||||
outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds,
|
outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds,
|
||||||
retryInterval: FiniteDuration = 10.seconds,
|
retryInterval: FiniteDuration = 10.seconds,
|
||||||
injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
|
injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
|
||||||
|
|
||||||
val destination = null.asInstanceOf[RemoteActorRef] // not used
|
val destination = null.asInstanceOf[RemoteActorRef] // not used
|
||||||
|
|
|
||||||
|
|
@ -75,8 +75,8 @@ class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) wit
|
||||||
|
|
||||||
EventFilter.warning(
|
EventFilter.warning(
|
||||||
start = "Failed to deserialize message with serializer id [4]", occurrences = 1).intercept {
|
start = "Failed to deserialize message with serializer id [4]", occurrences = 1).intercept {
|
||||||
remoteRef ! "boom".getBytes("utf-8")
|
remoteRef ! "boom".getBytes("utf-8")
|
||||||
}(systemB)
|
}(systemB)
|
||||||
|
|
||||||
remoteRef ! "ping2"
|
remoteRef ! "ping2"
|
||||||
expectMsg("ping2")
|
expectMsg("ping2")
|
||||||
|
|
|
||||||
|
|
@ -21,9 +21,9 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject
|
||||||
|
|
||||||
private[akka] class TestInboundContext(
|
private[akka] class TestInboundContext(
|
||||||
override val localAddress: UniqueAddress,
|
override val localAddress: UniqueAddress,
|
||||||
val controlSubject: TestControlMessageSubject = new TestControlMessageSubject,
|
val controlSubject: TestControlMessageSubject = new TestControlMessageSubject,
|
||||||
val controlProbe: Option[ActorRef] = None,
|
val controlProbe: Option[ActorRef] = None,
|
||||||
val replyDropRate: Double = 0.0) extends InboundContext {
|
val replyDropRate: Double = 0.0) extends InboundContext {
|
||||||
|
|
||||||
private val associationsByAddress = new ConcurrentHashMap[Address, OutboundContext]()
|
private val associationsByAddress = new ConcurrentHashMap[Address, OutboundContext]()
|
||||||
private val associationsByUid = new ConcurrentHashMap[Long, OutboundContext]()
|
private val associationsByUid = new ConcurrentHashMap[Long, OutboundContext]()
|
||||||
|
|
@ -58,10 +58,10 @@ private[akka] class TestInboundContext(
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] class TestOutboundContext(
|
private[akka] class TestOutboundContext(
|
||||||
override val localAddress: UniqueAddress,
|
override val localAddress: UniqueAddress,
|
||||||
override val remoteAddress: Address,
|
override val remoteAddress: Address,
|
||||||
override val controlSubject: TestControlMessageSubject,
|
override val controlSubject: TestControlMessageSubject,
|
||||||
val controlProbe: Option[ActorRef] = None) extends OutboundContext {
|
val controlProbe: Option[ActorRef] = None) extends OutboundContext {
|
||||||
|
|
||||||
// access to this is synchronized (it's a test utility)
|
// access to this is synchronized (it's a test utility)
|
||||||
private var _associationState = AssociationState()
|
private var _associationState = AssociationState()
|
||||||
|
|
@ -117,8 +117,8 @@ private[akka] class TestControlMessageSubject extends ControlMessageSubject {
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] class ManualReplyInboundContext(
|
private[akka] class ManualReplyInboundContext(
|
||||||
replyProbe: ActorRef,
|
replyProbe: ActorRef,
|
||||||
localAddress: UniqueAddress,
|
localAddress: UniqueAddress,
|
||||||
controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) {
|
controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) {
|
||||||
|
|
||||||
private var lastReply: Option[(Address, ControlMessage)] = None
|
private var lastReply: Option[(Address, ControlMessage)] = None
|
||||||
|
|
|
||||||
|
|
@ -747,9 +747,6 @@ object MiMa extends AutoPlugin {
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.DefaultSSLContextCreation.validateAndWarnAboutLooseSettings")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.DefaultSSLContextCreation.validateAndWarnAboutLooseSettings")
|
||||||
),
|
),
|
||||||
"2.4.4" -> Seq(
|
"2.4.4" -> Seq(
|
||||||
// Remove useUntrustedMode which is an internal API and not used anywhere anymore
|
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"),
|
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"),
|
|
||||||
|
|
||||||
// #20080, #20081 remove race condition on HTTP client
|
// #20080, #20081 remove race condition on HTTP client
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"),
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"),
|
||||||
|
|
@ -851,6 +848,11 @@ object MiMa extends AutoPlugin {
|
||||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.client.ClusterClient.initialContactsSel")
|
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.client.ClusterClient.initialContactsSel")
|
||||||
),
|
),
|
||||||
"2.4.6" -> Seq(
|
"2.4.6" -> Seq(
|
||||||
|
|
||||||
|
// Remove useUntrustedMode which is an internal API and not used anywhere anymore
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"),
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"),
|
||||||
|
|
||||||
// internal api
|
// internal api
|
||||||
FilterAnyProblemStartingWith("akka.stream.impl"),
|
FilterAnyProblemStartingWith("akka.stream.impl"),
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue