Vibhansa/truncate update #1517

Open · wants to merge 19 commits into base: blobfuse/2.4.1

Changes from all commits
2 changes: 1 addition & 1 deletion azure-pipeline-templates/verbose-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ steps:
distro_name: ${{ parameters.distro_name }}
quick_test: false
verbose_log: ${{ parameters.verbose_log }}
clone: false
clone: true

- ${{ if eq(parameters.test_sas_credential, true) }}:
- template: e2e-tests.yml
Expand Down
2 changes: 1 addition & 1 deletion component/azstorage/azstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (az *AzStorage) GetFileBlockOffsets(options internal.GetFileBlockOffsetsOpt

func (az *AzStorage) TruncateFile(options internal.TruncateFileOptions) error {
log.Trace("AzStorage::TruncateFile : %s to %d bytes", options.Name, options.Size)
err := az.storage.TruncateFile(options.Name, options.Size)
err := az.storage.TruncateFile(options.Name, options.Size, options.BlockSize)

if err == nil {
azStatsCollector.PushEvents(truncateFile, options.Name, map[string]interface{}{size: options.Size})
Expand Down
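For context, here is a minimal sketch of the option shape the new call assumes. The field names are inferred from this hunk; the real struct lives in blobfuse's internal package, so this is illustrative only:

```go
// Inferred shape only; not the PR's actual definition.
type TruncateFileOptions struct {
	Name      string
	Size      int64
	BlockSize int64 // new in this PR; 0 means "fall back to azstorage's configured block-size"
}
```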
329 changes: 216 additions & 113 deletions component/azstorage/block_blob.go
@@ -1033,33 +1033,9 @@
 	return bufferSize, nil
 }
 
-func (bb *BlockBlob) removeBlocks(blockList *common.BlockOffsetList, size int64, name string) *common.BlockOffsetList {
-	_, index := blockList.BinarySearch(size)
-	// if the start index is equal to new size - block should be removed - move one index back
-	if blockList.BlockList[index].StartIndex == size {
-		index = index - 1
-	}
-	// if the file we're shrinking is in the middle of a block then shrink that block
-	if blockList.BlockList[index].EndIndex > size {
-		blk := blockList.BlockList[index]
-		blk.EndIndex = size
-		blk.Data = make([]byte, blk.EndIndex-blk.StartIndex)
-		blk.Flags.Set(common.DirtyBlock)
-
-		err := bb.ReadInBuffer(name, blk.StartIndex, blk.EndIndex-blk.StartIndex, blk.Data)
-		if err != nil {
-			log.Err("BlockBlob::removeBlocks : Failed to remove blocks %s [%s]", name, err.Error())
-		}
-	}
-	blk := blockList.BlockList[index]
-	blk.Flags.Set(common.RemovedBlocks)
-	blockList.BlockList = blockList.BlockList[:index+1]
-
-	return blockList
-}
-
-func (bb *BlockBlob) TruncateFile(name string, size int64) error {
+func (bb *BlockBlob) TruncateFile(name string, size int64, blockSize int64) error {
+	// TODO : Truncate is sequential wirte now, we can upload multiple blocks in parallel
+	// log.Trace("BlockBlob::TruncateFile : name=%s, size=%d", name, size)

[Check failure on line 1037 in component/azstorage/block_blob.go — GitHub Actions / Check for spelling errors: wirte ==> write]

[Review comment, Collaborator: The commented-out log.Trace line can be removed.]

 	attr, err := bb.GetAttr(name)
 	if err != nil {
@@ -1068,119 +1044,246 @@
 			return err
 		}
 	}
-	if size == 0 || attr.Size == 0 {
-		// If we are resizing to a value > 1GB then we need to upload multiple blocks to resize
-		if size > 1*common.GbToBytes {
-			blkSize := int64(16 * common.MbToBytes)
-			blobName := filepath.Join(bb.Config.prefixPath, name)
-			blobClient := bb.Container.NewBlockBlobClient(blobName)
-
-			blkList := make([]string, 0)
-			id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
-
-			for i := 0; size > 0; i++ {
-				if i == 0 || size < blkSize {
-					// Only first and last block we upload and rest all we replicate with the first block itself
-					if size < blkSize {
-						blkSize = size
-						id = base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
-					}
-					data := make([]byte, blkSize)
-
-					_, err = blobClient.StageBlock(context.Background(),
-						id,
-						streaming.NopCloser(bytes.NewReader(data)),
-						&blockblob.StageBlockOptions{
-							CPKInfo: bb.blobCPKOpt,
-						})
-					if err != nil {
-						log.Err("BlockBlob::TruncateFile : Failed to stage block for %s [%s]", name, err.Error())
-						return err
-					}
-				}
-				blkList = append(blkList, id)
-				size -= blkSize
-			}
-
-			err = bb.CommitBlocks(blobName, blkList)
-			if err != nil {
-				log.Err("BlockBlob::TruncateFile : Failed to commit blocks for %s [%s]", name, err.Error())
-				return err
-			}
-		} else {
-			err := bb.WriteFromBuffer(name, nil, make([]byte, size))
-			if err != nil {
-				log.Err("BlockBlob::TruncateFile : Failed to set the %s to 0 bytes [%s]", name, err.Error())
-			}
-			return err
-		}
-		return nil
-	}
-
-	//If new size is less than 256MB
-	if size < blockblob.MaxUploadBlobBytes {
-		data, err := bb.HandleSmallFile(name, size, attr.Size)
-		if err != nil {
-			log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
-			return err
-		}
-		err = bb.WriteFromBuffer(name, nil, data)
-		if err != nil {
-			log.Err("BlockBlob::TruncateFile : Failed to write from buffer file %s", name, err.Error())
-			return err
-		}
-	} else {
-		bol, err := bb.GetFileBlockOffsets(name)
-		if err != nil {
-			log.Err("BlockBlob::TruncateFile : Failed to get block list of file %s [%s]", name, err.Error())
-			return err
-		}
-		if bol.SmallFile() {
-			data, err := bb.HandleSmallFile(name, size, attr.Size)
-			if err != nil {
-				log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
-				return err
-			}
-			err = bb.WriteFromBuffer(name, nil, data)
-			if err != nil {
-				log.Err("BlockBlob::TruncateFile : Failed to write from buffer file %s", name, err.Error())
-				return err
-			}
-		} else {
-			if size < attr.Size {
-				bol = bb.removeBlocks(bol, size, name)
-			} else if size > attr.Size {
-				_, err = bb.createNewBlocks(bol, bol.BlockList[len(bol.BlockList)-1].EndIndex, size-attr.Size)
-				if err != nil {
-					log.Err("BlockBlob::TruncateFile : Failed to create new blocks for file %s", name, err.Error())
-					return err
-				}
-			}
-			err = bb.StageAndCommit(name, bol)
-			if err != nil {
-				log.Err("BlockBlob::TruncateFile : Failed to stage and commit file %s", name, err.Error())
-				return err
-			}
-		}
-	}
-
-	return nil
-}
+	// Check if file really needs truncation or not
+	if size == attr.Size {
+		log.Debug("BlockBlob::TruncateFile : File %s is already of size %d", name, size)
+		return nil
+	}
+
+	// File needs to be wiped out
+	if size == 0 {
+		err := bb.WriteFromBuffer(name, nil, make([]byte, size))
+		if err != nil {
+			log.Err("BlockBlob::TruncateFile : Failed to set the %s to 0 bytes [%s]", name, err.Error())
+		}
+		return err
+	}
+
+	// If blockSize is not provided use the azstorage configured block-size
+	// In case of block-cache component this shall be filled based on block-size configured there
+	if blockSize == 0 {
+		blockSize = bb.Config.blockSize
+	}
+
+	if blockSize == 0 {
+		// Nothing is configured then assume 16MB as default size
+		blockSize = 16 * 1024 * 1024
+	}

[Review comment, Collaborator: Is this a standard size? If not, we have to document it somewhere.]

+
+	if attr.Size == 0 {
+		// File does not have any data yet so we just need to fill this with 0's
+		err = bb.createSparseFile(name, size, blockSize, make([]string, 0), 16)
+		if err != nil {
+			log.Err("BlockBlob::TruncateFile : Failed to expand file %s [%s]", name, err.Error())
+		}
+		return err
+	}
+
+	// Get the committed block list from the source file
+	committedBlockList, err := bb.GetCommittedBlockList(name)
+	if err != nil {
+		log.Err("BlockBlob::TruncateFile : Failed to get block list of file %s [%s]", name, err.Error())
+		return err
+	}
+
+	if committedBlockList == nil {
+		// This file is not split into blocks yet
+		// Read full file and then proceed based on new size
+		data, err := bb.readFileData(name, size, attr.Size)
+		if err != nil {
+			log.Err("BlockBlob::TruncateFile : Failed to read file %s [%s]", name, err.Error())
+			return err
+		}
+
+		err = bb.convertFileToBlocks(name, size, blockSize, data)
+		if err != nil {
+			log.Err("BlockBlob::TruncateFile : Failed to expand file %s [%s]", name, err.Error())
+		}
+		return err
+	}
+
+	// This file already has the blocks created we just need to truncate the blocks
+	return bb.updateBlocks(name, size, blockSize, committedBlockList)
+}

+func (bb *BlockBlob) readFileData(name string, truncateSize int64, fileSize int64) ([]byte, error) {
+	// If truncate size is smaller then file size then read only data upto truncate size
+	// If truncate size is bigger then file size then read entire file data
+	size := int64(math.Min((float64)(truncateSize), (float64)(fileSize)))
+
+	var data = make([]byte, size)
+
+	err := bb.ReadInBuffer(name, 0, size, data)
+	if err != nil {
+		log.Err("BlockBlob::readFileData : Failed to read small file %s", name, err.Error())
+	}
+
+	return data, err
+}
+
+func (bb *BlockBlob) createSparseFile(name string, size int64, blockSize int64, blkList []string, idLen int64) error {

[A review conversation on this line was marked resolved by syeleti-msft.]

+	blobName := filepath.Join(bb.Config.prefixPath, name)
+	blobClient := bb.Container.NewBlockBlobClient(blobName)
+
+	id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(idLen))
+	for i := 0; size > 0; i++ {
+		if i == 0 || size < blockSize {
+			// Only first and last block we upload and rest all we replicate with the first block itself
+			if size < blockSize {
+				blockSize = size
+				id = base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(idLen))
+			}
+			data := make([]byte, blockSize)
+
+			_, err := blobClient.StageBlock(context.Background(),
+				id,
+				streaming.NopCloser(bytes.NewReader(data)),
+				&blockblob.StageBlockOptions{
+					CPKInfo: bb.blobCPKOpt,
+				})
+
+			if err != nil {
+				log.Err("BlockBlob::expandFile : Failed to stage block for %s [%s]", name, err.Error())
+				return err
+			}
+		}
+
+		blkList = append(blkList, id)
+		size -= blockSize
+	}
+
+	err := bb.CommitBlocks(blobName, blkList)
+	if err != nil {
+		log.Err("BlockBlob::expandFile : Failed to commit blocks for %s [%s]", name, err.Error())
+		return err
+	}
+
+	return nil
+}
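A worked illustration of the replication trick in createSparseFile above (the numbers are hypothetical, not from the PR): only the first and last blocks are actually staged, and every other entry in the commit list reuses the first block's ID.

```go
// For size = 50 MiB and blockSize = 16 MiB the loop stages two zero blocks
// (idA once at i == 0, idB for the 2 MiB tail) but commits four entries:
//
//	blkList = [idA, idA, idA, idB] // 16 + 16 + 16 + 2 MiB
//
// Put Block List accepts repeated block IDs, so the service materializes
// the middle copies without the client re-uploading 32 MiB of zeros.
```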
-func (bb *BlockBlob) HandleSmallFile(name string, size int64, originalSize int64) ([]byte, error) {
-	var data = make([]byte, size)
-	var err error
-	if size > originalSize {
-		err = bb.ReadInBuffer(name, 0, 0, data)
-		if err != nil {
-			log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
-		}
-	} else {
-		err = bb.ReadInBuffer(name, 0, size, data)
-		if err != nil {
-			log.Err("BlockBlob::TruncateFile : Failed to read small file %s", name, err.Error())
-		}
-	}
-	return data, err
-}
+func (bb *BlockBlob) updateBlocks(name string, newSize int64, blockSize int64, blockList *internal.CommittedBlockList) error {
+	// Iterate through blockList to figure out how many blocks we can keep
+	removeBlocks := false
+	blkList := make([]string, 0)
+
+	blobName := filepath.Join(bb.Config.prefixPath, name)
+	blobClient := bb.Container.NewBlockBlobClient(blobName)
+
+	var block internal.CommittedBlock
+	lastIdx := 0
+
+	// Extract the block-id length and size of block so that we create new blocks accordingly
+	idLen := int64(16)
+	orgId, err := base64.StdEncoding.DecodeString((*blockList)[0].Id)
+	if err == nil {
+		idLen = int64(len(orgId))
+	}
+	oldBlockSize := int64((*blockList)[0].Size)
+
+	if oldBlockSize != 0 {
+		blockSize = oldBlockSize
+	}
+
+	for lastIdx, block = range *blockList {
+		if newSize < int64(block.Size) {
+			removeBlocks = true
+			break
+		}
+		newSize -= int64(block.Size)
+		blkList = append(blkList, block.Id)
+	}
+
+	if newSize == 0 {
+		// New Size exactly aligns to the block boundary
+		// Commit the current list and no need to upload any data here
+		err := bb.CommitBlocks(blobName, blkList)
+		if err != nil {
+			log.Err("BlockBlob::updateBlocks : Failed to commit blocks for %s [%s]", name, err.Error())
+		}
+		return err
+	}
+
+	if removeBlocks {
+		// Last block in file needs truncation
+		data := make([]byte, newSize)
+
+		id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(idLen))
+		err := bb.ReadInBuffer(name, (*blockList)[lastIdx].Offset, newSize, data)
+		if err != nil {
+			log.Err("BlockBlob::updateBlocks : Failed to read buffer for %s [%s]", name, err.Error())
+			return err
+		}
+
+		_, err = blobClient.StageBlock(context.Background(),
+			id,
+			streaming.NopCloser(bytes.NewReader(data)),
+			&blockblob.StageBlockOptions{
+				CPKInfo: bb.blobCPKOpt,
+			})
+
+		if err != nil {
+			log.Err("BlockBlob::updateBlocks : Failed to stage block for %s [%s]", name, err.Error())
+			return err
+		}
+
+		blkList = append(blkList, id)
+		newSize = 0
+	}
+
+	return bb.createSparseFile(name, newSize, blockSize, blkList, idLen)

[Review comment, Member: If the file size does not align with the block size and truncate is used to expand the file, this will lead to irregular block sizes in the block list. E.g., with blockSize = 4M and a 5M file (two blocks: block1 = 4M, block2 = 1M), a call to truncate(10M) would, per the logic in this PR, produce a block list of block1 = 4M, block2 = 1M, block3 = 4M, block4 = 1M. Although this looks correct, the subsequent operations in block cache cannot utilize this sort of block list. Please correct me if I am wrong.]

+}
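The reviewer's expansion example can be checked with a small standalone simulation of the keep-then-refill logic above. truncateLayout is a hypothetical helper written for this note, not code from the PR, and it models only the expand path, where removeBlocks stays false:

```go
package main

import "fmt"

// truncateLayout mimics updateBlocks + createSparseFile for an expanding
// truncate: whole existing blocks are kept, then the remainder is refilled
// with blockSize chunks plus one short tail block. Sizes are in MiB.
func truncateLayout(blocks []int, newSize, blockSize int) []int {
	kept := []int{}
	for _, b := range blocks { // keep every whole block that still fits
		if newSize < b {
			break
		}
		newSize -= b
		kept = append(kept, b)
	}
	for newSize > 0 { // refill the remainder, as createSparseFile does
		b := blockSize
		if newSize < blockSize {
			b = newSize
		}
		kept = append(kept, b)
		newSize -= b
	}
	return kept
}

func main() {
	// 5 MiB file laid out as [4, 1], truncated up to 10 MiB with 4 MiB blocks
	fmt.Println(truncateLayout([]int{4, 1}, 10, 4)) // prints [4 1 4 1]
}
```

This reproduces the irregular [4 1 4 1] layout the reviewer describes.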

+func (bb *BlockBlob) convertFileToBlocks(name string, size int64, blockSize int64, data []byte) error {
+	blobName := filepath.Join(bb.Config.prefixPath, name)
+	blobClient := bb.Container.NewBlockBlobClient(blobName)
+
+	oldDataLen := int64(len(data))
+	blkList := make([]string, 0)
+
+	offset := int64(0)
+	createBlock := true
+	id := ""
+
+	for i := 0; size > 0; i++ {
+		if size < blockSize {
+			blockSize = size
+			createBlock = true
+		}
+
+		if createBlock {
+			newData := make([]byte, blockSize)
+			id = base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
+
+			if offset < oldDataLen {
+				copy(newData, data[offset:])
+			} else {
+				createBlock = false
+			}
+
+			_, err := blobClient.StageBlock(context.Background(),
+				id,
+				streaming.NopCloser(bytes.NewReader(newData)),
+				&blockblob.StageBlockOptions{
+					CPKInfo: bb.blobCPKOpt,
+				})
+			if err != nil {
+				log.Err("BlockBlob::convertFileToBlocks : Failed to stage block for %s [%s]", name, err.Error())
+				return err
+			}
+		}
+
+		size -= blockSize
+		offset += blockSize
+		blkList = append(blkList, id)
+	}
+
+	err := bb.CommitBlocks(blobName, blkList)
+	if err != nil {
+		log.Err("BlockBlob::expandFile : Failed to commit blocks for %s [%s]", name, err.Error())
+		return err
+	}
+
+	return nil
+}
 
 // Write : write data at given offset to a blob
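On the TODO at the top of TruncateFile ("we can upload multiple blocks in parallel"): a minimal sketch of what bounded-concurrency staging could look like, assuming a helper along these lines were added. stageZeroBlocksParallel is hypothetical, not part of the PR; CPK options, retries, and the blobfuse logger are omitted, and only public APIs from the Azure SDK and x/sync are used.

```go
package azstorage

import (
	"bytes"
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
	"golang.org/x/sync/errgroup"
)

// stageZeroBlocksParallel stages one zero-filled block per ID concurrently,
// capping the number of in-flight uploads; the caller commits the IDs in
// order afterwards (e.g. via CommitBlocks).
func stageZeroBlocksParallel(ctx context.Context, client *blockblob.Client, ids []string, blockSize int64) error {
	grp, ctx := errgroup.WithContext(ctx)
	grp.SetLimit(8) // bound concurrency

	zeros := make([]byte, blockSize) // shared read-only buffer; each goroutine gets its own reader
	for _, id := range ids {
		id := id // capture loop variable (needed before Go 1.22)
		grp.Go(func() error {
			_, err := client.StageBlock(ctx, id,
				streaming.NopCloser(bytes.NewReader(zeros)),
				&blockblob.StageBlockOptions{})
			return err
		})
	}
	return grp.Wait()
}
```

Note this only helps the sparse/expand path, where every full-size block shares one ID; since the list repeats IDs, it would need deduplicating before staging so the same block is not uploaded several times.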