=str Fixup some docs in GraphStage. (#31259)

This commit is contained in:
kerr 2022-04-09 16:48:53 +08:00 committed by GitHub
parent c1ef89d120
commit afe4a08133
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -53,7 +53,7 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S,
protected def initialAttributes: Attributes = Attributes.none protected def initialAttributes: Attributes = Attributes.none
private var _traversalBuilder: TraversalBuilder = null private var _traversalBuilder: TraversalBuilder = _
/** /**
* INTERNAL API * INTERNAL API
@ -367,7 +367,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[stream] def interpreter_=(gi: GraphInterpreter) = _interpreter = gi private[stream] def interpreter_=(gi: GraphInterpreter): Unit = _interpreter = gi
/** /**
* INTERNAL API * INTERNAL API
@ -382,7 +382,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* The [[akka.stream.Materializer]] that has set this GraphStage in motion. * The [[akka.stream.Materializer]] that has set this GraphStage in motion.
* *
* Can not be used from a `GraphStage` constructor. Access to materializer is provided by the * Can not be used from a `GraphStage` constructor. Access to materializer is provided by the
* [[akka.stream.scaladsl.Source.setup]], [[akka.stream.scaladsl.Flow.setup]] and [[akka.stream.scaladsl.Sink.setup]] * [[akka.stream.scaladsl.Source.fromMaterializer]], [[akka.stream.scaladsl.Flow.fromMaterializer]] and [[akka.stream.scaladsl.Sink.fromMaterializer]]
* and their corresponding Java API factories. * and their corresponding Java API factories.
*/ */
protected def materializer: Materializer = interpreter.materializer protected def materializer: Materializer = interpreter.materializer
@ -664,8 +664,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/ */
final protected def complete[T](out: Outlet[T]): Unit = final protected def complete[T](out: Outlet[T]): Unit =
getHandler(out) match { getHandler(out) match {
case e: Emitting[_] => e.addFollowUp(new EmittingCompletion(e.out, e.previous)) case e: Emitting[T @unchecked] => e.addFollowUp(new EmittingCompletion[T](e.out, e.previous))
case _ => interpreter.complete(conn(out)) case _ => interpreter.complete(conn(out))
} }
/** /**
@ -694,6 +694,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
final def cancelStage(cause: Throwable): Unit = final def cancelStage(cause: Throwable): Unit =
internalCancelStage(cause, attributes.mandatoryAttribute[Attributes.CancellationStrategy].strategy) internalCancelStage(cause, attributes.mandatoryAttribute[Attributes.CancellationStrategy].strategy)
@tailrec
private def internalCancelStage(cause: Throwable, strategy: Attributes.CancellationStrategy.Strategy): Unit = { private def internalCancelStage(cause: Throwable, strategy: Attributes.CancellationStrategy.Strategy): Unit = {
import Attributes.CancellationStrategy._ import Attributes.CancellationStrategy._
import SubscriptionWithCancelException._ import SubscriptionWithCancelException._
@ -730,8 +731,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
interpreter.fail(portToConn(i), optionalFailureCause.get) interpreter.fail(portToConn(i), optionalFailureCause.get)
else else
handlers(i) match { handlers(i) match {
case e: Emitting[_] => e.addFollowUp(new EmittingCompletion(e.out, e.previous)) case e: Emitting[Any @unchecked] => e.addFollowUp(new EmittingCompletion[Any](e.out, e.previous))
case _ => interpreter.complete(portToConn(i)) case _ => interpreter.complete(portToConn(i))
} }
i += 1 i += 1
} }
@ -955,10 +956,10 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
if (isAvailable(out)) { if (isAvailable(out)) {
push(out, elems.next()) push(out, elems.next())
if (elems.hasNext) if (elems.hasNext)
setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen)) setOrAddEmitting(out, new EmittingIterator[T](out, elems, getNonEmittingHandler(out), andThen))
else andThen() else andThen()
} else { } else {
setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen)) setOrAddEmitting(out, new EmittingIterator[T](out, elems, getNonEmittingHandler(out), andThen))
} }
} else andThen() } else andThen()
@ -982,7 +983,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
push(out, elem) push(out, elem)
andThen() andThen()
} else { } else {
setOrAddEmitting(out, new EmittingSingle(out, elem, getNonEmittingHandler(out), andThen)) setOrAddEmitting(out, new EmittingSingle[T](out, elem, getNonEmittingHandler(out), andThen))
} }
/** /**
@ -1082,7 +1083,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
} }
private class EmittingSingle[T](_out: Outlet[T], elem: T, _previous: OutHandler, _andThen: () => Unit) private class EmittingSingle[T](_out: Outlet[T], elem: T, _previous: OutHandler, _andThen: () => Unit)
extends Emitting(_out, _previous, _andThen) { extends Emitting[T](_out, _previous, _andThen) {
override def onPull(): Unit = { override def onPull(): Unit = {
push(out, elem) push(out, elem)
@ -1091,7 +1092,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
} }
private class EmittingIterator[T](_out: Outlet[T], elems: Iterator[T], _previous: OutHandler, _andThen: () => Unit) private class EmittingIterator[T](_out: Outlet[T], elems: Iterator[T], _previous: OutHandler, _andThen: () => Unit)
extends Emitting(_out, _previous, _andThen) { extends Emitting[T](_out, _previous, _andThen) {
override def onPull(): Unit = { override def onPull(): Unit = {
push(out, elems.next()) push(out, elems.next())
@ -1102,7 +1103,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
} }
private class EmittingCompletion[T](_out: Outlet[T], _previous: OutHandler) private class EmittingCompletion[T](_out: Outlet[T], _previous: OutHandler)
extends Emitting(_out, _previous, DoNothing) { extends Emitting[T](_out, _previous, DoNothing) {
override def onPull(): Unit = complete(out) override def onPull(): Unit = complete(out)
} }
@ -1500,7 +1501,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/ */
class SubSourceOutlet[T](name: String) { class SubSourceOutlet[T](name: String) {
private var handler: OutHandler = null private var handler: OutHandler = _
private var available = false private var available = false
private var closed = false private var closed = false
@ -1615,7 +1616,7 @@ trait AsyncCallback[T] {
* may be invoked from external execution contexts. * may be invoked from external execution contexts.
* *
* For cases where it is important to know if the notification was ever processed or not * For cases where it is important to know if the notification was ever processed or not
* see [AsyncCallback#invokeWithFeedback]] * see [[AsyncCallback#invokeWithFeedback]]
*/ */
def invoke(t: T): Unit def invoke(t: T): Unit