Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix datarace issue for PendingDocuments function #264

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

mazing80
Copy link

Sometimes occurred data race issue on PendingDocuments().

@vrecan
Copy link
Collaborator

vrecan commented Mar 17, 2016

I'm assuming you found this with the race detector? Do you still have the output from the tool?

@mazing80
Copy link
Author

Yes, like below.

==================
WARNING: DATA RACE
Write by goroutine 30:
  my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).startDocChannel.func1()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:250 +0x12a

Previous read by goroutine 14:
  my.corp.com/me/log-server.git/storage/es.PendingChecker.func1()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/storage/es/client.go:81 +0x2a4

Goroutine 30 (running) created at:
  my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).startDocChannel()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:259 +0x42
  my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib.(*BulkIndexer).Start.func1()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/vendor/github.com/mattbaird/elastigo/lib/corebulk.go:142 +0x104

Goroutine 14 (running) created at:
  my.corp.com/me/log-server.git/storage/es.PendingChecker()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/storage/es/client.go:84 +0xf1
  main.main()
      /Users/me/Golang/src/my.corp.com/me/log-server.git/main.go:85 +0xc41
==================

@mazing80
Copy link
Author

And this is my simple client code. If I wrong, please tell me.

import (
    "sync"
    "time"
        "log"

    elastigo "github.com/mattbaird/elastigo/lib"
)

const (
    DefaultHost = "localhost:0092"
    // max connections for ES
    esMaxConns = 15
    // if 0 it will not retry
    retryForSeconds = 0
    // ES Bulk buffer - 5MB
    //bulkMaxBuffer = 5242880
    bulkMaxBuffer        = 5242880
    bulkMaxDocs          = 3000
    pendingCheckDuration = time.Second * 1
)

var esClient *elastigo.Conn
var ESBulkIndexer *elastigo.BulkIndexer

type check struct {
    done   chan struct{}
    enable bool
    sync.Mutex
}

var pendingCheck check

func init() {
    pendingCheck.enable = false
}

func Setup() {
    esClient = elastigo.NewConn()
    esClient.Hosts = append(esClient.Hosts, DefaultHost)
    ESBulkIndexer = esClient.NewBulkIndexerErrors(esMaxConns, retryForSeconds)
    ESBulkIndexer.BulkMaxBuffer = bulkMaxBuffer
    ESBulkIndexer.BulkMaxDocs = bulkMaxDocs
}

func SetHost(host []string) {
    esClient.Hosts = host
}

func Start() {
    go func() {
        for errBuf := range ESBulkIndexer.ErrorChannel {
            log.Warnf("ES Error : %v", errBuf.Err)
        }
        log.Info("ES Error Channel closed.")
    }()
    ESBulkIndexer.Start()
    log.Info("ES BulkIndexer start.")
}

func PendingChecker() {
    ticker := time.NewTicker(pendingCheckDuration)
    pendingCheck.Lock()
    pendingCheck.enable = true
    pendingCheck.done = make(chan struct{}, 1)
    pendingCheck.Unlock()
    go func() {
    L:
        for range ticker.C {
            select {
            case <-pendingCheck.done:
                pendingCheck.Lock()
                close(pendingCheck.done)
                pendingCheck.enable = false
                pendingCheck.Unlock()
                ticker.Stop()
                log.Info("ES Pending checker stop.")
                break L
            default:
                log.Infof("ES Pending documents : %d.", ESBulkIndexer.PendingDocuments())
            }
        }
    }()
}

func Stop() {
    ESBulkIndexer.Flush()
    ESBulkIndexer.Stop()
    close(ESBulkIndexer.ErrorChannel)
    pendingCheck.Lock()
    if pendingCheck.enable {
        pendingCheck.done <- struct{}{}
    }
    pendingCheck.Unlock()
    time.Sleep(time.Millisecond * 100)
    log.Info("ES BulkIndexer stop.")
}

func main() {
    Setup()
    SetHost("localhost:9200")
    Start()
    PendingChecker()
    // running http server 
    Stop()
}

@vrecan
Copy link
Collaborator

vrecan commented Mar 19, 2016

I can look into this more tonight but the elastigo connection can't be shared acrossed goroutines right now

@mazing80
Copy link
Author

Thank you. I will test again. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants