Commit

Merge pull request #14 from frisch/cdp-demo
Cdp demo
frischHWC authored and GitHub Enterprise committed Nov 25, 2021
2 parents 8362782 + be0bd18 commit eb7f197
Showing 35 changed files with 1,382 additions and 145 deletions.
7 changes: 6 additions & 1 deletion .gitignore
@@ -7,4 +7,9 @@ target/
*.log
src/main/resources/overall_launch.sh
src/main/resources/create_release.sh
src/main/resources/config.properties
src/main/resources/config.properties
.factorypath
.classpath
.settings
.project
config.properties
90 changes: 89 additions & 1 deletion README.adoc
@@ -100,7 +100,91 @@ All available types are:
- BLOB which is a byte array of 1MB by default (length represents the length of the byte array) (use it carefully) +
- EMAIL which is a string in the form (<name>.<name>|<AZ09><name>)@(google|outlook|yahoo|mail).com +

Note that each field can define a set of "possible_values" that restricts generated values to exactly those.
Field values can also be made more "deterministic" by providing values manually, by providing weighted values to control their distribution,
or by creating conditions based on other columns' values.

===== Possible values

Each field can define a set of "possible_values" that limits generated values to exactly this set.

===== Possible values weighted

A weight can also be defined for each possible value (chosen between 0 and 100, with all weights summing to 100)
to make generation less random: each value then appears with its weight as a percentage. (Only String, Boolean, Integer and Long support weights.)

===== Minimum & Maximum

For the INTEGER and LONG types, it is possible to define a minimum and a maximum.

===== Conditionals

Conditions must be made on previously defined columns.
Two types of conditions are supported:

1. Formula, possible for Float/Integer/Long
Types must be compatible (an int can be converted to a long but not the reverse).
It is a simple expression evaluated with the operators: * , + , - , /
The output column must be of type STRING; input columns (used to compute the result) must be INTEGER, LONG or FLOAT

Example:
"conditionals": {
"always": "2 * $very_low_int + 56 - $low_int"
}
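
For instance, if $very_low_int is 3 and $low_int is 10 for a given row, the generated value is 2 * 3 + 56 - 10 = 52, written as a STRING into the output column.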

Be careful to leave spaces in your expression so that it can be parsed and evaluated.

2. Value depending on other columns' values, possible for Integer/Long/Float/String/Boolean columns.
Supports && (= AND) and || (= OR).
Conditions must use equals (=), unequals (!=), superior (>) or inferior (<).
Multiple conditions can be combined on the same line.
Conditions are evaluated one by one like an "if ... else if ..."; the first one returning true is picked.
The output column must be of type STRING; input columns must be STRING, LONG, INTEGER or FLOAT

Example:
"conditionals": {
"$country_of_provenance=FRANCE" : "Paris",
"$country_of_provenance=GERMANY | $increment_int<40" : "Berlin",
"$country_of_provenance=GERMANY & $increment_int>40" : "Berlin_40"
}
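
For instance, for a row where $country_of_provenance is GERMANY and $increment_int is 35, the second condition is the first one to return true, so the value Berlin is picked.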

_N.B.: Multiple conditions are evaluated with AND taking precedence over OR, meaning A & B | C is in fact evaluated as (A & B) | C_

===== Examples

A simple definition of a field looks like this:

{
"name": "name",
"type": "NAME"
}

A definition with restricted values:

{
"name": "credits_used",
"type": "INTEGER",
"possible_values": [0, 1, -1]
}

A definition with weighted values, to avoid generating evenly spread data:

{
"name": "country_of_provenance",
"type": "STRING",
"possible_values_weighted": {
"France": 40,
"Germany": 60
}
}

A definition with minimum and maximum:

{
"name": "percentage",
"type": "INTEGER",
"min": 0,
"max": 100
}
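
A definition with a conditional column (a sketch assembled from the conditional snippets above; the field and column names are illustrative and the exact layout should be checked against the project's model files):

{
"name": "city",
"type": "STRING",
"conditionals": {
"$country_of_provenance=FRANCE" : "Paris",
"$country_of_provenance=GERMANY" : "Berlin"
}
}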

==== Table Names:

@@ -126,6 +210,9 @@ corresponding to the name of field (multiple fields could be provided separated
- HBASE_PRIMARY_KEY +
- OZONE_BUCKET +
- OZONE_KEY +
- KUDU_PRIMARY_KEYS +
- KUDU_HASH_KEYS +
- KUDU_RANGE_KEYS

==== Options:

@@ -134,6 +221,7 @@ An array of other options that could be required depending with which sinks it i
This mapping must be in the form : "CF:col1,col2;CF2:col5" +
- SOLR_SHARDS +
- SOLR_REPLICAS +
- KUDU_REPLICAS


Note that all non-required settings can be safely removed without errors.
8 changes: 7 additions & 1 deletion src/main/java/com/cloudera/frisch/randomdatagen/Main.java
@@ -52,9 +52,15 @@ public static void main(String [] args) {
logger.info("Finished to process batch " + (i+1) + "/" + numberOfBatches + " of " + rowsPerBatch + " rows");
}

logger.info("All sinks will be closed");
// Terminate all sinks
sinks.forEach(SinkInterface::terminate);

// Recap of what has been generated
Utils.recap(numberOfBatches, rowsPerBatch, (List<ArgumentsParser.sinks>) ArgumentsParser.getArgsMap().get(ArgumentsParser.args.SINK_TO_FILL), model);

logger.info("Application Finished");
logger.info("Application took : " + (System.currentTimeMillis()-start) + " ms to run");

}

}
19 changes: 0 additions & 19 deletions src/main/java/com/cloudera/frisch/randomdatagen/ShutdownHook.java

This file was deleted.

162 changes: 162 additions & 0 deletions src/main/java/com/cloudera/frisch/randomdatagen/Utils.java
@@ -1,16 +1,25 @@
package com.cloudera.frisch.randomdatagen;


import com.cloudera.frisch.randomdatagen.config.ArgumentsParser;
import com.cloudera.frisch.randomdatagen.config.PropertiesLoader;
import com.cloudera.frisch.randomdatagen.model.Model;
import com.cloudera.frisch.randomdatagen.model.OptionsConverter;
import com.cloudera.frisch.randomdatagen.sink.*;
import org.apache.avro.reflect.MapEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

import javax.security.auth.Subject;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;


@@ -85,6 +94,23 @@ public static String getAlphaString(int n, Random random)
}


/**
* Picks one of the possible values from a map of weighted values (weights between 0 and 100, expected to sum to 100)
* @param random random number generator used to draw a percentage
* @param weights ordered map of possible values to their weights
* @return the chosen value, or an empty string if no entry matches the drawn percentage
*/
public static String getRandomValueWithWeights(Random random, LinkedHashMap<String, Integer> weights) {
int randomIntPercentage = random.nextInt(100);
int sumOfWeight = 0;
for(Map.Entry<String, Integer> entry : weights.entrySet()) {
sumOfWeight = sumOfWeight + entry.getValue();
if(randomIntPercentage < sumOfWeight) {
return entry.getKey();
}
}
return "";
}
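
// Illustrative trace of the method above, using the weights from the README example (assuming "France"=40 is inserted before "Germany"=60):
// a draw of randomIntPercentage = 55 first accumulates 40 (55 >= 40, so the loop continues), then 100 (55 < 100), and "Germany" is returned.
// Provided the weights sum to 100 as the README requires, each value is therefore returned with roughly its weight as a percentage;
// if they sum to less than 100, the empty string may be returned.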

/**
* Login to kerberos using a given user and its associated keytab
@@ -184,5 +210,141 @@ public static void testClasspath() {
}
}

/**
* Logs a recap of what has been generated
*/
public static void recap(long numberOfBatches, long rowPerBatch, List<ArgumentsParser.sinks> sinks, Model model) {
logger.info(" ************************* Recap of data generation ****************** ");
logger.info("Generated " + rowPerBatch*numberOfBatches + " rows into : ");

sinks.forEach(sink -> {
switch (sink) {
case HDFSCSV:
logger.info(" - HDFS as CSV files of " + rowPerBatch + " rows, from : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-0000000000.csv");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".csv");
break;
case HDFSAVRO:
logger.info(" - HDFS as Avro files of " + rowPerBatch + " rows, from : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-0000000000.avro");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".avro");
break;
case HDFSORC:
logger.info(" - HDFS as ORC files of " + rowPerBatch + " rows, from : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-0000000000.orc");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".orc");
break;
case HDFSPARQUET:
logger.info(" - HDFS as Parquet files of " + rowPerBatch + " rows, from : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-0000000000.parquet");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.HDFS_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".parquet");
break;
case HBASE:
logger.info(" - HBase in namespace " + model.getTableNames().get(OptionsConverter.TableNames.HBASE_NAMESPACE) +
" in table : " + model.getTableNames().get(OptionsConverter.TableNames.HBASE_TABLE_NAME));
break;
case HIVE:
logger.info(" - Hive in database: " + model.getTableNames().get(OptionsConverter.TableNames.HIVE_DATABASE) +
" in table : " + model.getTableNames().get(OptionsConverter.TableNames.HIVE_TABLE_NAME));
if(Boolean.parseBoolean(PropertiesLoader.getProperty("hive.on.hdfs"))) {
String tableNameTemporary = model.getTableNames().get(OptionsConverter.TableNames.HIVE_TEMPORARY_TABLE_NAME)==null ?
(String) model.getTableNames().get(OptionsConverter.TableNames.HIVE_TABLE_NAME) + "_tmp" :
(String) model.getTableNames().get(OptionsConverter.TableNames.HIVE_TEMPORARY_TABLE_NAME);
logger.info(" - Hive in database: " + model.getTableNames().get(OptionsConverter.TableNames.HIVE_DATABASE) +
" in external table : " + tableNameTemporary + " located in HDFS at: " +
model.getTableNames().get(OptionsConverter.TableNames.HIVE_HDFS_FILE_PATH));
}
break;
case OZONE:
logger.info(" - Ozone in volume " + model.getTableNames().get(OptionsConverter.TableNames.OZONE_VOLUME));
break;
case SOLR:
logger.info(" - SolR in collection " + model.getTableNames().get(OptionsConverter.TableNames.SOLR_COLLECTION));
break;
case KAFKA:
logger.info(" - Kafka in topic " + model.getTableNames().get(OptionsConverter.TableNames.KAFKA_TOPIC));
break;
case KUDU:
logger.info(" - Kudu in table " + model.getTableNames().get(OptionsConverter.TableNames.KUDU_TABLE_NAME));
break;
case CSV:
logger.info(" - CSV files of " + rowPerBatch + " rows, from : " );
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-0000000000.csv");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".csv");
break;
case AVRO:
logger.info(" - Avro files of " + rowPerBatch + " rows, from : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-0000000000.avro");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".avro");
break;
case PARQUET:
logger.info(" - Parquet files of " + rowPerBatch + " rows, from : " );
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-0000000000.parquet");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".parquet");
break;
case ORC:
logger.info(" - ORC files of " + rowPerBatch + " rows, from : " );
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-0000000000.orc");
logger.info(" to : ");
logger.info(" " +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_PATH) +
model.getTableNames().get(OptionsConverter.TableNames.LOCAL_FILE_NAME) + "-" +
String.format("%010d", numberOfBatches-1) + ".orc");
break;
default:
logger.info("The sink " + sink.toString() +
" provided has not been recognized as an expected sink");
break;
}

});

logger.info("****************************************************************");
}


}
@@ -17,6 +17,7 @@ public class PropertiesLoader {
private static Properties loadProperties() {
// Load config file
java.util.Properties properties = new java.util.Properties();

try {
FileInputStream fileInputStream = new FileInputStream("config.properties");
properties.load(fileInputStream);