Skip to content

Commit

Permalink
feat: serialize agg state using kryo (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Oct 30, 2023
1 parent c3aced3 commit b3e0579
Show file tree
Hide file tree
Showing 35 changed files with 867 additions and 10,935 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,30 @@ public void write(Batch batch) {
log.info("[output] [ceresdb3] agg=[{}] ts=[{}] stat={} output=[{}] discard=[{}] cost=[{}]", //
batch.key, ts, batch.window.getStat(), points.size(), discard, time1 - time0);

if (batch.window.preview) {
return;
}

param = new WriteMetricsParam();
param.setTenant(batch.key.getTenant());
param.setPoints(points);
points.clear();
// completeness info
{
for (String metricName : usedMetricNames) {
String completeMetric = metricName + "_complete";
for (String metricName : usedMetricNames) {
String completeMetric = metricName + "_complete";

for (MergedCompleteness.GroupCompleteness mgc : batch.window.mergedCompleteness
.getDetails()) {
for (MergedCompleteness.GroupCompleteness mgc : batch.window.mergedCompleteness
.getDetails()) {

String value = JSON.toJSONString(mgc.getTables(), SerializerFeature.NotWriteDefaultValue);
String value = JSON.toJSONString(mgc.getTables(), SerializerFeature.NotWriteDefaultValue);

WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(completeMetric);
p.setTimeStamp(batch.window.getTimestamp());
p.setTags(mgc.getTags());
p.setStrValue(value);
WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(completeMetric);
p.setTimeStamp(batch.window.getTimestamp());
p.setTags(mgc.getTags());
p.setStrValue(value);

points.add(p);
}
points.add(p);
}
}
if (points.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import javax.annotation.Nullable;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* <p>
Expand All @@ -17,6 +18,7 @@
* @author xzchaoo
*/
@Data
@NoArgsConstructor
public class AggFunc {
public static final int TYPE_SUM = 1;
public static final int TYPE_AVG = 2;
Expand Down Expand Up @@ -57,6 +59,10 @@ public class AggFunc {
@Nullable
private LogSamplesMergeParams logSamplesMerge;

public AggFunc(String type) {
this.type = type;
}

public int getTypeInt() {
if (typeInt != 0) {
return typeInt;
Expand All @@ -67,9 +73,18 @@ public int getTypeInt() {

@Data
public static class TopnParams {
/**
* Order by which field
*/
private String orderBy;
private boolean desc;
private int limit;
/**
* Desc or asc
*/
private boolean asc;
/**
* topn limit
*/
private int limit = 3;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ public class AggTask {
private Where gatewayWhere;
@Nullable
private Where where;

@Nonnull
private GroupBy groupBy;

@Nonnull
private Window window;
@Nonnull
private Output output;
/**
* agg task state config
*/
private State state = new State();

public AggTask() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import java.util.List;

import javax.annotation.Nonnull;

import lombok.Data;

/**
Expand All @@ -17,8 +15,14 @@
*/
@Data
public class Output {
@Nonnull
private List<OutputItem> items;

/**
* If preview is true, the incomplete result will be emitted.
*
* @see Window#getPreviewEmitInterval()
*/
private boolean preview = false;

public Output() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import javax.annotation.Nonnull;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -16,9 +17,7 @@
*
* @author xzchaoo
*/
@Data
public class Select {
@Getter
@Setter
@Nonnull
private List<SelectItem> items;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.core.conf;

import lombok.Data;

/**
* <p>
* created at 2023/10/26
*
* @author xzchaoo
*/
@Data
public class State {
private boolean enabled = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public class Window {
private String type;
private long interval;

/**
* emit interval for preview data
*/
private long previewEmitInterval = 60000L;

public Window() {}

public static Window sliding(long interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
Expand All @@ -22,10 +23,11 @@
* @author xzchaoo
*/
@Data
@NoArgsConstructor
public final class AggTaskKey {
private final String tenant;
private final String aggId;
private final String partition;
private String tenant;
private String aggId;
private String partition;

@Setter(AccessLevel.NONE)
@Getter(AccessLevel.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.collect.Maps;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
Expand All @@ -20,6 +21,7 @@
*
* @author xzchaoo
*/
@NoArgsConstructor
public final class FixedSizeTags implements Cloneable {
public static final FixedSizeTags EMPTY = new FixedSizeTags(new String[0], new String[0], 0);

Expand Down Expand Up @@ -55,8 +57,6 @@ public FixedSizeTags(String[] keys) {
this.hash = hash;
}

public FixedSizeTags() {}

public Map<String, String> asMap() {
Map<String, String> map = this.map;
if (map == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class WindowInfo {
public long timestamp;
public MergedCompleteness mergedCompleteness;
public WindowStat stat;
public boolean preview;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
*/
package io.holoinsight.server.agg.v1.executor.state;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Consumer offset info
Expand All @@ -13,6 +15,8 @@
* @author xzchaoo
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OffsetInfo {
/**
* Kafka partition offset
Expand Down
Loading

0 comments on commit b3e0579

Please sign in to comment.