support custom tables and pad length
Signed-off-by: qupeng <[email protected]>
hicqu authored and sticnarf committed Jun 21, 2021
1 parent 6560045 commit e9527be
Showing 4 changed files with 63 additions and 14 deletions.
28 changes: 23 additions & 5 deletions testcase/write-stress/append.go
@@ -16,6 +16,7 @@ package testcase
import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"sync"

@@ -34,8 +35,23 @@ func (c *appendClient) SetUp(ctx context.Context, nodes []cluster.Node, clientNo
if err := c.baseClient.SetUp(ctx, nodes, clientNodes, idx); err != nil {
return err
}
util.MustExec(c.db, "drop table if exists write_stress")
util.MustExec(c.db, "create table write_stress(col1 bigint, col2 varchar(256), data longtext, key k(col1, col2))")
// Use 32 threads to create tables.
var wg sync.WaitGroup
for i := 0; i < 32; i++ {
wg.Add(1)
go func(tid int) {
defer wg.Done()
for j := 0; j < c.tables; j++ {
if j%32 == tid {
sql := fmt.Sprintf("drop table if exists write_stress%d", j+1)
util.MustExec(c.db, sql)
sql = fmt.Sprintf("create table write_stress%d(col1 bigint, col2 varchar(256), data longtext, key k(col1, col2))", j+1)
util.MustExec(c.db, sql)
}
}
}(i)
}
wg.Wait()
return nil
}
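The hunk above replaces the single write_stress table with c.tables sharded tables (write_stress1 through write_stressN), created concurrently by 32 goroutines that split the work by index modulo. Below is a minimal standalone sketch of that pattern, assuming a plain *sql.DB handle and a mustExec helper in place of util.MustExec; the function name and the parameterized worker count are illustrative, not part of the commit.

package testcase

import (
	"database/sql"
	"fmt"
	"sync"
)

// createShardedTables distils the SetUp change above: table j (1-based) is
// dropped and recreated by worker j%workers, so table creation proceeds in
// parallel across `workers` goroutines.
func createShardedTables(db *sql.DB, tables, workers int) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(tid int) {
			defer wg.Done()
			for j := 0; j < tables; j++ {
				if j%workers != tid {
					continue
				}
				name := fmt.Sprintf("write_stress%d", j+1)
				mustExec(db, fmt.Sprintf("drop table if exists %s", name))
				mustExec(db, fmt.Sprintf(
					"create table %s(col1 bigint, col2 varchar(256), data longtext, key k(col1, col2))", name))
			}
		}(i)
	}
	wg.Wait()
}

// mustExec stands in for util.MustExec: execute the statement or panic.
func mustExec(db *sql.DB, query string) {
	if _, err := db.Exec(query); err != nil {
		panic(err)
	}
}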

@@ -66,14 +82,16 @@ func (c *appendClient) runClient(ctx context.Context) error {
rng := rand.New(rand.NewSource(rand.Int63()))

col2 := make([]byte, 192)
data := make([]byte, 65536)
data := make([]byte, c.padLength)
for {
col1 := rng.Int63()
col2Len := rng.Intn(192)
_, _ = rng.Read(col2[:col2Len])
dataLen := rng.Intn(65536)
dataLen := rng.Intn(c.padLength)
_, _ = rng.Read(data[:dataLen])
_, err := c.db.ExecContext(ctx, "insert into write_stress values (?, ?, ?)", col1,
tid := rng.Int()%c.tables + 1
sql := fmt.Sprintf("insert into write_stress%d values (?, ?, ?)", tid)
_, err := c.db.ExecContext(ctx, sql, col1,
base64.StdEncoding.EncodeToString(col2[:col2Len]),
base64.StdEncoding.EncodeToString(data[:dataLen]))
if err != nil {
12 changes: 10 additions & 2 deletions testcase/write-stress/client.go
@@ -28,13 +28,19 @@ import (

// CaseCreator is a creator of test client
type CaseCreator struct {
Concurrency int
CaseName string
Concurrency int
Tables int
PadLength int
}

// Create creates a test client
func (c CaseCreator) Create(node cluster.ClientNode) core.Client {
base := baseClient{concurrency: c.Concurrency}
base := baseClient{
concurrency: c.Concurrency,
tables: c.Tables,
padLength: c.PadLength,
}
switch c.CaseName {
case "uniform":
return &uniformClient{base}
@@ -47,6 +53,8 @@ func (c CaseCreator) Create(node cluster.ClientNode) core.Client {
type baseClient struct {
db *sql.DB
concurrency int
tables int
padLength int
}

func (c *baseClient) SetUp(ctx context.Context, _ []cluster.Node, clientNodes []cluster.ClientNode, idx int) error {
9 changes: 7 additions & 2 deletions testcase/write-stress/cmd/main.go
@@ -32,7 +32,10 @@ import (

var (
concurrency = flag.Int("concurrency", 1024, "write concurrency")
caseName = flag.String("case-name", "uniform", "test case name")
tables = flag.Int("tables", 1, "total tables")
padLength = flag.Int("pad-length", 65536, "pad string length")

caseName = flag.String("case-name", "uniform", "test case name")
)

func main() {
@@ -47,8 +50,10 @@ func main() {
Config: &cfg,
Provider: cluster.NewDefaultClusterProvider(),
ClientCreator: testcase.CaseCreator{
Concurrency: *concurrency,
CaseName: *caseName,
Concurrency: *concurrency,
Tables: *tables,
PadLength: *padLength,
},
NemesisGens: util.ParseNemesisGenerators(fixture.Context.Nemesis),
ClusterDefs: test_infra.NewDefaultCluster(c.Namespace, c.ClusterName, c.TiDBClusterConfig),
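For reference, a minimal sketch of how the new flags parse in isolation, with the same defaults as the declarations above (1 table, 65536-byte pad). This is a separate toy program, not part of the test binary, and the printed output format is made up for illustration; in practice the case would be started with something like -case-name uniform -tables 32 -pad-length 4096, where those particular values are only examples.

package main

import (
	"flag"
	"fmt"
)

var (
	concurrency = flag.Int("concurrency", 1024, "write concurrency")
	tables      = flag.Int("tables", 1, "total tables")
	padLength   = flag.Int("pad-length", 65536, "pad string length")
	caseName    = flag.String("case-name", "uniform", "test case name")
)

func main() {
	flag.Parse()
	// Print the effective configuration so the flag wiring is easy to check.
	fmt.Printf("case=%s concurrency=%d tables=%d pad-length=%d\n",
		*caseName, *concurrency, *tables, *padLength)
}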
28 changes: 23 additions & 5 deletions testcase/write-stress/uniform.go
@@ -16,6 +16,7 @@ package testcase
import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"sync"

@@ -35,8 +36,23 @@ func (c *uniformClient) SetUp(ctx context.Context, nodes []cluster.Node, clientN
if err := c.baseClient.SetUp(ctx, nodes, clientNodes, idx); err != nil {
return err
}
util.MustExec(c.db, "drop table if exists write_stress")
util.MustExec(c.db, "create table write_stress(id varchar(40) primary key clustered, col1 bigint, col2 varchar(256), data longtext, key k(col1, col2))")
// Use 32 threads to create tables.
var wg sync.WaitGroup
for i := 0; i < 32; i++ {
wg.Add(1)
go func(tid int) {
defer wg.Done()
for j := 0; j < c.tables; j++ {
if j%32 == tid {
sql := fmt.Sprintf("drop table if exists write_stress%d", j+1)
util.MustExec(c.db, sql)
sql = fmt.Sprintf("create table write_stress(id varchar(40) primary key clustered, col1 bigint, col2 varchar(256), data longtext, key k(col1, col2))", j+1)
util.MustExec(c.db, sql)
}
}
}(i)
}
wg.Wait()
return nil
}

@@ -68,15 +84,17 @@ func (c *uniformClient) runClient(ctx context.Context) error {
rng := rand.New(rand.NewSource(rand.Int63()))

col2 := make([]byte, 192)
data := make([]byte, 65536)
data := make([]byte, c.padLength)
for {
uuid := uuid.New().String()
col1 := rng.Int63()
col2Len := rng.Intn(192)
_, _ = rng.Read(col2[:col2Len])
dataLen := rng.Intn(65536)
dataLen := rng.Intn(c.padLength)
_, _ = rng.Read(data[:dataLen])
_, err := c.db.ExecContext(ctx, "insert into write_stress values (?, ?, ?, ?)", uuid, col1,
tid := rng.Int()%c.tables + 1
sql := fmt.Sprintf("insert into write_stress%d values (?, ?, ?)", tid)
_, err := c.db.ExecContext(ctx, sql, uuid, col1,
base64.StdEncoding.EncodeToString(col2[:col2Len]),
base64.StdEncoding.EncodeToString(data[:dataLen]))
if err != nil {
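As a minimal sketch, one iteration of the uniform client's loop could be factored out roughly as below: pick a shard table at random, generate a UUID key plus random col1/col2 values and a pad string of up to padLength bytes, then insert with a parameterized statement. The standalone function shape, the insertOne name, and the github.com/google/uuid import path are assumptions; the test case itself keeps this logic inline in runClient.

package testcase

import (
	"context"
	"database/sql"
	"encoding/base64"
	"fmt"
	"math/rand"

	"github.com/google/uuid"
)

// insertOne writes a single random row into one of the write_stress<N> shard
// tables, mirroring one iteration of the uniform client's loop above.
func insertOne(ctx context.Context, db *sql.DB, rng *rand.Rand, tables, padLength int) error {
	col2 := make([]byte, 192)
	data := make([]byte, padLength)

	// Random lengths and contents for the indexed column and the pad payload.
	col2Len := rng.Intn(192)
	_, _ = rng.Read(col2[:col2Len])
	dataLen := rng.Intn(padLength)
	_, _ = rng.Read(data[:dataLen])

	// Choose one of the write_stress1..tables shard tables uniformly.
	tid := rng.Int()%tables + 1
	query := fmt.Sprintf("insert into write_stress%d values (?, ?, ?, ?)", tid)
	_, err := db.ExecContext(ctx, query,
		uuid.New().String(),
		rng.Int63(),
		base64.StdEncoding.EncodeToString(col2[:col2Len]),
		base64.StdEncoding.EncodeToString(data[:dataLen]))
	return err
}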
