squash some warnings

This commit is contained in:
Roland Kuhn 2016-01-17 16:37:45 +01:00
parent 3140e72265
commit e5baba2b29
42 changed files with 146 additions and 161 deletions

View file

@ -56,8 +56,7 @@ class HttpBenchmark {
def shutdown() = {
Await.ready(Http().shutdownAllConnectionPools(), 1.second)
binding.unbind()
system.terminate()
system.awaitTermination()
Await.result(system.terminate(), 5.seconds)
}
@Benchmark

View file

@ -1,6 +1,6 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
@ -10,7 +10,7 @@ import akka.stream.scaladsl._
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.duration._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@ -44,13 +44,12 @@ class FlatMapMergeBenchmark {
@TearDown
def shutdown() {
system.terminate()
system.awaitTermination()
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000) // Note: needs to match NumberOfElements.
def flat_map_merge_100k_elements() {
Await.result(graph.run(), Inf)
Await.result(graph.run(), Duration.Inf)
}
}

View file

@ -14,6 +14,8 @@ import scala.concurrent.Lock
import scala.util.Success
import akka.stream.impl.fusing.GraphStages
import org.reactivestreams._
import scala.concurrent.Await
import scala.concurrent.duration._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@ -110,8 +112,7 @@ class FlowMapBenchmark {
@TearDown
def shutdown() {
system.terminate()
system.awaitTermination()
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@ -133,5 +134,4 @@ class FlowMapBenchmark {
f
}
}

View file

@ -9,6 +9,8 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import org.openjdk.jmh.annotations._
import scala.concurrent.Await
import scala.concurrent.duration._
object MaterializationBenchmark {
@ -51,22 +53,23 @@ object MaterializationBenchmark {
flow
import GraphDSL.Implicits._
Source.single(()) ~> flow ~> Sink.ignore
ClosedShape
ClosedShape
})
}
val graphWithImportedFlowBuilder = (numOfFlows: Int) =>
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b source
import GraphDSL.Implicits._
val flow = Flow[Unit].map(identity)
var out: Outlet[Unit] = source.out
for (i <- 0 until numOfFlows) {
val flowShape = b.add(flow)
out ~> flowShape
out = flowShape.outlet
}
out ~> Sink.ignore
ClosedShape
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b
source
import GraphDSL.Implicits._
val flow = Flow[Unit].map(identity)
var out: Outlet[Unit] = source.out
for (i <- 0 until numOfFlows) {
val flowShape = b.add(flow)
out ~> flowShape
out = flowShape.outlet
}
out ~> Sink.ignore
ClosedShape
})
}
@ -97,8 +100,7 @@ class MaterializationBenchmark {
@TearDown
def shutdown() {
system.terminate()
system.awaitTermination()
Await.result(system.terminate(), 5.seconds)
}
@Benchmark

View file

@ -4,18 +4,18 @@
package akka.stream.io
import java.io.{FileInputStream, File}
import java.io.{ FileInputStream, File }
import java.util.concurrent.TimeUnit
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{Attributes, ActorMaterializer}
import akka.stream.{ Attributes, ActorMaterializer }
import akka.stream.scaladsl._
import akka.util.ByteString
import org.openjdk.jmh.annotations._
import scala.concurrent.duration._
import scala.concurrent.{Promise, Await, Future}
import scala.concurrent.{ Promise, Await, Future }
/**
* Benchmark (bufSize) Mode Cnt Score Error Units
@ -64,8 +64,7 @@ class FileSourcesBenchmark {
@TearDown
def shutdown() {
system.terminate()
system.awaitTermination()
Await.result(system.terminate(), Duration.Inf)
}
@Benchmark
@ -102,5 +101,4 @@ class FileSourcesBenchmark {
Await.result(p.future, 30.seconds)
}
}

View file

@ -17,11 +17,11 @@ class ActorComponentConfigurationTest extends WordSpec with Matchers with Shared
val component: Component = camel.context.getComponent("akka")
"Endpoint url config should be correctly parsed" in {
val actorEndpointConfig = component.createEndpoint("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig]
val actorEndpointConfig = component.createEndpoint(s"akka://test/user/$$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig]
actorEndpointConfig should have(
'endpointUri("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos"),
'path(ActorEndpointPath.fromCamelPath("akka://test/user/$a")),
'endpointUri(s"akka://test/user/$$a?autoAck=false&replyTimeout=987000000+nanos"),
'path(ActorEndpointPath.fromCamelPath(s"akka://test/user/$$a")),
'autoAck(false),
'replyTimeout(987000000 nanos))
}

View file

@ -4,27 +4,13 @@
package akka.cluster.singleton;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.MemberStatus;
public class ClusterSingletonManagerTest {
@SuppressWarnings("null")
public void demo() {
final ActorSystem system = null;
final ActorRef queue = null;

View file

@ -10,7 +10,6 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import com.typesafe.config.ConfigFactory;
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

View file

@ -14,7 +14,6 @@ import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actor.Messages.Swap.Swap;
import static docs.actor.Messages.*;
import static akka.japi.Util.immutableSeq;
import java.util.concurrent.TimeUnit;
@ -63,9 +62,8 @@ public class ActorDocTest {
}
@AfterClass
public static void afterClass() {
system.terminate();
system.awaitTermination(Duration.create("5 seconds"));
public static void afterClass() throws Exception {
Await.result(system.terminate(), Duration.create("5 seconds"));
}
static

View file

@ -13,6 +13,7 @@ import org.junit.Test;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import scala.concurrent.Await;
import java.util.concurrent.TimeUnit;
@ -26,9 +27,8 @@ public class InitializationDocTest {
}
@AfterClass
public static void afterClass() {
system.terminate();
system.awaitTermination(Duration.create("5 seconds"));
public static void afterClass() throws Exception {
Await.result(system.terminate(), Duration.create("5 seconds"));
}
public static class MessageInitExample extends AbstractActor {

View file

@ -36,7 +36,6 @@ public abstract class Util {
public static <T, U extends T> Source<U, scala.Unit> upcastSource(Source<T, scala.Unit> p) {
return (Source<U, scala.Unit>)(Object) p;
}
@SuppressWarnings("unchecked")
public static scala.collection.immutable.Map<String, String> convertMapToScala(Map<String, String> map) {
return emptyMap.$plus$plus(scala.collection.JavaConverters.mapAsScalaMapConverter(map).asScala());
}
@ -58,7 +57,6 @@ public abstract class Util {
public static <T, U extends T> Seq<U> convertIterable(Iterable<T> els) {
return scala.collection.JavaConverters.iterableAsScalaIterableConverter((Iterable<U>)els).asScala().toVector();
}
@SuppressWarnings("unchecked")
public static <T, U extends T> Seq<U> convertArray(T[] els) {
return Util.<T, U>convertIterable(Arrays.asList(els));
}

View file

@ -4,14 +4,12 @@
package akka.http.javadsl.model;
import java.nio.charset.Charset;
import akka.http.impl.model.JavaUri;
import akka.http.scaladsl.model.UriJavaAccessor;
import akka.japi.Pair;
import akka.parboiled2.ParserInput$;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**

View file

@ -108,7 +108,8 @@ private[http] object HttpServerBluePrint {
val entity = createEntity(entityCreator) withSizeLimit settings.parserSettings.maxContentLength
push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol))
case _ throw new IllegalStateException
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
}
setHandler(in, idle)

View file

@ -96,11 +96,15 @@ object WebsocketClientBlueprint {
parser.onPull() match {
case NeedMoreData ctx.pull()
case RemainingBytes(bytes) ctx.push(bytes)
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
case Left(problem)
result.success(InvalidUpgradeResponse(response, s"Websocket server at $uri returned $problem"))
ctx.fail(throw new IllegalArgumentException(s"Websocket upgrade did not finish because of '$problem'"))
}
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
}
}

View file

@ -26,6 +26,7 @@ import java.util.List;
public class WSEchoTestClientApp {
private static final Function<Message, String> messageStringifier = new Function<Message, String>() {
private static final long serialVersionUID = 1L;
@Override
public String apply(Message msg) throws Exception {
if (msg.isText() && msg.asTextMessage().isStrict())

View file

@ -86,14 +86,14 @@ public class JavaApiTestCases {
// just for the sake of explicitly touching the interfaces
if (mediaType.binary()) anything = (akka.http.javadsl.model.MediaType.Binary) mediaType;
if (1 == 2) anything = (akka.http.javadsl.model.MediaType.Multipart) mediaType;
if (1 == 2) anything = (akka.http.javadsl.model.MediaType.WithOpenCharset) mediaType;
if (1 == 2) anything = (akka.http.javadsl.model.MediaType.WithFixedCharset) mediaType;
anything = (akka.http.javadsl.model.MediaType.Multipart) mediaType;
anything = (akka.http.javadsl.model.MediaType.WithOpenCharset) mediaType;
anything = (akka.http.javadsl.model.MediaType.WithFixedCharset) mediaType;
if (type.binary()) anything = (akka.http.javadsl.model.ContentType.Binary) type;
if (1 == 2) anything = (akka.http.javadsl.model.ContentType.NonBinary) type;
if (1 == 2) anything = (akka.http.javadsl.model.ContentType.WithCharset) type;
if (1 == 2) anything = (akka.http.javadsl.model.ContentType.WithFixedCharset) type;
anything = (akka.http.javadsl.model.ContentType.NonBinary) type;
anything = (akka.http.javadsl.model.ContentType.WithCharset) type;
anything = (akka.http.javadsl.model.ContentType.WithFixedCharset) type;
return anything;
}

View file

@ -201,6 +201,8 @@ object WSClientAutobahnTest extends App {
val sink = Sink.head[Message]
runWs(uri, Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.left)).flatMap {
case tm: TextMessage tm.textStream.runWith(Sink.fold("")(_ + _))
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
}
def runToSingleJsonValue[T: JsonReader](uri: Uri): Future[T] =

View file

@ -370,7 +370,7 @@ class UriSpec extends WordSpec with Matchers {
a[IllegalUriException] should be thrownBy Uri("foo/another@url/[]and{}", mode = Uri.ParsingMode.Strict)
// handle query parameters with more than percent-encoded character
Uri("?%7Ba%7D=$%7B%7D", UTF8, Uri.ParsingMode.Strict).query() shouldEqual Query.Cons("{a}", "${}", Query.Empty)
Uri("?%7Ba%7D=$%7B%7D", UTF8, Uri.ParsingMode.Strict).query() shouldEqual Query.Cons("{a}", s"$${}", Query.Empty)
// don't double decode
Uri("%2520").path.head shouldEqual "%20"

View file

@ -10,8 +10,8 @@ import akka.http.scaladsl.model.HttpResponse
import akka.stream.{ Materializer, ActorMaterializer }
import org.junit.rules.ExternalResource
import org.junit.{ Assert, Rule }
import scala.concurrent.duration._
import scala.concurrent.Await
/**
* A RouteTest that uses JUnit assertions. ActorSystem and Materializer are provided as an [[ExternalResource]]
@ -66,8 +66,7 @@ class ActorSystemResource extends ExternalResource {
_materializer = createMaterializer(_system)
}
override def after(): Unit = {
_system.terminate()
_system.awaitTermination(5.seconds)
Await.result(_system.terminate(), 5.seconds)
_system = null
_materializer = null
}

View file

@ -29,9 +29,8 @@ public class CodingDirectivesTest extends JUnitRouteTest {
}
@AfterClass
public static void tearDown() {
system.terminate();
system.awaitTermination();
public static void tearDown() throws Exception {
Await.result(system.terminate(), Duration.Inf());
system = null;
}

View file

@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import scala.concurrent.Await
class FormDataSpec extends WordSpec with Matchers with ScalaFutures with BeforeAndAfterAll {
implicit val system = ActorSystem(getClass.getSimpleName)
@ -36,7 +37,6 @@ class FormDataSpec extends WordSpec with Matchers with ScalaFutures with BeforeA
}
override def afterAll() = {
system.terminate()
system.awaitTermination(10.seconds)
Await.result(system.terminate(), 10.seconds)
}
}

View file

@ -9,6 +9,7 @@ import org.scalatest.{ Suite, BeforeAndAfterAll, Matchers }
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.util.ByteString
import scala.concurrent.Await
trait CodecSpecSupport extends Matchers with BeforeAndAfterAll { self: Suite
@ -73,7 +74,6 @@ est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscin
implicit val materializer = ActorMaterializer()
override def afterAll() = {
system.terminate()
system.awaitTermination(10.seconds)
Await.result(system.terminate(), 10.seconds)
}
}

View file

@ -95,8 +95,7 @@ class DontLeakActorsOnFailingConnectionSpecs extends WordSpecLike with Matchers
}
override def afterAll = {
system.terminate()
system.awaitTermination(3.seconds)
Await.result(system.terminate(), 3.seconds)
}
}

View file

@ -215,6 +215,8 @@ object Credentials {
new Credentials.Provided(token) {
def verify(secret: String): Boolean = secret secure_== token
}
case Some(GenericHttpCredentials(scheme, token, params))
throw new UnsupportedOperationException("cannot verify generic HTTP credentials")
case None Credentials.Missing
}
}
@ -250,4 +252,4 @@ trait AuthenticationDirective[T] extends Directive1[T] {
object AuthenticationDirective {
implicit def apply[T](other: Directive1[T]): AuthenticationDirective[T] =
new AuthenticationDirective[T] { def tapply(inner: Tuple1[T] Route) = other.tapply(inner) }
}
}

View file

@ -73,7 +73,6 @@ package akka.parboiled2.util;
import java.util.Arrays;
@SuppressWarnings({"UnnecessaryParentheses"})
public class Base64 {
// -------- FIELDS -------------------------------------------------------------------------------------------------

View file

@ -121,7 +121,7 @@ abstract class RemoteRestartedQuarantinedSpec
enterBarrier("still-quarantined")
system.awaitTermination(10.seconds)
Await.result(system.whenTerminated, 10.seconds)
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.retry-gate-closed-for = 0.5 s

View file

@ -38,8 +38,8 @@ class TestPublisherSubscriberSpec extends AkkaSpec {
upstreamSubscription.sendComplete()
downstream.expectEventPF {
case c @ OnComplete
case _ fail()
case OnComplete
case _ fail()
}
}

View file

@ -44,6 +44,7 @@ public class ActorPublisherTest extends StreamTest {
final Publisher<Integer> publisher = UntypedActorPublisher.create(ref);
Source.fromPublisher(publisher)
.runForeach(new akka.japi.function.Procedure<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(Integer elem) throws Exception {
probe.getRef().tell(elem, ActorRef.noSender());

View file

@ -3,25 +3,25 @@
*/
package akka.stream.io;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.japi.function.Procedure;
import akka.stream.StreamTest;
import akka.stream.javadsl.*;
import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.Utils;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertEquals;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import org.junit.ClassRule;
import org.junit.Test;
import akka.actor.ActorRef;
import akka.japi.function.Procedure;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters;
import akka.stream.testkit.Utils;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import scala.concurrent.duration.FiniteDuration;
public class OutputStreamSourceTest extends StreamTest {
public OutputStreamSourceTest() {
@ -38,6 +38,7 @@ public class OutputStreamSourceTest extends StreamTest {
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout);
final OutputStream s = source.to(Sink.foreach(new Procedure<ByteString>() {
private static final long serialVersionUID = 1L;
public void apply(ByteString elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}

View file

@ -385,7 +385,6 @@ public class FlowTest extends StreamTest {
})).run(materializer);
List<Object> output = Arrays.asList(probe.receiveN(3));
@SuppressWarnings("unchecked")
List<Pair<String, Integer>> expected = Arrays.asList(new Pair<String, Integer>("A", 1), new Pair<String, Integer>(
"B", 2), new Pair<String, Integer>("C", 3));
assertEquals(expected, output);

View file

@ -450,6 +450,7 @@ public class SourceTest extends StreamTest {
final JavaTestKit probe = new JavaTestKit(system);
Source<String, Cancellable> tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS),
FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick");
@SuppressWarnings("unused")
Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());

View file

@ -61,8 +61,7 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender {
"report correctly if it has been shut down from the side" in {
val sys = ActorSystem()
val m = ActorMaterializer.create(sys)
sys.terminate()
sys.awaitTermination()
Await.result(sys.terminate(), Duration.Inf)
m.isShutdown should ===(true)
}
}

View file

@ -61,12 +61,12 @@ class DslConsistencySpec extends WordSpec with Matchers {
"Java and Scala DSLs" must {
("Source" -> List(sSourceClass, jSourceClass)) ::
("SubSource" -> List(sSubSourceClass, jSubSourceClass)) ::
("Flow" -> List(sFlowClass, jFlowClass)) ::
("SubFlow" -> List(sSubFlowClass, jSubFlowClass)) ::
("Sink" -> List(sSinkClass, jSinkClass)) ::
("RunanbleFlow" -> List(sRunnableGraphClass, jRunnableGraphClass)) ::
("Source" -> List[Class[_]](sSourceClass, jSourceClass)) ::
("SubSource" -> List[Class[_]](sSubSourceClass, jSubSourceClass)) ::
("Flow" -> List[Class[_]](sFlowClass, jFlowClass)) ::
("SubFlow" -> List[Class[_]](sSubFlowClass, jSubFlowClass)) ::
("Sink" -> List[Class[_]](sSinkClass, jSinkClass)) ::
("RunanbleFlow" -> List[Class[_]](sRunnableGraphClass, jRunnableGraphClass)) ::
Nil foreach {
case (element, classes)

View file

@ -25,13 +25,13 @@ object ActorSubscriberSpec {
override val requestStrategy = ZeroRequestStrategy
def receive = {
case next @ OnNext(elem) probe ! next
case complete @ OnComplete probe ! complete
case err @ OnError(cause) probe ! err
case "ready" request(elements = 2)
case "boom" throw new RuntimeException("boom") with NoStackTrace
case "requestAndCancel" { request(1); cancel() }
case "cancel" cancel()
case next @ OnNext(elem) probe ! next
case OnComplete probe ! OnComplete
case err @ OnError(cause) probe ! err
case "ready" request(elements = 2)
case "boom" throw new RuntimeException("boom") with NoStackTrace
case "requestAndCancel" { request(1); cancel() }
case "cancel" cancel()
}
}
@ -55,8 +55,8 @@ object ActorSubscriberSpec {
override val requestStrategy = strat
def receive = {
case next @ OnNext(elem) probe ! next
case complete @ OnComplete probe ! complete
case next @ OnNext(elem) probe ! next
case OnComplete probe ! OnComplete
}
}

View file

@ -368,8 +368,8 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
events
}
class UpstreamOneBoundedProbe[T] extends UpstreamBoundaryStageLogic[T] {
val out = Outlet[T]("out")
class UpstreamOneBoundedProbe[TT] extends UpstreamBoundaryStageLogic[TT] {
val out = Outlet[TT]("out")
out.id = 0
setHandler(out, new OutHandler {
@ -381,7 +381,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
override def onDownstreamFinish(): Unit = lastEvent += Cancel
})
def onNext(elem: T): Unit = {
def onNext(elem: TT): Unit = {
push(out, elem)
run()
}
@ -390,7 +390,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
run()
}
def onNextAndComplete(elem: T): Unit = {
def onNextAndComplete(elem: TT): Unit = {
push(out, elem)
complete(out)
run()
@ -402,8 +402,8 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
}
}
class DownstreamOneBoundedPortProbe[T] extends DownstreamBoundaryStageLogic[T] {
val in = Inlet[T]("in")
class DownstreamOneBoundedPortProbe[TT] extends DownstreamBoundaryStageLogic[TT] {
val in = Inlet[TT]("in")
in.id = 0
setHandler(in, new InHandler {

View file

@ -392,7 +392,7 @@ class TcpSpec extends AkkaSpec("akka.stream.materializer.subscription-timeout.ti
val result = Source.maybe[ByteString].via(Tcp(system2).outgoingConnection(serverAddress)).runFold(0)(_ + _.size)(mat2)
// Getting rid of existing connection actors by using a blunt instrument
system2.actorSelection(akka.io.Tcp(system2).getManager.path / "selectors" / "$a" / "*") ! Kill
system2.actorSelection(akka.io.Tcp(system2).getManager.path / "selectors" / s"$$a" / "*") ! Kill
a[StreamTcpException] should be thrownBy
Await.result(result, 3.seconds)

View file

@ -8,7 +8,7 @@ import akka.stream.testkit.AkkaSpec
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
class FlowDispatcherSpec extends AkkaSpec("my-dispatcher = ${akka.test.stream-dispatcher}") {
class FlowDispatcherSpec extends AkkaSpec(s"my-dispatcher = $${akka.test.stream-dispatcher}") {
val defaultSettings = ActorMaterializerSettings(system)

View file

@ -72,7 +72,7 @@ private[akka] class GroupByProcessorImpl(settings: ActorMaterializerSettings, va
if (keyToSubstreamOutput.size == maxSubstreams)
throw new IllegalStateException(s"cannot open substream for key '$key': too many substreams open")
val substreamOutput = createSubstreamOutput()
val substreamFlow = Source.fromPublisher(substreamOutput)
val substreamFlow = Source.fromPublisher[Any](substreamOutput)
primaryOutputs.enqueueOutputElement(substreamFlow)
keyToSubstreamOutput(key) = substreamOutput
nextPhase(dispatchToSubstream(elem, substreamOutput))

View file

@ -504,6 +504,7 @@ private[stream] object Fusing {
private def removeMapping[T](orig: T, map: ju.Map[T, List[T]]): T =
map.remove(orig) match {
case null null.asInstanceOf[T]
case Nil throw new IllegalStateException("mappings corrupted")
case x :: Nil x
case x :: xs
map.put(orig, xs)

View file

@ -11,9 +11,7 @@ import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.stage._
import akka.stream.scaladsl._
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.actor.ActorPublisherMessage
import akka.stream.actor.ActorPublisherMessage._
import java.{ util ju }
import scala.collection.immutable
import scala.concurrent._
@ -375,7 +373,9 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean) extends Gr
* INTERNAL API
*/
object SubSink {
val RequestOne = Request(1) // No need to frivolously allocate these
sealed trait Command
case object RequestOne extends Command
case object Cancel extends Command
}
/**
@ -390,30 +390,30 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
override def initialAttributes = Attributes.name(s"SubSink($name)")
override val shape = SinkShape(in)
val status = new AtomicReference[AnyRef]
private val status = new AtomicReference[AnyRef]
def pullSubstream(): Unit = status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(RequestOne)
case null
if (!status.compareAndSet(null, RequestOne))
status.get.asInstanceOf[ActorPublisherMessage Unit](RequestOne)
status.get.asInstanceOf[Command Unit](RequestOne)
}
def cancelSubstream(): Unit = status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(Cancel)
case x // a potential RequestOne is overwritten
if (!status.compareAndSet(x, Cancel))
status.get.asInstanceOf[ActorPublisherMessage Unit](Cancel)
status.get.asInstanceOf[Command Unit](Cancel)
}
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler {
setHandler(in, this)
override def onPush(): Unit = externalCallback(OnNext(grab(in)))
override def onUpstreamFinish(): Unit = externalCallback(OnComplete)
override def onUpstreamFailure(ex: Throwable): Unit = externalCallback(OnError(ex))
override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in)))
override def onUpstreamFinish(): Unit = externalCallback(ActorSubscriberMessage.OnComplete)
override def onUpstreamFailure(ex: Throwable): Unit = externalCallback(ActorSubscriberMessage.OnError(ex))
@tailrec private def setCB(cb: AsyncCallback[ActorPublisherMessage]): Unit = {
@tailrec private def setCB(cb: AsyncCallback[Command]): Unit = {
status.get match {
case null
if (!status.compareAndSet(null, cb)) setCB(cb)
@ -429,7 +429,7 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
}
override def preStart(): Unit = {
val ourOwnCallback = getAsyncCallback[ActorPublisherMessage] {
val ourOwnCallback = getAsyncCallback[Command] {
case RequestOne tryPull(in)
case Cancel completeStage()
case _ throw new IllegalStateException("Bug")
@ -452,7 +452,7 @@ object SubSource {
private[akka] def kill[T, M](s: Source[T, M]): Unit = {
s.module match {
case GraphStageModule(_, _, stage: SubSource[_])
stage.externalCallback.invoke(Cancel)
stage.externalCallback.invoke(SubSink.Cancel)
case pub: PublisherSource[_]
pub.create(null)._1.subscribe(new CancellingSubscriber)
case m
@ -467,7 +467,7 @@ object SubSource {
/**
* INTERNAL API
*/
final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[ActorPublisherMessage])
final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[SubSink.Command])
extends GraphStage[SourceShape[T]] {
import SubSink._
@ -475,48 +475,48 @@ final class SubSource[T](name: String, private[fusing] val externalCallback: Asy
override def initialAttributes = Attributes.name(s"SubSource($name)")
override val shape: SourceShape[T] = SourceShape(out)
val status = new AtomicReference[AnyRef]
private val status = new AtomicReference[AnyRef]
def pushSubstream(elem: T): Unit = status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(OnNext(elem))
case f: AsyncCallback[Any] @unchecked f.invoke(ActorSubscriberMessage.OnNext(elem))
case _ throw new IllegalStateException("cannot push to uninitialized substream")
}
def completeSubstream(): Unit = status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(OnComplete)
case f: AsyncCallback[Any] @unchecked f.invoke(ActorSubscriberMessage.OnComplete)
case null
if (!status.compareAndSet(null, OnComplete))
status.get.asInstanceOf[AsyncCallback[Any]].invoke(OnComplete)
if (!status.compareAndSet(null, ActorSubscriberMessage.OnComplete))
status.get.asInstanceOf[AsyncCallback[Any]].invoke(ActorSubscriberMessage.OnComplete)
}
def failSubstream(ex: Throwable): Unit = status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(OnError(ex))
case f: AsyncCallback[Any] @unchecked f.invoke(ActorSubscriberMessage.OnError(ex))
case null
val failure = OnError(ex)
val failure = ActorSubscriberMessage.OnError(ex)
if (!status.compareAndSet(null, failure))
status.get.asInstanceOf[AsyncCallback[Any]].invoke(failure)
}
def timeout(d: FiniteDuration): Boolean =
status.compareAndSet(null, OnError(new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d")))
status.compareAndSet(null, ActorSubscriberMessage.OnError(new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d")))
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
setHandler(out, this)
@tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = {
status.get match {
case null if (!status.compareAndSet(null, cb)) setCB(cb)
case OnComplete completeStage()
case OnError(ex) failStage(ex)
case _: AsyncCallback[_] failStage(new IllegalStateException("Substream Source cannot be materialized more than once"))
case null if (!status.compareAndSet(null, cb)) setCB(cb)
case ActorSubscriberMessage.OnComplete completeStage()
case ActorSubscriberMessage.OnError(ex) failStage(ex)
case _: AsyncCallback[_] failStage(new IllegalStateException("Substream Source cannot be materialized more than once"))
}
}
override def preStart(): Unit = {
val ourOwnCallback = getAsyncCallback[ActorSubscriberMessage] {
case OnComplete completeStage()
case OnError(ex) failStage(ex)
case OnNext(elem) push(out, elem.asInstanceOf[T])
case ActorSubscriberMessage.OnComplete completeStage()
case ActorSubscriberMessage.OnError(ex) failStage(ex)
case ActorSubscriberMessage.OnNext(elem) push(out, elem.asInstanceOf[T])
}
setCB(ourOwnCallback)
}

View file

@ -1014,13 +1014,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
private var available = false
private var closed = false
private val callback = getAsyncCallback[ActorPublisherMessage] {
private val callback = getAsyncCallback[SubSink.Command] {
case SubSink.RequestOne
if (!closed) {
available = true
handler.onPull()
}
case ActorPublisherMessage.Cancel
case SubSink.Cancel
if (!closed) {
available = false
closed = true

View file

@ -558,7 +558,7 @@ public class JavaTestKit {
this(clazz, max, Duration.Inf(), messages);
}
@SuppressWarnings("unchecked")
@SuppressWarnings("all")
public ReceiveWhile(Class<T> clazz, Duration max, Duration idle, int messages) {
results = p.receiveWhile(max, idle, messages, new CachingPartialFunction<Object, T>() {
public T match(Object msg) throws Exception {