Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

全量同步数据改成协程池 切分主键进行同步 #128

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 124 additions & 70 deletions service/stock_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ func NewStockService() *StockService {
}
}

// Task is one unit of export work consumed by a worker goroutine: the SQL
// statement to execute for a single table, together with the rule that
// applies to that table.
type Task struct {
	// fullName is the table identifier the task belongs to; it is carried
	// through to the Result so counts can be grouped per table.
	fullName string
	// sql is the ready-to-execute statement for this task.
	sql string
	// rule is the sync rule associated with the table.
	rule *global.Rule
}

// Result reports the outcome of one processed task: which table it belonged
// to and the row count obtained for it.
type Result struct {
	// fullName is the table identifier copied from the originating task.
	fullName string
	// count is the number of rows reported for that task.
	count int64
}


func (s *StockService) Run() error {
canalCfg := canal.NewDefaultConfig()
canalCfg.Addr = global.Cfg().Addr
Expand Down Expand Up @@ -87,84 +99,132 @@ func (s *StockService) Run() error {
s.endpoint = endpoint

startTime := dates.NowMillisecond()
log.Println(fmt.Sprintf("bulk size: %d", global.Cfg().BulkSize))
for _, rule := range global.RuleInsList() {
if rule.OrderByColumn == "" {
return errors.New("empty order_by_column not allowed")
}
log.Printf("bulk size: %d", global.Cfg().BulkSize)

taskChan := make(chan *Task,0)
resultChan := make(chan *Result,0)
dones := make(chan bool, global.Cfg().Maxprocs)
//开启worker池
for i := 0; i < global.Cfg().Maxprocs; i++ {
go func(wid int,taskChan chan *Task,resultChan chan *Result,dones chan bool){
defer func(){
dones <- true
}()
for task := range taskChan{
requests, err := s.export(task)
if err != nil {
logs.Error(err.Error())
s.shutoff.Store(true)
return
}
count := s.imports(task.fullName, requests)
resultChan <- &Result{
fullName:task.fullName,
count: count,
}
}

exportColumns := s.exportColumns(rule)
fullName := fmt.Sprintf("%s.%s", rule.Schema, rule.Table)
log.Println(fmt.Sprintf("开始导出 %s", fullName))

res, err := s.canal.Execute(fmt.Sprintf("select count(1) from %s", fullName))
if err != nil {
return err
}
totalRow, err := res.GetInt(0, 0)
s.totalRows[fullName] = totalRow
log.Println(fmt.Sprintf("%s 共 %d 条数据", fullName, totalRow))
}(i,taskChan,resultChan,dones)
}

s.counter[fullName] = 0
go func() error{
defer func() {
close(taskChan)
}()
for _, rule := range global.RuleInsList() {
if rule.OrderByColumn == "" {
return errors.New("empty order_by_column not allowed")
}
exportColumns := s.exportColumns(rule)
fullName := fmt.Sprintf("%s.%s", rule.Schema, rule.Table)
log.Println(fmt.Sprintf("开始导出 %s", fullName))
i := rule.TableInfo.PKColumns[0]
pk := rule.TableInfo.GetPKColumn(i).Name
//pk为表的自增id列 整数型
minSql := fmt.Sprintf("select min(%s) as min_id from %s", pk, fullName)
log.Println(fmt.Sprintf("minSql %s", minSql))
maxSql := fmt.Sprintf("select max(%s) as max_id from %s", pk, fullName)
log.Println(fmt.Sprintf("maxSql %s", maxSql))
minRes, err := s.canal.Execute(minSql)
if err != nil {
return err
}
minId, err := minRes.GetIntByName(0, "min_id")
if err != nil {
return err
}

var batch int64
size := global.Cfg().BulkSize
if batch%size == 0 {
batch = totalRow / size
} else {
batch = (totalRow / size) + 1
maxRes, err := s.canal.Execute(maxSql)
if err != nil {
return err
}
maxId, err := maxRes.GetIntByName(0, "max_id")
if err != nil {
return err
}
log.Println(fmt.Sprintf("minId:%d,maxId:%d", minId, maxId))
start := minId - 1
end := maxId
//res, err := s.canal.Execute(fmt.Sprintf("select count(1) from %s", fullName))
//if err != nil {
// return err
//}
//log.Println(fmt.Sprintf("res:%v", res))
//totalRow, err := res.GetInt(0, 0)
//s.totalRows[fullName] = totalRow
//log.Println(fmt.Sprintf("%s 共 %d 条数据", fullName, totalRow))
for start < end {
_end := start + global.Cfg().BulkSize
if _end >= end {
_end = end
}
sql := s.buildSql(fullName, exportColumns, start, _end, rule)
task := &Task{
fullName: fullName,
sql: sql,
rule: rule,
}
taskChan <- task
start = _end
}
}
return nil
}()

var processed atomic.Int64
go func(){
for i := 0; i < global.Cfg().Maxprocs; i++ {
s.wg.Add(1)
go func(_fullName, _columns string, _rule *global.Rule) {
for {
processed.Inc()
requests, err := s.export(_fullName, _columns, processed.Load(), _rule)
if err != nil {
logs.Error(err.Error())
s.shutoff.Store(true)
break
}

s.imports(_fullName, requests)
if processed.Load() > batch {
break
}
}
s.wg.Done()
}(fullName, exportColumns, rule)
<- dones
}
close(resultChan)
}()
for r := range resultChan{
if _,ok := s.counter[r.fullName];ok{
s.counter[r.fullName] += r.count
}else{
s.counter[r.fullName] = r.count
}

}

s.wg.Wait()

fmt.Println(fmt.Sprintf("共耗时 :%d(毫秒)", dates.NowMillisecond()-startTime))
log.Printf("共耗时 :%d(毫秒)", dates.NowMillisecond()-startTime)

for k, v := range s.totalRows {
vv, ok := s.counter[k]
if ok {
fmt.Println(fmt.Sprintf("表: %s,共:%d 条数据,成功导入:%d 条", k, v, vv))
if v > vv {
fmt.Println("存在导入错误的数据,具体请至日志查看")
}
}
for k, v := range s.counter {
log.Printf("表:%s 共导入 %d条",k,v)
}

s.endpoint.Close() // 关闭客户端

return nil
}

func (s *StockService) export(fullName, columns string, batch int64, rule *global.Rule) ([]*model.RowRequest, error) {
func (s *StockService) export(task *Task) ([]*model.RowRequest, error) {
if s.shutoff.Load() {
return nil, errors.New("shutoff")
}

offset := s.offset(batch)
sql := s.buildSql(fullName, columns, offset, rule)
logs.Infof("export sql : %s", sql)
sql := task.sql
rule := task.rule
resultSet, err := s.canal.Execute(sql)
if err != nil {
logs.Errorf("数据导出错误: %s - %s", sql, err.Error())
Expand Down Expand Up @@ -193,27 +253,21 @@ func (s *StockService) export(fullName, columns string, batch int64, rule *globa
}

// buildSql constructs the SQL statement used to export rows from fullName.
// NOTE(review): this span comes from a diff view with no +/- markers, so two
// versions of the function are interleaved below. Presumably the first
// signature (offset-based, LIMIT paging) is the removed side and the second
// (start/end range on the first primary-key column) is the added side —
// confirm against the pull request before editing.
func (s *StockService) buildSql(fullName, columns string, offset int64, rule *global.Rule) string {
size := global.Cfg().BulkSize
if len(rule.TableInfo.PKColumns) == 0 {
return fmt.Sprintf("select %s from %s order by %s limit %d,%d", columns, fullName, rule.OrderByColumn, offset, size)
}

// Range-based variant: the statement selects rows whose first primary-key
// column is greater than start and less than or equal to end.
func (s *StockService) buildSql(fullName, columns string, start int64,end int64, rule *global.Rule) string {
// NOTE(review): PKColumns[0] is read here without the length check that the
// offset-based variant performs; presumably a table without a primary key
// would fail at this index — confirm callers exclude such tables.
i := rule.TableInfo.PKColumns[0]
n := rule.TableInfo.GetPKColumn(i).Name
t := "select b.* from (select %s from %s order by %s limit %d,%d) a left join %s b on a.%s=b.%s"
sql := fmt.Sprintf(t, n, fullName, rule.OrderByColumn, offset, size, fullName, n, n)
pk := rule.TableInfo.GetPKColumn(i).Name
// NOTE(review): the range query selects * and does not reference the
// columns argument.
t := "select * from %s where %s>%d and %s <= %d"
sql := fmt.Sprintf(t, fullName, pk, start, pk, end)
return sql
}

// imports hands the exported row requests to s.endpoint.Stock and logs the
// resulting count for fullName; it returns immediately when the shutoff flag
// is set.
// NOTE(review): this span comes from a diff view with no +/- markers, so two
// versions are interleaved below. Presumably the signature without a result
// (bare `return`, s.incCounter) is the removed side and the int64-returning
// one (`return 0`, `return count`) is the added side — confirm against the
// pull request before editing.
func (s *StockService) imports(fullName string, requests []*model.RowRequest) {
func (s *StockService) imports(fullName string, requests []*model.RowRequest)int64 {
if s.shutoff.Load() {
return
return 0
}

succeeds := s.endpoint.Stock(requests)
count := s.incCounter(fullName, succeeds)
count := s.endpoint.Stock(requests)
log.Println(fmt.Sprintf("%s 导入数据 %d 条", fullName, count))
return count
}

func (s *StockService) exportColumns(rule *global.Rule) string {
Expand Down