From dbac5f12d2339a0aa516e4e58e6dc5f565c0a0cb Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 10 Mar 2022 16:45:39 +0800 Subject: [PATCH] alter placement rule relate --- ddl/ddl.go | 25 +++--- ddl/ddl_ops.go | 223 ++++++++++++++++++++++++++++++++++++++++++++++--- ddl/meta.go | 45 ++++++---- ddl/run.go | 4 + main.go | 4 +- 5 files changed, 264 insertions(+), 37 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index eed2469..f71b649 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -153,14 +153,15 @@ func NewDDLCase(cfg *DDLCaseConfig) *DDLCase { cases := make([]*testCase, cfg.Concurrency) for i := 0; i < cfg.Concurrency; i++ { cases[i] = &testCase{ - cfg: cfg, - tables: make(map[string]*ddlTestTable), - schemas: make(map[string]*ddlTestSchema), - views: make(map[string]*ddlTestView), - ddlOps: make([]ddlTestOpExecutor, 0), - dmlOps: make([]dmlTestOpExecutor, 0), - caseIndex: i, - stop: 0, + cfg: cfg, + tables: make(map[string]*ddlTestTable), + schemas: make(map[string]*ddlTestSchema), + views: make(map[string]*ddlTestView), + placementPolicies: map[string]*ddlTestPlacementPolicy{}, + ddlOps: make([]ddlTestOpExecutor, 0), + dmlOps: make([]dmlTestOpExecutor, 0), + caseIndex: i, + stop: 0, } } b := &DDLCase{ @@ -409,8 +410,7 @@ func (c *testCase) execute(executeDDL ExecuteDDLFunc, exeDMLFunc ExecuteDMLFunc) log.Infof("[ddl] [instance %d] Executing post round operations...", c.caseIndex) if !c.cfg.MySQLCompatible { - err := c.executeAdminCheck() - if err != nil { + if err := c.executeAdminCheck(); err != nil { return errors.Trace(err) } } @@ -593,6 +593,8 @@ func (c *testCase) executeAdminCheck() error { return nil } + c.tablesLock.Lock() + defer c.tablesLock.Unlock() // build SQL sql := "ADMIN CHECK TABLE " i := 0 @@ -605,6 +607,9 @@ func (c *testCase) executeAdminCheck() error { for _, index := range table.indexes { checkIndexSQL := fmt.Sprintf("admin check index `%s` `%s`", table.name, index.name) _, err := c.pickupDB().Exec(checkIndexSQL) + if ddlIgnoreError(err) { + continue + } if err != nil { return errors.Annotatef(err, "Error when executing SQL: %s", checkIndexSQL) } diff --git a/ddl/ddl_ops.go b/ddl/ddl_ops.go index 3174145..9eb59f4 100644 --- a/ddl/ddl_ops.go +++ b/ddl/ddl_ops.go @@ -94,6 +94,18 @@ func (c *testCase) generateDDLOps() error { if err := c.generateAlterIndexVisibility(defaultTime); err != nil { return errors.Trace(err) } + if err := c.generateAlterPlacementPolicy(defaultTime); err != nil { + return errors.Trace(err) + } + if err := c.generateCreatePlacementPolicy(defaultTime); err != nil { + return errors.Trace(err) + } + if err := c.generateDropPlacementPolicy(defaultTime); err != nil { + return errors.Trace(err) + } + if err := c.generateAddPlacementPolicyToTable(defaultTime); err != nil { + return errors.Trace(err) + } return nil } @@ -132,6 +144,11 @@ const ( ActionAlterIndexVisibility + ActionCreatePlacementPolicy + ActionDropPlacementPolicy + ActionAddPlacementPolicyToTable + ActionAlterPlacementPolicy + ddlKindNil ) @@ -168,6 +185,11 @@ var mapOfDDLKind = map[string]DDLKind{ "drop primary key": ActionDropPrimaryKey, "alter index visibility": ActionAlterIndexVisibility, + + "create placement policy": ActionCreatePlacementPolicy, + "drop placement policy": ActionDropPlacementPolicy, + "add placement policy to table": ActionAddPlacementPolicyToTable, + "alter placement policy": ActionAlterPlacementPolicy, } var mapOfDDLKindToString = map[DDLKind]string{ @@ -202,6 +224,11 @@ var mapOfDDLKindToString = map[DDLKind]string{ ActionDropPrimaryKey: "drop primary key", ActionAlterIndexVisibility: "alter index visibility", + + ActionCreatePlacementPolicy: "create placement policy", + ActionDropPlacementPolicy: "drop placement policy", + ActionAddPlacementPolicyToTable: "add placement policy to table", + ActionAlterPlacementPolicy: "alter placement policy", } // mapOfDDLKindProbability use to control every kind of ddl request execute probability. @@ -236,6 +263,11 @@ var mapOfDDLKindProbability = map[DDLKind]float64{ ActionDropPrimaryKey: 0.10, ActionAlterIndexVisibility: 0.20, + + ActionCreatePlacementPolicy: 0.20, + ActionDropPlacementPolicy: 0.20, + ActionAddPlacementPolicyToTable: 0.20, + ActionAlterPlacementPolicy: 0.20, } type ddlJob struct { @@ -251,14 +283,15 @@ type ddlJob struct { type ddlJobArg unsafe.Pointer type ddlJobTask struct { - ddlID int - k DDLKind - tblInfo *ddlTestTable - schemaInfo *ddlTestSchema - viewInfo *ddlTestView - sql string - arg ddlJobArg - err error // err is an error executed by the remote TiDB. + ddlID int + k DDLKind + tblInfo *ddlTestTable + schemaInfo *ddlTestSchema + viewInfo *ddlTestView + placementInfo *ddlTestPlacementPolicy + sql string + arg ddlJobArg + err error // err is an error executed by the remote TiDB. } // Maintain the tableInfo description in the memory, once schrddl fails, we can get more details from the memory schema copy. @@ -286,6 +319,8 @@ func (c *testCase) updateTableInfo(task *ddlJobTask) error { return c.dropTableJob(task) case ddlCreateView: return c.createViewJob(task) + case ddlDropView: + return c.dropViewJob(task) case ddlAddIndex: return c.addIndexJob(task) case ddlRenameIndex: @@ -308,6 +343,16 @@ func (c *testCase) updateTableInfo(task *ddlJobTask) error { return c.setAddPrimaryKey(task) case ActionDropPrimaryKey: return c.setDropPrimaryKey(task) + case ActionAlterIndexVisibility: + return c.setAlterIndexVisibility(task) + case ActionCreatePlacementPolicy: + return c.setCreatePlacementPolicy(task) + case ActionDropPlacementPolicy: + return c.setDropPlacementPolicy(task) + case ActionAddPlacementPolicyToTable: + return c.setAddPlacementPolicyToTable(task) + case ActionAlterPlacementPolicy: + return c.setAlterPlacementPolicy(task) } return fmt.Errorf("unknow ddl task , %v", *task) } @@ -1274,8 +1319,9 @@ func (c *testCase) prepareAddPrimaryKey(_ interface{}, taskCh chan *ddlJobTask) perm := rand.Perm(table.columns.Size()) // build SQL sql := fmt.Sprintf("ALTER TABLE `%s` ADD PRIMARY KEY (", table.name) + var column *ddlTestColumn for _, i := range perm { - column := getColumnFromArrayList(table.columns, i) + column = getColumnFromArrayList(table.columns, i) if column.canBePrimary() { sql += fmt.Sprintf("`%s`", column.name) break @@ -1287,6 +1333,7 @@ func (c *testCase) prepareAddPrimaryKey(_ interface{}, taskCh chan *ddlJobTask) k: ActionAddPrimaryKey, sql: sql, tblInfo: table, + arg: ddlJobArg(column), } taskCh <- task return nil @@ -1295,6 +1342,8 @@ func (c *testCase) prepareAddPrimaryKey(_ interface{}, taskCh chan *ddlJobTask) func (c *testCase) setAddPrimaryKey(task *ddlJobTask) error { tblInfo := task.tblInfo tblInfo.hasPK = true + col := (*ddlTestColumn)(task.arg) + col.isPrimaryKey = true return nil } @@ -1327,6 +1376,13 @@ func (c *testCase) prepareDropPrimaryKey(_ interface{}, taskCh chan *ddlJobTask) func (c *testCase) setDropPrimaryKey(task *ddlJobTask) error { tblInfo := task.tblInfo tblInfo.hasPK = false + for ite := tblInfo.columns.Iterator(); ite.Next(); { + col := ite.Value().(*ddlTestColumn) + if col.isPrimaryKey { + col.isPrimaryKey = false + break + } + } return nil } @@ -1471,7 +1527,7 @@ func (c *testCase) prepareAlterIndexVisibility(_ interface{}, taskCh chan *ddlJo return nil } -func (c *testCase) dropAlterIndexVisibilityJob(task *ddlJobTask) error { +func (c *testCase) setAlterIndexVisibility(task *ddlJobTask) error { jobArg := (*ddlIndexJobArg)(task.arg) tblInfo := task.tblInfo @@ -1494,6 +1550,148 @@ func (c *testCase) dropAlterIndexVisibilityJob(task *ddlJobTask) error { return nil } +func (c *testCase) generateCreatePlacementPolicy(repeat int) error { + for i := 0; i < repeat; i++ { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareCreatePlacementPolicy, nil, ActionCreatePlacementPolicy}) + } + return nil +} + +func (c *testCase) prepareCreatePlacementPolicy(_ interface{}, taskCh chan *ddlJobTask) error { + policy := &ddlTestPlacementPolicy{ + Name: uuid.NewV4().String(), + Content: "followers=2", + } + + sql := fmt.Sprintf("CREATE PLACEMENT POLICY `%s` followers=2", policy.Name) + taskCh <- &ddlJobTask{ + k: ActionCreatePlacementPolicy, + sql: sql, + placementInfo: policy, + } + return nil +} + +func (c *testCase) setCreatePlacementPolicy(task *ddlJobTask) error { + policy := task.placementInfo + c.placementPolicies[policy.Name] = policy + return nil +} + +func (c *testCase) generateDropPlacementPolicy(repeat int) error { + for i := 0; i < repeat; i++ { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareDropPlacementPolicy, nil, ActionDropPlacementPolicy}) + } + return nil +} + +func (c *testCase) prepareDropPlacementPolicy(_ interface{}, taskCh chan *ddlJobTask) error { + var policy string + for k := range c.placementPolicies { + if rand.Intn(len(c.placementPolicies)) == 0 { + policy = k + break + } + } + + if policy == "" { + return nil + } + + sql := fmt.Sprintf("DROP PLACEMENT POLICY `%s`", policy) + taskCh <- &ddlJobTask{ + k: ActionDropPlacementPolicy, + sql: sql, + placementInfo: c.placementPolicies[policy], + } + return nil +} + +func (c *testCase) setDropPlacementPolicy(task *ddlJobTask) error { + if task.placementInfo == nil { + return nil + } + delete(c.placementPolicies, task.placementInfo.Name) + return nil +} + +func (c *testCase) generateAddPlacementPolicyToTable(repeat int) error { + for i := 0; i < repeat; i++ { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAddPlacementPolicyToTable, nil, ActionAddPlacementPolicyToTable}) + } + return nil +} + +func (c *testCase) prepareAddPlacementPolicyToTable(_ interface{}, taskCh chan *ddlJobTask) error { + var policy string + for k := range c.placementPolicies { + if rand.Intn(len(c.placementPolicies)) == 0 { + policy = k + break + } + } + + if policy == "" { + return nil + } + + table := c.pickupRandomTable() + if table == nil || table.policyName != "" { + return nil + } + + sql := fmt.Sprintf("ALTER TABLE `%s` PLACEMENT POLICY=`%s`", table.name, policy) + taskCh <- &ddlJobTask{ + k: ActionAddPlacementPolicyToTable, + sql: sql, + tblInfo: table, + placementInfo: c.placementPolicies[policy], + } + return nil +} + +func (c *testCase) setAddPlacementPolicyToTable(task *ddlJobTask) error { + table := task.tblInfo + table.policyName = task.placementInfo.Name + return nil +} + +func (c *testCase) generateAlterPlacementPolicy(repeat int) error { + for i := 0; i < repeat; i++ { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAlterPlacementPolicy, nil, ActionAlterPlacementPolicy}) + } + return nil +} + +func (c *testCase) prepareAlterPlacementPolicy(_ interface{}, taskCh chan *ddlJobTask) error { + var policy string + for k := range c.placementPolicies { + if rand.Intn(len(c.placementPolicies)) == 0 { + policy = k + break + } + } + + if policy == "" { + return nil + } + + followers := rand.Intn(3) + 1 + sql := fmt.Sprintf("ALTER PLACEMENT POLICY `%s` followers=%d", policy, followers) + taskCh <- &ddlJobTask{ + k: ActionAlterPlacementPolicy, + sql: sql, + placementInfo: c.placementPolicies[policy], + arg: ddlJobArg(&followers), + } + return nil +} + +func (c *testCase) setAlterPlacementPolicy(task *ddlJobTask) error { + c.placementPolicies[task.placementInfo.Name].Content = fmt.Sprintf("followers=%d", (*int)(task.arg)) + return nil +} + type ddlTestAddDropColumnStrategy = int const ( @@ -1806,6 +2004,8 @@ func (c *testCase) prepareDropColumn(_ interface{}, taskCh chan *ddlJobTask) err return nil } + table.lock.Lock() + defer table.lock.Unlock() columnsSnapshot := table.filterColumns(table.predicateAll) if len(columnsSnapshot) <= 1 { return nil @@ -1970,6 +2170,9 @@ func (c *testCase) generateModifySchemaCharsetAndCollate(repeat int) error { func (c *testCase) prepareModifySchemaCharsetAndCollate(_ interface{}, taskCh chan *ddlJobTask) error { schema := c.pickupRandomSchema() + if schema == nil { + return nil + } charset, collation := c.pickupRandomCharsetAndCollate() taskCh <- &ddlJobTask{ k: ActionModifySchemaCharsetAndCollate, diff --git a/ddl/meta.go b/ddl/meta.go index 7f95f9c..a8562a2 100644 --- a/ddl/meta.go +++ b/ddl/meta.go @@ -18,21 +18,27 @@ import ( "github.com/twinj/uuid" ) +var ( + ddlJobCount int64 = 0 +) + type testCase struct { - cfg *DDLCaseConfig - initDB string - dbs []*sql.DB - caseIndex int - ddlOps []ddlTestOpExecutor - dmlOps []dmlTestOpExecutor - tables map[string]*ddlTestTable - schemas map[string]*ddlTestSchema - views map[string]*ddlTestView - tablesLock sync.RWMutex - stop int32 - lastDDLID int - charsets []string - charsetsCollates map[string][]string + cfg *DDLCaseConfig + initDB string + dbs []*sql.DB + caseIndex int + ddlOps []ddlTestOpExecutor + dmlOps []dmlTestOpExecutor + tables map[string]*ddlTestTable + schemas map[string]*ddlTestSchema + views map[string]*ddlTestView + placementPolicies map[string]*ddlTestPlacementPolicy + tablesLock sync.RWMutex + stop int32 + lastDDLID int + charsets []string + charsetsCollates map[string][]string + updateSchemaMu sync.Mutex } type ddlTestErrorConflict struct { @@ -105,6 +111,9 @@ func (c *testCase) executeWithTimeout(db *sql.DB, task *ddlJobTask) error { case ddlModifyColumn2, ddlModifyColumn, ddlAddIndex: t = time.Minute * 10 } + runningDDLCount := atomic.AddInt64(&ddlJobCount, 1) + defer atomic.AddInt64(&ddlJobCount, -1) + t = t * time.Duration(runningDDLCount) ctx, cancel := context.WithTimeout(context.Background(), t) defer cancel() _, err := db.ExecContext(ctx, task.sql) @@ -209,6 +218,11 @@ func (c *testCase) pickupRandomView() *ddlTestView { return nil } +type ddlTestPlacementPolicy struct { + Name string + Content string +} + type ddlTestTable struct { deleted int32 name string @@ -222,6 +236,7 @@ type ddlTestTable struct { charset string collate string hasPK bool + policyName string lock *sync.RWMutex } @@ -892,7 +907,7 @@ type ddlTestIndex struct { name string signature string columns []*ddlTestColumn - invisible bool + invisible bool } func (col *ddlTestColumn) normalizeDataType() string { diff --git a/ddl/run.go b/ddl/run.go index 2966856..ef7b488 100644 --- a/ddl/run.go +++ b/ddl/run.go @@ -152,6 +152,7 @@ func ddlIgnoreError(err error) bool { } errStr := err.Error() log.Warnf("check DDL err:%s", errStr) + fmt.Fprintf(os.Stdout, "check DDL err:%s\n", errStr) if strings.Contains(errStr, "Information schema is changed") { return true } @@ -208,5 +209,8 @@ func ddlIgnoreError(err error) bool { strings.Contains(errStr, "with tidb_enable_change_multi_schema is disable") { return true } + if strings.Contains(errStr, "with tidb_enable_change_multi_schema is disable") || strings.Contains(errStr, "Unsupported drop primary key when the table's pkIsHandle is true") { + return true + } return false } diff --git a/main.go b/main.go index 464247a..a0c6002 100644 --- a/main.go +++ b/main.go @@ -27,10 +27,10 @@ var ( dbs = &dbAddr{dbs: []string{"127.0.0.1:4000"}} dbName = flag.String("db", "test", "database name") mode = flag.String("mode", "serial", "test mode: serial, parallel") - concurrency = flag.Int("concurrency", 1, "concurrency") + concurrency = flag.Int("concurrency", 20, "concurrency") tablesToCreate = flag.Int("tables", 1, "the number of the tables to create") mysqlCompatible = flag.Bool("mysql-compatible", false, "disable TiDB-only features") - testTime = flag.Duration("time", 5*time.Minute, "test time") + testTime = flag.Duration("time", 2*time.Hour, "test time") output = flag.String("output", "", "output file") )