Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1764][CIP-11] Adding support for TagsQL for filtering workers tags #2980

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def tagsExpr: String = get(TAGS_EXPR)
def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)

// Whether tags expressions are evaluated with TagsQL; reads `celeborn.tags.useTagsQL`.
def useTagsQL: Boolean = get(USE_TAGS_QL)

// //////////////////////////////////////////////////////
// kerberos //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -6024,6 +6026,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

// Selects the tags filter implementation used by TagsManager: TagsQLFilter when true,
// DefaultTagsFilter (comma-separated tags) when false. Defaults to false.
val USE_TAGS_QL: ConfigEntry[Boolean] =
buildConf("celeborn.tags.useTagsQL")
.categories("master")
.version("0.6.0")
.doc("Whether to use tagsQL for tags expression.")
.booleanConf
.createWithDefault(false)

val MASTER_EXCLUDE_WORKER_UNHEALTHY_DISK_RATIO_THRESHOLD: ConfigEntry[Double] =
buildConf("celeborn.master.excludeWorker.unhealthyDiskRatioThreshold")
.categories("master")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,5 @@ license: |
| celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 0.6.0 | |
| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the tags expression provided by the client over the tags expression provided by the master. | 0.6.0 | |
| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | |
| celeborn.tags.useTagsQL | false | false | Whether to use tagsQL for tags expression. | 0.6.0 | |
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private[celeborn] class Master(
private val hasS3Storage = conf.hasS3Storage

private val quotaManager = new QuotaManager(conf, configService)
private val tagsManager = new TagsManager(Option(configService))
private val tagsManager = new TagsManager(conf, Option(configService))
private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.tags

import java.util
import java.util.{Collections, Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import java.util.stream.Collectors

import org.apache.celeborn.common.meta.WorkerInfo

/**
 * Tags filter for the plain comma-separated tags expression.
 *
 * The expression is split on `,`, each tag is trimmed, and a worker is kept only when its
 * unique id is registered in `tagsStore` under every listed tag (logical AND).
 *
 * @param tagsStore mapping from a tag to the unique ids of the workers carrying that tag
 */
class DefaultTagsFilter(tagsStore: ConcurrentHashMap[String, JSet[String]])
  extends TagsFilter {

  /**
   * Keeps the workers that carry all tags of the expression.
   *
   * @param tagsExpr comma-separated list of tags
   * @param workers candidate workers
   * @return the workers from `workers` registered under every tag; empty when the
   *         expression yields no tags at all (e.g. `","`)
   */
  override def filter(tagsExpr: String, workers: util.List[WorkerInfo]): util.List[WorkerInfo] = {
    val tags: Array[String] = tagsExpr.split(",").map(_.trim)

    // Intersect into a private copy. The sets handed out by tagsStore are the live store
    // entries (or the shared immutable empty set), so calling retainAll on them directly
    // would permanently drop workers from the first tag of every multi-tag expression.
    val workersForTags = new util.HashSet[String]()
    tags.headOption.foreach(tag => workersForTags.addAll(workersTaggedWith(tag)))
    tags.drop(1).foreach(tag => workersForTags.retainAll(workersTaggedWith(tag)))

    val workerTagsPredicate = new Predicate[WorkerInfo] {
      override def test(w: WorkerInfo): Boolean = workersForTags.contains(w.toUniqueId())
    }

    workers.stream().filter(workerTagsPredicate).collect(Collectors.toList())
  }

  /** Returns the ids stored under `tag`, or an empty set when the tag is unknown. */
  private def workersTaggedWith(tag: String): JSet[String] =
    tagsStore.getOrDefault(tag, Collections.emptySet[String]())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.tags

import java.util

import org.apache.celeborn.common.meta.WorkerInfo

/**
 * Strategy for selecting workers according to a tags expression.
 * Implementations define the syntax and semantics of the expression.
 */
abstract class TagsFilter {

/**
 * @param tagsExpr tags expression in the syntax understood by the implementation
 * @param workers candidate workers
 * @return the workers selected by `tagsExpr`
 */
def filter(tagsExpr: String, workers: util.List[WorkerInfo]): util.List[WorkerInfo]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@
package org.apache.celeborn.service.deploy.master.tags

import java.util
import java.util.{Collections, Set => JSet}
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import java.util.stream.Collectors

import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsScalaConcurrentMapConverter}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.common.util.JavaUtils
import org.apache.celeborn.server.common.service.config.ConfigService
import org.apache.celeborn.service.deploy.master.tags.ql.TagsQLFilter

class TagsManager(configService: Option[ConfigService]) extends Logging {
class TagsManager(celebornConf: CelebornConf, configService: Option[ConfigService])
extends Logging {
private val defaultTagStore = JavaUtils.newConcurrentHashMap[String, JSet[String]]()

private val addNewTagFunc =
Expand Down Expand Up @@ -70,28 +71,19 @@ class TagsManager(configService: Option[ConfigService]) extends Logging {
return workers
}

val tags = tagsExpr.split(",").map(_.trim)

var workersForTags: Option[JSet[String]] = None
tags.foreach { tag =>
val taggedWorkers = getTagStore.getOrDefault(tag, Collections.emptySet())
workersForTags match {
case Some(w) =>
w.retainAll(taggedWorkers)
case _ =>
workersForTags = Some(taggedWorkers)
val tagsFilter =
if (celebornConf.useTagsQL) {
new TagsQLFilter(getTagStore)
} else {
new DefaultTagsFilter(getTagStore)
}
}
val taggedWorkers = tagsFilter.filter(tagsExpr, workers)

if (workersForTags.isEmpty) {
logWarning(s"No workers for tags: $tagsExpr found in cluster")
return Collections.emptyList()
if (taggedWorkers.isEmpty) {
logWarning(s"No workers for tagsExpr: $tagsExpr found in cluster")
}

val workerTagsPredicate = new Predicate[WorkerInfo] {
override def test(w: WorkerInfo): Boolean = workersForTags.get.contains(w.toUniqueId())
}
workers.stream().filter(workerTagsPredicate).collect(Collectors.toList())
taggedWorkers
}

def addTagToWorker(tag: String, workerId: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.tags.ql

import java.util
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import java.util.stream.Collectors

import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.service.deploy.master.tags.TagsFilter

/**
 * Tags filter that evaluates TagsQL expressions (see [[TagsQLParser]] for the syntax).
 *
 * Every `key`/`value` of a parsed node is looked up in `tagsStore` as the tag `key=value`.
 * The values of one node are OR-ed, `Equals` nodes are AND-ed with each other, and every
 * worker matched by a `NotEquals` node is excluded.
 *
 * @param tagsStore mapping from a tag to the unique ids of the workers carrying that tag
 */
class TagsQLFilter(tagsStore: ConcurrentHashMap[String, JSet[String]])
  extends TagsFilter {

  private val TAGS_KV_SEPARATOR = "="

  /**
   * Keeps the workers selected by a TagsQL expression.
   *
   * @param tagsExpr TagsQL expression
   * @param workers candidate workers
   * @return the workers that satisfy every `Equals` node and match no `NotEquals` node;
   *         when the expression has no `Equals` node, only the exclusions are applied
   * @throws IllegalArgumentException if `tagsExpr` contains an invalid token
   */
  override def filter(tagsExpr: String, workers: util.List[WorkerInfo]): util.List[WorkerInfo] = {
    val parser = new TagsQLParser()
    val nodes = parser.parse(tagsExpr)

    // None until the first Equals node is seen, i.e. "no positive constraint yet".
    var positiveWorkerSet: Option[util.HashSet[String]] = None
    val negativeWorkerSet = new util.HashSet[String]

    nodes.foreach { node =>
      val tagKey = node.key

      // Union of the workers carrying any of the node's values. This is a fresh set, so
      // the retainAll below never touches the sets owned by tagsStore.
      val workerForNode = new util.HashSet[String]
      node.values.foreach { tagValue =>
        val tag = s"$tagKey$TAGS_KV_SEPARATOR$tagValue"
        val taggedWorkers = tagsStore.getOrDefault(tag, util.Collections.emptySet[String]())
        workerForNode.addAll(taggedWorkers)
      }

      node.operator match {
        case Equals =>
          positiveWorkerSet match {
            case Some(w) =>
              w.retainAll(workerForNode)
            case _ =>
              positiveWorkerSet = Some(workerForNode)
          }
        case NotEquals =>
          negativeWorkerSet.addAll(workerForNode)
      }
    }

    val workerTagsPredicate = new Predicate[WorkerInfo] {
      override def test(w: WorkerInfo): Boolean = {
        val workerId = w.toUniqueId()
        // forall is true for None: a negation-only expression such as `env:!sandbox`
        // keeps every worker that is not excluded instead of failing on Option.get.
        positiveWorkerSet.forall(_.contains(workerId)) && !negativeWorkerSet.contains(workerId)
      }
    }

    workers.stream().filter(workerTagsPredicate).collect(Collectors.toList())
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.tags.ql

/** Comparison applied by a TagsQL node to the values of its key. */
sealed trait Operator
/** The worker must carry one of the node's values. */
case object Equals extends Operator
/** The worker must not carry any of the node's values. */
case object NotEquals extends Operator

/**
 * One parsed TagsQL token.
 *
 * @param key tag key
 * @param operator whether the values are required or excluded
 * @param values tag values listed for the key
 */
case class Node(key: String, operator: Operator, values: Set[String])

/**
* TagsQL uses key/value pairs to give your tags more context.
*
* The query language supports the following syntax:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what scenarios might you need this functionality?

Can the following rules align with your goal?
tag1,tag2 | tag3,tag4 | tag5,tag6

In the expression above, tag1 and tag2 are combined using a logical AND, just like tag3 and tag4. The results of tag1 AND tag2 and tag3 AND tag4 are combined using a logical OR.

* - Match single value: `key:value`
* - Negate single value: `key:!value`
* - Match list of values: `key:{value1,value2}`
* - Negate list of values: `key:!{value1,value2}`
*
* Example tags expression: `env:production region:{us-east,us-west} env:!sandbox`
* This tags expression will select all of the workers that have the following tags:
* - env=production
* - region=us-east or region=us-west
* and will ignore all of the workers that have the following tags:
* - env=sandbox
*
* TagsQLParser defines the parsing logic for TagsQL.
*/
class TagsQLParser {

// Separator between the values of a brace-enclosed value list.
private val SEPARATOR_TOKEN = ","
// Marker placed after the colon to negate the value(s).
private val NEGATION_TOKEN = "!"
// Only allow word characters and hyphen in the key and value
// (This includes underscore and numbers as well)
private val VALID_CHARS = "\\w\\-"

// Matches one whole token. Capture groups, in order:
//   1. the key (the part before the colon)
//   2. the optional negation marker
//   3. the content of the brace-enclosed value list, when braces are used
//   4. the bare single value, when no braces are used
// The end anchor is appended as a plain string because `$` is special inside an
// s-interpolated string.
private val Pattern = (s"^([$VALID_CHARS]+):($NEGATION_TOKEN?)" +
s"(?:\\{([$VALID_CHARS$SEPARATOR_TOKEN]+)}|([$VALID_CHARS]+))" + "$").r

/**
 * Parses a tags expression into one [[Node]] per whitespace-separated token.
 *
 * @param tagsExpr TagsQL expression; leading and trailing whitespace is ignored
 * @return the parsed nodes, in the order the tokens appear
 * @throws IllegalArgumentException if any token is not valid TagsQL, which includes an
 *                                  empty or blank expression
 */
def parse(tagsExpr: String): List[Node] = {
  // Trim first: String.split keeps a leading empty string when the input starts with a
  // separator, and that empty token would otherwise be rejected as invalid.
  tagsExpr.trim.split("\\s+").map(parseToken).toList
}

/**
 * Parses a single whitespace-delimited token of a tags expression into a [[Node]].
 *
 * @param token one token, e.g. `env:production` or `region:!{us-east,us-west}`
 * @return the key, operator and value set described by the token
 * @throws IllegalArgumentException if the token does not match the TagsQL pattern
 */
private def parseToken(token: String): Node = {
token match {
// `values` is the brace-enclosed list and `value` the bare single value; only one
// side of the regex alternation matches, so the other group is null.
case Pattern(key, condition, values, value) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather not use regex, as there can always be bugs in regex engines that may lead to unexpected outcomes.

val valuesStr = if (values != null) values else value
// `condition` is the optional negation group: "!" when present, otherwise empty.
val operator = condition match {
case NEGATION_TOKEN => NotEquals
case _ => Equals
}
// A single value contains no comma, so the split yields a one-element set for it.
Node(key, operator, valuesStr.split(",").toSet)

case _ => throw new IllegalArgumentException(
s"Found invalid token: $token while parsing the tagsExpr.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TagsManagerSuite extends CelebornFunSuite {
}

test("test tags manager") {
tagsManager = new TagsManager(Option(null))
tagsManager = new TagsManager(new CelebornConf(), Option(null))

tagsManager.addTagToWorker(TAG1, WORKER1.toUniqueId())
tagsManager.addTagToWorker(TAG1, WORKER2.toUniqueId())
Expand Down Expand Up @@ -127,7 +127,7 @@ class TagsManagerSuite extends CelebornFunSuite {
}

test("test tags expression with multiple tags") {
tagsManager = new TagsManager(Option(null))
tagsManager = new TagsManager(new CelebornConf(), Option(null))

// Tag1
tagsManager.addTagToWorker(TAG1, WORKER1.toUniqueId())
Expand Down Expand Up @@ -159,7 +159,7 @@ class TagsManagerSuite extends CelebornFunSuite {
getTestResourceFile("dynamicConfig-tags.yaml").getPath)
val configService = DynamicConfigServiceFactory.getConfigService(conf)

tagsManager = new TagsManager(Option(configService))
tagsManager = new TagsManager(new CelebornConf(), Option(configService))

{
// preferClientTagsExpr: true
Expand Down
Loading
Loading