diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala index 28802d1918..981d407472 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala @@ -9,12 +9,21 @@ import org.scalacheck.Arbitrary._ import org.scalacheck.Prop._ import org.scalacheck.Gen._ +import scala.collection.mutable.Builder + +import java.nio.{ ByteBuffer, ShortBuffer, IntBuffer, FloatBuffer, DoubleBuffer } +import java.nio.ByteOrder, ByteOrder.{ BIG_ENDIAN, LITTLE_ENDIAN } +import java.lang.Float.floatToRawIntBits +import java.lang.Double.doubleToRawLongBits + class ByteStringSpec extends WordSpec with MustMatchers with Checkers { def genSimpleByteString(min: Int, max: Int) = for { n ← choose(min, max) b ← Gen.containerOfN[Array, Byte](n, arbitrary[Byte]) - } yield ByteString(b) + from ← choose(0, b.length) + until ← choose(from, b.length) + } yield ByteString(b).slice(from, until) implicit val arbitraryByteString: Arbitrary[ByteString] = Arbitrary { Gen.sized { s ⇒ @@ -25,14 +34,483 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers { } } + type ByteStringSlice = (ByteString, Int, Int) + + implicit val arbitraryByteStringSlice: Arbitrary[ByteStringSlice] = Arbitrary { + for { + xs ← arbitraryByteString.arbitrary + from ← choose(0, xs.length) + until ← choose(from, xs.length) + } yield (xs, from, until) + } + + type ArraySlice[A] = (Array[A], Int, Int) + + def arbSlice[A](arbArray: Arbitrary[Array[A]]): Arbitrary[ArraySlice[A]] = Arbitrary { + for { + xs ← arbArray.arbitrary + from ← choose(0, xs.length) + until ← choose(from, xs.length) + } yield (xs, from, until) + } + + val arbitraryByteArray: Arbitrary[Array[Byte]] = Arbitrary { Gen.sized { n ⇒ Gen.containerOfN[Array, Byte](n, arbitrary[Byte]) } } + implicit val arbitraryByteArraySlice: Arbitrary[ArraySlice[Byte]] = arbSlice(arbitraryByteArray) + val arbitraryShortArray: Arbitrary[Array[Short]] = Arbitrary { Gen.sized { n ⇒ Gen.containerOfN[Array, Short](n, arbitrary[Short]) } } + implicit val arbitraryShortArraySlice: Arbitrary[ArraySlice[Short]] = arbSlice(arbitraryShortArray) + val arbitraryIntArray: Arbitrary[Array[Int]] = Arbitrary { Gen.sized { n ⇒ Gen.containerOfN[Array, Int](n, arbitrary[Int]) } } + implicit val arbitraryIntArraySlice: Arbitrary[ArraySlice[Int]] = arbSlice(arbitraryIntArray) + val arbitraryLongArray: Arbitrary[Array[Long]] = Arbitrary { Gen.sized { n ⇒ Gen.containerOfN[Array, Long](n, arbitrary[Long]) } } + implicit val arbitraryLongArraySlice: Arbitrary[ArraySlice[Long]] = arbSlice(arbitraryLongArray) + val arbitraryFloatArray: Arbitrary[Array[Float]] = Arbitrary { Gen.sized { n ⇒ Gen.containerOfN[Array, Float](n, arbitrary[Float]) } } + implicit val arbitraryFloatArraySlice: Arbitrary[ArraySlice[Float]] = arbSlice(arbitraryFloatArray) + val arbitraryDoubleArray: Arbitrary[Array[Double]] = Arbitrary { Gen.sized { n ⇒ Gen.containerOfN[Array, Double](n, arbitrary[Double]) } } + implicit val arbitraryDoubleArraySlice: Arbitrary[ArraySlice[Double]] = arbSlice(arbitraryDoubleArray) + + def likeVector(bs: ByteString)(body: IndexedSeq[Byte] ⇒ Any): Boolean = { + val vec = Vector(bs: _*) + body(bs) == body(vec) + } + + def likeVectors(bsA: ByteString, bsB: ByteString)(body: (IndexedSeq[Byte], IndexedSeq[Byte]) ⇒ Any): Boolean = { + val vecA = Vector(bsA: _*) + val vecB = Vector(bsB: _*) + body(bsA, bsB) == body(vecA, vecB) + } + + def likeVecIt(bs: ByteString)(body: BufferedIterator[Byte] ⇒ Any, strict: Boolean = true): Boolean = { + val bsIterator = bs.iterator + val vecIterator = Vector(bs: _*).iterator.buffered + (body(bsIterator) == body(vecIterator)) && + (!strict || (bsIterator.toSeq == vecIterator.toSeq)) + } + + def likeVecIts(a: ByteString, b: ByteString)(body: (BufferedIterator[Byte], BufferedIterator[Byte]) ⇒ Any, strict: Boolean = true): Boolean = { + val (bsAIt, bsBIt) = (a.iterator, b.iterator) + val (vecAIt, vecBIt) = (Vector(a: _*).iterator.buffered, Vector(b: _*).iterator.buffered) + (body(bsAIt, bsBIt) == body(vecAIt, vecBIt)) && + (!strict || (bsAIt.toSeq, bsBIt.toSeq) == (vecAIt.toSeq, vecBIt.toSeq)) + } + + def likeVecBld(body: Builder[Byte, _] ⇒ Unit): Boolean = { + val bsBuilder = ByteString.newBuilder + val vecBuilder = Vector.newBuilder[Byte] + + body(bsBuilder) + body(vecBuilder) + + bsBuilder.result == vecBuilder.result + } + + def testShortDecoding(slice: ByteStringSlice, byteOrder: ByteOrder): Boolean = { + val elemSize = 2 + val (bytes, from, until) = slice + val (n, a, b) = (bytes.length / elemSize, from / elemSize, until / elemSize) + val reference = Array.ofDim[Short](n) + bytes.asByteBuffer.order(byteOrder).asShortBuffer.get(reference, 0, n) + val input = bytes.iterator + val decoded = Array.ofDim[Short](n) + for (i ← 0 to a - 1) decoded(i) = input.getShort(byteOrder) + input.getShorts(decoded, a, b - a)(byteOrder) + for (i ← b to n - 1) decoded(i) = input.getShort(byteOrder) + (decoded.toSeq == reference.toSeq) && (input.toSeq == bytes.drop(n * elemSize)) + } + + def testIntDecoding(slice: ByteStringSlice, byteOrder: ByteOrder): Boolean = { + val elemSize = 4 + val (bytes, from, until) = slice + val (n, a, b) = (bytes.length / elemSize, from / elemSize, until / elemSize) + val reference = Array.ofDim[Int](n) + bytes.asByteBuffer.order(byteOrder).asIntBuffer.get(reference, 0, n) + val input = bytes.iterator + val decoded = Array.ofDim[Int](n) + for (i ← 0 to a - 1) decoded(i) = input.getInt(byteOrder) + input.getInts(decoded, a, b - a)(byteOrder) + for (i ← b to n - 1) decoded(i) = input.getInt(byteOrder) + (decoded.toSeq == reference.toSeq) && (input.toSeq == bytes.drop(n * elemSize)) + } + + def testLongDecoding(slice: ByteStringSlice, byteOrder: ByteOrder): Boolean = { + val elemSize = 8 + val (bytes, from, until) = slice + val (n, a, b) = (bytes.length / elemSize, from / elemSize, until / elemSize) + val reference = Array.ofDim[Long](n) + bytes.asByteBuffer.order(byteOrder).asLongBuffer.get(reference, 0, n) + val input = bytes.iterator + val decoded = Array.ofDim[Long](n) + for (i ← 0 to a - 1) decoded(i) = input.getLong(byteOrder) + input.getLongs(decoded, a, b - a)(byteOrder) + for (i ← b to n - 1) decoded(i) = input.getLong(byteOrder) + (decoded.toSeq == reference.toSeq) && (input.toSeq == bytes.drop(n * elemSize)) + } + + def testFloatDecoding(slice: ByteStringSlice, byteOrder: ByteOrder): Boolean = { + val elemSize = 4 + val (bytes, from, until) = slice + val (n, a, b) = (bytes.length / elemSize, from / elemSize, until / elemSize) + val reference = Array.ofDim[Float](n) + bytes.asByteBuffer.order(byteOrder).asFloatBuffer.get(reference, 0, n) + val input = bytes.iterator + val decoded = Array.ofDim[Float](n) + for (i ← 0 to a - 1) decoded(i) = input.getFloat(byteOrder) + input.getFloats(decoded, a, b - a)(byteOrder) + for (i ← b to n - 1) decoded(i) = input.getFloat(byteOrder) + ((decoded.toSeq map floatToRawIntBits) == (reference.toSeq map floatToRawIntBits)) && + (input.toSeq == bytes.drop(n * elemSize)) + } + + def testDoubleDecoding(slice: ByteStringSlice, byteOrder: ByteOrder): Boolean = { + val elemSize = 8 + val (bytes, from, until) = slice + val (n, a, b) = (bytes.length / elemSize, from / elemSize, until / elemSize) + val reference = Array.ofDim[Double](n) + bytes.asByteBuffer.order(byteOrder).asDoubleBuffer.get(reference, 0, n) + val input = bytes.iterator + val decoded = Array.ofDim[Double](n) + for (i ← 0 to a - 1) decoded(i) = input.getDouble(byteOrder) + input.getDoubles(decoded, a, b - a)(byteOrder) + for (i ← b to n - 1) decoded(i) = input.getDouble(byteOrder) + ((decoded.toSeq map doubleToRawLongBits) == (reference.toSeq map doubleToRawLongBits)) && + (input.toSeq == bytes.drop(n * elemSize)) + } + + def testShortEncoding(slice: ArraySlice[Short], byteOrder: ByteOrder): Boolean = { + val elemSize = 2 + val (data, from, until) = slice + val reference = Array.ofDim[Byte](data.length * elemSize) + ByteBuffer.wrap(reference).order(byteOrder).asShortBuffer.put(data) + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.putShort(data(i))(byteOrder) + builder.putShorts(data, from, until - from)(byteOrder) + for (i ← until to data.length - 1) builder.putShort(data(i))(byteOrder) + reference.toSeq == builder.result + } + + def testIntEncoding(slice: ArraySlice[Int], byteOrder: ByteOrder): Boolean = { + val elemSize = 4 + val (data, from, until) = slice + val reference = Array.ofDim[Byte](data.length * elemSize) + ByteBuffer.wrap(reference).order(byteOrder).asIntBuffer.put(data) + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.putInt(data(i))(byteOrder) + builder.putInts(data, from, until - from)(byteOrder) + for (i ← until to data.length - 1) builder.putInt(data(i))(byteOrder) + reference.toSeq == builder.result + } + + def testLongEncoding(slice: ArraySlice[Long], byteOrder: ByteOrder): Boolean = { + val elemSize = 8 + val (data, from, until) = slice + val reference = Array.ofDim[Byte](data.length * elemSize) + ByteBuffer.wrap(reference).order(byteOrder).asLongBuffer.put(data) + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.putLong(data(i))(byteOrder) + builder.putLongs(data, from, until - from)(byteOrder) + for (i ← until to data.length - 1) builder.putLong(data(i))(byteOrder) + reference.toSeq == builder.result + } + + def testFloatEncoding(slice: ArraySlice[Float], byteOrder: ByteOrder): Boolean = { + val elemSize = 4 + val (data, from, until) = slice + val reference = Array.ofDim[Byte](data.length * elemSize) + ByteBuffer.wrap(reference).order(byteOrder).asFloatBuffer.put(data) + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.putFloat(data(i))(byteOrder) + builder.putFloats(data, from, until - from)(byteOrder) + for (i ← until to data.length - 1) builder.putFloat(data(i))(byteOrder) + reference.toSeq == builder.result + } + + def testDoubleEncoding(slice: ArraySlice[Double], byteOrder: ByteOrder): Boolean = { + val elemSize = 8 + val (data, from, until) = slice + val reference = Array.ofDim[Byte](data.length * elemSize) + ByteBuffer.wrap(reference).order(byteOrder).asDoubleBuffer.put(data) + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.putDouble(data(i))(byteOrder) + builder.putDoubles(data, from, until - from)(byteOrder) + for (i ← until to data.length - 1) builder.putDouble(data(i))(byteOrder) + reference.toSeq == builder.result + } + "A ByteString" must { "have correct size" when { "concatenating" in { check((a: ByteString, b: ByteString) ⇒ (a ++ b).size == a.size + b.size) } "dropping" in { check((a: ByteString, b: ByteString) ⇒ (a ++ b).drop(b.size).size == a.size) } } + "be sequential" when { "taking" in { check((a: ByteString, b: ByteString) ⇒ (a ++ b).take(a.size) == a) } "dropping" in { check((a: ByteString, b: ByteString) ⇒ (a ++ b).drop(a.size) == b) } } + + "be equal to the original" when { + "compacting" in { check { xs: ByteString ⇒ val ys = xs.compact; (xs == ys) && ys.isCompact } } + "recombining" in { + check { (xs: ByteString, from: Int, until: Int) ⇒ + val (tmp, c) = xs.splitAt(until) + val (a, b) = tmp.splitAt(from) + (a ++ b ++ c) == xs + } + } + } + + "behave as expected" when { + "created from and decoding to String" in { check { s: String ⇒ ByteString(s, "UTF-8").decodeString("UTF-8") == s } } + + "compacting" in { + check { a: ByteString ⇒ + val wasCompact = a.isCompact + val b = a.compact + ((!wasCompact) || (b eq a)) && + (b == a) && + b.isCompact && + (b.compact eq b) + } + } + } + "behave like a Vector" when { + "concatenating" in { check { (a: ByteString, b: ByteString) ⇒ likeVectors(a, b) { (a, b) ⇒ (a ++ b) } } } + + "calling apply" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, i1, i2) ⇒ likeVector(xs) { seq ⇒ + (if ((i1 >= 0) && (i1 < seq.length)) seq(i1) else 0, + if ((i2 >= 0) && (i2 < seq.length)) seq(i2) else 0) + } + } + } + } + + "calling head" in { check { a: ByteString ⇒ a.isEmpty || likeVector(a) { _.head } } } + "calling tail" in { check { a: ByteString ⇒ a.isEmpty || likeVector(a) { _.tail } } } + "calling last" in { check { a: ByteString ⇒ a.isEmpty || likeVector(a) { _.last } } } + "calling init" in { check { a: ByteString ⇒ a.isEmpty || likeVector(a) { _.init } } } + "calling length" in { check { a: ByteString ⇒ likeVector(a) { _.length } } } + + "calling span" in { check { (a: ByteString, b: Byte) ⇒ likeVector(a)({ _.span(_ != b) match { case (a, b) ⇒ (a, b) } }) } } + + "calling takeWhile" in { check { (a: ByteString, b: Byte) ⇒ likeVector(a)({ _.takeWhile(_ != b) }) } } + "calling dropWhile" in { check { (a: ByteString, b: Byte) ⇒ likeVector(a) { _.dropWhile(_ != b) } } } + "calling indexWhere" in { check { (a: ByteString, b: Byte) ⇒ likeVector(a) { _.indexWhere(_ == b) } } } + "calling indexOf" in { check { (a: ByteString, b: Byte) ⇒ likeVector(a) { _.indexOf(b) } } } + "calling foreach" in { check { a: ByteString ⇒ likeVector(a) { it ⇒ var acc = 0; it foreach { acc += _ }; acc } } } + "calling foldLeft" in { check { a: ByteString ⇒ likeVector(a) { _.foldLeft(0) { _ + _ } } } } + "calling toArray" in { check { a: ByteString ⇒ likeVector(a) { _.toArray.toSeq } } } + + "calling slice" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVector(xs)({ + _.slice(from, until) + }) + } + } + } + + "calling take and drop" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVector(xs)({ + _.drop(from).take(until - from) + }) + } + } + } + + "calling copyToArray" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVector(xs)({ it ⇒ + val array = Array.ofDim[Byte](xs.length) + it.slice(from, until).copyToArray(array, from, until) + array.toSeq + }) + } + } + } + } } -} \ No newline at end of file + + "A ByteStringIterator" must { + "behave like a buffered Vector Iterator" when { + "concatenating" in { check { (a: ByteString, b: ByteString) ⇒ likeVecIts(a, b) { (a, b) ⇒ (a ++ b).toSeq } } } + + "calling head" in { check { a: ByteString ⇒ a.isEmpty || likeVecIt(a) { _.head } } } + "calling next" in { check { a: ByteString ⇒ a.isEmpty || likeVecIt(a) { _.next() } } } + "calling hasNext" in { check { a: ByteString ⇒ likeVecIt(a) { _.hasNext } } } + "calling length" in { check { a: ByteString ⇒ likeVecIt(a) { _.length } } } + "calling duplicate" in { check { a: ByteString ⇒ likeVecIt(a)({ _.duplicate match { case (a, b) ⇒ (a.toSeq, b.toSeq) } }, strict = false) } } + + // Have to used toList instead of toSeq here, iterator.span (new in + // Scala-2.9) seems to be broken in combination with toSeq for the + // scala.collection default Iterator (see Scala issue SI-5838). + "calling span" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a)({ _.span(_ != b) match { case (a, b) ⇒ (a.toList, b.toList) } }, strict = false) } } + + "calling takeWhile" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a)({ _.takeWhile(_ != b).toSeq }, strict = false) } } + "calling dropWhile" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a) { _.dropWhile(_ != b).toSeq } } } + "calling indexWhere" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a) { _.indexWhere(_ == b) } } } + "calling indexOf" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a) { _.indexOf(b) } } } + "calling toSeq" in { check { a: ByteString ⇒ likeVecIt(a) { _.toSeq } } } + "calling foreach" in { check { a: ByteString ⇒ likeVecIt(a) { it ⇒ var acc = 0; it foreach { acc += _ }; acc } } } + "calling foldLeft" in { check { a: ByteString ⇒ likeVecIt(a) { _.foldLeft(0) { _ + _ } } } } + "calling toArray" in { check { a: ByteString ⇒ likeVecIt(a) { _.toArray.toSeq } } } + + "calling slice" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVecIt(xs)({ + _.slice(from, until).toSeq + }, strict = false) + } + } + } + + "calling take and drop" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVecIt(xs)({ + _.drop(from).take(until - from).toSeq + }, strict = false) + } + } + } + + "calling copyToArray" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVecIt(xs)({ it ⇒ + val array = Array.ofDim[Byte](xs.length) + it.slice(from, until).copyToArray(array, from, until) + array.toSeq + }, strict = false) + } + } + } + } + + "function as expected" when { + "getting Bytes, using getByte and getBytes" in { + // mixing getByte and getBytes here for more rigorous testing + check { slice: ByteStringSlice ⇒ + val (bytes, from, until) = slice + val input = bytes.iterator + val output = Array.ofDim[Byte](bytes.length) + for (i ← 0 to from - 1) output(i) = input.getByte + input.getBytes(output, from, until - from) + for (i ← until to bytes.length - 1) output(i) = input.getByte + (output.toSeq == bytes) && (input.isEmpty) + } + } + + "getting Bytes, using the InputStream wrapper" in { + // combining skip and both read methods here for more rigorous testing + check { slice: ByteStringSlice ⇒ + val (bytes, from, until) = slice + val a = (0 max from) min bytes.length + val b = (a max until) min bytes.length + val input = bytes.iterator + val output = Array.ofDim[Byte](bytes.length) + + input.asInputStream.skip(a) + + val toRead = b - a + var (nRead, eof) = (0, false) + while ((nRead < toRead) && !eof) { + val n = input.asInputStream.read(output, a + nRead, toRead - nRead) + if (n == -1) eof = true + else nRead += n + } + if (eof) throw new RuntimeException("Unexpected EOF") + + for (i ← b to bytes.length - 1) output(i) = input.asInputStream.read().toByte + + (output.toSeq.drop(a) == bytes.drop(a)) && + (input.asInputStream.read() == -1) && + ((output.length < 1) || (input.asInputStream.read(output, 0, 1) == -1)) + } + } + + "calling copyToBuffer" in { + check { bytes: ByteString ⇒ + import java.nio.ByteBuffer + val buffer = ByteBuffer.allocate(bytes.size) + bytes.copyToBuffer(buffer) + buffer.flip() + val array = Array.ofDim[Byte](bytes.size) + buffer.get(array) + bytes == array.toSeq + } + } + } + + "decode data correctly" when { + "decoding Short in big-endian" in { check { slice: ByteStringSlice ⇒ testShortDecoding(slice, BIG_ENDIAN) } } + "decoding Short in little-endian" in { check { slice: ByteStringSlice ⇒ testShortDecoding(slice, LITTLE_ENDIAN) } } + "decoding Int in big-endian" in { check { slice: ByteStringSlice ⇒ testIntDecoding(slice, BIG_ENDIAN) } } + "decoding Int in little-endian" in { check { slice: ByteStringSlice ⇒ testIntDecoding(slice, LITTLE_ENDIAN) } } + "decoding Long in big-endian" in { check { slice: ByteStringSlice ⇒ testLongDecoding(slice, BIG_ENDIAN) } } + "decoding Long in little-endian" in { check { slice: ByteStringSlice ⇒ testLongDecoding(slice, LITTLE_ENDIAN) } } + "decoding Float in big-endian" in { check { slice: ByteStringSlice ⇒ testFloatDecoding(slice, BIG_ENDIAN) } } + "decoding Float in little-endian" in { check { slice: ByteStringSlice ⇒ testFloatDecoding(slice, LITTLE_ENDIAN) } } + "decoding Double in big-endian" in { check { slice: ByteStringSlice ⇒ testDoubleDecoding(slice, BIG_ENDIAN) } } + "decoding Double in little-endian" in { check { slice: ByteStringSlice ⇒ testDoubleDecoding(slice, LITTLE_ENDIAN) } } + } + } + + "A ByteStringBuilder" must { + "function like a VectorBuilder" when { + "adding various contents using ++= and +=" in { + check { (array1: Array[Byte], array2: Array[Byte], bs1: ByteString, bs2: ByteString, bs3: ByteString) ⇒ + likeVecBld { builder ⇒ + builder ++= array1 + bs1 foreach { b ⇒ builder += b } + builder ++= bs2 + bs3 foreach { b ⇒ builder += b } + builder ++= Vector(array2: _*) + } + } + } + } + "function as expected" when { + "putting Bytes, using putByte and putBytes" in { + // mixing putByte and putBytes here for more rigorous testing + check { slice: ArraySlice[Byte] ⇒ + val (data, from, until) = slice + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.putByte(data(i)) + builder.putBytes(data, from, until - from) + for (i ← until to data.length - 1) builder.putByte(data(i)) + data.toSeq == builder.result + } + } + + "putting Bytes, using the OutputStream wrapper" in { + // mixing the write methods here for more rigorous testing + check { slice: ArraySlice[Byte] ⇒ + val (data, from, until) = slice + val builder = ByteString.newBuilder + for (i ← 0 to from - 1) builder.asOutputStream.write(data(i).toInt) + builder.asOutputStream.write(data, from, until - from) + for (i ← until to data.length - 1) builder.asOutputStream.write(data(i).toInt) + data.toSeq == builder.result + } + } + } + + "encode data correctly" when { + "encoding Short in big-endian" in { check { slice: ArraySlice[Short] ⇒ testShortEncoding(slice, BIG_ENDIAN) } } + "encoding Short in little-endian" in { check { slice: ArraySlice[Short] ⇒ testShortEncoding(slice, LITTLE_ENDIAN) } } + "encoding Int in big-endian" in { check { slice: ArraySlice[Int] ⇒ testIntEncoding(slice, BIG_ENDIAN) } } + "encoding Int in little-endian" in { check { slice: ArraySlice[Int] ⇒ testIntEncoding(slice, LITTLE_ENDIAN) } } + "encoding Long in big-endian" in { check { slice: ArraySlice[Long] ⇒ testLongEncoding(slice, BIG_ENDIAN) } } + "encoding Long in little-endian" in { check { slice: ArraySlice[Long] ⇒ testLongEncoding(slice, LITTLE_ENDIAN) } } + "encoding Float in big-endian" in { check { slice: ArraySlice[Float] ⇒ testFloatEncoding(slice, BIG_ENDIAN) } } + "encoding Float in little-endian" in { check { slice: ArraySlice[Float] ⇒ testFloatEncoding(slice, LITTLE_ENDIAN) } } + "encoding Double in big-endian" in { check { slice: ArraySlice[Double] ⇒ testDoubleEncoding(slice, BIG_ENDIAN) } } + "encoding Double in little-endian" in { check { slice: ArraySlice[Double] ⇒ testDoubleEncoding(slice, LITTLE_ENDIAN) } } + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 5d5e9199a3..127d73757c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -56,11 +56,15 @@ object ActorSystem { * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader * associated with the ActorSystem class. + * + * @see The Typesafe Config Library API Documentation */ def create(name: String, config: Config): ActorSystem = apply(name, config) /** * Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader + * + * @see The Typesafe Config Library API Documentation */ def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader) @@ -90,11 +94,15 @@ object ActorSystem { * obtains the current ClassLoader by first inspecting the current threads' getContextClassLoader, * then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader * associated with the ActorSystem class. + * + * @see The Typesafe Config Library API Documentation */ def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader()) /** * Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader + * + * @see The Typesafe Config Library API Documentation */ def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = new ActorSystemImpl(name, config, classLoader).start() @@ -102,11 +110,15 @@ object ActorSystem { * Settings are the overall ActorSystem Settings which also provides a convenient access to the Config object. * * For more detailed information about the different possible configuration options, look in the Akka Documentation under "Configuration" + * + * @see The Typesafe Config Library API Documentation */ class Settings(classLoader: ClassLoader, cfg: Config, final val name: String) { /** * The backing Config of this ActorSystem's Settings + * + * @see The Typesafe Config Library API Documentation */ final val config: Config = { val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader)) diff --git a/akka-actor/src/main/scala/akka/util/ByteIterator.scala b/akka-actor/src/main/scala/akka/util/ByteIterator.scala new file mode 100644 index 0000000000..ef4c0c49bc --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/ByteIterator.scala @@ -0,0 +1,609 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.util + +import java.nio.{ ByteBuffer, ByteOrder } + +import scala.collection.{ LinearSeq, IndexedSeqOptimized } +import scala.collection.mutable.{ Builder, WrappedArray } +import scala.collection.immutable.{ IndexedSeq, VectorBuilder } +import scala.collection.generic.CanBuildFrom +import scala.collection.mutable.{ ListBuffer } +import scala.annotation.tailrec +import scala.reflect.ClassTag + +object ByteIterator { + object ByteArrayIterator { + private val emptyArray: Array[Byte] = Array.ofDim[Byte](0) + + protected[akka] def apply(array: Array[Byte]): ByteArrayIterator = + new ByteArrayIterator(array, 0, array.length) + + protected[akka] def apply(array: Array[Byte], from: Int, until: Int): ByteArrayIterator = + new ByteArrayIterator(array, from, until) + + val empty: ByteArrayIterator = apply(emptyArray) + } + + class ByteArrayIterator private (private var array: Array[Byte], private var from: Int, private var until: Int) extends ByteIterator { + iterator ⇒ + + @inline final def len: Int = until - from + + @inline final def hasNext: Boolean = from < until + + @inline final def head: Byte = array(from) + + final def next(): Byte = { + if (!hasNext) Iterator.empty.next + else { val i = from; from = from + 1; array(i) } + } + + def clear(): Unit = { this.array = ByteArrayIterator.emptyArray; from = 0; until = from } + + final override def length: Int = { val l = len; clear(); l } + + final override def ++(that: TraversableOnce[Byte]): ByteIterator = that match { + case that: ByteIterator ⇒ + if (that.isEmpty) this + else if (this.isEmpty) that + else that match { + case that: ByteArrayIterator ⇒ + if ((this.array eq that.array) && (this.until == that.from)) { + this.until = that.until + that.clear() + this + } else { + val result = MultiByteArrayIterator(List(this, that)) + this.clear() + result + } + case that: MultiByteArrayIterator ⇒ this ++: that + } + case _ ⇒ super.++(that) + } + + final override def clone: ByteArrayIterator = new ByteArrayIterator(array, from, until) + + final override def take(n: Int): this.type = { + if (n < len) until = { if (n > 0) (from + n) else from } + this + } + + final override def drop(n: Int): this.type = { + if (n > 0) from = { if (n < len) (from + n) else until } + this + } + + final override def takeWhile(p: Byte ⇒ Boolean): this.type = { + val prev = from + dropWhile(p) + until = from; from = prev + this + } + + final override def dropWhile(p: Byte ⇒ Boolean): this.type = { + var stop = false + while (!stop && hasNext) { + if (p(array(from))) { from = from + 1 } else { stop = true } + } + this + } + + final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = { + val n = 0 max ((xs.length - start) min this.len min len) + Array.copy(this.array, from, xs, start, n) + this.drop(n) + } + + final override def toByteString: ByteString = { + val result = + if ((from == 0) && (until == array.length)) ByteString.ByteString1C(array) + else ByteString.ByteString1(array, from, len) + clear() + result + } + + def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type = { + if (n <= this.len) { + Array.copy(this.array, this.from, xs, offset, n) + this.drop(n) + } else Iterator.empty.next + } + + private def wrappedByteBuffer: ByteBuffer = ByteBuffer.wrap(array, from, len).asReadOnlyBuffer + + def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + { wrappedByteBuffer.order(byteOrder).asShortBuffer.get(xs, offset, n); drop(2 * n) } + + def getInts(xs: Array[Int], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + { wrappedByteBuffer.order(byteOrder).asIntBuffer.get(xs, offset, n); drop(4 * n) } + + def getLongs(xs: Array[Long], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + { wrappedByteBuffer.order(byteOrder).asLongBuffer.get(xs, offset, n); drop(8 * n) } + + def getFloats(xs: Array[Float], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + { wrappedByteBuffer.order(byteOrder).asFloatBuffer.get(xs, offset, n); drop(4 * n) } + + def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + { wrappedByteBuffer.order(byteOrder).asDoubleBuffer.get(xs, offset, n); drop(8 * n) } + + def copyToBuffer(buffer: ByteBuffer): Int = { + val copyLength = math.min(buffer.remaining, len) + if (copyLength > 0) { + buffer.put(array, from, copyLength) + drop(copyLength) + } + copyLength + } + + def asInputStream: java.io.InputStream = new java.io.InputStream { + override def available: Int = iterator.len + + def read: Int = if (hasNext) (next().toInt & 0xff) else -1 + + override def read(b: Array[Byte], off: Int, len: Int): Int = { + if ((off < 0) || (len < 0) || (off + len > b.length)) throw new IndexOutOfBoundsException + if (len == 0) 0 + else if (!isEmpty) { + val nRead = math.min(available, len) + copyToArray(b, off, nRead) + nRead + } else -1 + } + + override def skip(n: Long): Long = { + val nSkip = math.min(iterator.len, n.toInt) + iterator.drop(nSkip) + nSkip + } + } + } + + object MultiByteArrayIterator { + protected val clearedList: List[ByteArrayIterator] = List(ByteArrayIterator.empty) + + val empty: MultiByteArrayIterator = new MultiByteArrayIterator(Nil) + + protected[akka] def apply(iterators: LinearSeq[ByteArrayIterator]): MultiByteArrayIterator = + new MultiByteArrayIterator(iterators) + } + + class MultiByteArrayIterator private (private var iterators: LinearSeq[ByteArrayIterator]) extends ByteIterator { + // After normalization: + // * iterators.isEmpty == false + // * (!iterator.head.isEmpty || iterators.tail.isEmpty) == true + private def normalize(): this.type = { + @tailrec def norm(xs: LinearSeq[ByteArrayIterator]): LinearSeq[ByteArrayIterator] = { + if (xs.isEmpty) MultiByteArrayIterator.clearedList + else if (xs.head.isEmpty) norm(xs.tail) + else xs + } + iterators = norm(iterators) + this + } + normalize() + + @inline private def current: ByteArrayIterator = iterators.head + @inline private def dropCurrent(): Unit = { iterators = iterators.tail } + @inline def clear(): Unit = { iterators = MultiByteArrayIterator.empty.iterators } + + @inline final def hasNext: Boolean = current.hasNext + + @inline final def head: Byte = current.head + + final def next(): Byte = { + val result = current.next() + normalize() + result + } + + final override def len: Int = iterators.foldLeft(0) { _ + _.len } + + final override def length: Int = { + var result = len + clear() + result + } + + private[akka] def ++:(that: ByteArrayIterator): this.type = { + iterators = that +: iterators + this + } + + final override def ++(that: TraversableOnce[Byte]): ByteIterator = that match { + case that: ByteIterator ⇒ + if (that.isEmpty) this + else if (this.isEmpty) that + else { + that match { + case that: ByteArrayIterator ⇒ + iterators = this.iterators :+ that + that.clear() + this + case that: MultiByteArrayIterator ⇒ + iterators = this.iterators ++ that.iterators + that.clear() + this + } + } + case _ ⇒ super.++(that) + } + + final override def clone: MultiByteArrayIterator = { + val clonedIterators: List[ByteArrayIterator] = iterators.map(_.clone)(collection.breakOut) + new MultiByteArrayIterator(clonedIterators) + } + + final override def take(n: Int): this.type = { + var rest = n + val builder = new ListBuffer[ByteArrayIterator] + while ((rest > 0) && !iterators.isEmpty) { + current.take(rest) + if (current.hasNext) { + rest -= current.len + builder += current + } + iterators = iterators.tail + } + iterators = builder.result + normalize() + } + + @tailrec final override def drop(n: Int): this.type = if ((n > 0) && !isEmpty) { + val nCurrent = math.min(n, current.len) + current.drop(n) + val rest = n - nCurrent + assert(current.isEmpty || (rest == 0)) + normalize() + drop(rest) + } else this + + final override def takeWhile(p: Byte ⇒ Boolean): this.type = { + var stop = false + var builder = new ListBuffer[ByteArrayIterator] + while (!stop && !iterators.isEmpty) { + val lastLen = current.len + current.takeWhile(p) + if (current.hasNext) builder += current + if (current.len < lastLen) stop = true + dropCurrent() + } + iterators = builder.result + normalize() + } + + @tailrec final override def dropWhile(p: Byte ⇒ Boolean): this.type = if (!isEmpty) { + current.dropWhile(p) + val dropMore = current.isEmpty + normalize() + if (dropMore) dropWhile(p) else this + } else this + + final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = { + var pos = start + var rest = len + while ((rest > 0) && !iterators.isEmpty) { + val n = 0 max ((xs.length - pos) min current.len min rest) + current.copyToArray(xs, pos, n) + pos += n + rest -= n + dropCurrent() + } + normalize() + } + + override def foreach[@specialized U](f: Byte ⇒ U): Unit = { + iterators foreach { _ foreach f } + clear() + } + + final override def toByteString: ByteString = { + if (iterators.tail.isEmpty) iterators.head.toByteString + else { + val result = iterators.foldLeft(ByteString.empty) { _ ++ _.toByteString } + clear() + result + } + } + + @tailrec protected final def getToArray[A](xs: Array[A], offset: Int, n: Int, elemSize: Int)(getSingle: ⇒ A)(getMult: (Array[A], Int, Int) ⇒ Unit): this.type = if (n <= 0) this else { + if (isEmpty) Iterator.empty.next + val nDone = if (current.len >= elemSize) { + val nCurrent = math.min(n, current.len / elemSize) + getMult(xs, offset, nCurrent) + nCurrent + } else { + xs(offset) = getSingle + 1 + } + normalize() + getToArray(xs, offset + nDone, n - nDone, elemSize)(getSingle)(getMult) + } + + def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type = + getToArray(xs, offset, n, 1) { getByte } { current.getBytes(_, _, _) } + + def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + getToArray(xs, offset, n, 2) { getShort(byteOrder) } { current.getShorts(_, _, _)(byteOrder) } + + def getInts(xs: Array[Int], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + getToArray(xs, offset, n, 4) { getInt(byteOrder) } { current.getInts(_, _, _)(byteOrder) } + + def getLongs(xs: Array[Long], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + getToArray(xs, offset, n, 8) { getLong(byteOrder) } { current.getLongs(_, _, _)(byteOrder) } + + def getFloats(xs: Array[Float], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + getToArray(xs, offset, n, 8) { getFloat(byteOrder) } { current.getFloats(_, _, _)(byteOrder) } + + def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = + getToArray(xs, offset, n, 8) { getDouble(byteOrder) } { current.getDoubles(_, _, _)(byteOrder) } + + def copyToBuffer(buffer: ByteBuffer): Int = { + val n = iterators.foldLeft(0) { _ + _.copyToBuffer(buffer) } + normalize() + n + } + + def asInputStream: java.io.InputStream = new java.io.InputStream { + override def available: Int = current.len + + def read: Int = if (hasNext) (next().toInt & 0xff) else -1 + + override def read(b: Array[Byte], off: Int, len: Int): Int = { + val nRead = current.asInputStream.read(b, off, len) + normalize() + nRead + } + + override def skip(n: Long): Long = { + @tailrec def skipImpl(n: Long, skipped: Long): Long = if (n > 0) { + if (!isEmpty) { + val m = current.asInputStream.skip(n) + normalize() + val newN = n - m + val newSkipped = skipped + m + if (newN > 0) skipImpl(newN, newSkipped) + else newSkipped + } else 0 + } else 0 + + skipImpl(n, 0) + } + } + } +} + +/** + * An iterator over a ByteString. + */ + +abstract class ByteIterator extends BufferedIterator[Byte] { + def len: Int + + def head: Byte + + def next(): Byte + + protected def clear(): Unit + + def ++(that: TraversableOnce[Byte]): ByteIterator = if (that.isEmpty) this else ByteIterator.ByteArrayIterator(that.toArray) + + // *must* be overridden by derived classes. This construction is necessary + // to specialize the return type, as the method is already implemented in + // the parent class. + override def clone: ByteIterator = throw new UnsupportedOperationException("Method clone is not implemented in ByteIterator") + + override def duplicate: (ByteIterator, ByteIterator) = (this, clone) + + // *must* be overridden by derived classes. This construction is necessary + // to specialize the return type, as the method is already implemented in + // the parent class. + override def take(n: Int): this.type = throw new UnsupportedOperationException("Method take is not implemented in ByteIterator") + + // *must* be overridden by derived classes. This construction is necessary + // to specialize the return type, as the method is already implemented in + // the parent class. + override def drop(n: Int): this.type = throw new UnsupportedOperationException("Method drop is not implemented in ByteIterator") + + override def slice(from: Int, until: Int): this.type = { + if (from > 0) drop(from).take(until - from) + else take(until) + } + + // *must* be overridden by derived classes. This construction is necessary + // to specialize the return type, as the method is already implemented in + // the parent class. + override def takeWhile(p: Byte ⇒ Boolean): this.type = throw new UnsupportedOperationException("Method takeWhile is not implemented in ByteIterator") + + // *must* be overridden by derived classes. This construction is necessary + // to specialize the return type, as the method is already implemented in + // the parent class. + override def dropWhile(p: Byte ⇒ Boolean): this.type = throw new UnsupportedOperationException("Method dropWhile is not implemented in ByteIterator") + + override def span(p: Byte ⇒ Boolean): (ByteIterator, ByteIterator) = { + val that = clone + this.takeWhile(p) + that.drop(this.len) + (this, that) + } + + override def indexWhere(p: Byte ⇒ Boolean): Int = { + var index = 0 + var found = false + while (!found && hasNext) if (p(next())) { found = true } else { index += 1 } + if (found) index else -1 + } + + def indexOf(elem: Byte): Int = indexWhere { _ == elem } + + override def indexOf[B >: Byte](elem: B): Int = indexWhere { _ == elem } + + def toByteString: ByteString + + override def toSeq: ByteString = toByteString + + override def foreach[@specialized U](f: Byte ⇒ U): Unit = + while (hasNext) f(next()) + + override def foldLeft[@specialized B](z: B)(op: (B, Byte) ⇒ B): B = { + var acc = z + foreach { byte ⇒ acc = op(acc, byte) } + acc + } + + override def toArray[B >: Byte](implicit arg0: ClassTag[B]): Array[B] = { + val target = Array.ofDim[B](len) + copyToArray(target) + target + } + + /** + * Get a single Byte from this iterator. Identical to next(). + */ + def getByte: Byte = next() + + /** + * Get a single Short from this iterator. + */ + def getShort(implicit byteOrder: ByteOrder): Short = { + if (byteOrder == ByteOrder.BIG_ENDIAN) + ((next() & 0xff) << 8 | (next() & 0xff) << 0).toShort + else if (byteOrder == ByteOrder.LITTLE_ENDIAN) + ((next() & 0xff) << 0 | (next() & 0xff) << 8).toShort + else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + + /** + * Get a single Int from this iterator. + */ + def getInt(implicit byteOrder: ByteOrder): Int = { + if (byteOrder == ByteOrder.BIG_ENDIAN) + ((next() & 0xff) << 24 + | (next() & 0xff) << 16 + | (next() & 0xff) << 8 + | (next() & 0xff) << 0) + else if (byteOrder == ByteOrder.LITTLE_ENDIAN) + ((next() & 0xff) << 0 + | (next() & 0xff) << 8 + | (next() & 0xff) << 16 + | (next() & 0xff) << 24) + else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + + /** + * Get a single Long from this iterator. + */ + def getLong(implicit byteOrder: ByteOrder): Long = { + if (byteOrder == ByteOrder.BIG_ENDIAN) + ((next().toLong & 0xff) << 56 + | (next().toLong & 0xff) << 48 + | (next().toLong & 0xff) << 40 + | (next().toLong & 0xff) << 32 + | (next().toLong & 0xff) << 24 + | (next().toLong & 0xff) << 16 + | (next().toLong & 0xff) << 8 + | (next().toLong & 0xff) << 0) + else if (byteOrder == ByteOrder.LITTLE_ENDIAN) + ((next().toLong & 0xff) << 0 + | (next().toLong & 0xff) << 8 + | (next().toLong & 0xff) << 16 + | (next().toLong & 0xff) << 24 + | (next().toLong & 0xff) << 32 + | (next().toLong & 0xff) << 40 + | (next().toLong & 0xff) << 48 + | (next().toLong & 0xff) << 56) + else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + + def getFloat(implicit byteOrder: ByteOrder): Float = + java.lang.Float.intBitsToFloat(getInt(byteOrder)) + + def getDouble(implicit byteOrder: ByteOrder): Double = + java.lang.Double.longBitsToDouble(getLong(byteOrder)) + + /** + * Get a specific number of Bytes from this iterator. In contrast to + * copyToArray, this method will fail if this.len < xs.length. + */ + def getBytes(xs: Array[Byte]): this.type = getBytes(xs, 0, xs.length) + + /** + * Get a specific number of Bytes from this iterator. In contrast to + * copyToArray, this method will fail if length < n or if (xs.length - offset) < n. + */ + def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type + + /** + * Get a number of Shorts from this iterator. + */ + def getShorts(xs: Array[Short])(implicit byteOrder: ByteOrder): this.type = + getShorts(xs, 0, xs.length)(byteOrder) + + /** + * Get a number of Shorts from this iterator. + */ + def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type + + /** + * Get a number of Ints from this iterator. + */ + def getInts(xs: Array[Int])(implicit byteOrder: ByteOrder): this.type = + getInts(xs, 0, xs.length)(byteOrder) + + /** + * Get a number of Ints from this iterator. + */ + def getInts(xs: Array[Int], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type + + /** + * Get a number of Longs from this iterator. + */ + def getLongs(xs: Array[Long])(implicit byteOrder: ByteOrder): this.type = + getLongs(xs, 0, xs.length)(byteOrder) + + /** + * Get a number of Longs from this iterator. + */ + def getLongs(xs: Array[Long], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type + + /** + * Get a number of Floats from this iterator. + */ + def getFloats(xs: Array[Float])(implicit byteOrder: ByteOrder): this.type = + getFloats(xs, 0, xs.length)(byteOrder) + + /** + * Get a number of Floats from this iterator. + */ + def getFloats(xs: Array[Float], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type + + /** + * Get a number of Doubles from this iterator. + */ + def getDoubles(xs: Array[Double])(implicit byteOrder: ByteOrder): this.type = + getDoubles(xs, 0, xs.length)(byteOrder) + + /** + * Get a number of Doubles from this iterator. + */ + def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type + + /** + * Copy as many bytes as possible to a ByteBuffer, starting from it's + * current position. This method will not overflow the buffer. + * + * @param buffer a ByteBuffer to copy bytes to + * @return the number of bytes actually copied + */ + def copyToBuffer(buffer: ByteBuffer): Int + + /** + * Directly wraps this ByteIterator in an InputStream without copying. + * Read and skip operations on the stream will advance the iterator + * accordingly. + */ + def asInputStream: java.io.InputStream +} diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index ac074d5b28..76415b93ba 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -4,14 +4,14 @@ package akka.util -import java.nio.ByteBuffer +import java.nio.{ ByteBuffer, ByteOrder } import scala.collection.IndexedSeqOptimized import scala.collection.mutable.{ Builder, WrappedArray } import scala.collection.immutable.{ IndexedSeq, VectorBuilder } import scala.collection.generic.CanBuildFrom +import scala.reflect.ClassTag -//FIXME MORE DOCS object ByteString { /** @@ -67,7 +67,7 @@ object ByteString { } /** - * A compact (unsliced) and unfragmented ByteString, implementaton of ByteString1C. + * A compact (unsliced) and unfragmented ByteString, implementation of ByteString1C. */ @SerialVersionUID(3956956327691936932L) final class ByteString1C private (private val bytes: Array[Byte]) extends CompactByteString { @@ -75,32 +75,29 @@ object ByteString { override def length: Int = bytes.length - def toArray: Array[Byte] = bytes.clone + override def iterator: ByteIterator.ByteArrayIterator = ByteIterator.ByteArrayIterator(bytes, 0, bytes.length) - def toByteString1: ByteString1 = ByteString1(bytes) + private[akka] def toByteString1: ByteString1 = ByteString1(bytes) - override def clone: ByteString1C = new ByteString1C(toArray) - - def compact: ByteString1C = this - - def asByteBuffer: ByteBuffer = toByteString1.asByteBuffer + def asByteBuffer: ByteBuffer = + toByteString1.asByteBuffer def decodeString(charset: String): String = new String(bytes, charset) - def ++(that: ByteString): ByteString = if (!that.isEmpty) toByteString1 ++ that else this + def ++(that: ByteString): ByteString = + if (that.isEmpty) this + else if (this.isEmpty) that + else toByteString1 ++ that override def slice(from: Int, until: Int): ByteString = if ((from != 0) || (until != length)) toByteString1.slice(from, until) else this - - override def copyToArray[A >: Byte](xs: Array[A], start: Int, len: Int): Unit = - toByteString1.copyToArray(xs, start, len) - - def copyToBuffer(buffer: ByteBuffer): Int = toByteString1.copyToBuffer(buffer) } private[akka] object ByteString1 { def apply(bytes: Array[Byte]): ByteString1 = new ByteString1(bytes) + def apply(bytes: Array[Byte], startIndex: Int, length: Int): ByteString1 = + new ByteString1(bytes, startIndex, length) } /** @@ -112,6 +109,9 @@ object ByteString { def apply(idx: Int): Byte = bytes(checkRangeConvert(idx)) + override def iterator: ByteIterator.ByteArrayIterator = + ByteIterator.ByteArrayIterator(bytes, startIndex, startIndex + length) + private def checkRangeConvert(index: Int): Int = { if (0 <= index && length > index) index + startIndex @@ -119,15 +119,10 @@ object ByteString { throw new IndexOutOfBoundsException(index.toString) } - def toArray: Array[Byte] = { - val ar = new Array[Byte](length) - Array.copy(bytes, startIndex, ar, 0, length) - ar - } + def isCompact: Boolean = (length == bytes.length) - override def clone: CompactByteString = ByteString1C(toArray) - - def compact: CompactByteString = if (length == bytes.length) ByteString1C(bytes) else clone + def compact: CompactByteString = + if (isCompact) ByteString1C(bytes) else ByteString1C(toArray) def asByteBuffer: ByteBuffer = { val buffer = ByteBuffer.wrap(bytes, startIndex, length).asReadOnlyBuffer @@ -138,26 +133,17 @@ object ByteString { def decodeString(charset: String): String = new String(if (length == bytes.length) bytes else toArray, charset) - def ++(that: ByteString): ByteString = that match { - case b: ByteString1C ⇒ ByteStrings(this, b.toByteString1) - case b: ByteString1 ⇒ ByteStrings(this, b) - case bs: ByteStrings ⇒ ByteStrings(this, bs) - } - - override def slice(from: Int, until: Int): ByteString = { - val newStartIndex = math.max(from, 0) + startIndex - val newLength = math.min(until, length) - from - if (newLength <= 0) ByteString.empty - else new ByteString1(bytes, newStartIndex, newLength) - } - - override def copyToArray[A >: Byte](xs: Array[A], start: Int, len: Int): Unit = - Array.copy(bytes, startIndex, xs, start, math.min(math.min(length, len), xs.length - start)) - - def copyToBuffer(buffer: ByteBuffer): Int = { - val copyLength = math.min(buffer.remaining, length) - if (copyLength > 0) buffer.put(bytes, startIndex, copyLength) - copyLength + def ++(that: ByteString): ByteString = { + if (that.isEmpty) this + else if (this.isEmpty) that + else that match { + case b: ByteString1C ⇒ ByteStrings(this, b.toByteString1) + case b: ByteString1 ⇒ + if ((bytes eq b.bytes) && (startIndex + length == b.startIndex)) + new ByteString1(bytes, startIndex, length + b.length) + else ByteStrings(this, b) + case bs: ByteStrings ⇒ ByteStrings(this, bs) + } } } @@ -195,7 +181,6 @@ object ByteString { } // 0: both empty, 1: 2nd empty, 2: 1st empty, 3: neither empty - // Using length to check emptiness is prohibited by law def compare(b1: ByteString, b2: ByteString): Int = if (b1.isEmpty) if (b2.isEmpty) 0 else 2 @@ -206,7 +191,8 @@ object ByteString { /** * A ByteString with 2 or more fragments. */ - final class ByteStrings private (val bytestrings: Vector[ByteString1], val length: Int) extends ByteString { + final class ByteStrings private (private[akka] val bytestrings: Vector[ByteString1], val length: Int) extends ByteString { + if (bytestrings.isEmpty) throw new IllegalArgumentException("bytestrings must not be empty") def apply(idx: Int): Byte = if (0 <= idx && idx < length) { @@ -219,71 +205,37 @@ object ByteString { bytestrings(pos)(idx - seen) } else throw new IndexOutOfBoundsException(idx.toString) - override def slice(from: Int, until: Int): ByteString = { - val start = math.max(from, 0) - val end = math.min(until, length) - if (end <= start) - ByteString.empty - else { - val iter = bytestrings.iterator - var cur = iter.next - var pos = 0 - var seen = 0 - while (from >= seen + cur.length) { - seen += cur.length - pos += 1 - cur = iter.next - } - val startpos = pos - val startidx = start - seen - while (until > seen + cur.length) { - seen += cur.length - pos += 1 - cur = iter.next - } - val endpos = pos - val endidx = end - seen - if (startpos == endpos) - cur.slice(startidx, endidx) - else { - val first = bytestrings(startpos).drop(startidx).asInstanceOf[ByteString1] - val last = cur.take(endidx).asInstanceOf[ByteString1] - if ((endpos - startpos) == 1) - new ByteStrings(Vector(first, last), until - from) - else - new ByteStrings(first +: bytestrings.slice(startpos + 1, endpos) :+ last, until - from) - } + override def iterator: ByteIterator.MultiByteArrayIterator = + ByteIterator.MultiByteArrayIterator(bytestrings.toStream map { _.iterator }) + + def ++(that: ByteString): ByteString = { + if (that.isEmpty) this + else if (this.isEmpty) that + else that match { + case b: ByteString1C ⇒ ByteStrings(this, b.toByteString1) + case b: ByteString1 ⇒ ByteStrings(this, b) + case bs: ByteStrings ⇒ ByteStrings(this, bs) } } - def ++(that: ByteString): ByteString = that match { - case b: ByteString1C ⇒ ByteStrings(this, b.toByteString1) - case b: ByteString1 ⇒ ByteStrings(this, b) - case bs: ByteStrings ⇒ ByteStrings(this, bs) - } + def isCompact: Boolean = if (bytestrings.length == 1) bytestrings.head.isCompact else false def compact: CompactByteString = { - val ar = new Array[Byte](length) - var pos = 0 - bytestrings foreach { b ⇒ - b.copyToArray(ar, pos, b.length) - pos += b.length + if (isCompact) bytestrings.head.compact + else { + val ar = new Array[Byte](length) + var pos = 0 + bytestrings foreach { b ⇒ + b.copyToArray(ar, pos, b.length) + pos += b.length + } + ByteString1C(ar) } - ByteString1C(ar) } def asByteBuffer: ByteBuffer = compact.asByteBuffer def decodeString(charset: String): String = compact.decodeString(charset) - - def copyToBuffer(buffer: ByteBuffer): Int = { - val copyLength = math.min(buffer.remaining, length) - val iter = bytestrings.iterator - while (iter.hasNext && buffer.hasRemaining) { - iter.next.copyToBuffer(buffer) - } - copyLength - } } } @@ -297,7 +249,44 @@ object ByteString { * TODO: Add performance characteristics */ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimized[Byte, ByteString] { - override protected[this] def newBuilder = ByteString.newBuilder + def apply(idx: Int): Byte + + override protected[this] def newBuilder: ByteStringBuilder = ByteString.newBuilder + + // *must* be overridden by derived classes. This construction is necessary + // to specialize the return type, as the method is already implemented in + // a parent trait. + override def iterator: ByteIterator = throw new UnsupportedOperationException("Method iterator is not implemented in ByteString") + + override def head: Byte = apply(0) + override def tail: ByteString = drop(1) + override def last: Byte = apply(length - 1) + override def init: ByteString = dropRight(1) + + override def slice(from: Int, until: Int): ByteString = + if ((from == 0) && (until == length)) this + else iterator.slice(from, until).toByteString + + override def take(n: Int): ByteString = slice(0, n) + override def takeRight(n: Int): ByteString = slice(length - n, length) + override def drop(n: Int): ByteString = slice(n, length) + override def dropRight(n: Int): ByteString = slice(0, length - n) + + override def takeWhile(p: Byte ⇒ Boolean): ByteString = iterator.takeWhile(p).toByteString + override def dropWhile(p: Byte ⇒ Boolean): ByteString = iterator.dropWhile(p).toByteString + override def span(p: Byte ⇒ Boolean): (ByteString, ByteString) = + { val (a, b) = iterator.span(p); (a.toByteString, b.toByteString) } + + override def splitAt(n: Int): (ByteString, ByteString) = (take(n), drop(n)) + + override def indexWhere(p: Byte ⇒ Boolean): Int = iterator.indexWhere(p) + override def indexOf[B >: Byte](elem: B): Int = iterator.indexOf(elem) + + override def toArray[B >: Byte](implicit arg0: ClassTag[B]): Array[B] = iterator.toArray + override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = + iterator.copyToArray(xs, start, len) + + override def foreach[@specialized U](f: Byte ⇒ U): Unit = iterator foreach f /** * Efficiently concatenate another ByteString. @@ -311,14 +300,24 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz * @param buffer a ByteBuffer to copy bytes to * @return the number of bytes actually copied */ - def copyToBuffer(buffer: ByteBuffer): Int + def copyToBuffer(buffer: ByteBuffer): Int = iterator.copyToBuffer(buffer) /** - * Create a new ByteString with all contents compacted into a single - * byte array. + * Create a new ByteString with all contents compacted into a single, + * full byte array. + * If isCompact returns true, compact is an O(1) operation, but + * might return a different object with an optimized implementation. */ def compact: CompactByteString + /** + * Check whether this ByteString is compact in memory. + * If the ByteString is compact, it might, however, not be represented + * by an object that takes full advantage of that fact. Use compact to + * get such an object. + */ + def isCompact: Boolean + /** * Returns a read-only ByteBuffer that directly wraps this ByteString * if it is not fragmented. @@ -329,7 +328,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz * Creates a new ByteBuffer with a copy of all bytes contained in this * ByteString. */ - final def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray) + def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray) /** * Decodes this ByteString as a UTF-8 encoded String. @@ -420,9 +419,15 @@ object CompactByteString { } /** - * A compact, unfragmented ByteString. + * A compact ByteString. + * + * The ByteString is guarantied to be contiguous in memory and to use only + * as much memory as required for its contents. */ -sealed abstract class CompactByteString extends ByteString with Serializable +sealed abstract class CompactByteString extends ByteString with Serializable { + def isCompact: Boolean = true + def compact: this.type = this +} /** * A mutable builder for efficiently creating a [[akka.util.ByteString]]. @@ -430,12 +435,37 @@ sealed abstract class CompactByteString extends ByteString with Serializable * The created ByteString is not automatically compacted. */ final class ByteStringBuilder extends Builder[Byte, ByteString] { + builder ⇒ + import ByteString.{ ByteString1C, ByteString1, ByteStrings } - private var _length = 0 - private val _builder = new VectorBuilder[ByteString1]() + private var _length: Int = 0 + private val _builder: VectorBuilder[ByteString1] = new VectorBuilder[ByteString1]() private var _temp: Array[Byte] = _ - private var _tempLength = 0 - private var _tempCapacity = 0 + private var _tempLength: Int = 0 + private var _tempCapacity: Int = 0 + + protected def fillArray(len: Int)(fill: (Array[Byte], Int) ⇒ Unit): this.type = { + ensureTempSize(_tempLength + len) + fill(_temp, _tempLength) + _tempLength += len + _length += len + this + } + + protected def fillByteBuffer(len: Int, byteOrder: ByteOrder)(fill: ByteBuffer ⇒ Unit): this.type = { + fillArray(len) { + case (array, start) ⇒ + val buffer = ByteBuffer.wrap(array, start, len) + buffer.order(byteOrder) + fill(buffer) + } + } + + def length: Int = _length + + override def sizeHint(len: Int): Unit = { + resizeTemp(len - (_length - _tempLength)) + } private def clearTemp(): Unit = { if (_tempLength > 0) { @@ -487,15 +517,169 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { clearTemp() _builder += ByteString1(xs.array.clone) _length += xs.length - case _: collection.IndexedSeq[_] ⇒ + case seq: collection.IndexedSeq[_] ⇒ ensureTempSize(_tempLength + xs.size) xs.copyToArray(_temp, _tempLength) + _tempLength += seq.length + _length += seq.length case _ ⇒ super.++=(xs) } this } + /** + * Add a single Byte to this builder. + */ + def putByte(x: Byte): this.type = this += x + + /** + * Add a single Short to this builder. + */ + def putShort(x: Int)(implicit byteOrder: ByteOrder): this.type = { + if (byteOrder == ByteOrder.BIG_ENDIAN) { + this += (x >>> 8).toByte + this += (x >>> 0).toByte + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + this += (x >>> 0).toByte + this += (x >>> 8).toByte + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + + /** + * Add a single Int to this builder. + */ + def putInt(x: Int)(implicit byteOrder: ByteOrder): this.type = { + fillArray(4) { + case (target, offset) ⇒ + if (byteOrder == ByteOrder.BIG_ENDIAN) { + target(offset + 0) = (x >>> 24).toByte + target(offset + 1) = (x >>> 16).toByte + target(offset + 2) = (x >>> 8).toByte + target(offset + 3) = (x >>> 0).toByte + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + target(offset + 0) = (x >>> 0).toByte + target(offset + 1) = (x >>> 8).toByte + target(offset + 2) = (x >>> 16).toByte + target(offset + 3) = (x >>> 24).toByte + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + this + } + + /** + * Add a single Long to this builder. + */ + def putLong(x: Long)(implicit byteOrder: ByteOrder): this.type = { + fillArray(8) { + case (target, offset) ⇒ + if (byteOrder == ByteOrder.BIG_ENDIAN) { + target(offset + 0) = (x >>> 56).toByte + target(offset + 1) = (x >>> 48).toByte + target(offset + 2) = (x >>> 40).toByte + target(offset + 3) = (x >>> 32).toByte + target(offset + 4) = (x >>> 24).toByte + target(offset + 5) = (x >>> 16).toByte + target(offset + 6) = (x >>> 8).toByte + target(offset + 7) = (x >>> 0).toByte + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + target(offset + 0) = (x >>> 0).toByte + target(offset + 1) = (x >>> 8).toByte + target(offset + 2) = (x >>> 16).toByte + target(offset + 3) = (x >>> 24).toByte + target(offset + 4) = (x >>> 32).toByte + target(offset + 5) = (x >>> 40).toByte + target(offset + 6) = (x >>> 48).toByte + target(offset + 7) = (x >>> 56).toByte + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + this + } + + /** + * Add a single Float to this builder. + */ + def putFloat(x: Float)(implicit byteOrder: ByteOrder): this.type = + putInt(java.lang.Float.floatToRawIntBits(x))(byteOrder) + + /** + * Add a single Double to this builder. + */ + def putDouble(x: Double)(implicit byteOrder: ByteOrder): this.type = + putLong(java.lang.Double.doubleToRawLongBits(x))(byteOrder) + + /** + * Add a number of Bytes from an array to this builder. + */ + def putBytes(array: Array[Byte]): this.type = + putBytes(array, 0, array.length) + + /** + * Add a number of Bytes from an array to this builder. + */ + def putBytes(array: Array[Byte], start: Int, len: Int): this.type = + fillArray(len) { case (target, targetOffset) ⇒ Array.copy(array, start, target, targetOffset, len) } + + /** + * Add a number of Shorts from an array to this builder. + */ + def putShorts(array: Array[Short])(implicit byteOrder: ByteOrder): this.type = + putShorts(array, 0, array.length)(byteOrder) + + /** + * Add a number of Shorts from an array to this builder. + */ + def putShorts(array: Array[Short], start: Int, len: Int)(implicit byteOrder: ByteOrder): this.type = + fillByteBuffer(len * 2, byteOrder) { _.asShortBuffer.put(array, start, len) } + + /** + * Add a number of Ints from an array to this builder. + */ + def putInts(array: Array[Int])(implicit byteOrder: ByteOrder): this.type = + putInts(array, 0, array.length)(byteOrder) + + /** + * Add a number of Ints from an array to this builder. + */ + def putInts(array: Array[Int], start: Int, len: Int)(implicit byteOrder: ByteOrder): this.type = + fillByteBuffer(len * 4, byteOrder) { _.asIntBuffer.put(array, start, len) } + + /** + * Add a number of Longs from an array to this builder. + */ + def putLongs(array: Array[Long])(implicit byteOrder: ByteOrder): this.type = + putLongs(array, 0, array.length)(byteOrder) + + /** + * Add a number of Longs from an array to this builder. + */ + def putLongs(array: Array[Long], start: Int, len: Int)(implicit byteOrder: ByteOrder): this.type = + fillByteBuffer(len * 8, byteOrder) { _.asLongBuffer.put(array, start, len) } + + /** + * Add a number of Floats from an array to this builder. + */ + def putFloats(array: Array[Float])(implicit byteOrder: ByteOrder): this.type = + putFloats(array, 0, array.length)(byteOrder) + + /** + * Add a number of Floats from an array to this builder. + */ + def putFloats(array: Array[Float], start: Int, len: Int)(implicit byteOrder: ByteOrder): this.type = + fillByteBuffer(len * 4, byteOrder) { _.asFloatBuffer.put(array, start, len) } + + /** + * Add a number of Doubles from an array to this builder. + */ + def putDoubles(array: Array[Double])(implicit byteOrder: ByteOrder): this.type = + putDoubles(array, 0, array.length)(byteOrder) + + /** + * Add a number of Doubles from an array to this builder. + */ + def putDoubles(array: Array[Double], start: Int, len: Int)(implicit byteOrder: ByteOrder): this.type = + fillByteBuffer(len * 8, byteOrder) { _.asDoubleBuffer.put(array, start, len) } + def clear(): Unit = { _builder.clear _length = 0 @@ -513,4 +697,13 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { ByteStrings(bytestrings, _length) } + /** + * Directly wraps this ByteStringBuilder in an OutputStream. Write + * operations on the stream are forwarded to the builder. + */ + def asOutputStream: java.io.OutputStream = new java.io.OutputStream { + def write(b: Int): Unit = builder += b.toByte + + override def write(b: Array[Byte], off: Int, len: Int): Unit = { builder.putBytes(b, off, len) } + } } diff --git a/akka-docs/additional/code/osgi/Activator.scala b/akka-docs/additional/code/osgi/Activator.scala new file mode 100644 index 0000000000..4f432452c3 --- /dev/null +++ b/akka-docs/additional/code/osgi/Activator.scala @@ -0,0 +1,25 @@ +package docs.osgi + +case object SomeMessage + +class SomeActor extends akka.actor.Actor { + def receive = { case SomeMessage ⇒ } +} + +//#Activator +import akka.actor.{ Props, ActorSystem } +import org.osgi.framework.BundleContext +import akka.osgi.ActorSystemActivator + +class Activator extends ActorSystemActivator { + + def configure(context: BundleContext, system: ActorSystem) { + // optionally register the ActorSystem in the OSGi Service Registry + registerService(context, system) + + val someActor = system.actorOf(Props[SomeActor], name = "someName") + someActor ! SomeMessage + } + +} +//#Activator \ No newline at end of file diff --git a/akka-docs/additional/code/osgi/blueprint.xml b/akka-docs/additional/code/osgi/blueprint.xml new file mode 100644 index 0000000000..8fcedb990c --- /dev/null +++ b/akka-docs/additional/code/osgi/blueprint.xml @@ -0,0 +1,14 @@ + + + + + + + + some.config { + key=value + } + + + diff --git a/akka-docs/additional/osgi.rst b/akka-docs/additional/osgi.rst index aea554ef9c..3bedc8c7dd 100644 --- a/akka-docs/additional/osgi.rst +++ b/akka-docs/additional/osgi.rst @@ -8,3 +8,20 @@ To use Akka in an OSGi environment, the ``org.osgi.framework.bootdelegation`` property must be set to always delegate the ``sun.misc`` package to the boot classloader instead of resolving it through the normal OSGi class space. + +Activator +--------- + +To bootstrap Akka inside an OSGi environment, you can use the akka.osgi.AkkaSystemActivator class +to conveniently set up the ActorSystem. + +.. includecode:: code/osgi/Activator.scala#Activator + + +Blueprint +--------- + +For the Apache Aries Blueprint implementation, there's also a namespace handler available. The namespace URI +is http://akka.io/xmlns/blueprint/v1.0.0 and it can be used to set up an ActorSystem. + +.. includecode:: code/osgi/blueprint.xml diff --git a/akka-docs/scala/code/akka/docs/io/BinaryCoding.scala b/akka-docs/scala/code/akka/docs/io/BinaryCoding.scala new file mode 100644 index 0000000000..d9aeb334fc --- /dev/null +++ b/akka-docs/scala/code/akka/docs/io/BinaryCoding.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2012 Typesafe Inc. + */ +package akka.docs.io + +//#imports +import akka.actor._ +import akka.util.{ ByteString, ByteStringBuilder, ByteIterator } +//#imports + +abstract class BinaryDecoding { + //#decoding + implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN + + val FrameDecoder = for { + frameLenBytes ← IO.take(4) + frameLen = frameLenBytes.iterator.getInt + frame ← IO.take(frameLen) + } yield { + val in = frame.iterator + + val n = in.getInt + val m = in.getInt + + val a = Array.newBuilder[Short] + val b = Array.newBuilder[Long] + + for (i ← 1 to n) { + a += in.getShort + b += in.getInt + } + + val data = Array.ofDim[Double](m) + in.getDoubles(data) + + (a.result, b.result, data) + } + + //#decoding +} + +abstract class RestToSeq { + implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN + val bytes: ByteString + val in = bytes.iterator + + //#rest-to-seq + val n = in.getInt + val m = in.getInt + // ... in.get... + val rest: ByteString = in.toSeq + //#rest-to-seq +} + +abstract class BinaryEncoding { + //#encoding + implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN + + val a: Array[Short] + val b: Array[Long] + val data: Array[Double] + + val frameBuilder = ByteString.newBuilder + + val n = a.length + val m = data.length + + frameBuilder.putInt(n) + frameBuilder.putInt(m) + + for (i ← 0 to n - 1) { + frameBuilder.putShort(a(i)) + frameBuilder.putLong(b(i)) + } + frameBuilder.putDoubles(data) + val frame = frameBuilder.result() + //#encoding + + //#sending + val socket: IO.SocketHandle + socket.write(ByteString.newBuilder.putInt(frame.length).result) + socket.write(frame) + //#sending +} diff --git a/akka-docs/scala/io.rst b/akka-docs/scala/io.rst index 2c4e1608f3..9063e010f5 100644 --- a/akka-docs/scala/io.rst +++ b/akka-docs/scala/io.rst @@ -17,10 +17,62 @@ ByteString A primary goal of Akka's IO module is to only communicate between actors with immutable objects. When dealing with network IO on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this IO module, so ``ByteString`` was developed. -``ByteString`` is a `Rope-like `_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. +``ByteString`` is a `Rope-like `_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice. ``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and it's companion object in the ScalaDoc. +``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods: + +Compatibility with java.io +.......................... + +A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams. + +Encoding and decoding of binary data +.................................... + +``ByteStringBuilder`` and ``ByteIterator`` support encoding and decoding of binary data. As an example, consider a stream of binary data frames with the following format: + +.. code-block:: text + + frameLen: Int + n: Int + m: Int + n times { + a: Short + b: Long + } + data: m times Double + +In this example, the data is to be stored in arrays of ``a``, ``b`` and ``data``. + +Decoding of such frames can be efficiently implemented in the following fashion: + +.. includecode:: code/akka/docs/io/BinaryCoding.scala + :include: decoding + +This implementation naturally follows the example data format. In a true Scala application, one might, of course, want use specialized immutable Short/Long/Double containers instead of mutable Arrays. + +After extracting data from a ``ByteIterator``, the remaining content can also be turned back into a ``ByteString`` using the ``toSeq`` method + +.. includecode:: code/akka/docs/io/BinaryCoding.scala + :include: rest-to-seq + +with no copying from bytes to rest involved. In general, conversions from ByteString to ByteIterator and vice versa are O(1) for non-chunked ``ByteString``s and (at worst) O(nChunks) for chunked ByteStrings. + + +Encoding of data also is very natural, using ``ByteStringBuilder`` + +.. includecode:: code/akka/docs/io/BinaryCoding.scala + :include: encoding + + +The encoded data then can be sent over socket (see ``IOManager``): + +.. includecode:: code/akka/docs/io/BinaryCoding.scala + :include: sending + + IO.Handle ^^^^^^^^^ diff --git a/akka-osgi-aries/src/main/resources/OSGI-INF/blueprint/akka-namespacehandler.xml b/akka-osgi-aries/src/main/resources/OSGI-INF/blueprint/akka-namespacehandler.xml new file mode 100644 index 0000000000..99492bedf2 --- /dev/null +++ b/akka-osgi-aries/src/main/resources/OSGI-INF/blueprint/akka-namespacehandler.xml @@ -0,0 +1,18 @@ + + + + + + + + + http://akka.io/xmlns/blueprint/v1.0.0 + + + + + + + diff --git a/akka-osgi-aries/src/main/resources/akka/osgi/aries/blueprint/akka.xsd b/akka-osgi-aries/src/main/resources/akka/osgi/aries/blueprint/akka.xsd new file mode 100644 index 0000000000..d7d0f77a2c --- /dev/null +++ b/akka-osgi-aries/src/main/resources/akka/osgi/aries/blueprint/akka.xsd @@ -0,0 +1,42 @@ + + + + + + + + Defines the configuration elements for setting up Akka with Blueprint + + + + + + + + Defines an Akka ActorSystem + + + + + + + + + + + + + + + + Defines an Akka ActorSystem configuration + + + + + diff --git a/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/BlueprintActorSystemFactory.scala b/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/BlueprintActorSystemFactory.scala new file mode 100644 index 0000000000..99ab75ef72 --- /dev/null +++ b/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/BlueprintActorSystemFactory.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.aries.blueprint + +import org.osgi.framework.BundleContext +import akka.osgi.OsgiActorSystemFactory +import com.typesafe.config.ConfigFactory + +/** + * A set of helper/factory classes to build a Akka system using Blueprint. This class is only meant to be used by + * the [[akka.osgi.aries.blueprint.NamespaceHandler]] class, you should not use this class directly. + * + * If you're looking for a way to set up Akka using Blueprint without the namespace handler, you should use + * [[akka.osgi.OsgiActorSystemFactory]] instead. + */ +class BlueprintActorSystemFactory(context: BundleContext, name: String) extends OsgiActorSystemFactory(context) { + + var config: Option[String] = None + + lazy val system = super.createActorSystem(stringToOption(name)) + + def setConfig(config: String) = { this.config = Some(config) } + + def create = system + + def destroy = system.shutdown() + + def stringToOption(original: String) = if (original == null || original.isEmpty) { + None + } else { + Some(original) + } + + /** + * Strategy method to create the Config for the ActorSystem, ensuring that the default/reference configuration is + * loaded from the akka-actor bundle. + */ + override def actorSystemConfig(context: BundleContext) = { + config match { + case Some(value) ⇒ ConfigFactory.parseString(value).withFallback(super.actorSystemConfig(context)) + case None ⇒ super.actorSystemConfig(context) + } + + } +} + diff --git a/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/NamespaceHandler.scala b/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/NamespaceHandler.scala new file mode 100644 index 0000000000..0570a027b6 --- /dev/null +++ b/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/NamespaceHandler.scala @@ -0,0 +1,148 @@ +package akka.osgi.aries.blueprint + +import org.apache.aries.blueprint.ParserContext +import org.osgi.service.blueprint.container.ComponentDefinitionException +import org.apache.aries.blueprint.mutable.MutableBeanMetadata + +import collection.JavaConversions.setAsJavaSet +import org.osgi.framework.BundleContext +import org.apache.aries.blueprint.reflect.{ ValueMetadataImpl, RefMetadataImpl, BeanArgumentImpl } +import org.w3c.dom.{ Element, Node } +import org.osgi.service.blueprint.reflect.{ BeanMetadata, ComponentMetadata } +import akka.actor.ActorSystem +import java.util.concurrent.atomic.AtomicInteger + +import ParserHelper.childElements + +/** + * Aries Blueprint namespace handler implementation. This namespace handler will allow users of Apache Aries' Blueprint + * implementation to define their Akka [[akka.actor.ActorSystem]] using a syntax like this: + * + * {{{ + * + * + * + * + * + * some.config { + * key=value + * } + * + * + * + * + * }}} + * + * Users of other IoC frameworks in an OSGi environment should use [[akka.osgi.OsgiActorSystemFactory]] instead. + */ +class NamespaceHandler extends org.apache.aries.blueprint.NamespaceHandler { + + import NamespaceHandler._ + + val idCounter = new AtomicInteger(0) + + def getSchemaLocation(namespace: String) = getClass().getResource("akka.xsd") + + def getManagedClasses = setAsJavaSet(Set(classOf[BlueprintActorSystemFactory])) + + def parse(element: Element, context: ParserContext) = element.getLocalName match { + case ACTORSYSTEM_ELEMENT_NAME ⇒ parseActorSystem(element, context) + case _ ⇒ throw new ComponentDefinitionException("Unexpected element for Akka namespace: %s".format(element)) + } + + def decorate(node: Node, component: ComponentMetadata, context: ParserContext) = + throw new ComponentDefinitionException("Bad xml syntax: node decoration is not supported") + + /* + * Parse + */ + def parseActorSystem(element: Element, context: ParserContext) = { + val factory = createFactoryBean(context, element.getAttribute(NAME_ATTRIBUTE)) + + for (child ← childElements(element)) { + child.getLocalName match { + case CONFIG_ELEMENT_NAME ⇒ parseConfig(child, context, factory) + case _ ⇒ throw new ComponentDefinitionException("Unexpected child element %s found in %s".format(child, element)) + } + } + + createActorSystemBean(context, element, factory) + } + + /* + * Parse + */ + def parseConfig(node: Element, context: ParserContext, factory: MutableBeanMetadata) = { + factory.addProperty("config", new ValueMetadataImpl(node.getTextContent)) + } + + /* + * Create the bean definition for the ActorSystem + */ + def createActorSystemBean(context: ParserContext, element: Element, factory: MutableBeanMetadata): MutableBeanMetadata = { + val system = context.createMetadata(classOf[MutableBeanMetadata]) + system.setId(getId(context, element)) + system.setFactoryComponent(factory) + + system.setFactoryMethod(FACTORY_METHOD_NAME) + system.setRuntimeClass(classOf[ActorSystem]) + system + } + + /* + * Create the bean definition for the BlueprintActorSystemFactory + */ + def createFactoryBean(context: ParserContext, name: String): MutableBeanMetadata = { + val factory = context.createMetadata(classOf[MutableBeanMetadata]) + factory.setId(findAvailableId(context)) + factory.setScope(BeanMetadata.SCOPE_SINGLETON) + factory.setProcessor(true) + factory.setRuntimeClass(classOf[BlueprintActorSystemFactory]) + + factory.setDestroyMethod(DESTROY_METHOD_NAME) + + factory.addArgument(new BeanArgumentImpl(new RefMetadataImpl(BUNDLE_CONTEXT_REFID), classOf[BundleContext].getName, -1)) + factory.addArgument(new BeanArgumentImpl(new ValueMetadataImpl(name), classOf[String].getName, -1)) + factory.setProcessor(true) + context.getComponentDefinitionRegistry.registerComponentDefinition(factory) + factory + } + + /* + * Get the assigned id or generate a suitable id + */ + def getId(context: ParserContext, element: Element) = { + if (element.hasAttribute(ID_ATTRIBUTE)) { + element.getAttribute(ID_ATTRIBUTE) + } else { + findAvailableId(context) + } + } + + /* + * Find the next available component id + */ + def findAvailableId(context: ParserContext): String = { + val id = ".akka-" + idCounter.incrementAndGet() + if (context.getComponentDefinitionRegistry.containsComponentDefinition(id)) { + // id already exists, let's try the next one + findAvailableId(context) + } else id + } +} + +object NamespaceHandler { + + private val ID_ATTRIBUTE = "id" + private val NAME_ATTRIBUTE = "name" + + private val BUNDLE_CONTEXT_REFID = "blueprintBundleContext" + + private val ACTORSYSTEM_ELEMENT_NAME = "actor-system" + private val CONFIG_ELEMENT_NAME = "config" + + private val DESTROY_METHOD_NAME = "destroy" + private val FACTORY_METHOD_NAME = "create" + +} diff --git a/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/ParserHelper.scala b/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/ParserHelper.scala new file mode 100644 index 0000000000..c2c6d97b34 --- /dev/null +++ b/akka-osgi-aries/src/main/scala/akka/osgi/aries/blueprint/ParserHelper.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.aries.blueprint + +import org.w3c.dom.{ Node, Element } + +/** + * Helper class to deal with the W3C DOM types + */ +object ParserHelper { + + def childElements(element: Element): Seq[Element] = + children(element).filter(_.getNodeType == Node.ELEMENT_NODE).asInstanceOf[Seq[Element]] + + private[this] def children(element: Element): Seq[Node] = { + val nodelist = element.getChildNodes + for (index ← 0 until nodelist.getLength) yield nodelist.item(index) + } +} diff --git a/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/config.xml b/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/config.xml new file mode 100644 index 0000000000..ce9f48c551 --- /dev/null +++ b/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/config.xml @@ -0,0 +1,15 @@ + + + + + + + + some.config { + key=value + } + + + + diff --git a/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/injection.xml b/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/injection.xml new file mode 100644 index 0000000000..6fd21db5ef --- /dev/null +++ b/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/injection.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + diff --git a/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/simple.xml b/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/simple.xml new file mode 100644 index 0000000000..2ac6552f80 --- /dev/null +++ b/akka-osgi-aries/src/test/resources/akka/osgi/aries/blueprint/simple.xml @@ -0,0 +1,9 @@ + + + + + + + + diff --git a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/ActorSystemAwareBean.scala b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/ActorSystemAwareBean.scala new file mode 100644 index 0000000000..ade4a17bda --- /dev/null +++ b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/ActorSystemAwareBean.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.aries.blueprint + +import akka.actor.ActorSystem + +/** + * Just a simple POJO that can contain an actor system. + * Used for testing dependency injection with Blueprint + */ +class ActorSystemAwareBean(val system: ActorSystem) { + +} diff --git a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala new file mode 100644 index 0000000000..7b0d28ecce --- /dev/null +++ b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.aries.blueprint + +import org.scalatest.WordSpec +import akka.actor.ActorSystem +import de.kalpatec.pojosr.framework.launch.BundleDescriptor +import akka.osgi.PojoSRTestSupport +import akka.osgi.PojoSRTestSupport.bundle +import org.scalatest.matchers.MustMatchers + +/** + * Test cases for {@link ActorSystemActivator} + */ +object NamespaceHandlerTest { + + /* + * Bundle-SymbolicName to easily find our test bundle + */ + val TEST_BUNDLE_NAME = "akka.osgi.test.aries.namespace" + + /* + * Bundle descriptor representing the akka-osgi bundle itself + */ + val AKKA_OSGI_BLUEPRINT = + bundle("akka-osgi").withBlueprintFile(getClass.getResource("/OSGI-INF/blueprint/akka-namespacehandler.xml")) + +} + +class SimpleNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRTestSupport { + + import NamespaceHandlerTest._ + + val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + AKKA_OSGI_BLUEPRINT, + bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("simple.xml")))) + + "simple.xml" must { + "set up ActorSystem when bundle starts" in { + serviceForType[ActorSystem] must not be (null) + } + + "stop the ActorSystem when bundle stops" in { + val system = serviceForType[ActorSystem] + system.isTerminated must be(false) + + bundleForName(TEST_BUNDLE_NAME).stop() + + system.awaitTermination() + system.isTerminated must be(true) + } + } + +} + +class ConfigNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRTestSupport { + + import NamespaceHandlerTest._ + + val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + AKKA_OSGI_BLUEPRINT, + bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("config.xml")))) + + "config.xml" must { + "set up ActorSystem when bundle starts" in { + val system = serviceForType[ActorSystem] + system must not be (null) + + system.settings.config.getString("some.config.key") must be("value") + } + + "stop the ActorSystem when bundle stops" in { + val system = serviceForType[ActorSystem] + system.isTerminated must be(false) + + bundleForName(TEST_BUNDLE_NAME).stop() + + system.awaitTermination() + system.isTerminated must be(true) + } + } + +} + +class DependencyInjectionNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRTestSupport { + + import NamespaceHandlerTest._ + + val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + AKKA_OSGI_BLUEPRINT, + bundle(TEST_BUNDLE_NAME).withBlueprintFile(getClass.getResource("injection.xml")))) + + "injection.xml" must { + + "set up bean containing ActorSystem" in { + val bean = serviceForType[ActorSystemAwareBean] + bean must not be (null) + bean.system must not be (null) + } + } + +} diff --git a/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala b/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala new file mode 100644 index 0000000000..155c471fe3 --- /dev/null +++ b/akka-osgi/src/main/scala/akka/osgi/ActorSystemActivator.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi + +import akka.actor.ActorSystem +import java.util.{ Dictionary, Properties } +import org.osgi.framework.{ ServiceRegistration, BundleContext, BundleActivator } + +/** + * Abstract bundle activator implementation to bootstrap and configure an actor system in an + * OSGi environment. It also provides a convenience method to register the actor system in + * the OSGi Service Registry for sharing it with other OSGi bundles. + * + * This convenience activator is mainly useful for setting up a single [[akka.actor.ActorSystem]] instance and sharing that + * with other bundles in the OSGi Framework. If you want to set up multiple systems in the same bundle context, look at + * the [[akka.osgi.OsgiActorSystemFactory]] instead. + */ +abstract class ActorSystemActivator extends BundleActivator { + + private var system: Option[ActorSystem] = None + private var registration: Option[ServiceRegistration] = None + + /** + * Implement this method to add your own actors to the ActorSystem. If you want to share the actor + * system with other bundles, call the `registerService(BundleContext, ActorSystem)` method from within + * this method. + * + * @param context the bundle context + * @param system the ActorSystem that was created by the activator + */ + def configure(context: BundleContext, system: ActorSystem): Unit + + /** + * Sets up a new ActorSystem + * + * @param context the BundleContext + */ + def start(context: BundleContext): Unit = { + system = Some(OsgiActorSystemFactory(context).createActorSystem(Option(getActorSystemName(context)))) + system foreach (configure(context, _)) + } + + /** + * Shuts down the ActorSystem when the bundle is stopped and, if necessary, unregisters a service registration. + * + * @param context the BundleContext + */ + def stop(context: BundleContext): Unit = { + registration foreach (_.unregister()) + system foreach (_.shutdown()) + } + + /** + * Register the actor system in the OSGi service registry. The activator itself will ensure that this service + * is unregistered again when the bundle is being stopped. + * + * @param context the bundle context + * @param system the actor system + */ + def registerService(context: BundleContext, system: ActorSystem): Unit = { + val properties = new Properties() + properties.put("name", system.name) + registration = Some(context.registerService(classOf[ActorSystem].getName, system, + properties.asInstanceOf[Dictionary[String, Any]])) + } + + /** + * By default, the [[akka.actor.ActorSystem]] name will be set to `bundle--ActorSystem`. Override this + * method to define another name for your [[akka.actor.ActorSystem]] instance. + * + * @param context the bundle context + * @return the actor system name + */ + def getActorSystemName(context: BundleContext): String = null + +} diff --git a/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala b/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala new file mode 100644 index 0000000000..9f275204cf --- /dev/null +++ b/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi + +import impl.BundleDelegatingClassLoader +import akka.actor.ActorSystem +import com.typesafe.config.{ ConfigFactory, Config } +import org.osgi.framework.BundleContext + +/** + * Factory class to create ActorSystem implementations in an OSGi environment. This mainly involves dealing with + * bundle classloaders appropriately to ensure that configuration files and classes get loaded properly + */ +class OsgiActorSystemFactory(val context: BundleContext) { + + /* + * Classloader that delegates to the bundle for which the factory is creating an ActorSystem + */ + private val classloader = BundleDelegatingClassLoader.createFor(context) + + /** + * Creates the [[akka.actor.ActorSystem]], using the name specified + */ + def createActorSystem(name: String): ActorSystem = createActorSystem(Option(name)) + + /** + * Creates the [[akka.actor.ActorSystem]], using the name specified. + * + * A default name (`bundle--ActorSystem`) is assigned when you pass along [[scala.None]] instead. + */ + def createActorSystem(name: Option[String]): ActorSystem = + ActorSystem(actorSystemName(name), actorSystemConfig(context), classloader) + + /** + * Strategy method to create the Config for the ActorSystem, ensuring that the default/reference configuration is + * loaded from the akka-actor bundle. + */ + def actorSystemConfig(context: BundleContext): Config = { + val reference = ConfigFactory.defaultReference(classOf[ActorSystem].getClassLoader) + ConfigFactory.load(classloader).withFallback(reference) + } + + /** + * Determine the name for the [[akka.actor.ActorSystem]] + * Returns a default value of `bundle--ActorSystem` is no name is being specified + */ + def actorSystemName(name: Option[String]): String = + name.getOrElse("bundle-%s-ActorSystem".format(context.getBundle().getBundleId)) + +} + +object OsgiActorSystemFactory { + + /* + * Create an [[OsgiActorSystemFactory]] instance to set up Akka in an OSGi environment + */ + def apply(context: BundleContext): OsgiActorSystemFactory = new OsgiActorSystemFactory(context) + +} diff --git a/akka-osgi/src/main/scala/akka/osgi/impl/BundleDelegatingClassLoader.scala b/akka-osgi/src/main/scala/akka/osgi/impl/BundleDelegatingClassLoader.scala new file mode 100644 index 0000000000..37e74e7194 --- /dev/null +++ b/akka-osgi/src/main/scala/akka/osgi/impl/BundleDelegatingClassLoader.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.impl + +import java.net.URL +import java.util.Enumeration + +import org.osgi.framework.{ BundleContext, Bundle } + +/* + * Companion object to create bundle delegating classloader instances + */ +object BundleDelegatingClassLoader { + + /* + * Create a bundle delegating classloader for the bundle context's bundle + */ + def createFor(context: BundleContext) = new BundleDelegatingClassLoader(context.getBundle) + +} + +/* + * A bundle delegating classloader implemenation - this will try to load classes and resources from the bundle + * specified first and if there's a classloader specified, that will be used as a fallback + */ +class BundleDelegatingClassLoader(bundle: Bundle, classLoader: Option[ClassLoader]) extends ClassLoader { + + def this(bundle: Bundle) = this(bundle, None) + + protected override def findClass(name: String): Class[_] = bundle.loadClass(name) + + protected override def findResource(name: String): URL = { + val resource = bundle.getResource(name) + classLoader match { + case Some(loader) if resource == null ⇒ loader.getResource(name) + case _ ⇒ resource + } + } + + @SuppressWarnings(Array("unchecked", "rawtypes")) + protected override def findResources(name: String): Enumeration[URL] = + bundle.getResources(name).asInstanceOf[Enumeration[URL]] + + protected override def loadClass(name: String, resolve: Boolean): Class[_] = { + val clazz = try { + findClass(name) + } catch { + case cnfe: ClassNotFoundException ⇒ { + classLoader match { + case Some(loader) ⇒ loadClass(name, loader) + case None ⇒ rethrowClassNotFoundException(name, cnfe) + } + } + } + if (resolve) { + resolveClass(clazz) + } + clazz + } + + private def loadClass(name: String, classLoader: ClassLoader) = + try { + classLoader.loadClass(name) + } catch { + case cnfe: ClassNotFoundException ⇒ rethrowClassNotFoundException(name, cnfe) + } + + private def rethrowClassNotFoundException(name: String, cnfe: ClassNotFoundException): Nothing = + throw new ClassNotFoundException(name + " from bundle " + bundle.getBundleId + " (" + bundle.getSymbolicName + ")", cnfe) + + override def toString: String = String.format("BundleDelegatingClassLoader(%s)", bundle) + +} + diff --git a/akka-osgi/src/test/resources/logback-test.xml b/akka-osgi/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..9c441a6fb6 --- /dev/null +++ b/akka-osgi/src/test/resources/logback-test.xml @@ -0,0 +1,23 @@ + + + + + + + %date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n + + + + + target/akka-osgi.log + true + + %date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n + + + + + + + + diff --git a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala new file mode 100644 index 0000000000..e1781b4a80 --- /dev/null +++ b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi + +import language.postfixOps + +import org.scalatest.WordSpec +import akka.actor.ActorSystem +import akka.pattern.ask +import scala.concurrent.Await +import scala.concurrent.util.duration._ +import akka.util.Timeout +import de.kalpatec.pojosr.framework.launch.BundleDescriptor +import test.{ RuntimeNameActorSystemActivator, TestActivators, PingPongActorSystemActivator } +import test.PingPong._ +import PojoSRTestSupport.bundle +import org.scalatest.matchers.MustMatchers + +/** + * Test cases for [[akka.osgi.ActorSystemActivator]] in 2 different scenarios: + * - no name configured for [[akka.actor.ActorSystem]] + * - runtime name configuration + */ +object ActorSystemActivatorTest { + + val TEST_BUNDLE_NAME = "akka.osgi.test.activator" + +} + +class PingPongActorSystemActivatorTest extends WordSpec with MustMatchers with PojoSRTestSupport { + + import ActorSystemActivatorTest._ + + val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq( + bundle(TEST_BUNDLE_NAME).withActivator(classOf[PingPongActorSystemActivator]))) + + "PingPongActorSystemActivator" must { + + "start and register the ActorSystem when bundle starts" in { + val system = serviceForType[ActorSystem] + val actor = system.actorFor("/user/pong") + + implicit val timeout = Timeout(5 seconds) + Await.result(actor ? Ping, timeout.duration) must be(Pong) + } + + "stop the ActorSystem when bundle stops" in { + val system = serviceForType[ActorSystem] + system.isTerminated must be(false) + + bundleForName(TEST_BUNDLE_NAME).stop() + + system.awaitTermination() + system.isTerminated must be(true) + } + } + +} + +class RuntimeNameActorSystemActivatorTest extends WordSpec with MustMatchers with PojoSRTestSupport { + + import ActorSystemActivatorTest._ + + val testBundles: Seq[BundleDescriptor] = buildTestBundles(Seq(bundle(TEST_BUNDLE_NAME).withActivator(classOf[RuntimeNameActorSystemActivator]))) + + "RuntimeNameActorSystemActivator" must { + + "register an ActorSystem and add the bundle id to the system name" in { + serviceForType[ActorSystem].name must equal(TestActivators.ACTOR_SYSTEM_NAME_PATTERN.format(bundleForName(TEST_BUNDLE_NAME).getBundleId)) + } + } + +} \ No newline at end of file diff --git a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala new file mode 100644 index 0000000000..029877d8af --- /dev/null +++ b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala @@ -0,0 +1,144 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi + +import de.kalpatec.pojosr.framework.launch.{ BundleDescriptor, PojoServiceRegistryFactory, ClasspathScanner } + +import scala.collection.JavaConversions.seqAsJavaList +import scala.collection.JavaConversions.collectionAsScalaIterable +import org.apache.commons.io.IOUtils.copy + +import org.osgi.framework._ +import java.net.URL + +import java.util.jar.JarInputStream +import java.io.{ FileInputStream, FileOutputStream, File } +import java.util.{ Date, ServiceLoader, HashMap } +import org.scalatest.{ BeforeAndAfterAll, Suite } + +/** + * Trait that provides support for building akka-osgi tests using PojoSR + */ +trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { + + val MAX_WAIT_TIME = 12800 + val START_WAIT_TIME = 50 + + /** + * All bundles being found on the test classpath are automatically installed and started in the PojoSR runtime. + * Implement this to define the extra bundles that should be available for testing. + */ + val testBundles: Seq[BundleDescriptor] + + lazy val context: BundleContext = { + val config = new HashMap[String, AnyRef]() + System.setProperty("org.osgi.framework.storage", "target/akka-osgi/" + System.currentTimeMillis) + + val bundles = new ClasspathScanner().scanForBundles() + bundles.addAll(testBundles) + config.put(PojoServiceRegistryFactory.BUNDLE_DESCRIPTORS, bundles) + + ServiceLoader.load(classOf[PojoServiceRegistryFactory]).iterator.next.newPojoServiceRegistry(config).getBundleContext + } + + // Ensure bundles get stopped at the end of the test to release resources and stop threads + override protected def afterAll() = context.getBundles.foreach(_.stop) + + /** + * Convenience method to find a bundle by symbolic name + */ + def bundleForName(name: String) = + context.getBundles.find(_.getSymbolicName == name).getOrElse(fail("Unable to find bundle with symbolic name %s".format(name))) + + /** + * Convenience method to find a service by interface. If the service is not already available in the OSGi Service + * Registry, this method will wait for a few seconds for the service to appear. + */ + def serviceForType[T](implicit manifest: Manifest[T]): T = + context.getService(awaitReference(manifest.erasure)).asInstanceOf[T] + + def awaitReference(serviceType: Class[_]): ServiceReference = awaitReference(serviceType, START_WAIT_TIME) + + def awaitReference(serviceType: Class[_], wait: Long): ServiceReference = { + val option = Option(context.getServiceReference(serviceType.getName)) + Thread.sleep(wait) //FIXME No sleep please + option match { + case Some(reference) ⇒ reference + case None if (wait > MAX_WAIT_TIME) ⇒ fail("Gave up waiting for service of type %s".format(serviceType)) + case None ⇒ awaitReference(serviceType, wait * 2) + } + } + + protected def buildTestBundles(builders: Seq[BundleDescriptorBuilder]): Seq[BundleDescriptor] = builders map (_.build) +} + +object PojoSRTestSupport { + /** + * Convenience method to define additional test bundles + */ + def bundle(name: String) = new BundleDescriptorBuilder(name) +} + +/** + * Helper class to make it easier to define test bundles + */ +class BundleDescriptorBuilder(name: String) { + + import org.ops4j.pax.tinybundles.core.TinyBundles + + val tinybundle = TinyBundles.bundle.set(Constants.BUNDLE_SYMBOLICNAME, name) + + /** + * Add a Blueprint XML file to our test bundle + */ + def withBlueprintFile(name: String, contents: URL): BundleDescriptorBuilder = { + tinybundle.add("OSGI-INF/blueprint/%s".format(name), contents) + this + } + + /** + * Add a Blueprint XML file to our test bundle + */ + def withBlueprintFile(contents: URL): BundleDescriptorBuilder = { + val filename = contents.getFile.split("/").last + withBlueprintFile(filename, contents) + } + + /** + * Add a Bundle activator to our test bundle + */ + def withActivator(activator: Class[_ <: BundleActivator]): BundleDescriptorBuilder = { + tinybundle.set(Constants.BUNDLE_ACTIVATOR, activator.getName) + this + } + + /** + * Build the actual PojoSR BundleDescriptor instance + */ + def build: BundleDescriptor = { + val file: File = tinybundleToJarFile(name) + new BundleDescriptor(getClass().getClassLoader(), new URL("jar:" + file.toURI().toString() + "!/"), extractHeaders(file)) + } + + def extractHeaders(file: File): HashMap[String, String] = { + val headers = new HashMap[String, String]() + + val jis = new JarInputStream(new FileInputStream(file)) + try { + for (entry ← jis.getManifest().getMainAttributes().entrySet()) + headers.put(entry.getKey().toString(), entry.getValue().toString()) + } finally jis.close() + + headers + } + + def tinybundleToJarFile(name: String): File = { + val file = new File("target/%s-%tQ.jar".format(name, new Date())) + val fos = new FileOutputStream(file) + try copy(tinybundle.build(), fos) finally fos.close() + + file + } +} + diff --git a/akka-osgi/src/test/scala/akka/osgi/test/PingPong.scala b/akka-osgi/src/test/scala/akka/osgi/test/PingPong.scala new file mode 100644 index 0000000000..cd2294f3b0 --- /dev/null +++ b/akka-osgi/src/test/scala/akka/osgi/test/PingPong.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.test + +import akka.actor.Actor + +/** + * Simple ping-pong actor, used for testing + */ +object PingPong { + + abstract class TestMessage + + case object Ping extends TestMessage + case object Pong extends TestMessage + + class PongActor extends Actor { + def receive = { + case Ping ⇒ + sender ! Pong + } + } + +} diff --git a/akka-osgi/src/test/scala/akka/osgi/test/TestActivators.scala b/akka-osgi/src/test/scala/akka/osgi/test/TestActivators.scala new file mode 100644 index 0000000000..eec07b99e1 --- /dev/null +++ b/akka-osgi/src/test/scala/akka/osgi/test/TestActivators.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.osgi.test + +import akka.osgi.ActorSystemActivator +import akka.actor.{ Props, ActorSystem } +import PingPong._ +import org.osgi.framework.BundleContext + +/** + * A set of [[akka.osgi.ActorSystemActivator]]s for testing purposes + */ +object TestActivators { + + val ACTOR_SYSTEM_NAME_PATTERN = "actor-system-for-bundle-%s" + +} + +/** + * Simple ActorSystemActivator that starts the sample ping-pong application + */ +class PingPongActorSystemActivator extends ActorSystemActivator { + + def configure(context: BundleContext, system: ActorSystem) { + system.actorOf(Props(new PongActor), name = "pong") + registerService(context, system) + } + +} + +/** + * [[akka.osgi.ActorSystemActivator]] implementation that determines [[akka.actor.ActorSystem]] name at runtime + */ +class RuntimeNameActorSystemActivator extends ActorSystemActivator { + + def configure(context: BundleContext, system: ActorSystem) = registerService(context, system); + + override def getActorSystemName(context: BundleContext) = + TestActivators.ACTOR_SYSTEM_NAME_PATTERN.format(context.getBundle.getBundleId) + +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a706ec2c64..7dad77d7cf 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -50,7 +50,7 @@ object AkkaBuild extends Build { sphinxLatex <<= sphinxLatex in LocalProject(docs.id), sphinxPdf <<= sphinxPdf in LocalProject(docs.id) ), - aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, /*akkaSbtPlugin,*/ samples, tutorials, docs) + aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, /*akkaSbtPlugin,*/ samples, tutorials, osgi, osgiAries, docs) ) lazy val actor = Project( @@ -225,6 +225,24 @@ object AkkaBuild extends Build { ) ) + lazy val osgi = Project( + id = "akka-osgi", + base = file("akka-osgi"), + dependencies = Seq(actor), + settings = defaultSettings ++ OSGi.osgi ++ Seq( + libraryDependencies ++= Dependencies.osgi + ) + ) + + lazy val osgiAries = Project( + id = "akka-osgi-aries", + base = file("akka-osgi-aries"), + dependencies = Seq(osgi % "compile;test->test"), + settings = defaultSettings ++ OSGi.osgiAries ++ Seq( + libraryDependencies ++= Dependencies.osgiAries + ) + ) + lazy val akkaSbtPlugin = Project( id = "akka-sbt-plugin", base = file("akka-sbt-plugin"), @@ -289,7 +307,7 @@ object AkkaBuild extends Build { id = "akka-docs", base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", - remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel), + remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries), settings = defaultSettings ++ Sphinx.settings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs, @@ -473,6 +491,10 @@ object Dependencies { val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito) + val osgi = Seq(osgiCore,Test.logback, Test.commonsIo, Test.pojosr, Test.tinybundles, Test.scalatest, Test.junit) + + val osgiAries = Seq(osgiCore, ariesBlueprint, Test.ariesProxy) + val tutorials = Seq(Test.scalatest, Test.junit) val docs = Seq(Test.scalatest, Test.junit, Test.specs2) @@ -485,15 +507,16 @@ object Dependency { def v(a: String): String = a+"_"+AkkaBuild.desiredScalaVersion // Compile - val config = "com.typesafe" % "config" % "0.4.1" // ApacheV2 - val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" // ApacheV2 - val netty = "io.netty" % "netty" % "3.5.1.Final" // ApacheV2 - val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD - val scalaStm = "org.scala-tools" % v("scala-stm") % "0.5" // Modified BSD (Scala) - val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT - val zeroMQ = "org.zeromq" % v("zeromq-scala-binding") % "0.0.6" // ApacheV2 //FIXME SWITCH TO OFFICIAL VERSION - val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 - + val config = "com.typesafe" % "config" % "0.4.1" // ApacheV2 + val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" // ApacheV2 + val netty = "io.netty" % "netty" % "3.5.1.Final" // ApacheV2 + val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD + val scalaStm = "org.scala-tools" % v("scala-stm") % "0.5" // Modified BSD (Scala) + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT + val zeroMQ = "org.zeromq" % v("zeromq-scala-binding") % "0.0.6" // ApacheV2 //FIXME SWITCH TO OFFICIAL VERSION + val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 + val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 + val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 // Test @@ -506,6 +529,9 @@ object Dependency { val scalatest = "org.scalatest" % v("scalatest") % "1.9-2.10.0-M4-B2" % "test" // ApacheV2 val scalacheck = "org.scalacheck" % v("scalacheck") % "1.10.0-b1" % "test" // New BSD val specs2 = "org.specs2" % "specs2_2.10" % "1.11" % "test" // Modified BSD / ApacheV2 + val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2 + val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2 + val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2 } } @@ -525,6 +551,10 @@ object OSGi { val mailboxesCommon = exports(Seq("akka.actor.mailbox.*")) + val osgi = exports(Seq("akka.osgi")) ++ Seq(OsgiKeys.privatePackage := Seq("akka.osgi.impl")) + + val osgiAries = exports() ++ Seq(OsgiKeys.privatePackage := Seq("akka.osgi.aries.*")) + val remote = exports(Seq("akka.remote.*", "akka.routing.*", "akka.serialization.*")) val slf4j = exports(Seq("akka.event.slf4j.*")) @@ -533,11 +563,12 @@ object OSGi { val zeroMQ = exports(Seq("akka.zeromq.*")) - def exports(packages: Seq[String]) = osgiSettings ++ Seq( - OsgiKeys.importPackage := Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*"), + def exports(packages: Seq[String] = Seq()) = osgiSettings ++ Seq( + OsgiKeys.importPackage := defaultImports, OsgiKeys.exportPackage := packages ) + def defaultImports = Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*") def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName) def configImport(packageName: String = "com.typesafe.config.*") = "%s;version=\"[0.4.1,0.5)\"".format(packageName) def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.9.2,2.10)\"".format(packageName) diff --git a/repl b/repl new file mode 100644 index 0000000000..29f505e292 --- /dev/null +++ b/repl @@ -0,0 +1,16 @@ +import akka.actor._ +import akka.dispatch.{ Future, Promise } +import com.typesafe.config.ConfigFactory +import akka.testkit._ +val remoteConfig = try { + Class.forName("akka.remote.RemoteActorRefProvider") + "\nakka.actor.provider=akka.remote.RemoteActorRefProvider" + } catch { + case _: ClassNotFoundException => "" + } +val config=ConfigFactory.parseString("akka.daemonic=on" + remoteConfig) +val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem] +implicit val ec=sys.dispatcher +import akka.util.duration._ +import akka.util.Timeout +implicit val timeout=Timeout(5 seconds)