diff --git a/service/stock_service.go b/service/stock_service.go index 0ad37f91..5becb18d 100644 --- a/service/stock_service.go +++ b/service/stock_service.go @@ -56,6 +56,18 @@ func NewStockService() *StockService { } } +type Task struct { + fullName string + sql string + rule *global.Rule +} + +type Result struct{ + fullName string + count int64 +} + + func (s *StockService) Run() error { canalCfg := canal.NewDefaultConfig() canalCfg.Addr = global.Cfg().Addr @@ -87,69 +99,119 @@ func (s *StockService) Run() error { s.endpoint = endpoint startTime := dates.NowMillisecond() - log.Println(fmt.Sprintf("bulk size: %d", global.Cfg().BulkSize)) - for _, rule := range global.RuleInsList() { - if rule.OrderByColumn == "" { - return errors.New("empty order_by_column not allowed") - } + log.Printf("bulk size: %d", global.Cfg().BulkSize) + + taskChan := make(chan *Task,0) + resultChan := make(chan *Result,0) + dones := make(chan bool, global.Cfg().Maxprocs) + //开启worker池 + for i := 0; i < global.Cfg().Maxprocs; i++ { + go func(wid int,taskChan chan *Task,resultChan chan *Result,dones chan bool){ + defer func(){ + dones <- true + }() + for task := range taskChan{ + requests, err := s.export(task) + if err != nil { + logs.Error(err.Error()) + s.shutoff.Store(true) + return + } + count := s.imports(task.fullName, requests) + resultChan <- &Result{ + fullName:task.fullName, + count: count, + } + } - exportColumns := s.exportColumns(rule) - fullName := fmt.Sprintf("%s.%s", rule.Schema, rule.Table) - log.Println(fmt.Sprintf("开始导出 %s", fullName)) - res, err := s.canal.Execute(fmt.Sprintf("select count(1) from %s", fullName)) - if err != nil { - return err - } - totalRow, err := res.GetInt(0, 0) - s.totalRows[fullName] = totalRow - log.Println(fmt.Sprintf("%s 共 %d 条数据", fullName, totalRow)) + }(i,taskChan,resultChan,dones) + } - s.counter[fullName] = 0 + go func() error{ + defer func() { + close(taskChan) + }() + for _, rule := range global.RuleInsList() { + if rule.OrderByColumn == "" { + return errors.New("empty order_by_column not allowed") + } + exportColumns := s.exportColumns(rule) + fullName := fmt.Sprintf("%s.%s", rule.Schema, rule.Table) + log.Println(fmt.Sprintf("开始导出 %s", fullName)) + i := rule.TableInfo.PKColumns[0] + pk := rule.TableInfo.GetPKColumn(i).Name + //pk为表的自增id列 整数型 + minSql := fmt.Sprintf("select min(%s) as min_id from %s", pk, fullName) + log.Println(fmt.Sprintf("minSql %s", minSql)) + maxSql := fmt.Sprintf("select max(%s) as max_id from %s", pk, fullName) + log.Println(fmt.Sprintf("maxSql %s", maxSql)) + minRes, err := s.canal.Execute(minSql) + if err != nil { + return err + } + minId, err := minRes.GetIntByName(0, "min_id") + if err != nil { + return err + } - var batch int64 - size := global.Cfg().BulkSize - if batch%size == 0 { - batch = totalRow / size - } else { - batch = (totalRow / size) + 1 + maxRes, err := s.canal.Execute(maxSql) + if err != nil { + return err + } + maxId, err := maxRes.GetIntByName(0, "max_id") + if err != nil { + return err + } + log.Println(fmt.Sprintf("minId:%d,maxId:%d", minId, maxId)) + start := minId - 1 + end := maxId + //res, err := s.canal.Execute(fmt.Sprintf("select count(1) from %s", fullName)) + //if err != nil { + // return err + //} + //log.Println(fmt.Sprintf("res:%v", res)) + //totalRow, err := res.GetInt(0, 0) + //s.totalRows[fullName] = totalRow + //log.Println(fmt.Sprintf("%s 共 %d 条数据", fullName, totalRow)) + for start < end { + _end := start + global.Cfg().BulkSize + if _end >= end { + _end = end + } + sql := s.buildSql(fullName, exportColumns, start, _end, rule) + task := &Task{ + fullName: fullName, + sql: sql, + rule: rule, + } + taskChan <- task + start = _end + } } + return nil + }() - var processed atomic.Int64 + go func(){ for i := 0; i < global.Cfg().Maxprocs; i++ { - s.wg.Add(1) - go func(_fullName, _columns string, _rule *global.Rule) { - for { - processed.Inc() - requests, err := s.export(_fullName, _columns, processed.Load(), _rule) - if err != nil { - logs.Error(err.Error()) - s.shutoff.Store(true) - break - } - - s.imports(_fullName, requests) - if processed.Load() > batch { - break - } - } - s.wg.Done() - }(fullName, exportColumns, rule) + <- dones + } + close(resultChan) + }() + for r := range resultChan{ + if _,ok := s.counter[r.fullName];ok{ + s.counter[r.fullName] += r.count + }else{ + s.counter[r.fullName] = r.count } + } - s.wg.Wait() - fmt.Println(fmt.Sprintf("共耗时 :%d(毫秒)", dates.NowMillisecond()-startTime)) + log.Printf("共耗时 :%d(毫秒)", dates.NowMillisecond()-startTime) - for k, v := range s.totalRows { - vv, ok := s.counter[k] - if ok { - fmt.Println(fmt.Sprintf("表: %s,共:%d 条数据,成功导入:%d 条", k, v, vv)) - if v > vv { - fmt.Println("存在导入错误的数据,具体请至日志查看") - } - } + for k, v := range s.counter { + log.Printf("表:%s 共导入 %d条",k,v) } s.endpoint.Close() // 关闭客户端 @@ -157,14 +219,12 @@ func (s *StockService) Run() error { return nil } -func (s *StockService) export(fullName, columns string, batch int64, rule *global.Rule) ([]*model.RowRequest, error) { +func (s *StockService) export(task *Task) ([]*model.RowRequest, error) { if s.shutoff.Load() { return nil, errors.New("shutoff") } - - offset := s.offset(batch) - sql := s.buildSql(fullName, columns, offset, rule) - logs.Infof("export sql : %s", sql) + sql := task.sql + rule := task.rule resultSet, err := s.canal.Execute(sql) if err != nil { logs.Errorf("数据导出错误: %s - %s", sql, err.Error()) @@ -193,27 +253,21 @@ func (s *StockService) export(fullName, columns string, batch int64, rule *globa } // 构造SQL -func (s *StockService) buildSql(fullName, columns string, offset int64, rule *global.Rule) string { - size := global.Cfg().BulkSize - if len(rule.TableInfo.PKColumns) == 0 { - return fmt.Sprintf("select %s from %s order by %s limit %d,%d", columns, fullName, rule.OrderByColumn, offset, size) - } - +func (s *StockService) buildSql(fullName, columns string, start int64,end int64, rule *global.Rule) string { i := rule.TableInfo.PKColumns[0] - n := rule.TableInfo.GetPKColumn(i).Name - t := "select b.* from (select %s from %s order by %s limit %d,%d) a left join %s b on a.%s=b.%s" - sql := fmt.Sprintf(t, n, fullName, rule.OrderByColumn, offset, size, fullName, n, n) + pk := rule.TableInfo.GetPKColumn(i).Name + t := "select * from %s where %s>%d and %s <= %d" + sql := fmt.Sprintf(t, fullName, pk, start, pk, end) return sql } -func (s *StockService) imports(fullName string, requests []*model.RowRequest) { +func (s *StockService) imports(fullName string, requests []*model.RowRequest)int64 { if s.shutoff.Load() { - return + return 0 } - - succeeds := s.endpoint.Stock(requests) - count := s.incCounter(fullName, succeeds) + count := s.endpoint.Stock(requests) log.Println(fmt.Sprintf("%s 导入数据 %d 条", fullName, count)) + return count } func (s *StockService) exportColumns(rule *global.Rule) string {