import argparse
import os
import requests
import tempfile
import zipfile
import json
from urllib.parse import urljoin
import logging
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
DEFAULT_OUTPUT_DIR = os.path.join(os.getcwd(), 'output')

class FileDownloader:
    @staticmethod
    def download_file(download_url, output_path, timeout=30):
        """
        Downloads a file from the given URL to the specified path.

        :param download_url: The URL of the file to download.
        :param output_path: The file system path to save the downloaded file.
        :param timeout: The request timeout in seconds.
        """
        try:
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            logging.debug(f"Starting download from {download_url}")
            response = requests.get(download_url, stream=True, timeout=timeout)
            response.raise_for_status()
            with open(output_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
            logging.debug(f"Downloaded file to {output_path}")
        except requests.RequestException as e:
            logging.error(f"Failed to download {download_url}: {str(e)}")
            raise
    @staticmethod
    def download_extracted_data(response_data, output_dir, base_url="http://localhost:5001"):
        """
        Downloads the metadata file, figures, and tables listed in an extraction response.

        :param response_data: The JSON response data from the server.
        :param output_dir: Directory to save the downloaded files.
        :param base_url: Base URL for downloading files.
        """
        try:
            logging.debug(f"download_extracted_data called with output_dir: {output_dir}")
            # Download metadata
            metadata_filename = response_data['metadata_filename']
            metadata_download_url = urljoin(base_url, f"download/{metadata_filename}")
            metadata_output_path = os.path.join(output_dir, metadata_filename)
            FileDownloader.download_file(metadata_download_url, metadata_output_path)
            # Download figures and tables; both lists are handled identically.
            for kind in ('figures', 'tables'):
                filenames = response_data.get(kind, [])
                logging.debug(f"Downloading {kind}: {filenames}")
                for filename in filenames:
                    # Keep only the base filename so absolute paths returned by
                    # the server cannot escape the output directory.
                    sanitized_filename = os.path.basename(filename)
                    download_url = urljoin(base_url, f"download/{sanitized_filename}")
                    output_path = os.path.join(output_dir, sanitized_filename)
                    logging.debug(f"Downloading {download_url} to {output_path}")
                    FileDownloader.download_file(download_url, output_path)
        except Exception as e:
            logging.error(f"Error downloading extracted data: {str(e)}")
            raise
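

# Usage sketch for FileDownloader (illustrative only; the filenames below are
# placeholders for what the service actually returns, and the block is kept in
# comments so importing this module has no side effects):
#
#   sample_response = {
#       'metadata_filename': 'paper.json',
#       'figures': ['paper-Figure1-1.png'],
#       'tables': ['paper-Table1-1.png'],
#   }
#   FileDownloader.download_extracted_data(sample_response, 'output')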

class PDFExtractor:
    @staticmethod
    def extract_pdf(file_path, output_dir, url="http://localhost:5001/extract"):
        """
        Sends a PDF file to the extraction service, then downloads the
        extracted figures, tables, and metadata.

        :param file_path: Path to the PDF file to be extracted.
        :param output_dir: Directory to save the downloaded files.
        :param url: URL of the extraction service.
        :return: Response data from the server, with local file paths filled in.
        """
        try:
            logging.debug(f"Uploading {file_path} to {url} for extraction")
            logging.info(f"Extracting figures and tables from {file_path}")
            output_dir = DirectoryProcessor.setup_output_directory(output_dir)
            with open(file_path, 'rb') as file:
                files = {'file': file}
                response = requests.post(url, files=files)
            response.raise_for_status()
            logging.info(f"Extraction successful for {file_path}")
            response_data = response.json()
            logging.debug(f"Received response data: {json.dumps(response_data, indent=2)}")
            FileDownloader.download_extracted_data(response_data, output_dir)
            # Rewrite figure and table paths to point at the downloaded copies.
            # Base filenames are used so they match what download_extracted_data saved.
            figures = response_data.get('figures', [])
            tables = response_data.get('tables', [])
            response_data["figures"] = [os.path.join(output_dir, os.path.basename(fig)) for fig in figures]
            response_data["tables"] = [os.path.join(output_dir, os.path.basename(tab)) for tab in tables]
            # Extract figure-level information from the metadata JSON file,
            # which is named after the input PDF.
            logging.info(f"Extracting figure metadata for {file_path}")
            figures_with_metadata = []
            figure_metadata_path = os.path.join(
                output_dir, f"{os.path.splitext(os.path.basename(file_path))[0]}.json")
            logging.debug(f"Figure metadata path: {figure_metadata_path}")
            try:
                with open(figure_metadata_path, 'r') as metadata_file:
                    figure_metadata = json.load(metadata_file)
                logging.debug(f"Loaded metadata: {json.dumps(figure_metadata, indent=2)}")
            except (FileNotFoundError, json.JSONDecodeError) as e:
                logging.error(
                    f"Error opening or parsing metadata file {figure_metadata_path}: {str(e)}")
                raise
            for fig in figures:
                figure_info = get_figure_metadata(figure_metadata, fig)
                figures_with_metadata.append({
                    'figure': fig,
                    'metadata': figure_info
                })
            logging.debug(f"Figures with metadata: {json.dumps(figures_with_metadata, indent=2)}")
            response_data['figures_with_metadata'] = figures_with_metadata
            logging.info(f"Extraction complete for {file_path}")
            return response_data
        except requests.RequestException as e:
            logging.error(f"Error extracting PDF: {str(e)}")
            raise
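

# Usage sketch for PDFExtractor (illustrative; 'paper.pdf' is a placeholder and
# the block assumes the extraction service is running on localhost:5001; which
# metadata keys are present depends on the extraction backend):
#
#   result = PDFExtractor.extract_pdf('paper.pdf', 'output')
#   for entry in result['figures_with_metadata']:
#       print(entry['figure'], entry['metadata'].get('caption', ''))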

class DirectoryProcessor:
    @staticmethod
    def zip_directory(folder_path):
        """
        Creates a ZIP file from the specified directory, including only PDF files.

        :param folder_path: Path to the directory to zip.
        :return: Path to the temporary ZIP file.
        """
        logging.debug(f"Zipping directory: {folder_path}")
        temp_zip = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
        with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(folder_path):
                for file in files:
                    if file.lower().endswith('.pdf'):
                        file_path = os.path.join(root, file)
                        # Store paths relative to the folder so the archive
                        # does not leak the local directory layout.
                        zipf.write(file_path, os.path.relpath(file_path, folder_path))
        logging.debug(f"Created ZIP file at {temp_zip.name}")
        return temp_zip.name

    @staticmethod
    def setup_output_directory(output_dir):
        """
        Ensures the output directory exists and is writable.

        :param output_dir: Path to the output directory.
        :return: Absolute path to the output directory.
        """
        try:
            output_dir = os.path.abspath(output_dir)
            logging.debug(f"Setting up output directory: {output_dir}")
            os.makedirs(output_dir, exist_ok=True)
            if not os.access(output_dir, os.W_OK):
                raise PermissionError(f"No write permission for directory: {output_dir}")
            logging.info(f"Output directory is set to {output_dir}")
            return output_dir
        except Exception as e:
            logging.error(f"Error setting up output directory: {str(e)}")
            raise
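

# Usage sketch for DirectoryProcessor (illustrative; './pdfs' is a placeholder,
# and the caller is responsible for deleting the temporary archive):
#
#   out_dir = DirectoryProcessor.setup_output_directory('output')
#   archive = DirectoryProcessor.zip_directory('./pdfs')
#   ...  # upload the archive somewhere
#   os.unlink(archive)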

class BatchExtractor:
    @staticmethod
    def extract_batch(folder_path, output_dir, url="http://localhost:5001/extract_batch"):
        """
        Extracts figures and metadata from a batch of PDF files in a directory.

        :param folder_path: Path to the folder containing PDF files.
        :param output_dir: Directory to save the results.
        :param url: URL of the batch extraction service.
        :return: Processed results.
        """
        zip_file_path = None
        try:
            logging.info(f"Processing batch extraction for {folder_path}")
            output_dir = DirectoryProcessor.setup_output_directory(output_dir)
            zip_file_path = DirectoryProcessor.zip_directory(folder_path)
            logging.debug(f"Created ZIP file: {zip_file_path}")
            with open(zip_file_path, 'rb') as zip_file:
                files = {
                    'folder': ('batch.zip', zip_file, 'application/zip')
                }
                logging.debug(f"Sending request to {url}")
                response = requests.post(url, files=files)
            response.raise_for_status()
            logging.debug(f"Received response: {response.status_code}")
            response_data = response.json()
            logging.debug(f"Received response data: {json.dumps(response_data, indent=2)}")
            # The service returns a list of per-document results; a dict with an
            # "error" key indicates a failed batch.
            if isinstance(response_data, dict) and "error" in response_data:
                raise RuntimeError(f"Batch extraction failed: {response_data['error']}")
            logging.info(f"Downloading extracted data to {output_dir}")
            for doc in response_data:
                logging.info(f"Downloading metadata for {doc['document']}")
                # Download metadata, figures, and tables for each document.
                FileDownloader.download_extracted_data(doc, output_dir)
                # Rewrite figure and table paths to point at the downloaded copies.
                figures = doc.get('figures', [])
                tables = doc.get('tables', [])
                doc["figures"] = [os.path.join(output_dir, os.path.basename(fig)) for fig in figures]
                doc["tables"] = [os.path.join(output_dir, os.path.basename(tab)) for tab in tables]
                # Attach figure-level metadata from the per-document JSON file.
                figures_with_metadata = []
                figure_metadata_path = os.path.join(output_dir, f"{doc['document']}.json")
                with open(figure_metadata_path, 'r') as metadata_file:
                    figure_metadata = json.load(metadata_file)
                for fig in figures:
                    figure_info = get_figure_metadata(figure_metadata, fig)
                    figures_with_metadata.append({
                        'figure': fig,
                        'metadata': figure_info
                    })
                doc['figures_with_metadata'] = figures_with_metadata
            # Save the aggregated response data as a JSON file.
            json_output_path = os.path.join(output_dir, 'stat_file.json')
            with open(json_output_path, 'w') as json_file:
                json.dump(response_data, json_file, indent=2)
            logging.info(f"Saved response data to {json_output_path}")
            return response_data
        except Exception as e:
            logging.error(f"Error during batch extraction: {str(e)}")
            raise
        finally:
            # Clean up the temporary ZIP file if it was created.
            if zip_file_path and os.path.exists(zip_file_path):
                os.unlink(zip_file_path)
                logging.debug(f"Deleted temporary ZIP file {zip_file_path}")

def get_figure_metadata(figure_metadata, fig):
    """
    Finds the metadata entry whose renderURL points at the given figure file.

    :param figure_metadata: List of figure metadata entries from the service.
    :param fig: Path or filename of the figure to look up.
    :return: The matching metadata entry, or an empty dict if none is found.
    """
    fig_filename = os.path.basename(fig)
    logging.debug(f"Searching for renderURL ending with: /{fig_filename}")
    figure_info = next(
        (item for item in figure_metadata
         if 'renderURL' in item and item['renderURL'].endswith(f"/{fig_filename}")),
        None)
    if figure_info is None:
        logging.debug(f"No renderURL found for {fig_filename}")
        return {}
    logging.debug(f"Found renderURL for {fig_filename}: {figure_info['renderURL']}")
    return figure_info
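

# The metadata list is assumed to follow the shape produced by the extraction
# backend (pdffigures2-style entries keyed by renderURL); this example shape is
# an assumption for illustration, not a documented contract:
#
#   metadata = [{'renderURL': '/data/paper-Figure1-1.png', 'caption': 'Figure 1: ...'}]
#   get_figure_metadata(metadata, 'output/paper-Figure1-1.png')
#   # -> {'renderURL': '/data/paper-Figure1-1.png', 'caption': 'Figure 1: ...'}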

def extract_figures(input_path, output_dir, url=None):  # TODO: Always return a list of dictionaries
    """
    Processes the given path either as a single PDF file or as a directory of
    PDFs and runs the appropriate extraction function.

    :param input_path: Path to the input file or directory.
    :param output_dir: Directory to save the output files.
    :param url: URL for the extraction service (optional).
    :return: A list of response dictionaries, or None for an invalid path.
    """
    if os.path.isfile(input_path):
        if url is None:
            url = "http://localhost:5001/extract"
        response = PDFExtractor.extract_pdf(input_path, output_dir, url)
        logging.info(f"Extraction response: {json.dumps(response, indent=2)}")
        logging.debug(f"Type of response from extract_pdf: {type(response)}")
        # Normalize the result to a list of dictionaries.
        if isinstance(response, dict):
            response_list = [response]
            logging.debug("Wrapped single response dictionary in a list.")
        elif isinstance(response, list):
            response_list = response
            logging.debug("Received a list of response dictionaries.")
        else:
            logging.error(f"Unexpected response type: {type(response)}")
            raise TypeError("extract_pdf should return a dictionary or a list of dictionaries.")
        # Ensure each item in the list is a dictionary.
        for idx, item in enumerate(response_list):
            if not isinstance(item, dict):
                logging.error(f"Item at index {idx} is not a dictionary: {type(item)}")
                raise TypeError("Each item in response_list should be a dictionary.")
        return response_list
    elif os.path.isdir(input_path):
        if url is None:
            url = "http://localhost:5001/extract_batch"
        response = BatchExtractor.extract_batch(input_path, output_dir, url)
        logging.info(f"Batch extraction response: {json.dumps(response, indent=2)}")
        return response
    else:
        logging.error("Invalid input path. It should be either a file or a directory.")
        return None
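

# Usage sketch for extract_figures (illustrative paths):
#
#   docs = extract_figures('paper.pdf', 'output')   # single file
#   docs = extract_figures('./pdfs', 'output')      # whole directory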

def main():
    parser = argparse.ArgumentParser(description="Process PDF files and extract figures, tables, and images.")
    parser.add_argument('input_path', help="Path to the input PDF file or directory containing PDF files.")
    parser.add_argument('--output_dir', default=DEFAULT_OUTPUT_DIR,
                        help="Directory to save extracted figures. Defaults to './output'.")
    parser.add_argument('--url', help="URL for the extraction service. For file extraction: "
                                      "'http://localhost:5001/extract'; for batch extraction: "
                                      "'http://localhost:5001/extract_batch'. Only needed if you "
                                      "change the port while running Docker.")
    args = parser.parse_args()
    try:
        output_dir = DirectoryProcessor.setup_output_directory(args.output_dir)
        response = extract_figures(args.input_path, output_dir, args.url)
        print(json.dumps(response, indent=2))
    except Exception as e:
        logging.error(f"Error during extraction: {str(e)}")


if __name__ == "__main__":
    main()
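
# Command-line usage (sketch; paths are placeholders):
#   python figure_extractor.py paper.pdf --output_dir ./output
#   python figure_extractor.py ./pdfs --output_dir ./output
#   python figure_extractor.py paper.pdf --url http://localhost:5001/extract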