From 9a7d79c88276cf98aa494e1ae42e9336ad34c68e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 15:43:08 +0200 Subject: [PATCH] size of outboundEnvelopePool --- .../akka/remote/artery/ArteryTransport.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index d9f6bc05f1..b257f38da0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -128,9 +128,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -230,7 +230,7 @@ private[akka] trait OutboundContext { */ private[remote] object FlushOnShutdown { def props(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]): Props = { + inboundContext: InboundContext, associations: Set[Association]): Props = { require(associations.nonEmpty) Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) } @@ -242,7 +242,7 @@ private[remote] object FlushOnShutdown { * INTERNAL API */ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]) extends Actor { + inboundContext: InboundContext, associations: Set[Association]) extends Actor { var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) @@ -343,9 +343,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.MaximumPooledBuffers) private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) - // FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity - // times a factor (for reasonable number of outbound streams) - private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 3072 * 2) + // The outboundEnvelopePool is shared among all outbound associations + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = + settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3) val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() match { case None ⇒ (None, None, None) @@ -853,7 +853,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool) private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { + bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { outboundLane(outboundContext, bufferPool) .toMat(aeronSink(outboundContext, streamId))(Keep.both) @@ -872,7 +872,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def outboundLane( outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { + bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,