Skip to content

Commit

Permalink
Modify tablet usage (#358)
Browse files Browse the repository at this point in the history
* modify tablet usage

* modify tablet

* modify tablet

* modify tablet

* revert pom

* add ut

* modify exception message
  • Loading branch information
shuwenwei authored Jan 9, 2025
1 parent d05cc65 commit afd6ec4
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ private static void writeNonAlignedWithTablet(TsFileWriter tsFileWriter)
measurementSchemas.add(new MeasurementSchema(SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
measurementSchemas.add(new MeasurementSchema(SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
Tablet tablet = new Tablet(DEVICE_2, measurementSchemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
int rowNum = 100;
int sensorNum = measurementSchemas.size();
long timestamp = 1;
Expand All @@ -143,8 +141,7 @@ private static void writeNonAlignedWithTablet(TsFileWriter tsFileWriter)
int row = tablet.getRowSize();
tablet.addTimestamp(row, timestamp++);
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = value;
tablet.addValue(row, i, value);
}
// write
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ private static void writeWithTablet(
long startValue)
throws IOException, WriteProcessException {
Tablet tablet = new Tablet(deviceId, schemas);
long[] timestamps = tablet.timestamps;
long sensorNum = schemas.size();

for (long r = 0; r < rowNum; r++, startValue++) {
int row = tablet.getRowSize();
timestamps[row] = startTime++;
tablet.addTimestamp(row, startTime++);
for (int i = 0; i < sensorNum; i++) {
tablet.addValue(
schemas.get(i).getMeasurementName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected Field getNonNullField(int columnIndex) {
}

protected Field getField(int columnIndex) {
if (columnIndex > this.columnNameToColumnIndexMap.size()) {
if (columnIndex > this.columnNameToColumnIndexMap.size() || columnIndex <= 0) {
throw new IndexOutOfBoundsException("column index " + columnIndex + " out of bound");
}
Field field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,13 @@ public static void writeWithTablet(
boolean isAligned)
throws IOException, WriteProcessException {
Tablet tablet = new Tablet(deviceId, schemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
long sensorNum = schemas.size();

for (long r = 0; r < rowNum; r++, startValue++) {
int row = tablet.getRowSize();
tablet.addTimestamp(row, startTime++);
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = startValue;
tablet.addValue(row, i, startValue);
}
// write
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
// TODO: changing to a column-first style by calculating the remaining page space of each
// column firsts
for (int row = startRowIndex; row < endRowIndex; row++) {
long time = tablet.timestamps[row];
long time = tablet.getTimestamps()[row];
checkIsHistoryData(time);
for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); columnIndex++) {
if (tablet.getColumnTypes() != null
Expand All @@ -242,42 +242,43 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
}

boolean isNull =
tablet.bitMaps != null
&& tablet.bitMaps[columnIndex] != null
&& tablet.bitMaps[columnIndex].isMarked(row);
tablet.getBitMaps() != null
&& tablet.getBitMaps()[columnIndex] != null
&& tablet.getBitMaps()[columnIndex].isMarked(row);
// check isNull by bitMap in tablet
ValueChunkWriter valueChunkWriter =
tryToAddSeriesWriterInternal(measurementSchemas.get(columnIndex));
switch (measurementSchemas.get(columnIndex).getType()) {
case BOOLEAN:
valueChunkWriter.write(time, ((boolean[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(
time, ((boolean[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case INT32:
valueChunkWriter.write(time, ((int[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((int[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case DATE:
valueChunkWriter.write(
time,
isNull
? 0
: DateUtils.parseDateExpressionToInt(
((LocalDate[]) tablet.values[columnIndex])[row]),
((LocalDate[]) tablet.getValues()[columnIndex])[row]),
isNull);
break;
case INT64:
case TIMESTAMP:
valueChunkWriter.write(time, ((long[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((long[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case FLOAT:
valueChunkWriter.write(time, ((float[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((float[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case DOUBLE:
valueChunkWriter.write(time, ((double[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((double[]) tablet.getValues()[columnIndex])[row], isNull);
break;
case TEXT:
case BLOB:
case STRING:
valueChunkWriter.write(time, ((Binary[]) tablet.values[columnIndex])[row], isNull);
valueChunkWriter.write(time, ((Binary[]) tablet.getValues()[columnIndex])[row], isNull);
break;
default:
throw new UnSupportedDataTypeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,42 +121,51 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
pointCount = 0;
for (int row = startRowIndex; row < endRowIndex; row++) {
// check isNull in tablet
if (tablet.bitMaps != null
&& tablet.bitMaps[column] != null
&& tablet.bitMaps[column].isMarked(row)) {
if (tablet.getBitMaps() != null
&& tablet.getBitMaps()[column] != null
&& tablet.getBitMaps()[column].isMarked(row)) {
continue;
}
long time = tablet.timestamps[row];
long time = tablet.getTimestamps()[row];
checkIsHistoryData(measurementId, time);
pointCount++;
switch (tsDataType) {
case INT32:
chunkWriters.get(measurementId).write(time, ((int[]) tablet.values[column])[row]);
chunkWriters.get(measurementId).write(time, ((int[]) tablet.getValues()[column])[row]);
break;
case DATE:
chunkWriters
.get(measurementId)
.write(
time,
DateUtils.parseDateExpressionToInt(((LocalDate[]) tablet.values[column])[row]));
DateUtils.parseDateExpressionToInt(
((LocalDate[]) tablet.getValues()[column])[row]));
break;
case INT64:
case TIMESTAMP:
chunkWriters.get(measurementId).write(time, ((long[]) tablet.values[column])[row]);
chunkWriters.get(measurementId).write(time, ((long[]) tablet.getValues()[column])[row]);
break;
case FLOAT:
chunkWriters.get(measurementId).write(time, ((float[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((float[]) tablet.getValues()[column])[row]);
break;
case DOUBLE:
chunkWriters.get(measurementId).write(time, ((double[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((double[]) tablet.getValues()[column])[row]);
break;
case BOOLEAN:
chunkWriters.get(measurementId).write(time, ((boolean[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((boolean[]) tablet.getValues()[column])[row]);
break;
case TEXT:
case BLOB:
case STRING:
chunkWriters.get(measurementId).write(time, ((Binary[]) tablet.values[column])[row]);
chunkWriters
.get(measurementId)
.write(time, ((Binary[]) tablet.getValues()[column])[row]);
break;
default:
throw new UnSupportedDataTypeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,11 @@ public class Tablet {
/** MeasurementId->indexOf({@link MeasurementSchema}) */
private final Map<String, Integer> measurementIndex;

/** Timestamps in this {@link Tablet} */
public long[] timestamps;
private long[] timestamps;

/** Each object is a primitive type array, which represents values of one measurement */
public Object[] values;
private Object[] values;

/** Each {@link BitMap} represents the existence of each value in the current column. */
public BitMap[] bitMaps;
private BitMap[] bitMaps;

/**
* For compatibility with the usage of directly modifying Tablet content through public fields.
Expand Down Expand Up @@ -292,6 +289,7 @@ public void initBitMaps() {
public void addTimestamp(int rowIndex, long timestamp) {
timestamps[rowIndex] = timestamp;
this.rowSize = Math.max(this.rowSize, rowIndex + 1);
initBitMapsWithApiUsage();
}

public void addValue(final String measurementId, final int rowIndex, final Object value) {
Expand Down Expand Up @@ -413,6 +411,10 @@ public void addValue(int rowIndex, String measurement, int val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, int val) {
if (!(values[columnIndex] instanceof int[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not INT32");
}
final int[] sensor = (int[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -426,6 +428,10 @@ public void addValue(int rowIndex, String measurement, long val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, long val) {
if (!(values[columnIndex] instanceof long[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not INT64/TIMESTAMP");
}
final long[] sensor = (long[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -439,6 +445,10 @@ public void addValue(int rowIndex, String measurement, float val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, float val) {
if (!(values[columnIndex] instanceof float[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not FLOAT");
}
final float[] sensor = (float[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -452,6 +462,10 @@ public void addValue(int rowIndex, String measurement, double val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, double val) {
if (!(values[columnIndex] instanceof double[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not DOUBLE");
}
final double[] sensor = (double[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -465,6 +479,10 @@ public void addValue(int rowIndex, String measurement, boolean val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, boolean val) {
if (!(values[columnIndex] instanceof boolean[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not BOOLEAN");
}
final boolean[] sensor = (boolean[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -478,6 +496,10 @@ public void addValue(int rowIndex, String measurement, String val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, String val) {
if (!(values[columnIndex] instanceof Binary[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not TEXT/STRING/BLOB");
}
final Binary[] sensor = (Binary[]) values[columnIndex];
sensor[rowIndex] = new Binary(val, TSFileConfig.STRING_CHARSET);
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -491,6 +513,10 @@ public void addValue(int rowIndex, String measurement, byte[] val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, byte[] val) {
if (!(values[columnIndex] instanceof Binary[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not TEXT/STRING/BLOB");
}
final Binary[] sensor = (Binary[]) values[columnIndex];
sensor[rowIndex] = new Binary(val);
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -504,6 +530,10 @@ public void addValue(int rowIndex, String measurement, LocalDate val) {

@TsFileApi
public void addValue(int rowIndex, int columnIndex, LocalDate val) {
if (!(values[columnIndex] instanceof LocalDate[])) {
throw new IllegalArgumentException(
"The data type of column index " + columnIndex + " is not DATE");
}
final LocalDate[] sensor = (LocalDate[]) values[columnIndex];
sensor[rowIndex] = val;
updateBitMap(rowIndex, columnIndex, false);
Expand All @@ -521,6 +551,15 @@ private int getColumnIndexByMeasurement(String measurement) {
}

private void updateBitMap(int rowIndex, int columnIndex, boolean mark) {
initBitMapsWithApiUsage();
if (mark) {
bitMaps[columnIndex].mark(rowIndex);
} else {
bitMaps[columnIndex].unmark(rowIndex);
}
}

private void initBitMapsWithApiUsage() {
if (bitMaps == null) {
initBitMaps();
}
Expand All @@ -530,11 +569,6 @@ private void updateBitMap(int rowIndex, int columnIndex, boolean mark) {
bitMap.markAll();
}
}
if (mark) {
bitMaps[columnIndex].mark(rowIndex);
} else {
bitMaps[columnIndex].unmark(rowIndex);
}
}

public List<IMeasurementSchema> getSchemas() {
Expand Down Expand Up @@ -1195,6 +1229,34 @@ public void setRowSize(int rowSize) {
this.rowSize = rowSize;
}

public long getTimestamp(int i) {
return timestamps[i];
}

public long[] getTimestamps() {
return timestamps;
}

public void setTimestamps(long[] timestamps) {
this.timestamps = timestamps;
}

public Object[] getValues() {
return values;
}

public void setValues(Object[] values) {
this.values = values;
}

public BitMap[] getBitMaps() {
return bitMaps;
}

public void setBitMaps(BitMap[] bitMaps) {
this.bitMaps = bitMaps;
}

public enum ColumnCategory {
TAG,
FIELD,
Expand Down Expand Up @@ -1247,4 +1309,13 @@ public void setTableName(String tableName) {
public List<ColumnCategory> getColumnTypes() {
return columnCategories;
}

public boolean isSorted() {
for (int i = 1; i < rowSize; i++) {
if (timestamps[i] < timestamps[i - 1]) {
return false;
}
}
return true;
}
}
Loading

0 comments on commit afd6ec4

Please sign in to comment.