=str - Introduces a TCK-verified SingleElementPublisher to optimize Source.single and things depending on it

This commit is contained in:
Viktor Klang 2015-06-06 16:07:35 +02:00
parent 8527e0347e
commit b49746b0da
3 changed files with 56 additions and 11 deletions

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.impl.SingleElementPublisher
import scala.collection.immutable
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams._
class SingleElementPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher)
}
override def maxElementsFromPublisher(): Long = 1
}

View file

@ -3,8 +3,7 @@
*/
package akka.stream.impl
import org.reactivestreams.{ Subscriber, Publisher }
import org.reactivestreams.Subscription
import org.reactivestreams.{ Subscriber, Publisher, Subscription }
/**
* INTERNAL API
@ -40,6 +39,36 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend
override def toString: String = name
}
private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] {
import ReactiveStreamsCompliance._
private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription {
private[this] var done: Boolean = false
override def cancel(): Unit = done = true
override def request(elements: Long): Unit = if (!done) {
if (elements < 1) rejectDueToNonPositiveDemand(subscriber)
done = true
try {
tryOnNext(subscriber, value)
tryOnComplete(subscriber)
} catch {
case _: SpecViolation // TODO log?
}
}
}
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
try {
requireNonNullSubscriber(subscriber)
tryOnSubscribe(subscriber, new SingleElementSubscription(subscriber))
} catch {
case _: SpecViolation // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = name
}
/**
* INTERNAL API
* This is only a legal subscription when it is immediately followed by

View file

@ -192,11 +192,8 @@ object Source extends SourceApply {
* Elements are pulled out of the iterator in accordance with the demand coming
* from the downstream transformation steps.
*/
def apply[T](f: () Iterator[T]): Source[T, Unit] = {
apply(new immutable.Iterable[T] {
override def iterator: Iterator[T] = f()
})
}
def apply[T](f: () Iterator[T]): Source[T, Unit] =
apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() })
/**
* A graph with the shape of a source logically is a source, this method makes
@ -214,9 +211,8 @@ object Source extends SourceApply {
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = {
Source.single(()).mapConcat((_: Unit) iterable).withAttributes(DefaultAttributes.iterableSource)
}
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] =
Source.single(iterable).mapConcat(identity).withAttributes(DefaultAttributes.iterableSource)
/**
* Start a new `Source` from the given `Future`. The stream will consist of
@ -242,7 +238,7 @@ object Source extends SourceApply {
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def single[T](element: T): Source[T, Unit] =
apply(SynchronousIterablePublisher(List(element), "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize
apply(SingleElementPublisher(element, "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize
/**
* Create a `Source` that will continually emit the given element.