
Commit

fix concurrent read

roseduan committed Jul 6, 2024
1 parent 0ccbff2 commit 4206c62
Showing 2 changed files with 71 additions and 30 deletions.
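Background, as reflected in the diff below: segment.readInternal previously parsed every chunk through a single cachedBlock buffer shared by all readers of a segment, so two goroutines reading concurrently could overwrite each other's block data mid-parse. The fix keeps a dedicated startupBlock for the single-goroutine startup traversal and hands every other read a scratch block from a sync.Pool. A minimal, self-contained sketch of the bug class (illustrative names only, not the library's API); run it with `go run -race` to see the race detector fire:

package main

import (
	"fmt"
	"sync"
)

// sharedBuf stands in for the old per-segment cachedBlock: one buffer
// reused by every reader of the segment.
type sharedBuf struct {
	block []byte
}

func main() {
	shared := &sharedBuf{block: make([]byte, 32)}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id byte) {
			defer wg.Done()
			// Each "reader" fills the shared buffer with its own data,
			// just like two concurrent readInternal calls filling one
			// cached block from different file offsets.
			for j := 0; j < 1000; j++ {
				for k := range shared.block {
					shared.block[k] = id
				}
			}
		}(byte(i + 1))
	}
	wg.Wait()
	fmt.Println(shared.block[:8]) // contents depend on whichever reader wrote last
}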
90 changes: 60 additions & 30 deletions segment.go
@@ -7,6 +7,7 @@ import (
 	"hash/crc32"
 	"io"
 	"os"
+	"sync"

 	"github.com/valyala/bytebufferpool"
 )
@@ -52,7 +53,8 @@ type segment struct {
 	currentBlockSize   uint32
 	closed             bool
 	header             []byte
-	cachedBlock        *blockAndHeader
+	startupBlock       *startupBlock
+	isStartupTraversal bool
 }

 // segmentReader is used to iterate all the data from the segment file.
@@ -64,10 +66,11 @@ type segmentReader struct {
 	chunkOffset int64
 }

-// block and chunk header, saved in pool.
-type blockAndHeader struct {
+// There is only one reader (a single goroutine) during startup traversal,
+// so we can reuse a single block for the whole traversal
+// to avoid memory allocations.
+type startupBlock struct {
 	block       []byte
-	header      []byte
 	blockNumber int64
 }

@@ -83,6 +86,20 @@ type ChunkPosition struct {
 	ChunkSize uint32
 }

+var blockPool = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, blockSize)
+	},
+}
+
+func getBuffer() []byte {
+	return blockPool.Get().([]byte)
+}
+
+func putBuffer(buf []byte) {
+	blockPool.Put(buf)
+}
+
 // openSegmentFile a new segment file.
 func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) {
 	fd, err := os.OpenFile(
@@ -101,20 +118,17 @@ func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) {
 		return nil, fmt.Errorf("seek to the end of segment file %d%s failed: %v", id, extName, err)
 	}

-	// init cached block
-	bh := &blockAndHeader{
-		block:       make([]byte, blockSize),
-		header:      make([]byte, chunkHeaderSize),
-		blockNumber: -1,
-	}
-
 	return &segment{
 		id:                 id,
 		fd:                 fd,
 		header:             make([]byte, chunkHeaderSize),
 		currentBlockNumber: uint32(offset / blockSize),
 		currentBlockSize:   uint32(offset % blockSize),
-		cachedBlock:        bh,
+		startupBlock: &startupBlock{
+			block:       make([]byte, blockSize),
+			blockNumber: -1,
+		},
+		isStartupTraversal: false,
 	}, nil
 }

@@ -359,7 +373,7 @@ func (seg *segment) writeChunkBuffer(buf *bytebufferpool.ByteBuffer) error {
 	}

 	// the cached block can not be reused again after writes.
-	seg.cachedBlock.blockNumber = -1
+	seg.startupBlock.blockNumber = -1
 	return nil
 }

@@ -376,11 +390,21 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte, *ChunkPosition, error) {

 	var (
 		result    []byte
-		bh        = seg.cachedBlock
+		block     []byte
 		segSize   = seg.Size()
 		nextChunk = &ChunkPosition{SegmentId: seg.id}
 	)

+	if seg.isStartupTraversal {
+		block = seg.startupBlock.block
+	} else {
+		block = getBuffer()
+		if len(block) != blockSize {
+			block = make([]byte, blockSize)
+		}
+		defer putBuffer(block)
+	}
+
 	for {
 		size := int64(blockSize)
 		offset := int64(blockNumber) * blockSize
@@ -392,40 +416,46 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte, *ChunkPosition, error) {
 			return nil, nil, io.EOF
 		}

-		// There are two cases that we should read block from file:
-		// 1. the acquired block is not the cached one
-		// 2. new writes appended to the block, and the block
-		//    is still smaller than 32KB, we must read it again because of the new writes.
-		if seg.cachedBlock.blockNumber != int64(blockNumber) || size != blockSize {
-			// read block from segment file at the specified offset.
-			_, err := seg.fd.ReadAt(bh.block[0:size], offset)
-			if err != nil {
-				return nil, nil, err
-			}
-			// remember the block
-			bh.blockNumber = int64(blockNumber)
-		}
+		if seg.isStartupTraversal {
+			// There are two cases that we should read block from file:
+			// 1. the acquired block is not the cached one
+			// 2. new writes appended to the block, and the block
+			//    is still smaller than 32KB, we must read it again because of the new writes.
+			if seg.startupBlock.blockNumber != int64(blockNumber) || size != blockSize {
+				// read block from segment file at the specified offset.
+				_, err := seg.fd.ReadAt(block[0:size], offset)
+				if err != nil {
+					return nil, nil, err
+				}
+				// remember the block
+				seg.startupBlock.blockNumber = int64(blockNumber)
+			}
+		} else {
+			if _, err := seg.fd.ReadAt(block[0:size], offset); err != nil {
+				return nil, nil, err
+			}
+		}

 		// header
-		copy(bh.header, bh.block[chunkOffset:chunkOffset+chunkHeaderSize])
+		header := block[chunkOffset : chunkOffset+chunkHeaderSize]

 		// length
-		length := binary.LittleEndian.Uint16(bh.header[4:6])
+		length := binary.LittleEndian.Uint16(header[4:6])

 		// copy data
 		start := chunkOffset + chunkHeaderSize
-		result = append(result, bh.block[start:start+int64(length)]...)
+		result = append(result, block[start:start+int64(length)]...)

 		// check sum
 		checksumEnd := chunkOffset + chunkHeaderSize + int64(length)
-		checksum := crc32.ChecksumIEEE(bh.block[chunkOffset+4 : checksumEnd])
-		savedSum := binary.LittleEndian.Uint32(bh.header[:4])
+		checksum := crc32.ChecksumIEEE(block[chunkOffset+4 : checksumEnd])
+		savedSum := binary.LittleEndian.Uint32(header[:4])
 		if savedSum != checksum {
 			return nil, nil, ErrInvalidCRC
 		}

 		// type
-		chunkType := bh.header[6]
+		chunkType := header[6]

 		if chunkType == ChunkTypeFull || chunkType == ChunkTypeLast {
 			nextChunk.BlockNumber = blockNumber
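With the pooled buffers in place, ordinary reads are safe to run concurrently because each readInternal call borrows its own block from blockPool. A hedged usage sketch, assuming the package's documented top-level API (wal.Open, wal.DefaultOptions, Write, NewReader, Next):

package main

import (
	"fmt"
	"io"
	"sync"

	"github.com/rosedblabs/wal"
)

func main() {
	w, err := wal.Open(wal.DefaultOptions)
	if err != nil {
		panic(err)
	}
	defer w.Close()

	// Append some records to traverse.
	for i := 0; i < 100; i++ {
		if _, err := w.Write([]byte(fmt.Sprintf("record-%d", i))); err != nil {
			panic(err)
		}
	}

	// Several readers traverse the WAL at once; each readInternal call
	// now takes a scratch block from blockPool instead of sharing the
	// segment's cached block, so the traversals no longer race.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			reader := w.NewReader()
			for {
				data, _, err := reader.Next()
				if err != nil {
					break // io.EOF marks the end of the WAL
				}
				_ = data // process the record
			}
		}()
	}
	wg.Wait()
	_ = io.EOF
}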
11 changes: 11 additions & 0 deletions wal.go
@@ -166,6 +166,17 @@ func (wal *WAL) IsEmpty() bool {
 	return len(wal.olderSegments) == 0 && wal.activeSegment.Size() == 0
 }

+// SetIsStartupTraversal should only be enabled while the WAL is being traversed
+// at startup, e.g. when rosedb/lotusdb start up; it is not a common usage for most users.
+// Note that if you set it to true, only one reader (a single goroutine)
+// can read data from the WAL.
+func (wal *WAL) SetIsStartupTraversal(v bool) {
+	for _, seg := range wal.olderSegments {
+		seg.isStartupTraversal = v
+	}
+	wal.activeSegment.isStartupTraversal = v
+}
+
 // NewReaderWithMax returns a new reader for the WAL,
 // and the reader will only read the data from the segment file
 // whose id is less than or equal to the given segId.
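SetIsStartupTraversal is meant to bracket a single-goroutine scan, such as rebuilding an in-memory index at startup. A sketch continuing from the example above (rebuildIndex is a hypothetical application callback, not part of the library):

	// Startup traversal: exactly one goroutine may read while the flag is set,
	// because every read reuses the segment's single startupBlock buffer.
	w.SetIsStartupTraversal(true)
	reader := w.NewReader()
	for {
		data, pos, err := reader.Next()
		if err != nil {
			break // io.EOF marks the end of the WAL
		}
		rebuildIndex(data, pos) // hypothetical: feed each chunk into the index
	}
	w.SetIsStartupTraversal(false) // back to pool-backed, concurrency-safe reads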
