From b49746b0da49a2095fad9225229ccee8120b9f41 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Jun 2015 16:07:35 +0200 Subject: [PATCH] =str - Introduces a TCK-verified SingleElementPublisher to optimize Source.single and things depending on it --- .../tck/SingleElementPublisherTest.scala | 20 +++++++++++ .../stream/impl/CompletedPublishers.scala | 33 +++++++++++++++++-- .../scala/akka/stream/scaladsl/Source.scala | 14 +++----- 3 files changed, 56 insertions(+), 11 deletions(-) create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala new file mode 100644 index 0000000000..c1dc1de756 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.impl.SingleElementPublisher + +import scala.collection.immutable +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams._ + +class SingleElementPublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher) + } + + override def maxElementsFromPublisher(): Long = 1 +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index fbd22a79d1..87ab5d6f59 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -3,8 +3,7 @@ */ package akka.stream.impl -import org.reactivestreams.{ Subscriber, Publisher } -import org.reactivestreams.Subscription +import org.reactivestreams.{ Subscriber, Publisher, Subscription } /** * INTERNAL API @@ -40,6 +39,36 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend override def toString: String = name } +private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] { + import ReactiveStreamsCompliance._ + + private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription { + private[this] var done: Boolean = false + override def cancel(): Unit = done = true + + override def request(elements: Long): Unit = if (!done) { + if (elements < 1) rejectDueToNonPositiveDemand(subscriber) + done = true + try { + tryOnNext(subscriber, value) + tryOnComplete(subscriber) + } catch { + case _: SpecViolation ⇒ // TODO log? + } + } + } + + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = + try { + requireNonNullSubscriber(subscriber) + tryOnSubscribe(subscriber, new SingleElementSubscription(subscriber)) + } catch { + case _: SpecViolation ⇒ // nothing we can do + } + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] + override def toString: String = name +} + /** * INTERNAL API * This is only a legal subscription when it is immediately followed by diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 99551c371a..424557b665 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -192,11 +192,8 @@ object Source extends SourceApply { * Elements are pulled out of the iterator in accordance with the demand coming * from the downstream transformation steps. */ - def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] = { - apply(new immutable.Iterable[T] { - override def iterator: Iterator[T] = f() - }) - } + def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] = + apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() }) /** * A graph with the shape of a source logically is a source, this method makes @@ -214,9 +211,8 @@ object Source extends SourceApply { * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ - def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { - Source.single(()).mapConcat((_: Unit) ⇒ iterable).withAttributes(DefaultAttributes.iterableSource) - } + def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = + Source.single(iterable).mapConcat(identity).withAttributes(DefaultAttributes.iterableSource) /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -242,7 +238,7 @@ object Source extends SourceApply { * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, Unit] = - apply(SynchronousIterablePublisher(List(element), "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize + apply(SingleElementPublisher(element, "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize /** * Create a `Source` that will continually emit the given element.