=str fix up Scaladocs for Source & Sink

This commit is contained in:
Nafer Sanabria 2016-08-11 07:37:54 -05:00 committed by Johan Andrén
parent 614512f92b
commit 2f85cf0fb8
5 changed files with 40 additions and 29 deletions

View file

@ -8,11 +8,11 @@ import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Props } import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.japi.function import akka.japi.function
import akka.stream.impl.{ LazySink, StreamLayout, SinkQueueAdapter } import akka.stream.impl.{ StreamLayout, SinkQueueAdapter }
import akka.stream.{ javadsl, scaladsl, _ } import akka.stream.{ javadsl, scaladsl, _ }
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.ExecutionContext
import scala.util.Try import scala.util.Try
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
@ -110,7 +110,7 @@ object Sink {
/** /**
* A `Sink` that materializes into a `CompletionStage` of the first value received. * A `Sink` that materializes into a `CompletionStage` of the first value received.
* If the stream completes before signaling at least a single element, the CompletionStage will be failed with a [[NoSuchElementException]]. * If the stream completes before signaling at least a single element, the CompletionStage will be failed with a [[NoSuchElementException]].
* If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception. * If the stream signals an error before signaling at least a single element, the CompletionStage will be failed with the streams exception.
* *
* See also [[headOption]]. * See also [[headOption]].
*/ */
@ -266,7 +266,7 @@ object Sink {
/** /**
* Java API * Java API
* *
* A `Sink` is a set of stream processing steps that has one open input and an attached output. * A `Sink` is a set of stream processing steps that has one open input.
* Can be used as a `Subscriber` * Can be used as a `Subscriber`
*/ */
final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] { final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] {
@ -276,7 +276,9 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
override def toString: String = delegate.toString override def toString: String = delegate.toString
/** Converts this Sink to its Scala DSL counterpart */ /**
* Converts this Sink to its Scala DSL counterpart.
*/
def asScala: scaladsl.Sink[In, Mat] = delegate def asScala: scaladsl.Sink[In, Mat] = delegate
/** /**
@ -303,7 +305,7 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
new Sink(delegate.mapMaterializedValue(f.apply _)) new Sink(delegate.mapMaterializedValue(f.apply _))
/** /**
* Change the attributes of this [[Source]] to the given ones and seal the list * Change the attributes of this [[Sink]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these * of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this * attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply * operation has no effect on an empty Flow (because the attributes apply
@ -313,7 +315,7 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
new Sink(delegate.withAttributes(attr)) new Sink(delegate.withAttributes(attr))
/** /**
* Add the given attributes to this Source. Further calls to `withAttributes` * Add the given attributes to this Sink. Further calls to `withAttributes`
* will not remove these attributes. Note that this * will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply * operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages). * only to the contained processing stages).
@ -322,7 +324,7 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
new Sink(delegate.addAttributes(attr)) new Sink(delegate.addAttributes(attr))
/** /**
* Add a ``name`` attribute to this Flow. * Add a ``name`` attribute to this Sink.
*/ */
override def named(name: String): javadsl.Sink[In, Mat] = override def named(name: String): javadsl.Sink[In, Mat] =
new Sink(delegate.named(name)) new Sink(delegate.named(name))

View file

@ -309,7 +309,7 @@ object Source {
} }
/** /**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. * Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated. * if downstream is terminated.
@ -318,15 +318,15 @@ object Source {
* there is no space available in the buffer. * there is no space available in the buffer.
* *
* Acknowledgement mechanism is available. * Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns `CompletionStage<StreamCallbackStatus<Boolean>>` which completes with `Success(true)` * [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage<StreamCallbackStatus<Boolean>>` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] * with [[akka.stream.javadsl.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed. * when downstream is completed.
* *
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage` * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage`
* call when buffer is full. * call when buffer is full.
* *
* You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]]. * You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed. * It returns future that completes with success when stream is completed or fail when stream is failed.
* *
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand. * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand.
@ -376,7 +376,7 @@ object Source {
/** /**
* Start a new `Source` from some resource which can be opened, read and closed. * Start a new `Source` from some resource which can be opened, read and closed.
* It's similar to `unfoldResource` but takes functions that return `CopletionStage` instead of plain values. * It's similar to `unfoldResource` but takes functions that return `CompletionStage` instead of plain values.
* *
* You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures`. * You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures`.
* All exceptions thrown by `create` or `close` as well as fails of returned futures will fail the stream. * All exceptions thrown by `create` or `close` as well as fails of returned futures will fail the stream.
@ -418,7 +418,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
override def toString: String = delegate.toString override def toString: String = delegate.toString
/** Converts this Java DSL element to its Scala DSL counterpart. */ /**
* Converts this Java DSL element to its Scala DSL counterpart.
*/
def asScala: scaladsl.Source[Out, Mat] = delegate def asScala: scaladsl.Source[Out, Mat] = delegate
/** /**
@ -909,6 +911,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*/ */
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] =
new Source(delegate.recoverWithRetries(attempts, pf)) new Source(delegate.recoverWithRetries(attempts, pf))
/** /**
* Transform each input element into an `Iterable` of output elements that is * Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. * then flattened into the output stream.
@ -1553,7 +1556,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* *
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state. * state.
*/ */
@ -2001,7 +2003,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.addAttributes(attr)) new Source(delegate.addAttributes(attr))
/** /**
* Add a ``name`` attribute to this Flow. * Add a ``name`` attribute to this Source.
*/ */
override def named(name: String): javadsl.Source[Out, Mat] = override def named(name: String): javadsl.Source[Out, Mat] =
new Source(delegate.named(name)) new Source(delegate.named(name))

View file

@ -1831,7 +1831,7 @@ trait FlowOps[+Out, +Mat] {
/** /**
* Put an asynchronous boundary around this `Flow`. * Put an asynchronous boundary around this `Flow`.
* *
* If this is a `SubFlow` (created e.g. by `groupBy`), this creates an * If this is a `SubFlow` (created e.g. by `groupBy`), this creates an
* asynchronous boundary around each materialized sub-flow, not the * asynchronous boundary around each materialized sub-flow, not the
* super-flow. That way, the super-flow will communicate with sub-flows * super-flow. That way, the super-flow will communicate with sub-flows

View file

@ -23,7 +23,7 @@ import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
/** /**
* A `Sink` is a set of stream processing steps that has one open input and an attached output. * A `Sink` is a set of stream processing steps that has one open input.
* Can be used as a `Subscriber` * Can be used as a `Subscriber`
*/ */
final class Sink[-In, +Mat](override val module: Module) final class Sink[-In, +Mat](override val module: Module)
@ -50,11 +50,14 @@ final class Sink[-In, +Mat](override val module: Module)
def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: Materializer): Mat2 = def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: Materializer): Mat2 =
Source.fromGraph(source).to(this).run() Source.fromGraph(source).to(this).run()
/**
* Transform only the materialized value of this Sink, leaving all other properties as they were.
*/
def mapMaterializedValue[Mat2](f: Mat Mat2): Sink[In, Mat2] = def mapMaterializedValue[Mat2](f: Mat Mat2): Sink[In, Mat2] =
new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any])) new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/** /**
* Change the attributes of this [[Source]] to the given ones and seal the list * Change the attributes of this [[Sink]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these * of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this * attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply * operation has no effect on an empty Flow (because the attributes apply
@ -64,7 +67,7 @@ final class Sink[-In, +Mat](override val module: Module)
new Sink(module.withAttributes(attr)) new Sink(module.withAttributes(attr))
/** /**
* Add the given attributes to this Source. Further calls to `withAttributes` * Add the given attributes to this Sink. Further calls to `withAttributes`
* will not remove these attributes. Note that this * will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply * operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages). * only to the contained processing stages).
@ -73,7 +76,7 @@ final class Sink[-In, +Mat](override val module: Module)
withAttributes(module.attributes and attr) withAttributes(module.attributes and attr)
/** /**
* Add a ``name`` attribute to this Flow. * Add a ``name`` attribute to this Sink.
*/ */
override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name)) override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name))
@ -82,7 +85,9 @@ final class Sink[-In, +Mat](override val module: Module)
*/ */
override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary) override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary)
/** Converts this Scala DSL element to it's Java DSL counterpart. */ /**
* Converts this Scala DSL element to it's Java DSL counterpart.
*/
def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this)
} }
@ -228,7 +233,7 @@ object Sink {
* [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the * [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the
* element is dropped and the stream continues. * element is dropped and the stream continues.
* *
* @see [[#mapAsyncUnordered]] * See also [[Flow.mapAsyncUnordered]]
*/ */
def foreachParallel[T](parallelism: Int)(f: T Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] = def foreachParallel[T](parallelism: Int)(f: T Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] =
Flow[T].mapAsyncUnordered(parallelism)(t Future(f(t))).toMat(Sink.ignore)(Keep.right) Flow[T].mapAsyncUnordered(parallelism)(t Future(f(t))).toMat(Sink.ignore)(Keep.right)
@ -344,7 +349,7 @@ object Sink {
* For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None
* as completion marker * as completion marker
* *
* @see [[akka.stream.scaladsl.SinkQueueWithCancel]] * See also [[akka.stream.scaladsl.SinkQueueWithCancel]]
*/ */
def queue[T](): Sink[T, SinkQueueWithCancel[T]] = def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
Sink.fromGraph(new QueueSink()) Sink.fromGraph(new QueueSink())

View file

@ -136,7 +136,7 @@ final class Source[+Out, +Mat](override val module: Module)
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr) override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr)
/** /**
* Add a ``name`` attribute to this Flow. * Add a ``name`` attribute to this Source.
*/ */
override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name)) override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name))
@ -145,7 +145,9 @@ final class Source[+Out, +Mat](override val module: Module)
*/ */
override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary)
/** Converts this Scala DSL element to it's Java DSL counterpart. */ /**
* Converts this Scala DSL element to it's Java DSL counterpart.
*/
def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)
/** /**
@ -436,7 +438,7 @@ object Source {
} }
/** /**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated. * if downstream is terminated.
@ -445,7 +447,7 @@ object Source {
* there is no space available in the buffer. * there is no space available in the buffer.
* *
* Acknowledgement mechanism is available. * Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)` * [[akka.stream.scaladsl.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] * with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed. * when downstream is completed.
@ -453,7 +455,7 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future` * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full. * call when buffer is full.
* *
* You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]]. * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed. * It returns future that completes with success when stream is completed or fail when stream is failed.
* *
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand. * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand.