Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for FileBackup #916

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ subprojects {
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.10.2")
testImplementation("ch.qos.logback:logback-classic:1.3.14")
testImplementation("org.hamcrest:hamcrest-all:1.3")
testImplementation("org.assertj:assertj-core:3.26.3")
testImplementation("org.mockito:mockito-core:4.11.0")
testImplementation("com.google.guava:guava:33.1.0-jre")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
Expand Down Expand Up @@ -54,7 +55,7 @@ public FileBackup(File backupDir, Buffer userBuffer, String prefix)
this.backupDir = backupDir;
this.userBuffer = userBuffer;
this.prefix = prefix;
this.pattern = Pattern.compile(userBuffer.bufferFormatType() + prefix() + PARAM_DELIM_IN_FILENAME + "([\\w\\.\\-" + PARAM_DELIM_IN_FILENAME + "]+)" + EXT_FILENAME);
this.pattern = Pattern.compile(String.format("^%s%s%s([\\w.\\-%s]+)%s$", userBuffer.bufferFormatType(), prefix(), PARAM_DELIM_IN_FILENAME, PARAM_DELIM_IN_FILENAME, EXT_FILENAME));
LOG.debug(this.toString());
}

Expand Down Expand Up @@ -83,9 +84,9 @@ public List<SavedBuffer> getSavedFiles()
}

LOG.debug("Checking backup files. files.length={}", files.length);
ArrayList<SavedBuffer> savedBuffers = new ArrayList<>();
List<SavedBuffer> savedBuffers = new ArrayList<>();
for (File f : files) {
Matcher matcher = pattern.matcher(f.getName());
Matcher matcher = pattern.matcher(f.toPath().getFileName().toString());
if (matcher.find()) {
if (matcher.groupCount() != 1) {
LOG.warn("Invalid backup filename: file={}", f.getName());
Expand Down Expand Up @@ -133,15 +134,15 @@ public void saveBuffer(List<String> params, ByteBuffer buffer)
channel.write(buffer);
}
catch (Exception e) {
LOG.error("Failed to save buffer to file: params=" + copiedParams + ", path=" + file.getAbsolutePath() + ", buffer=" + buffer, e);
LOG.error("Failed to save buffer to file: params={}, path={}, buffer={}", copiedParams, file.getAbsolutePath(), buffer, e);
}
finally {
if (channel != null) {
try {
channel.close();
}
catch (IOException e) {
LOG.warn("Failed to close Channel: channel=" + channel);
LOG.warn("Failed to close Channel: channel={}", channel);
}
}
}
Expand All @@ -168,22 +169,22 @@ public void open(Callback callback)
success();
}
catch (Exception e) {
LOG.error("Failed to process file. Skipping the file: file=" + savedFile, e);
LOG.error("Failed to process file. Skipping the file: file={}", savedFile, e);
}
finally {
try {
close();
}
catch (IOException e) {
LOG.warn("Failed to close file: file=" + savedFile, e);
LOG.warn("Failed to close file: file={}", savedFile, e);
}
}
}

public void remove()
{
if (!savedFile.delete()) {
LOG.warn("Failed to delete file: file=" + savedFile);
LOG.warn("Failed to delete file: file={}", savedFile);
}
}

Expand All @@ -193,7 +194,7 @@ private void success()
close();
}
catch (IOException e) {
LOG.warn("Failed to close file: file=" + savedFile, e);
LOG.warn("Failed to close file: file={}", savedFile, e);
}
finally {
remove();
Expand All @@ -210,6 +211,18 @@ public void close()
}
}

@Override
public String toString() {
return "SavedBuffer{" +
"params=" + params +
", savedFile=" + savedFile +
'}';
}

public Path getPath() {
return savedFile.toPath();
}

public interface Callback
{
void process(List<String> params, FileChannel channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -83,9 +82,9 @@ void testGetAllocatedBufferSize()
Buffer buffer = new Buffer(bufferConfig, new JsonRecordFormatter());
Flusher flusher = new Flusher(flusherConfig, buffer, ingester);
try (Fluency fluency = new Fluency(buffer, flusher)) {
assertThat(fluency.getAllocatedBufferSize(), is(0L));
assertThat(fluency.getAllocatedBufferSize()).isEqualTo(0L);
fluency.emit("foodb.bartbl", ImmutableMap.of("comment", "hello, world"));
assertThat(fluency.getAllocatedBufferSize(), is(1024L));
assertThat(fluency.getAllocatedBufferSize()).isEqualTo(1024L);
}
}

Expand Down Expand Up @@ -122,7 +121,7 @@ void testWaitUntilFlusherTerminated(int waitUntilFlusherTerm, boolean expected)

fluency.emit("foo.bar", new HashMap<>());
fluency.close();
assertThat(fluency.waitUntilFlusherTerminated(waitUntilFlusherTerm), is(expected));
assertThat(fluency.waitUntilFlusherTerminated(waitUntilFlusherTerm)).isEqualTo(expected);
}

@ParameterizedTest
Expand All @@ -136,7 +135,7 @@ void testWaitUntilFlushingAllBuffer(int waitUntilFlusherTerm, boolean expected)
Flusher flusher = new Flusher(flusherConfig, buffer, ingester);
try (Fluency fluency = new Fluency(buffer, flusher)) {
fluency.emit("foo.bar", new HashMap<>());
assertThat(fluency.waitUntilAllBufferFlushed(waitUntilFlusherTerm), is(expected));
assertThat(fluency.waitUntilAllBufferFlushed(waitUntilFlusherTerm)).isEqualTo(expected);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -337,13 +334,13 @@ void testGetAllocatedSize()
{
bufferConfig.setChunkInitialSize(256 * 1024);
try (Buffer buffer = new Buffer(bufferConfig, recordFormatter)) {
assertThat(buffer.getAllocatedSize(), is(0L));
assertThat(buffer.getAllocatedSize()).isEqualTo(0L);
Map<String, Object> map = new HashMap<>();
map.put("name", "komamitsu");
for (int i = 0; i < 10; i++) {
buffer.append("foo.bar", new Date().getTime(), map);
}
assertThat(buffer.getAllocatedSize(), is(256 * 1024L));
assertThat(buffer.getAllocatedSize()).isEqualTo(256 * 1024L);
}
}

Expand All @@ -353,18 +350,18 @@ void testGetBufferedDataSize()
{
bufferConfig.setChunkInitialSize(256 * 1024);
try (Buffer buffer = new Buffer(bufferConfig, recordFormatter)) {
assertThat(buffer.getBufferedDataSize(), is(0L));
assertThat(buffer.getBufferedDataSize()).isEqualTo(0L);

Map<String, Object> map = new HashMap<>();
map.put("name", "komamitsu");
for (int i = 0; i < 10; i++) {
buffer.append("foo.bar", new Date().getTime(), map);
}
assertThat(buffer.getBufferedDataSize(), is(greaterThan(0L)));
assertThat(buffer.getBufferedDataSize(), is(lessThan(512L)));
assertThat(buffer.getBufferedDataSize()).isGreaterThan(0L);
assertThat(buffer.getBufferedDataSize()).isLessThan(512L);

buffer.flush(ingester, true);
assertThat(buffer.getBufferedDataSize(), is(0L));
assertThat(buffer.getBufferedDataSize()).isEqualTo(0L);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package org.komamitsu.fluency.buffer;

import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

class FileBackupTest {

private void createTempFile(File dir, String filename, String content) throws IOException {
Path tempfilePath = Files.createFile(dir.toPath().resolve(filename));
Files.write(tempfilePath, content.getBytes(StandardCharsets.UTF_8));
tempfilePath.toFile().deleteOnExit();
}

private void assertSavedBuffer(FileBackup.SavedBuffer savedBuffer, Path expectedPath, byte[] expectedContent, String... expectedParams) {
assertThat(savedBuffer.getPath()).isEqualTo(expectedPath);
savedBuffer.open((params, channel) -> {
assertThat(params.toArray()).isEqualTo(expectedParams);
try {
long size = channel.size();
ByteBuffer buf = ByteBuffer.allocate((int) size);
channel.read(buf);
assertThat(buf.array()).isEqualTo(expectedContent);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

private void assertSavedFile(File savedFile,
String bufferFormatType,
String prefix,
long startNanos,
long endNanos,
String param1,
String param2,
byte[] expectedContent) throws IOException {
String fileName = savedFile.toPath().getFileName().toString();
assertThat(fileName).endsWith(".buf");

String[] partsOfPath = fileName.substring(0, fileName.length() - ".buf".length()).split("#");
assertThat(partsOfPath).hasSize(4);
assertThat(partsOfPath[0]).isEqualTo(bufferFormatType + "_" + prefix);
assertThat(partsOfPath[1]).isEqualTo(param1);
assertThat(partsOfPath[2]).isEqualTo(param2);
assertThat(Long.valueOf(partsOfPath[3])).isBetween(startNanos, endNanos);
assertThat(Files.readAllBytes(savedFile.toPath())).isEqualTo(expectedContent);
}

@Test
void getSavedFiles_GivenEmptyFiles_ShouldReturnEmpty() throws IOException {
File backupDir = Files.createTempDirectory("test").toFile();
backupDir.deleteOnExit();
Buffer buffer = mock(Buffer.class);
String prefix = "my_prefix";
FileBackup fileBackup = new FileBackup(backupDir, buffer, prefix);
assertThat(fileBackup.getSavedFiles()).isEmpty();
}

@Test
void getSavedFiles_GivenSomeFiles_ShouldReturnThem() throws IOException {
long nanoSeconds1 = System.nanoTime();
long nanoSeconds2 = System.nanoTime();
long nanoSeconds3 = System.nanoTime();
File backupDir = Files.createTempDirectory("test").toFile();
backupDir.deleteOnExit();
createTempFile(backupDir,
String.format("xmy_buf_type_my_prefix#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("xmy_buf_type_my_prefix#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("y_buf_type_my_prefix#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#1paramA#1paramB#%d.buf", nanoSeconds1),
"content1");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#2param-a#2param-b#%d.buf", nanoSeconds2),
"content2");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#3param_a#3param_b#%d.buf", nanoSeconds3),
"content3");
createTempFile(backupDir,
String.format("my_buf_type_my_prefixz#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefi#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#param:a#param:b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#param_a#param_b#%d", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#param_a#param_b#%d.buff", System.nanoTime()),
"ignored");
Buffer buffer = mock(Buffer.class);
doReturn("my_buf_type").when(buffer).bufferFormatType();
String prefix = "my_prefix";
FileBackup fileBackup = new FileBackup(backupDir, buffer, prefix);

List<FileBackup.SavedBuffer> savedFiles = fileBackup.getSavedFiles().stream().sorted(
Comparator.comparing(FileBackup.SavedBuffer::getPath)).collect(Collectors.toList());
System.out.println(savedFiles);
assertThat(savedFiles).size().isEqualTo(3);
assertSavedBuffer(savedFiles.get(0),
backupDir.toPath().resolve(String.format("my_buf_type_my_prefix#1paramA#1paramB#%d.buf", nanoSeconds1)),
"content1".getBytes(StandardCharsets.UTF_8),
"1paramA",
"1paramB");
assertSavedBuffer(savedFiles.get(1),
backupDir.toPath().resolve(String.format("my_buf_type_my_prefix#2param-a#2param-b#%d.buf", nanoSeconds2)),
"content2".getBytes(StandardCharsets.UTF_8),
"2param-a",
"2param-b");
assertSavedBuffer(savedFiles.get(2),
backupDir.toPath().resolve(String.format("my_buf_type_my_prefix#3param_a#3param_b#%d.buf", nanoSeconds3)),
"content3".getBytes(StandardCharsets.UTF_8),
"3param_a",
"3param_b");
}

@Test
void saveBuffer() throws IOException {
File backupDir = Files.createTempDirectory("test").toFile();
backupDir.deleteOnExit();
Buffer buffer = mock(Buffer.class);
doReturn("my_buf_type").when(buffer).bufferFormatType();
String prefix = "my_prefix";
FileBackup fileBackup = new FileBackup(backupDir, buffer, prefix);
long startNanos = System.nanoTime();
fileBackup.saveBuffer(
Arrays.asList("1paramA", "1paramB"),
ByteBuffer.wrap("content1".getBytes(StandardCharsets.UTF_8)));
fileBackup.saveBuffer(
Arrays.asList("2param-a", "2param-b"),
ByteBuffer.wrap("content2".getBytes(StandardCharsets.UTF_8)));
fileBackup.saveBuffer(
Arrays.asList("3param_a", "3param_b"),
ByteBuffer.wrap("content3".getBytes(StandardCharsets.UTF_8)));
long endNanos = System.nanoTime();

List<File> savedFiles = Arrays.stream(Objects.requireNonNull(backupDir.listFiles()))
.sorted(Comparator.comparing(File::toString))
.collect(Collectors.toList());
assertThat(savedFiles).size().isEqualTo(3);
assertSavedFile(savedFiles.get(0),
"my_buf_type",
"my_prefix",
startNanos,
endNanos,
"1paramA",
"1paramB",
"content1".getBytes(StandardCharsets.UTF_8));
assertSavedFile(savedFiles.get(1),
"my_buf_type",
"my_prefix",
startNanos,
endNanos,
"2param-a",
"2param-b",
"content2".getBytes(StandardCharsets.UTF_8));
assertSavedFile(savedFiles.get(2),
"my_buf_type",
"my_prefix",
startNanos,
endNanos,
"3param_a",
"3param_b",
"content3".getBytes(StandardCharsets.UTF_8));
}
}
Loading