+str #17338 add OutputStreamSource and InputStreamSink

This commit is contained in:
Alexander Golubev 2015-07-12 23:04:26 -04:00
parent fc0ecfebef
commit 8ea52a6bb4
24 changed files with 1076 additions and 32 deletions

View file

@ -118,7 +118,7 @@ Streaming data from a file is as easy as defining a `SynchronousFileSource` give
Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured
threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest
of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom
dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.file-io-dispatcher``,
dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.blocking-io-dispatcher``,
or for a specific stage by specifying a custom Dispatcher in code, like this:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamFileDocTest.java#custom-dispatcher-code

View file

@ -57,7 +57,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
"configure dispatcher in code" in {
//#custom-dispatcher-code
SynchronousFileSink(file)
.withAttributes(ActorAttributes.dispatcher("custom-file-io-dispatcher"))
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))
//#custom-dispatcher-code
}

View file

@ -118,7 +118,7 @@ Streaming data from a file is as easy as defining a `SynchronousFileSource` give
Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured
threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest
of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom
dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.file-io-dispatcher``,
dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.blocking-io-dispatcher``,
or for a specific stage by specifying a custom Dispatcher in code, like this:
.. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#custom-dispatcher-code

View file

@ -39,5 +39,5 @@ akka.http.routing {
# Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors for IO operations.
file-io-dispatcher = ${akka.stream.file-io-dispatcher}
file-io-dispatcher = ${akka.stream.blocking-io-dispatcher}
}

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io;
import akka.japi.Pair;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.Utils;
import akka.util.ByteString;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
public class InputStreamSinkTest extends StreamTest {
public InputStreamSinkTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("InputStreamSink",
Utils.UnboundedMailboxConfig());
@Test
public void mustReadEventViaInputStream() throws Exception {
final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS);
final Sink<ByteString, InputStream> sink = InputStreamSink.create(timeout);
final List<ByteString> list = Collections.singletonList(ByteString.fromString("a"));
final InputStream stream = Source.from(list).runWith(sink, materializer);
byte[] a = new byte[1];
stream.read(a);
assertTrue(Arrays.equals("a".getBytes(), a));
}
}

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.japi.function.Procedure;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.Utils;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class OutputStreamSourceTest extends StreamTest {
public OutputStreamSourceTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSource",
Utils.UnboundedMailboxConfig());
@Test
public void mustSendEventsViaOutputStream() throws Exception {
final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS);
final JavaTestKit probe = new JavaTestKit(system);
final Source<ByteString, OutputStream> source = OutputStreamSource.create(timeout);
final OutputStream s = source.to(Sink.foreach(new Procedure<ByteString>() {
public void apply(ByteString elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
})).run(materializer);
s.write("a".getBytes());
assertEquals(ByteString.fromString("a"), probe.receiveOne(timeout));
s.close();
}
}

View file

@ -0,0 +1,241 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.{ IOException, InputStream }
import java.util.concurrent.TimeoutException
import akka.actor.{ ActorSystem, NoSerializationVerificationNeeded }
import akka.stream._
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.io.InputStreamSinkStage
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
import akka.stream.scaladsl.{ Keep, Sink }
import akka.stream.stage.InHandler
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.TestProbe
import akka.util.ByteString
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
import scala.util.Random
import scala.util.control.NoStackTrace
class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
import system.dispatcher
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val timeout = 300.milliseconds
def randomArray(size: Int) = {
val a = new Array[Byte](size)
Random.nextBytes(a)
a
}
val byteArray = randomArray(3)
val byteString = ByteString(byteArray)
def newArray() = new Array[Byte](3)
def expectSuccess[T](f: Future[T], value: T) =
Await.result(f, timeout) should be(value)
object InputStreamSinkTestMessages {
case object Push extends NoSerializationVerificationNeeded
case object Finish extends NoSerializationVerificationNeeded
case class Failure(ex: Throwable) extends NoSerializationVerificationNeeded
}
def testSink(probe: TestProbe): Sink[ByteString, InputStream] = {
class InputStreamSinkTestStage(val timeout: FiniteDuration)
extends InputStreamSinkStage(timeout) {
override def createLogicAndMaterializedValue = {
val (logic, inputStream) = super.createLogicAndMaterializedValue
val inHandler = logic.inHandlers(in.id)
logic.inHandlers(in.id) = new InHandler {
override def onPush(): Unit = {
probe.ref ! InputStreamSinkTestMessages.Push
inHandler.onPush()
}
override def onUpstreamFinish(): Unit = {
probe.ref ! InputStreamSinkTestMessages.Finish
inHandler.onUpstreamFinish()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
probe.ref ! InputStreamSinkTestMessages.Failure(ex)
inHandler.onUpstreamFailure(ex)
}
}
(logic, inputStream)
}
}
Sink.fromGraph(new InputStreamSinkTestStage(timeout))
}
"InputStreamSink" must {
"read bytes from InputStream" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
probe.sendNext(byteString)
val arr = newArray()
inputStream.read(arr)
arr should ===(byteArray)
probe.sendComplete()
inputStream.close()
}
"read bytes correctly if requested by InputStream not in chunk size" in assertAllStagesStopped {
val sinkProbe = TestProbe()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run()
probe.sendNext(byteString)
val byteArray2 = randomArray(3)
probe.sendNext(ByteString(byteArray2))
sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push)
val arr = new Array[Byte](2)
inputStream.read(arr)
arr should ===(Array(byteArray(0), byteArray(1)))
inputStream.read(arr)
arr should ===(Array(byteArray(2), byteArray2(0)))
inputStream.read(arr)
arr should ===(Array(byteArray2(1), byteArray2(2)))
inputStream.close()
}
"returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val data = randomArray(2)
probe.sendNext(ByteString(data))
val arr = newArray()
inputStream.read(arr) should ===(2)
arr should ===(Array(data(0), data(1), 0))
probe.sendComplete()
inputStream.close()
}
"block read until get requested number of bytes from upstream" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val arr = newArray()
val f = Future(inputStream.read(arr))
the[Exception] thrownBy Await.result(f, timeout) shouldBe a[TimeoutException]
probe.sendNext(byteString)
expectSuccess(f, 3)
probe.sendComplete()
inputStream.read(newArray())
inputStream.close()
}
"fill up buffer by default" in assertAllStagesStopped {
import system.dispatcher
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val array2 = randomArray(3)
probe.sendNext(byteString)
probe.sendNext(ByteString(array2))
val arr1 = newArray()
val arr2 = newArray()
val f1 = Future(inputStream.read(arr1))
val f2 = Future(inputStream.read(arr2))
Await.result(f1, timeout) should be(3)
Await.result(f2, timeout) should be(3)
arr1 should ===(byteString)
arr2 should ===(array2)
probe.sendComplete()
inputStream.close()
}
"throw error when reactive stream is closed" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
probe.sendNext(byteString)
inputStream.close()
probe.expectCancellation()
the[Exception] thrownBy inputStream.read(newArray()) shouldBe a[IOException]
}
"return all data when upstream is completed" in assertAllStagesStopped {
val sinkProbe = TestProbe()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run()
val bytes = randomArray(1)
probe.sendNext(ByteString(bytes))
sinkProbe.expectMsg(InputStreamSinkTestMessages.Push)
probe.sendComplete()
sinkProbe.expectMsg(InputStreamSinkTestMessages.Finish)
val arr = newArray()
val f = Future(inputStream.read(arr))
expectSuccess(f, 1)
arr should ===(Array[Byte](bytes(0), 0, 0))
}
"return -1 when read after stream is completed" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
probe.sendNext(byteString)
val arr = newArray()
inputStream.read(arr)
arr should ===(byteArray)
probe.sendComplete()
Await.result(Future(inputStream.read(arr)), timeout) should ===(-1)
inputStream.close()
}
"return IOException when stream is failed" in assertAllStagesStopped {
val sinkProbe = TestProbe()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run()
val ex = new RuntimeException("Stream failed.") with NoStackTrace
probe.sendNext(byteString)
sinkProbe.expectMsg(InputStreamSinkTestMessages.Push)
val arr = newArray()
inputStream.read(arr)
probe.sendError(ex)
sinkProbe.expectMsg(InputStreamSinkTestMessages.Failure(ex))
val p = Future(inputStream.read(arr))
p.onFailure {
case e
(e.isInstanceOf[IOException] && e.getCause.equals(ex)) should ===(true)
Unit
}
p.onSuccess { case _ fail() }
}
"use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorMaterializer()(sys)
try {
TestSource.probe[ByteString].runWith(InputStreamSink())(mat)
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "InputStreamSink").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}
}
}

View file

@ -0,0 +1,179 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.{ IOException, OutputStream }
import java.util.concurrent.TimeoutException
import akka.actor.{ ActorSystem, NoSerializationVerificationNeeded }
import akka.stream._
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.io.OutputStreamSourceStage
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
import akka.stream.scaladsl.{ Keep, Source }
import akka.stream.stage.OutHandler
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestProbe
import akka.util.ByteString
import scala.concurrent.duration.Duration.Zero
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
import scala.util.Random
class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
import system.dispatcher
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val timeout = 300.milliseconds
val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte])
val byteString = ByteString(bytesArray)
def expectTimeout[T](f: Future[T], timeout: Duration) =
the[Exception] thrownBy Await.result(f, timeout) shouldBe a[TimeoutException]
def expectSuccess[T](f: Future[T], value: T) =
Await.result(f, timeout) should be(value)
object OutputStreamSourceTestMessages {
case object Pull extends NoSerializationVerificationNeeded
case object Finish extends NoSerializationVerificationNeeded
}
def testSource(probe: TestProbe): Source[ByteString, OutputStream] = {
class OutputStreamSourceTestStage(val timeout: FiniteDuration)
extends OutputStreamSourceStage(timeout) {
override def createLogicAndMaterializedValue = {
val (logic, inputStream) = super.createLogicAndMaterializedValue
val outHandler = logic.outHandlers(out.id)
logic.outHandlers(out.id) = new OutHandler {
override def onDownstreamFinish(): Unit = {
probe.ref ! OutputStreamSourceTestMessages.Finish
outHandler.onDownstreamFinish()
}
override def onPull(): Unit = {
probe.ref ! OutputStreamSourceTestMessages.Pull
outHandler.onPull()
}
}
(logic, inputStream)
}
}
Source.fromGraph(new OutputStreamSourceTestStage(timeout))
}
"OutputStreamSource" must {
"read bytes from OutputStream" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
s.request(1)
probe.expectNext(byteString)
outputStream.close()
probe.expectComplete()
}
"block flush call until send all buffer to downstream" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
val f = Future(outputStream.flush())
expectTimeout(f, timeout)
probe.expectNoMsg(Zero)
s.request(1)
expectSuccess(f, ())
probe.expectNext(byteString)
outputStream.close()
probe.expectComplete()
}
"not block flushes when buffer is empty" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
val f = Future(outputStream.flush())
s.request(1)
expectSuccess(f, ())
probe.expectNext(byteString)
val f2 = Future(outputStream.flush())
expectSuccess(f2, ())
outputStream.close()
probe.expectComplete()
}
"block writes when buffer is full" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both)
.withAttributes(Attributes.inputBuffer(16, 16)).run
val s = probe.expectSubscription()
(1 to 16).foreach { _ outputStream.write(bytesArray) }
//blocked call
val f = Future(outputStream.write(bytesArray))
expectTimeout(f, timeout)
probe.expectNoMsg(Zero)
s.request(17)
expectSuccess(f, ())
probe.expectNextN(List.fill(17)(byteString).toSeq)
outputStream.close()
probe.expectComplete()
}
"throw error when write after stream is closed" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
probe.expectSubscription()
outputStream.close()
probe.expectComplete()
the[Exception] thrownBy outputStream.write(bytesArray) shouldBe a[IOException]
}
"use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorMaterializer()(sys)
try {
OutputStreamSource().runWith(TestSink.probe[ByteString])(mat)
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "OutputStreamSource").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}
"throw IOException when writing to the stream after the subscriber has cancelled the reactive stream" in assertAllStagesStopped {
val sourceProbe = TestProbe()
val (outputStream, probe) = testSource(sourceProbe).toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
s.request(1)
sourceProbe.expectMsg(OutputStreamSourceTestMessages.Pull)
probe.expectNext(byteString)
s.cancel()
sourceProbe.expectMsg(OutputStreamSourceTestMessages.Finish)
the[Exception] thrownBy outputStream.write(bytesArray) shouldBe a[IOException]
}
}
}

View file

@ -91,7 +91,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
}
"use dedicated file-io-dispatcher by default" in assertAllStagesStopped {
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
targetFile { f
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorMaterializer()(sys)
@ -102,7 +102,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
assertDispatcher(ref, "akka.stream.default-file-io-dispatcher")
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}
}

View file

@ -166,7 +166,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
}
"use dedicated file-io-dispatcher by default" in assertAllStagesStopped {
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorMaterializer()(sys)
implicit val timeout = Timeout(500.millis)
@ -176,7 +176,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
try assertDispatcher(ref, "akka.stream.default-file-io-dispatcher") finally p.cancel()
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel()
} finally shutdown(sys)
}

View file

@ -138,12 +138,7 @@ class GraphFlowSpec extends AkkaSpec {
"work with a Sink when having KeyedSource inside" in {
val probe = TestSubscriber.manualProbe[Int]()
val source = Source.fromGraph(FlowGraph.create(Source.subscriber[Int]) { implicit b
subSource
SourceShape(subSource.outlet)
})
val source = Source.subscriber[Int]
val mm: Subscriber[Int] = source.to(Sink(probe)).run()
source1.to(Sink(mm)).run()

View file

@ -45,9 +45,9 @@ akka {
# Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors for IO operations,
# such as FileSource, FileSink and others.
file-io-dispatcher = "akka.stream.default-file-io-dispatcher"
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
default-file-io-dispatcher {
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1

View file

@ -78,6 +78,7 @@ private[stream] object Stages {
val synchronousFileSource = name("synchronousFileSource")
val inputStreamSource = name("inputStreamSource")
val acknowledgeSource = name("acknowledgeSource")
val outputStreamSource = name("outputStreamSource")
val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink")
@ -90,6 +91,7 @@ private[stream] object Stages {
val synchronousFileSink = name("synchronousFileSink")
val outputStreamSink = name("outputStreamSink")
val acknowledgeSink = name("acknowledgeSink")
val inputStreamSink = name("inputStreamSink")
}
import DefaultAttributes._

View file

@ -4,11 +4,12 @@ import akka.stream.ActorAttributes.Dispatcher
import akka.stream.{ ActorMaterializer, MaterializationContext }
private[stream] object IOSettings {
/** Picks default akka.stream.file-io-dispatcher or the Attributes configured one */
def fileIoDispatcher(context: MaterializationContext): String = {
/** Picks default akka.stream.blocking-io-dispatcher or the Attributes configured one */
def blockingIoDispatcher(context: MaterializationContext): String = {
val mat = ActorMaterializer.downcast(context.materializer)
context.effectiveAttributes.attributeList.collectFirst { case d: Dispatcher d.dispatcher } getOrElse {
mat.system.settings.config.getString("akka.stream.file-io-dispatcher")
mat.system.settings.config.getString("akka.stream.blocking-io-dispatcher")
}
}
}

View file

@ -4,10 +4,11 @@
package akka.stream.impl.io
import java.io.{ File, OutputStream }
import java.lang.{ Long JLong }
import akka.stream._
import akka.stream.impl.SinkModule
import akka.stream.impl.StreamLayout.Module
import akka.stream.{ ActorMaterializer, MaterializationContext, Attributes, SinkShape }
import akka.util.ByteString
import scala.concurrent.{ Future, Promise }
@ -26,7 +27,7 @@ private[akka] final class SynchronousFileSink(f: File, append: Boolean, val attr
val bytesWrittenPromise = Promise[Long]()
val props = SynchronousFileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append)
val dispatcher = IOSettings.fileIoDispatcher(context)
val dispatcher = IOSettings.blockingIoDispatcher(context)
val ref = mat.actorOf(context, props.withDispatcher(dispatcher))
(akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future)
@ -66,3 +67,4 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va
override def withAttributes(attr: Attributes): Module =
new OutputStreamSink(createOutput, attr, amendShape(attr))
}

View file

@ -3,15 +3,22 @@
*/
package akka.stream.impl.io
import java.io.{ File, InputStream }
import java.io.{ File, IOException, InputStream, OutputStream }
import java.lang.{ Long JLong }
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import akka.actor.{ ActorRef, Deploy }
import akka.japi
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.{ ErrorPublisher, SourceModule }
import akka.util.ByteString
import akka.stream.scaladsl.{ Source, FlowGraph }
import akka.util.{ ByteString, Timeout }
import org.reactivestreams._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future, Promise }
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -26,7 +33,7 @@ private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val att
val bytesReadPromise = Promise[Long]()
val props = SynchronousFilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize)
val dispatcher = IOSettings.fileIoDispatcher(context)
val dispatcher = IOSettings.blockingIoDispatcher(context)
val ref = mat.actorOf(context, props.withDispatcher(dispatcher))

View file

@ -0,0 +1,200 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.io
import java.io.{ IOException, InputStream }
import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit }
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.io.InputStreamSinkStage._
import akka.stream.stage._
import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
private[akka] object InputStreamSinkStage {
sealed trait AdapterToStageMessage
case object ReadElementAcknowledgement extends AdapterToStageMessage
case object Close extends AdapterToStageMessage
sealed trait StreamToAdapterMessage
case class Data(data: ByteString) extends StreamToAdapterMessage
case object Finished extends StreamToAdapterMessage
case class Failed(cause: Throwable) extends StreamToAdapterMessage
sealed trait StageWithCallback {
def wakeUp(msg: AdapterToStageMessage): Unit
}
}
/**
* INTERNAL API
*/
private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSinkStage") {
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogicAndMaterializedValue: (GraphStageLogic, InputStream) = {
val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1)
var pullRequestIsSent = true
val logic = new GraphStageLogic(shape) with StageWithCallback {
private val callback: AsyncCallback[AdapterToStageMessage] =
getAsyncCallback(onAsyncMessage)
override def wakeUp(msg: AdapterToStageMessage): Unit = {
if (!isClosed(in)) {
Future(callback.invoke(msg))(interpreter.materializer.executionContext)
}
}
private def onAsyncMessage(event: AdapterToStageMessage): Unit =
event match {
case ReadElementAcknowledgement
sendPullIfAllowed()
case Close
completeStage()
}
private def sendPullIfAllowed(): Unit =
if (!pullRequestIsSent) {
pullRequestIsSent = true
pull(in)
}
override def preStart() = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
//1 is buffer for Finished or Failed callback
require(dataQueue.remainingCapacity() > 1)
pullRequestIsSent = false
dataQueue.add(Data(grab(in)))
if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed()
}
override def onUpstreamFinish(): Unit = {
dataQueue.add(Finished)
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
dataQueue.add(Failed(ex))
failStage(ex)
}
})
}
(logic, new InputStreamAdapter(dataQueue, logic.wakeUp, timeout))
}
}
/**
* INTERNAL API
* InputStreamAdapter that interacts with InputStreamSinkStage
*/
private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapterMessage],
sendToStage: (AdapterToStageMessage) Unit,
timeout: FiniteDuration)
extends InputStream {
var isActive = true
var isStageAlive = true
val subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible")
var skipBytes = 0
var detachedChunk: Option[ByteString] = None
@scala.throws(classOf[IOException])
private[this] def executeIfNotClosed[T](f: () T): T =
if (isActive) f()
else throw subscriberClosedException
@scala.throws(classOf[IOException])
override def read(): Int = {
val a = Array[Byte](1)
if (read(a, 0, 1) != -1) a(0)
else -1
}
@scala.throws(classOf[IOException])
override def read(a: Array[Byte]): Int = read(a, 0, a.length)
@scala.throws(classOf[IOException])
override def read(a: Array[Byte], begin: Int, length: Int): Int = {
executeIfNotClosed(()
if (isStageAlive) {
detachedChunk match {
case None
sharedBuffer.poll(timeout.toMillis, TimeUnit.MILLISECONDS) match {
case Data(data)
detachedChunk = Some(data)
readBytes(a, begin, length)
case Finished
isStageAlive = false
-1
case Failed(ex)
isStageAlive = false
throw new IOException(ex)
}
case Some(data)
readBytes(a, begin, length)
}
} else -1)
}
private[this] def readBytes(a: Array[Byte], begin: Int, length: Int): Int = {
val availableInChunk = detachedChunk.size - skipBytes
val readBytes = getData(a, begin, length, 0)
if (readBytes >= availableInChunk) sendToStage(ReadElementAcknowledgement)
readBytes
}
@scala.throws(classOf[IOException])
override def close(): Unit = {
executeIfNotClosed(() {
// at this point Subscriber may be already terminated
if (isStageAlive) sendToStage(Close)
isActive = false
})
}
@tailrec
private[this] def getData(arr: Array[Byte], begin: Int, length: Int,
gotBytes: Int): Int = {
getDataChunk() match {
case Some(data)
val size = data.size - skipBytes
if (size + gotBytes <= length) {
System.arraycopy(data.toArray, skipBytes, arr, begin, size)
skipBytes = 0
detachedChunk = None
if (length - size == 0)
gotBytes + size
else
getData(arr, begin + size, length - size, gotBytes + size)
} else {
System.arraycopy(data.toArray, skipBytes, arr, begin, length)
skipBytes = length
gotBytes + length
}
case None gotBytes
}
}
private[this] def getDataChunk(): Option[ByteString] = {
detachedChunk match {
case None
sharedBuffer.poll() match {
case Data(data)
detachedChunk = Some(data)
detachedChunk
case _ None
}
case Some(_) detachedChunk
}
}
}

View file

@ -0,0 +1,178 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.io
import java.io.{ IOException, OutputStream }
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue }
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.io.OutputStreamSourceStage._
import akka.stream.stage._
import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future, Promise }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
private[akka] object OutputStreamSourceStage {
sealed trait AdapterToStageMessage
case object Flush extends AdapterToStageMessage
case object Close extends AdapterToStageMessage
sealed trait DownstreamStatus
case object Ok extends DownstreamStatus
case object Canceled extends DownstreamStatus
sealed trait StageWithCallback {
def wakeUp(msg: AdapterToStageMessage): Future[Unit]
}
}
private[akka] class OutputStreamSourceStage(timeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSourceStage") {
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogicAndMaterializedValue: (GraphStageLogic, OutputStream) = {
val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer)
var flush: Option[Promise[Unit]] = None
var close: Option[Promise[Unit]] = None
val downstreamStatus = new AtomicReference[DownstreamStatus](Ok)
val logic = new GraphStageLogic(shape) with StageWithCallback {
private val downstreamCallback: AsyncCallback[Try[ByteString]] =
getAsyncCallback(onAsyncElem)
private val upstreamCallback: AsyncCallback[(AdapterToStageMessage, Promise[Unit])] =
getAsyncCallback(onAsyncMessage)
override def wakeUp(msg: AdapterToStageMessage): Future[Unit] = {
implicit val ex = interpreter.materializer.executionContext
val p = Promise[Unit]()
Future(upstreamCallback.invoke((msg, p)))
p.future
}
private def onAsyncMessage(event: (AdapterToStageMessage, Promise[Unit])): Unit =
event._1 match {
case Flush
flush = Some(event._2)
sendResponseIfNeed()
case Close
close = Some(event._2)
if (dataQueue.isEmpty) {
downstreamStatus.set(Canceled)
completeStage()
unblockUpstream()
} else sendResponseIfNeed()
}
private def onAsyncElem(event: Try[ByteString]): Unit = event match {
case Success(elem) onPush(elem)
case Failure(ex) failStage(ex)
}
private def unblockUpstream(): Boolean =
flush match {
case Some(p)
p.complete(Success(()))
flush = None
true
case None close match {
case Some(p)
p.complete(Success(()))
close = None
true
case None false
}
}
private def sendResponseIfNeed(): Unit =
if (downstreamStatus.get() == Canceled || dataQueue.isEmpty) unblockUpstream()
private def onPush(data: ByteString): Unit =
if (downstreamStatus.get() == Ok) {
push(out, data)
sendResponseIfNeed()
}
setHandler(out, new OutHandler {
override def onDownstreamFinish(): Unit = {
//assuming there can be no further in messages
downstreamStatus.set(Canceled)
dataQueue.clear()
completeStage()
}
override def onPull(): Unit = {
implicit val ex = interpreter.materializer.executionContext
Future(dataQueue.take()).onComplete(downstreamCallback.invoke)
}
})
}
(logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, timeout))
}
}
private[akka] class OutputStreamAdapter(dataQueue: BlockingQueue[ByteString],
downstreamStatus: AtomicReference[DownstreamStatus],
sendToStage: (AdapterToStageMessage) Future[Unit],
timeout: FiniteDuration)
extends OutputStream {
var isActive = true
var isPublisherAlive = true
val publisherClosedException = new IOException("Reactive stream is terminated, no writes are possible")
private[this] def send(sendAction: () Unit): Unit = {
if (isActive) {
if (isPublisherAlive) {
sendAction()
} else throw publisherClosedException
} else throw new IOException("OutputStream is closed")
}
private[this] def sendData(data: ByteString): Unit =
send(() {
dataQueue.put(data)
if (downstreamStatus.get() == Canceled) {
isPublisherAlive = false
throw publisherClosedException
}
})
private[this] def sendMessage(message: AdapterToStageMessage, handleCancelled: Boolean = true) =
send(()
try {
Await.ready(sendToStage(message), timeout)
if (downstreamStatus.get() == Canceled && handleCancelled) {
//Publisher considered to be terminated at earliest convenience to minimize messages sending back and forth
isPublisherAlive = false
throw publisherClosedException
}
} catch {
case e: IOException throw e
case NonFatal(e) throw new IOException(e)
})
@scala.throws(classOf[IOException])
override def write(b: Int): Unit = {
sendData(ByteString(b))
}
@scala.throws(classOf[IOException])
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
sendData(ByteString.fromArray(b, off, len))
}
@scala.throws(classOf[IOException])
override def flush(): Unit = sendMessage(Flush)
@scala.throws(classOf[IOException])
override def close(): Unit = {
sendMessage(Close, handleCancelled = false)
isActive = false
}
}

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.InputStream
import java.lang.{ Long JLong }
import akka.stream.impl.io.InputStreamSinkStage
import akka.stream.scaladsl.Sink
import akka.stream.{ Attributes, ActorAttributes, javadsl }
import akka.util.ByteString
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.language.postfixOps
/**
* Sink which allows to use [[java.io.InputStream]] to interact with reactive stream.
*/
object InputStreamSink {
/**
* Creates a synchronous (Java 6 compatible) Sink
*
* It materializes an [[java.io.InputStream]] to interacting with reactive stream.
*
* This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def apply(timeout: FiniteDuration = 5 seconds): Sink[ByteString, InputStream] =
Sink.fromGraph(new InputStreamSinkStage(timeout))
.withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and
Attributes.name("InputStreamSink"))
/**
* Creates a synchronous (Java 6 compatible) Sink
*
* It materializes an [[java.io.InputStream]] to interacting with reactive stream.
*
* This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(): javadsl.Sink[ByteString, InputStream] =
new javadsl.Sink(apply())
/**
* Creates a synchronous (Java 6 compatible) Sink
*
* It materializes an [[java.io.InputStream]] to interacting with reactive stream.
*
* This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(timeout: FiniteDuration): javadsl.Sink[ByteString, InputStream] =
new javadsl.Sink(apply(timeout))
}

View file

@ -25,7 +25,7 @@ object OutputStreamSink {
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`,
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def apply(output: () OutputStream): Sink[ByteString, Future[Long]] =

View file

@ -0,0 +1,60 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.OutputStream
import akka.stream.Attributes.Name
import akka.stream._
import akka.stream.impl.io.OutputStreamSourceStage
import akka.stream.scaladsl.{ FlowGraph, Source }
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.implicitConversions
/**
* Source which allows to use [[java.io.OutputStream]] to interact with reactive stream.
*/
object OutputStreamSource {
import scala.language.postfixOps
/**
* Creates a synchronous (Java 6 compatible) Source.
*
* It materializes an [[java.io.OutputStream]] to interact with reactive stream.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def apply(timeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] =
Source.fromGraph(new OutputStreamSourceStage(timeout))
.withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and
Attributes.name("OutputStreamSource"))
/**
* Creates a synchronous (Java 6 compatible) Source.
*
* It materializes an [[java.io.OutputStream]] to interact with reactive stream.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(): javadsl.Source[ByteString, OutputStream] =
new javadsl.Source(apply())
/**
* Creates a synchronous (Java 6 compatible) Source.
*
* It materializes an [[java.io.OutputStream]] to interacting with reactive stream.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(timeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = {
new javadsl.Source(apply(timeout))
}
}

View file

@ -24,7 +24,7 @@ object SynchronousFileSink {
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`,
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def apply(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] =
@ -38,7 +38,7 @@ object SynchronousFileSink {
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`,
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def create(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] =
@ -51,7 +51,7 @@ object SynchronousFileSink {
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`,
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def appendTo(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] =

View file

@ -19,7 +19,7 @@ object SynchronousFileSource {
* Emitted elements are `chunkSize` sized [[ByteString]] elements.
*
* This source is backed by an Actor which will use the dedicated thread-pool base dispatcher.
* You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
@ -32,7 +32,7 @@ object SynchronousFileSource {
* Emitted elements are [[ByteString]] elements, chunked by default by [[DefaultChunkSize]] bytes.
*
* This source is backed by an Actor which will use the dedicated thread-pool base dispatcher.
* You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
@ -44,7 +44,7 @@ object SynchronousFileSource {
* Emitted elements are `chunkSize` sized [[ByteString]] elements.
*
* This source is backed by an Actor which will use the dedicated thread-pool base dispatcher.
* You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.

View file

@ -45,6 +45,24 @@ abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S,
def createLogic: GraphStageLogic
}
/**
* A SourceStage represents a reusable graph stream processing stage. A SourceStage consists of a [[akka.stream.Shape]] which describes
* its output port.
*/
abstract class SourceStage[Out, M](name: String) extends GraphStageWithMaterializedValue[SourceShape[Out], M] {
val out: Outlet[Out] = Outlet[Out](name + ".out")
override val shape: SourceShape[Out] = SourceShape(out)
}
/**
* A SinkStage represents a reusable graph stream processing stage. A SinkStage consists of a [[akka.stream.Shape]] which describes
* its input port.
*/
abstract class SinkStage[In, M](name: String) extends GraphStageWithMaterializedValue[SinkShape[In], M] {
val in: Inlet[In] = Inlet[In](name + ".in")
override val shape: SinkShape[In] = SinkShape(in)
}
private object TimerMessages {
final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression
final case class Timer(id: Int, task: Cancellable)