Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8SPXC-1512: Install binlog UDF component for 8.4 #1937

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage"
)

const collectorPasswordPath = "/etc/mysql/mysql-users-secret/xtrabackup"

type Collector struct {
db *pxc.PXC
storage storage.Storage
Expand Down Expand Up @@ -93,13 +95,56 @@ func New(ctx context.Context, c Config) (*Collector, error) {
return nil, errors.New("unknown STORAGE_TYPE")
}

file, err := os.Open(collectorPasswordPath)
if err != nil {
return nil, errors.Wrap(err, "open file")
}
pxcPass, err := io.ReadAll(file)
if err != nil {
return nil, errors.Wrap(err, "read password")
}

return &Collector{
storage: s,
pxcUser: c.PXCUser,
pxcPass: string(pxcPass),
pxcServiceName: c.PXCServiceName,
}, nil
}

func (c *Collector) Init(ctx context.Context) error {
host, err := pxc.GetPXCFirstHost(ctx, c.pxcServiceName)
if err != nil {
return errors.Wrap(err, "get first PXC host")
}

db, err := pxc.NewPXC(host, c.pxcUser, c.pxcPass)
if err != nil {
return errors.Wrapf(err, "new manager with host %s", host)
}
defer db.Close()

version, err := db.GetVersion(ctx)
if err != nil {
return errors.Wrap(err, "get version")
}

switch {
case strings.HasPrefix(version, "8.0"):
log.Println("creating collector functions")
if err := db.CreateCollectorFunctions(ctx); err != nil {
return errors.Wrap(err, "init 8.0: create collector functions")
}
case strings.HasPrefix(version, "8.4"):
log.Println("installing binlog UDF component")
if err := db.InstallBinlogUDFComponent(ctx); err != nil {
return errors.Wrap(err, "init 8.4: install component")
}
}

return nil
}

func (c *Collector) Run(ctx context.Context) error {
err := c.newDB(ctx)
if err != nil {
Expand Down Expand Up @@ -136,22 +181,12 @@ func (c *Collector) lastGTIDSet(ctx context.Context, suffix string) (pxc.GTIDSet
}

func (c *Collector) newDB(ctx context.Context) error {
file, err := os.Open("/etc/mysql/mysql-users-secret/xtrabackup")
if err != nil {
return errors.Wrap(err, "open file")
}
pxcPass, err := io.ReadAll(file)
if err != nil {
return errors.Wrap(err, "read password")
}
c.pxcPass = string(pxcPass)

host, err := pxc.GetPXCOldestBinlogHost(ctx, c.pxcServiceName, c.pxcUser, c.pxcPass)
if err != nil {
return errors.Wrap(err, "get host")
}

log.Println("Reading binlogs from pxc with hostname=", host)
log.Println("reading binlogs from pxc with hostname=", host)

c.db, err = pxc.NewPXC(host, c.pxcUser, c.pxcPass)
if err != nil {
Expand Down Expand Up @@ -317,7 +352,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {
}

if len(lastGTIDSetList) == 0 {
log.Println("No binlogs to upload")
log.Println("no binlogs to upload")
return nil
}

Expand Down Expand Up @@ -381,7 +416,7 @@ func (c *Collector) CollectBinLogs(ctx context.Context) error {
}

if len(list) == 0 {
log.Println("No binlogs to upload")
log.Println("no binlogs to upload after filter")
return nil
}

Expand Down
10 changes: 8 additions & 2 deletions cmd/pitr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,15 @@ func runCollector(ctx context.Context) {
}
c, err := collector.New(ctx, config)
if err != nil {
log.Fatalln("ERROR: new controller:", err)
log.Fatalln("ERROR: new collector:", err)
}
log.Println("run binlog collector")

log.Println("initializing collector")
if err := c.Init(ctx); err != nil {
log.Fatalln("ERROR: init collector:", err)
}

log.Println("running binlog collector")
for {
timeout, cancel := context.WithTimeout(ctx, time.Duration(config.CollectSpanSec)*time.Second)
defer cancel()
Expand Down
96 changes: 49 additions & 47 deletions cmd/pitr/pxc/pxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@
"strings"

"github.com/go-sql-driver/mysql"
v "github.com/hashicorp/go-version"
"github.com/pkg/errors"
)

const UsingPassErrorMessage = `mysqlbinlog: [Warning] Using a password on the command line interface can be insecure.`

// PXC is a type for working with pxc
type PXC struct {
db *sql.DB // handle for work with database
host string // host for connection
db *sql.DB // handle for work with database
host string // host for connection
version *v.Version

Check failure on line 23 in cmd/pitr/pxc/pxc.go

View workflow job for this annotation

GitHub Actions / runner / suggester / golangci-lint

field `version` is unused (unused)
}

// NewManager return new manager for work with pxc
Expand All @@ -31,6 +33,7 @@
config.Net = "tcp"
config.Addr = addr + ":33062"
config.Params = map[string]string{"interpolateParams": "true"}
config.DBName = "mysql"

mysqlDB, err := sql.Open("mysql", config.FormatDSN())
if err != nil {
Expand All @@ -55,23 +58,10 @@

// GetGTIDSet return GTID set by binary log file name
func (p *PXC) GetGTIDSet(ctx context.Context, binlogName string) (string, error) {
// select name from mysql.func where name='get_gtid_set_by_binlog'
var existFunc string
nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_gtid_set_by_binlog'")
err := nameRow.Scan(&existFunc)
if err != nil && err != sql.ErrNoRows {
return "", errors.Wrap(err, "get udf name")
}
if len(existFunc) == 0 {
_, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'")
if err != nil {
return "", errors.Wrap(err, "create function")
}
}
var binlogSet string
row := p.db.QueryRowContext(ctx, "SELECT get_gtid_set_by_binlog(?)", binlogName)
err = row.Scan(&binlogSet)
if err != nil && !strings.Contains(err.Error(), "Binary log does not exist") {

if err := row.Scan(&binlogSet); err != nil && !strings.Contains(err.Error(), "Binary log does not exist") {
return "", errors.Wrap(err, "scan set")
}

Expand Down Expand Up @@ -110,6 +100,16 @@
return list
}

func (p *PXC) GetVersion(ctx context.Context) (string, error) {
var version string

if err := p.db.QueryRowContext(ctx, "select @@VERSION").Scan(&version); err != nil {
return "", errors.Wrap(err, "select @@VERSION")
}

return version, nil
}

// GetBinLogList return binary log files list
func (p *PXC) GetBinLogList(ctx context.Context) ([]Binlog, error) {
rows, err := p.db.QueryContext(ctx, "SHOW BINARY LOGS")
Expand Down Expand Up @@ -166,23 +166,10 @@

// GetBinLogFirstTimestamp return binary log file first timestamp
func (p *PXC) GetBinLogFirstTimestamp(ctx context.Context, binlog string) (string, error) {
var existFunc string
nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_first_record_timestamp_by_binlog'")
err := nameRow.Scan(&existFunc)
if err != nil && err != sql.ErrNoRows {
return "", errors.Wrap(err, "get udf name")
}
if len(existFunc) == 0 {
_, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'")
if err != nil {
return "", errors.Wrap(err, "create function")
}
}
var timestamp string
row := p.db.QueryRowContext(ctx, "SELECT get_first_record_timestamp_by_binlog(?) DIV 1000000", binlog)

err = row.Scan(&timestamp)
if err != nil {
if err := row.Scan(&timestamp); err != nil {
return "", errors.Wrap(err, "scan binlog timestamp")
}

Expand All @@ -191,23 +178,10 @@

// GetBinLogLastTimestamp return binary log file last timestamp
func (p *PXC) GetBinLogLastTimestamp(ctx context.Context, binlog string) (string, error) {
var existFunc string
nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_last_record_timestamp_by_binlog'")
err := nameRow.Scan(&existFunc)
if err != nil && err != sql.ErrNoRows {
return "", errors.Wrap(err, "get udf name")
}
if len(existFunc) == 0 {
_, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_last_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'")
if err != nil {
return "", errors.Wrap(err, "create function")
}
}
var timestamp string
row := p.db.QueryRowContext(ctx, "SELECT get_last_record_timestamp_by_binlog(?) DIV 1000000", binlog)

err = row.Scan(&timestamp)
if err != nil {
if err := row.Scan(&timestamp); err != nil {
return "", errors.Wrap(err, "scan binlog timestamp")
}

Expand All @@ -217,8 +191,8 @@
func (p *PXC) SubtractGTIDSet(ctx context.Context, set, subSet string) (string, error) {
var result string
row := p.db.QueryRowContext(ctx, "SELECT GTID_SUBTRACT(?,?)", set, subSet)
err := row.Scan(&result)
if err != nil {

if err := row.Scan(&result); err != nil {
return "", errors.Wrap(err, "scan gtid subtract result")
}

Expand Down Expand Up @@ -330,6 +304,34 @@
return binlogTime, nil
}

func (p *PXC) InstallBinlogUDFComponent(ctx context.Context) error {
_, err := p.db.ExecContext(ctx, "INSTALL COMPONENT 'file://component_binlog_utils_udf'")
if err != nil {
return errors.Wrap(err, "install component")
}

return nil
}

func (p *PXC) CreateCollectorFunctions(ctx context.Context) error {
_, err := p.db.ExecContext(ctx, "CREATE FUNCTION IF NOT EXISTS get_last_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'")
if err != nil {
return errors.Wrap(err, "create function get_first_record_timestamp_by_binlog")
}

_, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'")
if err != nil {
return errors.Wrap(err, "create function get_gtid_set_by_binlog")
}

_, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'")
if err != nil {
return errors.Wrap(err, "create function get_first_record_timestamp_by_binlog")
}

return nil
}

func (p *PXC) DropCollectorFunctions(ctx context.Context) error {
_, err := p.db.ExecContext(ctx, "DROP FUNCTION IF EXISTS get_first_record_timestamp_by_binlog")
if err != nil {
Expand Down
Loading