diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 50437f10f8..cd94b89ffa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -76,13 +76,13 @@ private[akka] object Ast { final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] { final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - if (iterator.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] + if (iterator.isEmpty) EmptyPublisher[I] else ActorPublisher[I](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), name = s"$flowName-0-iterator")) } final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] { def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] + if (iterable.isEmpty) EmptyPublisher[I] else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), name = s"$flowName-0-iterable"), Some(iterable)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 97b4fc8586..14a9f7c66f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -10,6 +10,7 @@ import org.reactivestreams.{ Subscriber, Publisher } */ private[akka] case object EmptyPublisher extends Publisher[Nothing] { def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onComplete() + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] } /** @@ -17,4 +18,5 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { */ private[akka] case class ErrorPublisher(t: Throwable) extends Publisher[Nothing] { def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onError(t) + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala index f36038153e..1de4a968ba 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala @@ -14,7 +14,7 @@ import scala.util.control.NonFatal */ private[akka] object SynchronousPublisherFromIterable { def apply[T](iterable: immutable.Iterable[T]): Publisher[T] = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[T]] + if (iterable.isEmpty) EmptyPublisher[T] else new SynchronousPublisherFromIterable(iterable) private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 4bc842d927..154c65f1c1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -15,7 +15,6 @@ import scala.util.{ Failure, Success } import akka.stream.Transformer import akka.stream.scaladsl2.FlowMaterializer import akka.stream.MaterializerSettings -import akka.stream.impl.EmptyPublisher import akka.stream.impl.ActorPublisher import akka.stream.impl.IterablePublisher import akka.stream.impl.TransformProcessorImpl diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 27a6c9b05e..dc13e7d677 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -6,9 +6,7 @@ package akka.stream.scaladsl2 import scala.language.higherKinds import scala.collection.immutable import scala.concurrent.Future -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import akka.stream.Transformer +import akka.stream._ import akka.stream.impl.BlackholeSubscriber import akka.stream.impl2.Ast._ import scala.annotation.unchecked.uncheckedVariance @@ -17,6 +15,7 @@ import scala.concurrent.Promise import akka.stream.impl.EmptyPublisher import akka.stream.impl.IterablePublisher import akka.stream.impl2.ActorBasedFlowMaterializer +import org.reactivestreams._ sealed trait Flow @@ -39,42 +38,45 @@ object FlowFrom { def apply[T](p: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(p)) } -trait Source[-In] { - def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) @uncheckedVariance +trait Source[+In] { + def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In @uncheckedVariance], Any) +} + +trait SourceKey[+In, T] extends Source[In] { + override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In @uncheckedVariance], T) + // these are unique keys, case class equality would break them + override def equals(other: AnyRef): Boolean = this eq other } /** * Default input. * Allows to materialize a Flow with this input to Subscriber. */ -final case class SubscriberSource[In]() extends Source[In] { - override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = { +final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] { + override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], Subscriber[In]) = { val identityProcessor = materializer.identityProcessor[In](flowName) - (identityProcessor.asInstanceOf[Publisher[In]], identityProcessor.asInstanceOf[Subscriber[In]]) + (identityProcessor, identityProcessor) } - def subscriber[I <: In](m: MaterializedSource): Subscriber[I] = - m.getSourceFor(this).asInstanceOf[Subscriber[I]] + def subscriber(m: MaterializedSource): Subscriber[In] = + m.getSourceFor(this) } /** * [[Source]] from `Publisher`. */ -final case class PublisherSource[In](p: Publisher[_ >: In]) extends Source[In] { +final case class PublisherSource[In](p: Publisher[In]) extends Source[In] { override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = - (p.asInstanceOf[Publisher[In]], p) + (p, p) } /** * [[Source]] from `Iterable` - * - * Changing In from Contravariant to Covariant is needed because Iterable[+A]. - * But this brakes IterableSource variance and we get IterableSource(Seq(1,2,3)): IterableSource[Any] */ -final case class IterableSource[In](iterable: immutable.Iterable[_ >: In]) extends Source[In] { +final case class IterableSource[In](iterable: immutable.Iterable[In]) extends Source[In] { override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = { - val p = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[In]] + val p: Publisher[In] = + if (iterable.isEmpty) EmptyPublisher[In] else materializer match { case m: ActorBasedFlowMaterializer ⇒ m.actorPublisher(IterablePublisher.props(iterable, materializer.settings), @@ -82,36 +84,43 @@ final case class IterableSource[In](iterable: immutable.Iterable[_ >: In]) exten case other ⇒ throw new IllegalArgumentException(s"IterableSource requires ActorBasedFlowMaterializer, got [${other.getClass.getName}]") } - (p.asInstanceOf[Publisher[In]], iterable) + (p, iterable) } } /** * [[Source]] from `Future` - * - * Changing In from Contravariant to Covariant is needed because Future[+A]. - * But this brakes FutureSource variance and we get FutureSource(Future{1}): FutureSource[Any] */ -final case class FutureSource[In](f: Future[_ >: In]) extends Source[In] { +final case class FutureSource[In](f: Future[In]) extends Source[In] { override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = ??? } -trait Sink[+Out] { - def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef +trait Sink[-Out] { + def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): Any +} + +trait SinkKey[-Out, T] extends Sink[Out] { + override def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): T + // these are unique keys, case class equality would break them + override def equals(other: AnyRef): Boolean = this eq other } /** * Default output. * Allows to materialize a Flow with this output to Publisher. */ -final case class PublisherSink[+Out]() extends Sink[Out] { - def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = flowPublisher - def publisher[O >: Out](m: MaterializedSink): Publisher[O] = m.getSinkFor(this).asInstanceOf[Publisher[O]] +// FIXME: make case object +final case class PublisherSink[Out]() extends SinkKey[Out, Publisher[Out]] { + def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Publisher[Out] = flowPublisher + def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this) } -final case class BlackholeSink[+Out]() extends Sink[Out] { - override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = { - val s = new BlackholeSubscriber[Out](materializer.settings.maxInputBufferSize) +/** + * Output to nirvana. + */ +final case object BlackholeSink extends Sink[Any] { + override def attach(flowPublisher: Publisher[Any], materializer: FlowMaterializer): AnyRef = { + val s = new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize) flowPublisher.subscribe(s) s } @@ -120,26 +129,19 @@ final case class BlackholeSink[+Out]() extends Sink[Out] { /** * [[Sink]] to a Subscriber. */ -final case class SubscriberSink[+Out](subscriber: Subscriber[_ <: Out]) extends Sink[Out] { - override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = { - flowPublisher.subscribe(subscriber.asInstanceOf[Subscriber[Out]]) +final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends Sink[Out] { + override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = { + flowPublisher.subscribe(subscriber) subscriber } } -/** - * INTERNAL API - */ -private[akka] object ForeachSink { - private val ListOfUnit = List(()) -} - /** * Foreach output. Invokes the given function for each element. Completes the [[#future]] when * all elements processed, or stream failed. */ -final case class ForeachSink[Out](f: Out ⇒ Unit) extends Sink[Out] { // FIXME variance? - override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = { +final case class ForeachSink[Out](f: Out ⇒ Unit) extends SinkKey[Out, Future[Unit]] { + override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Future[Unit] = { val promise = Promise[Unit]() FlowFrom(flowPublisher).transform("foreach", () ⇒ new Transformer[Out, Unit] { override def onNext(in: Out) = { f(in); Nil } @@ -153,49 +155,36 @@ final case class ForeachSink[Out](f: Out ⇒ Unit) extends Sink[Out] { // FIXME }).consume()(materializer) promise.future } - def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this).asInstanceOf[Future[Unit]] + def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this) } /** * Fold output. Reduces output stream according to the given fold function. */ -final case class FoldSink[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Sink[Out] { - override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = ??? +final case class FoldSink[T, Out](zero: T)(f: (T, Out) ⇒ T) extends Sink[Out] { + override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = ??? def future: Future[T] = ??? } /** - * Operations with a Flow which has no attached [[Source]]. - * - * No Out type parameter would be useful for Graph signatures, but we need it here - * for `withSource` and `prependTransform` methods. + * Marker interface for flows that have a free (attachable) input side. */ -sealed trait HasNoSource[-In, +Out] extends Flow { - type Repr[-In, +Out] <: HasNoSource[In, Out] - type AfterAttachingSource[-In, +Out] <: Flow - - def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] - - def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] - def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] - -} +sealed trait HasNoSource[-In] extends Flow /** - * Operations with a Flow which has no attached [[Sink]]. - * - * No In type parameter would be useful for Graph signatures, but we need it here - * for `withSink`. + * Marker interface for flows that have a free (attachable) output side. */ -trait HasNoSink[-In, +Out] extends Flow { - type Repr[-In, +Out] <: HasNoSink[In, Out] - type AfterAttachingSink[-In, +Out] <: Flow +sealed trait HasNoSink[+Out] extends Flow + +/** + * Operations offered by flows with a free output side: the DSL flows left-to-right only. + */ +trait FlowOps[-In, +Out] extends HasNoSink[Out] { + type Repr[-I, +O] <: FlowOps[I, O] // Storing ops in reverse order protected def andThen[U](op: AstNode): Repr[In, U] - def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] - def map[T](f: Out ⇒ T): Repr[In, T] = transform("map", () ⇒ new Transformer[Out, T] { override def onNext(in: Out) = List(f(in)) @@ -204,52 +193,39 @@ trait HasNoSink[-In, +Out] extends Flow { def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = { andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) } - - def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] - def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] - } /** * Flow without attached input and without attached output, can be used as a `Processor`. */ -final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasNoSink[In, Out] with HasNoSource[In, Out] { - override type Repr[-In, +Out] = ProcessorFlow[In, Out] - type AfterAttachingSink[-In, +Out] = FlowWithSink[In, Out] - type AfterAttachingSource[-In, +Out] = FlowWithSource[In, Out] +final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In, Out] with HasNoSource[In] { + override type Repr[-I, +O] = ProcessorFlow[I, O] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = FlowWithSink(out, ops) - override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = FlowWithSource(in, ops) + def withSink(out: Sink[Out]): FlowWithSink[In, Out] = FlowWithSink(out, ops) + def withSource(in: Source[In]): FlowWithSource[In, Out] = FlowWithSource(in, ops) - override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = - ProcessorFlow(ops ::: f.ops) - override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] = - FlowWithSource(f.input, ops ::: f.ops) + def prepend[T](f: ProcessorFlow[T, In]): ProcessorFlow[T, Out] = ProcessorFlow(ops ::: f.ops) + def prepend[T](f: FlowWithSource[T, In]): FlowWithSource[T, Out] = f.append(this) - override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops) - override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] = - FlowWithSink(f.output, f.ops ++: ops) + def append[T](f: ProcessorFlow[Out, T]): ProcessorFlow[In, T] = ProcessorFlow(f.ops ++: ops) + def append[T](f: FlowWithSink[Out, T]): FlowWithSink[In, T] = f.prepend(this) } /** * Flow with attached output, can be used as a `Subscriber`. */ -final case class FlowWithSink[-In, +Out](output: Sink[Out], ops: List[AstNode]) extends HasNoSource[In, Out] { - type Repr[-In, +Out] = FlowWithSink[In, Out] - type AfterAttachingSource[-In, +Out] = RunnableFlow[In, Out] +final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] { - override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = RunnableFlow(in, output, ops) + def withSource(in: Source[In]): RunnableFlow[In, Out] = new RunnableFlow(in, output, ops) def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops) - override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = - FlowWithSink(output, ops ::: f.ops) - override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] = - RunnableFlow(f.input, output, ops ::: f.ops) + def prepend[T](f: ProcessorFlow[T, In]): FlowWithSink[T, Out] = FlowWithSink(output, ops ::: f.ops) + def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = new RunnableFlow(f.input, output, ops ::: f.ops) - def toSubscriber[I <: In]()(implicit materializer: FlowMaterializer): Subscriber[I] = { - val subIn = SubscriberSource[I]() + def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = { + val subIn = SubscriberSource[In]() val mf = withSource(subIn).run() subIn.subscriber(mf) } @@ -258,37 +234,36 @@ final case class FlowWithSink[-In, +Out](output: Sink[Out], ops: List[AstNode]) /** * Flow with attached input, can be used as a `Publisher`. */ -final case class FlowWithSource[-In, +Out](input: Source[In], ops: List[AstNode]) extends HasNoSink[In, Out] { - override type Repr[-In, +Out] = FlowWithSource[In, Out] - type AfterAttachingSink[-In, +Out] = RunnableFlow[In, Out] +final case class FlowWithSource[-In, +Out](private[scaladsl2] val input: Source[In @uncheckedVariance], ops: List[AstNode]) extends FlowOps[In, Out] { + override type Repr[-I, +O] = FlowWithSource[I, O] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = RunnableFlow(input, out, ops) + def withSink(out: Sink[Out]): RunnableFlow[In, Out] = new RunnableFlow(input, out, ops) def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops) - override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = FlowWithSource(input, f.ops ++: ops) - override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] = - RunnableFlow(input, f.output, f.ops ++: ops) + def append[T](f: ProcessorFlow[Out, T]): FlowWithSource[In, T] = FlowWithSource(input, f.ops ++: ops) + def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = new RunnableFlow(input, f.output, f.ops ++: ops) - def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = { + def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { val pubOut = PublisherSink[Out]() val mf = withSink(pubOut).run() pubOut.publisher(mf) } - def publishTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit = - toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]]) + def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = + toPublisher().subscribe(subscriber) def consume()(implicit materializer: FlowMaterializer): Unit = - withSink(BlackholeSink()).run() + withSink(BlackholeSink).run() } /** * Flow with attached input and output, can be executed. */ -final case class RunnableFlow[-In, +Out](input: Source[In], output: Sink[Out], ops: List[AstNode]) extends Flow { +final case class RunnableFlow[-In, +Out](private[scaladsl2] val input: Source[In @uncheckedVariance], + private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends Flow { def withoutSink: FlowWithSource[In, Out] = FlowWithSource(input, ops) def withoutSource: FlowWithSink[In, Out] = FlowWithSink(output, ops) @@ -296,20 +271,20 @@ final case class RunnableFlow[-In, +Out](input: Source[In], output: Sink[Out], o materializer.materialize(input, output, ops) } -class MaterializedFlow(sourceKey: AnyRef, matSource: AnyRef, sinkKey: AnyRef, matSink: AnyRef) extends MaterializedSource with MaterializedSink { - override def getSourceFor(key: AnyRef): AnyRef = - if (key == sourceKey) matSource +class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink { + override def getSourceFor[T](key: SourceKey[_, T]): T = + if (key == sourceKey) matSource.asInstanceOf[T] else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") - def getSinkFor(key: AnyRef): AnyRef = - if (key == sinkKey) matSink + def getSinkFor[T](key: SinkKey[_, T]): T = + if (key == sinkKey) matSink.asInstanceOf[T] else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") } trait MaterializedSource { - def getSourceFor(sourceKey: AnyRef): AnyRef + def getSourceFor[T](sourceKey: SourceKey[_, T]): T } trait MaterializedSink { - def getSinkFor(sinkKey: AnyRef): AnyRef + def getSinkFor[T](sinkKey: SinkKey[_, T]): T } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala index 023c76ebe9..a182038769 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -192,7 +192,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def cleanup() = cleanupProbe.ref ! s }). - withSink(BlackholeSink()).run() + withSink(BlackholeSink).run() cleanupProbe.expectMsg("a") } diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 29afcd49e7..156cc2a3e4 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -25,7 +25,7 @@ object StreamTestKit { */ def errorPublisher[T](cause: Throwable): Publisher[T] = ErrorPublisher(cause: Throwable).asInstanceOf[Publisher[T]] - def emptyPublisher[T](): Publisher[T] = EmptyPublisher.asInstanceOf[Publisher[T]] + def emptyPublisher[T](): Publisher[T] = EmptyPublisher[T] /** * Subscribes the subscriber and signals error after the first request.