diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala index b9b3737f21..50ec8eff52 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorPublisherTest.scala @@ -38,7 +38,7 @@ object ActorPublisherTest { } -class ActorPublisherTest extends AkkaPublisherVerification[Int](true) { +class ActorPublisherTest extends AkkaPublisherVerification[Int] { override def createPublisher(elements: Long): Publisher[Int] = { val ref = system.actorOf(Props(classOf[TestPublisher], elements).withDispatcher("akka.test.stream-dispatcher")) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala new file mode 100644 index 0000000000..e01bb69247 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl._ +import org.reactivestreams.Subscriber + +import scala.concurrent.Promise + +class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { + import HeadSink._ + + override def createSubscriber(): Subscriber[Int] = + new HeadSinkSubscriber[Int](Promise[Int]()) + + override def createHelperPublisher(elements: Long) = + createSimpleIntPublisher(elements) +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala index fdd33d4c62..2c36723d9f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala @@ -96,6 +96,19 @@ final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSi object HeadSink { def apply[T](): HeadSink[T] = new HeadSink[T] + + /** INTERNAL API */ + private[akka] class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] { + private val sub = new AtomicReference[Subscription] + override def onSubscribe(s: Subscription): Unit = + if (!sub.compareAndSet(null, s)) s.cancel() + else s.request(1) + + override def onNext(t: In): Unit = { p.trySuccess(t); sub.get.cancel() } + override def onError(t: Throwable): Unit = p.tryFailure(t) + override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream")) + } + } /** @@ -117,15 +130,7 @@ class HeadSink[In] extends KeyedActorFlowSink[In] { override def isActive = true override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { val p = Promise[In]() - val sub = new Subscriber[In] { // TODO #15804 verify this using the RS TCK - private val sub = new AtomicReference[Subscription] - override def onSubscribe(s: Subscription): Unit = - if (!sub.compareAndSet(null, s)) s.cancel() - else s.request(1) - override def onNext(t: In): Unit = { p.trySuccess(t); sub.get.cancel() } - override def onError(t: Throwable): Unit = p.tryFailure(t) - override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream")) - } + val sub = new HeadSink.HeadSinkSubscriber[In](p) (sub, p.future) }