Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: serialize agg state using kryo #714

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,30 @@ public void write(Batch batch) {
log.info("[output] [ceresdb3] agg=[{}] ts=[{}] stat={} output=[{}] discard=[{}] cost=[{}]", //
batch.key, ts, batch.window.getStat(), points.size(), discard, time1 - time0);

if (batch.window.preview) {
return;
}

param = new WriteMetricsParam();
param.setTenant(batch.key.getTenant());
param.setPoints(points);
points.clear();
// completeness info
{
for (String metricName : usedMetricNames) {
String completeMetric = metricName + "_complete";
for (String metricName : usedMetricNames) {
String completeMetric = metricName + "_complete";

for (MergedCompleteness.GroupCompleteness mgc : batch.window.mergedCompleteness
.getDetails()) {
for (MergedCompleteness.GroupCompleteness mgc : batch.window.mergedCompleteness
.getDetails()) {

String value = JSON.toJSONString(mgc.getTables(), SerializerFeature.NotWriteDefaultValue);
String value = JSON.toJSONString(mgc.getTables(), SerializerFeature.NotWriteDefaultValue);

WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(completeMetric);
p.setTimeStamp(batch.window.getTimestamp());
p.setTags(mgc.getTags());
p.setStrValue(value);
WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(completeMetric);
p.setTimeStamp(batch.window.getTimestamp());
p.setTags(mgc.getTags());
p.setStrValue(value);

points.add(p);
}
points.add(p);
}
}
if (points.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import javax.annotation.Nullable;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* <p>
Expand All @@ -17,6 +18,7 @@
* @author xzchaoo
*/
@Data
@NoArgsConstructor
public class AggFunc {
public static final int TYPE_SUM = 1;
public static final int TYPE_AVG = 2;
Expand Down Expand Up @@ -57,6 +59,10 @@ public class AggFunc {
@Nullable
private LogSamplesMergeParams logSamplesMerge;

public AggFunc(String type) {
this.type = type;
}

public int getTypeInt() {
if (typeInt != 0) {
return typeInt;
Expand All @@ -67,9 +73,18 @@ public int getTypeInt() {

@Data
public static class TopnParams {
/**
* Order by which field
*/
private String orderBy;
private boolean desc;
private int limit;
/**
* Desc or asc
*/
private boolean asc;
/**
* topn limit
*/
private int limit = 3;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ public class AggTask {
private Where gatewayWhere;
@Nullable
private Where where;

@Nonnull
private GroupBy groupBy;

@Nonnull
private Window window;
@Nonnull
private Output output;
/**
* agg task state config
*/
private State state = new State();

public AggTask() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import java.util.List;

import javax.annotation.Nonnull;

import lombok.Data;

/**
Expand All @@ -17,8 +15,14 @@
*/
@Data
public class Output {
@Nonnull
private List<OutputItem> items;

/**
* If preview is true, the incomplete result will be emitted.
*
* @see Window#getPreviewEmitInterval()
*/
private boolean preview = false;

public Output() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import javax.annotation.Nonnull;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -16,9 +17,7 @@
*
* @author xzchaoo
*/
@Data
public class Select {
@Getter
@Setter
@Nonnull
private List<SelectItem> items;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.core.conf;

import lombok.Data;

/**
* <p>
* created at 2023/10/26
*
* @author xzchaoo
*/
@Data
public class State {
private boolean enabled = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public class Window {
private String type;
private long interval;

/**
* emit interval for preview data
*/
private long previewEmitInterval = 60000L;

public Window() {}

public static Window sliding(long interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
Expand All @@ -22,10 +23,11 @@
* @author xzchaoo
*/
@Data
@NoArgsConstructor
public final class AggTaskKey {
private final String tenant;
private final String aggId;
private final String partition;
private String tenant;
private String aggId;
private String partition;

@Setter(AccessLevel.NONE)
@Getter(AccessLevel.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.collect.Maps;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
Expand All @@ -20,6 +21,7 @@
*
* @author xzchaoo
*/
@NoArgsConstructor
public final class FixedSizeTags implements Cloneable {
public static final FixedSizeTags EMPTY = new FixedSizeTags(new String[0], new String[0], 0);

Expand Down Expand Up @@ -55,8 +57,6 @@ public FixedSizeTags(String[] keys) {
this.hash = hash;
}

public FixedSizeTags() {}

public Map<String, String> asMap() {
Map<String, String> map = this.map;
if (map == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class WindowInfo {
public long timestamp;
public MergedCompleteness mergedCompleteness;
public WindowStat stat;
public boolean preview;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
*/
package io.holoinsight.server.agg.v1.executor.state;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Consumer offset info
Expand All @@ -13,6 +15,8 @@
* @author xzchaoo
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OffsetInfo {
/**
* Kafka partition offset
Expand Down
Loading
Loading