From 97407878a49e54237cb4ed2674982fa626e1d3f3 Mon Sep 17 00:00:00 2001
From: Angus Davis
Date: Tue, 29 Mar 2016 15:19:35 -0700
Subject: [PATCH] Add withConf "escape hatch" methods to reader and writer
 builders.

This allows custom configuration to be passed per RDD. Example use cases
include:

* Using Cloud Bigtable without needing to modify hbase-site
* Using separate HBase clusters per RDD (issue #17)
---
 .../nerdammer/spark/hbase/HBaseReaderBuilder.scala |  9 +++++++--
 .../it/nerdammer/spark/hbase/HBaseSparkConf.scala  |  9 +++++++--
 .../nerdammer/spark/hbase/HBaseWriterBuilder.scala | 14 +++++++++++---
 3 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/it/nerdammer/spark/hbase/HBaseReaderBuilder.scala b/src/main/scala/it/nerdammer/spark/hbase/HBaseReaderBuilder.scala
index 3f90c57..a27d28e 100644
--- a/src/main/scala/it/nerdammer/spark/hbase/HBaseReaderBuilder.scala
+++ b/src/main/scala/it/nerdammer/spark/hbase/HBaseReaderBuilder.scala
@@ -16,7 +16,8 @@ case class HBaseReaderBuilder [R: ClassTag] private[hbase] (
     private[hbase] val columns: Iterable[String] = Seq.empty,
     private[hbase] val startRow: Option[String] = None,
     private[hbase] val stopRow: Option[String] = None,
-    private[hbase] val salting: Iterable[String] = Seq.empty
+    private[hbase] val salting: Iterable[String] = Seq.empty,
+    private[hbase] val conf: Map[String, String] = Map.empty
     )
     (implicit mapper: FieldReader[R], saltingProvider: SaltingProviderFactory[String])
     extends Serializable {
@@ -57,6 +58,10 @@ case class HBaseReaderBuilder [R: ClassTag] private[hbase] (
     this.copy(salting = salting)
   }
 
+  def withConf(entry: (String, String)) = this.copy(conf = conf + entry)
+
+  def withConf(entries: Map[String, String]) = this.copy(conf = conf ++ entries)
+
 }
 
 
@@ -80,7 +85,7 @@ trait HBaseReaderBuilderConversions extends Serializable {
   }
 
   def toSimpleHBaseRDD[R: ClassTag](builder: HBaseReaderBuilder[R], saltingLength: Int = 0)(implicit mapper: FieldReader[R], saltingProvider: SaltingProviderFactory[String]): HBaseSimpleRDD[R] = {
-    val hbaseConfig = HBaseSparkConf.fromSparkConf(builder.sc.getConf).createHadoopBaseConfig()
+    val hbaseConfig = HBaseSparkConf.fromSparkConf(builder.sc.getConf).createHadoopBaseConfig(builder.conf)
 
     hbaseConfig.set(TableInputFormat.INPUT_TABLE, builder.table)
 
diff --git a/src/main/scala/it/nerdammer/spark/hbase/HBaseSparkConf.scala b/src/main/scala/it/nerdammer/spark/hbase/HBaseSparkConf.scala
index 5243774..16b759f 100644
--- a/src/main/scala/it/nerdammer/spark/hbase/HBaseSparkConf.scala
+++ b/src/main/scala/it/nerdammer/spark/hbase/HBaseSparkConf.scala
@@ -1,5 +1,6 @@
 package it.nerdammer.spark.hbase
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.spark.SparkConf
 
@@ -7,13 +8,17 @@ case class HBaseSparkConf (
   hbaseHost: String = HBaseSparkConf.DefaultHBaseHost,
   hbaseRootDir: String = HBaseSparkConf.DefaultHBaseRootDir) extends Serializable {
 
-  def createHadoopBaseConfig() = {
+  def createHadoopBaseConfig() : Configuration = createHadoopBaseConfig(Map())
+
+  def createHadoopBaseConfig(extraConf: Map[String, String]) : Configuration = {
     val conf = HBaseConfiguration.create
     conf.setBoolean("hbase.cluster.distributed", true)
     conf.setInt("hbase.client.scanner.caching", 10000)
     conf.set("hbase.rootdir", hbaseRootDir)
     conf.set("hbase.zookeeper.quorum", hbaseHost)
-
+    for ((key, value) <- extraConf) {
+      conf.set(key, value)
+    }
     conf
   }
 }
diff --git a/src/main/scala/it/nerdammer/spark/hbase/HBaseWriterBuilder.scala b/src/main/scala/it/nerdammer/spark/hbase/HBaseWriterBuilder.scala
index 2f5c0b9..b0e978b 100644
--- a/src/main/scala/it/nerdammer/spark/hbase/HBaseWriterBuilder.scala
+++ b/src/main/scala/it/nerdammer/spark/hbase/HBaseWriterBuilder.scala
@@ -6,7 +6,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 
 import scala.reflect.ClassTag
@@ -16,7 +15,8 @@ case class HBaseWriterBuilder[R: ClassTag] private[hbase] (
     private[hbase] val table: String,
     private[hbase] val columnFamily: Option[String] = None,
     private[hbase] val columns: Iterable[String] = Seq.empty,
-    private[hbase] val salting: Iterable[String] = Seq.empty
+    private[hbase] val salting: Iterable[String] = Seq.empty,
+    private[hbase] val conf: Map[String, String] = Map.empty
     )(implicit mapper: FieldWriter[R], saltingProvider: SaltingProviderFactory[String])
   extends Serializable {
 
@@ -41,6 +41,14 @@ case class HBaseWriterBuilder[R: ClassTag] private[hbase] (
     this.copy(salting = salting)
   }
 
+  def withConf(entry: (String, String)) = {
+    this.copy(conf = conf + entry)
+  }
+
+  def withConf(entries: Map[String, String]) = {
+    this.copy(conf = conf ++ entries)
+  }
+
 }
 
 class HBaseWriterBuildable[R: ClassTag](rdd: RDD[R])(implicit mapper: FieldWriter[R], sal: SaltingProviderFactory[String]) extends Serializable {
@@ -53,7 +61,7 @@ class HBaseWriter[R: ClassTag](builder: HBaseWriterBuilder[R])(implicit mapper:
   def save(): Unit = {
 
-    val conf = HBaseSparkConf.fromSparkConf(builder.rdd.sparkContext.getConf).createHadoopBaseConfig()
+    val conf = HBaseSparkConf.fromSparkConf(builder.rdd.sparkContext.getConf).createHadoopBaseConfig(builder.conf)
 
     conf.set(TableOutputFormat.OUTPUT_TABLE, builder.table)
 
     val job = Job.getInstance(conf)
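
Usage sketch (illustrative, not part of the patch): assuming the connector's
existing implicit entry points (sc.hbaseTable for reads, rdd.toHBaseTable for
writes), and with hypothetical table, column family, and ZooKeeper host names,
a per-RDD configuration override could look like this:

    import it.nerdammer.spark.hbase._

    // Read from a secondary HBase cluster by overriding the ZooKeeper quorum
    // for this RDD only; the single-entry overload takes a (key, value) pair.
    val rdd = sc.hbaseTable[(String, String)]("source_table")
      .select("col1")
      .inColumnFamily("cf")
      .withConf("hbase.zookeeper.quorum" -> "zk.other-cluster.example.com")

    // Write to another table, passing several overrides at once via the Map overload.
    rdd.toHBaseTable("destination_table")
      .toColumns("col1")
      .inColumnFamily("cf")
      .withConf(Map(
        "hbase.zookeeper.quorum" -> "zk.other-cluster.example.com",
        "hbase.client.retries.number" -> "5"))
      .save()

Because createHadoopBaseConfig applies the extraConf entries after the defaults
derived from the SparkConf, the per-builder values override settings such as
hbase.zookeeper.quorum and hbase.rootdir for that RDD only.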