Merge pull request #16404 from ktoso/str-verify-headsubscriber-ktoso

+str #15804 blackbox verification for head subscriber
This commit is contained in:
Konrad Malawski 2014-12-01 12:25:31 +01:00
commit dde86bc764
3 changed files with 34 additions and 10 deletions

View file

@ -38,7 +38,7 @@ object ActorPublisherTest {
}
class ActorPublisherTest extends AkkaPublisherVerification[Int](true) {
class ActorPublisherTest extends AkkaPublisherVerification[Int] {
override def createPublisher(elements: Long): Publisher[Int] = {
val ref = system.actorOf(Props(classOf[TestPublisher], elements).withDispatcher("akka.test.stream-dispatcher"))

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl._
import org.reactivestreams.Subscriber
import scala.concurrent.Promise
class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
import HeadSink._
override def createSubscriber(): Subscriber[Int] =
new HeadSinkSubscriber[Int](Promise[Int]())
override def createHelperPublisher(elements: Long) =
createSimpleIntPublisher(elements)
}

View file

@ -96,6 +96,19 @@ final case class FanoutPublisherSink[In](initialBufferSize: Int, maximumBufferSi
object HeadSink {
def apply[T](): HeadSink[T] = new HeadSink[T]
/** INTERNAL API */
private[akka] class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] {
private val sub = new AtomicReference[Subscription]
override def onSubscribe(s: Subscription): Unit =
if (!sub.compareAndSet(null, s)) s.cancel()
else s.request(1)
override def onNext(t: In): Unit = { p.trySuccess(t); sub.get.cancel() }
override def onError(t: Throwable): Unit = p.tryFailure(t)
override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream"))
}
}
/**
@ -117,15 +130,7 @@ class HeadSink[In] extends KeyedActorFlowSink[In] {
override def isActive = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val p = Promise[In]()
val sub = new Subscriber[In] { // TODO #15804 verify this using the RS TCK
private val sub = new AtomicReference[Subscription]
override def onSubscribe(s: Subscription): Unit =
if (!sub.compareAndSet(null, s)) s.cancel()
else s.request(1)
override def onNext(t: In): Unit = { p.trySuccess(t); sub.get.cancel() }
override def onError(t: Throwable): Unit = p.tryFailure(t)
override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream"))
}
val sub = new HeadSink.HeadSinkSubscriber[In](p)
(sub, p.future)
}