diff --git a/go/base/context.go b/go/base/context.go index 51b43dfc8..b9badc43b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -52,6 +52,7 @@ const ( const ( HTTPStatusOK = 200 MaxEventsBatchSize = 1000 + ETAUnknown = math.MinInt64 ) var ( @@ -182,6 +183,7 @@ type MigrationContext struct { lastHeartbeatOnChangelogMutex *sync.Mutex CurrentLag int64 currentProgress uint64 + etaNanoseonds int64 ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 @@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext { MaxLagMillisecondsThrottleThreshold: 1500, CutOverLockTimeoutSeconds: 3, DMLBatchSize: 10, + etaNanoseonds: ETAUnknown, maxLoad: NewLoadMap(), criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, @@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) { atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct)) } +func (this *MigrationContext) GetETADuration() time.Duration { + return time.Duration(atomic.LoadInt64(&this.etaNanoseonds)) +} + +func (this *MigrationContext) SetETADuration(etaDuration time.Duration) { + atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds()) +} + +func (this *MigrationContext) GetETASeconds() int64 { + nano := atomic.LoadInt64(&this.etaNanoseonds) + if nano < 0 { + return ETAUnknown + } + return nano / int64(time.Second) +} + // math.Float64bits([f=0..100]) // GetTotalRowsCopied returns the accurate number of rows being copied (affected) diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 71f070ce3..2275ede51 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds())) env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds())) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct())) + env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken)) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 097b58f3a..fb473b830 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -187,6 +187,10 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { if column.Name == mappedColumn.Name && column.Type == sql.DateTimeColumnType && mappedColumn.Type == sql.TimestampColumnType { this.migrationContext.MappedSharedColumns.SetConvertDatetimeToTimestamp(column.Name, this.migrationContext.ApplierTimeZone) } + if column.Name == mappedColumn.Name && column.Type == sql.EnumColumnType && mappedColumn.Charset != "" { + this.migrationContext.MappedSharedColumns.SetEnumToTextConversion(column.Name) + this.migrationContext.MappedSharedColumns.SetEnumValues(column.Name, column.EnumValues) + } } for _, column := range this.migrationContext.UniqueKey.Columns.Columns() { @@ -590,6 +594,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL } if strings.HasPrefix(columnType, "enum") { column.Type = sql.EnumColumnType + column.EnumValues = sql.ParseEnumValues(m.GetString("COLUMN_TYPE")) } if strings.HasPrefix(columnType, "binary") { column.Type = sql.BinaryColumnType diff --git a/go/logic/migrator.go b/go/logic/migrator.go index dfddccffc..c12c21fc3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } var etaSeconds float64 = math.MaxFloat64 - eta := "N/A" + var etaDuration = time.Duration(base.ETAUnknown) if progressPct >= 100.0 { - eta = "due" + etaDuration = 0 } else if progressPct >= 0.1 { elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds if etaSeconds >= 0 { - etaDuration := time.Duration(etaSeconds) * time.Second - eta = base.PrettifyDurationOutput(etaDuration) + etaDuration = time.Duration(etaSeconds) * time.Second } else { - eta = "due" + etaDuration = 0 } } + this.migrationContext.SetETADuration(etaDuration) + var eta string + switch etaDuration { + case 0: + eta = "due" + case time.Duration(base.ETAUnknown): + eta = "N/A" + default: + eta = base.PrettifyDurationOutput(etaDuration) + } state := "migrating" if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows { diff --git a/go/logic/throttler.go b/go/logic/throttler.go index d234ea663..abe8669e1 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -188,9 +188,12 @@ func (this *Throttler) collectControlReplicasLag() { dbUri := connectionConfig.GetDBUri("information_schema") var heartbeatValue string - if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil { + db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri) + if err != nil { return lag, err - } else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil { + } + + if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil { return lag, err } diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 17bb5fc32..43a228e18 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -18,8 +18,11 @@ import ( "github.com/outbrain/golib/sqlutils" ) -const MaxTableNameLength = 64 -const MaxReplicationPasswordLength = 32 +const ( + MaxTableNameLength = 64 + MaxReplicationPasswordLength = 32 + MaxDBPoolConnections = 3 +) type ReplicationLagResult struct { Key InstanceKey @@ -39,23 +42,22 @@ func (this *ReplicationLagResult) HasLag() bool { var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB) var knownDBsMutex = &sync.Mutex{} -func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) { +func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, err error) { cacheKey := migrationUuid + ":" + mysql_uri knownDBsMutex.Lock() - defer func() { - knownDBsMutex.Unlock() - }() - - var exists bool - if _, exists = knownDBs[cacheKey]; !exists { - if db, err := gosql.Open("mysql", mysql_uri); err == nil { - knownDBs[cacheKey] = db - } else { - return db, exists, err + defer knownDBsMutex.Unlock() + + if db, exists = knownDBs[cacheKey]; !exists { + db, err = gosql.Open("mysql", mysql_uri) + if err != nil { + return nil, false, err } + db.SetMaxOpenConns(MaxDBPoolConnections) + db.SetMaxIdleConns(MaxDBPoolConnections) + knownDBs[cacheKey] = db } - return knownDBs[cacheKey], exists, nil + return db, exists, nil } // GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS diff --git a/go/sql/builder.go b/go/sql/builder.go index 776a10d70..7fe366c6f 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -38,6 +38,8 @@ func buildColumnsPreparedValues(columns *ColumnList) []string { var token string if column.timezoneConversion != nil { token = fmt.Sprintf("convert_tz(?, '%s', '%s')", column.timezoneConversion.ToTimezone, "+00:00") + } else if column.enumToTextConversion { + token = fmt.Sprintf("ELT(?, %s)", column.EnumValues) } else if column.Type == JSONColumnType { token = "convert(? using utf8mb4)" } else { @@ -108,6 +110,8 @@ func BuildSetPreparedClause(columns *ColumnList) (result string, err error) { var setToken string if column.timezoneConversion != nil { setToken = fmt.Sprintf("%s=convert_tz(?, '%s', '%s')", EscapeName(column.Name), column.timezoneConversion.ToTimezone, "+00:00") + } else if column.enumToTextConversion { + setToken = fmt.Sprintf("%s=ELT(?, %s)", EscapeName(column.Name), column.EnumValues) } else if column.Type == JSONColumnType { setToken = fmt.Sprintf("%s=convert(? using utf8mb4)", EscapeName(column.Name)) } else { diff --git a/go/sql/parser.go b/go/sql/parser.go index d9c0c3f19..eac0bdce3 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -33,6 +33,7 @@ var ( // ALTER TABLE tbl something regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)\s+(.*$)`), } + enumValuesRegexp = regexp.MustCompile("^enum[(](.*)[)]$") ) type AlterTableParser struct { @@ -205,3 +206,10 @@ func (this *AlterTableParser) HasExplicitTable() bool { func (this *AlterTableParser) GetAlterStatementOptions() string { return this.alterStatementOptions } + +func ParseEnumValues(enumColumnType string) string { + if submatch := enumValuesRegexp.FindStringSubmatch(enumColumnType); len(submatch) > 0 { + return submatch[1] + } + return enumColumnType +} diff --git a/go/sql/parser_test.go b/go/sql/parser_test.go index 6cdbb39cc..3157d097e 100644 --- a/go/sql/parser_test.go +++ b/go/sql/parser_test.go @@ -322,3 +322,21 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b", "add index idx(i)"})) } } + +func TestParseEnumValues(t *testing.T) { + { + s := "enum('red','green','blue','orange')" + values := ParseEnumValues(s) + test.S(t).ExpectEquals(values, "'red','green','blue','orange'") + } + { + s := "('red','green','blue','orange')" + values := ParseEnumValues(s) + test.S(t).ExpectEquals(values, "('red','green','blue','orange')") + } + { + s := "zzz" + values := ParseEnumValues(s) + test.S(t).ExpectEquals(values, "zzz") + } +} diff --git a/go/sql/types.go b/go/sql/types.go index fa6b74e4b..3c4ce5e85 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -33,15 +33,16 @@ type TimezoneConversion struct { } type Column struct { - Name string - IsUnsigned bool - Charset string - Type ColumnType - + Name string + IsUnsigned bool + Charset string + Type ColumnType + EnumValues string + timezoneConversion *TimezoneConversion + enumToTextConversion bool // add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog. // https://github.com/github/gh-ost/issues/909 - BinaryOctetLength uint - timezoneConversion *TimezoneConversion + BinaryOctetLength uint } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { @@ -198,6 +199,18 @@ func (this *ColumnList) HasTimezoneConversion(columnName string) bool { return this.GetColumn(columnName).timezoneConversion != nil } +func (this *ColumnList) SetEnumToTextConversion(columnName string) { + this.GetColumn(columnName).enumToTextConversion = true +} + +func (this *ColumnList) IsEnumToTextConversion(columnName string) bool { + return this.GetColumn(columnName).enumToTextConversion +} + +func (this *ColumnList) SetEnumValues(columnName string, enumValues string) { + this.GetColumn(columnName).EnumValues = enumValues +} + func (this *ColumnList) String() string { return strings.Join(this.Names(), ",") } diff --git a/localtests/enum-to-varchar/create.sql b/localtests/enum-to-varchar/create.sql new file mode 100644 index 000000000..0dbab17ed --- /dev/null +++ b/localtests/enum-to-varchar/create.sql @@ -0,0 +1,26 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + e enum('red', 'green', 'blue', 'orange') null default null collate 'utf8_bin', + primary key(id) +) auto_increment=1; + +insert into gh_ost_test values (null, 7, 'red'); + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 11, 'red'); + insert into gh_ost_test values (null, 13, 'green'); + insert into gh_ost_test values (null, 17, 'blue'); + set @last_insert_id := last_insert_id(); + update gh_ost_test set e='orange' where id = @last_insert_id; +end ;; diff --git a/localtests/enum-to-varchar/extra_args b/localtests/enum-to-varchar/extra_args new file mode 100644 index 000000000..68524e49b --- /dev/null +++ b/localtests/enum-to-varchar/extra_args @@ -0,0 +1 @@ +--alter="change e e varchar(32) not null default ''"