Skip to content

Commit

Permalink
Display latencies for getting a connection from Hikari pool. (#59)
Browse files Browse the repository at this point in the history
Summary:
The output with the connection latency:
01:39:14,402 (DBWorkload.java:1016) INFO  - Num New Order transactions : 36, time seconds: 180
01:39:14,402 (DBWorkload.java:1017) INFO  - TPM-C: 12
01:39:14,402 (DBWorkload.java:1018) INFO  - Efficiency : 93.31%
01:39:14,403 (DBWorkload.java:1056) INFO  - NewOrder, Avg Latency: 35.67194444444444 msecs, p99 Latency: 172.218 msecs Whole operation, Avg Latency: 56.37466666666666 msecs, p99 Latency: 231.338 msecs
01:39:14,404 (DBWorkload.java:1056) INFO  - Payment, Avg Latency: 25.760722222222224 msecs, p99 Latency: 216.9 msecs Whole operation, Avg Latency: 39.668416666666666 msecs, p99 Latency: 255.616 msecs
01:39:14,404 (DBWorkload.java:1056) INFO  - OrderStatus, Avg Latency: 19.56925 msecs, p99 Latency: 36.103 msecs Whole operation, Avg Latency: 20.107 msecs, p99 Latency: 36.516 msecs
01:39:14,404 (DBWorkload.java:1056) INFO  - Delivery, Avg Latency: 63.89 msecs, p99 Latency: 87.842 msecs Whole operation, Avg Latency: 64.649 msecs, p99 Latency: 88.568 msecs
01:39:14,404 (DBWorkload.java:1056) INFO  - StockLevel, Avg Latency: 135.317 msecs, p99 Latency: 136.352 msecs Whole operation, Avg Latency: 135.9805 msecs, p99 Latency: 136.877 msecs
01:39:14,404 (DBWorkload.java:1068) INFO  - Acquire Connection, Avg Latency: 49.240474999999996 msecs, p99 Latency: 255.616 msecs

Reviewers:
Mihnea, Rob
  • Loading branch information
psudheer21 authored Oct 16, 2020
1 parent e1169f6 commit fd90670
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 26 deletions.
2 changes: 2 additions & 0 deletions config/workload_all.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
<enableForeignKeysAfterLoad>true</enableForeignKeysAfterLoad>
<hikariConnectionTimeoutMs>180000</hikariConnectionTimeoutMs>
<useStoredProcedures>true</useStoredProcedures>
<displayEnhancedLatencyMetrics>false</displayEnhancedLatencyMetrics>

<transactiontypes>
<transactiontype>
<name>NewOrder</name>
Expand Down
79 changes: 61 additions & 18 deletions src/com/oltpbenchmark/DBWorkload.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ else if (serial)
// Bombs away!
Results r = null;
try {
r = runWorkload(benchList, verbose, intervalMonitor);
r = runWorkload(benchList, verbose, intervalMonitor, xmlConfig);
} catch (Throwable ex) {
LOG.error("Unexpected error when running benchmarks.", ex);
System.exit(1);
Expand Down Expand Up @@ -976,7 +976,7 @@ private static void runLoader(BenchmarkModule bench, boolean verbose) {
}

private static Results runWorkload(List<BenchmarkModule> benchList, boolean verbose,
int intervalMonitor) throws QueueLimitException, IOException {
int intervalMonitor, XMLConfiguration xmlConfig) throws QueueLimitException, IOException {
List<Worker<?>> workers = new ArrayList<Worker<?>>();
List<WorkloadConfiguration> workConfs = new ArrayList<WorkloadConfiguration>();

Expand Down Expand Up @@ -1017,34 +1017,74 @@ private static Results runWorkload(List<BenchmarkModule> benchList, boolean verb
LOG.info("TPM-C: " + df.format(tpmc));
LOG.info("Efficiency : " + df.format(efficiency) + "%");

boolean displayEnhancedLatencyMetrics =
xmlConfig.containsKey("displayEnhancedLatencyMetrics") &&
xmlConfig.getBoolean("displayEnhancedLatencyMetrics");
PrintLatencies(workers, displayEnhancedLatencyMetrics);

return r;
}

private static void PrintLatencies(List<Worker<?>> workers,
boolean displayEnhancedLatencyMetrics) {
List<List<Integer>> list_latencies = new ArrayList<>();
List<List<Integer>> list_enhanced_latencies = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
list_latencies.add(new ArrayList<Integer>());
list_enhanced_latencies.add(new ArrayList<Integer>());
}

for (Worker<?> w : workers) {
for (LatencyRecord.Sample sample : w.getLatencyRecords()) {
list_latencies.get(sample.tranType - 1).add(sample.operationLatencyUs);
}

if (displayEnhancedLatencyMetrics) {
for (LatencyRecord.Sample sample : w.getWholeOperationLatencyRecords()) {
list_enhanced_latencies.get(sample.tranType - 1).add(sample.operationLatencyUs);
}
}
}
printLatencies(list_latencies, transactionTypes);
return r;
}

private static void printLatencies(List<List<Integer>> list_latencies, Map<Integer,
String> transactionTypes) {
for (int i = 0; i < 5; ++i) {
Collections.sort(list_latencies.get(i));
long sum = 0;
for (int val : list_latencies.get(i)) {
sum += val;
if (!displayEnhancedLatencyMetrics) {
for (int i = 0; i < list_latencies.size(); ++i) {
LOG.info(getOperationLatencyString(transactionTypes.get(i+1).toString(),
list_latencies.get(i)));
}
double avgLatency = sum / list_latencies.get(i).size();
int p99Index = (int)(list_latencies.get(i).size() * 0.99);
return;
}

for (int i = 0; i < list_latencies.size(); ++i) {
LOG.info(getOperationLatencyString(transactionTypes.get(i+1).toString(),
list_latencies.get(i)) +
getOperationLatencyString(", Whole operation",
list_enhanced_latencies.get(i)));
}

LOG.info(transactionTypes.get(i+1) +
", Avg Latency: " + avgLatency / 1000 +
" msecs, p99 Latency: " + list_latencies.get(i).get(p99Index) * 1.0 / 1000 + " msecs");
List<Integer> acqConnectionLatency = new ArrayList<Integer>();
for (Worker<?> w : workers) {
for (LatencyRecord.Sample sample : w.getAcqConnectionLatencyRecords()) {
acqConnectionLatency.add(sample.operationLatencyUs);
}
}
LOG.info(getOperationLatencyString("Acquire Connection", acqConnectionLatency));
}

private static String getOperationLatencyString(String operation, List<Integer> latencies) {
if (latencies.size() == 0) {
return "";
}
Collections.sort(latencies);
long sum = 0;
for (int val : latencies) {
sum += val;
}
double avgLatency = sum * 1.0 / latencies.size() / 1000;
int p99Index = (int)(latencies.size() * 0.99);
double p99Latency = latencies.get(p99Index) * 1.0 / 1000;

return operation + ", Avg Latency: " + avgLatency +
" msecs, p99 Latency: " + p99Latency + " msecs";
}

public static void mergeResults(String dirPath, String[] fileNames) {
Expand Down Expand Up @@ -1113,7 +1153,10 @@ public static void mergeResults(String dirPath, String[] fileNames) {
LOG.info("Num New Order transactions : " + numNewOrderTransactions + ", time seconds: " + time);
LOG.info("TPM-C: " + df.format(tpmc));
LOG.info("Efficiency : " + df.format(efficiency) + "%");
printLatencies(list_latencies, transactionTypes);
for (int i = 0; i < list_latencies.size(); ++i) {
LOG.info(getOperationLatencyString(transactionTypes.get(i+1).toString(),
list_latencies.get(i)));
}
}

private static void printUsage(Options options) {
Expand Down
120 changes: 112 additions & 8 deletions src/com/oltpbenchmark/api/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public abstract class Worker<T extends BenchmarkModule> implements Runnable {

private static WorkloadState wrkldState;
private LatencyRecord latencies;
private LatencyRecord wholeOperationLatencies;
private LatencyRecord acqConnectionLatencies;
private Statement currStatement;

// Interval requests used by the monitor
Expand Down Expand Up @@ -137,6 +139,14 @@ public final Iterable<LatencyRecord.Sample> getLatencyRecords() {
return latencies;
}

public final Iterable<LatencyRecord.Sample> getWholeOperationLatencyRecords() {
return wholeOperationLatencies;
}

public final Iterable<LatencyRecord.Sample> getAcqConnectionLatencyRecords() {
return acqConnectionLatencies;
}

@SuppressWarnings("unchecked")
public final <P extends Procedure> P getProcedure(Class<P> procClass) {
return (P) (this.class_procedures.get(procClass));
Expand Down Expand Up @@ -205,6 +215,60 @@ synchronized public void cancelStatement() {
}
}

class TransactionExecutionState {
private long startOperation = 0;
private long endOperation = 0;
private long startConnection = 0;
private long endConnection = 0;
private TransactionType type;

public TransactionExecutionState() {
this.startOperation = 0;
this.endOperation = 0;
this.startConnection = 0;
this.endConnection = 0;
this.type = TransactionType.INVALID;
}

public TransactionExecutionState(long startOperation, long endOperation,
long startConnection, long endConnection,
TransactionType type) {
this.startOperation = startOperation;
this.endOperation = endOperation;
this.startConnection = startConnection;
this.endConnection = endConnection;
this.type = type;
}

public long getStartOperation() {
return startOperation;
}

public long getEndOperation() {
return endOperation;
}

public long getStartConnection() {
return startConnection;
}

public long getEndConnection() {
return endConnection;
}

public TransactionType getTransactionType() {
return type;
}

public void setEndOperation(long endOperation) {
this.endOperation = endOperation;
}

public void setEndConnection(long endConnection) {
this.endConnection = endConnection;
}
}

@Override
public final void run() {
Thread t = Thread.currentThread();
Expand All @@ -213,6 +277,8 @@ public final void run() {

// In case of reuse reset the measurements
latencies = new LatencyRecord(wrkldState.getTestStartNs());
wholeOperationLatencies = new LatencyRecord(wrkldState.getTestStartNs());
acqConnectionLatencies = new LatencyRecord(wrkldState.getTestStartNs());

// Invoke the initialize callback
try {
Expand Down Expand Up @@ -310,10 +376,9 @@ public final void run() {
}
}

long startOperation = System.nanoTime();
TransactionType type = invalidTT;
TransactionExecutionState executionState = new TransactionExecutionState();
try {
type = doWork(preState == State.MEASURE, pieceOfWork);
executionState = doWork(preState == State.MEASURE, pieceOfWork);
} catch (IndexOutOfBoundsException e) {
if (phase.isThroughputRun()) {
LOG.error("Thread tried executing disabled phase!");
Expand All @@ -336,7 +401,13 @@ public final void run() {
}
}
}
long endOperation = System.nanoTime();
// In case of transaction failures, the end times will not be populated.
if (executionState.getEndOperation() == 0) {
executionState.setEndOperation(System.nanoTime());
}
if (executionState.getEndConnection() == 0) {
executionState.setEndConnection(System.nanoTime());
}

if (think_time_msecs > 0) {
// Sleep for the think time duration.
Expand All @@ -359,8 +430,27 @@ public final void run() {
// changed, otherwise we're recording results for a query
// that either started during the warmup phase or ended
// after the timer went off.
if (preState == State.MEASURE && type != null && this.wrkldState.getCurrentPhase().id == phase.id) {
latencies.addLatency(type.getId(), start, end, startOperation, endOperation, this.id, phase.id);
if (preState == State.MEASURE && executionState.getTransactionType() != null &&
this.wrkldState.getCurrentPhase().id == phase.id) {
latencies.addLatency(
executionState.getTransactionType().getId(),
start, end,
executionState.getStartOperation(), executionState.getEndOperation(),
this.id, phase.id);
acqConnectionLatencies.addLatency(
1, start, end,
executionState.getStartConnection(),executionState.getEndOperation(),
this.id, phase.id);

// The latency of the whole operation can be obtained by evaluating the
// time from the acquisition of the connection to the completion of the
// operation.
wholeOperationLatencies.addLatency(
executionState.getTransactionType().getId(),
start, end,
executionState.getStartConnection(), executionState.getEndOperation(),
this.id, phase.id);

intervalRequests.incrementAndGet();
}
if (phase.isLatencyRun())
Expand Down Expand Up @@ -390,8 +480,13 @@ public final void run() {
*
* @param llr
*/
protected final TransactionType doWork(boolean measure, SubmittedProcedure pieceOfWork) {
protected final TransactionExecutionState doWork(boolean measure, SubmittedProcedure pieceOfWork) {
TransactionType next = null;
long startOperation = 0;
long endOperation = 0;
long startConnection = 0;
long endConnection = 0;

TransactionStatus status = TransactionStatus.RETRY;
Savepoint savepoint = null;
final DatabaseType dbType = wrkld.getDBType();
Expand All @@ -400,10 +495,14 @@ protected final TransactionType doWork(boolean measure, SubmittedProcedure piece

Connection conn = null;
try {
startConnection = System.nanoTime();

conn = dataSource.getConnection();
conn.setAutoCommit(false);
conn.setTransactionIsolation(this.wrkld.getIsolationMode());

endConnection = System.nanoTime();

while (status == TransactionStatus.RETRY && this.wrkldState.getGlobalState() != State.DONE) {
if (next == null) {
next = transactionTypes.getType(pieceOfWork.getType());
Expand All @@ -420,7 +519,10 @@ protected final TransactionType doWork(boolean measure, SubmittedProcedure piece
// }

status = TransactionStatus.UNKNOWN;

startOperation = System.nanoTime();
status = this.executeWork(conn, next);
endOperation = System.nanoTime();

// User Abort Handling
// These are not errors
Expand Down Expand Up @@ -517,7 +619,9 @@ protected final TransactionType doWork(boolean measure, SubmittedProcedure piece
}
}

return (next);
return new TransactionExecutionState(startOperation, endOperation,
startConnection, endConnection,
next);
}

/**
Expand Down

0 comments on commit fd90670

Please sign in to comment.