also fix FlowInterleaveSpec

Also-by: Johan Andrén <johan@markatta.com>
Also-by: Roland Kuhn <rk@rkuhn.info>
Also-by: Martynas Mickevičius <mmartynas@gmail.com>
This commit is contained in:
Endre Sándor Varga 2016-01-20 10:00:37 +02:00 committed by Martynas Mickevičius
parent ef77b56e66
commit 60497f6561
195 changed files with 1110 additions and 857 deletions

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Props }
import akka.stream.Attributes.InputBuffer
import akka.stream._
@ -93,15 +94,15 @@ private[akka] final class FanoutPublisherSink[In](
* Attaches a subscriber to this stream which will just discard all received
* elements.
*/
private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Unit]](shape) {
private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) {
override def create(context: MaterializationContext) = {
val effectiveSettings = ActorMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes)
val p = Promise[Unit]()
val p = Promise[Done]()
(new SinkholeSubscriber[Any](p), p.future)
}
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new SinkholeSink(attributes, shape)
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape)
override def withAttributes(attr: Attributes): Module = new SinkholeSink(attr, amendShape(attr))
override def toString: String = "SinkholeSink"
}
@ -110,11 +111,11 @@ private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkSh
* INTERNAL API
* Attaches a subscriber to this stream.
*/
private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
override def create(context: MaterializationContext) = (subscriber, ())
override def create(context: MaterializationContext) = (subscriber, NotUsed)
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = new SubscriberSink[In](subscriber, attributes, shape)
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape)
override def withAttributes(attr: Attributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr))
override def toString: String = "SubscriberSink"
}
@ -123,9 +124,9 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att
* INTERNAL API
* A sink that immediately cancels its upstream upon materialization.
*/
private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
override def create(context: MaterializationContext): (Subscriber[Any], Unit) = (new CancellingSubscriber[Any], ())
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new CancelSink(attributes, shape)
private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) {
override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed)
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape)
override def withAttributes(attr: Attributes): Module = new CancelSink(attr, amendShape(attr))
override def toString: String = "CancelSink"
}
@ -152,17 +153,17 @@ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes:
*/
private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
val attributes: Attributes,
shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
override def create(context: MaterializationContext) = {
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
val subscriberRef = actorMaterializer.actorOf(context,
ActorRefSinkActor.props(ref, effectiveSettings.maxInputBufferSize, onCompleteMessage))
(akka.stream.actor.ActorSubscriber[In](subscriberRef), ())
(akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] =
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)
override def withAttributes(attr: Attributes): Module =
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))