Skip to content

Commit

Permalink
alter placement rule relate
Browse files Browse the repository at this point in the history
  • Loading branch information
xiongjiwei committed Apr 19, 2022
1 parent 800d15c commit dbac5f1
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 37 deletions.
25 changes: 15 additions & 10 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ func NewDDLCase(cfg *DDLCaseConfig) *DDLCase {
cases := make([]*testCase, cfg.Concurrency)
for i := 0; i < cfg.Concurrency; i++ {
cases[i] = &testCase{
cfg: cfg,
tables: make(map[string]*ddlTestTable),
schemas: make(map[string]*ddlTestSchema),
views: make(map[string]*ddlTestView),
ddlOps: make([]ddlTestOpExecutor, 0),
dmlOps: make([]dmlTestOpExecutor, 0),
caseIndex: i,
stop: 0,
cfg: cfg,
tables: make(map[string]*ddlTestTable),
schemas: make(map[string]*ddlTestSchema),
views: make(map[string]*ddlTestView),
placementPolicies: map[string]*ddlTestPlacementPolicy{},
ddlOps: make([]ddlTestOpExecutor, 0),
dmlOps: make([]dmlTestOpExecutor, 0),
caseIndex: i,
stop: 0,
}
}
b := &DDLCase{
Expand Down Expand Up @@ -409,8 +410,7 @@ func (c *testCase) execute(executeDDL ExecuteDDLFunc, exeDMLFunc ExecuteDMLFunc)
log.Infof("[ddl] [instance %d] Executing post round operations...", c.caseIndex)

if !c.cfg.MySQLCompatible {
err := c.executeAdminCheck()
if err != nil {
if err := c.executeAdminCheck(); err != nil {
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -593,6 +593,8 @@ func (c *testCase) executeAdminCheck() error {
return nil
}

c.tablesLock.Lock()
defer c.tablesLock.Unlock()
// build SQL
sql := "ADMIN CHECK TABLE "
i := 0
Expand All @@ -605,6 +607,9 @@ func (c *testCase) executeAdminCheck() error {
for _, index := range table.indexes {
checkIndexSQL := fmt.Sprintf("admin check index `%s` `%s`", table.name, index.name)
_, err := c.pickupDB().Exec(checkIndexSQL)
if ddlIgnoreError(err) {
continue
}
if err != nil {
return errors.Annotatef(err, "Error when executing SQL: %s", checkIndexSQL)
}
Expand Down
223 changes: 213 additions & 10 deletions ddl/ddl_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ func (c *testCase) generateDDLOps() error {
if err := c.generateAlterIndexVisibility(defaultTime); err != nil {
return errors.Trace(err)
}
if err := c.generateAlterPlacementPolicy(defaultTime); err != nil {
return errors.Trace(err)
}
if err := c.generateCreatePlacementPolicy(defaultTime); err != nil {
return errors.Trace(err)
}
if err := c.generateDropPlacementPolicy(defaultTime); err != nil {
return errors.Trace(err)
}
if err := c.generateAddPlacementPolicyToTable(defaultTime); err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -132,6 +144,11 @@ const (

ActionAlterIndexVisibility

ActionCreatePlacementPolicy
ActionDropPlacementPolicy
ActionAddPlacementPolicyToTable
ActionAlterPlacementPolicy

ddlKindNil
)

Expand Down Expand Up @@ -168,6 +185,11 @@ var mapOfDDLKind = map[string]DDLKind{
"drop primary key": ActionDropPrimaryKey,

"alter index visibility": ActionAlterIndexVisibility,

"create placement policy": ActionCreatePlacementPolicy,
"drop placement policy": ActionDropPlacementPolicy,
"add placement policy to table": ActionAddPlacementPolicyToTable,
"alter placement policy": ActionAlterPlacementPolicy,
}

var mapOfDDLKindToString = map[DDLKind]string{
Expand Down Expand Up @@ -202,6 +224,11 @@ var mapOfDDLKindToString = map[DDLKind]string{
ActionDropPrimaryKey: "drop primary key",

ActionAlterIndexVisibility: "alter index visibility",

ActionCreatePlacementPolicy: "create placement policy",
ActionDropPlacementPolicy: "drop placement policy",
ActionAddPlacementPolicyToTable: "add placement policy to table",
ActionAlterPlacementPolicy: "alter placement policy",
}

// mapOfDDLKindProbability use to control every kind of ddl request execute probability.
Expand Down Expand Up @@ -236,6 +263,11 @@ var mapOfDDLKindProbability = map[DDLKind]float64{
ActionDropPrimaryKey: 0.10,

ActionAlterIndexVisibility: 0.20,

ActionCreatePlacementPolicy: 0.20,
ActionDropPlacementPolicy: 0.20,
ActionAddPlacementPolicyToTable: 0.20,
ActionAlterPlacementPolicy: 0.20,
}

type ddlJob struct {
Expand All @@ -251,14 +283,15 @@ type ddlJob struct {
type ddlJobArg unsafe.Pointer

type ddlJobTask struct {
ddlID int
k DDLKind
tblInfo *ddlTestTable
schemaInfo *ddlTestSchema
viewInfo *ddlTestView
sql string
arg ddlJobArg
err error // err is an error executed by the remote TiDB.
ddlID int
k DDLKind
tblInfo *ddlTestTable
schemaInfo *ddlTestSchema
viewInfo *ddlTestView
placementInfo *ddlTestPlacementPolicy
sql string
arg ddlJobArg
err error // err is an error executed by the remote TiDB.
}

// Maintain the tableInfo description in the memory, once schrddl fails, we can get more details from the memory schema copy.
Expand Down Expand Up @@ -286,6 +319,8 @@ func (c *testCase) updateTableInfo(task *ddlJobTask) error {
return c.dropTableJob(task)
case ddlCreateView:
return c.createViewJob(task)
case ddlDropView:
return c.dropViewJob(task)
case ddlAddIndex:
return c.addIndexJob(task)
case ddlRenameIndex:
Expand All @@ -308,6 +343,16 @@ func (c *testCase) updateTableInfo(task *ddlJobTask) error {
return c.setAddPrimaryKey(task)
case ActionDropPrimaryKey:
return c.setDropPrimaryKey(task)
case ActionAlterIndexVisibility:
return c.setAlterIndexVisibility(task)
case ActionCreatePlacementPolicy:
return c.setCreatePlacementPolicy(task)
case ActionDropPlacementPolicy:
return c.setDropPlacementPolicy(task)
case ActionAddPlacementPolicyToTable:
return c.setAddPlacementPolicyToTable(task)
case ActionAlterPlacementPolicy:
return c.setAlterPlacementPolicy(task)
}
return fmt.Errorf("unknow ddl task , %v", *task)
}
Expand Down Expand Up @@ -1274,8 +1319,9 @@ func (c *testCase) prepareAddPrimaryKey(_ interface{}, taskCh chan *ddlJobTask)
perm := rand.Perm(table.columns.Size())
// build SQL
sql := fmt.Sprintf("ALTER TABLE `%s` ADD PRIMARY KEY (", table.name)
var column *ddlTestColumn
for _, i := range perm {
column := getColumnFromArrayList(table.columns, i)
column = getColumnFromArrayList(table.columns, i)
if column.canBePrimary() {
sql += fmt.Sprintf("`%s`", column.name)
break
Expand All @@ -1287,6 +1333,7 @@ func (c *testCase) prepareAddPrimaryKey(_ interface{}, taskCh chan *ddlJobTask)
k: ActionAddPrimaryKey,
sql: sql,
tblInfo: table,
arg: ddlJobArg(column),
}
taskCh <- task
return nil
Expand All @@ -1295,6 +1342,8 @@ func (c *testCase) prepareAddPrimaryKey(_ interface{}, taskCh chan *ddlJobTask)
func (c *testCase) setAddPrimaryKey(task *ddlJobTask) error {
tblInfo := task.tblInfo
tblInfo.hasPK = true
col := (*ddlTestColumn)(task.arg)
col.isPrimaryKey = true
return nil
}

Expand Down Expand Up @@ -1327,6 +1376,13 @@ func (c *testCase) prepareDropPrimaryKey(_ interface{}, taskCh chan *ddlJobTask)
func (c *testCase) setDropPrimaryKey(task *ddlJobTask) error {
tblInfo := task.tblInfo
tblInfo.hasPK = false
for ite := tblInfo.columns.Iterator(); ite.Next(); {
col := ite.Value().(*ddlTestColumn)
if col.isPrimaryKey {
col.isPrimaryKey = false
break
}
}
return nil
}

Expand Down Expand Up @@ -1471,7 +1527,7 @@ func (c *testCase) prepareAlterIndexVisibility(_ interface{}, taskCh chan *ddlJo
return nil
}

func (c *testCase) dropAlterIndexVisibilityJob(task *ddlJobTask) error {
func (c *testCase) setAlterIndexVisibility(task *ddlJobTask) error {
jobArg := (*ddlIndexJobArg)(task.arg)
tblInfo := task.tblInfo

Expand All @@ -1494,6 +1550,148 @@ func (c *testCase) dropAlterIndexVisibilityJob(task *ddlJobTask) error {
return nil
}

func (c *testCase) generateCreatePlacementPolicy(repeat int) error {
for i := 0; i < repeat; i++ {
c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareCreatePlacementPolicy, nil, ActionCreatePlacementPolicy})
}
return nil
}

func (c *testCase) prepareCreatePlacementPolicy(_ interface{}, taskCh chan *ddlJobTask) error {
policy := &ddlTestPlacementPolicy{
Name: uuid.NewV4().String(),
Content: "followers=2",
}

sql := fmt.Sprintf("CREATE PLACEMENT POLICY `%s` followers=2", policy.Name)
taskCh <- &ddlJobTask{
k: ActionCreatePlacementPolicy,
sql: sql,
placementInfo: policy,
}
return nil
}

func (c *testCase) setCreatePlacementPolicy(task *ddlJobTask) error {
policy := task.placementInfo
c.placementPolicies[policy.Name] = policy
return nil
}

func (c *testCase) generateDropPlacementPolicy(repeat int) error {
for i := 0; i < repeat; i++ {
c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareDropPlacementPolicy, nil, ActionDropPlacementPolicy})
}
return nil
}

func (c *testCase) prepareDropPlacementPolicy(_ interface{}, taskCh chan *ddlJobTask) error {
var policy string
for k := range c.placementPolicies {
if rand.Intn(len(c.placementPolicies)) == 0 {
policy = k
break
}
}

if policy == "" {
return nil
}

sql := fmt.Sprintf("DROP PLACEMENT POLICY `%s`", policy)
taskCh <- &ddlJobTask{
k: ActionDropPlacementPolicy,
sql: sql,
placementInfo: c.placementPolicies[policy],
}
return nil
}

func (c *testCase) setDropPlacementPolicy(task *ddlJobTask) error {
if task.placementInfo == nil {
return nil
}
delete(c.placementPolicies, task.placementInfo.Name)
return nil
}

func (c *testCase) generateAddPlacementPolicyToTable(repeat int) error {
for i := 0; i < repeat; i++ {
c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAddPlacementPolicyToTable, nil, ActionAddPlacementPolicyToTable})
}
return nil
}

func (c *testCase) prepareAddPlacementPolicyToTable(_ interface{}, taskCh chan *ddlJobTask) error {
var policy string
for k := range c.placementPolicies {
if rand.Intn(len(c.placementPolicies)) == 0 {
policy = k
break
}
}

if policy == "" {
return nil
}

table := c.pickupRandomTable()
if table == nil || table.policyName != "" {
return nil
}

sql := fmt.Sprintf("ALTER TABLE `%s` PLACEMENT POLICY=`%s`", table.name, policy)
taskCh <- &ddlJobTask{
k: ActionAddPlacementPolicyToTable,
sql: sql,
tblInfo: table,
placementInfo: c.placementPolicies[policy],
}
return nil
}

func (c *testCase) setAddPlacementPolicyToTable(task *ddlJobTask) error {
table := task.tblInfo
table.policyName = task.placementInfo.Name
return nil
}

func (c *testCase) generateAlterPlacementPolicy(repeat int) error {
for i := 0; i < repeat; i++ {
c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAlterPlacementPolicy, nil, ActionAlterPlacementPolicy})
}
return nil
}

func (c *testCase) prepareAlterPlacementPolicy(_ interface{}, taskCh chan *ddlJobTask) error {
var policy string
for k := range c.placementPolicies {
if rand.Intn(len(c.placementPolicies)) == 0 {
policy = k
break
}
}

if policy == "" {
return nil
}

followers := rand.Intn(3) + 1
sql := fmt.Sprintf("ALTER PLACEMENT POLICY `%s` followers=%d", policy, followers)
taskCh <- &ddlJobTask{
k: ActionAlterPlacementPolicy,
sql: sql,
placementInfo: c.placementPolicies[policy],
arg: ddlJobArg(&followers),
}
return nil
}

func (c *testCase) setAlterPlacementPolicy(task *ddlJobTask) error {
c.placementPolicies[task.placementInfo.Name].Content = fmt.Sprintf("followers=%d", (*int)(task.arg))
return nil
}

type ddlTestAddDropColumnStrategy = int

const (
Expand Down Expand Up @@ -1806,6 +2004,8 @@ func (c *testCase) prepareDropColumn(_ interface{}, taskCh chan *ddlJobTask) err
return nil
}

table.lock.Lock()
defer table.lock.Unlock()
columnsSnapshot := table.filterColumns(table.predicateAll)
if len(columnsSnapshot) <= 1 {
return nil
Expand Down Expand Up @@ -1970,6 +2170,9 @@ func (c *testCase) generateModifySchemaCharsetAndCollate(repeat int) error {

func (c *testCase) prepareModifySchemaCharsetAndCollate(_ interface{}, taskCh chan *ddlJobTask) error {
schema := c.pickupRandomSchema()
if schema == nil {
return nil
}
charset, collation := c.pickupRandomCharsetAndCollate()
taskCh <- &ddlJobTask{
k: ActionModifySchemaCharsetAndCollate,
Expand Down
Loading

0 comments on commit dbac5f1

Please sign in to comment.