Skip to content

Commit

Permalink
Use a proper coordinator table schema based on whether the group comm…
Browse files Browse the repository at this point in the history
…it feature is enabled or not (#2034)
  • Loading branch information
komamitsu authored Jul 5, 2024
1 parent c5a00bd commit 2deb11f
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitAdminIntegrationTestBase;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.Coordinator;
import java.util.Properties;

public class ConsensusCommitAdminIntegrationTestWithCassandra
extends ConsensusCommitAdminIntegrationTestBase {
@Override
protected Properties getProps(String testName) {
return CassandraEnv.getProperties(testName);
return ConsensusCommitCassandraEnv.getProperties(testName);
}

@Override
Expand All @@ -17,4 +19,20 @@ protected String getSystemNamespaceName(Properties properties) {
.getSystemNamespaceName()
.orElse(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME);
}

@Override
protected String getCoordinatorNamespaceName(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.getCoordinatorNamespace()
.orElse(Coordinator.NAMESPACE);
}

@Override
protected boolean isGroupCommitEnabled(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.isCoordinatorGroupCommitEnabled();
}

@Override
protected void extraCheckOnCoordinatorTable() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitAdminIntegrationTestBase;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.Coordinator;
import java.util.Map;
import java.util.Properties;

Expand All @@ -10,12 +12,12 @@ public class ConsensusCommitAdminIntegrationTestWithCosmos

@Override
protected Properties getProps(String testName) {
return CosmosEnv.getProperties(testName);
return ConsensusCommitCosmosEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
return ConsensusCommitCosmosEnv.getCreationOptions();
}

@Override
Expand All @@ -24,4 +26,17 @@ protected String getSystemNamespaceName(Properties properties) {
.getTableMetadataDatabase()
.orElse(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME);
}

@Override
protected String getCoordinatorNamespaceName(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.getCoordinatorNamespace()
.orElse(Coordinator.NAMESPACE);
}

@Override
protected boolean isGroupCommitEnabled(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.isCoordinatorGroupCommitEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitAdminIntegrationTestBase;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.Coordinator;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
Expand All @@ -12,12 +14,12 @@ public class ConsensusCommitAdminIntegrationTestWithDynamo

@Override
protected Properties getProps(String testName) {
return DynamoEnv.getProperties(testName);
return ConsensusCommitDynamoEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return DynamoEnv.getCreationOptions();
return ConsensusCommitDynamoEnv.getCreationOptions();
}

@Override
Expand All @@ -32,6 +34,19 @@ protected String getSystemNamespaceName(Properties properties) {
.orElse(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME);
}

@Override
protected String getCoordinatorNamespaceName(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.getCoordinatorNamespace()
.orElse(Coordinator.NAMESPACE);
}

@Override
protected boolean isGroupCommitEnabled(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.isCoordinatorGroupCommitEnabled();
}

// Since DynamoDB doesn't have the namespace concept, some behaviors around the namespace are
// different from the other adapters. So disable several tests that check such behaviors

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitAdminIntegrationTestBase;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitConfig;
import com.scalar.db.transaction.consensuscommit.Coordinator;
import java.util.Properties;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
Expand All @@ -12,7 +14,7 @@ public class ConsensusCommitAdminIntegrationTestWithJdbcDatabase

@Override
protected Properties getProps(String testName) {
return JdbcEnv.getProperties(testName);
return ConsensusCommitJdbcEnv.getProperties(testName);
}

@Override
Expand All @@ -22,6 +24,19 @@ protected String getSystemNamespaceName(Properties properties) {
.orElse(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME);
}

@Override
protected String getCoordinatorNamespaceName(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.getCoordinatorNamespace()
.orElse(Coordinator.NAMESPACE);
}

@Override
protected boolean isGroupCommitEnabled(String testName) {
return new ConsensusCommitConfig(new DatabaseConfig(getProperties(testName)))
.isCoordinatorGroupCommitEnabled();
}

// Since SQLite doesn't have persistent namespaces, some behaviors around the namespace are
// different from the other adapters. So disable several tests that check such behaviors.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
@ThreadSafe
public class ConsensusCommitAdmin implements DistributedTransactionAdmin {

private final ConsensusCommitConfig config;
private final DistributedStorageAdmin admin;
private final String coordinatorNamespace;
private final boolean isIncludeMetadataEnabled;
Expand All @@ -35,7 +36,7 @@ public class ConsensusCommitAdmin implements DistributedTransactionAdmin {
@Inject
public ConsensusCommitAdmin(DistributedStorageAdmin admin, DatabaseConfig databaseConfig) {
this.admin = admin;
ConsensusCommitConfig config = new ConsensusCommitConfig(databaseConfig);
config = new ConsensusCommitConfig(databaseConfig);
coordinatorNamespace = config.getCoordinatorNamespace().orElse(Coordinator.NAMESPACE);
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
}
Expand All @@ -44,7 +45,7 @@ public ConsensusCommitAdmin(DatabaseConfig databaseConfig) {
StorageFactory storageFactory = StorageFactory.create(databaseConfig.getProperties());
admin = storageFactory.getStorageAdmin();

ConsensusCommitConfig config = new ConsensusCommitConfig(databaseConfig);
config = new ConsensusCommitConfig(databaseConfig);
coordinatorNamespace = config.getCoordinatorNamespace().orElse(Coordinator.NAMESPACE);
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
}
Expand All @@ -54,6 +55,7 @@ public ConsensusCommitAdmin(DatabaseConfig databaseConfig) {
DistributedStorageAdmin admin,
ConsensusCommitConfig config,
boolean isIncludeMetadataEnabled) {
this.config = config;
this.admin = admin;
coordinatorNamespace = config.getCoordinatorNamespace().orElse(Coordinator.NAMESPACE);
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
Expand All @@ -67,7 +69,8 @@ public void createCoordinatorTables(Map<String, String> options) throws Executio
}

admin.createNamespace(coordinatorNamespace, options);
admin.createTable(coordinatorNamespace, Coordinator.TABLE, Coordinator.TABLE_METADATA, options);
admin.createTable(
coordinatorNamespace, Coordinator.TABLE, getCoordinatorTableMetadata(), options);
}

@Override
Expand Down Expand Up @@ -207,7 +210,8 @@ public void repairTable(

@Override
public void repairCoordinatorTables(Map<String, String> options) throws ExecutionException {
admin.repairTable(coordinatorNamespace, Coordinator.TABLE, Coordinator.TABLE_METADATA, options);
admin.repairTable(
coordinatorNamespace, Coordinator.TABLE, getCoordinatorTableMetadata(), options);
}

@Override
Expand Down Expand Up @@ -276,4 +280,12 @@ private void checkNamespace(String namespace) {
CoreError.CONSENSUS_COMMIT_COORDINATOR_NAMESPACE_SPECIFIED.buildMessage(namespace));
}
}

private TableMetadata getCoordinatorTableMetadata() {
if (config.isCoordinatorGroupCommitEnabled()) {
return Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_ENABLED;
} else {
return Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_DISABLED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@
public class Coordinator {
public static final String NAMESPACE = "coordinator";
public static final String TABLE = "state";
public static final TableMetadata TABLE_METADATA =
public static final TableMetadata TABLE_METADATA_WITH_GROUP_COMMIT_DISABLED =
TableMetadata.newBuilder()
.addColumn(Attribute.ID, DataType.TEXT)
.addColumn(Attribute.STATE, DataType.INT)
.addColumn(Attribute.CREATED_AT, DataType.BIGINT)
.addPartitionKey(Attribute.ID)
.build();
public static final TableMetadata TABLE_METADATA_WITH_GROUP_COMMIT_ENABLED =
TableMetadata.newBuilder()
.addColumn(Attribute.ID, DataType.TEXT)
.addColumn(Attribute.CHILD_IDS, DataType.TEXT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class ConsensusCommitAdminTestBase {
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this).close();
when(config.getCoordinatorNamespace()).thenReturn(getCoordinatorNamespaceConfig());
when(config.isCoordinatorGroupCommitEnabled()).thenReturn(false);
admin = new ConsensusCommitAdmin(distributedStorageAdmin, config, false);
coordinatorNamespaceName = getCoordinatorNamespaceConfig().orElse(Coordinator.NAMESPACE);
}
Expand All @@ -68,7 +69,29 @@ public void createCoordinatorTables_shouldCreateCoordinatorTableProperly()
.createTable(
coordinatorNamespaceName,
Coordinator.TABLE,
Coordinator.TABLE_METADATA,
Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_DISABLED,
Collections.emptyMap());
}

@Test
public void createCoordinatorTables_WithGroupCommitEnabled_shouldCreateCoordinatorTableProperly()
throws ExecutionException {
// Arrange
when(config.isCoordinatorGroupCommitEnabled()).thenReturn(true);
ConsensusCommitAdmin adminWithGroupCommit =
new ConsensusCommitAdmin(distributedStorageAdmin, config, false);

// Act
adminWithGroupCommit.createCoordinatorTables();

// Assert
verify(distributedStorageAdmin)
.createNamespace(coordinatorNamespaceName, Collections.emptyMap());
verify(distributedStorageAdmin)
.createTable(
coordinatorNamespaceName,
Coordinator.TABLE,
Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_ENABLED,
Collections.emptyMap());
}

Expand Down Expand Up @@ -98,7 +121,34 @@ public void createCoordinatorTables_WithOptions_shouldCreateCoordinatorTableProp
verify(distributedStorageAdmin).createNamespace(coordinatorNamespaceName, options);
verify(distributedStorageAdmin)
.createTable(
coordinatorNamespaceName, Coordinator.TABLE, Coordinator.TABLE_METADATA, options);
coordinatorNamespaceName,
Coordinator.TABLE,
Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_DISABLED,
options);
}

@Test
public void
createCoordinatorTables_WithOptionsWithGroupCommitEnabled_shouldCreateCoordinatorTableProperly()
throws ExecutionException {
// Arrange
when(config.isCoordinatorGroupCommitEnabled()).thenReturn(true);
ConsensusCommitAdmin adminWithGroupCommit =
new ConsensusCommitAdmin(distributedStorageAdmin, config, false);

Map<String, String> options = ImmutableMap.of("name", "value");

// Act
adminWithGroupCommit.createCoordinatorTables(options);

// Assert
verify(distributedStorageAdmin).createNamespace(coordinatorNamespaceName, options);
verify(distributedStorageAdmin)
.createTable(
coordinatorNamespaceName,
Coordinator.TABLE,
Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_ENABLED,
options);
}

@Test
Expand Down Expand Up @@ -570,7 +620,32 @@ public void repairCoordinatorTables_ShouldCallJdbcAdminProperly() throws Executi
// Assert
verify(distributedStorageAdmin)
.repairTable(
coordinatorNamespaceName, Coordinator.TABLE, Coordinator.TABLE_METADATA, options);
coordinatorNamespaceName,
Coordinator.TABLE,
Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_DISABLED,
options);
}

@Test
public void repairCoordinatorTables_WithGroupCommitEnabled_ShouldCallJdbcAdminProperly()
throws ExecutionException {
// Arrange
when(config.isCoordinatorGroupCommitEnabled()).thenReturn(true);
ConsensusCommitAdmin adminWithGroupCommit =
new ConsensusCommitAdmin(distributedStorageAdmin, config, false);

Map<String, String> options = ImmutableMap.of("foo", "bar");

// Act
adminWithGroupCommit.repairCoordinatorTables(options);

// Assert
verify(distributedStorageAdmin)
.repairTable(
coordinatorNamespaceName,
Coordinator.TABLE,
Coordinator.TABLE_METADATA_WITH_GROUP_COMMIT_ENABLED,
options);
}

@Test
Expand Down
Loading

0 comments on commit 2deb11f

Please sign in to comment.