From 2cf8fad29289a0094cb7c678cc78736822aba6ef Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 12:07:51 -0500 Subject: [PATCH 01/10] [SPARK-12177][Streaming][Kafka] limit LocationStrategy api surface area --- .../streaming/kafka010/LocationStrategy.scala | 65 ++++++++++--------- .../kafka010/JavaDirectKafkaStreamSuite.java | 4 +- .../streaming/kafka010/JavaKafkaRDDSuite.java | 4 +- .../kafka010/JavaLocationStrategySuite.java | 19 +++--- .../kafka010/DirectKafkaStreamSuite.scala | 2 +- .../streaming/kafka010/KafkaRDDSuite.scala | 2 +- 6 files changed, 51 insertions(+), 45 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala index df620300eae21..df73c9ffd62a3 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -36,42 +36,47 @@ import org.apache.spark.annotation.Experimental @Experimental sealed trait LocationStrategy -/** - * :: Experimental :: - * Use this only if your executors are on the same nodes as your Kafka brokers. - */ -@Experimental -case object PreferBrokers extends LocationStrategy { - def create: PreferBrokers.type = this -} +private case object PreferBrokers extends LocationStrategy -/** - * :: Experimental :: - * Use this in most cases, it will consistently distribute partitions across all executors. - */ -@Experimental -case object PreferConsistent extends LocationStrategy { - def create: PreferConsistent.type = this -} +private case object PreferConsistent extends LocationStrategy -/** - * :: Experimental :: - * Use this to place particular TopicPartitions on particular hosts if your load is uneven. - * Any TopicPartition not specified in the map will use a consistent location. - */ -@Experimental -case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy +private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy /** - * :: Experimental :: - * Use this to place particular TopicPartitions on particular hosts if your load is uneven. - * Any TopicPartition not specified in the map will use a consistent location. + * :: Experimental :: object to obtain instances of [[LocationStrategy]] + * */ @Experimental -object PreferFixed { - def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = { +object LocationStrategies { + /** + * :: Experimental :: + * Use this only if your executors are on the same nodes as your Kafka brokers. + */ + @Experimental + def preferBrokers: LocationStrategy = PreferBrokers + + /** + * :: Experimental :: + * Use this in most cases, it will consistently distribute partitions across all executors. + */ + @Experimental + def preferConsistent: LocationStrategy = PreferConsistent + + /** + * :: Experimental :: + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. + */ + @Experimental + def preferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy = PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) - } - def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed = + + /** + * :: Experimental :: + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. + */ + @Experimental + def preferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy = PreferFixed(hostMap) } diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java index e57ede7afaef4..20b4cba563063 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -90,7 +90,7 @@ public void testKafkaStream() throws InterruptedException { JavaInputDStream> istream1 = KafkaUtils.createDirectStream( ssc, - PreferConsistent.create(), + LocationStrategies.preferConsistent(), Subscribe.create(Arrays.asList(topic1), kafkaParams) ); @@ -123,7 +123,7 @@ public String call(ConsumerRecord r) { JavaInputDStream> istream2 = KafkaUtils.createDirectStream( ssc, - PreferConsistent.create(), + LocationStrategies.preferConsistent(), Subscribe.create(Arrays.asList(topic2), kafkaParams2) ); diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java index 548ba134dcddf..03c710ac4eb8c 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java @@ -96,14 +96,14 @@ public String call(ConsumerRecord r) { sc, kafkaParams, offsetRanges, - PreferFixed.create(leaders) + LocationStrategies.preferFixed(leaders) ).map(handler); JavaRDD rdd2 = KafkaUtils.createRDD( sc, kafkaParams, offsetRanges, - PreferConsistent.create() + LocationStrategies.preferConsistent() ).map(handler); // just making sure the java user apis work; the scala tests handle logic corner cases diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java index 7873c09e1af85..8b808860f038e 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java @@ -41,18 +41,19 @@ public void testLocationStrategyConstructors() { JavaConverters.mapAsScalaMapConverter(hosts).asScala(); // make sure constructors can be called from java - final LocationStrategy c1 = PreferConsistent.create(); - final LocationStrategy c2 = PreferConsistent$.MODULE$; - Assert.assertEquals(c1, c2); + final LocationStrategy c1 = LocationStrategies.preferConsistent(); + final LocationStrategy c2 = LocationStrategies.preferConsistent(); + Assert.assertSame(c1, c2); - final LocationStrategy c3 = PreferBrokers.create(); - final LocationStrategy c4 = PreferBrokers$.MODULE$; - Assert.assertEquals(c3, c4); + final LocationStrategy c3 = LocationStrategies.preferBrokers(); + final LocationStrategy c4 = LocationStrategies.preferBrokers(); + Assert.assertSame(c3, c4); - final LocationStrategy c5 = PreferFixed.create(hosts); - final LocationStrategy c6 = PreferFixed.apply(sHosts); - Assert.assertEquals(c5, c6); + Assert.assertNotSame(c1, c3); + final LocationStrategy c5 = LocationStrategies.preferFixed(hosts); + final LocationStrategy c6 = LocationStrategies.preferFixed(sHosts); + Assert.assertEquals(c5, c6); } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 776d11ad2f648..63120f3d85376 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -93,7 +93,7 @@ class DirectKafkaStreamSuite kp } - val preferredHosts = PreferConsistent + val preferredHosts = LocationStrategies.preferConsistent test("basic stream receiving with multiple topics and smallest starting offset") { val topics = List("basic1", "basic2", "basic3") diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 3d2546ddd936d..e5151a994f072 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -62,7 +62,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}" ).asJava - private val preferredHosts = PreferConsistent + private val preferredHosts = LocationStrategies.preferConsistent test("basic usage") { val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}" From 27d63376701c4416ebad5a3a400a55d0e34c21ea Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 12:50:30 -0500 Subject: [PATCH 02/10] [SPARK-12177][Streaming][Kafka] limit ConsumerStrategy api surface area --- .../streaming/kafka010/ConsumerStrategy.scala | 120 ++++++++---------- .../spark/streaming/kafka010/KafkaUtils.scala | 24 ++-- .../kafka010/JavaConsumerStrategySuite.java | 18 ++- .../kafka010/JavaDirectKafkaStreamSuite.java | 4 +- .../kafka010/DirectKafkaStreamSuite.scala | 30 +++-- 5 files changed, 98 insertions(+), 98 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 079a07dbc2bd0..a333a90afb6ab 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -55,7 +55,6 @@ trait ConsumerStrategy[K, V] { } /** - * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka @@ -68,8 +67,7 @@ trait ConsumerStrategy[K, V] { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ -@Experimental -case class Subscribe[K, V] private( +private case class Subscribe[K, V]( topics: ju.Collection[java.lang.String], kafkaParams: ju.Map[String, Object], offsets: ju.Map[TopicPartition, Long] @@ -90,12 +88,45 @@ case class Subscribe[K, V] private( } } +/** + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * + * configuration parameters to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ +private case class Assign[K, V]( + topicPartitions: ju.Collection[TopicPartition], + kafkaParams: ju.Map[String, Object], + offsets: ju.Map[TopicPartition, Long] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { + val consumer = new KafkaConsumer[K, V](kafkaParams) + consumer.assign(topicPartitions) + if (currentOffsets.isEmpty) { + offsets.asScala.foreach { case (topicPartition, offset) => + consumer.seek(topicPartition, offset) + } + } + + consumer + } +} + /** * :: Experimental :: - * Companion object for creating [[Subscribe]] strategy + * object for obtaining instances of [[ConsumerStrategy]] */ -@Experimental -object Subscribe { +object ConsumerStrategies { /** * :: Experimental :: * Subscribe to a collection of topics. @@ -111,10 +142,10 @@ object Subscribe { * auto.offset.reset will be used. */ @Experimental - def apply[K, V]( + def subscribe[K, V]( topics: Iterable[java.lang.String], kafkaParams: collection.Map[String, Object], - offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = { + offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), @@ -133,9 +164,9 @@ object Subscribe { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def apply[K, V]( + def subscribe[K, V]( topics: Iterable[java.lang.String], - kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = { + kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), @@ -157,10 +188,10 @@ object Subscribe { * auto.offset.reset will be used. */ @Experimental - def create[K, V]( + def subscribe[K, V]( topics: ju.Collection[java.lang.String], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = { + offsets: ju.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { Subscribe[K, V](topics, kafkaParams, offsets) } @@ -176,56 +207,12 @@ object Subscribe { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def create[K, V]( + def subscribe[K, V]( topics: ju.Collection[java.lang.String], - kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = { + kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) } -} - -/** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. - */ -@Experimental -case class Assign[K, V] private( - topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] - ) extends ConsumerStrategy[K, V] { - - def executorKafkaParams: ju.Map[String, Object] = kafkaParams - - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) - consumer.assign(topicPartitions) - if (currentOffsets.isEmpty) { - offsets.asScala.foreach { case (topicPartition, offset) => - consumer.seek(topicPartition, offset) - } - } - - consumer - } -} - -/** - * :: Experimental :: - * Companion object for creating [[Assign]] strategy - */ -@Experimental -object Assign { /** * :: Experimental :: * Assign a fixed collection of TopicPartitions @@ -241,10 +228,10 @@ object Assign { * auto.offset.reset will be used. */ @Experimental - def apply[K, V]( + def assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object], - offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = { + offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), @@ -263,9 +250,9 @@ object Assign { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def apply[K, V]( + def assign[K, V]( topicPartitions: Iterable[TopicPartition], - kafkaParams: collection.Map[String, Object]): Assign[K, V] = { + kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), @@ -287,10 +274,10 @@ object Assign { * auto.offset.reset will be used. */ @Experimental - def create[K, V]( + def assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = { + offsets: ju.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { Assign[K, V](topicPartitions, kafkaParams, offsets) } @@ -306,9 +293,10 @@ object Assign { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def create[K, V]( + def assign[K, V]( topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object]): Assign[K, V] = { + kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) } + } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index c0524990bc4dc..626597a244171 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -48,8 +48,8 @@ object KafkaUtils extends Logging { * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -87,8 +87,8 @@ object KafkaUtils extends Logging { * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -110,10 +110,10 @@ object KafkaUtils extends Logging { * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number * of messages * per second that each '''partition''' will accept. - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. + * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe, + * see [[ConsumerStrategies]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -132,10 +132,10 @@ object KafkaUtils extends Logging { * each given Kafka topic/partition corresponds to an RDD partition. * @param keyClass Class of the keys in the Kafka records * @param valueClass Class of the values in the Kafka records - * @param locationStrategy In most cases, pass in [[PreferConsistent]], - * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. + * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe, + * see [[ConsumerStrategies]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index 8d7c05b5a615d..724294d197496 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -53,28 +53,26 @@ public void testConsumerStrategyConstructors() { // final ConsumerStrategy sub0 = // does not compile in Scala 2.10 // Subscribe.apply(topics, kafkaParams, offsets); final ConsumerStrategy sub1 = - Subscribe.apply(sTopics, sKafkaParams, sOffsets); + ConsumerStrategies.subscribe(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy sub2 = - Subscribe.apply(sTopics, sKafkaParams); + ConsumerStrategies.subscribe(sTopics, sKafkaParams); final ConsumerStrategy sub3 = - Subscribe.create(topics, kafkaParams, offsets); + ConsumerStrategies.subscribe(topics, kafkaParams, offsets); final ConsumerStrategy sub4 = - Subscribe.create(topics, kafkaParams); + ConsumerStrategies.subscribe(topics, kafkaParams); Assert.assertEquals( sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); - // final ConsumerStrategy asn0 = // does not compile in Scala 2.10 - // Assign.apply(parts, kafkaParams, offsets); final ConsumerStrategy asn1 = - Assign.apply(sParts, sKafkaParams, sOffsets); + ConsumerStrategies.assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = - Assign.apply(sParts, sKafkaParams); + ConsumerStrategies.assign(sParts, sKafkaParams); final ConsumerStrategy asn3 = - Assign.create(parts, kafkaParams, offsets); + ConsumerStrategies.assign(parts, kafkaParams, offsets); final ConsumerStrategy asn4 = - Assign.create(parts, kafkaParams); + ConsumerStrategies.assign(parts, kafkaParams); Assert.assertEquals( asn1.executorKafkaParams().get("bootstrap.servers"), diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java index 20b4cba563063..329f076258e94 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -91,7 +91,7 @@ public void testKafkaStream() throws InterruptedException { JavaInputDStream> istream1 = KafkaUtils.createDirectStream( ssc, LocationStrategies.preferConsistent(), - Subscribe.create(Arrays.asList(topic1), kafkaParams) + ConsumerStrategies.subscribe(Arrays.asList(topic1), kafkaParams) ); JavaDStream stream1 = istream1.transform( @@ -124,7 +124,7 @@ public String call(ConsumerRecord r) { JavaInputDStream> istream2 = KafkaUtils.createDirectStream( ssc, LocationStrategies.preferConsistent(), - Subscribe.create(Arrays.asList(topic2), kafkaParams2) + ConsumerStrategies.subscribe(Arrays.asList(topic2), kafkaParams2) ); JavaDStream stream2 = istream2.transform( diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 63120f3d85376..817383c958a69 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -108,7 +108,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.subscribe[String, String](topics, kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)]() @@ -178,7 +180,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { val s = new DirectKafkaInputDStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) s.consumer.poll(0) assert( s.consumer.position(topicPartition) >= offsetBeforeStart, @@ -225,8 +229,10 @@ class DirectKafkaStreamSuite // Setup context and kafka stream with largest offset ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { - val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts, - Assign[String, String]( + val s = new DirectKafkaInputDStream[String, String]( + ssc, + preferredHosts, + ConsumerStrategies.assign[String, String]( List(topicPartition), kafkaParams.asScala, Map(topicPartition -> 11L))) @@ -267,7 +273,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(100)) val kafkaStream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) } val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt } val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => @@ -360,7 +368,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(100)) withClue("Error creating direct stream") { val kafkaStream = KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) => val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val data = rdd.map(_.value).collect() @@ -412,7 +422,9 @@ class DirectKafkaStreamSuite val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)] @@ -486,7 +498,9 @@ class DirectKafkaStreamSuite val kafkaStream = withClue("Error creating direct stream") { new DirectKafkaInputDStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) { + ssc, + preferredHosts, + ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) }.map(r => (r.key, r.value)) From 205536ae4e7ed323872546b2468eb55ee08ecc3e Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 14:05:16 -0500 Subject: [PATCH 03/10] [SPARK-12177][Streaming][Kafka] capitalize static methods to make them look like enums, distinguish java from scala long --- .../streaming/kafka010/ConsumerStrategy.scala | 63 ++++++++++--------- .../streaming/kafka010/LocationStrategy.scala | 14 +++-- .../kafka010/JavaConsumerStrategySuite.java | 30 +++++---- .../kafka010/JavaDirectKafkaStreamSuite.java | 8 +-- .../streaming/kafka010/JavaKafkaRDDSuite.java | 4 +- .../kafka010/JavaLocationStrategySuite.java | 12 ++-- .../kafka010/DirectKafkaStreamSuite.scala | 16 ++--- .../streaming/kafka010/KafkaRDDSuite.scala | 2 +- 8 files changed, 79 insertions(+), 70 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index a333a90afb6ab..27059ee2901e9 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.{ util => ju } +import java.{ util => ju, lang => jl } import scala.collection.JavaConverters._ @@ -68,9 +68,9 @@ trait ConsumerStrategy[K, V] { * auto.offset.reset will be used. */ private case class Subscribe[K, V]( - topics: ju.Collection[java.lang.String], + topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] + offsets: ju.Map[TopicPartition, jl.Long] ) extends ConsumerStrategy[K, V] { def executorKafkaParams: ju.Map[String, Object] = kafkaParams @@ -104,7 +104,7 @@ private case class Subscribe[K, V]( private case class Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] + offsets: ju.Map[TopicPartition, jl.Long] ) extends ConsumerStrategy[K, V] { def executorKafkaParams: ju.Map[String, Object] = kafkaParams @@ -142,14 +142,14 @@ object ConsumerStrategies { * auto.offset.reset will be used. */ @Experimental - def subscribe[K, V]( - topics: Iterable[java.lang.String], + def Subscribe[K, V]( + topics: Iterable[jl.String], kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { - Subscribe[K, V]( + new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, Long](offsets.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } /** @@ -164,13 +164,13 @@ object ConsumerStrategies { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def subscribe[K, V]( - topics: Iterable[java.lang.String], + def Subscribe[K, V]( + topics: Iterable[jl.String], kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { - Subscribe[K, V]( + new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - ju.Collections.emptyMap[TopicPartition, Long]()) + ju.Collections.emptyMap[TopicPartition, jl.Long]()) } /** @@ -188,11 +188,11 @@ object ConsumerStrategies { * auto.offset.reset will be used. */ @Experimental - def subscribe[K, V]( - topics: ju.Collection[java.lang.String], + def Subscribe[K, V]( + topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { - Subscribe[K, V](topics, kafkaParams, offsets) + offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { + new Subscribe[K, V](topics, kafkaParams, offsets) } /** @@ -207,10 +207,10 @@ object ConsumerStrategies { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def subscribe[K, V]( - topics: ju.Collection[java.lang.String], + def Subscribe[K, V]( + topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { - Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) + new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]()) } /** @@ -228,14 +228,14 @@ object ConsumerStrategies { * auto.offset.reset will be used. */ @Experimental - def assign[K, V]( + def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { - Assign[K, V]( + new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, Long](offsets.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } /** @@ -250,13 +250,13 @@ object ConsumerStrategies { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def assign[K, V]( + def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { - Assign[K, V]( + new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - ju.Collections.emptyMap[TopicPartition, Long]()) + ju.Collections.emptyMap[TopicPartition, jl.Long]()) } /** @@ -274,11 +274,11 @@ object ConsumerStrategies { * auto.offset.reset will be used. */ @Experimental - def assign[K, V]( + def Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { - Assign[K, V](topicPartitions, kafkaParams, offsets) + offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { + new Assign[K, V](topicPartitions, kafkaParams, offsets) } /** @@ -293,10 +293,13 @@ object ConsumerStrategies { * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @Experimental - def assign[K, V]( + def Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { - Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) + new Assign[K, V]( + topicPartitions, + kafkaParams, + ju.Collections.emptyMap[TopicPartition, jl.Long]()) } } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala index df73c9ffd62a3..fd307ac4925f2 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -53,14 +53,16 @@ object LocationStrategies { * Use this only if your executors are on the same nodes as your Kafka brokers. */ @Experimental - def preferBrokers: LocationStrategy = PreferBrokers + def PreferBrokers: LocationStrategy = + org.apache.spark.streaming.kafka010.PreferBrokers /** * :: Experimental :: * Use this in most cases, it will consistently distribute partitions across all executors. */ @Experimental - def preferConsistent: LocationStrategy = PreferConsistent + def PreferConsistent: LocationStrategy = + org.apache.spark.streaming.kafka010.PreferConsistent /** * :: Experimental :: @@ -68,8 +70,8 @@ object LocationStrategies { * Any TopicPartition not specified in the map will use a consistent location. */ @Experimental - def preferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy = - PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) + def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy = + new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) /** * :: Experimental :: @@ -77,6 +79,6 @@ object LocationStrategies { * Any TopicPartition not specified in the map will use a consistent location. */ @Experimental - def preferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy = - PreferFixed(hostMap) + def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy = + new PreferFixed(hostMap) } diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index 724294d197496..ac8d64b180f0d 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -44,35 +44,39 @@ public void testConsumerStrategyConstructors() { kafkaParams.put("bootstrap.servers", "not used"); final scala.collection.Map sKafkaParams = JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); - final Map offsets = new HashMap<>(); + final Map offsets = new HashMap<>(); offsets.put(tp1, 23L); final scala.collection.Map sOffsets = - JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues( + new scala.runtime.AbstractFunction1() { + @Override + public Object apply(Long x) { + return (Object) x; + } + } + ); - // make sure constructors can be called from java - // final ConsumerStrategy sub0 = // does not compile in Scala 2.10 - // Subscribe.apply(topics, kafkaParams, offsets); final ConsumerStrategy sub1 = - ConsumerStrategies.subscribe(sTopics, sKafkaParams, sOffsets); + ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy sub2 = - ConsumerStrategies.subscribe(sTopics, sKafkaParams); + ConsumerStrategies.Subscribe(sTopics, sKafkaParams); final ConsumerStrategy sub3 = - ConsumerStrategies.subscribe(topics, kafkaParams, offsets); + ConsumerStrategies.Subscribe(topics, kafkaParams, offsets); final ConsumerStrategy sub4 = - ConsumerStrategies.subscribe(topics, kafkaParams); + ConsumerStrategies.Subscribe(topics, kafkaParams); Assert.assertEquals( sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); final ConsumerStrategy asn1 = - ConsumerStrategies.assign(sParts, sKafkaParams, sOffsets); + ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = - ConsumerStrategies.assign(sParts, sKafkaParams); + ConsumerStrategies.Assign(sParts, sKafkaParams); final ConsumerStrategy asn3 = - ConsumerStrategies.assign(parts, kafkaParams, offsets); + ConsumerStrategies.Assign(parts, kafkaParams, offsets); final ConsumerStrategy asn4 = - ConsumerStrategies.assign(parts, kafkaParams); + ConsumerStrategies.Assign(parts, kafkaParams); Assert.assertEquals( asn1.executorKafkaParams().get("bootstrap.servers"), diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java index 329f076258e94..dc9c13ba863ff 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -90,8 +90,8 @@ public void testKafkaStream() throws InterruptedException { JavaInputDStream> istream1 = KafkaUtils.createDirectStream( ssc, - LocationStrategies.preferConsistent(), - ConsumerStrategies.subscribe(Arrays.asList(topic1), kafkaParams) + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Arrays.asList(topic1), kafkaParams) ); JavaDStream stream1 = istream1.transform( @@ -123,8 +123,8 @@ public String call(ConsumerRecord r) { JavaInputDStream> istream2 = KafkaUtils.createDirectStream( ssc, - LocationStrategies.preferConsistent(), - ConsumerStrategies.subscribe(Arrays.asList(topic2), kafkaParams2) + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Arrays.asList(topic2), kafkaParams2) ); JavaDStream stream2 = istream2.transform( diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java index 03c710ac4eb8c..e784f59700de7 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java @@ -96,14 +96,14 @@ public String call(ConsumerRecord r) { sc, kafkaParams, offsetRanges, - LocationStrategies.preferFixed(leaders) + LocationStrategies.PreferFixed(leaders) ).map(handler); JavaRDD rdd2 = KafkaUtils.createRDD( sc, kafkaParams, offsetRanges, - LocationStrategies.preferConsistent() + LocationStrategies.PreferConsistent() ).map(handler); // just making sure the java user apis work; the scala tests handle logic corner cases diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java index 8b808860f038e..41ccb0ebe7bfa 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java @@ -41,18 +41,18 @@ public void testLocationStrategyConstructors() { JavaConverters.mapAsScalaMapConverter(hosts).asScala(); // make sure constructors can be called from java - final LocationStrategy c1 = LocationStrategies.preferConsistent(); - final LocationStrategy c2 = LocationStrategies.preferConsistent(); + final LocationStrategy c1 = LocationStrategies.PreferConsistent(); + final LocationStrategy c2 = LocationStrategies.PreferConsistent(); Assert.assertSame(c1, c2); - final LocationStrategy c3 = LocationStrategies.preferBrokers(); - final LocationStrategy c4 = LocationStrategies.preferBrokers(); + final LocationStrategy c3 = LocationStrategies.PreferBrokers(); + final LocationStrategy c4 = LocationStrategies.PreferBrokers(); Assert.assertSame(c3, c4); Assert.assertNotSame(c1, c3); - final LocationStrategy c5 = LocationStrategies.preferFixed(hosts); - final LocationStrategy c6 = LocationStrategies.preferFixed(sHosts); + final LocationStrategy c5 = LocationStrategies.PreferFixed(hosts); + final LocationStrategy c6 = LocationStrategies.PreferFixed(sHosts); Assert.assertEquals(c5, c6); } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 817383c958a69..165bf07f55f9f 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -93,7 +93,7 @@ class DirectKafkaStreamSuite kp } - val preferredHosts = LocationStrategies.preferConsistent + val preferredHosts = LocationStrategies.PreferConsistent test("basic stream receiving with multiple topics and smallest starting offset") { val topics = List("basic1", "basic2", "basic3") @@ -110,7 +110,7 @@ class DirectKafkaStreamSuite KafkaUtils.createDirectStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.subscribe[String, String](topics, kafkaParams.asScala)) + ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)]() @@ -182,7 +182,7 @@ class DirectKafkaStreamSuite val s = new DirectKafkaInputDStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) s.consumer.poll(0) assert( s.consumer.position(topicPartition) >= offsetBeforeStart, @@ -232,7 +232,7 @@ class DirectKafkaStreamSuite val s = new DirectKafkaInputDStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.assign[String, String]( + ConsumerStrategies.Assign[String, String]( List(topicPartition), kafkaParams.asScala, Map(topicPartition -> 11L))) @@ -275,7 +275,7 @@ class DirectKafkaStreamSuite KafkaUtils.createDirectStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) } val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt } val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => @@ -370,7 +370,7 @@ class DirectKafkaStreamSuite val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) => val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val data = rdd.map(_.value).collect() @@ -424,7 +424,7 @@ class DirectKafkaStreamSuite KafkaUtils.createDirectStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)] @@ -500,7 +500,7 @@ class DirectKafkaStreamSuite new DirectKafkaInputDStream[String, String]( ssc, preferredHosts, - ConsumerStrategies.subscribe[String, String](List(topic), kafkaParams.asScala)) { + ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala)) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) }.map(r => (r.key, r.value)) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index e5151a994f072..be373af0599cc 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -62,7 +62,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}" ).asJava - private val preferredHosts = LocationStrategies.preferConsistent + private val preferredHosts = LocationStrategies.PreferConsistent test("basic usage") { val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}" From ef040cf9eba2566c6100a5fb84f56df68b0a4f1d Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 14:21:01 -0500 Subject: [PATCH 04/10] [SPARK-12177][Streaming][Kafka] more separation of java long from scala long --- .../apache/spark/streaming/kafka010/ConsumerStrategy.scala | 7 ++++--- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 2 +- .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 3 ++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 27059ee2901e9..23580cd428480 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -51,7 +51,7 @@ trait ConsumerStrategy[K, V] { * has successfully read. Will be empty on initial start, possibly non-empty on restart from * checkpoint. */ - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] + def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] } /** @@ -75,7 +75,7 @@ private case class Subscribe[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { + def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { val consumer = new KafkaConsumer[K, V](kafkaParams) consumer.subscribe(topics) if (currentOffsets.isEmpty) { @@ -109,7 +109,7 @@ private case class Assign[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { + def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { val consumer = new KafkaConsumer[K, V](kafkaParams) consumer.assign(topicPartitions) if (currentOffsets.isEmpty) { @@ -126,6 +126,7 @@ private case class Assign[K, V]( * :: Experimental :: * object for obtaining instances of [[ConsumerStrategy]] */ +@Experimental object ConsumerStrategies { /** * :: Experimental :: diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index acd1841d5305c..13827f68f2cb5 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( @transient private var kc: Consumer[K, V] = null def consumer(): Consumer[K, V] = this.synchronized { if (null == kc) { - kc = consumerStrategy.onStart(currentOffsets) + kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava) } kc } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 165bf07f55f9f..0a53259802d1e 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kafka010 import java.io.File +import java.lang.{ Long => JLong } import java.util.{ Arrays, HashMap => JHashMap, Map => JMap } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.ConcurrentLinkedQueue @@ -566,7 +567,7 @@ class DirectKafkaStreamSuite preferredHosts, new ConsumerStrategy[String, String] { def executorKafkaParams = ekp - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = { + def onStart(currentOffsets: JMap[TopicPartition, JLong]): Consumer[String, String] = { val consumer = new KafkaConsumer[String, String](kafkaParams) val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) consumer.assign(Arrays.asList(tps: _*)) From b442c9843a588231543a1f36b7383b3d13ba8ac0 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 14:31:33 -0500 Subject: [PATCH 05/10] [SPARK-12177][Streaming][Kafka] fix scalastyle --- .../org/apache/spark/streaming/kafka010/ConsumerStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 23580cd428480..06abf57cce5b6 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.{ util => ju, lang => jl } +import java.{ lang => jl, util => ju } import scala.collection.JavaConverters._ From 2aea1132bd8891436f7fd0ce7d3b64c2ac0b7ab0 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 17:19:44 -0500 Subject: [PATCH 06/10] [SPARK-12177][Streaming][Kafka] tweak default timeout to see if it increases test stability, add explicit group id --- .../org/apache/spark/streaming/kafka010/KafkaRDD.scala | 2 +- .../org/apache/spark/streaming/kafka010/KafkaUtils.scala | 6 +++++- .../apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index c15c16344924f..55b4bf20f7981 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V]( " must be set to false for executor kafka params, else offsets may commit before processing") // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time? - private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512) private val cacheInitialCapacity = conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) private val cacheMaxCapacity = diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index 626597a244171..06113151dd044 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -161,7 +161,11 @@ object KafkaUtils extends Logging { kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") // driver and executor should be in different consumer groups - val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) + val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) + if (null == originalGroupId) { + logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it") + } + val groupId = "spark-executor-" + originalGroupId logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java index e784f59700de7..87bfe1514e338 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Random; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.TopicPartition; @@ -65,6 +66,8 @@ public void testKafkaRDD() throws InterruptedException { String topic1 = "topic1"; String topic2 = "topic2"; + Random random = new Random(); + createTopicAndSendData(topic1); createTopicAndSendData(topic2); @@ -72,6 +75,8 @@ public void testKafkaRDD() throws InterruptedException { kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress()); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); + kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() + + "-" + System.currentTimeMillis()); OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), From 2652170916114965ac72ec75abfc6772b54b43e8 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 17:22:33 -0500 Subject: [PATCH 07/10] [SPARK-12177][Streaming][Kafka] move trait to abstract class per tdas --- .../org/apache/spark/streaming/kafka010/ConsumerStrategy.scala | 2 +- .../org/apache/spark/streaming/kafka010/LocationStrategy.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 06abf57cce5b6..4d2b00d0f68af 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -36,7 +36,7 @@ import org.apache.spark.annotation.Experimental * @tparam V type of Kafka message value */ @Experimental -trait ConsumerStrategy[K, V] { +abstract class ConsumerStrategy[K, V] { /** * Kafka * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala index fd307ac4925f2..d3ad76ccb7187 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -34,7 +34,7 @@ import org.apache.spark.annotation.Experimental * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. */ @Experimental -sealed trait LocationStrategy +sealed abstract class LocationStrategy private case object PreferBrokers extends LocationStrategy From d1480e7b409b478e7baf13050bcd6ab259198a5f Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 19:32:39 -0500 Subject: [PATCH 08/10] [SPARK-12177][Streaming][Kafka] add doc link --- .../org/apache/spark/streaming/kafka010/ConsumerStrategy.scala | 1 + .../org/apache/spark/streaming/kafka010/LocationStrategy.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 4d2b00d0f68af..976bd0f084980 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -30,6 +30,7 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * See [[ConsumerStrategies]] to obtain instances. * Kafka 0.10 consumers can require additional, sometimes complex, setup after object * instantiation. This interface encapsulates that process, and allows it to be checkpointed. * @tparam K type of Kafka message key diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala index d3ad76ccb7187..c9a8a13f51c32 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -29,6 +29,7 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: * Choice of how to schedule consumers for a given TopicPartition on an executor. + * See [[LocationStrategies]] to obtain instances. * Kafka 0.10 consumers prefetch messages, so it's important for performance * to keep cached consumers on appropriate executors, not recreate them for every partition. * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. From 22db76fe920f926ba063084eba615b7c0c20a576 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Jun 2016 22:27:11 -0500 Subject: [PATCH 09/10] [SPARK-12177][Streaming][Kafka] fix package object name and doc links --- .../streaming/kafka010/ConsumerStrategy.scala | 22 +++++++++---------- .../spark/streaming/kafka010/KafkaRDD.scala | 2 +- .../spark/streaming/kafka010/KafkaUtils.scala | 6 ++--- .../spark/streaming/kafka010/package.scala | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 976bd0f084980..70c3f1a98d97a 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -39,7 +39,7 @@ import org.apache.spark.annotation.Experimental @Experimental abstract class ConsumerStrategy[K, V] { /** - * Kafka + * Kafka * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ @@ -59,7 +59,7 @@ abstract class ConsumerStrategy[K, V] { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -93,7 +93,7 @@ private case class Subscribe[K, V]( * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -134,7 +134,7 @@ object ConsumerStrategies { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -159,7 +159,7 @@ object ConsumerStrategies { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -180,7 +180,7 @@ object ConsumerStrategies { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -202,7 +202,7 @@ object ConsumerStrategies { * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -220,7 +220,7 @@ object ConsumerStrategies { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -245,7 +245,7 @@ object ConsumerStrategies { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -266,7 +266,7 @@ object ConsumerStrategies { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set @@ -288,7 +288,7 @@ object ConsumerStrategies { * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka - * + * * configuration parameters to be used on driver. The same params will be used on executors, * with minor automatic modifications applied. * Requires "bootstrap.servers" to be set diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 55b4bf20f7981..5b5a9ac48c7ca 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka - * + * * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index 06113151dd044..b2190bfa05a3a 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream._ /** * :: Experimental :: - * Companion object for constructing Kafka streams and RDDs + * object for constructing Kafka streams and RDDs */ @Experimental object KafkaUtils extends Logging { @@ -44,7 +44,7 @@ object KafkaUtils extends Logging { * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka - * + * * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD @@ -83,7 +83,7 @@ object KafkaUtils extends Logging { * @param keyClass Class of the keys in the Kafka records * @param valueClass Class of the values in the Kafka records * @param kafkaParams Kafka - * + * * configuration parameters. Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala index 2bfc1e84d7ccd..09db6d6062d82 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala @@ -20,4 +20,4 @@ package org.apache.spark.streaming /** * Spark Integration for Kafka 0.10 */ -package object kafka +package object kafka010 //scalastyle:ignore From 2f65fc190f648943fd47c2a4754f8e3897b5b8fd Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 1 Jul 2016 00:13:54 -0500 Subject: [PATCH 10/10] [SPARK-12177][Streaming][Kafka] use a random port for embedded Kafka --- .../org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 13c08430db6be..19192e4b95945 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -61,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + private var brokerPort = 0 private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -110,7 +110,8 @@ private[kafka010] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - (server, port) + brokerPort = server.boundPort() + (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true