Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports multi fields data aggregation and HLL operator #697

Merged
merged 3 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.executor.executor;

import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.holoinsight.server.agg.v1.executor.CompletenessService;
import io.holoinsight.server.agg.v1.executor.output.XOutput;
import io.holoinsight.server.common.springboot.ConditionalOnRole;
import io.holoinsight.server.extension.MetricStorage;
import io.holoinsight.server.registry.core.collecttarget.CollectTargetStorage;

/**
* <p>
* created at 2023/10/16
*
* @author xzchaoo
*/
@Configuration
@ConditionalOnRole("agg-executor")
@ConditionalOnBean(CollectTargetStorage.class)
public class AggExecutorRegistryAutoConfiguration {

@Bean
@ConditionalOnBean(MetricStorage.class)
public XOutput metricStorageOutput() {
return new MetricStorageOutput();
}

@Bean
public CompletenessService completenessService() {
return new RegistryInternalCompletenessService();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.executor.executor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

import io.holoinsight.server.agg.v1.core.Utils;
import io.holoinsight.server.agg.v1.executor.output.MergedCompleteness;
import io.holoinsight.server.agg.v1.executor.output.XOutput;
import io.holoinsight.server.extension.MetricStorage;
import io.holoinsight.server.extension.model.WriteMetricsParam;
import lombok.extern.slf4j.Slf4j;

/**
* <p>
* created at 2023/10/17
*
* @author xiangfeng.xzc
*/
@Slf4j
public class MetricStorageOutput implements XOutput {
@Autowired
private MetricStorage metricStorage;

@Override
public String type() {
return "TSDB";
}

@Override
public void write(Batch batch) {
log.info("xxx");
WriteMetricsParam param = new WriteMetricsParam();
param.setTenant(batch.key.getTenant());
List<WriteMetricsParam.Point> points = new ArrayList<>(batch.groups.size());


long time0 = System.currentTimeMillis();

String baseName = batch.oi.getName();
boolean useFormatName = baseName.contains("%s");
String ts = Utils.formatTimeShort(batch.window.getTimestamp());

boolean debug = false;

int discard = 0;
Set<String> usedMetricNames = new HashSet<>();
for (Group g : batch.groups) {
Map<String, Object> finalFields = g.getFinalFields();
for (Map.Entry<String, Object> e : finalFields.entrySet()) {
Object value = e.getValue();
if (value == null) {
continue;
}

String metricName;
if ("value".equals(e.getKey())) {
metricName = baseName;
} else {
if (useFormatName) {
metricName = String.format(baseName, e.getKey());
} else {
metricName = baseName + "_" + e.getKey();
}
}
usedMetricNames.add(metricName);

try {
if (value instanceof Number) {
} else if (value instanceof String) {
} else {
value = JSON.toJSONString(value);
}

WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(metricName);
p.setTimeStamp(batch.window.getTimestamp());
p.setTags(g.getTags().asMap());

if (value instanceof Number) {
p.setValue(((Number) value).doubleValue());
} else {
p.setStrValue(value.toString());
}

if (debug) {
log.info("[output] [ceresdb3] debug agg=[{}] ts=[{}] metric=[{}] tags=[{}] value=[{}]", //
batch.key, ts, metricName, g.getTags(), value);
}
points.add(p);
} catch (IllegalArgumentException ex) {
discard++;
}
}
}

if (points.size() > 0) {
metricStorage.write(param).block();
}
long time1 = System.currentTimeMillis();

log.info("[output] [ceresdb3] agg=[{}] ts=[{}] input=[{}] output=[{}] discard=[{}] cost=[{}]", //
batch.key, ts, batch.window.getInput(), points.size(), discard, time1 - time0);

param = new WriteMetricsParam();
param.setTenant(batch.key.getTenant());
param.setPoints(points);
points.clear();
// completeness info
{
for (String metricName : usedMetricNames) {
String completeMetric = metricName + "_complete";

for (MergedCompleteness.GroupCompleteness mgc : batch.window.mergedCompleteness
.getDetails()) {

String value = JSON.toJSONString(mgc.getTables(), SerializerFeature.NotWriteDefaultValue);

WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(completeMetric);
p.setTimeStamp(batch.window.getTimestamp());
p.setTags(mgc.getTags());
p.setStrValue(value);

points.add(p);
}
}
}
if (points.size() > 0) {
metricStorage.write(param).block();
long time2 = System.currentTimeMillis();

log.info(
"[output] [ceresdb3] agg=[{}] ts=[{}] write completeness info successfully, cost=[{}]", //
batch.key, ts, time2 - time1);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
import io.holoinsight.server.registry.core.dim.ProdDimService;
import io.holoinsight.server.registry.core.template.CollectTemplate;
import io.holoinsight.server.registry.core.template.TemplateStorage;
import lombok.extern.slf4j.Slf4j;

/**
* <p>
* created at 2023/10/9
*
* @author xzchaoo
*/
public class RegistryCompletenessService2 implements CompletenessService {
@Slf4j
public class RegistryInternalCompletenessService implements CompletenessService {
@Autowired
private CollectTargetStorage collectTargetStorage;
@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.holoinsight.server.agg.v1.executor.executor.AggExecutorRegistryAutoConfiguration
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class AggFunc {

public static final int TYPE_TOPN = 8;
public static final int TYPE_AVG_MERGE = 9;
public static final int TYPE_HLL = 10;

private static final Map<String, Integer> TYPE_TO_INT = new HashMap<>();

Expand All @@ -44,6 +45,7 @@ public class AggFunc {

TYPE_TO_INT.put("TOPN", 8);
TYPE_TO_INT.put("AVG_MERGE", 9);
TYPE_TO_INT.put("HLL", 10);
}

private String type;
Expand Down Expand Up @@ -81,4 +83,5 @@ public static class LogSamplesMergeParams {
@Nullable
private String strategy = "ANY";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
@Data
public class CompletenessConfig {
private Mode mode = Mode.COMPLETENESS_INFO;
private Mode mode = Mode.NONE;

/**
* Used when mode is {@link Mode#DATA}
Expand All @@ -37,6 +37,7 @@ public class CompletenessConfig {
private List<String> keepTargetKeys;

public enum Mode {
NONE,
/**
* Build completeness info by info extracted from data itself
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.google.common.base.Preconditions;

import io.holoinsight.server.agg.v1.core.data.DataAccessor;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
Expand Down Expand Up @@ -77,4 +78,11 @@ public void fixDefaultValue() {
}
}
}

public void fillTagValuesFromDataAccessor(String[] values, DataAccessor da) {
Preconditions.checkArgument(values.length == items.size());
for (int i = 0; i < items.size(); i++) {
values[i] = items.get(i).get(da);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.google.common.base.Preconditions;

import io.holoinsight.server.agg.v1.core.data.DataAccessor;
import lombok.Data;

/**
Expand Down Expand Up @@ -52,6 +53,11 @@ public static GroupByItem of(String tag) {
return new GroupByItem(tag, DEFAULT_VALUE, tag);
}

public String get(DataAccessor da) {
String v = da.getTag(tag);
return StringUtils.isEmpty(v) ? defaultValue : v;
}

public String get(Map<String, String> tags) {
String v = tags.get(tag);
return StringUtils.isEmpty(v) ? defaultValue : v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -31,6 +32,16 @@ public final class AggTaskKey {
@EqualsAndHashCode.Exclude
private transient Map<String, String> partitionInfo;

public AggTaskKey(String tenant, String aggId) {
this(tenant, aggId, "");
}

public AggTaskKey(String tenant, String aggId, String partition) {
this.tenant = Objects.requireNonNull(tenant);
this.aggId = Objects.requireNonNull(aggId);
this.partition = Objects.requireNonNull(partition);
}

@Override
public String toString() {
return tenant + ":" + aggId + ":" + partition;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.core.data;

import java.util.Collection;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;

/**
* An abstraction of reading data
* <p>
* created at 2023/10/17
*
* @author xzchaoo
*/
public interface DataAccessor {
long getTimestamp();

Map<String, String> getTags();

default String getTagOrDefault(String name, String defaultValue) {
String v = getTag(name);
if (StringUtils.isEmpty(v)) {
return defaultValue;
}
return v;
}

String getTag(String name);

Collection<String> getFieldNames();

double getFloat64Field(String name);

String getStringField(String name);

void bindFieldName(String fieldName);

boolean isSingleValue();

@Deprecated
String getFieldName();

int getCount();

double getFloat64Field();

String getStringField();
}
Loading