diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index d297c3cba3..e212df54be 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -14,6 +14,8 @@ import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.util.ByteString +import scala.concurrent.Future + class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val ec = system.dispatcher @@ -45,8 +47,9 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { //#file-source - SynchronousFileSource(file) - .runForeach((chunk: ByteString) ⇒ handle(chunk)) + val foreach: Future[Long] = SynchronousFileSource(file) + .to(Sink.ignore) + .run() //#file-source } diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala index 3c81fa40b0..16bb322843 100644 --- a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala @@ -36,7 +36,7 @@ object InputStreamSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(createInputStream: Creator[InputStream]): javadsl.Source[ByteString, Future[Long]] = + def create(createInputStream: Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = create(createInputStream, DefaultChunkSize) /** @@ -47,7 +47,7 @@ object InputStreamSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(createInputStream: Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[Long]] = - apply(() ⇒ createInputStream.create(), chunkSize).asJava + def create(createInputStream: Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = + apply(() ⇒ createInputStream.create(), chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]] } diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala index 436bf5dbde..91fa91b3ed 100644 --- a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala @@ -38,7 +38,7 @@ object OutputStreamSink { * * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. */ - def create(f: Creator[OutputStream]): javadsl.Sink[ByteString, Future[Long]] = - apply(() ⇒ f.create()).asJava + def create(f: Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] = + apply(() ⇒ f.create()).asJava.asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] } diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala index de7175b9dd..10da78cdce 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala @@ -28,7 +28,7 @@ object SynchronousFileSink { * unless configured otherwise by using [[ActorOperationAttributes]]. */ def apply(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = - new Sink(new SynchronousFileSink(f, append, DefaultAttributes, Sink.shape("SynchronousFileSink"))) + new Sink(new impl.SynchronousFileSink(f, append, DefaultAttributes, Sink.shape("SynchronousFileSink"))) /** * Java API @@ -41,8 +41,8 @@ object SynchronousFileSink { * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, * unless configured otherwise by using [[ActorOperationAttributes]]. */ - def create(f: File): javadsl.Sink[ByteString, Future[Long]] = - apply(f, append = false).asJava + def create(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = + apply(f, append = false).asJava.asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] /** * Java API @@ -54,7 +54,7 @@ object SynchronousFileSink { * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, * unless configured otherwise by using [[ActorOperationAttributes]]. */ - def appendTo(f: File): javadsl.Sink[ByteString, Future[Long]] = - apply(f, append = true).asJava + def appendTo(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = + apply(f, append = true).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] } diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala index a48131b155..3764c558ed 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -41,7 +41,7 @@ object SynchronousFileSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(f: File): javadsl.Source[ByteString, Future[Long]] = + def create(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = create(f, DefaultChunkSize) /** @@ -54,7 +54,7 @@ object SynchronousFileSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[Long]] = - apply(f, chunkSize).asJava + def create(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = + apply(f, chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]] }