commit
889e0a670e
21 changed files with 61 additions and 61 deletions
|
|
@ -885,7 +885,7 @@ splitAfter
|
|||
^^^^^^^^^^
|
||||
End the current substream whenever a predicate returns ``true``, starting a new substream for the next element.
|
||||
|
||||
**emits** when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element
|
||||
**emits** when an element passes through. When the provided predicate is true it emits the element * and opens a new substream for subsequent element
|
||||
|
||||
**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
|
||||
|
||||
|
|
|
|||
|
|
@ -876,7 +876,7 @@ splitAfter
|
|||
^^^^^^^^^^
|
||||
End the current substream whenever a predicate returns ``true``, starting a new substream for the next element.
|
||||
|
||||
**emits** when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element
|
||||
**emits** when an element passes through. When the provided predicate is true it emits the element * and opens a new substream for subsequent element
|
||||
|
||||
**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
|
||||
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ class GraphUnzipWithSpec extends AkkaSpec {
|
|||
rightProbe.expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
"unzipWith expanded Person.unapply (3 ouputs)" in {
|
||||
"unzipWith expanded Person.unapply (3 outputs)" in {
|
||||
val probe0 = TestSubscriber.manualProbe[String]()
|
||||
val probe1 = TestSubscriber.manualProbe[String]()
|
||||
val probe2 = TestSubscriber.manualProbe[Int]()
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ final case class IOResult private[stream] (count: Long, status: Try[Done]) {
|
|||
*/
|
||||
def getError: Throwable = status match {
|
||||
case Failure(t) ⇒ t
|
||||
case Success(_) ⇒ throw new UnsupportedOperationException("IO operation was successfull.")
|
||||
case Success(_) ⇒ throw new UnsupportedOperationException("IO operation was successful.")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ object SubstreamCancelStrategy {
|
|||
def propagate: SubstreamCancelStrategy = Propagate
|
||||
|
||||
/**
|
||||
* Drain substream on cancellation in order to prevent stailling of the stream of streams.
|
||||
* Drain substream on cancellation in order to prevent stalling of the stream of streams.
|
||||
*/
|
||||
def drain: SubstreamCancelStrategy = Drain
|
||||
}
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ private[akka] object FanIn {
|
|||
dequeue(id)
|
||||
}
|
||||
|
||||
def dequeuePrefering(preferred: Int): Any = {
|
||||
def dequeuePreferring(preferred: Int): Any = {
|
||||
preferredId = preferred
|
||||
val id = idToDequeue()
|
||||
dequeue(id)
|
||||
|
|
|
|||
|
|
@ -527,7 +527,7 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
|
|||
}
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[stream] object MaterializerSession {
|
||||
class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace
|
||||
|
|
|
|||
|
|
@ -312,7 +312,7 @@ private[stream] object Fusing {
|
|||
// need to add the module so that the structural (internal) wirings can be rewritten as well
|
||||
// but these modules must not be added to any of the groups
|
||||
struct.addModule(copy, new ju.HashSet, inheritedAttributes, indent, shape)
|
||||
struct.registerInteral(newShape, indent)
|
||||
struct.registerInternals(newShape, indent)
|
||||
|
||||
copy
|
||||
}
|
||||
|
|
@ -560,7 +560,7 @@ private[stream] object Fusing {
|
|||
* connections within imported (and not dissolved) GraphModules.
|
||||
* See also the comment in addModule where this is partially undone.
|
||||
*/
|
||||
def registerInteral(s: Shape, indent: Int): Unit = {
|
||||
def registerInternals(s: Shape, indent: Int): Unit = {
|
||||
if (Debug) println(" " * indent + s"registerInternals(${s.outlets.map(hash)})")
|
||||
internalOuts.addAll(s.outlets.asJava)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -266,7 +266,7 @@ private[akka] object GraphInterpreter {
|
|||
}
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
* INTERNAL API
|
||||
*
|
||||
* From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a
|
||||
* [[GraphInterpreter#GraphAssembly]] object and provides facilities to execute and interact with this assembly.
|
||||
|
|
@ -495,7 +495,7 @@ private[stream] final class GraphInterpreter(
|
|||
case owner ⇒ assembly.stages(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections ouput part
|
||||
// Debug name for a connections output part
|
||||
private def outOwnerName(connection: Int): String =
|
||||
assembly.outOwners(connection) match {
|
||||
case Boundary ⇒ "UpstreamBoundary"
|
||||
|
|
@ -509,7 +509,7 @@ private[stream] final class GraphInterpreter(
|
|||
case owner ⇒ logics(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections ouput part
|
||||
// Debug name for a connections output part
|
||||
private def outLogicName(connection: Int): String =
|
||||
assembly.outOwners(connection) match {
|
||||
case Boundary ⇒ "UpstreamBoundary"
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ private[akka] final case class GraphStageModule(shape: Shape,
|
|||
object GraphStages {
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] {
|
||||
val in = Inlet[T](Logging.simpleName(this) + ".in")
|
||||
|
|
@ -73,7 +73,7 @@ object GraphStages {
|
|||
def identity[T] = Identity.asInstanceOf[SimpleLinearGraphStage[T]]
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[stream] final class Detacher[T] extends GraphStage[FlowShape[T, T]] {
|
||||
val in = Inlet[T]("Detacher.in")
|
||||
|
|
|
|||
|
|
@ -201,7 +201,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.
|
|||
}
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
* INTERNAL API
|
||||
*/
|
||||
object Split {
|
||||
sealed abstract class SplitDecision
|
||||
|
|
@ -220,7 +220,7 @@ object Split {
|
|||
}
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
* INTERNAL API
|
||||
*/
|
||||
final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
|
||||
val in: Inlet[T] = Inlet("Split.in")
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ private[akka] class TLSActor(settings: ActorMaterializerSettings,
|
|||
}
|
||||
}
|
||||
|
||||
// These are Nettys default values
|
||||
// These are Netty's default values
|
||||
// 16665 + 1024 (room for compressed data) + 1024 (for OpenJDK compatibility)
|
||||
val transportOutBuffer = ByteBuffer.allocate(16665 + 2048)
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -278,7 +278,7 @@ private[stream] object TcpConnectionStage {
|
|||
|
||||
override def postStop(): Unit = role match {
|
||||
case Outbound(_, _, localAddressPromise, _) ⇒
|
||||
// Fail if has not been completed with an address eariler
|
||||
// Fail if has not been completed with an address earlier
|
||||
localAddressPromise.tryFailure(new StreamTcpException("Connection failed."))
|
||||
case _ ⇒ // do nothing...
|
||||
}
|
||||
|
|
|
|||
|
|
@ -318,7 +318,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many futures as requested elements by
|
||||
* value of that future will be emitted downstream. As many futures as requested elements by
|
||||
* downstream may run in parallel and may complete in any order, but the elements that
|
||||
* are emitted downstream are in the same order as received from upstream.
|
||||
*
|
||||
|
|
@ -349,7 +349,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many futures as requested elements by
|
||||
* value of that future will be emitted downstream. As many futures as requested elements by
|
||||
* downstream may run in parallel and each processed element will be emitted downstream
|
||||
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
|
||||
* in the same order as received from upstream.
|
||||
|
|
@ -948,7 +948,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
*
|
||||
* '''Emits when''' downstream stops backpressuring
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs emtpy
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs empty
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
|
|
@ -1157,7 +1157,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*
|
||||
* '''Emits when''' an element passes through. When the provided predicate is true it emitts the element
|
||||
* '''Emits when''' an element passes through. When the provided predicate is true it emits the element
|
||||
* and opens a new substream for subsequent element
|
||||
*
|
||||
* '''Backpressures when''' there is an element pending for the next substream, but the previous
|
||||
|
|
@ -1565,7 +1565,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
|
|
@ -1593,7 +1593,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate.
|
||||
|
|
|
|||
|
|
@ -807,7 +807,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many CompletionStages as requested elements by
|
||||
* value of that future will be emitted downstream. As many CompletionStages as requested elements by
|
||||
* downstream may run in parallel and may complete in any order, but the elements that
|
||||
* are emitted downstream are in the same order as received from upstream.
|
||||
*
|
||||
|
|
@ -838,7 +838,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many CompletionStages as requested elements by
|
||||
* value of that future will be emitted downstream. As many CompletionStages as requested elements by
|
||||
* downstream may run in parallel and each processed element will be emitted downstream
|
||||
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
|
||||
* in the same order as received from upstream.
|
||||
|
|
@ -1388,7 +1388,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
*
|
||||
* '''Emits when''' downstream stops backpressuring
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs emtpy
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs empty
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
|
|
@ -1584,7 +1584,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*
|
||||
* '''Emits when''' an element passes through. When the provided predicate is true it emitts the element
|
||||
* '''Emits when''' an element passes through. When the provided predicate is true it emits the element
|
||||
* and opens a new substream for subsequent element
|
||||
*
|
||||
* '''Backpressures when''' there is an element pending for the next substream, but the previous
|
||||
|
|
@ -1702,7 +1702,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
|
|
@ -1730,7 +1730,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate.
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many CompletionStages as requested elements by
|
||||
* value of that future will be emitted downstream. As many CompletionStages as requested elements by
|
||||
* downstream may run in parallel and may complete in any order, but the elements that
|
||||
* are emitted downstream are in the same order as received from upstream.
|
||||
*
|
||||
|
|
@ -204,7 +204,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many CompletionStages as requested elements by
|
||||
* value of that future will be emitted downstream. As many CompletionStages as requested elements by
|
||||
* downstream may run in parallel and each processed element will be emitted downstream
|
||||
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
|
||||
* in the same order as received from upstream.
|
||||
|
|
@ -797,7 +797,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
*
|
||||
* '''Emits when''' downstream stops backpressuring
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs emtpy
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs empty
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
|
|
@ -1115,7 +1115,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
|
|
@ -1143,7 +1143,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate.
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many CompletionStages as requested elements by
|
||||
* value of that future will be emitted downstream. As many CompletionStages as requested elements by
|
||||
* downstream may run in parallel and may complete in any order, but the elements that
|
||||
* are emitted downstream are in the same order as received from upstream.
|
||||
*
|
||||
|
|
@ -202,7 +202,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||
* value of that future will be emitted downstreams. As many CompletionStages as requested elements by
|
||||
* value of that future will be emitted downstream. As many CompletionStages as requested elements by
|
||||
* downstream may run in parallel and each processed element will be emitted downstream
|
||||
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
|
||||
* in the same order as received from upstream.
|
||||
|
|
@ -795,7 +795,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
*
|
||||
* '''Emits when''' downstream stops backpressuring
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs emtpy
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs empty
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
|
|
@ -1114,7 +1114,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
|
|
@ -1142,7 +1142,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate.
|
||||
|
|
|
|||
|
|
@ -524,7 +524,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step. The function returns a `Future` and the
|
||||
* value of that future will be emitted downstreams. As many futures as requested elements by
|
||||
* value of that future will be emitted downstream. As many futures as requested elements by
|
||||
* downstream may run in parallel and each processed element will be emitted downstream
|
||||
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
|
||||
* in the same order as received from upstream.
|
||||
|
|
@ -1065,7 +1065,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* '''Emits when''' downstream stops backpressuring
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs emtpy
|
||||
* '''Backpressures when''' downstream backpressures or iterator runs empty
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
|
|
@ -1295,7 +1295,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*
|
||||
* '''Emits when''' an element passes through. When the provided predicate is true it emitts the element
|
||||
* '''Emits when''' an element passes through. When the provided predicate is true it emits the element
|
||||
* and opens a new substream for subsequent element
|
||||
*
|
||||
* '''Backpressures when''' there is an element pending for the next substream, but the previous
|
||||
|
|
@ -1429,7 +1429,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
|
|
@ -1457,7 +1457,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate.
|
||||
|
|
|
|||
|
|
@ -391,7 +391,7 @@ object Broadcast {
|
|||
final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
|
||||
// one output might seem counter intuitive but saves us from special handling in other places
|
||||
require(outputPorts >= 1, "A Broadcast must have one or more output ports")
|
||||
val in: Inlet[T] = Inlet[T]("Broadast.in")
|
||||
val in: Inlet[T] = Inlet[T]("Broadcast.in")
|
||||
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i))
|
||||
override def initialAttributes = DefaultAttributes.broadcast
|
||||
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
|
||||
|
|
|
|||
|
|
@ -226,7 +226,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
private[this] var _interpreter: GraphInterpreter = _
|
||||
|
||||
/**
|
||||
* INTENRAL API
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[stream] def interpreter_=(gi: GraphInterpreter) = _interpreter = gi
|
||||
|
||||
|
|
|
|||
|
|
@ -122,8 +122,8 @@ private[stream] object AbstractStage {
|
|||
final override def absorbTermination(): TerminationDirective = {
|
||||
if (isClosed(shape.out)) {
|
||||
val ex = new UnsupportedOperationException("It is not allowed to call absorbTermination() from onDownstreamFinish.")
|
||||
// This MUST be logged here, since the downstream has cancelled, i.e. there is noone to send onError to, the
|
||||
// stage is just about to finish so noone will catch it anyway just the interpreter
|
||||
// This MUST be logged here, since the downstream has cancelled, i.e. there is no one to send onError to, the
|
||||
// stage is just about to finish so no one will catch it anyway just the interpreter
|
||||
|
||||
interpreter.log.error(ex.getMessage)
|
||||
throw ex // We still throw for correctness (although a finish() would also work here)
|
||||
|
|
@ -204,7 +204,7 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
|
|||
|
||||
/**
|
||||
* `onPush` is called when an element from upstream is available and there is demand from downstream, i.e.
|
||||
* in `onPush` you are allowed to call [[akka.stream.stage.Context#push]] to emit one element downstreams,
|
||||
* in `onPush` you are allowed to call [[akka.stream.stage.Context#push]] to emit one element downstream,
|
||||
* or you can absorb the element by calling [[akka.stream.stage.Context#pull]]. Note that you can only
|
||||
* emit zero or one element downstream from `onPull`.
|
||||
*
|
||||
|
|
@ -216,7 +216,7 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
|
|||
|
||||
/**
|
||||
* `onPull` is called when there is demand from downstream, i.e. you are allowed to push one element
|
||||
* downstreams with [[akka.stream.stage.Context#push]], or request elements from upstreams with
|
||||
* downstream with [[akka.stream.stage.Context#push]], or request elements from upstreams with
|
||||
* [[akka.stream.stage.Context#pull]]
|
||||
*/
|
||||
def onPull(ctx: Ctx): PullD
|
||||
|
|
@ -294,7 +294,7 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
|
|||
* stages produce *exactly one* push or pull signal.
|
||||
*
|
||||
* [[#onPush]] is called when an element from upstream is available and there is demand from downstream, i.e.
|
||||
* in `onPush` you are allowed to call [[Context#push]] to emit one element downstreams, or you can absorb the
|
||||
* in `onPush` you are allowed to call [[Context#push]] to emit one element downstream, or you can absorb the
|
||||
* element by calling [[Context#pull]]. Note that you can only emit zero or one element downstream from `onPull`.
|
||||
* To emit more than one element you have to push the remaining elements from [[#onPull]], one-by-one.
|
||||
* `onPush` is not called again until `onPull` has requested more elements with [[Context#pull]].
|
||||
|
|
@ -302,7 +302,7 @@ abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, C
|
|||
* [[StatefulStage]] has support for making it easy to emit more than one element from `onPush`.
|
||||
*
|
||||
* [[#onPull]] is called when there is demand from downstream, i.e. you are allowed to push one element
|
||||
* downstreams with [[Context#push]], or request elements from upstreams with [[Context#pull]]. If you
|
||||
* downstream with [[Context#push]], or request elements from upstreams with [[Context#pull]]. If you
|
||||
* always perform transitive pull by calling `ctx.pull` from `onPull` you can use [[PushStage]] instead of
|
||||
* `PushPullStage`.
|
||||
*
|
||||
|
|
@ -464,13 +464,13 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
|
||||
/**
|
||||
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
|
||||
* element downstreams.
|
||||
* element downstream.
|
||||
*/
|
||||
final def emit(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = emit(iter, ctx, _current)
|
||||
|
||||
/**
|
||||
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
|
||||
* element downstreams.
|
||||
* element downstream.
|
||||
*/
|
||||
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = {
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
@ -479,7 +479,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
|
||||
/**
|
||||
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
|
||||
* element downstreams and after that change behavior.
|
||||
* element downstream and after that change behavior.
|
||||
*/
|
||||
final def emit(iter: Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = {
|
||||
if (emitting) throw new IllegalStateException("already in emitting state")
|
||||
|
|
@ -499,7 +499,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
|
||||
/**
|
||||
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
|
||||
* element downstreams and after that change behavior.
|
||||
* element downstream and after that change behavior.
|
||||
*/
|
||||
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = {
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
@ -508,7 +508,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
|
||||
/**
|
||||
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
|
||||
* element downstreams and after that finish (complete downstreams, cancel upstreams).
|
||||
* element downstream and after that finish (complete downstreams, cancel upstreams).
|
||||
*/
|
||||
final def emitAndFinish(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = {
|
||||
if (emitting) throw new IllegalStateException("already in emitting state")
|
||||
|
|
@ -527,7 +527,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
|
||||
/**
|
||||
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
|
||||
* element downstreams and after that finish (complete downstreams, cancel upstreams).
|
||||
* element downstream and after that finish (complete downstreams, cancel upstreams).
|
||||
*/
|
||||
final def emitAndFinish(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = {
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
@ -535,7 +535,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
}
|
||||
|
||||
/**
|
||||
* Scala API: Can be used from [[#onUpstreamFinish]] to push final elements downstreams
|
||||
* Scala API: Can be used from [[#onUpstreamFinish]] to push final elements downstream
|
||||
* before completing the stream successfully. Note that if this is used from
|
||||
* [[#onUpstreamFailure]] the failure will be absorbed and the stream will be completed
|
||||
* successfully.
|
||||
|
|
@ -556,7 +556,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
|
|||
|
||||
/**
|
||||
* Java API: Can be used from [[#onUpstreamFinish]] or [[#onUpstreamFailure]] to push final
|
||||
* elements downstreams.
|
||||
* elements downstream.
|
||||
*/
|
||||
final def terminationEmit(iter: java.util.Iterator[Out], ctx: Context[Out]): TerminationDirective = {
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue