diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala new file mode 100644 index 0000000000..7e399bbb25 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.actor.Status +import akka.pattern.{ AskTimeoutException, pipe } +import akka.stream.ActorMaterializer +import akka.stream.testkit.Utils._ +import akka.stream.testkit.{ AkkaSpec, _ } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } +import scala.util.control.NoStackTrace + +class AcknowledgeSinkSpec extends AkkaSpec { + implicit val ec = system.dispatcher + implicit val mat = ActorMaterializer() + + val ex = new RuntimeException("ex") with NoStackTrace + + val noMsgTimeout = 300.millis + + def assertSuccess(value: Any, fb: Future[Option[Any]]): Unit = + Await.result(fb, 1.second) should be(Some(value)) + + "An AcknowledgeSink" must { + + "send the elements as result of future" in assertAllStagesStopped { + val queue = Source(List(1, 2, 3)).runWith(Sink.queue(3)) + assertSuccess(1, queue.pull()) + assertSuccess(2, queue.pull()) + assertSuccess(3, queue.pull()) + queue.pull().pipeTo(testActor) + expectMsg(None) + } + + "allow to have only one future waiting for result in each point of time" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source(probe).runWith(Sink.queue(3)) + val sub = probe.expectSubscription() + val future = queue.pull() + val future2 = queue.pull() + an[IllegalStateException] shouldBe thrownBy { Await.result(future2, 300.millis) } + + sub.sendNext(1) + future.pipeTo(testActor) + expectMsg(Some(1)) + + sub.sendComplete() + } + + "wait for next element from upstream" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source(probe).runWith(Sink.queue(3)) + val sub = probe.expectSubscription() + + queue.pull().pipeTo(testActor) + expectNoMsg(noMsgTimeout) + + sub.sendNext(1) + expectMsg(Some(1)) + sub.sendComplete() + } + + "fail future on stream failure" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source(probe).runWith(Sink.queue(3)) + val sub = probe.expectSubscription() + + queue.pull().pipeTo(testActor) + expectNoMsg(noMsgTimeout) + + sub.sendError(ex) + expectMsg(Status.Failure(ex)) + } + + "fail future when stream failed" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source(probe).runWith(Sink.queue(3, 100.milli)) + val sub = probe.expectSubscription() + sub.sendError(ex) // potential race condition + + val future = queue.pull() + future.onFailure { case e ⇒ e.getClass() should be(classOf[AskTimeoutException]); Unit } + future.onSuccess { case _ ⇒ fail() } + + Await.ready(future, 1.second) + } + + "timeout future when stream cannot provide data" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source(probe).runWith(Sink.queue(3)) + val sub = probe.expectSubscription() + + queue.pull().pipeTo(testActor) + expectNoMsg(noMsgTimeout) + + sub.sendNext(1) + expectMsg(Some(1)) + sub.sendComplete() + } + + "work when buffer is 0" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source(probe).runWith(Sink.queue(0)) + val sub = probe.expectSubscription() + sub.sendNext(1) + + queue.pull().pipeTo(testActor) + sub.sendNext(2) + expectMsg(Some(2)) + sub.sendComplete() + } + + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala new file mode 100644 index 0000000000..057d35edbc --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.testkit.{ AkkaSpec, TestSubscriber } +import akka.stream.{ ActorMaterializer, OverflowStrategy } +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.concurrent._ +import akka.pattern.pipe + +class AcknowledgeSourceSpec extends AkkaSpec { + implicit val mat = ActorMaterializer() + implicit val ec = system.dispatcher + + def assertSuccess(b: Boolean, fb: Future[Boolean]): Unit = + Await.result(fb, 1.second) should be(b) + + "A AcknowledgeSource" must { + + "emit received messages to the stream" in { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(10, OverflowStrategy.fail).to(Sink(s)).run() + val sub = s.expectSubscription + sub.request(2) + assertSuccess(true, queue.offer(1)) + s.expectNext(1) + assertSuccess(true, queue.offer(2)) + s.expectNext(2) + assertSuccess(true, queue.offer(3)) + sub.cancel() + } + + "buffer when needed" in { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink(s)).run() + val sub = s.expectSubscription + for (n ← 1 to 20) assertSuccess(true, queue.offer(n)) + sub.request(10) + for (n ← 1 to 10) assertSuccess(true, queue.offer(n)) + sub.request(10) + for (n ← 11 to 20) assertSuccess(true, queue.offer(n)) + + for (n ← 200 to 399) assertSuccess(true, queue.offer(n)) + sub.request(100) + for (n ← 300 to 399) assertSuccess(true, queue.offer(n)) + sub.cancel() + } + + "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink(s)).run() + val sub = s.expectSubscription + sub.request(1) + assertSuccess(true, queue.offer(1)) + s.expectNext(1) + sub.cancel() + } + + "return false when can reject element to buffer" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink(s)).run() + val sub = s.expectSubscription + assertSuccess(true, queue.offer(1)) + assertSuccess(false, queue.offer(2)) + sub.request(1) + s.expectNext(1) + sub.cancel() + } + + "wait when buffer is full and backpressure is on" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(2, OverflowStrategy.backpressure).to(Sink(s)).run() + val sub = s.expectSubscription + assertSuccess(true, queue.offer(1)) + + val addedSecond = queue.offer(2) + + addedSecond.pipeTo(testActor) + expectNoMsg(300.millis) + + sub.request(1) + s.expectNext(1) + assertSuccess(true, addedSecond) + + sub.request(1) + s.expectNext(2) + + sub.cancel() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/Queue.scala b/akka-stream/src/main/scala/akka/stream/Queue.scala new file mode 100644 index 0000000000..eed01e7a86 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/Queue.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.Future + +/** + * This trait allows to have the queue as a data source for some stream. + */ +trait SourceQueue[T] { + + /** + * Method offers next element to a stream and returns future that: + * - competes with true if element is consumed by a stream + * - competes with false when stream dropped offered element + * - fails if stream is completed or cancelled. + * + * @param elem element to send to a stream + */ + def offer(elem: T): Future[Boolean] +} + +/** + * Trait allows to have the queue as a sink for some stream. + * "SinkQueue" pulls data from stream with backpressure mechanism. + */ +trait SinkQueue[T] { + + /** + * Method pulls elements from stream and returns future that: + * - fails if stream is finished + * - completes with None in case if stream is completed after we got future + * - completes with `Some(element)` in case next element is available from stream. + */ + def pull(): Future[Option[T]] +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala new file mode 100644 index 0000000000..a7973a5462 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.{ ActorRef, Props } +import akka.stream.OverflowStrategy +import akka.stream.OverflowStrategy._ +import akka.stream.actor.ActorPublisherMessage.Request +import akka.stream.impl.AcknowledgePublisher.{ Rejected, Ok } + +/** + * INTERNAL API + */ +private[akka] object AcknowledgePublisher { + def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = + Props(new AcknowledgePublisher(bufferSize, overflowStrategy)) + + case class Ok() + case class Rejected() +} + +/** + * INTERNAL API + */ +private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: OverflowStrategy) + extends ActorRefSourceActor(bufferSize, overflowStrategy) { + + var backpressedElem: Option[ActorRef] = None + + override def requestElem: Receive = { + case _: Request ⇒ + // totalDemand is tracked by super + if (bufferSize != 0) + while (totalDemand > 0L && !buffer.isEmpty) { + //if buffer is full - sent ack message to sender in case of Backpressure mode + if (buffer.isFull) backpressedElem match { + case Some(ref) ⇒ + ref ! Ok(); backpressedElem = None + case None ⇒ //do nothing + } + onNext(buffer.dequeue()) + } + } + + override def receiveElem: Receive = { + case elem if isActive ⇒ + if (totalDemand > 0L) { + onNext(elem) + sendAck(true) + } else if (bufferSize == 0) { + log.debug("Dropping element because there is no downstream demand: [{}]", elem) + sendAck(false) + } else if (!buffer.isFull) + enqueueAndSendAck(elem) + else overflowStrategy match { + case DropHead ⇒ + buffer.dropHead() + enqueueAndSendAck(elem) + case DropTail ⇒ + buffer.dropTail() + enqueueAndSendAck(elem) + case DropBuffer ⇒ + buffer.clear() + enqueueAndSendAck(elem) + case DropNew ⇒ + sendAck(false) + case Fail ⇒ + onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) + case Backpressure ⇒ + sendAck(false) //does not allow to send more than buffer size + } + } + + def enqueueAndSendAck(elem: Any): Unit = { + buffer.enqueue(elem) + if (buffer.isFull && overflowStrategy == Backpressure) backpressedElem = Some(sender) + else sendAck(true) + } + + def sendAck(isOk: Boolean): Unit = { + val msg = if (isOk) Ok() else Rejected() + context.sender() ! msg + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgeSubscriber.scala new file mode 100644 index 0000000000..b93073d75e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgeSubscriber.scala @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.{ ActorLogging, ActorRef, Props, Status } +import akka.stream.actor.ActorPublisherMessage.Request +import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage, RequestStrategy } + +import scala.util.{ Try, Failure, Success } + +private[akka] object AcknowledgeSubscriber { + def props(highWatermark: Int) = + Props(new AcknowledgeSubscriber(highWatermark)) +} + +/** + * INTERNAL API + */ +private[akka] class AcknowledgeSubscriber(maxBuffer: Int) extends ActorSubscriber with ActorLogging { + import ActorSubscriberMessage._ + + var buffer: Vector[Any] = Vector.empty + + override val requestStrategy = new RequestStrategy { + def requestDemand(remainingRequested: Int): Int = { + maxBuffer - buffer.size - remainingRequested + } + } + + var requester: Option[ActorRef] = None + + def receive = { + case Request(_) ⇒ + if (requester.isEmpty) { + requester = Some(sender) + trySendElementDownstream() + } else + sender ! Status.Failure( + new IllegalStateException("You have to wait for first future to be resolved to send another request")) + + case OnNext(elem) ⇒ + if (maxBuffer != 0) { + buffer :+= elem + trySendElementDownstream() + } else requester match { + case Some(ref) ⇒ + requester = None + ref ! Some(elem) + case None ⇒ log.debug("Dropping element because there is no downstream demand: [{}]", elem) + } + + case OnError(cause) ⇒ + trySendDownstream(Status.Failure(cause)) + context.stop(self) + + case OnComplete ⇒ + if (buffer.isEmpty) { + trySendDownstream(Status.Success(None)) + context.stop(self) + } + } + + def trySendElementDownstream(): Unit = { + requester match { + case Some(ref) ⇒ + if (buffer.size > 0) { + ref ! Some(buffer.head) + requester = None + buffer = buffer.tail + } else if (canceled) { + ref ! None + context.stop(self) + } + + case None ⇒ //do nothing + } + } + + def trySendDownstream(e: Any): Unit = { + requester match { + case Some(ref) ⇒ + ref ! e + case None ⇒ //do nothing + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 5d8bde2560..1eb9e1dd3d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -27,15 +27,9 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf import akka.stream.OverflowStrategy._ // when bufferSize is 0 there the buffer is not used - private val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize) - - def receive = { - case _: Request ⇒ - // totalDemand is tracked by super - if (bufferSize != 0) - while (totalDemand > 0L && !buffer.isEmpty) - onNext(buffer.dequeue()) + protected val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize) + def receive = ({ case Cancel ⇒ context.stop(self) @@ -46,6 +40,17 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf case Status.Failure(cause) if isActive ⇒ onErrorThenStop(cause) + }: Receive).orElse(requestElem).orElse(receiveElem) + + def requestElem: Receive = { + case _: Request ⇒ + // totalDemand is tracked by super + if (bufferSize != 0) + while (totalDemand > 0L && !buffer.isEmpty) + onNext(buffer.dequeue()) + } + + def receiveElem: Receive = { case elem if isActive ⇒ if (totalDemand > 0L) onNext(elem) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 1ded51b290..8e48cb7f0c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -4,13 +4,18 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean -import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props } -import akka.stream.impl.StreamLayout.Module + +import akka.actor._ import akka.stream._ +import akka.stream.impl.AcknowledgePublisher.{ Ok, Rejected } +import akka.stream.impl.StreamLayout.Module +import akka.util.Timeout import org.reactivestreams._ + import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.Promise +import scala.concurrent.duration.{ FiniteDuration, _ } +import scala.concurrent.{ Future, Promise } +import scala.language.postfixOps import scala.util.{ Failure, Success } /** @@ -163,3 +168,32 @@ private[akka] final class ActorRefSource[Out]( override def withAttributes(attr: Attributes): Module = new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) } + +/** + * INTERNAL API + */ +private[akka] final class AcknowledgeSource[Out](bufferSize: Int, overflowStrategy: OverflowStrategy, + val attributes: Attributes, shape: SourceShape[Out], + timeout: FiniteDuration = 5 seconds) + extends SourceModule[Out, SourceQueue[Out]](shape) { + + override def create(context: MaterializationContext) = { + import akka.pattern.ask + val ref = ActorMaterializer.downcast(context.materializer).actorOf(context, + AcknowledgePublisher.props(bufferSize, overflowStrategy)) + implicit val t = Timeout(timeout) + + (akka.stream.actor.ActorPublisher[Out](ref), new SourceQueue[Out] { + implicit val ctx = context.materializer.executionContext + override def offer(out: Out): Future[Boolean] = (ref ? out).map { + case Ok() ⇒ true + case Rejected() ⇒ false + } + }) + } + + override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, SourceQueue[Out]] = + new AcknowledgeSource[Out](bufferSize, overflowStrategy, attributes, shape, timeout) + override def withAttributes(attr: Attributes): Module = + new AcknowledgeSource(bufferSize, overflowStrategy, attr, amendShape(attr), timeout) +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index f66d452b91..514b8f499d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -4,11 +4,16 @@ package akka.stream.impl import akka.actor.{ Deploy, ActorRef, Props } +import akka.stream.actor.ActorPublisherMessage.Request import akka.stream.impl.StreamLayout.Module -import akka.stream.{ Attributes, Inlet, Shape, SinkShape, MaterializationContext, ActorMaterializer } +import akka.stream._ +import akka.util.Timeout import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance +import scala.concurrent.duration.{ FiniteDuration, _ } import scala.concurrent.{ Future, Promise } +import scala.language.postfixOps +import scala.util.Try /** * INTERNAL API @@ -217,3 +222,28 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any override def toString: String = "ActorRefSink" } +/** + * INTERNAL API + */ +private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: Attributes, + shape: SinkShape[In], timeout: FiniteDuration) extends SinkModule[In, SinkQueue[In]](shape) { + + override def create(context: MaterializationContext) = { + import akka.pattern.ask + val actorMaterializer = ActorMaterializer.downcast(context.materializer) + + implicit val t = Timeout(timeout) + val subscriberRef = actorMaterializer.actorOf(context, + AcknowledgeSubscriber.props(bufferSize)) + (akka.stream.actor.ActorSubscriber[In](subscriberRef), + new SinkQueue[In] { + override def pull(): Future[Option[In]] = (subscriberRef ? Request(1)).mapTo[Option[In]] + }) + } + + override protected def newInstance(shape: SinkShape[In]): SinkModule[In, SinkQueue[In]] = + new AcknowledgeSink[In](bufferSize, attributes, shape, timeout) + override def withAttributes(attr: Attributes): Module = + new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout) + override def toString: String = "AcknowledgeSink" +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 2c2d769935..b235f90f8f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -75,6 +75,7 @@ private[stream] object Stages { val actorRefSource = name("actorRefSource") val synchronousFileSource = name("synchronousFileSource") val inputStreamSource = name("inputStreamSource") + val acknowledgeSource = name("acknowledgeSource") val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") @@ -86,6 +87,7 @@ private[stream] object Stages { val actorSubscriberSink = name("actorSubscriberSink") val synchronousFileSink = name("synchronousFileSink") val outputStreamSink = name("outputStreamSink") + val acknowledgeSink = name("acknowledgeSink") } import DefaultAttributes._ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index b56cdb15fd..15663556c9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -9,6 +9,7 @@ import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Future } import scala.util.Try @@ -147,6 +148,20 @@ object Sink { new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num ⇒ strategy.apply(num))) } + /** + * Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]]. + * [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. + * `Future` completes when element is available. + * + * `Sink` will request at most `bufferSize` number of elements from + * upstream and then stop back pressure. + * + * @param bufferSize The size of the buffer in element count + * @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]`` + */ + def queue[T](bufferSize: Int, timeout: FiniteDuration): Sink[T, SinkQueue[T]] = + new Sink(scaladsl.Sink.queue(bufferSize, timeout)) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 630d9aa665..f7eea3ca49 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,27 +3,20 @@ */ package akka.stream.javadsl -import java.io.File -import akka.japi.function -import scala.collection.immutable -import java.util.concurrent.Callable -import akka.actor.{ Cancellable, ActorRef, Props } +import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter -import akka.japi.Util -import akka.stream.Attributes._ +import akka.japi.{ Util, function } import akka.stream._ -import akka.stream.impl.{ ActorPublisherSource, StreamLayout } -import akka.util.ByteString -import org.reactivestreams.{ Processor, Publisher, Subscriber } +import akka.stream.impl.StreamLayout +import akka.stream.stage.Stage +import org.reactivestreams.{ Publisher, Subscriber } + import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ -import scala.concurrent.{ Promise, Future } +import scala.collection.immutable import scala.concurrent.duration.FiniteDuration -import scala.language.higherKinds -import scala.language.implicitConversions -import akka.stream.stage.Stage -import akka.stream.impl.StreamLayout -import scala.annotation.varargs +import scala.concurrent.{ Future, Promise } +import scala.language.{ higherKinds, implicitConversions } /** Java API */ object Source { @@ -235,6 +228,32 @@ object Source { val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq() new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num ⇒ strategy.apply(num))) } + + /** + * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. + * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * Acknowledgement mechanism is available. + * [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true + * if element was added to buffer or sent downstream. It completes + * with false if element was dropped. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped + * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does + * not matter. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + * @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]`` + */ + def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration): Source[T, SourceQueue[T]] = + new Source(scaladsl.Source.queue(bufferSize, overflowStrategy, timeout)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 76b76a8750..55b464b623 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,18 +3,17 @@ */ package akka.stream.scaladsl -import akka.stream.javadsl import akka.actor.{ ActorRef, Props } -import akka.stream._ -import akka.stream.impl.Stages.{ MapAsyncUnordered, DefaultAttributes } +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ -import akka.stream.Attributes._ -import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage, SyncDirective } +import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective } +import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec -import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.concurrent.duration.{ FiniteDuration, _ } +import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** @@ -205,4 +204,20 @@ object Sink extends SinkApply { def actorSubscriber[T](props: Props): Sink[T, ActorRef] = new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink"))) + /** + * Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]]. + * [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. + * `Future` completes when element is available. + * + * `Sink` will request at most `bufferSize` number of elements from + * upstream and then stop back pressure. + * + * @param bufferSize The size of the buffer in element count + * @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]`` + */ + def queue[T](bufferSize: Int, timeout: FiniteDuration = 5.seconds): Sink[T, SinkQueue[T]] = { + require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") + new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout)) + } + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 95b9d759e2..a1c5e83e5c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -12,7 +12,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{ FiniteDuration, _ } import scala.concurrent.{ Future, Promise } import scala.language.higherKinds @@ -401,4 +401,32 @@ object Source extends SourceApply { combineRest(2, rest.iterator) }) + /** + * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. + * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * Acknowledgement mechanism is available. + * [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true + * if element was added to buffer or sent downstream. It completes + * with false if element was dropped. + * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped + * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does + * not matter. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + * @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]`` + */ + def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration = 5.seconds): Source[T, SourceQueue[T]] = { + require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") + new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource"))) + } + }