Merge pull request #17142 from 2m/wip-minor-typo-fixes
=str minor typo fixes and scala version bump
This commit is contained in:
commit
da5fde03c7
5 changed files with 14 additions and 14 deletions
|
|
@ -19,7 +19,7 @@ import scala.util.control.NonFatal
|
||||||
private[akka] object StreamLayout {
|
private[akka] object StreamLayout {
|
||||||
|
|
||||||
// compile-time constant
|
// compile-time constant
|
||||||
val debug = true
|
final val Debug = true
|
||||||
|
|
||||||
// TODO: Materialization order
|
// TODO: Materialization order
|
||||||
// TODO: Special case linear composites
|
// TODO: Special case linear composites
|
||||||
|
|
@ -55,7 +55,7 @@ private[akka] object StreamLayout {
|
||||||
this.grow(that, f).connect(from, to)
|
this.grow(that, f).connect(from, to)
|
||||||
|
|
||||||
def connect[A, B](from: OutPort, to: InPort): Module = {
|
def connect[A, B](from: OutPort, to: InPort): Module = {
|
||||||
if (debug) validate()
|
if (Debug) validate()
|
||||||
|
|
||||||
require(outPorts(from),
|
require(outPorts(from),
|
||||||
if (downstreams.contains(from)) s"The output port [$from] is already connected"
|
if (downstreams.contains(from)) s"The output port [$from] is already connected"
|
||||||
|
|
@ -73,7 +73,7 @@ private[akka] object StreamLayout {
|
||||||
}
|
}
|
||||||
|
|
||||||
def transformMaterializedValue(f: Any ⇒ Any): Module = {
|
def transformMaterializedValue(f: Any ⇒ Any): Module = {
|
||||||
if (debug) validate()
|
if (Debug) validate()
|
||||||
|
|
||||||
CompositeModule(
|
CompositeModule(
|
||||||
subModules = if (this.isAtomic) Set(this) else this.subModules,
|
subModules = if (this.isAtomic) Set(this) else this.subModules,
|
||||||
|
|
@ -86,7 +86,7 @@ private[akka] object StreamLayout {
|
||||||
def grow(that: Module): Module = grow(that, Keep.left)
|
def grow(that: Module): Module = grow(that, Keep.left)
|
||||||
|
|
||||||
def grow[A, B, C](that: Module, f: (A, B) ⇒ C): Module = {
|
def grow[A, B, C](that: Module, f: (A, B) ⇒ C): Module = {
|
||||||
if (debug) validate()
|
if (Debug) validate()
|
||||||
|
|
||||||
require(that ne this, "A module cannot be added to itself. You should pass a separate instance to grow().")
|
require(that ne this, "A module cannot be added to itself. You should pass a separate instance to grow().")
|
||||||
require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")
|
require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")
|
||||||
|
|
@ -105,7 +105,7 @@ private[akka] object StreamLayout {
|
||||||
}
|
}
|
||||||
|
|
||||||
def wrap(): Module = {
|
def wrap(): Module = {
|
||||||
if (debug) validate()
|
if (Debug) validate()
|
||||||
|
|
||||||
CompositeModule(
|
CompositeModule(
|
||||||
subModules = Set(this),
|
subModules = Set(this),
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ private[akka] object OneBoundedInterpreter {
|
||||||
* - The on-stack reentrant implementation by Mathias Doenitz -- the difference here that reentrancy is handled by the
|
* - The on-stack reentrant implementation by Mathias Doenitz -- the difference here that reentrancy is handled by the
|
||||||
* interpreter itself, not user code, and the interpreter is able to use the heap when needed instead of the
|
* interpreter itself, not user code, and the interpreter is able to use the heap when needed instead of the
|
||||||
* callstack.
|
* callstack.
|
||||||
* - The pinball interpreter by Endre Sándor Varga -- the difference here that the restricition for "one ball" is
|
* - The pinball interpreter by Endre Sándor Varga -- the difference here that the restriction for "one ball" is
|
||||||
* lifted by using isolated execution regions, completion handling is introduced and communication with the external
|
* lifted by using isolated execution regions, completion handling is introduced and communication with the external
|
||||||
* world is done via boundary ops.
|
* world is done via boundary ops.
|
||||||
*
|
*
|
||||||
|
|
@ -111,8 +111,8 @@ private[akka] object OneBoundedInterpreter {
|
||||||
* it is immediately replaced by an artificial Finished op which makes sure that the two execution paths are isolated
|
* it is immediately replaced by an artificial Finished op which makes sure that the two execution paths are isolated
|
||||||
* forever.
|
* forever.
|
||||||
* - ctx.fail() which is similar to finish()
|
* - ctx.fail() which is similar to finish()
|
||||||
* - ctx.pushAndPull() which (as a response to a previous ctx.hold()) starts a wawe of downstream push and upstream
|
* - ctx.pushAndPull() which (as a response to a previous ctx.hold()) starts a wave of downstream push and upstream
|
||||||
* pull. The two execution paths are isolated by the op itself since onPull() from downstream can only answered by hold or
|
* pull. The two execution paths are isolated by the op itself since onPull() from downstream can only be answered by hold or
|
||||||
* push, while onPush() from upstream can only answered by hold or pull -- it is impossible to "cross" the op.
|
* push, while onPush() from upstream can only answered by hold or pull -- it is impossible to "cross" the op.
|
||||||
* - ctx.pushAndFinish() which is different from the forking ops above because the execution of push and finish happens on
|
* - ctx.pushAndFinish() which is different from the forking ops above because the execution of push and finish happens on
|
||||||
* the same execution region and they are order dependent, too.
|
* the same execution region and they are order dependent, too.
|
||||||
|
|
@ -223,7 +223,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override this method to enter the current op and execute it. Do NOT put code that should be executed after the
|
* Override this method to enter the current op and execute it. Do NOT put code that should be executed after the
|
||||||
* op has been invoked, that should be in the advance() method of the next state resulting from the invokation of
|
* op has been invoked, that should be in the advance() method of the next state resulting from the invocation of
|
||||||
* the op.
|
* the op.
|
||||||
*/
|
*/
|
||||||
def run(): Unit
|
def run(): Unit
|
||||||
|
|
|
||||||
|
|
@ -242,7 +242,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = {
|
override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = {
|
||||||
//No need to copy here, op is a fresh instanc
|
//No need to copy here, op is a fresh instance
|
||||||
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]]
|
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]]
|
||||||
else new Flow(module.growConnect(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort)))
|
else new Flow(module.growConnect(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort)))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -193,7 +193,7 @@ object FlowGraph extends GraphApply {
|
||||||
* connected.
|
* connected.
|
||||||
*/
|
*/
|
||||||
def add[S <: Shape](graph: Graph[S, _]): S = {
|
def add[S <: Shape](graph: Graph[S, _]): S = {
|
||||||
if (StreamLayout.debug) graph.module.validate()
|
if (StreamLayout.Debug) graph.module.validate()
|
||||||
val copy = graph.module.carbonCopy
|
val copy = graph.module.carbonCopy
|
||||||
moduleInProgress = moduleInProgress.grow(copy)
|
moduleInProgress = moduleInProgress.grow(copy)
|
||||||
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
|
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
|
||||||
|
|
@ -206,7 +206,7 @@ object FlowGraph extends GraphApply {
|
||||||
* Flow, Sink and Graph.
|
* Flow, Sink and Graph.
|
||||||
*/
|
*/
|
||||||
private[stream] def add[S <: Shape, A, B](graph: Graph[S, _], combine: (A, B) ⇒ Any): S = {
|
private[stream] def add[S <: Shape, A, B](graph: Graph[S, _], combine: (A, B) ⇒ Any): S = {
|
||||||
if (StreamLayout.debug) graph.module.validate()
|
if (StreamLayout.Debug) graph.module.validate()
|
||||||
val copy = graph.module.carbonCopy
|
val copy = graph.module.carbonCopy
|
||||||
moduleInProgress = moduleInProgress.grow(copy, combine)
|
moduleInProgress = moduleInProgress.grow(copy, combine)
|
||||||
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
|
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import akka.stream.FlowMaterializer
|
||||||
*
|
*
|
||||||
* It is possible to keep state in the concrete `Stage` instance with
|
* It is possible to keep state in the concrete `Stage` instance with
|
||||||
* ordinary instance variables. The `Transformer` is executed by an actor and
|
* ordinary instance variables. The `Transformer` is executed by an actor and
|
||||||
* therefore you don not have to add any additional thread safety or memory
|
* therefore you do not have to add any additional thread safety or memory
|
||||||
* visibility constructs to access the state from the callback methods.
|
* visibility constructs to access the state from the callback methods.
|
||||||
*
|
*
|
||||||
* @see [[akka.stream.scaladsl.Flow#transform]]
|
* @see [[akka.stream.scaladsl.Flow#transform]]
|
||||||
|
|
@ -293,7 +293,7 @@ abstract class AsyncStage[In, Out, Ext]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The behavior of [[StatefulStage]] is defined by these two methods, which
|
* The behavior of [[StatefulStage]] is defined by these two methods, which
|
||||||
* has the same sematics as corresponding methods in [[PushPullStage]].
|
* has the same semantics as corresponding methods in [[PushPullStage]].
|
||||||
*/
|
*/
|
||||||
abstract class StageState[In, Out] {
|
abstract class StageState[In, Out] {
|
||||||
def onPush(elem: In, ctx: Context[Out]): SyncDirective
|
def onPush(elem: In, ctx: Context[Out]): SyncDirective
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue