Skip to content

Commit

Permalink
allow customize the reader subscription prefix (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
nlu90 authored Nov 2, 2021
1 parent b4b410f commit afb42b7
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 150 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-spark-connector_2.12</artifactId>
<version>3.1.1.1</version>
<version>3.1.1.2</version>
<name>StreamNative :: Pulsar Spark Connector</name>
<url>https://pulsar.apache.org</url>
<inceptionYear>2019</inceptionYear>
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/org/apache/spark/sql/pulsar/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ object AdminUtils {
def buildAdmin(adminUrl: String, clientConf: ju.Map[String, Object]): PulsarAdmin = {
val builder = PulsarAdmin.builder().serviceHttpUrl(adminUrl)

if (clientConf.containsKey(AUTH_PLUGIN_CLASS_NAME)) {
if (clientConf.containsKey(AuthPluginClassName)) {
builder.authentication(
clientConf.get(AUTH_PLUGIN_CLASS_NAME).toString, clientConf.get(AUTH_PARAMS).toString)
clientConf.get(AuthPluginClassName).toString, clientConf.get(AuthParams).toString)
}

if (clientConf.containsKey(TLS_ALLOW_INSECURE_CONNECTION)) {
if (clientConf.containsKey(TlsAllowInsecureConnection)) {
builder.allowTlsInsecureConnection(
clientConf.get(TLS_ALLOW_INSECURE_CONNECTION).toString.toBoolean)
clientConf.get(TlsAllowInsecureConnection).toString.toBoolean)
}

if (clientConf.containsKey(TLS_HOSTNAME_VERIFICATION_ENABLE)) {
if (clientConf.containsKey(TlsHostnameVerificationEnable)) {
builder.enableTlsHostnameVerification(
clientConf.get(TLS_HOSTNAME_VERIFICATION_ENABLE).toString.toBoolean)
clientConf.get(TlsHostnameVerificationEnable).toString.toBoolean)
}

if (clientConf.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
builder.tlsTrustCertsFilePath(clientConf.get(TLS_TRUST_CERTS_FILE_PATH).toString)
if (clientConf.containsKey(TlsTrustCertsFilePath)) {
builder.tlsTrustCertsFilePath(clientConf.get(TlsTrustCertsFilePath).toString)
}

builder.build()
Expand Down
22 changes: 11 additions & 11 deletions src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.pulsar.client.api.{ClientBuilder, PulsarClient}

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH}
import org.apache.spark.sql.pulsar.PulsarOptions.{AuthParams, AuthPluginClassName, TlsAllowInsecureConnection, TlsHostnameVerificationEnable, TlsTrustCertsFilePath}

private[pulsar] object CachedPulsarClient extends Logging {

Expand Down Expand Up @@ -69,33 +69,33 @@ private[pulsar] object CachedPulsarClient extends Logging {
pulsarConf: ju.Map[String, Object],
pulsarClientBuilder: ClientBuilder = PulsarClient.builder()): Client = {
val pulsarServiceUrl =
pulsarConf.get(PulsarOptions.SERVICE_URL_OPTION_KEY).asInstanceOf[String]
pulsarConf.get(PulsarOptions.ServiceUrlOptionKey).asInstanceOf[String]
val clientConf = new PulsarConfigUpdater(
"pulsarClientCache",
pulsarConf.asScala.toMap,
PulsarOptions.FILTERED_KEYS
PulsarOptions.FilteredKeys
).rebuild()
logInfo(s"Client Conf = ${clientConf}")
try {
pulsarClientBuilder
.serviceUrl(pulsarServiceUrl)
.loadConf(clientConf)
// Set TLS and authentication parameters if they were given
if (clientConf.containsKey(AUTH_PLUGIN_CLASS_NAME)) {
if (clientConf.containsKey(AuthPluginClassName)) {
pulsarClientBuilder.authentication(
clientConf.get(AUTH_PLUGIN_CLASS_NAME).toString, clientConf.get(AUTH_PARAMS).toString)
clientConf.get(AuthPluginClassName).toString, clientConf.get(AuthParams).toString)
}
if (clientConf.containsKey(TLS_ALLOW_INSECURE_CONNECTION)) {
if (clientConf.containsKey(TlsAllowInsecureConnection)) {
pulsarClientBuilder.allowTlsInsecureConnection(
clientConf.get(TLS_ALLOW_INSECURE_CONNECTION).toString.toBoolean)
clientConf.get(TlsAllowInsecureConnection).toString.toBoolean)
}
if (clientConf.containsKey(TLS_HOSTNAME_VERIFICATION_ENABLE)) {
if (clientConf.containsKey(TlsHostnameVerificationEnable)) {
pulsarClientBuilder.enableTlsHostnameVerification(
clientConf.get(TLS_HOSTNAME_VERIFICATION_ENABLE).toString.toBoolean)
clientConf.get(TlsHostnameVerificationEnable).toString.toBoolean)
}
if (clientConf.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
if (clientConf.containsKey(TlsTrustCertsFilePath)) {
pulsarClientBuilder.tlsTrustCertsFilePath(
clientConf.get(TLS_TRUST_CERTS_FILE_PATH).toString)
clientConf.get(TlsTrustCertsFilePath).toString)
}
val pulsarClient: Client = pulsarClientBuilder.build()
logDebug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private[pulsar] case class PulsarConfigUpdater(
module: String,
pulsarParams: Map[String, Object],
blacklistedKeys: Set[String] = Set(),
keysToHideInLog: Set[String] = Set(PulsarOptions.AUTH_PARAMS))
keysToHideInLog: Set[String] = Set(PulsarOptions.AuthParams))
extends Logging {

private val map = new ju.HashMap[String, Object](pulsarParams.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ package org.apache.spark.sql.pulsar

import java.{util => ju}
import java.io.Closeable
import java.util.{Optional, UUID}
import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient, SubscriptionInitialPosition, SubscriptionType}
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo

import org.apache.spark.internal.Logging
import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH, TOPIC_OPTION_KEYS}
import org.apache.spark.sql.pulsar.PulsarOptions._
import org.apache.spark.sql.types.StructType

/**
Expand Down Expand Up @@ -71,6 +71,9 @@ private[pulsar] case class PulsarMetadataReader(
try {
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", umid.mid)
} catch {
case _: PulsarAdminException.ConflictException =>
log.info("Subscription already exists, resetting the cursor to given offset")
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", umid.mid)
case e: Throwable =>
throw new RuntimeException(
s"Failed to setup cursor for ${TopicName.get(tp).toString}",
Expand All @@ -82,18 +85,25 @@ private[pulsar] case class PulsarMetadataReader(
def setupCursorByTime(time: SpecificPulsarStartingTime): Unit = {
time.topicTimes.foreach {
case (tp, time) =>
val msgID = time match {
case PulsarProvider.EARLIEST_TIME => MessageId.earliest
case PulsarProvider.LATEST_TIME => MessageId.latest
case t if t >= 0 => MessageId.latest
case _ => throw new RuntimeException(s"Invalid starting time for $tp: $time")
}

// setup the subscription
try {
if (time == PulsarProvider.EARLIEST_TIME) {
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", MessageId.earliest)
} else if (time == PulsarProvider.LATEST_TIME) {
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", MessageId.latest)
} else if (time < 0) {
throw new RuntimeException(s"Invalid starting time for $tp: $time")
} else {
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", MessageId.latest)
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", time)
}
admin.topics().createSubscription(tp, s"$driverGroupIdPrefix-$tp", msgID)
} catch {
case _: PulsarAdminException.ConflictException =>
log.info("subscription already exists, resetting the cursor to given offset")
time match {
case PulsarProvider.EARLIEST_TIME | PulsarProvider.LATEST_TIME =>
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", msgID)
case _ =>
admin.topics().resetCursor(tp, s"$driverGroupIdPrefix-$tp", time)
}
case e: Throwable =>
throw new RuntimeException(
s"Failed to setup cursor for ${TopicName.get(tp).toString}", e)
Expand Down Expand Up @@ -227,15 +237,17 @@ private[pulsar] case class PulsarMetadataReader(
}

private def getTopics(): Seq[String] = {
topics = caseInsensitiveParameters.find(x => TOPIC_OPTION_KEYS.contains(x._1)).get match {
case ("topic", value) =>
val topics = caseInsensitiveParameters.find({case (key, _) => TopicOptionKeys.contains(key)})
topics match {
case Some((TopicSingle, value)) =>
TopicName.get(value).toString :: Nil
case ("topics", value) =>
case Some((TopicMulti, value)) =>
value.split(",").map(_.trim).filter(_.nonEmpty).map(TopicName.get(_).toString)
case ("topicspattern", value) =>
case Some((TopicPattern, value)) =>
getTopics(value)
case None =>
throw new RuntimeException("Failed to get topics from configurations")
}
topics
}

private def getTopicPartitions(): Seq[String] = {
Expand All @@ -245,7 +257,7 @@ private[pulsar] case class PulsarMetadataReader(
if (partNum == 0) {
tp :: Nil
} else {
(0 until partNum).map(tp + PulsarOptions.PARTITION_SUFFIX + _)
(0 until partNum).map(tp + PulsarOptions.PartitionSuffix + _)
}
}
topicPartitions
Expand Down Expand Up @@ -296,7 +308,7 @@ private[pulsar] case class PulsarMetadataReader(
assert(
specified.keySet.subsetOf(topicPartitions.toSet),
s"topics designated in startingOffsets/endingOffsets" +
s" should all appear in $TOPIC_OPTION_KEYS .\n" +
s" should all appear in $TopicOptionKeys .\n" +
s"topics: $topicPartitions, topics in offsets: ${specified.keySet}"
)
val nonSpecifiedTopics = topicPartitions.toSet -- specified.keySet
Expand All @@ -316,7 +328,7 @@ private[pulsar] case class PulsarMetadataReader(
assert(
specified.keySet.subsetOf(topicPartitions.toSet),
s"topics designated in startingTime" +
s" should all appear in $TOPIC_OPTION_KEYS .\n" +
s" should all appear in $TopicOptionKeys .\n" +
s"topics: $topicPartitions, topics in startingTime: ${specified.keySet}"
)
val nonSpecifiedTopics = topicPartitions.toSet -- specified.keySet
Expand Down Expand Up @@ -348,7 +360,7 @@ private[pulsar] case class PulsarMetadataReader(
assert(
specified.keySet.subsetOf(topicPartitions.toSet),
s"topics designated in startingOffsets/endingOffsets" +
s" should all appear in $TOPIC_OPTION_KEYS .\n" +
s" should all appear in $TopicOptionKeys .\n" +
s"topics: $topicPartitions, topics in offsets: ${specified.keySet}"
)
val nonSpecifiedTopics = topicPartitions.toSet -- specified.keySet
Expand Down
86 changes: 43 additions & 43 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,75 +19,75 @@ import org.apache.pulsar.common.naming.TopicName
private[pulsar] object PulsarOptions {

// option key prefix for different modules
val PULSAR_ADMIN_OPTION_KEY_PREFIX = "pulsar.admin."
val PULSAR_CLIENT_OPTION_KEY_PREFIX = "pulsar.client."
val PULSAR_PRODUCER_OPTION_KEY_PREFIX = "pulsar.producer."
val PULSAR_CONSUMER_OPTION_KEY_PREFIX = "pulsar.consumer."
val PULSAR_READER_OPTION_KEY_PREFIX = "pulsar.reader."
val PulsarAdminOptionKeyPrefix = "pulsar.admin."
val PulsarClientOptionKeyPrefix = "pulsar.client."
val PulsarProducerOptionKeyPrefix = "pulsar.producer."
val PulsarConsumerOptionKeyPrefix = "pulsar.consumer."
val PulsarReaderOptionKeyPrefix = "pulsar.reader."

// options

val TOPIC_SINGLE = "topic"
val TOPIC_MULTI = "topics"
val TOPIC_PATTERN = "topicspattern"
val TopicSingle = "topic"
val TopicMulti = "topics"
val TopicPattern = "topicspattern"

val PARTITION_SUFFIX = TopicName.PARTITIONED_TOPIC_SUFFIX
val PartitionSuffix: String = TopicName.PARTITIONED_TOPIC_SUFFIX

val TOPIC_OPTION_KEYS = Set(
TOPIC_SINGLE,
TOPIC_MULTI,
TOPIC_PATTERN
val TopicOptionKeys = Set(
TopicSingle,
TopicMulti,
TopicPattern
)

val SERVICE_URL_OPTION_KEY = "service.url"
val ADMIN_URL_OPTION_KEY = "admin.url"
val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
val STARTING_TIME = "startingtime"
val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
val ServiceUrlOptionKey = "service.url"
val AdminUrlOptionKey = "admin.url"
val StartingOffsetsOptionKey = "startingoffsets"
val StartingTime = "startingtime"
val EndingOffsetsOptionKey = "endingoffsets"
val SubscriptionPrefix = "subscriptionprefix"

val POLL_TIMEOUT_MS = "polltimeoutms"
val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
val PollTimeoutMS = "polltimeoutms"
val FailOnDataLossOptionKey = "failondataloss"

val AUTH_PLUGIN_CLASS_NAME = "authPluginClassName"
val AUTH_PARAMS = "authParams"
val TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath"
val TLS_ALLOW_INSECURE_CONNECTION = "tlsAllowInsecureConnection"
val USE_TLS = "useTls"
val TLS_HOSTNAME_VERIFICATION_ENABLE = "tlsHostnameVerificationEnable"
val AuthPluginClassName = "authPluginClassName"
val AuthParams = "authParams"
val TlsTrustCertsFilePath = "tlsTrustCertsFilePath"
val TlsAllowInsecureConnection = "tlsAllowInsecureConnection"
val UseTls = "useTls"
val TlsHostnameVerificationEnable = "tlsHostnameVerificationEnable"


val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
val InstructionForFailOnDataLossFalse: String =
"""
|Some data may have been lost because they are not available in Pulsar any more; either the
| data was aged out by Pulsar or the topic may have been deleted before all the data in the
| topic was processed. If you want your streaming query to fail on such cases, set the source
| option "failOnDataLoss" to "true".
""".stripMargin

val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
val InstructionForFailOnDataLossTrue: String =
"""
|Some data may have been lost because they are not available in Pulsar any more; either the
| data was aged out by Pulsar or the topic may have been deleted before all the data in the
| topic was processed. If you don't want your streaming query to fail on such cases, set the
| source option "failOnDataLoss" to "false".
""".stripMargin

val TOPIC_SCHEMA_CLASS_OPTION_KEY = "topic.schema.class"
val TopicSchemaClassOptionKey = "topic.schema.class"

val FILTERED_KEYS: Set[String] =
Set(TOPIC_SINGLE, SERVICE_URL_OPTION_KEY, TOPIC_SCHEMA_CLASS_OPTION_KEY)
val FilteredKeys: Set[String] =
Set(TopicSingle, ServiceUrlOptionKey, TopicSchemaClassOptionKey)

val TOPIC_ATTRIBUTE_NAME: String = "__topic"
val KEY_ATTRIBUTE_NAME: String = "__key"
val MESSAGE_ID_NAME: String = "__messageId"
val PUBLISH_TIME_NAME: String = "__publishTime"
val EVENT_TIME_NAME: String = "__eventTime"
val TopicAttributeName: String = "__topic"
val KeyAttributeName: String = "__key"
val MessageIdName: String = "__messageId"
val PublishTimeName: String = "__publishTime"
val EventTimeName: String = "__eventTime"

val META_FIELD_NAMES = Set(
TOPIC_ATTRIBUTE_NAME,
KEY_ATTRIBUTE_NAME,
MESSAGE_ID_NAME,
PUBLISH_TIME_NAME,
EVENT_TIME_NAME
val MetaFieldNames = Set(
TopicAttributeName,
KeyAttributeName,
MessageIdName,
PublishTimeName,
EventTimeName
)
}
Loading

0 comments on commit afb42b7

Please sign in to comment.