diff --git a/tests/scripts/helpers/kruize.py b/tests/scripts/helpers/kruize.py index 0c7c4f7d0..e94f92c22 100644 --- a/tests/scripts/helpers/kruize.py +++ b/tests/scripts/helpers/kruize.py @@ -464,31 +464,64 @@ def generate_recommendations(experiment_name): print("\n************************************************************") return response -def post_bulk_api(input_json_file): - print("\n************************************************************") - print("Sending POST request to URL: ", f"{URL}/bulk") - print("Request Payload: ", input_json_file) +def post_bulk_api(input_json_file, logger=None): + if logger: + logger.info("\n************************************************************") + else: + print("\n************************************************************") + + if logger: + logger.info(f"Sending POST request to URL: {URL}/bulk") + logger.info(f"Request Payload: {input_json_file}") + else: + print("Sending POST request to URL: ", f"{URL}/bulk") + print("Request Payload: ", input_json_file) + curl_command = f"curl -X POST {URL}/bulk -H 'Content-Type: application/json' -d '{json.dumps(input_json_file)}'" - print("Equivalent cURL command: ", curl_command) + if logger: + logger.info(f"Equivalent cURL command: {curl_command}") + else: + print("Equivalent cURL command: ", curl_command) # Send the POST request response = requests.post(f"{URL}/bulk", json=input_json_file) - print("Response Status Code: ", response.status_code) - print("Response JSON: ", response.json()) + + if logger: + logger.info(f"Response Status Code: {response.status_code}") + logger.info(f"Response JSON: {response.json()}") + else: + print("Response Status Code: ", response.status_code) + print("Response JSON: ", response.json()) return response -def get_bulk_job_status(job_id,verbose=False): - print("\n************************************************************") +def get_bulk_job_status(job_id,verbose=False,logger=None): + if logger: + logger.info("\n************************************************************") + else: + print("\n************************************************************") url_basic = f"{URL}/bulk?job_id={job_id}" url_verbose = f"{URL}/bulk?job_id={job_id}&verbose={verbose}" getJobIDURL = url_basic if verbose: getJobIDURL = url_verbose - print("Sending GET request to URL ( verbose=",verbose," ): ", getJobIDURL) + + if logger: + logger.info(f"Sending GET request to URL ( verbose={verbose} ): {getJobIDURL}") + else: + print("Sending GET request to URL ( verbose=",verbose," ): ", getJobIDURL) + curl_command_verbose = f"curl -X GET '{getJobIDURL}'" - print("Equivalent cURL command : ", curl_command_verbose) + + if logger: + logger.info(f"Equivalent cURL command : {curl_command_verbose}") + else: + print("Equivalent cURL command : ", curl_command_verbose) response = requests.get(url_verbose) - print("Verbose GET Response Status Code: ", response.status_code) - print("Verbose GET Response JSON: ", response.json()) + if logger: + logger.info(f"Verbose GET Response Status Code: {response.status_code}") + logger.info(f"Verbose GET Response JSON: {response.json()}") + else: + print("Verbose GET Response Status Code: ", response.status_code) + print("Verbose GET Response JSON: ", response.json()) return response diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py index 135afe138..67f74eca6 100644 --- a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py +++ b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py @@ -52,21 +52,16 @@ def invoke_bulk_with_time_range_labels(worker_number, resultsdir, bulk_json, del log_id = str(worker_number) + "-" + org_id + "-" + cluster_id - print(log_id) - log_file = f"{scale_log_dir}/worker_{log_id}.log" - print(f"log_file - {log_file}") + logger = setup_logger(f"logger_{log_id}", log_file) - print("logger created") logger.info(f"log id = {log_id}") logger.info(f"worker number = {worker_number}") # Invoke the bulk service logger.info("Invoking bulk service with bulk json") - bulk_response = post_bulk_api(bulk_json) - - logger.info(bulk_json) + bulk_response = post_bulk_api(bulk_json, logger) # Obtain the job id from the response from bulk service job_id_json = bulk_response.json() @@ -76,14 +71,14 @@ def invoke_bulk_with_time_range_labels(worker_number, resultsdir, bulk_json, del # Get the bulk job status using the job id verbose = "true" - bulk_job_response = get_bulk_job_status(job_id, verbose) + bulk_job_response = get_bulk_job_status(job_id, verbose, logger) job_status_json = bulk_job_response.json() # Loop until job status is COMPLETED job_status = job_status_json['status'] - print(job_status) + while job_status != "COMPLETED": - bulk_job_response = get_bulk_job_status(job_id, verbose) + bulk_job_response = get_bulk_job_status(job_id, verbose, logger) job_status_json = bulk_job_response.json() job_status = job_status_json['status'] if job_status == "FAILED": @@ -167,8 +162,13 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int print(f"num_tsdb_blocks - {num_tsdb_blocks}") current_end_time = initial_end_time - - for k in range(1, num_tsdb_blocks): + # Update time range in the bulk input json + bulk_json_file = "../json_files/bulk_input_timerange.json" + + json_file = open(bulk_json_file, "r") + bulk_input_json = json.loads(json_file.read()) + + for k in range(1, num_tsdb_blocks + 1): current_start_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta( hours=interval_hours) @@ -178,8 +178,8 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z') current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z') - for org_id in range(1, org_ids): - for cluster_id in range(1, cluster_ids): + for org_id in range(1, org_ids + 1): + for cluster_id in range(1, cluster_ids + 1): org_value = "org-" + str(org_id) cluster_value = "eu-" + str(org_id) + "-" + str(cluster_id) @@ -188,14 +188,7 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int "cluster_id": cluster_value } - # Update time range in the bulk input json - bulk_json_file = "../json_files/bulk_input_timerange.json" - - json_file = open(bulk_json_file, "r") - bulk_json = json.loads(json_file.read()) - - print(f"bulk current start time - {current_start_time}") - print(f"bulk current end time - {current_end_time}") + bulk_json = bulk_input_json bulk_json['time_range']['start'] = current_start_time bulk_json['time_range']['end'] = current_end_time bulk_json['filter']['include']['labels'].update(new_labels) @@ -271,6 +264,11 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int # List datasources datasource_name = None list_response = list_datasources(datasource_name) + list_response_json = list_response.json() + + if list_response_json['datasources'][0]['name'] != "thanos": + print("Failed! Thanos datasource is not registered with Kruize!") + sys.exit(1) start_time = time.time() print(f"initial_end_date to parallel requests - {initial_end_date}") diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh index b59584336..1386099ce 100755 --- a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh +++ b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh @@ -40,6 +40,7 @@ prometheus_ds=0 replicas=3 ds_url="http://thanos-query-frontend.thanos-bench.svc.cluster.local:9090/" +#ds_url="http://thanos-query-frontend-example-query-thanos-operator-system.apps.kruize-scalelab.h0b5.p1.openshiftapps.com" target="crc" KRUIZE_IMAGE="quay.io/kruize/autotune:mvp_demo" @@ -218,3 +219,12 @@ end_time=$(get_date) elapsed_time=$(time_diff "${start_time}" "${end_time}") echo "" echo "Test took ${elapsed_time} seconds to complete" | tee -a ${LOG} + +if [[ $(grep -i "error\|exception" ${KRUIZE_SERVICE_LOG}) ]]; then + echo "Bulk scale test failed! Check the logs for details" | tee -a ${LOG} + exit 1 +else + echo "Bulk scale test completed! Check the logs for details" | tee -a ${LOG} + exit 0 +fi + diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test_parallel.py b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test_parallel.py index 49ff9a390..a175686bd 100644 --- a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test_parallel.py +++ b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test_parallel.py @@ -66,39 +66,21 @@ def invoke_bulk_with_time_range_labels(resultsdir, chunk, current_start_time, cu try: for org_id, cluster_id in chunk: #time.sleep(delay) - print(f"In bulk for - {org_id} {cluster_id}") + scale_log_dir = resultsdir + "/scale_logs" os.makedirs(scale_log_dir, exist_ok=True) - #bulk_json = update_bulk_config(org_id, cluster_id, current_start_time, current_end_time) - org_value = "org-" + str(org_id) - cluster_value = "eu-" + str(org_id) + "-" + str(cluster_id) - - new_labels = { - "org_id": org_value, - "cluster_id": cluster_value - } - - # Update time range in the bulk input json - bulk_json_file = "../json_files/bulk_input_timerange.json" - - json_file = open(bulk_json_file, "r") - bulk_json = json.loads(json_file.read()) - - bulk_json['time_range']['start'] = current_start_time - bulk_json['time_range']['end'] = current_end_time - bulk_json['filter']['include']['labels'].update(new_labels) + bulk_json = update_bulk_config(org_id, cluster_id, current_start_time, current_end_time) log_id = str(org_id) + "-" + str(cluster_id) log_file = f"{scale_log_dir}/worker_{log_id}.log" logger = setup_logger(f"logger_{log_id}", log_file) - logger.info(f"log id = {log_id}") # Invoke the bulk service logger.info("Invoking bulk service with bulk json") logger.info(bulk_json) - bulk_response = post_bulk_api(bulk_json) + bulk_response = post_bulk_api(bulk_json, logger) # Obtain the job id from the response from bulk service job_id_json = bulk_response.json() @@ -108,14 +90,14 @@ def invoke_bulk_with_time_range_labels(resultsdir, chunk, current_start_time, cu # Get the bulk job status using the job id verbose = "true" - bulk_job_response = get_bulk_job_status(job_id, verbose) + bulk_job_response = get_bulk_job_status(job_id, verbose, logger) job_status_json = bulk_job_response.json() # Loop until job status is COMPLETED job_status = job_status_json['status'] - print(job_status) + while job_status != "COMPLETED": - bulk_job_response = get_bulk_job_status(job_id, verbose) + bulk_job_response = get_bulk_job_status(job_id, verbose, logger) job_status_json = bulk_job_response.json() job_status = job_status_json['status'] if job_status == "FAILED": @@ -202,15 +184,15 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int current_end_time = initial_end_time - for k in range(1, num_tsdb_blocks): + for k in range(1, num_tsdb_blocks + 1): current_start_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta( hours=interval_hours) current_end_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - print(current_end_time) - print(current_start_time) + current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z') current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z') + print(f"Invoking bulk service with the tsdb time range {current_start_time} and {current_end_time}") # Create all tasks tasks = [(org_id, cluster_id) for org_id in range(1, org_ids + 1) for cluster_id in range(1, cluster_ids + 1)] @@ -226,7 +208,7 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int for future in as_completed(futures): try: chunk = future.result() - results.extend(chunk_results) + results.append(chunk_results) except Exception as e: print(f"Error processing chunk: {e}") @@ -292,13 +274,20 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int # List datasources datasource_name = None list_response = list_datasources(datasource_name) + list_response_json = list_response.json() + + if list_response_json['datasources'][0]['name'] != "thanos": + print("Failed! Thanos datasource is not registered with Kruize!") + sys.exit(1) start_time = time.time() - print(f"initial_end_date to parallel requests - {initial_end_date}") responses = parallel_requests_with_labels(max_workers, results_dir, initial_end_date, interval_hours, days_of_res, org_ids, cluster_ids, chunk_size, rampup_interval_seconds) # Print the results + print("\n*************************************************") + print(responses) + print("\n*************************************************") for i, response in enumerate(responses): print(f"Response {i+1}: {json.dumps(response, indent=2)}")