Improved batch request script to allow for larger requests.
Adanato committed Jul 12, 2024
1 parent b9d886b commit c773195
Showing 12 changed files with 57 additions and 31 deletions.
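
In short, instead of writing every request to a single batch_request.jsonl and submitting one batch, the script now splits the request list into chunks of 50,000, writes each chunk to its own JSONL file, uploads each as a separate OpenAI batch, and gathers the results concurrently with asyncio. A minimal sketch of the new chunking helper's behavior (the helper itself comes from this commit; the sample request list below is hypothetical):

def split_requests_into_chunks(requests, chunk_size):
    # Slice the flat request list into consecutive chunk_size-sized pieces.
    return [requests[i:i + chunk_size] for i in range(0, len(requests), chunk_size)]

requests = [{"custom_id": str(n)} for n in range(120_000)]  # hypothetical request dicts
chunks = split_requests_into_chunks(requests, 50_000)
print([len(c) for c in chunks])  # [50000, 50000, 20000]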
1 change: 1 addition & 0 deletions WokeyTalky_Research_Code/.gitignore
@@ -0,0 +1 @@
out
81 changes: 53 additions & 28 deletions WokeyTalky_Research_Code/scripts/stage_1_data_batch_request.py
@@ -1,11 +1,17 @@
import json
import asyncio
import aiohttp
from dotenv import load_dotenv
import os
load_dotenv()
from utils.eval_util import load_prompt_format
import argparse
from utils.progress_util import progress_bar
from utils.gpt_batch_request import save_requests_to_file,upload_file_to_openai,create_batch,retrieve_batch,get_file_content,parse_multiple_json_objects,save_as_jsonl,confirm_action
from utils.gpt_batch_request import save_requests_to_file, upload_file_to_openai, create_batch, retrieve_batch, get_file_content, parse_multiple_json_objects, save_as_jsonl, confirm_action
from openai import OpenAI

def split_requests_into_chunks(requests, chunk_size):
    return [requests[i:i + chunk_size] for i in range(0, len(requests), chunk_size)]

def process_json_files(input_dir, judge_prompt_filename):
    requests = []
@@ -54,8 +60,33 @@ def process_json_files(input_dir, judge_prompt_filename)

    return requests

async def send_batch_request(client, chunk, output_dir, i, api_safety_on):
    save_requests_to_file(chunk, f'{output_dir}/batch_request_chunk_{i}.jsonl')

    if api_safety_on and not confirm_action("Do you want to proceed with sending the request to OpenAI?"):
        print("Request canceled.")
        return None

    _, response = upload_file_to_openai(f'{output_dir}/batch_request_chunk_{i}.jsonl', client.api_key)
    batch = create_batch(client, response)

    while True:
        try:
            retrieved_batch = retrieve_batch(client, batch.id)
            if retrieved_batch.status == 'completed':
                break
            else:
                raise Exception(f"Batch processing failed with status: {retrieved_batch.status}")
        except Exception as e:
            print(f"Error retrieving batch: {e}")
            print(f"Batch {i}: Retrying in 30 seconds...\n")
            await asyncio.sleep(30)

    text_content = get_file_content(client, retrieved_batch)
    if text_content:
        parsed_objects = parse_multiple_json_objects(text_content)
        return parsed_objects
    return None

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
@@ -68,13 +99,16 @@ def process_json_files(input_dir, judge_prompt_filename)
                        help='Path and name of the output file to save the requests.')
    parser.add_argument('--batch_name', type=str, default='batch_output',
                        help='Name of the batch output file.')
    parser.add_argument('--api_safety_on', type=bool, default=True,
                        help='Ask for confirmation before sending requests to OpenAI.')
    args = parser.parse_args()

    input_dir = args.input_dir
    judge_prompt_filename = args.judge_prompt_filename
    output_dir = args.output_dir
    batch_name = args.batch_name

    api_safety_on = args.api_safety_on

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError(
@@ -106,7 +140,9 @@ def process_json_files(input_dir, judge_prompt_filename)
                else:
                    raise Exception(
                        f"Batch processing failed with status: {retrieved_batch.status}")

            except Exception as e:
                print(retrieved_batch)
                print(f"Error retrieving batch: {e}")
                print("Retrying in 30 seconds...\n")
                progress_bar(30)
@@ -121,33 +157,22 @@ def process_json_files(input_dir, judge_prompt_filename)
print("Invalid input. Please enter a valid number.")

    requests = process_json_files(input_dir, judge_prompt_filename)
    save_requests_to_file(requests, f'{output_dir}/batch_request.jsonl')

    if not confirm_action("Do you want to proceed with sending the request to OpenAI?"):
        print("Request canceled.")
        exit(0)
    request_chunks = split_requests_into_chunks(requests, 50000)

    client, response = upload_file_to_openai(
        f'{output_dir}/batch_request.jsonl', api_key)
    batch = create_batch(client, response)
    async def main():
        client = OpenAI(api_key=api_key)
        tasks = []
        for i, chunk in enumerate(request_chunks, start=1):
            print(f"Processing chunk {i} of {len(request_chunks)}...")
            task = asyncio.ensure_future(send_batch_request(client, chunk, output_dir, i, api_safety_on))
            tasks.append(task)

    with open(batch_ids_file, "a") as f:
        f.write(json.dumps({"batch_id": batch.id}) + "\n")
        all_parsed_objects = []
        for task in asyncio.as_completed(tasks):
            parsed_objects = await task
            if parsed_objects:
                all_parsed_objects.extend(parsed_objects)

    while True:
        try:
            retrieved_batch = retrieve_batch(client, batch.id)
            if retrieved_batch.status == 'completed':
                break
            else:
                raise Exception(
                    f"Batch processing failed with status: {retrieved_batch.status}")
        except Exception as e:
            print(f"Error retrieving batch: {e}")
            print("Retrying in 30 seconds...\n")
            progress_bar(30)
        save_as_jsonl(all_parsed_objects, f"{output_dir}/batch_raw_outputs.jsonl")

    text_content = get_file_content(client, retrieved_batch)
    if text_content:
        parsed_objects = parse_multiple_json_objects(text_content)
        save_as_jsonl(parsed_objects, f"{output_dir}/batch_raw_outputs.jsonl")
    asyncio.run(main())
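
One caveat worth noting about the new --api_safety_on flag: argparse's type=bool converts any non-empty string to True, so passing --api_safety_on False on the command line still leaves the safety confirmation enabled. A common workaround, shown here only as a hedged sketch and not part of this commit, is an explicit string-to-boolean converter:

import argparse

def str2bool(value):
    # Accept common textual spellings of booleans; reject anything else.
    if isinstance(value, bool):
        return value
    if value.lower() in ("yes", "true", "t", "1"):
        return True
    if value.lower() in ("no", "false", "f", "0"):
        return False
    raise argparse.ArgumentTypeError(f"Expected a boolean value, got {value!r}")

parser = argparse.ArgumentParser()
parser.add_argument('--api_safety_on', type=str2bool, default=True,
                    help='Ask for confirmation before sending requests to OpenAI.')
print(parser.parse_args(['--api_safety_on', 'False']).api_safety_on)  # False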
9 binary files changed (contents not shown).
6 changes: 3 additions & 3 deletions WokeyTalky_Research_Code/setup.sh
100644 → 100755
@@ -2,18 +2,18 @@

# Set your default values here
DATASET_FILE="$(pwd)/data/HEx-PHI/category_1.csv"
PROJECT_NAME="Demo"
PROJECT_NAME="GuardrailText"
MODELS_LIST_DIR="$(pwd)/configs/open_models.txt"
JUDGE_TEMPLATES_FILE="$(pwd)/configs/judge_prompt.jsonl"
WOKE_TEMPLATES_FILE="$(pwd)/configs/woke_templates.json"
SCRIPTS_DIR="$(pwd)/scripts"
STAGE_6_FILE=""
INSERTION_POINT=0
INSERTION_POINT=1
CUDA_VISIBLE_DEVICES="0,1,2,3"
RAY_TMPDIR="$HOME/tmp_ray"

# Run the main pipeline script with arguments
./main_pipeline.sh \
./pipeline.sh \
    --dataset_file "${1:-$DATASET_FILE}" \
    --project_name "${2:-$PROJECT_NAME}" \
    --models_list_dir "${3:-$MODELS_LIST_DIR}" \
