From 2f85cf0fb8af1f1edf779314c7aca00b6eee08a6 Mon Sep 17 00:00:00 2001 From: Nafer Sanabria Date: Thu, 11 Aug 2016 07:37:54 -0500 Subject: [PATCH] =str fix up Scaladocs for Source & Sink --- .../main/scala/akka/stream/javadsl/Sink.scala | 18 ++++++++++-------- .../scala/akka/stream/javadsl/Source.scala | 18 ++++++++++-------- .../scala/akka/stream/scaladsl/Flow.scala | 2 +- .../scala/akka/stream/scaladsl/Sink.scala | 19 ++++++++++++------- .../scala/akka/stream/scaladsl/Source.scala | 12 +++++++----- 5 files changed, 40 insertions(+), 29 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 97ea1214aa..61ed2566bb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -8,11 +8,11 @@ import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function -import akka.stream.impl.{ LazySink, StreamLayout, SinkQueueAdapter } +import akka.stream.impl.{ StreamLayout, SinkQueueAdapter } import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.compat.java8.OptionConverters._ -import scala.concurrent.{ Future, ExecutionContext } +import scala.concurrent.ExecutionContext import scala.util.Try import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ @@ -110,7 +110,7 @@ object Sink { /** * A `Sink` that materializes into a `CompletionStage` of the first value received. * If the stream completes before signaling at least a single element, the CompletionStage will be failed with a [[NoSuchElementException]]. - * If the stream signals an error errors before signaling at least a single element, the CompletionStage will be failed with the streams exception. + * If the stream signals an error before signaling at least a single element, the CompletionStage will be failed with the streams exception. * * See also [[headOption]]. */ @@ -266,7 +266,7 @@ object Sink { /** * Java API * - * A `Sink` is a set of stream processing steps that has one open input and an attached output. + * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` */ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] { @@ -276,7 +276,9 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink override def toString: String = delegate.toString - /** Converts this Sink to its Scala DSL counterpart */ + /** + * Converts this Sink to its Scala DSL counterpart. + */ def asScala: scaladsl.Sink[In, Mat] = delegate /** @@ -303,7 +305,7 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink new Sink(delegate.mapMaterializedValue(f.apply _)) /** - * Change the attributes of this [[Source]] to the given ones and seal the list + * Change the attributes of this [[Sink]] to the given ones and seal the list * of attributes. This means that further calls will not be able to remove these * attributes, but instead add new ones. Note that this * operation has no effect on an empty Flow (because the attributes apply @@ -313,7 +315,7 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink new Sink(delegate.withAttributes(attr)) /** - * Add the given attributes to this Source. Further calls to `withAttributes` + * Add the given attributes to this Sink. Further calls to `withAttributes` * will not remove these attributes. Note that this * operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). @@ -322,7 +324,7 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink new Sink(delegate.addAttributes(attr)) /** - * Add a ``name`` attribute to this Flow. + * Add a ``name`` attribute to this Sink. */ override def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 94b31c0fd3..420b4edfe1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -309,7 +309,7 @@ object Source { } /** - * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. + * Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueue]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded * if downstream is terminated. @@ -318,15 +318,15 @@ object Source { * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.SourceQueue.offer]] returns `CompletionStage>` which completes with `Success(true)` + * [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage>` which completes with `Success(true)` * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete - * with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] + * with [[akka.stream.javadsl.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] * when downstream is completed. * * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage` * call when buffer is full. * - * You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]]. + * You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueue.watchCompletion]]. * It returns future that completes with success when stream is completed or fail when stream is failed. * * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand. @@ -376,7 +376,7 @@ object Source { /** * Start a new `Source` from some resource which can be opened, read and closed. - * It's similar to `unfoldResource` but takes functions that return `CopletionStage` instead of plain values. + * It's similar to `unfoldResource` but takes functions that return `CompletionStage` instead of plain values. * * You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures`. * All exceptions thrown by `create` or `close` as well as fails of returned futures will fail the stream. @@ -418,7 +418,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap override def toString: String = delegate.toString - /** Converts this Java DSL element to its Scala DSL counterpart. */ + /** + * Converts this Java DSL element to its Scala DSL counterpart. + */ def asScala: scaladsl.Source[Out, Mat] = delegate /** @@ -909,6 +911,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap */ def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = new Source(delegate.recoverWithRetries(attempts, pf)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. @@ -1553,7 +1556,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels * - * @param seed Provides the first state for extrapolation using the first unconsumed element * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ @@ -2001,7 +2003,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.addAttributes(attr)) /** - * Add a ``name`` attribute to this Flow. + * Add a ``name`` attribute to this Source. */ override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index eb5b12a91a..0db0efd21d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1831,7 +1831,7 @@ trait FlowOps[+Out, +Mat] { /** * Put an asynchronous boundary around this `Flow`. - * + * * If this is a `SubFlow` (created e.g. by `groupBy`), this creates an * asynchronous boundary around each materialized sub-flow, not the * super-flow. That way, the super-flow will communicate with sub-flows diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index e2964f727e..c82445586c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -23,7 +23,7 @@ import scala.concurrent.{ Await, ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** - * A `Sink` is a set of stream processing steps that has one open input and an attached output. + * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` */ final class Sink[-In, +Mat](override val module: Module) @@ -50,11 +50,14 @@ final class Sink[-In, +Mat](override val module: Module) def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: Materializer): Mat2 = Source.fromGraph(source).to(this).run() + /** + * Transform only the materialized value of this Sink, leaving all other properties as they were. + */ def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** - * Change the attributes of this [[Source]] to the given ones and seal the list + * Change the attributes of this [[Sink]] to the given ones and seal the list * of attributes. This means that further calls will not be able to remove these * attributes, but instead add new ones. Note that this * operation has no effect on an empty Flow (because the attributes apply @@ -64,7 +67,7 @@ final class Sink[-In, +Mat](override val module: Module) new Sink(module.withAttributes(attr)) /** - * Add the given attributes to this Source. Further calls to `withAttributes` + * Add the given attributes to this Sink. Further calls to `withAttributes` * will not remove these attributes. Note that this * operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). @@ -73,7 +76,7 @@ final class Sink[-In, +Mat](override val module: Module) withAttributes(module.attributes and attr) /** - * Add a ``name`` attribute to this Flow. + * Add a ``name`` attribute to this Sink. */ override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name)) @@ -82,7 +85,9 @@ final class Sink[-In, +Mat](override val module: Module) */ override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary) - /** Converts this Scala DSL element to it's Java DSL counterpart. */ + /** + * Converts this Scala DSL element to it's Java DSL counterpart. + */ def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) } @@ -228,7 +233,7 @@ object Sink { * [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the * element is dropped and the stream continues. * - * @see [[#mapAsyncUnordered]] + * See also [[Flow.mapAsyncUnordered]] */ def foreachParallel[T](parallelism: Int)(f: T ⇒ Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] = Flow[T].mapAsyncUnordered(parallelism)(t ⇒ Future(f(t))).toMat(Sink.ignore)(Keep.right) @@ -344,7 +349,7 @@ object Sink { * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None * as completion marker * - * @see [[akka.stream.scaladsl.SinkQueueWithCancel]] + * See also [[akka.stream.scaladsl.SinkQueueWithCancel]] */ def queue[T](): Sink[T, SinkQueueWithCancel[T]] = Sink.fromGraph(new QueueSink()) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 3ce9ddc387..c0549efb71 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -136,7 +136,7 @@ final class Source[+Out, +Mat](override val module: Module) override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr) /** - * Add a ``name`` attribute to this Flow. + * Add a ``name`` attribute to this Source. */ override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name)) @@ -145,7 +145,9 @@ final class Source[+Out, +Mat](override val module: Module) */ override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) - /** Converts this Scala DSL element to it's Java DSL counterpart. */ + /** + * Converts this Scala DSL element to it's Java DSL counterpart. + */ def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) /** @@ -436,7 +438,7 @@ object Source { } /** - * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. + * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded * if downstream is terminated. @@ -445,7 +447,7 @@ object Source { * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)` + * [[akka.stream.scaladsl.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)` * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete * with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] * when downstream is completed. @@ -453,7 +455,7 @@ object Source { * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future` * call when buffer is full. * - * You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]]. + * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]]. * It returns future that completes with success when stream is completed or fail when stream is failed. * * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand.