Updated logging information
Signed-off-by: Chandrakala Subramanyam <[email protected]>
chandrams committed Dec 13, 2024
1 parent 4fd462c commit 7a80dc4
Showing 4 changed files with 94 additions and 64 deletions.
59 changes: 46 additions & 13 deletions tests/scripts/helpers/kruize.py
@@ -464,31 +464,64 @@ def generate_recommendations(experiment_name):
print("\n************************************************************")
return response

def post_bulk_api(input_json_file):
print("\n************************************************************")
print("Sending POST request to URL: ", f"{URL}/bulk")
print("Request Payload: ", input_json_file)
def post_bulk_api(input_json_file, logger=None):
if logger:
logger.info("\n************************************************************")
else:
print("\n************************************************************")

if logger:
logger.info(f"Sending POST request to URL: {URL}/bulk")
logger.info(f"Request Payload: {input_json_file}")
else:
print("Sending POST request to URL: ", f"{URL}/bulk")
print("Request Payload: ", input_json_file)

curl_command = f"curl -X POST {URL}/bulk -H 'Content-Type: application/json' -d '{json.dumps(input_json_file)}'"
print("Equivalent cURL command: ", curl_command)
if logger:
logger.info(f"Equivalent cURL command: {curl_command}")
else:
print("Equivalent cURL command: ", curl_command)

# Send the POST request
response = requests.post(f"{URL}/bulk", json=input_json_file)
print("Response Status Code: ", response.status_code)
print("Response JSON: ", response.json())

if logger:
logger.info(f"Response Status Code: {response.status_code}")
logger.info(f"Response JSON: {response.json()}")
else:
print("Response Status Code: ", response.status_code)
print("Response JSON: ", response.json())
return response
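The if logger / else print branching now repeats for every message in this helper and the next one; a small dispatcher would collapse the duplication. A minimal sketch, assuming a hypothetical log_info helper that is not part of this commit:

def log_info(message, logger=None):
    # Route the message to the worker's logger when one is supplied;
    # fall back to stdout for interactive runs.
    if logger:
        logger.info(message)
    else:
        print(message)

Each call site would then reduce to a single line, e.g. log_info(f"Request Payload: {input_json_file}", logger).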

def get_bulk_job_status(job_id,verbose=False):
print("\n************************************************************")
def get_bulk_job_status(job_id, verbose=False, logger=None):
if logger:
logger.info("\n************************************************************")
else:
print("\n************************************************************")
url_basic = f"{URL}/bulk?job_id={job_id}"
url_verbose = f"{URL}/bulk?job_id={job_id}&verbose={verbose}"
getJobIDURL = url_basic
if verbose:
getJobIDURL = url_verbose
print("Sending GET request to URL ( verbose=",verbose," ): ", getJobIDURL)

if logger:
logger.info(f"Sending GET request to URL ( verbose={verbose} ): {getJobIDURL}")
else:
print("Sending GET request to URL ( verbose=",verbose," ): ", getJobIDURL)

curl_command_verbose = f"curl -X GET '{getJobIDURL}'"
print("Equivalent cURL command : ", curl_command_verbose)

if logger:
logger.info(f"Equivalent cURL command : {curl_command_verbose}")
else:
print("Equivalent cURL command : ", curl_command_verbose)
response = requests.get(getJobIDURL)  # use the URL computed above so the verbose flag is honored

print("Verbose GET Response Status Code: ", response.status_code)
print("Verbose GET Response JSON: ", response.json())
if logger:
logger.info(f"Verbose GET Response Status Code: {response.status_code}")
logger.info(f"Verbose GET Response JSON: {response.json()}")
else:
print("Verbose GET Response Status Code: ", response.status_code)
print("Verbose GET Response JSON: ", response.json())
return response
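The scale tests below poll this endpoint in a tight loop with no visible sleep or deadline; a bounded-poll sketch, assuming illustrative timeout_seconds and poll_seconds values (the wait_for_bulk_job name is hypothetical, not part of this commit):

import time

def wait_for_bulk_job(job_id, logger=None, timeout_seconds=600, poll_seconds=5):
    # Poll the bulk job until it reaches a terminal state or the deadline passes.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        job_status = get_bulk_job_status(job_id, "true", logger).json()['status']
        if job_status in ("COMPLETED", "FAILED"):
            return job_status
        time.sleep(poll_seconds)  # breathe between polls instead of spinning
    return "TIMED_OUT"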
@@ -52,21 +52,16 @@ def invoke_bulk_with_time_range_labels(worker_number, resultsdir, bulk_json, del

log_id = str(worker_number) + "-" + org_id + "-" + cluster_id

print(log_id)

log_file = f"{scale_log_dir}/worker_{log_id}.log"
print(f"log_file - {log_file}")

logger = setup_logger(f"logger_{log_id}", log_file)
print("logger created")
logger.info(f"log id = {log_id}")

logger.info(f"worker number = {worker_number}")

# Invoke the bulk service
logger.info("Invoking bulk service with bulk json")
bulk_response = post_bulk_api(bulk_json)

logger.info(bulk_json)
bulk_response = post_bulk_api(bulk_json, logger)

# Obtain the job id from the response from bulk service
job_id_json = bulk_response.json()
@@ -76,14 +71,14 @@ def invoke_bulk_with_time_range_labels(worker_number, resultsdir, bulk_json, del

# Get the bulk job status using the job id
verbose = "true"
bulk_job_response = get_bulk_job_status(job_id, verbose)
bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
job_status_json = bulk_job_response.json()

# Loop until job status is COMPLETED
job_status = job_status_json['status']
print(job_status)

while job_status != "COMPLETED":
bulk_job_response = get_bulk_job_status(job_id, verbose)
bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
job_status_json = bulk_job_response.json()
job_status = job_status_json['status']
if job_status == "FAILED":
@@ -167,8 +162,13 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int
print(f"num_tsdb_blocks - {num_tsdb_blocks}")

current_end_time = initial_end_time

for k in range(1, num_tsdb_blocks):
# Update time range in the bulk input json
bulk_json_file = "../json_files/bulk_input_timerange.json"

with open(bulk_json_file, "r") as json_file:
    bulk_input_json = json.loads(json_file.read())

for k in range(1, num_tsdb_blocks + 1):

current_start_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta(
hours=interval_hours)
@@ -178,8 +178,8 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int
current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
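# Worked example (illustrative values, not from the commit): with
# current_end_time = 2024-12-10T06:00:00.000Z and interval_hours = 6, this block
# spans 2024-12-10T00:00:00.000Z .. 2024-12-10T06:00:00.000Z; each pass presumably
# steps current_end_time back to the block's start, walking the TSDB backwards one
# interval per iteration.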

for org_id in range(1, org_ids):
for cluster_id in range(1, cluster_ids):
for org_id in range(1, org_ids + 1):
for cluster_id in range(1, cluster_ids + 1):
org_value = "org-" + str(org_id)
cluster_value = "eu-" + str(org_id) + "-" + str(cluster_id)

@@ -188,14 +188,7 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int
"cluster_id": cluster_value
}

# Update time range in the bulk input json
bulk_json_file = "../json_files/bulk_input_timerange.json"

json_file = open(bulk_json_file, "r")
bulk_json = json.loads(json_file.read())

print(f"bulk current start time - {current_start_time}")
print(f"bulk current end time - {current_end_time}")
bulk_json = copy.deepcopy(bulk_input_json)  # copy the template (assumes "import copy"); a bare reference would let every queued request mutate one shared dict
bulk_json['time_range']['start'] = current_start_time
bulk_json['time_range']['end'] = current_end_time
bulk_json['filter']['include']['labels'].update(new_labels)
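Reusing one parsed template across iterations and queued workers is only safe with a copy; otherwise every request shares a single mutable dict. A plain-Python illustration of the hazard, separate from the commit:

import copy

template = {"time_range": {"start": None}}
alias = template                    # same object: later edits leak into every holder
snapshot = copy.deepcopy(template)  # independent object: safe to mutate per request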
@@ -271,6 +264,11 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int
# List datasources
datasource_name = None
list_response = list_datasources(datasource_name)
list_response_json = list_response.json()

if list_response_json['datasources'][0]['name'] != "thanos":
print("Failed! Thanos datasource is not registered with Kruize!")
sys.exit(1)
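# A more defensive check (sketch, not in this commit) avoids an IndexError when the
# datasource list comes back empty:
#   names = [ds.get('name') for ds in list_response_json.get('datasources', [])]
#   if "thanos" not in names:
#       sys.exit(1)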

start_time = time.time()
print(f"initial_end_date to parallel requests - {initial_end_date}")
@@ -40,6 +40,7 @@ prometheus_ds=0
replicas=3

ds_url="http://thanos-query-frontend.thanos-bench.svc.cluster.local:9090/"
#ds_url="http://thanos-query-frontend-example-query-thanos-operator-system.apps.kruize-scalelab.h0b5.p1.openshiftapps.com"

target="crc"
KRUIZE_IMAGE="quay.io/kruize/autotune:mvp_demo"
@@ -218,3 +219,12 @@ end_time=$(get_date)
elapsed_time=$(time_diff "${start_time}" "${end_time}")
echo ""
echo "Test took ${elapsed_time} seconds to complete" | tee -a ${LOG}

# Fail the test if the Kruize service log reports any errors or exceptions
if grep -qi "error\|exception" "${KRUIZE_SERVICE_LOG}"; then
echo "Bulk scale test failed! Check the logs for details" | tee -a ${LOG}
exit 1
else
echo "Bulk scale test completed! Check the logs for details" | tee -a ${LOG}
exit 0
fi

@@ -66,39 +66,21 @@ def invoke_bulk_with_time_range_labels(resultsdir, chunk, current_start_time, cu
try:
for org_id, cluster_id in chunk:
#time.sleep(delay)
print(f"In bulk for - {org_id} {cluster_id}")

scale_log_dir = resultsdir + "/scale_logs"
os.makedirs(scale_log_dir, exist_ok=True)

#bulk_json = update_bulk_config(org_id, cluster_id, current_start_time, current_end_time)
org_value = "org-" + str(org_id)
cluster_value = "eu-" + str(org_id) + "-" + str(cluster_id)

new_labels = {
"org_id": org_value,
"cluster_id": cluster_value
}

# Update time range in the bulk input json
bulk_json_file = "../json_files/bulk_input_timerange.json"

json_file = open(bulk_json_file, "r")
bulk_json = json.loads(json_file.read())

bulk_json['time_range']['start'] = current_start_time
bulk_json['time_range']['end'] = current_end_time
bulk_json['filter']['include']['labels'].update(new_labels)
bulk_json = update_bulk_config(org_id, cluster_id, current_start_time, current_end_time)

log_id = str(org_id) + "-" + str(cluster_id)
log_file = f"{scale_log_dir}/worker_{log_id}.log"

logger = setup_logger(f"logger_{log_id}", log_file)
logger.info(f"log id = {log_id}")

# Invoke the bulk service
logger.info("Invoking bulk service with bulk json")
logger.info(bulk_json)
bulk_response = post_bulk_api(bulk_json)
bulk_response = post_bulk_api(bulk_json, logger)

# Obtain the job id from the response from bulk service
job_id_json = bulk_response.json()
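For context, update_bulk_config (called above) plausibly centralizes the inline JSON editing this commit deletes; a sketch reconstructed from the removed lines, with a hypothetical body that may differ from the real helper:

def update_bulk_config(org_id, cluster_id, current_start_time, current_end_time):
    # Load the bulk-input template and scope it to one org/cluster and time range.
    with open("../json_files/bulk_input_timerange.json", "r") as json_file:
        bulk_json = json.loads(json_file.read())
    bulk_json['time_range']['start'] = current_start_time
    bulk_json['time_range']['end'] = current_end_time
    bulk_json['filter']['include']['labels'].update({
        "org_id": "org-" + str(org_id),
        "cluster_id": "eu-" + str(org_id) + "-" + str(cluster_id),
    })
    return bulk_json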
@@ -108,14 +90,14 @@ def invoke_bulk_with_time_range_labels(resultsdir, chunk, current_start_time, cu

# Get the bulk job status using the job id
verbose = "true"
bulk_job_response = get_bulk_job_status(job_id, verbose)
bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
job_status_json = bulk_job_response.json()

# Loop until job status is COMPLETED
job_status = job_status_json['status']
print(job_status)

while job_status != "COMPLETED":
bulk_job_response = get_bulk_job_status(job_id, verbose)
bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
job_status_json = bulk_job_response.json()
job_status = job_status_json['status']
if job_status == "FAILED":
@@ -202,15 +184,15 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int

current_end_time = initial_end_time

for k in range(1, num_tsdb_blocks):
for k in range(1, num_tsdb_blocks + 1):

current_start_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta(
hours=interval_hours)
current_end_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ')
print(current_end_time)
print(current_start_time)

current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
print(f"Invoking bulk service with the tsdb time range {current_start_time} and {current_end_time}")

# Create all tasks
tasks = [(org_id, cluster_id) for org_id in range(1, org_ids + 1) for cluster_id in range(1, cluster_ids + 1)]
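The full task list is then presumably split into chunk_size batches, one per worker submission (the next hunk consumes chunk_results); a sketch of that split using the chunk_size parameter from the function signature:

# Partition tasks into lists of at most chunk_size (org_id, cluster_id) pairs;
# each chunk becomes one invoke_bulk_with_time_range_labels submission.
chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]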
@@ -226,7 +208,7 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int
for future in as_completed(futures):
try:
chunk_results = future.result()
results.extend(chunk_results)
results.append(chunk_results)
except Exception as e:
print(f"Error processing chunk: {e}")

@@ -292,13 +274,20 @@ def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, int
# List datasources
datasource_name = None
list_response = list_datasources(datasource_name)
list_response_json = list_response.json()

if list_response_json['datasources'][0]['name'] != "thanos":
print("Failed! Thanos datasource is not registered with Kruize!")
sys.exit(1)

start_time = time.time()
print(f"initial_end_date to parallel requests - {initial_end_date}")
responses = parallel_requests_with_labels(max_workers, results_dir, initial_end_date, interval_hours, days_of_res, org_ids, cluster_ids,
chunk_size, rampup_interval_seconds)

# Print the results
print("\n*************************************************")
print(responses)
print("\n*************************************************")
for i, response in enumerate(responses):
print(f"Response {i+1}: {json.dumps(response, indent=2)}")

