diff --git a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index 2cc6c67175..915d582a15 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -46,8 +46,8 @@ class FileSourcesBenchmark { @Param(Array("2048")) val bufSize = 0 - var fileChannelSource: Source[ByteString, Future[Long]] = _ - var fileInputStreamSource: Source[ByteString, Future[Long]] = _ + var fileChannelSource: Source[ByteString, Future[IOResult]] = _ + var fileInputStreamSource: Source[ByteString, Future[IOResult]] = _ var ioSourceLinesIterator: Source[ByteString, NotUsed] = _ @Setup diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java index d0f7ee97cc..67d885db8e 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java @@ -11,6 +11,7 @@ import java.io.IOException; import akka.Done; import akka.actor.ActorSystem; import akka.stream.ActorAttributes; +import akka.stream.io.IOResult; import akka.stream.javadsl.Sink; import akka.stream.javadsl.FileIO; import docs.stream.SilenceSystemOut; @@ -58,7 +59,7 @@ public class StreamFileDocTest { Sink> printlnSink = Sink.foreach(chunk -> System.out.println(chunk.utf8String())); - Future bytesWritten = + Future ioResult = FileIO.fromFile(file) .to(printlnSink) .run(mat); @@ -73,7 +74,7 @@ public class StreamFileDocTest { final File file = File.createTempFile(getClass().getName(), ".tmp"); try { - Sink> byteStringFutureSink = + Sink> fileSink = //#custom-dispatcher-code FileIO.toFile(file) .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher")); diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index cbc4c409ae..10ccbed2b0 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -54,7 +54,7 @@ Iterator. The most prominent use-case previously was to just repeat the previously received value:: // This no longer works! - Flow.of(Integer.class).expand(i -> i)(i -> new Pair<>(i, i)); + Flow.of(Integer.class).expand(i -> i)(i -> new Pair<>(i, i)); In Akka 2.4.x this is simplified to: @@ -73,8 +73,8 @@ In Akka 2.4.x this is formulated like so: .. includecode:: ../code/docs/stream/MigrationsJava.java#expand-state -Changed Sinks -============= +Changed Sources / Sinks +======================= Sink.asPublisher is now configured using an enum ------------------------------------------------ @@ -85,3 +85,16 @@ In order to not use a meaningless boolean parameter we have changed the signatur .. includecode:: ../code/docs/stream/MigrationsJava.java#asPublisher +IO Sources / Sinks materialize IOResult +--------------------------------------- + +Materialized values of the following sources and sinks: + + * ``FileIO.fromFile`` + * ``FileIO.toFile`` + * ``StreamConverters.fromInputStream`` + * ``StreamConverters.fromOutputStream`` + +have been changed from ``Long`` to ``akka.stream.io.IOResult``. +This allows to signal more complicated completion scenarios. For example, on failure it is now possible +to return the exception and the number of bytes written until that exception occured. diff --git a/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index 5cb634444f..482c40297d 100644 --- a/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -6,6 +6,7 @@ package docs.stream.io import java.io.File import akka.stream._ +import akka.stream.io.IOResult import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit.Utils._ import akka.stream.testkit._ @@ -45,7 +46,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { //#file-source - val foreach: Future[Long] = FileIO.fromFile(file) + val foreach: Future[IOResult] = FileIO.fromFile(file) .to(Sink.ignore) .run() //#file-source diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 561fead548..9d7f4d5bd7 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -71,3 +71,19 @@ In Akka 2.4.x this is formulated like so: .. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-state +Changed Sources / Sinks +======================= + +IO Sources / Sinks materialize IOResult +--------------------------------------- + +Materialized values of the following sources and sinks: + + * ``FileIO.fromFile`` + * ``FileIO.toFile`` + * ``StreamConverters.fromInputStream`` + * ``StreamConverters.fromOutputStream`` + +have been changed from ``Long`` to ``akka.stream.io.IOResult``. +This allows to signal more complicated completion scenarios. For example, on failure it is now possible +to return the exception and the number of bytes written until that exception occured. diff --git a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java index fe9233120c..452ad07cb4 100644 --- a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java @@ -4,10 +4,12 @@ package akka.stream; +import org.scalatest.junit.JUnitSuite; + import akka.actor.ActorSystem; import akka.stream.javadsl.AkkaJUnitActorSystemResource; -public abstract class StreamTest { +public abstract class StreamTest extends JUnitSuite { final protected ActorSystem system; final protected ActorMaterializer materializer; diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java new file mode 100644 index 0000000000..c9040b5b35 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io; + +import akka.stream.StreamTest; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.StreamConverters; +import akka.stream.testkit.Utils; +import akka.util.ByteString; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.OutputStream; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class OutputStreamSinkTest extends StreamTest { + public OutputStreamSinkTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSink", + Utils.UnboundedMailboxConfig()); + @Test + public void mustSignalFailureViaIoResult() throws Exception { + + final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); + + final OutputStream os = new OutputStream() { + volatile int left = 3; + public void write(int data) { + if (left == 0) { + throw new RuntimeException("Can't accept more data."); + } + left -= 1; + } + }; + final Future resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer); + final IOResult result = Await.result(resultFuture, timeout); + + assertFalse(result.wasSuccessful()); + assertTrue(result.getError().getMessage().equals("Can't accept more data.")); + } + +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java index caaf023fef..9e58e7a21a 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java @@ -4,14 +4,26 @@ package akka.stream.javadsl; import static org.junit.Assert.assertEquals; + import java.util.Arrays; import java.util.Collections; +import org.junit.ClassRule; import org.junit.Test; import akka.stream.Attributes; +import akka.stream.StreamTest; +import akka.stream.testkit.AkkaSpec; -public class AttributesTest { +public class AttributesTest extends StreamTest { + + public AttributesTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("AttributesTest", + AkkaSpec.testConf()); final Attributes attributes = Attributes.name("a") diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index d254982998..c14f854a8f 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -106,7 +106,7 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals("a"); probe.expectMsgEquals("b"); probe.expectMsgEquals("c"); - probe.expectMsgEquals("()"); + probe.expectMsgEquals("Done"); } @Ignore("StatefulStage to be converted to GraphStage when Java Api is available (#18817)") @Test @@ -300,7 +300,7 @@ public class SourceTest extends StreamTest { } }), materializer); - probe.expectMsgClass(NotUsed.class); + probe.expectMsgClass(Done.class); } @Test diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index e7f697600b..5c95c05b50 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -46,8 +46,8 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val completion = Source(TestByteStrings) .runWith(FileIO.toFile(f)) - val size = Await.result(completion, 3.seconds) - size should equal(6006) + val result = Await.result(completion, 3.seconds) + result.count should equal(6006) checkFileContents(f, TestLines.mkString("")) } } @@ -64,9 +64,9 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val lastWrite = List("x" * 100) val completion2 = write(lastWrite) - val written2 = Await.result(completion2, 3.seconds) + val result = Await.result(completion2, 3.seconds) - written2 should ===(lastWrite.flatten.length) + result.count should ===(lastWrite.flatten.length) checkFileContents(f, lastWrite.mkString("") + TestLines.mkString("").drop(100)) } } @@ -79,13 +79,13 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { .runWith(FileIO.toFile(f, append = true)) val completion1 = write() - val written1 = Await.result(completion1, 3.seconds) + val result1 = Await.result(completion1, 3.seconds) val lastWrite = List("x" * 100) val completion2 = write(lastWrite) - val written2 = Await.result(completion2, 3.seconds) + val result2 = Await.result(completion2, 3.seconds) - f.length() should ===(written1 + written2) + f.length() should ===(result1.count + result2.count) checkFileContents(f, TestLines.mkString("") + lastWrite.mkString("") + "\n") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 9487456e91..527dfd3a68 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -7,17 +7,20 @@ import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel +import akka.Done import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } import akka.stream.actor.ActorPublisherMessage +import akka.stream.io.IOResult import akka.util.ByteString import scala.annotation.tailrec import scala.concurrent.Promise +import scala.util.{ Failure, Success } import scala.util.control.NonFatal /** INTERNAL API */ private[akka] object FilePublisher { - def props(f: File, completionPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { + def props(f: File, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") @@ -32,7 +35,7 @@ private[akka] object FilePublisher { } /** INTERNAL API */ -private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) +private[akka] final class FilePublisher(f: File, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { import FilePublisher._ @@ -101,8 +104,14 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long] override def postStop(): Unit = { super.postStop() - bytesReadPromise.trySuccess(readBytesTotal) - if (chan ne null) chan.close() + try { + if (chan ne null) chan.close() + } catch { + case ex: Exception ⇒ + completionPromise.success(IOResult(readBytesTotal, Failure(ex))) + } + + completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done))) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 807ffcde3a..095372d91a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -7,15 +7,18 @@ import java.io.File import java.nio.channels.FileChannel import java.util.Collections +import akka.Done import akka.actor.{ Deploy, ActorLogging, Props } +import akka.stream.io.IOResult import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString import scala.concurrent.Promise +import scala.util.{ Failure, Success } /** INTERNAL API */ private[akka] object FileSubscriber { - def props(f: File, completionPromise: Promise[Long], bufSize: Int, append: Boolean) = { + def props(f: File, completionPromise: Promise[IOResult], bufSize: Int, append: Boolean) = { require(bufSize > 0, "buffer size must be > 0") Props(classOf[FileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local) } @@ -26,7 +29,7 @@ private[akka] object FileSubscriber { } /** INTERNAL API */ -private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bufSize: Int, append: Boolean) +private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult], bufSize: Int, append: Boolean) extends akka.stream.actor.ActorSubscriber with ActorLogging { @@ -43,7 +46,7 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], super.preStart() } catch { case ex: Exception ⇒ - bytesWrittenPromise.failure(ex) + completionPromise.success(IOResult(bytesWritten, Failure(ex))) cancel() } @@ -53,12 +56,13 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bytesWritten += chan.write(bytes.asByteBuffer) } catch { case ex: Exception ⇒ - bytesWrittenPromise.failure(ex) + completionPromise.success(IOResult(bytesWritten, Failure(ex))) cancel() } - case ActorSubscriberMessage.OnError(cause) ⇒ - log.error(cause, "Tearing down FileSink({}) due to upstream error", f.getAbsolutePath) + case ActorSubscriberMessage.OnError(ex) ⇒ + log.error(ex, "Tearing down FileSink({}) due to upstream error", f.getAbsolutePath) + completionPromise.success(IOResult(bytesWritten, Failure(ex))) context.stop(self) case ActorSubscriberMessage.OnComplete ⇒ @@ -66,15 +70,20 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], chan.force(true) } catch { case ex: Exception ⇒ - bytesWrittenPromise.failure(ex) + completionPromise.success(IOResult(bytesWritten, Failure(ex))) } context.stop(self) } override def postStop(): Unit = { - bytesWrittenPromise.trySuccess(bytesWritten) + try { + if (chan ne null) chan.close() + } catch { + case ex: Exception ⇒ + completionPromise.success(IOResult(bytesWritten, Failure(ex))) + } - if (chan ne null) chan.close() + completionPromise.trySuccess(IOResult(bytesWritten, Success(Done))) super.postStop() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index c7fad7964c..6ed6012765 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -4,6 +4,7 @@ package akka.stream.impl.io import java.io.{ File, OutputStream } +import akka.stream.io.IOResult import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module import akka.stream.impl.Stages.DefaultAttributes.IODispatcher @@ -18,21 +19,21 @@ import scala.concurrent.{ Future, Promise } * (creating it before hand if necessary). */ private[akka] final class FileSink(f: File, append: Boolean, val attributes: Attributes, shape: SinkShape[ByteString]) - extends SinkModule[ByteString, Future[Long]](shape) { + extends SinkModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { val materializer = ActorMaterializer.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) - val bytesWrittenPromise = Promise[Long]() - val props = FileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append) + val ioResultPromise = Promise[IOResult]() + val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, append) val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) - (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) + (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) } - override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[Long]] = + override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] = new FileSink(f, append, attributes, shape) override def withAttributes(attr: Attributes): Module = @@ -45,22 +46,22 @@ private[akka] final class FileSink(f: File, append: Boolean, val attributes: Att * (creating it before hand if necessary). */ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString]) - extends SinkModule[ByteString, Future[Long]](shape) { + extends SinkModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { val materializer = ActorMaterializer.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) - val bytesWrittenPromise = Promise[Long]() + val ioResultPromise = Promise[IOResult]() val os = createOutput() // if it fails, we fail the materialization - val props = OutputStreamSubscriber.props(os, bytesWrittenPromise, settings.maxInputBufferSize) + val props = OutputStreamSubscriber.props(os, ioResultPromise, settings.maxInputBufferSize) val ref = materializer.actorOf(context, props) - (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) + (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) } - override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[Long]] = + override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] = new OutputStreamSink(createOutput, attributes, shape) override def withAttributes(attr: Attributes): Module = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 2880364714..7c1dabb59d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -7,6 +7,7 @@ import java.io.{ File, InputStream } import akka.stream._ import akka.stream.ActorAttributes.Dispatcher +import akka.stream.io.IOResult import akka.stream.impl.StreamLayout.Module import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.{ ErrorPublisher, SourceModule } @@ -19,23 +20,23 @@ import scala.concurrent.{ Future, Promise } * Creates simple synchronous (Java 6 compatible) Source backed by the given file. */ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) - extends SourceModule[ByteString, Future[Long]](shape) { + extends SourceModule[ByteString, Future[IOResult]](shape) { require(chunkSize > 0, "chunkSize must be greater than 0") override def create(context: MaterializationContext) = { // FIXME rewrite to be based on GraphStage rather than dangerous downcasts val materializer = ActorMaterializer.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) - val bytesReadPromise = Promise[Long]() - val props = FilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) + val ioResultPromise = Promise[IOResult]() + val props = FilePublisher.props(f, ioResultPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) - (akka.stream.actor.ActorPublisher[ByteString](ref), bytesReadPromise.future) + (akka.stream.actor.ActorPublisher[ByteString](ref), ioResultPromise.future) } - override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[Long]] = + override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] = new FileSource(f, chunkSize, attributes, shape) override def withAttributes(attr: Attributes): Module = @@ -47,28 +48,28 @@ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: At * Source backed by the given input stream. */ private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) - extends SourceModule[ByteString, Future[Long]](shape) { + extends SourceModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { val materializer = ActorMaterializer.downcast(context.materializer) - val bytesReadPromise = Promise[Long]() + val ioResultPromise = Promise[IOResult]() val pub = try { val is = createInputStream() // can throw, i.e. FileNotFound - val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize) + val props = InputStreamPublisher.props(is, ioResultPromise, chunkSize) val ref = materializer.actorOf(context, props) akka.stream.actor.ActorPublisher[ByteString](ref) } catch { case ex: Exception ⇒ - bytesReadPromise.failure(ex) + ioResultPromise.failure(ex) ErrorPublisher(ex, attributes.nameOrDefault("inputStreamSource")).asInstanceOf[Publisher[ByteString]] } - (pub, bytesReadPromise.future) + (pub, ioResultPromise.future) } - override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[Long]] = + override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] = new InputStreamSource(createInputStream, chunkSize, attributes, shape) override def withAttributes(attr: Attributes): Module = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index 7300f9652f..8ccd298853 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -5,19 +5,22 @@ package akka.stream.impl.io import java.io.InputStream +import akka.Done import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } import akka.io.DirectByteBufferPool import akka.stream.actor.ActorPublisherMessage +import akka.stream.io.IOResult import akka.util.ByteString import akka.util.ByteString.ByteString1C import scala.annotation.tailrec import scala.concurrent.Promise +import scala.util.{ Failure, Success } /** INTERNAL API */ private[akka] object InputStreamPublisher { - def props(is: InputStream, completionPromise: Promise[Long], chunkSize: Int): Props = { + def props(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int): Props = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize).withDeploy(Deploy.local) @@ -27,7 +30,7 @@ private[akka] object InputStreamPublisher { } /** INTERNAL API */ -private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Promise[Long], chunkSize: Int) +private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { @@ -73,8 +76,14 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom override def postStop(): Unit = { super.postStop() - bytesReadPromise.trySuccess(readBytesTotal) - if (is ne null) is.close() + try { + if (is ne null) is.close() + } catch { + case ex: Exception ⇒ + completionPromise.success(IOResult(readBytesTotal, Failure(ex))) + } + + completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done))) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index a82f875019..87d0b9d057 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -5,15 +5,18 @@ package akka.stream.impl.io import java.io.OutputStream +import akka.Done import akka.actor.{ Deploy, ActorLogging, Props } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } +import akka.stream.io.IOResult import akka.util.ByteString import scala.concurrent.Promise +import scala.util.{ Failure, Success } /** INTERNAL API */ private[akka] object OutputStreamSubscriber { - def props(os: OutputStream, completionPromise: Promise[Long], bufSize: Int) = { + def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int) = { require(bufSize > 0, "buffer size must be > 0") Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize).withDeploy(Deploy.local) } @@ -21,7 +24,7 @@ private[akka] object OutputStreamSubscriber { } /** INTERNAL API */ -private[akka] class OutputStreamSubscriber(os: OutputStream, bytesWrittenPromise: Promise[Long], bufSize: Int) +private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int) extends akka.stream.actor.ActorSubscriber with ActorLogging { @@ -37,12 +40,13 @@ private[akka] class OutputStreamSubscriber(os: OutputStream, bytesWrittenPromise bytesWritten += bytes.length } catch { case ex: Exception ⇒ - bytesWrittenPromise.failure(ex) + completionPromise.success(IOResult(bytesWritten, Failure(ex))) cancel() } - case ActorSubscriberMessage.OnError(cause) ⇒ - log.error(cause, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {}", bytesWritten) + case ActorSubscriberMessage.OnError(ex) ⇒ + log.error(ex, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {}", bytesWritten) + completionPromise.success(IOResult(bytesWritten, Failure(ex))) context.stop(self) case ActorSubscriberMessage.OnComplete ⇒ @@ -51,9 +55,14 @@ private[akka] class OutputStreamSubscriber(os: OutputStream, bytesWrittenPromise } override def postStop(): Unit = { - bytesWrittenPromise.trySuccess(bytesWritten) + try { + if (os ne null) os.close() + } catch { + case ex: Exception ⇒ + completionPromise.success(IOResult(bytesWritten, Failure(ex))) + } - if (os ne null) os.close() + completionPromise.trySuccess(IOResult(bytesWritten, Success(Done))) super.postStop() } } diff --git a/akka-stream/src/main/scala/akka/stream/io/IOResult.scala b/akka-stream/src/main/scala/akka/stream/io/IOResult.scala new file mode 100644 index 0000000000..c65e2d4e32 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/IOResult.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.stream.io + +import akka.Done +import scala.util.{ Failure, Success, Try } + +/** + * Holds a result of an IO operation. + * + * @param count Numeric value depending on context, for example IO operations performed or bytes processed. + * @param status Status of the result. Can be either [[akka.Done]] or an exception. + */ +final case class IOResult private[stream] (count: Long, status: Try[Done]) { + + /** + * Java API: Numeric value depending on context, for example IO operations performed or bytes processed. + */ + def getCount: Long = count + + /** + * Java API: Indicates whether IO operation completed successfully or not. + */ + def wasSuccessful: Boolean = status.isSuccess + + /** + * Java API: If the IO operation resulted in an error, returns the corresponding [[Throwable]] + * or throws [[UnsupportedOperationException]] otherwise. + */ + def getError: Throwable = status match { + case Failure(t) ⇒ t + case Success(_) ⇒ throw new UnsupportedOperationException("IO operation was successfull.") + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 35349c70f4..a927cc2ae0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -7,6 +7,7 @@ import java.io.{ InputStream, OutputStream, File } import akka.japi.function import akka.stream.{ scaladsl, javadsl, ActorAttributes } +import akka.stream.io.IOResult import akka.util.ByteString import scala.concurrent.Future @@ -21,20 +22,22 @@ object FileIO { * Overwrites existing files, if you want to append to an existing file use [[#file(File, Boolean)]] and * pass in `true` as the Boolean argument. * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * @param f The file to write to */ - def toFile(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = toFile(f, append = false) + def toFile(f: File): javadsl.Sink[ByteString, Future[IOResult]] = toFile(f, append = false) /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file and either overwrites * or appends to it. * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. @@ -42,8 +45,8 @@ object FileIO { * @param f The file to write to * @param append Whether or not the file should be overwritten or appended to */ - def toFile(f: File, append: Boolean): javadsl.Sink[ByteString, Future[java.lang.Long]] = - new Sink(scaladsl.FileIO.toFile(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + def toFile(f: File, append: Boolean): javadsl.Sink[ByteString, Future[IOResult]] = + new Sink(scaladsl.FileIO.toFile(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[IOResult]]] /** * Creates a Source from a Files contents. @@ -53,9 +56,10 @@ object FileIO { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. */ - def fromFile(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = fromFile(f, 8192) + def fromFile(f: File): javadsl.Source[ByteString, Future[IOResult]] = fromFile(f, 8192) /** * Creates a synchronous (Java 6 compatible) Source from a Files contents. @@ -65,9 +69,10 @@ object FileIO { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. */ - def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = - new Source(scaladsl.FileIO.fromFile(f, chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] + def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[IOResult]] = + new Source(scaladsl.FileIO.fromFile(f, chunkSize)).asInstanceOf[Source[ByteString, Future[IOResult]]] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 64ae0ade08..d75bfca3ec 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -7,6 +7,7 @@ import java.io.{ InputStream, OutputStream } import akka.japi.function import akka.stream.{ scaladsl, javadsl, ActorAttributes } +import akka.stream.io.IOResult import akka.util.ByteString import scala.concurrent.Future @@ -19,15 +20,16 @@ object StreamConverters { /** * Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * @param f A Creator which creates an OutputStream to write to */ - def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] = - new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create())).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, Future[IOResult]] = + new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create())).asInstanceOf[javadsl.Sink[ByteString, Future[IOResult]]] /** * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible @@ -67,8 +69,8 @@ object StreamConverters { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def fromInputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = - new Source(scaladsl.StreamConverters.fromInputStream(() ⇒ in.create(), chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] + def fromInputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[IOResult]] = + new Source(scaladsl.StreamConverters.fromInputStream(() ⇒ in.create(), chunkSize)).asInstanceOf[Source[ByteString, Future[IOResult]]] /** * Creates a Source from an [[java.io.InputStream]] created by the given function. @@ -78,9 +80,10 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. */ - def fromInputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = fromInputStream(in, 8192) + def fromInputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, Future[IOResult]] = fromInputStream(in, 8192) /** * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 82006c58d8..14d2118081 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import java.io.{ OutputStream, InputStream, File } import akka.stream.ActorAttributes +import akka.stream.io.IOResult import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io._ import akka.util.ByteString @@ -29,24 +30,26 @@ object FileIO { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. * * @param f the File to read from * @param chunkSize the size of each read operation, defaults to 8192 */ - def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = + def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource"))) /** * Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites * or appends to it. * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. * * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, * unless configured otherwise by using [[ActorAttributes]]. */ - def toFile(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = + def toFile(f: File, append: Boolean = false): Sink[ByteString, Future[IOResult]] = new Sink(new FileSink(f, append, DefaultAttributes.fileSink, sinkShape("FileSink"))) -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 32f5c34476..db34ba79a4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import java.io.{ OutputStream, InputStream } import akka.stream.ActorAttributes +import akka.stream.io.IOResult import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, OutputStreamSourceStage, InputStreamSource } import akka.util.ByteString @@ -29,12 +30,13 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. * * @param in a function which creates the InputStream to read from * @param chunkSize the size of each read operation, defaults to 8192 */ - def fromInputStream(in: () ⇒ InputStream, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = + def fromInputStream(in: () ⇒ InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = new Source(new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, sourceShape("InputStreamSource"))) /** @@ -54,12 +56,13 @@ object StreamConverters { /** * Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. */ - def fromOutputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = + def fromOutputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[IOResult]] = new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"))) /** diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c7ee0109a6..e735e22d3e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -192,7 +192,7 @@ object Dependencies { lazy val streamTestkit = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit) - lazy val streamTests = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit, Test.junitIntf, Test.commonsIo) + lazy val streamTests = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit, Test.commonsIo) lazy val streamTestsTck = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit, Test.reactiveStreamsTck) @@ -217,4 +217,4 @@ object DependencyHelpers { */ def versionDependentDeps(modules: ScalaVersionDependentModuleID*): Def.Setting[Seq[ModuleID]] = libraryDependencies <++= scalaVersion(version => modules.flatMap(m => m.modules(version))) -} \ No newline at end of file +}