210 lines
10 KiB
Text
210 lines
10 KiB
Text
# SPDX-License-Identifier: Apache-2.0
|
||
|
||
######################################
|
||
# Pekko Stream Reference Config File #
|
||
######################################
|
||
|
||
# eager creation of the system wide materializer
|
||
pekko.library-extensions += "org.apache.pekko.stream.SystemMaterializer$"
|
||
pekko {
|
||
stream {
|
||
|
||
# Default materializer settings
|
||
materializer {
|
||
|
||
# Initial size of buffers used in stream elements
|
||
initial-input-buffer-size = 4
|
||
# Maximum size of buffers used in stream elements
|
||
max-input-buffer-size = 16
|
||
|
||
# Fully qualified config path which holds the dispatcher configuration
|
||
# or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
|
||
dispatcher = "pekko.actor.default-dispatcher"
|
||
|
||
# FQCN of the MailboxType. The Class of the FQCN must have a public
|
||
# constructor with
|
||
# (org.apache.pekko.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
|
||
# defaults to the single consumer mailbox for better performance.
|
||
mailbox {
|
||
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||
}
|
||
|
||
# Fully qualified config path which holds the dispatcher configuration
|
||
# or full dispatcher configuration to be used by stream operators that
|
||
# perform blocking operations
|
||
blocking-io-dispatcher = "pekko.actor.default-blocking-io-dispatcher"
|
||
|
||
# Cleanup leaked publishers and subscribers when they are not used within a given
|
||
# deadline
|
||
subscription-timeout {
|
||
# when the subscription timeout is reached one of the following strategies on
|
||
# the "stale" publisher:
|
||
# cancel - cancel it (via `onError` or subscribing to the publisher and
|
||
# `cancel()`ing the subscription right away
|
||
# warn - log a warning statement about the stale element (then drop the
|
||
# reference to it)
|
||
# noop - do nothing (not recommended)
|
||
mode = cancel
|
||
|
||
# time after which a subscriber / publisher is considered stale and eligible
|
||
# for cancelation (see `pekko.stream.subscription-timeout.mode`)
|
||
timeout = 5s
|
||
}
|
||
|
||
# Enable additional troubleshooting logging at DEBUG log level
|
||
debug-logging = off
|
||
|
||
# Maximum number of elements emitted in batch if downstream signals large demand
|
||
output-burst-limit = 1000
|
||
|
||
# Enable automatic fusing of all graphs that are run. For short-lived streams
|
||
# this may cause an initial runtime overhead, but most of the time fusing is
|
||
# desirable since it reduces the number of Actors that are created.
|
||
# Deprecated, since Akka 2.5.0, setting does not have any effect.
|
||
auto-fusing = on
|
||
|
||
# Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
|
||
# buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
|
||
# buffer upon stream materialization if the requested buffer size is less than this
|
||
# configuration parameter. The default is very high because failing early is better
|
||
# than failing under load.
|
||
#
|
||
# Buffers sized larger than this will dynamically grow/shrink and consume more memory
|
||
# per element than the fixed size buffers.
|
||
max-fixed-buffer-size = 1000000000
|
||
|
||
# Maximum number of sync messages that actor can process for stream to substream communication.
|
||
# Parameter allows to interrupt synchronous processing to get upstream/downstream messages.
|
||
# Allows to accelerate message processing that happening within same actor but keep system responsive.
|
||
sync-processing-limit = 1000
|
||
|
||
debug {
|
||
# Enables the fuzzing mode which increases the chance of race conditions
|
||
# by aggressively reordering events and making certain operations more
|
||
# concurrent than usual.
|
||
# This setting is for testing purposes, NEVER enable this in a production
|
||
# environment!
|
||
# To get the best results, try combining this setting with a throughput
|
||
# of 1 on the corresponding dispatchers.
|
||
fuzzing-mode = off
|
||
}
|
||
|
||
io.tcp {
|
||
# The outgoing bytes are accumulated in a buffer while waiting for acknowledgment
|
||
# of pending write. This improves throughput for small messages (frames) without
|
||
# sacrificing latency. While waiting for the ack the stage will eagerly pull
|
||
# from upstream until the buffer exceeds this size. That means that the buffer may hold
|
||
# slightly more bytes than this limit (at most one element more). It can be set to 0
|
||
# to disable the usage of the buffer.
|
||
write-buffer-size = 16 KiB
|
||
|
||
# In addition to the buffering described for property write-buffer-size, try to collect
|
||
# more consecutive writes from the upstream stream producers.
|
||
#
|
||
# The rationale is to increase write efficiency by avoiding separate small
|
||
# writes to the network which is expensive to do. Merging those writes together
|
||
# (up to `write-buffer-size`) improves throughput for small writes.
|
||
#
|
||
# The idea is that a running stream may produce multiple small writes consecutively
|
||
# in one go without waiting for any external input. To probe the stream for
|
||
# data, this features delays sending a write immediately by probing the stream
|
||
# for more writes. This works by rescheduling the TCP connection stage via the
|
||
# actor mailbox of the underlying actor. Thus, before the stage is reactivated
|
||
# the upstream gets another opportunity to emit writes.
|
||
#
|
||
# When the stage is reactivated and if new writes are detected another round-trip
|
||
# is scheduled. The loop repeats until either the number of round trips given in this
|
||
# setting is reached, the buffer reaches `write-buffer-size`, or no new writes
|
||
# were detected during the last round-trip.
|
||
#
|
||
# This mechanism ensures that a write is guaranteed to be sent when the remaining stream
|
||
# becomes idle waiting for external signals.
|
||
#
|
||
# In most cases, the extra latency this mechanism introduces should be negligible,
|
||
# but depending on the stream setup it may introduce a noticeable delay,
|
||
# if the upstream continuously produces small amounts of writes in a
|
||
# blocking (CPU-bound) way.
|
||
#
|
||
# In that case, the feature can either be disabled, or the producing CPU-bound
|
||
# work can be taken off-stream to avoid excessive delays (e.g. using `mapAsync` instead of `map`).
|
||
#
|
||
# A value of 0 disables this feature.
|
||
coalesce-writes = 10
|
||
}
|
||
|
||
# Time to wait for async materializer creation before throwing an exception
|
||
creation-timeout = 20 seconds
|
||
|
||
//#stream-ref
|
||
# configure defaults for SourceRef and SinkRef
|
||
stream-ref {
|
||
# Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref
|
||
#
|
||
# The buffer will be attempted to be filled eagerly even while the local stage did not request elements,
|
||
# because the delay of requesting over network boundaries is much higher.
|
||
buffer-capacity = 32
|
||
|
||
# Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number)
|
||
# Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should
|
||
# be very rare in any case, yet possible -- mostly under connection break-down and re-establishment).
|
||
#
|
||
# The semantics of handling and updating the demand however are in-line with what Reactive Streams dictates.
|
||
#
|
||
# In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive
|
||
# within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost.
|
||
demand-redelivery-interval = 1 second
|
||
|
||
# Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref.
|
||
# This timeout does not have to be very low in normal situations, since the remote side may also need to
|
||
# prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
|
||
# in-active streams which are never subscribed to.
|
||
subscription-timeout = 30 seconds
|
||
|
||
# In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed
|
||
# message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it.
|
||
# This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the
|
||
# other side of the stream ref would never send the "final" terminal message.
|
||
#
|
||
# The timeout specifically means the time between the Terminated signal being received and when the local SourceRef
|
||
# determines to fail itself, assuming there was message loss or a complete partition of the completion signal.
|
||
final-termination-signal-deadline = 2 seconds
|
||
}
|
||
//#stream-ref
|
||
}
|
||
|
||
# Deprecated, left here to not break Pekko HTTP which refers to it
|
||
blocking-io-dispatcher = "pekko.actor.default-blocking-io-dispatcher"
|
||
|
||
# Deprecated, will not be used unless user code refer to it, use 'pekko.stream.materializer.blocking-io-dispatcher'
|
||
# instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute
|
||
default-blocking-io-dispatcher = "pekko.actor.default-blocking-io-dispatcher"
|
||
}
|
||
|
||
# configure overrides to ssl-configuration here (to be used by pekko-streams, and pekko-http – i.e. when serving https connections)
|
||
ssl-config {
|
||
protocol = "TLSv1.2"
|
||
}
|
||
|
||
actor {
|
||
|
||
serializers {
|
||
pekko-stream-ref = "org.apache.pekko.stream.serialization.StreamRefSerializer"
|
||
}
|
||
|
||
serialization-bindings {
|
||
"org.apache.pekko.stream.SinkRef" = pekko-stream-ref
|
||
"org.apache.pekko.stream.SourceRef" = pekko-stream-ref
|
||
"org.apache.pekko.stream.impl.streamref.StreamRefsProtocol" = pekko-stream-ref
|
||
}
|
||
|
||
serialization-identifiers {
|
||
"org.apache.pekko.stream.serialization.StreamRefSerializer" = 30
|
||
}
|
||
}
|
||
}
|
||
|
||
# ssl configuration
|
||
# folded in from former ssl-config-pekko module
|
||
ssl-config {
|
||
logger = "com.typesafe.sslconfig.pekko.util.PekkoLoggerBridge"
|
||
}
|