size of outboundEnvelopePool
This commit is contained in:
parent
8756ffd75c
commit
9a7d79c882
1 changed files with 9 additions and 9 deletions
|
|
@ -128,9 +128,9 @@ private[akka] object AssociationState {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final class AssociationState(
|
||||
val incarnation: Int,
|
||||
val incarnation: Int,
|
||||
val uniqueRemoteAddressPromise: Promise[UniqueAddress],
|
||||
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) {
|
||||
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) {
|
||||
|
||||
import AssociationState.QuarantinedTimestamp
|
||||
|
||||
|
|
@ -230,7 +230,7 @@ private[akka] trait OutboundContext {
|
|||
*/
|
||||
private[remote] object FlushOnShutdown {
|
||||
def props(done: Promise[Done], timeout: FiniteDuration,
|
||||
inboundContext: InboundContext, associations: Set[Association]): Props = {
|
||||
inboundContext: InboundContext, associations: Set[Association]): Props = {
|
||||
require(associations.nonEmpty)
|
||||
Props(new FlushOnShutdown(done, timeout, inboundContext, associations))
|
||||
}
|
||||
|
|
@ -242,7 +242,7 @@ private[remote] object FlushOnShutdown {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration,
|
||||
inboundContext: InboundContext, associations: Set[Association]) extends Actor {
|
||||
inboundContext: InboundContext, associations: Set[Association]) extends Actor {
|
||||
|
||||
var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue)
|
||||
|
||||
|
|
@ -343,9 +343,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.MaximumPooledBuffers)
|
||||
|
||||
private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16)
|
||||
// FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity
|
||||
// times a factor (for reasonable number of outbound streams)
|
||||
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 3072 * 2)
|
||||
// The outboundEnvelopePool is shared among all outbound associations
|
||||
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity =
|
||||
settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3)
|
||||
|
||||
val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() match {
|
||||
case None ⇒ (None, None, None)
|
||||
|
|
@ -853,7 +853,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool)
|
||||
|
||||
private def createOutboundSink(streamId: Int, outboundContext: OutboundContext,
|
||||
bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = {
|
||||
bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = {
|
||||
|
||||
outboundLane(outboundContext, bufferPool)
|
||||
.toMat(aeronSink(outboundContext, streamId))(Keep.both)
|
||||
|
|
@ -872,7 +872,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
|
||||
private def outboundLane(
|
||||
outboundContext: OutboundContext,
|
||||
bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = {
|
||||
bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = {
|
||||
|
||||
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
|
||||
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue