OLD REPO → Moved to https://github.com/frischHWC/datagen
Goal of this project is to generate Random data in order to insert them into different services of CDP platform.
Sink = Output system were data will be written, such as HDFS, HIVE, HBASE etc…
Model = Representation of data model use to generate random data
List of possible sinks working:
-
HDFS-AVRO : generates an AVRO file sent to HDFS
-
HDFS-CSV : generates a CSV file sent to HDFS
-
HDFS-ORC : generates an ORC file sent to HDFS
-
HDFS-PARQUET : generates a PARQUET file sent to HDFS
-
HBase
-
Kafka: Generates an avro and send it to Kafka
-
Kudu
-
Ozone
-
SolR
-
Hive
-
Parquet: Generates a parquet file locally
-
ORC: Generates an ORC file locally
-
CSV: Generate a CSV file locally
-
Avro: Generates an Avro file locally
First, go to pom.xml and change cdp version to yours, change also if required, individual versions of each component.
Then Package the program:
mvn clean package
Use the script src/main/resources/launchToPlatform.sh.
(Adapt it to your needs by changing user & machine of the platform)
Launch the script with 4 arguments:
- Location of model file (by default one is pushed alongside the jar: model.json)
- Packet of rows (linked) generated before inserting them to external systems
- Number of times to generate this bunch of rows
- List of sinks separated by a space that must either be: HDFS-AVRO, HDFS-ORC, HDFS-PARQUET, HDFS-CSV, HBASE, HIVE, IMPALA, KAFKA, OZONE, SOLR, AVRO, ORC, PARQUET, CSV
java -jar random-datagen.jar <PATH_TO_MODEL> <NUMBER_OF_ROW_PER_ITERATION> <NUMBER_OF_ITERATION> <SINK>
Launching script example:
java -jar random-datagen.jar model.json 1000 1000 HBASE
It will create a directory on the platform and send all needed files there.
Then script src/main/resources/launch.sh will be launched on platform.
This script add required jars to classpath and launch jar with provided arguments in the script.
Note that once every needed files are copied to the platform, program could be directly launched there using launch.sh script under random-datagen directory, by providing arguments at the command line.
Launch with at least 4 parameters:
- Location of model file (by default one is defined under resources/model.json)
- Packet of rows (linked) generated before inserting them to external systems
- Number of times to generate this bunch of rows
- List of sinks separated by a space that must either be: HDFS, HBASE, HIVE, KAFKA, OZONE, SOLR, CSV
Data is generated according to a model passed in first argument of the launch.
Exmaples of models could be found under src/main/resources/models/
This model is divided into 4 sections:
This is an array describing all fields by at least a name and a type, length could be precised but is optional.
All available types are:
-
STRING is an alphaNumeric string (length represents length of string, by default 20 if not set)
-
STRINGAZ is an alpha non-numeric string (length represents length of string, by default 20 if not set)
-
INTEGER (with length representing maximum value, by default Integer.MAX_VALUE)
-
BOOLEAN
-
FLOAT
-
LONG
-
TIMESTAMP (is timestamp of data generation)
-
BYTES (length represents length of byte array, by default 20)
-
HASHMD5 is the hash of a random string (length represents size of byte array, by default 32)
-
BIRTHDATE is a date between 1910 & 2020
-
NAME is a first name taken from a dictionary of over 10,000+ names
-
COUNTRY is a country name taken from a dictionary
-
BLOB is a byte array of default 1MB (length represents length of byte array) (Use it carefully)
-
EMAIL is string as in form of (<name>.<name>|<AZ09><name>)@(gaagle.com|yahaa.com|uutlook.com|email.fr)
-
IP is a string representing an IP in form of Ipv4: 0-255.0-255.0-255.0-255
-
CITY is an object representing an existing city (name, lat, long, country) made from a dictionary of over 10,000+ cities, only the name is taken for this field
-
LINK is a string whose values is derived from another field, currently from a CITY or CSV field
-
CSV is an object taken from a given CSV file
Fields values could be also more "deterministic" by providing manually values, or providing values and give them a weight to choose repartition, or even create conditions based on other columns values.
Each field could have defined a set of "possible_values" that will limit values to be exactly these.
A weight (choose between 0 & 100 and whose sums of all weights must be 100) could also be defined to make it less random and make each value having a percentage of appearance. (Only String, Boolean, Integer and Long supports weight).
Conditions must be make on previous defined columns. Two types of condition:
-
Formula, possible for Float/Integer/Long Types must be compatible (int can be converted to long but not the invert). It is a simple expression evaluated with operators: * , + , - , / Output column must be of type STRING, Input columns (used to compute) must be INTEGER or LONG or FLOAT
Example:
"conditionals": {
"always": "2 * $very_low_int + 56 - $low_int"
}
Be careful of letting space in your expression to be parsable and evaluated.
-
Value depend on other column’s value, possible for Integer/Long/Float/String/Boolean (using these types) Support for && (= AND) and || (= OR). Conditions must be equals (=) or unequals (!=) or superior (>) or inferior (<). Multiple conditions is working on same line. Conditions are evaluated one by one like a "if … else if …", first one returning true is picked. Output column must be of type STRING, columns of input must be STRING or LONG or INTEGER or FLOAT
Example:
"conditionals": {
"$country_of_provenance=FRANCE" : "Paris",
"$country_of_provenance=GERMANY | $increment_int<40" : "Berlin",
"$country_of_provenance=GERMANY & $increment_int>40" : "Berlin_40"
}
N.B.: Multiple conditions are evaluated using precedence of AND over OR, meaning: A & B | C will in fact be evaluated like (A & B) | C
It is possible to define CITY for a field as its type, this is what happens under the hood:
-
A dictionary of 41,000 cities all around the world is loaded into memory
-
A filter could be applied to take only some cities from one or multiple countries
-
When a row is required, a small city object is constructed, taken randomly from in-memory loaded data, it consists of name, lattitude, longitude and country
It is possible to define a filter based on country for this field, by adding "filters": ["France", "Spain"]
in the definition of the field.
With this, only cities whose country is France or Spain will be loaded.
The field CITY will ONLY have the city name written as a value for the row.
It is possible to define LINK for a field as its type, it will be "linked" to a CITY field by defining conditionals on it.
This field will be a string type and will have its value taken from the previous city object created, by either being latitude, longitude or country.
The relationship between this field and the CITY field is defined like this:
"conditionals": {
"link": "$city.country"
}
where city here is the name of another field whose type is CITY.
It is possible to take data from a CSV file with a header, ";" as a separator and a line separator between each line.
File path must be specified using: "file": "/home/my_csv.csv"
This file is loaded into memory and filtered (if some filters are specified like this "filters": ["country=France"]
).
All fields from teh CSV will be treated as STRING types and a field name must be specified (like this "field": "name"
) to know which one should be set for this field.
Then, a LINK can be made from other fields to this one and hence get linked values.
A simple definition of a field looks like this:
{
"name": "name",
"type": "NAME"
}
A definition with restricted values:
{
"name": "credits_used",
"type": "INTEGER",
"possible_values": [0, 1, -1]
}
A definition with weighted values to not generate even spread data:
{
"name": "country_of_provenance",
"type": "STRING",
"possible_values_weighted": {
"France": 40,
"Germany": 60
}
}
A definition with minimum and maximum:
{
"name": "percentage",
"type": "INTEGER",
"min": 0,
"max": 100
}
A definition with a formula to evaluate value of the column:
{
"name": "percentage",
"type": "INTEGER",
"conditionals": {
"always": "2 * $very_low_int + 56 - $low_int"
}
}
A definition with some conditions (equalities and inequalities) to evaluate its value:
{
"name": "percentage",
"type": "INTEGER",
"conditionals": {
"$country_of_provenance=FRANCE" : "Paris",
"$country_of_provenance=GERMANY | $increment_int<40" : "Berlin",
"$country_of_provenance=GERMANY & $increment_int>40" : "Berlin_40"
}
}
A definition with one field which represent a CITY (filtered on either France or Spain) and other fields for its longitude, latitude and country:
{
"name": "city",
"type": "CITY",
"possible_values": ["France", "Spain"]
},
{
"name": "city_lat",
"type": "LINK",
"conditionals": {
"link": "$city.lat"
}
},
{
"name": "city_long",
"type": "LINK",
"conditionals": {
"link": "$city.long"
}
},
{
"name": "city_country",
"type": "LINK",
"conditionals": {
"link": "$city.country"
}
}
A definition with two fields taken from a given CSV file, this file is filtered on a column, and another field is taken as a linked to the first one:
{
"name": "person",
"type": "CSV",
"filters": ["country=France"],
"file": "/root/dictionnaries/person_test.csv",
"field": "name"
},
{
"name": "person_department",
"type": "LINK",
"conditionals": {
"link": "$person.department"
}
}
The CSV file looks like this:
name;department;country
francois;PS;France
kamel;SE;France
thomas;RH;Germany
sebastian;PS;Spain
An array of following properties self-describing:
-
HDFS_FILE_PATH
-
HDFS_FILE_NAME
-
HBASE_TABLE_NAME
-
HBASE_NAMESPACE
-
KAFKA_TOPIC
-
OZONE_VOLUME
-
SOLR_COLLECTION
-
HIVE_DATABASE
-
HIVE_HDFS_FILE_PATH
-
HIVE_TABLE_NAME
-
HIVE_TEMPORARY_TABLE_NAME
-
KUDU_TABLE_NAME
-
LOCAL_FILE_PATH
-
LOCAL_FILE_NAME
-
AVRO_NAME
An array of following properties, each of it associated with a value that is
corresponding to the name of field (multiple fields could be provided separated by a comma):
-
KAFKA_MSG_KEY
-
HBASE_PRIMARY_KEY
-
OZONE_BUCKET
-
OZONE_KEY
-
KUDU_PRIMARY_KEYS
-
KUDU_HASH_KEYS
-
KUDU_RANGE_KEYS
An array of other options to configure basic settings for some sinks:
-
HBASE_COLUMN_FAMILIES_MAPPING
This mapping must be in the form : "CF:col1,col2;CF2:col5" -
SOLR_SHARDS
-
SOLR_REPLICAS
-
KUDU_REPLICAS
-
ONE_FILE_PER_ITERATION
-
KAFKA_MESSAGE_TYPE
-
HIVE_THREAD_NUMBER
-
HIVE_ON_HDFS
-
HIVE_TEZ_QUEUE_NAME
-
CSV_HEADER
-
DELETE_PREVIOUS
-
PARQUET_PAGE_SIZE
-
PARQUET_ROW_GROUP_SIZE
-
PARQUET_DICTIONARY_PAGE_SIZE
-
PARQUET_DICTIONARY_ENCODING
-
KAFKA_ACKS_CONFIG
-
KAFKA_RETRIES_CONFIG
-
KUDU_BUCKETS
-
KUDU_BUFFER
-
KUDU_FLUSH
-
OZONE_REPLICATION_FACTOR
-
HDFS_REPLICATION_FACTOR
Note that all not required settings could be safely removed with no errors.
Note that to make it more efficient and faster, this program can be launched in parallel, and especially on yarn thanks to this project: https://github.infra.cloudera.com/frisch/yarnsubmit.
This project has intent to launch java programs on YARN containers, with as many instances as desired by the user, which is perfectly suited for this project.
The command used to launch the application with yarn-submit project was the following:
./yarn-submit.sh
--app-name=random
--container-number=10
--kerberos-user=frisch/[email protected]
--keytab=/home/frisch/frisch.keytab
--app-files=/home/frisch/random-datagen/model.json,/home/frisch/random-datagen/config.properties,/home/frisch/random-datagen/log4j.properties
/home/frisch/random-datagen/random-datagen.jar model.json 1000 100 hbase
There is a main that orchestrates the whole program: Main.java.
This main reads the command-line arguments to know which model to use, how much data it should generate per iteration , how much iteration to do, and to which output system it should write (HDFS, HBase, Hive etc..).
It parses the model file and create a model object based on it, this model contains a list of fields with their definition, also table names definitions, primary keys and other options passed through model.
It then uses the config file src/main/resources/config.properties to get configuration of output systems (that are called sink). It initiates sinks by creating needed files or tables for all required sinks.
Finally, it launches batches of generation of data, that are then send to all sinks in parallel.
-
Create a Sink under sink package that extends SinkInterface
-
Implements required functions (to send one and multiple rows to the output system) and all other needed function in this class
-
Add the sink in the function "stringToSink" of ArgumentsParser under config package
-
Add the sink initialization under the function "sinksInit" of SinkSender under sink package
-
Add a function to create required object for insertion under Field abstract class
-
If needed, add a specific function for some or all Fields extended class
-
Add a function to create required object combining all Fields functions under Row class
-
If needed, under Model class, create a function to create initial queries required
-
Add required properties under config.properties file
-
Create an extended class of field under package model.type
-
Create a builder in previous class, implement generateRandomValue() function
-
If needed, override Fields function specific to some or all sinks available
-
In Field, instantiateField() function, add in the switch case statement, the new type of field
-
In Model, modify functions on table creation to be able to integrate the new type of field