Scaladoc improvements (#27929)

* private[akka] visibility on some internal classes
  * found via unidoc
* fix Scaladoc links
This commit is contained in:
Patrik Nordwall 2019-10-10 14:17:01 +02:00 committed by GitHub
parent 544c5fa17c
commit bb6d6365b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 159 additions and 162 deletions

View file

@ -15,8 +15,8 @@ import org.slf4j.event.Level
/**
* Representation of a Log Event issued by a [[akka.actor.typed.Behavior]]
* when testing with [[akka.actor.testkit.typed.scaladsl.BehaviorTestKit`]]
* or [[akka.actor.testkit.typed.javadsl.BehaviorTestKit`]].
* when testing with [[akka.actor.testkit.typed.scaladsl.BehaviorTestKit]]
* or [[akka.actor.testkit.typed.javadsl.BehaviorTestKit]].
*/
final case class CapturedLogEvent(level: Level, message: String, cause: Option[Throwable], marker: Option[Marker]) {

View file

@ -17,7 +17,7 @@ import scala.annotation.varargs
* Manual time allows you to do async tests while controlling the scheduler of the system.
*
* To use it you need to configure the `ActorSystem`/`ActorTestKit` with [[ManualTime.config]] and access the
* scheduler control through [[ManualTime.get()]]
* scheduler control through [[ManualTime.get]]
*/
object ManualTime {
@ -27,8 +27,8 @@ object ManualTime {
def config(): Config = akka.actor.testkit.typed.scaladsl.ManualTime.config
/**
* Access the manual scheduler, note that you need to setup the actor system/testkit with [[config()]] for this to
* work.
* Access the manual scheduler, note that you need to setup the actor system/testkit with [[ManualTime.config]]
* for this to work.
*/
def get[A](system: ActorSystem[A]): ManualTime =
system.scheduler match {

View file

@ -197,9 +197,9 @@ abstract class TestProbe[M] {
* Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
* message, and returns one of the following effects to decide on what happens next:
*
* * [[FishingOutcomes.continue()]] - continue with the next message given that the timeout has not been reached
* * [[FishingOutcomes.complete()]] - successfully complete and return the message
* * [[FishingOutcomes.fail(errorMsg)]] - fail the test with a custom message
* * [[FishingOutcomes.continue]] - continue with the next message given that the timeout has not been reached
* * [[FishingOutcomes.complete]] - successfully complete and return the message
* * [[FishingOutcomes.fail]] - fail the test with a custom message
*
* Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is
* received this will also fail the test, additionally if the `fisher` function throws a match error the error

View file

@ -15,7 +15,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
* Manual time allows you to do async tests while controlling the scheduler of the system.
*
* To use it you need to configure the `ActorSystem`/`ActorTestKit` with [[ManualTime.config]] and access the
* scheduler control through [[ManualTime.apply()]]
* scheduler control through [[ManualTime.apply]]
*/
object ManualTime {
@ -26,8 +26,8 @@ object ManualTime {
ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"""")
/**
* Access the manual scheduler, note that you need to setup the actor system/testkit with [[config()]] for this to
* work.
* Access the manual scheduler, note that you need to setup the actor system/testkit with [[ManualTime.config]]
* for this to work.
*/
def apply()(implicit system: ActorSystem[_]): ManualTime =
system.scheduler match {

View file

@ -86,7 +86,7 @@ abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Clas
object BehaviorInterceptor {
/**
* Abstraction of passing the on further in the behavior stack in [[BehaviorInterceptor#preStart]].
* Abstraction of passing the on further in the behavior stack in [[BehaviorInterceptor#aroundStart]].
*
* Not for user extension
*/

View file

@ -11,7 +11,7 @@ import akka.annotation.DoNotInherit
/**
* A message protocol for actors that support spawning a child actor when receiving a [[SpawnProtocol#Spawn]]
* message and sending back the [[ActorRef]] of the child actor. Create instances through the [[SpawnProtocol#apply]]
* or [[SpawnProtocol.create()]] factory methods.
* or [[SpawnProtocol#create]] factory methods.
*
* The typical usage of this is to use it as the guardian actor of the [[ActorSystem]], possibly combined with
* `Behaviors.setup` to starts some initial tasks or actors. Child actors can then be started from the outside

View file

@ -57,7 +57,7 @@ object SupervisorStrategy {
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* A maximum number of restarts can be specified with [[Backoff#withMaxRestarts]]
* A maximum number of restarts can be specified with [[BackoffSupervisorStrategy#withMaxRestarts]]
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated

View file

@ -15,21 +15,19 @@ import akka.util.ccompat.JavaConverters._
import akka.actor.typed.ExtensionSetup
/**
* Actor system extensions registry
*
* INTERNAL API
*
* Actor system extensions registry
*/
@InternalApi
trait ExtensionsImpl extends Extensions { self: ActorSystem[_] =>
private[akka] trait ExtensionsImpl extends Extensions { self: ActorSystem[_] =>
private val extensions = new ConcurrentHashMap[ExtensionId[_], AnyRef]
/**
* INTERNAL API
*
* Hook for ActorSystem to load extensions on startup
*/
@InternalApi private[akka] def loadExtensions(): Unit = {
def loadExtensions(): Unit = {
/*
* @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)

View file

@ -12,8 +12,10 @@ import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
@InternalApi
class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem)
/**
* INTERNAL API
*/
@InternalApi private[akka] final class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem)
extends SerializerWithStringManifest
with BaseSerializer {

View file

@ -15,7 +15,7 @@ import akka.actor.typed.internal.adapter.ActorContextAdapter
import akka.japi.Creator
/**
* Java API: Adapters between typed and classic actors and actor systems.
* Adapters between typed and classic actors and actor systems.
* The underlying `ActorSystem` is the classic [[akka.actor.ActorSystem]]
* which runs Akka [[akka.actor.typed.Behavior]] on an emulation layer. In this
* system typed and classic actors can coexist.

View file

@ -122,8 +122,6 @@ object Behaviors {
/**
* Construct an actor `Behavior` from a partial message handler which treats undefined messages as unhandled.
*
* Behaviors can also be composed with [[Behavior#orElse]].
*/
def receivePartial[T](onMessage: PartialFunction[(ActorContext[T], T), Behavior[T]]): Receive[T] =
Behaviors.receive[T] { (ctx, t) =>
@ -132,8 +130,6 @@ object Behaviors {
/**
* Construct an actor `Behavior` from a partial message handler which treats undefined messages as unhandled.
*
* Behaviors can also be composed with [[Behavior#orElse]].
*/
def receiveMessagePartial[T](onMessage: PartialFunction[T, Behavior[T]]): Receive[T] =
Behaviors.receive[T] { (_, t) =>

View file

@ -10,7 +10,7 @@ import akka.actor.typed.internal.adapter.{ PropsAdapter => _, _ }
import akka.annotation.InternalApi
/**
* Scala API: Adapters between typed and classic actors and actor systems.
* Adapters between typed and classic actors and actor systems.
* The underlying `ActorSystem` is the classic [[akka.actor.ActorSystem]]
* which runs Akka Typed [[akka.actor.typed.Behavior]] on an emulation layer. In this
* system typed and classic actors can coexist.

View file

@ -24,7 +24,7 @@ case object Done extends Done {
/**
* Java API: the singleton instance
*
* This is equivalent to [[Done#getInstance()]], but can be used with static import.
* This is equivalent to [[Done.getInstance]], but can be used with static import.
*/
def done(): Done = this
}

View file

@ -22,7 +22,7 @@ case object NotUsed extends NotUsed {
/**
* Java API: the singleton instance
*
* This is equivalent to [[NotUsed#getInstance()]], but can be used with static import.
* This is equivalent to [[NotUsed.getInstance]], but can be used with static import.
*/
def notUsed(): NotUsed = this
}

View file

@ -634,7 +634,7 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid
def mailboxes: Mailboxes
/**
* Register a block of code (callback) to run after [[ActorSystem.terminate()]] has been issued and
* Register a block of code (callback) to run after [[ActorSystem.terminate]] has been issued and
* all actors in this actor system have been stopped.
* Multiple code blocks may be registered by calling this method multiple times.
* The callbacks will be run sequentially in reverse order of registration, i.e.
@ -648,7 +648,7 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid
def registerOnTermination[T](code: => T): Unit
/**
* Java API: Register a block of code (callback) to run after [[ActorSystem.terminate()]] has been issued and
* Java API: Register a block of code (callback) to run after [[ActorSystem.terminate]] has been issued and
* all actors in this actor system have been stopped.
* Multiple code blocks may be registered by calling this method multiple times.
* The callbacks will be run sequentially in reverse order of registration, i.e.

View file

@ -526,37 +526,37 @@ object Backoff {
trait BackoffOptions {
/**
* @see [[ExtendedBackoffOptions.withAutoReset()]]
* @see [[ExtendedBackoffOptions.withAutoReset]]
*/
def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions
/**
* @see [[ExtendedBackoffOptions.withManualReset()]]
* @see [[ExtendedBackoffOptions.withManualReset]]
*/
def withManualReset: BackoffOptions
/**
* @see [[ExtendedBackoffOptions.withSupervisorStrategy()]]
* @see [[ExtendedBackoffOptions.withSupervisorStrategy]]
*/
def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): BackoffOptions
/**
* @see [[ExtendedBackoffOptions.withDefaultStoppingStrategy()]]
* @see [[ExtendedBackoffOptions.withDefaultStoppingStrategy]]
*/
def withDefaultStoppingStrategy: BackoffOptions
/**
* @see [[ExtendedBackoffOptions.withMaxNrOfRetries()]]
* @see [[ExtendedBackoffOptions.withMaxNrOfRetries]]
*/
def withMaxNrOfRetries(maxNrOfRetries: Int): BackoffOptions
/**
* @see [[ExtendedBackoffOptions.withReplyWhileStopped()]]
* @see [[ExtendedBackoffOptions.withReplyWhileStopped]]
*/
def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions
/**
* @see [[BackoffOnStopOptions.withFinalStopMessage()]]
* @see [[BackoffOnStopOptions.withFinalStopMessage]]
*/
def withFinalStopMessage(isFinalStopMessage: Any => Boolean): BackoffOptions

View file

@ -31,7 +31,7 @@ object SerializationSetup {
}
/**
* Setup for the serialization subsystem, constructor is *Internal API*, use factories in [[SerializationSetup()]]
* Setup for the serialization subsystem, constructor is *Internal API*, use factories in [[SerializationSetup]]
*/
final class SerializationSetup private (val createSerializers: ExtendedActorSystem => immutable.Seq[SerializerDetails])
extends Setup

View file

@ -236,7 +236,7 @@ trait BaseSerializer extends Serializer {
/**
* Globally unique serialization identifier configured in the `reference.conf`.
*
* See [[Serializer#identifier]].
* See [[Serializer.identifier]].
*/
override val identifier: Int = identifierFromConfig

View file

@ -14,8 +14,7 @@ import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState }
/**
* INTERNAL API
*/
@InternalApi
object ShardingState {
@InternalApi private[akka] object ShardingState {
def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage {
case GetShardRegionState(key, replyTo) =>

View file

@ -63,7 +63,7 @@ object Replicator {
extends Command
/**
* Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
* Reply from `Get`. The data value is retrieved with [[dd.Replicator.GetSuccess.get]] using the typed key.
*/
type GetResponse[A <: ReplicatedData] = dd.Replicator.GetResponse[A]
object GetSuccess {
@ -114,7 +114,7 @@ object Replicator {
*/
def apply[A <: ReplicatedData](key: Key[A], initial: A, writeConsistency: WriteConsistency)(
modify: A => A): ActorRef[UpdateResponse[A]] => Update[A] =
(replyTo => Update(key, writeConsistency, replyTo)(modifyWithInitial(initial, modify)))
replyTo => Update(key, writeConsistency, replyTo)(modifyWithInitial(initial, modify))
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A => A): Option[A] => A = {
case Some(data) => modify(data)
@ -225,28 +225,28 @@ object Replicator {
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
* @see [[Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef[Changed[A]]) extends Command
/**
* @see [[Replicator.Subscribe]]
* @see [[Subscribe]]
*/
type SubscribeResponse[A <: ReplicatedData] = dd.Replicator.SubscribeResponse[A]
/**
* The data value is retrieved with [[#get]] using the typed key.
* The data value is retrieved with [[dd.Replicator.Changed.get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
* @see [[Subscribe]]
*/
object Changed {
def unapply[A <: ReplicatedData](chg: Changed[A]): Option[Key[A]] = Some(chg.key)
}
/**
* The data value is retrieved with [[#get]] using the typed key.
* The data value is retrieved with [[dd.Replicator.Changed.get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
* @see [[Subscribe]]
*/
type Changed[A <: ReplicatedData] = dd.Replicator.Changed[A]
@ -255,7 +255,7 @@ object Replicator {
}
/**
* @see [[Replicator.Subscribe]]
* @see [[Delete]]
*/
type Deleted[A <: ReplicatedData] = dd.Replicator.Deleted[A]
@ -267,7 +267,7 @@ object Replicator {
def apply[A <: ReplicatedData](
key: Key[A],
consistency: WriteConsistency): ActorRef[DeleteResponse[A]] => Delete[A] =
(replyTo => Delete(key, consistency, replyTo))
replyTo => Delete(key, consistency, replyTo)
}
/**

View file

@ -14,7 +14,7 @@ import com.typesafe.config.Config
* Verifies that receptionist distributed-key-count are the same across cluster nodes
*/
@InternalApi
final class ClusterReceptionistConfigCompatChecker extends JoinConfigCompatChecker {
private[akka] final class ClusterReceptionistConfigCompatChecker extends JoinConfigCompatChecker {
override def requiredKeys = "akka.cluster.typed.receptionist.distributed-key-count" :: Nil

View file

@ -23,7 +23,7 @@ abstract class JoinConfigCompatChecker {
* Runs the Config check.
*
* Implementers are free to define what makes Config entry compatible or not.
* We do provide some pre-build checks tough: [[JoinConfigCompatChecker.exists()]] and [[JoinConfigCompatChecker.fullMatch()]]
* We do provide some pre-build checks tough: [[JoinConfigCompatChecker.exists]] and [[JoinConfigCompatChecker.fullMatch]]
*
* @param toCheck - the Config instance to be checked
* @param actualConfig - the Config instance containing the actual values

View file

@ -32,7 +32,7 @@ import akka.actor.typed.ActorRef
/**
* Persist all of a the given events. Each event will be applied through `applyEffect` separately but not until
* all events has been persisted. If `callback` is added through [[Effect#thenRun]] that will invoked
* all events has been persisted. If `callback` is added through [[EffectBuilder.thenRun]] that will invoked
* after all the events has been persisted.
*/
final def persist(events: java.util.List[Event]): EffectBuilder[Event, State] = PersistAll(events.asScala.toVector)
@ -73,7 +73,7 @@ import akka.actor.typed.ActorRef
* commands will not be processed by this `unstashAll` effect and have to be unstashed
* by another `unstashAll`.
*
* @see [[Effect.thenUnstashAll]]
* @see [[EffectBuilder.thenUnstashAll]]
*/
def unstashAll(): Effect[Event, State] =
none().thenUnstashAll()
@ -186,7 +186,7 @@ import akka.actor.typed.ActorRef
/**
* [[EventSourcedBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
* created with `Effects().reply`, `Effects().noReply`, [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]].
*/
@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State] {
self: EffectImpl[Event, State] =>

View file

@ -212,7 +212,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
/**
* A [[EventSourcedBehavior]] that is enforcing that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
* created with `Effects().reply`, `Effects().noReply`, [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]].
*/
abstract class EventSourcedBehaviorWithEnforcedReplies[Command, Event, State](
persistenceId: PersistenceId,

View file

@ -41,7 +41,7 @@ object RetentionCriteria {
@DoNotInherit abstract class SnapshotCountRetentionCriteria extends RetentionCriteria {
/**
* Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery()]].
* Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery]].
* Events that have sequence number less than the snapshot sequence number minus
* `keepNSnapshots * numberOfEvents` are deleted.
*/

View file

@ -38,7 +38,7 @@ object SignalHandlerBuilder {
/**
* Mutable builder for handling signals in [[EventSourcedBehavior]]
*
* Not for user instantiation, use [[EventSourcedBehavior#newSignalHandlerBuilder()]] to get an instance.
* Not for user instantiation, use [[EventSourcedBehavior.newSignalHandlerBuilder]] to get an instance.
*/
final class SignalHandlerBuilder[State] {

View file

@ -79,7 +79,7 @@ object Effect {
* commands will not be processed by this `unstashAll` effect and have to be unstashed
* by another `unstashAll`.
*
* @see [[Effect.thenUnstashAll]]
* @see [[EffectBuilder.thenUnstashAll]]
*/
def unstashAll[Event, State](): Effect[Event, State] =
CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]())
@ -178,7 +178,7 @@ trait EffectBuilder[+Event, State] extends Effect[Event, State] {
/**
* [[EventSourcedBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
* created with [[Effect.reply]], [[Effect.noReply]], [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]].
*
* Not intended for user extension.
*/

View file

@ -56,7 +56,7 @@ object EventSourcedBehavior {
/**
* Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
* created with [[Effect.reply]], [[Effect.noReply]], [[EffectBuilder.thenReply]], or [[EffectBuilder.thenNoReply]].
*/
def withEnforcedReplies[Command, Event, State](
persistenceId: PersistenceId,

View file

@ -41,7 +41,7 @@ object RetentionCriteria {
@DoNotInherit trait SnapshotCountRetentionCriteria extends RetentionCriteria {
/**
* Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery()]].
* Delete events after saving snapshot via [[RetentionCriteria.snapshotEvery]].
* Events that have sequence number less than the snapshot sequence number minus
* `keepNSnapshots * numberOfEvents` are deleted.
*/

View file

@ -31,8 +31,10 @@ import java.nio.MappedByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
/**
* Application to print out errors recorded in the command-and-control (cnc) file is maintained by
* media driver in shared memory. This application reads the cnc file and prints the distinct
* INTERNAL API
*
* <p>Application to print out errors recorded in the command-and-control (cnc) file is maintained
* by media driver in shared memory. This application reads the cnc file and prints the distinct
* errors. Layout of the cnc file is described in {@link CncFileDescriptor}.
*/
public class AeronErrorLog {

View file

@ -213,7 +213,7 @@ object SSLEngineProviderSetup {
* when the SSLEngineProvider implementation require other external constructor parameters
* or is created before the ActorSystem is created.
*
* Constructor is *Internal API*, use factories in [[SSLEngineProviderSetup()]]
* Constructor is *Internal API*, use factories in [[SSLEngineProviderSetup]]
*/
class SSLEngineProviderSetup private (val sslEngineProvider: ExtendedActorSystem => SSLEngineProvider) extends Setup

View file

@ -473,7 +473,7 @@ object TestSubscriber {
* Depending on the `signalDemand` parameter demand may be signalled immediately after obtaining the subscription
* in order to wake up a possibly lazy upstream. You can disable this by setting the `signalDemand` parameter to `false`.
*
* See also [[#expectSubscriptionAndError()]].
* See also [[#expectSubscriptionAndError]].
*/
def expectSubscriptionAndError(signalDemand: Boolean): Throwable = {
val sub = expectSubscription()

View file

@ -549,7 +549,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
}
/**
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push]]
* is now allowed to be called on this port.
*/
override def onPull(): Unit = {

View file

@ -99,7 +99,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
*
* This is the expected way for operators to access attributes.
*
* @see [[Attributes#get()]] For providing a default value if the attribute was not set
* @see [[Attributes#get]] For providing a default value if the attribute was not set
*/
def get[T <: Attribute: ClassTag]: Option[T] = {
val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]

View file

@ -146,12 +146,12 @@ object KillSwitches {
trait KillSwitch {
/**
* After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
* After calling [[KillSwitch#shutdown]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
*/
def shutdown(): Unit
/**
* After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed.
* After calling [[KillSwitch#abort]] the linked [[Graph]]s of [[FlowShape]] are failed.
*/
def abort(ex: Throwable): Unit
}
@ -194,11 +194,11 @@ private[stream] final class TerminationSignal {
* A [[UniqueKillSwitch]] is always a result of a materialization (unlike [[SharedKillSwitch]] which is constructed
* before any materialization) and it always controls that graph and operator which yielded the materialized value.
*
* After calling [[UniqueKillSwitch#shutdown()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* After calling [[UniqueKillSwitch#shutdown]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will complete its downstream and cancel its upstream (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of completion commands will be ignored.
*
* After calling [[UniqueKillSwitch#abort()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* After calling [[UniqueKillSwitch#abort]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will fail its downstream with the provided exception and cancel its upstream
* (unless if finished or failed already in which case the command is ignored). Subsequent invocations of
* completion commands will be ignored.
@ -209,14 +209,14 @@ private[stream] final class TerminationSignal {
final class UniqueKillSwitch private[stream] (private val promise: Promise[Done]) extends KillSwitch {
/**
* After calling [[UniqueKillSwitch#shutdown()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* After calling [[UniqueKillSwitch#shutdown]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will complete its downstream and cancel its upstream (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of completion commands will be ignored.
*/
def shutdown(): Unit = promise.trySuccess(Done)
/**
* After calling [[UniqueKillSwitch#abort()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* After calling [[UniqueKillSwitch#abort]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will fail its downstream with the provided exception and cancel its upstream
* (unless if finished or failed already in which case the command is ignored). Subsequent invocations of
* completion commands will be ignored.
@ -232,15 +232,15 @@ final class UniqueKillSwitch private[stream] (private val promise: Promise[Done]
* belongs to the switch from which it was acquired. Multiple [[SharedKillSwitch]] instances are isolated from each other,
* shutting down or aborting on instance does not affect the [[Graph]]s provided by another instance.
*
* After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the
* After calling [[SharedKillSwitch#shutdown]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will complete their downstreams and cancel their upstreams (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be
* case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown]] and [[SharedKillSwitch#abort]] will be
* ignored.
*
* After calling [[SharedKillSwitch#abort()]] all materialized, running instances of all [[Graph]]s provided by the
* After calling [[SharedKillSwitch#abort]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will fail their downstreams with the provided exception and cancel their upstreams
* (unless it finished or failed already in which case the command is ignored). Subsequent invocations of
* [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be ignored.
* [[SharedKillSwitch#shutdown]] and [[SharedKillSwitch#abort]] will be ignored.
*
* The [[Graph]]s provided by the [[SharedKillSwitch]] do not modify the passed through elements in any way or affect
* backpressure in the stream. All provided [[Graph]]s provide the parent [[SharedKillSwitch]] as materialized value.
@ -252,18 +252,18 @@ final class SharedKillSwitch private[stream] (val name: String) extends KillSwit
private[this] val _flow: Graph[FlowShape[Any, Any], SharedKillSwitch] = new SharedKillSwitchFlow
/**
* After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the
* After calling [[SharedKillSwitch#shutdown]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will complete their downstreams and cancel their upstreams (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be
* case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown]] and [[SharedKillSwitch#abort]] will be
* ignored.
*/
def shutdown(): Unit = terminationSignal.tryComplete(Success(Done))
/**
* After calling [[SharedKillSwitch#abort()]] all materialized, running instances of all [[Graph]]s provided by the
* After calling [[SharedKillSwitch#abort]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will fail their downstreams with the provided exception and cancel their upstreams
* (unless it finished or failed already in which case the command is ignored). Subsequent invocations of
* [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be ignored.
* [[SharedKillSwitch#shutdown]] and [[SharedKillSwitch#abort]] will be ignored.
*
* These provided [[Graph]]s materialize to their owning switch. This might make certain integrations simpler than
* passing around the switch instance itself.
@ -274,7 +274,7 @@ final class SharedKillSwitch private[stream] (val name: String) extends KillSwit
/**
* Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
* [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this
* [[SharedKillSwitch#shutdown]] or [[SharedKillSwitch#abort]] all running instances of all provided [[Graph]]s by this
* switch will be stopped normally or failed.
*
* @tparam T Type of the elements the Flow will forward

View file

@ -133,12 +133,12 @@ import akka.stream.snapshot._
* From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a
* [[GraphInterpreter#GraphAssembly]] object and provides facilities to execute and interact with this assembly.
* The lifecycle of the Interpreter is roughly the following:
* - [[init()]] is called
* - [[execute()]] is called whenever there is need for execution, providing an upper limit on the processed events
* - [[finish()]] is called before the interpreter is disposed, preferably after [[isCompleted]] returned true, although
* - [[init]] is called
* - [[execute]] is called whenever there is need for execution, providing an upper limit on the processed events
* - [[finish]] is called before the interpreter is disposed, preferably after [[isCompleted]] returned true, although
* in abort cases this is not strictly necessary
*
* The [[execute()]] method of the interpreter accepts an upper bound on the events it will process. After this limit
* The [[execute]] method of the interpreter accepts an upper bound on the events it will process. After this limit
* is reached or there are no more pending events to be processed, the call returns. It is possible to inspect
* if there are unprocessed events left via the [[isSuspended]] method. [[isCompleted]] returns true once all operators
* reported completion inside the interpreter.

View file

@ -211,7 +211,7 @@ package akka.stream
* val newBuilder = builder.add(submodule, newShape) // Add the module, and register it with the new shape
* newBuilder.wire(newShape.in, ...) // Use the new ports to wire
*
* What happens in the background is that [[Shape.deepCopy()]] creates copies of the ports, and fills their
* What happens in the background is that [[Shape.deepCopy]] creates copies of the ports, and fills their
* mappedTo field to point to their original port counterpart. Whenever we call wire in the outer module, it
* delegates calls to the submodule, but using the original port (as the submodule builder has no knowledge of
* the external mapping):
@ -300,7 +300,7 @@ package akka.stream
* If we consider the purely linear case, we still need to figure out how can we provide a traversal even though
* the last output port is unwired. The trick that is used is to wire this output port optimistically to the
* relative address -1 which is almost always correct (why -1? explained a bit later). If it turns out to be incorrect
* later, we fix it by the helper method [[akka.stream.impl.Traversal.rewireFirstTo()]] which tears down the traversal
* later, we fix it by the helper method [[akka.stream.impl.Traversal.rewireFirstTo]] which tears down the traversal
* until the wrong module is found, then fixes the port assignment. This is only possible on purely linear layouts though.
* Again, this is an example of the 90% rule. Most appends will not need this rewiring and hence be as fast as possible
* while the rarer cases suffering a minor penalty.
@ -455,8 +455,8 @@ package akka.stream
*
* * [[akka.stream.impl.PhasedFusingActorMaterializer.Debug]]: if this flag is turned on, the materializer will
* log the steps it takes
* * [[akka.stream.impl.TraversalBuilder.printTraversal()]]: Prints the Traversal in a readable format
* * [[akka.stream.impl.TraversalBuilder.printWiring()]]: Prints the calculated port assignments. Useful for
* * [[akka.stream.impl.TraversalBuilder.printTraversal]]: Prints the Traversal in a readable format
* * [[akka.stream.impl.TraversalBuilder.printWiring]]: Prints the calculated port assignments. Useful for
* debugging if everything is wired to the right thing.
*
*/

View file

@ -13,7 +13,7 @@ import akka.stream.StreamRefResolver
/**
* INTERNAL API
*/
@InternalApi final class StreamRefResolverImpl(system: ExtendedActorSystem) extends StreamRefResolver {
@InternalApi private[akka] final class StreamRefResolverImpl(system: ExtendedActorSystem) extends StreamRefResolver {
def toSerializationFormat[T](ref: SourceRef[T]): String = ref match {
case SourceRefImpl(actorRef) =>

View file

@ -3264,7 +3264,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -3279,7 +3279,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -3294,7 +3294,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -3313,7 +3313,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated

View file

@ -3668,7 +3668,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -3683,7 +3683,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -3698,7 +3698,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -3717,7 +3717,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated

View file

@ -34,7 +34,7 @@ object SourceWithContext {
* use [[SourceWithContext#via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.asSourceWithContext()]]
* Can be created by calling [[Source.asSourceWithContext]]
*/
final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithContext[Out, Ctx, Mat])
extends GraphDelegate(delegate) {

View file

@ -2110,7 +2110,7 @@ class SubFlow[In, Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -2125,7 +2125,7 @@ class SubFlow[In, Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -2140,7 +2140,7 @@ class SubFlow[In, Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -2159,7 +2159,7 @@ class SubFlow[In, Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated

View file

@ -2086,7 +2086,7 @@ class SubSource[Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -2101,7 +2101,7 @@ class SubSource[Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -2116,7 +2116,7 @@ class SubSource[Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@ -2135,7 +2135,7 @@ class SubSource[Out, Mat](
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* [[throttle]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated

View file

@ -314,7 +314,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*/
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
@ -335,7 +335,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*
* Marked API-may-change to leave room for an improvement around the very long parameter list.
*/
@ -371,7 +371,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
@ -389,7 +389,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
@ -422,7 +422,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind()]]
* @see [[Tcp.bind]]
* Marked API-may-change to leave room for an improvement around the very long parameter list.
*
* Note: the half close parameter is currently ignored
@ -451,7 +451,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind()]]
* @see [[Tcp.bind]]
*/
@deprecated(
"Use bindWithTls that takes a SSLEngine factory instead. " +
@ -472,7 +472,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind()]]
* @see [[Tcp.bind]]
*/
def bindWithTls(
interface: String,
@ -489,7 +489,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
* where all incoming and outgoing bytes are passed through TLS.
*
* @see [[Tcp.bind()]]
* @see [[Tcp.bind]]
*/
def bindWithTls(
interface: String,

View file

@ -2334,8 +2334,8 @@ trait FlowOps[+Out, +Mat] {
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
* [[throttle]] with maximumBurst attribute.
* @see [[throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
@ -2349,8 +2349,8 @@ trait FlowOps[+Out, +Mat] {
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
* [[throttle]] with maximumBurst attribute.
* @see [[throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")

View file

@ -23,7 +23,7 @@ object SourceWithContext {
* use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.asSourceWithContext()]]
* Can be created by calling [[Source.asSourceWithContext]]
*/
final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Source[(Out, Ctx), Mat])
extends GraphDelegate(delegate)

View file

@ -247,12 +247,12 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* For more advanced use cases you can manually combine [[Tcp.outgoingConnection()]] and [[TLS]]
* For more advanced use cases you can manually combine [[Tcp.outgoingConnection]] and [[TLS]]
*
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*/
@deprecated(
"Use outgoingConnectionWithTls that takes a SSLEngine factory instead. " +
@ -270,7 +270,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* The returned flow represents a TCP client connection to the given endpoint where all bytes in and
* out go through TLS.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
*
@ -304,7 +304,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
@ -327,7 +327,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
* You specify a factory to create an SSLEngine that must already be configured for
* client mode and with all the parameters for the first session.
*
* @see [[Tcp.outgoingConnection()]]
* @see [[Tcp.outgoingConnection]]
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
@ -483,7 +483,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* @param negotiateNewSession Details about what to require when negotiating the connection with the server
* @param sslContext Context containing details such as the trust and keystore
* @see [[Tcp.bindAndHandle()]]
* @see [[Tcp.bindAndHandle]]
*
* Marked API-may-change to leave room for an improvement around the very long parameter list.
*/

View file

@ -280,7 +280,7 @@ object GraphStageLogic {
* of the enclosing [[GraphStage]]
* * Possible mutable state, accessible from the [[InHandler]] and [[OutHandler]] callbacks, but not from anywhere
* else (as such access would not be thread-safe)
* * The lifecycle hooks [[preStart()]] and [[postStop()]]
* * The lifecycle hooks [[preStart]] and [[postStop]]
* * Methods for performing stream processing actions, like pulling or pushing elements
*
* The operator logic is completed once all its input and output ports have been closed. This can be changed by
@ -474,7 +474,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/**
* Requests an element on the given port. Calling this method twice before an element arrived will fail.
* There can only be one outstanding request at any given time. The method [[hasBeenPulled()]] can be used
* There can only be one outstanding request at any given time. The method [[hasBeenPulled]] can be used
* query whether pull is allowed to be called or not. This method will also fail if the port is already closed.
*/
final protected def pull[T](in: Inlet[T]): Unit = {
@ -499,7 +499,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/**
* Requests an element on the given port unless the port is already closed.
* Calling this method twice before an element arrived will fail.
* There can only be one outstanding request at any given time. The method [[hasBeenPulled()]] can be used
* There can only be one outstanding request at any given time. The method [[hasBeenPulled]] can be used
* query whether pull is allowed to be called or not.
*/
final protected def tryPull[T](in: Inlet[T]): Unit = if (!isClosed(in)) pull(in)
@ -518,11 +518,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
final protected def cancel[T](in: Inlet[T], cause: Throwable): Unit = interpreter.cancel(conn(in), cause)
/**
* Once the callback [[InHandler.onPush()]] for an input port has been invoked, the element that has been pushed
* can be retrieved via this method. After [[grab()]] has been called the port is considered to be empty, and further
* calls to [[grab()]] will fail until the port is pulled again and a new element is pushed as a response.
* Once the callback [[InHandler.onPush]] for an input port has been invoked, the element that has been pushed
* can be retrieved via this method. After [[grab]] has been called the port is considered to be empty, and further
* calls to [[grab]] will fail until the port is pulled again and a new element is pushed as a response.
*
* The method [[isAvailable()]] can be used to query if the port has an element that can be grabbed or not.
* The method [[isAvailable]] can be used to query if the port has an element that can be grabbed or not.
*/
final protected def grab[T](in: Inlet[T]): T = {
val connection = conn(in)
@ -554,15 +554,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/**
* Indicates whether there is already a pending pull for the given input port. If this method returns true
* then [[isAvailable()]] must return false for that same port.
* then [[isAvailable]] must return false for that same port.
*/
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (conn(in).portState & (InReady | InClosed)) == 0
/**
* Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the
* element. After calling [[grab()]] this method will return false.
* Indicates whether there is an element waiting at the given input port. [[grab]] can be used to retrieve the
* element. After calling [[grab]] this method will return false.
*
* If this method returns true then [[hasBeenPulled()]] will return false for that same port.
* If this method returns true then [[hasBeenPulled]] will return false for that same port.
*/
final protected def isAvailable[T](in: Inlet[T]): Boolean = {
val connection = conn(in)
@ -591,8 +591,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
final protected def isClosed[T](in: Inlet[T]): Boolean = (conn(in).portState & InClosed) != 0
/**
* Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived
* will fail. There can be only one outstanding push request at any given time. The method [[isAvailable()]] can be
* Emits an element through the given output port. Calling this method twice before a [[pull]] has been arrived
* will fail. There can be only one outstanding push request at any given time. The method [[isAvailable]] can be
* used to check if the port is ready to be pushed or not.
*/
final protected def push[T](out: Outlet[T], elem: T): Unit = {
@ -645,7 +645,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex)
/**
* Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called,
* Automatically invokes [[cancel]] or [[complete]] on all the input or output ports that have been called,
* then marks the operator as stopped.
*/
final def completeStage(): Unit = cancelStage(SubscriptionWithCancelException.StageWasCompleted)
@ -658,7 +658,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
@InternalApi private[stream] var lastCancellationCause: Throwable = _
/**
* Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called,
* Automatically invokes [[cancel]] or [[complete]] on all the input or output ports that have been called,
* then marks the stage as stopped.
*/
final def cancelStage(cause: Throwable): Unit = {
@ -681,7 +681,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
/**
* Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called,
* Automatically invokes [[cancel]] or [[fail]] on all the input or output ports that have been called,
* then marks the operator as stopped.
*/
final def failStage(ex: Throwable): Unit = {
@ -1077,14 +1077,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/**
* Obtain a callback object that can be used asynchronously to re-enter the
* current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned
* current [[GraphStage]] with an asynchronous notification. The [[invoke]] method of the returned
* [[AsyncCallback]] is safe to be called from other threads. It will in the background thread-safely
* delegate to the passed callback function. I.e. [[invoke()]] will be called by other thread and
* delegate to the passed callback function. I.e. [[invoke]] will be called by other thread and
* the passed handler will be invoked eventually in a thread-safe way by the execution environment.
*
* In case stream is not yet materialized [[AsyncCallback]] will buffer events until stream is available.
*
* [[AsyncCallback.invokeWithFeedback()]] has an internal promise that will be failed if event cannot be processed
* [[AsyncCallback.invokeWithFeedback]] has an internal promise that will be failed if event cannot be processed
* due to stream completion.
*
* To be thread safe this method must only be called from either the constructor of the graph operator during
@ -1100,26 +1100,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
/**
* ConcurrentAsyncCallback allows to call [[invoke()]] and [[invokeWithPromise()]] with event attribute.
* ConcurrentAsyncCallback allows to call [[invoke]] and [[invokeWithPromise]] with event attribute.
* This event will be sent to the stream and the corresponding handler will be called with this attribute in thread-safe manner.
*
* State of this object can be changed both "internally" by the owning GraphStage or by the "external world" (e.g. other threads).
* Specifically, calls to this class can be made:
* * From the owning [[GraphStage]], to [[onStart]] - when materialization is finished and to [[onStop()]] -
* * From the owning [[GraphStage]], to [[onStart]] - when materialization is finished and to [[onStop]] -
* because the operator is about to stop or fail.
* * "Real world" calls [[invoke()]] and [[invokeWithFeedback()]]. These methods have synchronization
* * "Real world" calls [[invoke]] and [[invokeWithFeedback]]. These methods have synchronization
* with class state that reflects the stream state
*
* onStart sends all events that were buffered while stream was materializing.
* In case "Real world" added more events while initializing, onStart checks for more events in buffer when exiting and
* resend new events
*
* Once class is in `Initialized` state - all "Real world" calls of [[invoke()]] and [[invokeWithFeedback()]] are running
* Once class is in `Initialized` state - all "Real world" calls of [[invoke]] and [[invokeWithFeedback]] are running
* as is - without blocking each other.
*
* [[GraphStage]] is called [[onStop()]] when stream is wrapping down. onStop fails all futures for events that have not yet processed
* [[onStop()]] puts class in `Completed` state
* "Real world" calls of [[invokeWithFeedback()]] always return failed promises for `Completed` state
* [[GraphStage]] is called [[onStop]] when stream is wrapping down. onStop fails all futures for events that have not yet processed
* [[onStop]] puts class in `Completed` state
* "Real world" calls of [[invokeWithFeedback]] always return failed promises for `Completed` state
*/
private final class ConcurrentAsyncCallback[T](handler: T => Unit) extends AsyncCallback[T] {
@ -1198,12 +1198,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/**
* Java API: Obtain a callback object that can be used asynchronously to re-enter the
* current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned
* current [[GraphStage]] with an asynchronous notification. The [[invoke]] method of the returned
* [[AsyncCallback]] is safe to be called from other threads. It will in the background thread-safely
* delegate to the passed callback function. I.e. [[invoke()]] will be called by other thread and
* delegate to the passed callback function. I.e. [[invoke]] will be called by other thread and
* the passed handler will be invoked eventually in a thread-safe way by the execution environment.
*
* [[AsyncCallback.invokeWithFeedback()]] has an internal promise that will be failed if event cannot be processed due to stream completion.
* [[AsyncCallback.invokeWithFeedback]] has an internal promise that will be failed if event cannot be processed due to stream completion.
*
* This object can be cached and reused within the same [[GraphStageLogic]].
*/
@ -1752,7 +1752,7 @@ trait InHandler {
/**
* Called when the input port has a new element available. The actual element can be retrieved via the
* [[GraphStageLogic.grab()]] method.
* [[GraphStageLogic.grab]] method.
*/
@throws(classOf[Exception])
def onPush(): Unit
@ -1776,7 +1776,7 @@ trait InHandler {
trait OutHandler {
/**
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push]]
* is now allowed to be called on this port.
*/
@throws(classOf[Exception])