diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 6e96859ecf..f573f98bc4 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -51,6 +51,10 @@ class CodecBenchmark { val systemB = ActorSystem("systemB", system.settings.config) val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val inboundEnvelopePool = new ObjectPool[InboundEnvelope]( + 16, + create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear() + ) val compression = new Compression(system) val headerIn = HeaderBuilder(compression) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) @@ -151,7 +155,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, compression, envelopePool)) + resolveActorRefWithLocalAddress, compression, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map { _ => @@ -162,6 +166,10 @@ class CodecBenchmark { envelope } .via(decoder) + .map { env => + inboundEnvelopePool.release(env) + () + } .runWith(new LatchSink(N, latch))(materializer) if (!latch.await(30, TimeUnit.SECONDS)) @@ -188,12 +196,16 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, compression, envelopePool)) + resolveActorRefWithLocalAddress, compression, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) .via(encoder) .via(decoder) + .map { env => + inboundEnvelopePool.release(env) + () + } .runWith(new LatchSink(N, latch))(materializer) if (!latch.await(30, TimeUnit.SECONDS)) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 1e61bc2468..68d05736bb 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -78,15 +78,82 @@ import scala.collection.JavaConverters._ import akka.stream.ActorMaterializerSettings import scala.annotation.tailrec import akka.util.OptionVal + /** * INTERNAL API */ -private[akka] final case class InboundEnvelope( - recipient: InternalActorRef, - recipientAddress: Address, - message: AnyRef, - senderOption: OptionVal[ActorRef], - originUid: Long) +private[akka] object InboundEnvelope { + def apply( + recipient: InternalActorRef, + recipientAddress: Address, + message: AnyRef, + senderOption: OptionVal[ActorRef], + originUid: Long): InboundEnvelope = { + val env = new ReusableInboundEnvelope + env.init(recipient, recipientAddress, message, senderOption, originUid) + env + } + +} + +/** + * INTERNAL API + */ +private[akka] trait InboundEnvelope { + def recipient: InternalActorRef + def recipientAddress: Address + def message: AnyRef + def senderOption: OptionVal[ActorRef] + def originUid: Long + + def withMessage(message: AnyRef): InboundEnvelope +} + +/** + * INTERNAL API + */ +private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { + private var _recipient: InternalActorRef = null + private var _recipientAddress: Address = null + private var _message: AnyRef = null + private var _senderOption: OptionVal[ActorRef] = OptionVal.None + private var _originUid: Long = 0L + + override def recipient: InternalActorRef = _recipient + override def recipientAddress: Address = _recipientAddress + override def message: AnyRef = _message + override def senderOption: OptionVal[ActorRef] = _senderOption + override def originUid: Long = _originUid + + override def withMessage(message: AnyRef): InboundEnvelope = { + _message = message + this + } + + def clear(): Unit = { + _recipient = null + _recipientAddress = null + _message = null + _senderOption = OptionVal.None + _originUid = 0L + } + + def init( + recipient: InternalActorRef, + recipientAddress: Address, + message: AnyRef, + senderOption: OptionVal[ActorRef], + originUid: Long): Unit = { + _recipient = recipient + _recipientAddress = recipientAddress + _message = message + _senderOption = senderOption + _originUid = originUid + } + + override def toString: String = + s"InboundEnvelope($recipient, $recipientAddress, $message, $senderOption, $originUid)" +} /** * INTERNAL API @@ -316,8 +383,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val maxRestarts = 5 // FIXME config private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) - val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) - val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers) + private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + private val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers) + + private val inboundEnvelopePool = new ObjectPool[InboundEnvelope]( + 16, + create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear()) val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() @@ -532,11 +603,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .run()(materializer) } - aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow) - .toMat(inboundSink)(Keep.right) - .run()(materializer) - attachStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) } @@ -664,8 +730,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def createEncoder(pool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, pool)) + def createEncoder(bufferPool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(localAddress, system, compression, bufferPool)) def encoder: Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool) @@ -675,12 +741,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption) + inboundEnvelopePool.release(m) } - def createDecoder(pool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + def createDecoder(bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = recipient ⇒ provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) - Flow.fromGraph(new Decoder(localAddress, system, resolveActorRefWithLocalAddress, compression, pool)) + Flow.fromGraph(new Decoder(localAddress, system, resolveActorRefWithLocalAddress, compression, bufferPool, + inboundEnvelopePool)) } def decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = createDecoder(envelopePool) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 299cd7e5fd..47df0a7efc 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -15,9 +15,9 @@ import akka.util.OptionVal // TODO: Long UID class Encoder( uniqueLocalAddress: UniqueAddress, - system: ActorSystem, - compressionTable: LiteralCompressionTable, - pool: EnvelopeBufferPool) + system: ActorSystem, + compressionTable: LiteralCompressionTable, + bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { val in: Inlet[Send] = Inlet("Artery.Encoder.in") @@ -41,7 +41,7 @@ class Encoder( override def onPush(): Unit = { val send = grab(in) - val envelope = pool.acquire() + val envelope = bufferPool.acquire() val recipientStr = recipientCache.get(send.recipient) match { case null ⇒ @@ -56,8 +56,8 @@ class Encoder( headerBuilder.recipientActorRef = recipientStr send.senderOption match { - case OptionVal.None => headerBuilder.setNoSender() - case OptionVal.Some(sender) => + case OptionVal.None ⇒ headerBuilder.setNoSender() + case OptionVal.Some(sender) ⇒ val senderStr = senderCache.get(sender) match { case null ⇒ val s = sender.path.toSerializationFormatWithAddress(localAddress) @@ -85,7 +85,7 @@ class Encoder( } catch { case NonFatal(e) ⇒ - pool.release(envelope) + bufferPool.release(envelope) send.message match { case _: SystemMessageEnvelope ⇒ log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName) @@ -105,11 +105,12 @@ class Encoder( } class Decoder( - uniqueLocalAddress: UniqueAddress, - system: ExtendedActorSystem, + uniqueLocalAddress: UniqueAddress, + system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, - compressionTable: LiteralCompressionTable, - pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { + compressionTable: LiteralCompressionTable, + bufferPool: EnvelopeBufferPool, + inEnvelopePool: ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) @@ -163,7 +164,8 @@ class Decoder( val deserializedMessage = MessageSerializer.deserializeForArtery( system, serialization, headerBuilder, envelope) - val decoded = InboundEnvelope( + val decoded = inEnvelopePool.acquire() + decoded.asInstanceOf[ReusableInboundEnvelope].init( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? deserializedMessage, @@ -178,7 +180,7 @@ class Decoder( headerBuilder.serializer, headerBuilder.manifest, e.getMessage) pull(in) } finally { - pool.release(envelope) + bufferPool.release(envelope) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 6d68ee0825..c88d63a43a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -105,7 +105,7 @@ private[akka] class InboundControlJunction // InHandler override def onPush(): Unit = { grab(in) match { - case env @ InboundEnvelope(_, _, _: ControlMessage, _, _) ⇒ + case env: InboundEnvelope if env.message.isInstanceOf[ControlMessage] ⇒ observers.foreach(_.notify(env)) pull(in) case env ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 84dd9871f1..93d8fad3ec 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -166,23 +166,25 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt if (inControlStream) setHandler(in, new InHandler { override def onPush(): Unit = { - grab(in) match { - case InboundEnvelope(_, _, HandshakeReq(from), _, _) ⇒ - onHandshakeReq(from) - case InboundEnvelope(_, _, HandshakeRsp(from), _, _) ⇒ + val env = grab(in) + env.message match { + case HandshakeReq(from) ⇒ onHandshakeReq(from) + case HandshakeRsp(from) ⇒ inboundContext.completeHandshake(from) pull(in) - case other ⇒ onMessage(other) + case _ ⇒ + onMessage(env) } } }) else setHandler(in, new InHandler { override def onPush(): Unit = { - grab(in) match { - case InboundEnvelope(_, _, HandshakeReq(from), _, _) ⇒ - onHandshakeReq(from) - case other ⇒ onMessage(other) + val env = grab(in) + env.message match { + case HandshakeReq(from) ⇒ onHandshakeReq(from) + case _ ⇒ + onMessage(env) } } }) diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala index 1a4454c58b..9727ecd3f3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala @@ -29,10 +29,10 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten override def onPush(): Unit = { val env = grab(in) inboundContext.association(env.originUid) match { - case OptionVal.None => + case OptionVal.None ⇒ // unknown, handshake not completed push(out, env) - case OptionVal.Some(association) => + case OptionVal.Some(association) ⇒ if (association.associationState.isQuarantined(env.originUid)) { inboundContext.sendControl( association.remoteAddress, diff --git a/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala b/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala new file mode 100644 index 0000000000..0020d52481 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import org.agrona.concurrent.ManyToManyConcurrentArrayQueue + +/** + * INTERNAL API + */ +private[remote] class ObjectPool[A <: AnyRef](capacity: Int, create: () ⇒ A, clear: A ⇒ Unit) { + private val pool = new ManyToManyConcurrentArrayQueue[A](capacity) + + def acquire(): A = { + val obj = pool.poll() + if (obj eq null) create() + else obj + } + + def release(obj: A) = { + clear(obj) + (!pool.offer(obj)) + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 4f1766821e..a6ba8b25fe 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -226,12 +226,13 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G // InHandler override def onPush(): Unit = { - grab(in) match { - case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _, _) ⇒ + val env = grab(in) + env.message match { + case sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo) ⇒ if (n == seqNo) { inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) seqNo += 1 - val unwrapped = env.copy(message = sysEnv.message) + val unwrapped = env.withMessage(sysEnv.message) push(out, unwrapped) } else if (n < seqNo) { inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) @@ -240,11 +241,10 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G inboundContext.sendControl(ackReplyTo.address, Nack(seqNo - 1, localAddress)) pull(in) } - case env ⇒ + case _ ⇒ // messages that don't need acking push(out, env) } - } // OutHandler diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala index ec70bb69cf..07b369950f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala @@ -7,7 +7,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ - import akka.Done import akka.actor.Address import akka.remote.EndpointManager.Send @@ -25,6 +24,7 @@ import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic +import akka.util.OptionVal /** * INTERNAL API @@ -158,10 +158,10 @@ private[remote] class InboundTestStage(inboundContext: InboundContext) override def onPush(): Unit = { val env = grab(in) inboundContext.association(env.originUid) match { - case null ⇒ + case OptionVal.None ⇒ // unknown, handshake not completed push(out, env) - case association ⇒ + case OptionVal.Some(association) ⇒ if (blackhole(association.remoteAddress)) { log.debug( "dropping inbound message [{}] from [{}] with UID [{}] because of blackhole", diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index b0c35bc1f4..4777778a97 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -36,11 +36,11 @@ private[remote] object AkkaPduCodec { final case class Payload(bytes: ByteString) extends AkkaPdu final case class Message( - recipient: InternalActorRef, - recipientAddress: Address, + recipient: InternalActorRef, + recipientAddress: Address, serializedMessage: SerializedMessage, - senderOption: OptionVal[ActorRef], - seqOption: Option[SeqNo]) extends HasSequenceNumber { + senderOption: OptionVal[ActorRef], + seqOption: Option[SeqNo]) extends HasSequenceNumber { def reliableDeliveryEnabled = seqOption.isDefined @@ -95,12 +95,12 @@ private[remote] trait AkkaPduCodec { def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message]) def constructMessage( - localAddress: Address, - recipient: ActorRef, + localAddress: Address, + recipient: ActorRef, serializedMessage: SerializedMessage, - senderOption: OptionVal[ActorRef], - seqOption: Option[SeqNo] = None, - ackOption: Option[Ack] = None): ByteString + senderOption: OptionVal[ActorRef], + seqOption: Option[SeqNo] = None, + ackOption: Option[Ack] = None): ByteString def constructPureAck(ack: Ack): ByteString } @@ -119,12 +119,12 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { } override def constructMessage( - localAddress: Address, - recipient: ActorRef, + localAddress: Address, + recipient: ActorRef, serializedMessage: SerializedMessage, - senderOption: OptionVal[ActorRef], - seqOption: Option[SeqNo] = None, - ackOption: Option[Ack] = None): ByteString = { + senderOption: OptionVal[ActorRef], + seqOption: Option[SeqNo] = None, + ackOption: Option[Ack] = None): ByteString = { val ackAndEnvelopeBuilder = AckAndEnvelopeContainer.newBuilder @@ -132,8 +132,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) senderOption match { - case OptionVal.Some(sender) => envelopeBuilder.setSender(serializeActorRef(localAddress, sender)) - case OptionVal.None => + case OptionVal.Some(sender) ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, sender)) + case OptionVal.None ⇒ } seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } @@ -181,8 +181,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { } override def decodeMessage( - raw: ByteString, - provider: RemoteActorRefProvider, + raw: ByteString, + provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message]) = { val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray) @@ -231,7 +231,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { Address(encodedAddress.getProtocol, encodedAddress.getSystem, encodedAddress.getHostname, encodedAddress.getPort) private def constructControlMessagePdu( - code: WireFormats.CommandType, + code: WireFormats.CommandType, handshakeInfo: Option[AkkaHandshakeInfo.Builder]): ByteString = { val controlMessageBuilder = AkkaControlMessage.newBuilder() diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index efa0a10b54..2542833025 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -44,7 +44,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender { val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid)) .viaMat(new InboundControlJunction)(Keep.both) - .map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg } + .map { case env: InboundEnvelope ⇒ env.message } .toMat(TestSink.probe[Any])(Keep.both) .run() diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index 5aa42f93ce..009ed1b34a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -43,7 +43,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { TestSource.probe[AnyRef] .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid)) .via(new InboundHandshake(inboundContext, inControlStream = true)) - .map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg } + .map { case env: InboundEnvelope ⇒ env.message } .toMat(TestSink.probe[Any])(Keep.both) .run() } diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 0ec5454ab8..9d7586c07d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -17,6 +17,7 @@ import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.util.OptionVal +import akka.actor.InternalActorRef private[akka] class TestInboundContext( override val localAddress: UniqueAddress,