diff --git a/app/Makefile b/app/Makefile
index 37c899a9..ceeb02ac 100644
--- a/app/Makefile
+++ b/app/Makefile
@@ -251,12 +251,6 @@ endif
ingest-guru-cards: check-ingest-arguments
$(PY_RUN_CMD) ingest-guru-cards "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)"
-ingest-policy-pdfs: check-ingest-arguments
- $(PY_RUN_CMD) ingest-policy-pdfs "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)"
-
-ingest-bem-pdfs: check-ingest-arguments
- $(PY_RUN_CMD) ingest-bem-pdfs "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)"
-
ingest-edd-web: check-ingest-arguments
$(PY_RUN_CMD) ingest-edd-web "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)" $(INGEST_ARGS)
diff --git a/app/pyproject.toml b/app/pyproject.toml
index 77159d97..40fddc11 100644
--- a/app/pyproject.toml
+++ b/app/pyproject.toml
@@ -69,8 +69,6 @@ db-migrate = "src.db.migrations.run:up"
db-migrate-down = "src.db.migrations.run:down"
db-migrate-down-all = "src.db.migrations.run:downall"
ingest-guru-cards = "src.ingest_guru_cards:main"
-ingest-policy-pdfs = "src.ingest_policy_pdfs:main"
-ingest-bem-pdfs = "src.ingest_bem_pdfs:main"
ingest-edd-web = "src.ingest_edd_web:main"
scrape-edd-web = "src.ingestion.scrape_edd_web:main"
ingest-imagine-la = "src.ingestion.imagine_la.ingest:main"
diff --git a/app/src/chat_engine.py b/app/src/chat_engine.py
index 9c93dc0d..ce958e50 100644
--- a/app/src/chat_engine.py
+++ b/app/src/chat_engine.py
@@ -9,7 +9,7 @@
split_into_subsections,
)
from src.db.models.document import ChunkWithScore, Subsection
-from src.format import BemFormattingConfig, FormattingConfig, format_guru_cards
+from src.format import FormattingConfig, format_guru_cards
from src.generate import PROMPT, ChatHistory, MessageAttributes, analyze_message, generate
from src.retrieve import retrieve_with_scores
from src.util.class_utils import all_subclasses
@@ -170,21 +170,6 @@ class GuruSnapEngine(BaseEngine):
formatter = staticmethod(format_guru_cards)
-class BridgesEligibilityManualEngine(BaseEngine):
- retrieval_k: int = 10
- retrieval_k_min_score: float = -1
-
- # Note: currently not used
- chunks_shown_min_score: float = -1
- chunks_shown_max_num: int = 8
-
- engine_id: str = "bridges-eligibility-manual"
- name: str = "Michigan Bridges Eligibility Manual Chat Engine"
- datasets = ["bridges-eligibility-manual"]
-
- formatting_config = BemFormattingConfig()
-
-
class CaEddWebEngine(BaseEngine):
retrieval_k: int = 50
retrieval_k_min_score: float = -1
diff --git a/app/src/format.py b/app/src/format.py
index 8b0d9409..4a7b288a 100644
--- a/app/src/format.py
+++ b/app/src/format.py
@@ -3,13 +3,12 @@
import re
from collections import defaultdict
from itertools import groupby
-from typing import Match, OrderedDict, Sequence
+from typing import Match, Sequence
import markdown
from src.citations import CITATION_PATTERN, remap_citation_ids
from src.db.models.document import Chunk, ChunkWithScore, Document, Subsection
-from src.util.bem_util import get_bem_url, replace_bem_with_link
logger = logging.getLogger(__name__)
@@ -40,32 +39,6 @@ def format_accordion_body(self, citation_body: str) -> str:
return to_html(citation_body)
-class BemFormattingConfig(FormattingConfig):
- "BEM-specific formatting configuration"
-
- def __init__(self) -> None:
- self.add_citation_link_per_subsection = True
-
- def get_citation_link(self, subsection: Subsection) -> str:
- chunk = subsection.chunk
- bem_url_for_page = get_bem_url(chunk.document.name)
- if chunk.page_number:
- bem_url_for_page += "#page=" + str(chunk.page_number)
- return (
-            f"<p><a href='{bem_url_for_page}'>Open document to page {chunk.page_number}</a></p>"
- if chunk.page_number
- else ""
- )
-
- def get_superscript_link(self, chunk: Chunk) -> str:
- link = get_bem_url(chunk.document.name) if "BEM" in chunk.document.name else "#"
- link += "#page=" + str(chunk.page_number) if chunk.page_number else ""
- return link
-
- def format_accordion_body(self, citation_body: str) -> str:
- return to_html(replace_bem_with_link(citation_body))
-
-
def format_guru_cards(
chunks_shown_max_num: int,
chunks_shown_min_score: float,
@@ -92,34 +65,6 @@ def format_guru_cards(
    return response_with_citations + "<h3>Related Guru cards</h3>" + cards_html
-def _get_bem_documents_to_show(
- chunks_shown_max_num: int,
- chunks_shown_min_score: float,
- chunks_with_scores: list[ChunkWithScore],
-) -> OrderedDict[Document, list[ChunkWithScore]]:
- chunks_with_scores.sort(key=lambda c: c.score, reverse=True)
-
- # Build a dictionary of documents with their associated chunks,
- # Ordered by the highest score of each chunk associated with the document
- documents: OrderedDict[Document, list[ChunkWithScore]] = OrderedDict()
- for chunk_with_score in chunks_with_scores[:chunks_shown_max_num]:
- document = chunk_with_score.chunk.document
- if chunk_with_score.score < chunks_shown_min_score:
- logger.info(
- "Skipping chunk with score less than %f: %s",
- chunks_shown_min_score,
- chunk_with_score.chunk.document.name,
- )
- continue
-
- if document in documents:
- documents[document].append(chunk_with_score)
- else:
- documents[document] = [chunk_with_score]
-
- return documents
-
-
def to_html(text: str) -> str:
# markdown expects '\n' before the start of a list
corrected_text = re.sub(r"^- ", "\n- ", text, flags=re.MULTILINE, count=1)
@@ -213,12 +158,10 @@ def _build_citation_body(
)
if config.add_citation_link_per_subsection:
citation_link = config.get_citation_link(subsection)
- # generated citation links for BEM redirect to specific pages
        citation_body += f"{citation_link}"
if not config.add_citation_link_per_subsection:
citation_link = config.get_document_link(document)
- # display source link once
        citation_body += f"{citation_link}"
return citation_body
@@ -240,23 +183,6 @@ def _get_breadcrumb_html(headings: Sequence[str] | None, document_name: str) ->
    return f"{' → '.join(headings)}"
-# TODO: This is not called. Remove it?
-def format_bem_documents(
- chunks_shown_max_num: int,
- chunks_shown_min_score: float,
- chunks_with_scores: Sequence[ChunkWithScore],
- subsections: Sequence[Subsection],
- raw_response: str,
-) -> str:
- response_with_citations = reify_citations(raw_response, subsections, BemFormattingConfig(), {})
-
- documents = _get_bem_documents_to_show(
- chunks_shown_max_num, chunks_shown_min_score, list(chunks_with_scores)
- )
-
- return response_with_citations + _format_bem_to_accordion_group_html(documents)
-
-
def _format_guru_to_accordion_html(document: Document, score: float) -> str:
global _accordion_id
_accordion_id += 1
@@ -282,86 +208,6 @@ def _format_guru_to_accordion_html(document: Document, score: float) -> str:
"""
-def _format_bem_to_accordion_group_html(
- documents: OrderedDict[Document, list[ChunkWithScore]]
-) -> str:
- global _accordion_id
- html = ""
- citation_number = 1
- for document in documents:
- citations = ""
- _accordion_id += 1
-
- citation_number_start = citation_number
-
- for chunk_with_score in documents[document]:
- chunk = chunk_with_score.chunk
-
- formatted_chunk = _add_ellipses_for_bem(chunk)
- formatted_chunk = replace_bem_with_link(formatted_chunk)
-
- # Adjust markdown for lists so Chainlit renders correctly
- formatted_chunk = re.sub("^ - ", "- ", formatted_chunk, flags=re.MULTILINE)
- if formatted_chunk.startswith("- "):
- formatted_chunk = "\n" + formatted_chunk
-
- bem_url_for_page = get_bem_url(document.name)
- if chunk.page_number:
- bem_url_for_page += "#page=" + str(chunk.page_number)
-
-            citation_heading = f"Citation {citation_number}:"
-            chunk_headings = "" + " → ".join(chunk.headings) + "" if chunk.headings else ""
-            citation_body = f'{formatted_chunk}'
-            citation_link = (
-                f"<a href='{bem_url_for_page}'>Open document to page {chunk.page_number}</a>"
-                if chunk.page_number
-                else ""
-            )
- citations += citation_heading + chunk_headings + citation_body + citation_link
-
- citation_number += 1
-
- citation_number_end = citation_number - 1
- citation_range = (
- f"Citation {citation_number_start}"
- if citation_number_start == citation_number_end
- else f"Citations {citation_number_start} - {citation_number_end}"
- )
-
- html += f"""
- """ # noqa: B907
-
-    return "\nSource(s)" + html if html else ""
-
-
-def _add_ellipses_for_bem(chunk: Chunk) -> str:
- chunk_content = chunk.content
- if chunk.num_splits != 0:
- if chunk.split_index == 0:
- return f"{chunk_content} ..."
- elif chunk.split_index == chunk.num_splits:
- return f"... {chunk_content}"
- else:
- return f"... {chunk_content} ..."
- return chunk_content
-
-
def reify_citations(
response: str,
subsections: Sequence[Subsection],
diff --git a/app/src/ingest_bem_pdfs.py b/app/src/ingest_bem_pdfs.py
deleted file mode 100644
index 6b783707..00000000
--- a/app/src/ingest_bem_pdfs.py
+++ /dev/null
@@ -1,244 +0,0 @@
-import logging
-import math
-import re
-import sys
-import uuid
-from typing import BinaryIO
-
-from smart_open import open as smart_open
-from unstructured.documents.elements import Element
-from unstructured.partition.pdf import partition_pdf
-
-from src.adapters import db
-from src.app_config import app_config
-from src.db.models.document import Chunk, Document
-from src.ingestion.pdf_elements import EnrichedText, TextType
-from src.ingestion.pdf_postprocess import add_markdown, associate_stylings, group_texts
-from src.ingestion.pdf_stylings import extract_stylings
-from src.util import pdf_utils
-from src.util.file_util import get_files
-from src.util.ingest_utils import add_embeddings, process_and_ingest_sys_args, save_json, tokenize
-from src.util.pdf_utils import Heading
-from src.util.string_utils import split_list, split_paragraph
-
-logger = logging.getLogger(__name__)
-
-
-def _get_bem_title(file: BinaryIO, file_path: str) -> str:
- """
- Get the BEM number from the file path (e.g., 100.pdf) and the
- document title from the PDF meta data and combine, e.g.,:
- "BEM 100: Introduction Example"
- """
- pdf_info = pdf_utils.get_pdf_info(file)
- pdf_title = pdf_info.title or file_path
- bem_num = file_path.split("/")[-1].rsplit(".", 1)[0]
- return f"BEM {bem_num}: {pdf_title}"
-
-
-def _ingest_bem_pdfs(
- db_session: db.Session,
- pdf_file_dir: str,
- doc_attribs: dict[str, str],
- should_save_json: bool = True,
-) -> None:
- file_list = sorted(get_files(pdf_file_dir))
-
- logger.info(
- "Processing PDFs in %s using %s with %s",
- pdf_file_dir,
- app_config.embedding_model,
- doc_attribs,
- )
- for file_path in file_list:
- if not file_path.endswith(".pdf"):
- continue
-
- logger.info("Processing file: %s", file_path)
- with smart_open(file_path, "rb") as file:
- grouped_texts = _parse_pdf(file, file_path)
- doc_attribs["name"] = _get_bem_title(file, file_path)
- document = Document(content="\n".join(g.text for g in grouped_texts), **doc_attribs)
- db_session.add(document)
-
- chunks = _split_into_chunks(document, grouped_texts)
- add_embeddings(chunks)
- db_session.add_all(chunks)
-
- if should_save_json:
- # Note that chunks are being added to the DB before saving the JSON.
- # Originally, we thought about reviewing the JSON manually before adding chunks to the DB.
- save_json(file_path, chunks)
-
-
-def _parse_pdf(file: BinaryIO, file_path: str) -> list[EnrichedText]:
- enriched_texts = _enrich_texts(file)
- try:
- stylings = extract_stylings(file)
- associate_stylings(enriched_texts, stylings)
- except Exception as e:
- # 101.pdf is a large collection of tables that's hard to parse
- logger.warning("%s: Failed to extract and associate stylings: %s", file_path, e)
- markdown_texts = add_markdown(enriched_texts)
- grouped_texts = group_texts(markdown_texts)
-
- # Assign unique ids to each grouped text before they get split into chunks
- for text in grouped_texts:
- text.id = str(uuid.uuid1())
- assert len(set(text.id for text in grouped_texts)) == len(grouped_texts)
-
- return grouped_texts
-
-
-def _enrich_texts(file: BinaryIO) -> list[EnrichedText]:
- unstuctured_elem_list = partition_pdf(file=file, strategy="fast")
- enrich_text_list = []
-
- outline: list[Heading] = pdf_utils.extract_outline(file)
- current_headings: list[Heading] = []
-
- prev_element_was_empty_list_item = False
-
- for element in unstuctured_elem_list:
- if element.category == "Footer" or element.category == "Header":
- continue
-
- # Unstructured fails to categorize the date strings in the header,
- # so manually check for that and ignore those too
- if element.category == "UncategorizedText" and re.match(
- r"^\d{1,2}-\d{1,2}-\d{4}$", element.text
- ):
- continue
-
- if element.category == "Title":
- if next_heading := _next_heading(outline, element, current_headings):
- current_headings = next_heading
- continue
-
- # Sometimes Unstructured splits a ListItem into an empty ListItem
- # and then either a NarrativeText, UncategorizedText, or Title
- # For example, BEM 100 page 8 or page 13
- if element.category == "ListItem" and not element.text:
- prev_element_was_empty_list_item = True
- continue
- if prev_element_was_empty_list_item:
- if element.category in ("NarrativeText", "UncategorizedText", "Title"):
- element.category = "ListItem"
- else:
- logger.warning(
- "Empty list item not followed by NarrativeText, UncategorizedText, or Title; page %i",
- element.metadata.page_number,
- )
- prev_element_was_empty_list_item = False
-
- # UncategorizedText is frequently just NarrativeText that looks strange,
- # e.g., "45 CFR 400.45 - 400.69 and 400.90 - 400.107"
- # In 167.pdf, Unstructured recognizes an Address.
- if element.category in ["UncategorizedText", "Address"]:
- element.category = "NarrativeText"
-
- try:
- enriched_text_item = EnrichedText(
- text=element.text,
- type=TextType(element.category),
- page_number=element.metadata.page_number,
- headings=current_headings,
- id=element.id,
- )
- enrich_text_list.append(enriched_text_item)
- except ValueError:
- logger.warning(
- "%s is not an accepted TextType; page %i: '%s'",
- element.category,
- element.metadata.page_number,
- element.text,
- )
- return enrich_text_list
-
-
-def _match_heading(
- outline: list[Heading], heading_name: str, page_number: int | None
-) -> Heading | None:
- for heading in outline:
- if heading.pageno == page_number:
- # account for spacing differences in unstructured and pdfminer parsing
- heading_words = [word for word in heading.title.casefold() if not word.isspace()]
- element_words = [word for word in heading_name.casefold() if not word.isspace()]
- if heading_words == element_words:
- return heading
- return None
-
-
-def _next_heading(
- outline: list[Heading], element: Element, current_headings: list[Heading]
-) -> list[Heading] | None:
- if heading := _match_heading(outline, element.text, element.metadata.page_number):
- if heading.level == 1:
- current_headings = [heading]
- else:
- if heading.title != current_headings[-1].title:
- current_headings = current_headings[: heading.level - 1]
- current_headings.append(heading)
- else:
- # TODO: Should warn of unmatched headings that weren't found after processing all elements
- return None
- return current_headings
-
-
-def _split_into_chunks(document: Document, grouped_texts: list[EnrichedText]) -> list[Chunk]:
- """
- Given EnrichedTexts, convert the text to chunks and add them to the database.
- """
- chunks: list[Chunk] = []
- for paragraph in grouped_texts:
- assert paragraph.id is not None
- assert paragraph.page_number is not None
-
- embedding_model = app_config.sentence_transformer
- token_count = len(tokenize(paragraph.text))
- if token_count > embedding_model.max_seq_length:
- # Split the text into chunks of approximately equal length by characters,
- # which doesn't necessarily mean equal number of tokens, but close enough.
- # The arbitrary 1.5 tolerance factor tries to account for higher token counts per chunk when text is split.
- num_of_splits = math.ceil((token_count * 1.5) / embedding_model.max_seq_length)
- char_limit_per_split = math.ceil(len(paragraph.text) / num_of_splits)
- if paragraph.type == TextType.LIST:
- splits = split_list(paragraph.text, char_limit_per_split)
- elif paragraph.type == TextType.NARRATIVE_TEXT:
- splits = split_paragraph(paragraph.text, char_limit_per_split)
- elif paragraph.type == TextType.LIST_ITEM:
- # 233B.pdf: bottom of page 7: list item has no introductory sentence
- splits = split_list(paragraph.text, char_limit_per_split, has_intro_sentence=False)
- else:
- raise ValueError(f"Unexpected element type: {paragraph.type}: {paragraph.text}")
- logger.info(
- "Split long text with length %i into %i chunks with %i char limit: [%s]: %s ...",
- len(paragraph.text),
- len(splits),
- char_limit_per_split,
- ",".join([str(len(split)) for split in splits]),
- splits[0][:120],
- )
-
- else:
- splits = [paragraph.text]
-
- # Ignore empty splits
- splits = [s for s in splits if s.strip()]
- text_chunks = [
- Chunk(
- document=document,
- content=chunk_text,
- page_number=paragraph.page_number,
- headings=[h.title for h in paragraph.headings],
- num_splits=len(splits),
- split_index=index,
- )
- for index, chunk_text in enumerate(splits)
- ]
- chunks += text_chunks
- return chunks
-
-
-def main() -> None:
- process_and_ingest_sys_args(sys.argv, logger, _ingest_bem_pdfs)
diff --git a/app/src/ingest_policy_pdfs.py b/app/src/ingest_policy_pdfs.py
deleted file mode 100644
index 76e1e431..00000000
--- a/app/src/ingest_policy_pdfs.py
+++ /dev/null
@@ -1,142 +0,0 @@
-import logging
-import re
-import sys
-
-from pdfminer.high_level import extract_text
-from pdfminer.pdfdocument import PDFDocument
-from pdfminer.pdfparser import PDFParser
-from smart_open import open as smart_open_file
-
-from src.adapters import db
-from src.app_config import app_config
-from src.db.models.document import Chunk, Document
-from src.util.file_util import get_files
-from src.util.ingest_utils import process_and_ingest_sys_args, tokenize
-
-logger = logging.getLogger(__name__)
-
-HEADER_PATTERN = r"(BEM\s\d*\s+\d+\sof\s\d+\s+\w.*)"
-
-
-def _get_bem_title(file_path: str) -> str:
- """
- Get the BEM number from the file path (e.g., 100.pdf) and the
- document title from the PDF meta data, then put the document
- title in title case (e.g., INTRODUCTION EXAMPLE -> Introduction Example)
- and combine: "BEM 100: Introduction Example"
- """
- with smart_open_file(file_path, "rb") as file:
- pdf_title = PDFDocument(PDFParser(file)).info[0]["Title"].decode().title()
- bem_num = file_path.split("/")[-1].rsplit(".", 1)[0]
- return f"BEM {bem_num}: {pdf_title}"
-
-
-def _ingest_policy_pdfs(
- db_session: db.Session,
- pdf_file_dir: str,
- doc_attribs: dict[str, str],
-) -> None:
- file_list = get_files(pdf_file_dir)
- embedding_model = app_config.sentence_transformer
-
- logger.info(f"Processing pdfs {pdf_file_dir} using {embedding_model} with {doc_attribs}")
- for file_path in file_list:
- if file_path.endswith(".pdf"):
- logger.info(f"Processing pdf file: {file_path}")
- with smart_open_file(file_path, "rb") as file:
- output_string = extract_text(file)
- doc_attribs["name"] = _get_bem_title(file_path)
- parse_pdf_and_add_to_db(
- contents=output_string, doc_attribs=doc_attribs, db_session=db_session
- )
-
-
-def parse_pdf_and_add_to_db(
- contents: str, doc_attribs: dict[str, str], db_session: db.Session
-) -> None:
- # Match header in BEM manual
- text_split_by_header = re.split(HEADER_PATTERN, contents)
- body_content = ""
- start_new_section = True
- for text_contents in text_split_by_header:
- is_header, contents, start_new_section = get_header_and_is_current_section(
- text_contents, start_new_section
- )
- # Check if we need to start a new section
- if not is_header or not start_new_section or not body_content:
- body_content += f"{contents}\n"
-
- document = Document(content=body_content, **doc_attribs)
- db_session.add(document)
-
- process_chunk(body_content, document, db_session)
-
-
-def get_header_and_is_current_section(
- line_contents: str, start_new_section: bool
-) -> tuple[bool, str, bool]:
- line_details = line_contents.split("\n\n")
- is_header = True
- if "BEM" in line_contents and "of" in line_contents and len(line_details) == 3:
- bem_val, page_num, title = line_details
- current_page, last_page = [x.strip() for x in page_num.split(" of ")]
- start_new_section = current_page == "1" or current_page == last_page
- bem_val = bem_val.strip()
- title = f"{bem_val}: {title}".strip()
- contents = title
- else:
- is_header = False
- contents = line_contents
-
- return is_header, contents, start_new_section
-
-
-def _add_chunk(
- db_session: db.Session, current_chunk: list[str], document: Document, current_token_count: int
-) -> None:
- embedding_model = app_config.sentence_transformer
- chunk_text = "".join(current_chunk)
- chunk_embedding = embedding_model.encode(chunk_text, show_progress_bar=False)
- chunk = Chunk(
- document=document,
- content=chunk_text,
- tokens=current_token_count,
- mpnet_embedding=chunk_embedding,
- )
- db_session.add(chunk)
-
-
-def process_chunk(text: str, document: Document, db_session: db.Session) -> None:
- embedding_model = app_config.sentence_transformer
- sentence_boundary_pattern = r"(?<=[.!?])\s+(?=[^\d])"
- sentence_boundaries = [
- (m.start(), m.end()) for m in re.finditer(sentence_boundary_pattern, text)
- ]
-
- current_chunk = []
- current_token_count = 0
- current_position = 0
-
- for boundary_start, boundary_end in sentence_boundaries:
- sentence = text[current_position : boundary_start + 1]
- current_position = boundary_end
-
- token_count = len(tokenize(sentence))
-
- if current_token_count + token_count <= embedding_model.max_seq_length:
- current_chunk.append(sentence)
- current_token_count += token_count
- else:
- _add_chunk(db_session, current_chunk, document, current_token_count)
- # Initialize the variable with sentence, which was not used in the above chunk added to the DB
- current_chunk = [sentence]
- current_token_count = token_count
-
- # Append the last sentence
- last_sentence = text[current_position:]
- current_chunk.append(last_sentence)
- _add_chunk(db_session, current_chunk, document, current_token_count)
-
-
-def main() -> None:
- process_and_ingest_sys_args(sys.argv, logger, _ingest_policy_pdfs)
diff --git a/app/src/ingestion/pdf_elements.py b/app/src/ingestion/pdf_elements.py
index cd5a5cf7..7bc9d9c0 100644
--- a/app/src/ingestion/pdf_elements.py
+++ b/app/src/ingestion/pdf_elements.py
@@ -2,7 +2,6 @@
from enum import StrEnum
from typing import List
-from src.ingestion.pdf_stylings import Styling
from src.util.pdf_utils import Heading
@@ -14,6 +13,20 @@ class TextType(StrEnum):
TITLE = "Title"
+@dataclass
+class Styling:
+ # The text with the style
+ text: str
+ # Page number where the styled text is located
+ pageno: int
+ # Nested parent headings where the styled text is located
+ headings: List[Heading]
+ # Other text before and after the styled text
+ wider_text: str
+ # Style attributes
+ bold: bool = False
+
+
@dataclass
class Link:
start_index: int
diff --git a/app/src/ingestion/pdf_postprocess.py b/app/src/ingestion/pdf_postprocess.py
index 8ce2692b..8688139e 100644
--- a/app/src/ingestion/pdf_postprocess.py
+++ b/app/src/ingestion/pdf_postprocess.py
@@ -1,7 +1,6 @@
import logging
-from src.ingestion.pdf_elements import EnrichedText, TextType
-from src.ingestion.pdf_stylings import Styling
+from src.ingestion.pdf_elements import EnrichedText, Styling, TextType
from src.util.string_utils import basic_ascii
logger = logging.getLogger(__name__)
diff --git a/app/src/ingestion/pdf_stylings.py b/app/src/ingestion/pdf_stylings.py
deleted file mode 100644
index b02faad3..00000000
--- a/app/src/ingestion/pdf_stylings.py
+++ /dev/null
@@ -1,386 +0,0 @@
-"""
-Extracts text styling from PDFs using pdfminer.
-"""
-
-import logging
-from contextlib import contextmanager
-from dataclasses import dataclass, field
-from enum import Enum
-from io import BytesIO
-from pprint import pprint
-from typing import BinaryIO, Iterator, Optional
-from xml.dom import minidom
-from xml.dom.minidom import Element, Text
-
-from pdfminer.pdfcolor import PDFColorSpace
-from pdfminer.pdfdevice import PDFTextSeq, TagExtractor
-from pdfminer.pdfdocument import PDFDocument
-from pdfminer.pdfinterp import (
- PDFGraphicState,
- PDFPageInterpreter,
- PDFResourceManager,
- PDFStackT,
- PDFTextState,
-)
-from pdfminer.pdfpage import PDFPage
-from pdfminer.psparser import PSLiteral
-
-from src.util.pdf_utils import Heading, as_pdf_doc, extract_outline, get_pdf_info
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class Styling:
- # The text with the style
- text: str
-
- # Page number where the styled text is located
- pageno: int
- # Nested parent headings where the styled text is located
- headings: list[Heading]
- # Other text before and after the styled text to help find the correct occurrence of the text
- wider_text: str
-
- # Style attributes
- bold: bool = False
-
-
-def extract_stylings(pdf: BinaryIO | PDFDocument) -> list[Styling]:
- parser = OutlineAwarePdfParser(pdf, BemTagExtractor)
- extracted_texts = parser.flatten_xml(parser.extract_xml())
-
- stylings: list[Styling] = []
- for text_obj in extracted_texts:
- if text_obj.zone != PageZone.MAIN or text_obj.is_heading():
- continue
-
- wider_text = "".join([p.text for p in text_obj.phrases])
- logger.debug(text_obj, wider_text[:100])
- for _phrase in text_obj.phrases:
- if _phrase.bold:
- styling = Styling(
- text=_phrase.text,
- pageno=text_obj.pageno,
- headings=text_obj.headings,
- wider_text=wider_text,
- bold=_phrase.bold,
- )
- stylings.append(styling)
- return stylings
-
-
-class PageZone(Enum):
- HEADER = "HEADER"
- MAIN = "MAIN"
- FOOTER = "FOOTER"
-
-
-@dataclass
-class Phrase:
- "Phrase is a piece of text with optional styling. It is a part of a paragraph (ExtractedText)."
- text: str
- bold: bool = False
-
-
-@dataclass
-class ExtractedText:
- pageno: int
- zone: PageZone
- headings: list[Heading]
- parano: int
- phrases: list[Phrase]
-
- def is_heading(self) -> bool:
- return self.parano == 0
-
- def __str__(self) -> str:
- if self.is_heading() and self.headings:
- last_heading = f"{self.headings[-1].level}:{self.headings[-1].title}"
- return f"{self.pageno}.{self.parano} {last_heading}"
- elif self.zone == PageZone.MAIN:
- return f" {self.pageno}.{self.parano} {self.zone}"
- else:
- return f"({self.pageno} {self.zone})"
-
-
-@dataclass
-class ParsingContext:
- # Used to find headings in the PDF
- heading_stack: list[Heading]
-
- # The headings for the current text
- parent_headings: list[Heading] = field(default_factory=list)
-
- # Current page number
- pageno: int = 0
-
- # Paragraph number of the current text starting from 1 after each heading
- # Paragraph number is 0 for headings
- parano: int | None = None
-
- _zone: PageZone | None = None
-
- def is_next_heading(self, phrases: list[Phrase]) -> Heading | None:
- # If there are no headings left, it's not a heading
- if not self.heading_stack:
- return None
-
- # Headings are expected to be the only text on the line or in a paragraph
- if len(phrases) != 1:
- return None
-
- # Headings are almost always bold
- phrase = phrases[0]
- if not phrase.bold:
- return None
-
- # Page number should match that of the headings from the PDF outline
- next_heading = self.heading_stack[-1]
- if next_heading.pageno != self.pageno:
- return None
-
- # Use casefold() to make case-insensitive comparison
- if phrase.text.strip().casefold() == next_heading.title.casefold():
- return next_heading
-
- return None
-
- def set_next_heading(self) -> None:
- next_heading = self.heading_stack.pop()
- level = next_heading.level
-
- # Update the parent_headings list with the new heading
- if level > len(self.parent_headings): # new subheading
- self.parent_headings.append(next_heading)
- else:
- # Pop all subheadings (if any) until we reach level
- while level < len(self.parent_headings):
- self.parent_headings.pop()
-
- # Then set the current heading
- self.parent_headings[-1] = next_heading
- assert level == len(self.parent_headings)
-
- # Reset the paragraph number
- self.parano = 0
-
- @contextmanager
- def zone_context(self, zone: PageZone) -> Iterator[None]:
- self._zone = zone
- yield
- self._zone = None
-
- def create_extracted_text(self, phrases: list[Phrase]) -> ExtractedText:
- assert self._zone, "zone is not set"
- assert self.parano is not None, "parano should be set at this point"
- return ExtractedText(
- pageno=self.pageno,
- zone=self._zone,
- headings=self.parent_headings.copy(),
- parano=self.parano,
- phrases=phrases,
- )
-
-
-class OutlineAwarePdfParser:
- """
- PDF parser that extracts text from a PDF using the PDF's outline metadata
- and flattens the resulting XML into ExtractedText objects
- """
-
- def __init__(self, pdf: BinaryIO | PDFDocument, tag_extractor_class: type):
- self.tag_extractor_class = tag_extractor_class
- self.disable_caching: bool = False
- self.doc = as_pdf_doc(pdf)
-
- # Get the PDF outline containing headings.
- # We'll use it to find headings in the text as the PDF is processed.
- self.parsing_context = ParsingContext(list(reversed(extract_outline(self.doc))))
-
- # Adapted from pdfminer.high_level.py:extract_text_to_fp() used in pdf2txt.py
- def _create_interpreter(
- self, output_io: BytesIO, output_codec: str = "utf-8"
- ) -> PDFPageInterpreter:
- rsrcmgr = PDFResourceManager(caching=not self.disable_caching)
- pdf_device = self.tag_extractor_class(rsrcmgr, outfp=output_io, codec=output_codec)
- return PDFPageInterpreter(rsrcmgr, pdf_device)
-
- def extract_xml(self, validate_xml: bool = False) -> str:
- "Stage 1: Generate XML from the PDF using custom tag_extractor_class"
- output_io = BytesIO()
- interpreter = self._create_interpreter(output_io)
- for page in PDFPage.create_pages(self.doc):
- # As the interpreter reads the PDF, it will call methods on interpreter.device,
- # which will write to output_io
- interpreter.process_page(page)
-
- # After done writing to output_io, go back to the beginning so we can read() it
- output_io.seek(0)
- # Wrap all tags in a root tag
- xml_string = "" + output_io.read().decode() + ""
-
- if validate_xml:
- minidom.parseString(xml_string) # nosec
-
- return xml_string
-
- def flatten_xml(self, xml_string: str) -> list[ExtractedText]:
- "Stage 2: Flatten the extracted XML into ExtractedText"
- pdf_info = get_pdf_info(self.doc, count_pages=True)
- xml_doc = minidom.parseString(xml_string) # nosec
- root = xml_doc.documentElement
- result: list[ExtractedText] = []
- try:
- for page_node in root.getElementsByTagName("page"):
- self.parsing_context.pageno = int(page_node.getAttribute("id")) + 1
- assert self.parsing_context.pageno
- logger.info("Processing page %i", self.parsing_context.pageno)
- self.parsing_context.parano = 0
-
- for page_elem in page_node.childNodes:
- if isinstance(page_elem, Element):
- # An Element represents an XML tag
- if annotated_text := self._create_extracted_text(page_elem):
- result.append(annotated_text)
- elif isinstance(page_elem, Text):
- # A Text represents text content of an XML tag
- # When text is not wrapped in a tag (eg, 210.pdf)
- with self.parsing_context.zone_context(PageZone.MAIN):
- if phrase := self._create_phrase(None, page_elem):
- self.parsing_context.parano += 1
- result.append(self.parsing_context.create_extracted_text([phrase]))
-
- # Check that we've found all headings from the PDF outline
- assert len(self.parsing_context.heading_stack) == 0, self.parsing_context.heading_stack
- # Check that we've reached the last page
- assert self.parsing_context.pageno == pdf_info.page_count
- return result
- except Exception as e:
- print("Error processing XML:", pdf_info.title)
- pprint(self.parsing_context)
- raise e
-
- def _create_extracted_text(self, elem: Element) -> ExtractedText | None:
- assert self.parsing_context.parano is not None, "parano should be set at this point"
- if elem.tagName == "Artifact":
- if elem.getAttribute("Type") == "/'Pagination'":
- subtype = elem.getAttribute("Subtype")
- if subtype == "/'Header'":
- return self._extract_text_in_zone(elem, PageZone.HEADER)
- if subtype == "/'Footer'":
- return self._extract_text_in_zone(elem, PageZone.FOOTER)
-
- logger.debug("Ignoring Artifact: %s", elem.toxml())
- return None
-
- if elem.tagName == "P":
- self.parsing_context.parano += 1
-
- if elem.tagName in ["P", "BOLD", "Span"]:
- return self._extract_text_in_zone(elem, PageZone.MAIN)
-
- raise NotImplementedError(f"Unhandled top-level element: {elem.toxml()}")
-
- def _extract_text_in_zone(self, elem: Element, zone: PageZone) -> ExtractedText | None:
- "Create ExtractedTExt from top-level element on a page"
- with self.parsing_context.zone_context(zone):
- phrases: list[Phrase] = self._extract_phrases(elem)
-
- if zone == PageZone.MAIN:
- # Check for headings and update the parsing context
- if self.parsing_context.is_next_heading(phrases):
- self.parsing_context.set_next_heading()
-
- return self.parsing_context.create_extracted_text(phrases)
-
- def _extract_phrases(self, elem: Element) -> list[Phrase]:
- "Extract Phrases from lower-level (non-top-level) elements"
- phrases: list[Phrase] = []
- for child_node in elem.childNodes:
- if isinstance(child_node, Element):
- # Recurse and flatten the XML structure
- phrases += self._extract_phrases(child_node)
- elif isinstance(child_node, Text):
- if phrase := self._create_phrase(elem, child_node):
- phrases.append(phrase)
- else:
- raise NotImplementedError(
- f"Unexpected elem: {type(child_node)}, {self.parsing_context}"
- )
- return phrases
-
- def _create_phrase(self, parent_node: Element | None, child: Text) -> Phrase | None:
- # Ignore whitespace
- if not (child.data.strip()):
- return None
-
- bolded = bool(parent_node and parent_node.tagName == "BOLD")
- return Phrase(text=child.data, bold=bolded)
-
-
-class BemTagExtractor(TagExtractor):
- """
- This class will write XML to the specified outfp, and is customized for BEM PDF files:
- - detects bold text
- - addresses Span tags that are not closed properly
-
- Methods in this class are called by the PDFPageInterpreter as it reads the PDF.
- This class is adapted from pdfminer.pdfdevice.TagExtractor used by
- pdfminer.high_level.py:extract_text_to_fp(), which is used in pdf2txt.py.
- """
-
- def __init__(self, rsrcmgr: PDFResourceManager, outfp: BinaryIO, codec: str = "utf-8") -> None:
- super().__init__(rsrcmgr, outfp, codec)
-
- # Added the following in order to add the BOLD tag.
- # This reflects the last fontname used for a given tag level
- self._last_fontname_stack: list[str] = [""]
-
- def render_string(
- self,
- textstate: PDFTextState,
- seq: PDFTextSeq,
- ncs: PDFColorSpace,
- graphicstate: PDFGraphicState,
- ) -> None:
- "render_string() is called multiple times between each begin_tag() completion and before end_tag()"
- font = textstate.font
- assert font is not None
-
- last_fontname = self._last_fontname_stack[-1]
- if last_fontname != font.fontname:
- if "Bold" in font.fontname and (not last_fontname or "Bold" not in last_fontname):
-                self._write("<BOLD>")
- elif "Bold" in last_fontname and "Bold" not in font.fontname:
-                self._write("</BOLD>")
- self._last_fontname_stack[-1] = font.fontname
-
- # Following is copied from pdfminer.pdfdevice.TagExtractor.render_string()
- super().render_string(textstate, seq, ncs, graphicstate)
-
- def begin_tag(self, tag: PSLiteral, props: Optional[PDFStackT] = None) -> None:
- # Workaround for Span tags that are not closed properly
- # (i.e., BEM 101.pdf, 105.pdf, 203.pdf, 225.pdf, 400.pdf)
- if self._stack and self._stack[-1].name == "Span":
- self._stack.pop(-1)
-            self._write("</Span>")
-
- self._last_fontname_stack.append("")
-
- super().begin_tag(tag, props)
-
- def end_tag(self) -> None:
- if "Bold" in self._last_fontname_stack[-1]:
-            self._write("</BOLD>")
-
- self._last_fontname_stack.pop(-1)
-
- if not self._stack:
- logger.warning(
- "page %i: end_tag without matching begin_tag (ie, empty tag stack!); ignoring",
- self.pageno,
- )
- return
-
- super().end_tag()
diff --git a/app/src/util/bem_util.py b/app/src/util/bem_util.py
deleted file mode 100644
index cfff301d..00000000
--- a/app/src/util/bem_util.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Regular expression to match BEM followed by 3 digits, optionally followed by a letter
-import re
-
-BEM_PATTERN = r"(BEM\s(\d{3}[A-Z]?))"
-
-
-def get_bem_url(text: str) -> str:
- bem = re.search(BEM_PATTERN, text)
- if not bem:
- raise ValueError(f"No BEM number found in text: {text}")
- return f"https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/{bem.group(2)}.pdf"
-
-
-def replace_bem_with_link(text: str) -> str:
- return re.sub(
- BEM_PATTERN,
-        r'<a href="https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/\2.pdf">\1</a>',
- text,
- )
diff --git a/app/tests/src/ingestion/test_pdf_stylings.py b/app/tests/src/ingestion/test_pdf_stylings.py
index d1f55d44..3098ab34 100644
--- a/app/tests/src/ingestion/test_pdf_stylings.py
+++ b/app/tests/src/ingestion/test_pdf_stylings.py
@@ -1,14 +1,23 @@
-from src.ingestion.pdf_elements import Heading
-from src.ingestion.pdf_stylings import Styling, extract_stylings
+from src.ingestion.pdf_elements import Heading, Styling
-def test_extract_styles():
- with open("/app/tests/src/util/707.pdf", "rb") as fp:
- _stylings = extract_stylings(fp)
-
- assert _stylings == all_expected_stylings
+def test_styling_dataclass():
+ """Test the Styling data structure"""
+ styling = Styling(
+ text="test text",
+ pageno=1,
+ headings=[Heading(title="Test Heading", level=1, pageno=1)],
+ wider_text="test text in context",
+ bold=True,
+ )
+ assert styling.text == "test text"
+ assert styling.pageno == 1
+ assert styling.headings[0].title == "Test Heading"
+ assert styling.wider_text == "test text in context"
+ assert styling.bold is True
+# Used by test_pdf_postprocess.py
all_expected_stylings = [
Styling(
text="CDC not eligible due to 6 month penalty period",
diff --git a/app/tests/src/test_chat_engine.py b/app/tests/src/test_chat_engine.py
index 5a099a2e..4390e98e 100644
--- a/app/tests/src/test_chat_engine.py
+++ b/app/tests/src/test_chat_engine.py
@@ -1,5 +1,5 @@
from src import chat_engine
-from src.chat_engine import BridgesEligibilityManualEngine, GuruMultiprogramEngine, GuruSnapEngine
+from src.chat_engine import GuruMultiprogramEngine, GuruSnapEngine
def test_available_engines():
@@ -8,7 +8,6 @@ def test_available_engines():
assert len(engines) > 0
assert "guru-multiprogram" in engines
assert "guru-snap" in engines
- assert "bridges-eligibility-manual" in engines
def test_create_engine_Guru_Multiprogram():
@@ -23,10 +22,3 @@ def test_create_engine_Guru_SNAP():
engine = chat_engine.create_engine(engine_id)
assert engine is not None
assert engine.name == GuruSnapEngine.name
-
-
-def test_create_engine_BridgesEligibilityManualEngine():
- engine_id = "bridges-eligibility-manual"
- engine = chat_engine.create_engine(engine_id)
- assert engine is not None
- assert engine.name == BridgesEligibilityManualEngine.name
diff --git a/app/tests/src/test_format.py b/app/tests/src/test_format.py
index 93d692a9..29b33df3 100644
--- a/app/tests/src/test_format.py
+++ b/app/tests/src/test_format.py
@@ -3,21 +3,17 @@
from sqlalchemy import delete
from src.citations import CitationFactory, split_into_subsections
-from src.db.models.document import Chunk, ChunkWithScore, Document, Subsection
+from src.db.models.document import Document
from src.format import (
- BemFormattingConfig,
FormattingConfig,
- _add_citation_links,
- _add_ellipses_for_bem,
_format_guru_to_accordion_html,
_get_breadcrumb_html,
build_accordions,
- format_bem_documents,
format_guru_cards,
reify_citations,
)
from src.retrieve import retrieve_with_scores
-from tests.src.db.models.factories import ChunkFactory, DocumentFactory
+from tests.src.db.models.factories import ChunkFactory
from tests.src.test_retrieve import _create_chunks
@@ -101,86 +97,6 @@ def test__format_guru_to_accordion_html(app_config, db_session, enable_factory_c
    assert "<p>Similarity Score: 0.92</p>" in html
-def test_format_bem_documents():
- docs = DocumentFactory.build_batch(4)
- for doc in docs:
- doc.name += "BEM 123"
-
- chunks_with_scores = [
- # This document is ignored because below chunks_shown_min_score
- ChunkWithScore(ChunkFactory.build(document=docs[0]), 0.90),
- # This document is excluded because chunks_shown_max_num = 2,
- # and it has the lowest score of the three documents with chunks over
- # the chunks_shown_min_score threshold
- ChunkWithScore(ChunkFactory.build(document=docs[1]), 0.92),
- # This document is included because a chunk puts
- # it over the chunks_shown_min_score threshold
- ChunkWithScore(ChunkFactory.build(document=docs[2]), 0.90),
- ChunkWithScore(ChunkFactory.build(document=docs[2]), 0.93),
- # This document is included, but only once
- # And it will be displayed first because it has the highest score
- ChunkWithScore(ChunkFactory.build(document=docs[3]), 0.94),
- ChunkWithScore(ChunkFactory.build(document=docs[3]), 0.95),
- ]
-
- html = format_bem_documents(
- chunks_shown_max_num=2,
- chunks_shown_min_score=0.91,
- chunks_with_scores=chunks_with_scores,
- subsections=to_subsections(chunks_with_scores),
- raw_response="",
- )
-
- assert docs[0].content not in html
- assert docs[1].content not in html
- assert docs[3].content in html
- assert "Citation 2" in html
- assert "Citation 3" not in html
-
-
-def test__add_ellipses():
- one_chunk = Chunk(num_splits=0, split_index=0, content="This is the only chunk.")
- assert _add_ellipses_for_bem(one_chunk) == "This is the only chunk."
-
- first_chunk = Chunk(num_splits=3, split_index=0, content="This is the first chunk of 3.")
- assert _add_ellipses_for_bem(first_chunk) == "This is the first chunk of 3. ..."
-
- middle_chunk = Chunk(num_splits=3, split_index=2, content="This is a chunk in between.")
- assert _add_ellipses_for_bem(middle_chunk) == "... This is a chunk in between. ..."
-
- last_chunk = Chunk(num_splits=3, split_index=3, content="This is the last chunk.")
- assert _add_ellipses_for_bem(last_chunk) == "... This is the last chunk."
-
- multiple_ellipses = Chunk(
- num_splits=3, split_index=0, content="This is a chunk with multiple ellipses......"
- )
- assert (
- _add_ellipses_for_bem(multiple_ellipses)
- == "This is a chunk with multiple ellipses...... ..."
- )
-
-
-def test_build_accordions_for_bem(chunks_with_scores):
- subsections = to_subsections(chunks_with_scores)
-
- config = BemFormattingConfig()
- assert build_accordions(subsections, "", config) == ""
- assert (
- build_accordions([], "Non-existant citation: (citation-0)", config)
- == ""
- )
-
- assert (
- build_accordions([], "List intro sentence: \n- item 1\n- item 2", config)
- == ""
- )
-
- chunks_with_scores[0].chunk.document.name = "BEM 100: Intro"
- chunks_with_scores[1].chunk.document.name = "BEM 101: Another"
- html = build_accordions(subsections, "Some real citations: (citation-1) (citation-2)", config)
- assert len(_unique_accordion_ids(html)) == 2
-
-
def test_reify_citations():
chunks = ChunkFactory.build_batch(2)
chunks[0].content = "This is the first chunk.\n\nWith two subsections"
@@ -192,44 +108,21 @@ def test_reify_citations():
== "This is a citation "
)
- assert (
- reify_citations(
- f"This is a citation ({subsections[0].id}) and another ({subsections[1].id}).",
- subsections,
- config,
- None,
- )
- == "This is a citation 1 and another 2 ."
+ result = reify_citations(
+ f"This is a citation ({subsections[0].id}) and another ({subsections[1].id}).",
+ subsections,
+ config,
+ None,
)
-
-def test_add_citation_links():
- chunks = ChunkFactory.build_batch(3)
-
- remapped_citations = {
- "citation-1": Subsection(chunk=chunks[0], text=chunks[0].content, id="1"),
- "citation-44": Subsection(chunk=chunks[1], text=chunks[1].content, id="3"),
- "citation-3": Subsection(chunk=chunks[2], text=chunks[2].content, id="23"),
- }
-
- config = FormattingConfig()
-
- assert (
- _add_citation_links(
- "This is a citation (citation-1). This is another value citation (citation-44). And another not found(citation-5).",
- remapped_citations,
- config,
- {
- "1": "599299",
- "2": "599300",
- "3": "599300",
- "4": "599301",
- "5": "599302",
- "44": "599303",
- },
- )
- == "This is a citation 1 . This is another value citation 3 . And another not found."
- )
+ # Check that citations were added
+ assert "" in result
+ assert "accordion_item" in result
+ assert "style='cursor:pointer'" in result
+ assert "data-id='a-None'" in result
+ # Check basic text structure remains
+ assert result.startswith("This is a citation")
+ assert "and another" in result
def test__get_breadcrumb_html():
@@ -249,44 +142,28 @@ def test__get_breadcrumb_html():
    assert _get_breadcrumb_html(headings, "Doc name") == "Heading 2"
-def test__get_citation_link():
- doc = DocumentFactory.build_batch(2)
- chunk_list = ChunkFactory.build_batch(2)
- doc[0].name = "BEM 234"
- doc[1].source = "webpage 1"
-
- chunk_list[0].document = doc[0]
- chunk_list[0].page_number = 3
-
- chunk_list[1].document = doc[1]
- chunk_list[1].page_number = 3
-
- bem_link = BemFormattingConfig().get_citation_link(
- Subsection("1", chunk_list[0], "Subsection 1")
- )
-
- assert "Open document to page 3" in bem_link
- assert "Source" not in bem_link
-
- web_link = FormattingConfig().get_citation_link(Subsection("2", chunk_list[1], "Subsection 1"))
- assert "page 3" not in web_link
- assert "Source" in web_link
-
-
def test_build_accordions(chunks_with_scores):
subsections = to_subsections(chunks_with_scores)
config = FormattingConfig()
+ # Test empty response
assert build_accordions(subsections, "", config) == ""
+
+ # Test non-existent citation
assert (
- build_accordions([], "Non-existant citation: (citation-0)", config)
- == ""
+ build_accordions([], "Non-existent citation: (citation-0)", config)
+ == ""
)
+ # Test markdown list formatting
assert (
build_accordions([], "List intro sentence: \n- item 1\n- item 2", config)
== ""
)
+ # Test real citations
html = build_accordions(subsections, "Some real citations: (citation-1) (citation-2)", config)
assert len(_unique_accordion_ids(html)) == 2
+ assert "Source(s)" in html
+ assert "usa-accordion__button" in html
+ assert "usa-accordion__content" in html
diff --git a/app/tests/src/test_ingest_bem_pdfs.py b/app/tests/src/test_ingest_bem_pdfs.py
deleted file mode 100644
index ba2d8bc3..00000000
--- a/app/tests/src/test_ingest_bem_pdfs.py
+++ /dev/null
@@ -1,195 +0,0 @@
-import logging
-
-import pytest
-from smart_open import open as smart_open
-from sqlalchemy import delete, select
-from unstructured.documents.elements import ElementMetadata, Text
-
-from src.db.models.document import Chunk, Document
-from src.ingest_bem_pdfs import (
- _enrich_texts,
- _get_bem_title,
- _ingest_bem_pdfs,
- _match_heading,
- _next_heading,
-)
-from src.ingestion.pdf_elements import EnrichedText
-from src.util.pdf_utils import Heading
-from tests.src.test_ingest_policy_pdfs import doc_attribs
-
-_707_PDF_PATH = "/app/tests/src/util/707.pdf"
-
-
-@pytest.fixture
-def policy_s3_file(mock_s3_bucket_resource):
- data = smart_open(_707_PDF_PATH, "rb")
- mock_s3_bucket_resource.put_object(Body=data, Key="707.pdf")
- return "s3://test_bucket/"
-
-
-@pytest.mark.parametrize("file_location", ["local", "s3"])
-def test__get_bem_title(file_location, policy_s3_file):
- file_path = policy_s3_file + "707.pdf" if file_location == "s3" else _707_PDF_PATH
- with smart_open(file_path, "rb") as file:
- assert _get_bem_title(file, file_path) == "BEM 707: TIME AND ATTENDANCE REVIEWS"
-
-
-@pytest.fixture
-def mock_outline():
- return [
- Heading(title="Overview", level=1, pageno=1),
- Heading(title="Family Independence Program (FIP)", level=2, pageno=1),
- Heading(title="Program Goal", level=2, pageno=1),
- Heading(title="Medical Assistance Program", level=2, pageno=2),
- Heading(title="Program Goal", level=2, pageno=2),
- Heading(title="Tertiary Program Goal", level=3, pageno=2),
- Heading(title="4th Program Goal", level=4, pageno=2),
- Heading(title="Test Level 2", level=2, pageno=2),
- ]
-
-
-@pytest.fixture
-def mock_elements():
- return [
- Text(text="OVERVIEW", metadata=ElementMetadata(page_number=1)),
- Text(text="Family Independence Program (FIP)", metadata=ElementMetadata(page_number=1)),
- Text(text="Program Goal", metadata=ElementMetadata(page_number=1)),
- Text(text="Tertiary Program Goal", metadata=ElementMetadata(page_number=2)),
- Text(text="Test Level 1", metadata=ElementMetadata(page_number=2)),
- ]
-
-
-def chunk_matched(chunks: list[Chunk], content: str):
- return next(c for c in chunks if content in c.content)
-
-
-@pytest.mark.parametrize("file_location", ["local", "s3"])
-def test__ingest_bem_pdfs(caplog, app_config, db_session, policy_s3_file, file_location):
- db_session.execute(delete(Document))
-
- with caplog.at_level(logging.INFO):
- if file_location == "local":
- _ingest_bem_pdfs(
- db_session, "/app/tests/src/util/", doc_attribs, should_save_json=False
- )
- else:
- _ingest_bem_pdfs(db_session, policy_s3_file, doc_attribs, should_save_json=False)
-
- assert any(text.startswith("Processing file: ") for text in caplog.messages)
-
- document = db_session.execute(select(Document)).one()[0]
- assert document.dataset == "test_dataset"
- assert document.program == "test_benefit_program"
- assert document.region == "Michigan"
-
- assert document.name == "BEM 707: TIME AND ATTENDANCE REVIEWS"
-
- assert "In order to be eligible to bill and receive payments, child " in document.content
-
- first_chunk = chunk_matched(
- document.chunks, "In order to be eligible to bill and receive payments, child"
- )
- assert first_chunk.headings == ["Overview"]
- assert first_chunk.page_number == 1
-
- second_chunk = chunk_matched(
- document.chunks, "Rule violations include, but are not limited to:\n-"
- )
- assert second_chunk.headings == ["Rule Violations"]
- assert second_chunk.page_number == 1
-
- in_second_chunk = chunk_matched(
- document.chunks, "Failure to maintain time and attendance records."
- )
- assert in_second_chunk.headings == ["Rule Violations"]
- assert in_second_chunk.page_number == 1
-
- assert second_chunk.content == in_second_chunk.content
-
- list_type_chunk = chunk_matched(
- document.chunks,
- "The following are examples of IPVs:\n"
- "- Billing for children while they are in school.\n"
- "- Two instances of failing to respond to requests for records.\n"
- "- Two instances of providing care in the wrong location.\n"
- "- Billing for children no longer in care.\n"
- "- Knowingly billing for children not in care or more hours than children were in care.\n"
- "- Maintaining records that do not accurately reflect the time children were in care.",
- )
- assert list_type_chunk.headings == [
- "Time and Attendance Review Process",
- "Intentional Program Violations",
- ]
- assert list_type_chunk.page_number == 2
-
- bold_styled_chunk = chunk_matched(
- document.chunks,
- "Providers determined to have committed an IPV may serve the following penalties:\n"
- "- First occurrence - six month disqualification. The closure reason will be **CDC not eligible due to 6 month penalty period**.\n"
- "- Second occurrence - twelve month disqualification. The closure reason will be **CDC not eligible due to 12 month penalty period.**\n"
- "- Third occurrence - lifetime disqualification. The closure reason will be **CDC not eligible due to lifetime penalty.**",
- )
- assert bold_styled_chunk
-
- title_chunk = chunk_matched(document.chunks, "**CDC**\n\nThe Child Care and Development Block")
- assert title_chunk.headings == ["legal base"]
- assert title_chunk.page_number == 4
-
-
-def test__enrich_text():
- with smart_open(_707_PDF_PATH, "rb") as file:
- enriched_text_list = _enrich_texts(file)
-
- assert len(enriched_text_list) == 40
- first_enriched_text_item = enriched_text_list[0]
- assert isinstance(first_enriched_text_item, EnrichedText)
- assert first_enriched_text_item.headings == [Heading(title="Overview", level=1, pageno=1)]
- assert first_enriched_text_item.type == "NarrativeText"
- assert first_enriched_text_item.page_number == 1
-
- other_enriched_text_item = enriched_text_list[13]
- assert other_enriched_text_item.headings == [
- Heading(title="Time and Attendance Review Process", level=1, pageno=1),
- Heading(title="Provider Errors", level=2, pageno=1),
- ]
- assert other_enriched_text_item.type == "ListItem"
- assert other_enriched_text_item.page_number == 2
-
-
-def test__match_heading(mock_outline):
- heading_with_extra_space = _match_heading(mock_outline, "Family Independence Program (FIP)", 1)
- assert heading_with_extra_space
-
- heading_on_wrong_page = _match_heading(mock_outline, "Family Independence Program (FIP)", 5)
- assert heading_on_wrong_page is None
-
-
-def test__next_heading(mock_outline, mock_elements):
- second_level_heading = _next_heading(
- mock_outline,
- mock_elements[1],
- mock_outline[:2],
- )
- assert second_level_heading == [
- Heading(title="Overview", level=1, pageno=1),
- Heading(title="Family Independence Program (FIP)", level=2, pageno=1),
- ]
-
- replaced_second_level = _next_heading(mock_outline, mock_elements[2], mock_outline[:2])
- assert replaced_second_level == [
- Heading(title="Overview", level=1, pageno=1),
- Heading(title="Program Goal", level=2, pageno=1),
- ]
-
- current_headings = [
- Heading(title="Overview", level=1, pageno=1),
- Heading(title="Program Goal", level=2, pageno=1),
- Heading(title="Tertiary Program Goal", level=3, pageno=2),
- Heading(title="4th Program Goal", level=4, pageno=2),
- ]
- element = Text(text="Test Level 2", metadata=ElementMetadata(page_number=2))
- dropped_level = _next_heading(mock_outline, element, current_headings)
- assert dropped_level == [
- Heading(title="Overview", level=1, pageno=1),
- Heading(title="Test Level 2", level=2, pageno=2),
- ]
diff --git a/app/tests/src/test_ingest_policy_pdfs.py b/app/tests/src/test_ingest_policy_pdfs.py
deleted file mode 100644
index cdc2d5ff..00000000
--- a/app/tests/src/test_ingest_policy_pdfs.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import logging
-import math
-
-import pytest
-from smart_open import open
-from sqlalchemy import delete, select
-
-from src.db.models.document import Document
-from src.ingest_policy_pdfs import _get_bem_title, _ingest_policy_pdfs
-
-
-@pytest.fixture
-def policy_s3_file(mock_s3_bucket_resource):
- data = open("/app/tests/docs/100.pdf", "rb")
- mock_s3_bucket_resource.put_object(Body=data, Key="100.pdf")
- return "s3://test_bucket/"
-
-
-doc_attribs = {
- "dataset": "test_dataset",
- "program": "test_benefit_program",
- "region": "Michigan",
-}
-
-
-@pytest.mark.parametrize("file_location", ["local", "s3"])
-def test__get_bem_title(file_location, policy_s3_file):
- file_path = policy_s3_file + "100.pdf" if file_location == "s3" else "/app/tests/docs/100.pdf"
- assert _get_bem_title(file_path) == "BEM 100: Introduction"
-
-
-@pytest.mark.parametrize("file_location", ["local", "s3"])
-def test__ingest_policy_pdfs(caplog, app_config, db_session, policy_s3_file, file_location):
- db_session.execute(delete(Document))
-
- with caplog.at_level(logging.INFO):
- if file_location == "local":
- _ingest_policy_pdfs(db_session, "/app/tests/docs/", doc_attribs)
- else:
- _ingest_policy_pdfs(db_session, policy_s3_file, doc_attribs)
-
- assert any(text.startswith("Processing pdf file:") for text in caplog.messages)
- document = db_session.execute(select(Document)).one()[0]
- assert document.dataset == "test_dataset"
- assert document.program == "test_benefit_program"
- assert document.region == "Michigan"
-
- assert document.name == "BEM 100: Introduction"
-
- # Document.content should be the full text
- assert "Temporary Assistance to Needy Families" in document.content
- assert "The Food Assistance Program" in document.content
-
- # The document should be broken into two chunks, which
- # have different content and different embeddings
- first_chunk, second_chunk = document.chunks
- assert "Temporary Assistance to Needy Families" in first_chunk.content
- assert "The Food Assistance Program" not in first_chunk.content
- assert math.isclose(first_chunk.mpnet_embedding[0], -0.7016304, rel_tol=1e-5)
-
- assert "Temporary Assistance to Needy Families" not in second_chunk.content
- assert "The Food Assistance Program" in second_chunk.content
- assert math.isclose(second_chunk.mpnet_embedding[0], -0.82242084, rel_tol=1e-3)
diff --git a/app/tests/src/util/test_bem_utils.py b/app/tests/src/util/test_bem_utils.py
deleted file mode 100644
index 19f5ed34..00000000
--- a/app/tests/src/util/test_bem_utils.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import pytest
-
-from src.util.bem_util import get_bem_url, replace_bem_with_link
-
-
-def test__get_bem_url():
- assert (
- get_bem_url("Please review BEM 123.")
- == "https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/123.pdf"
- )
- assert (
- get_bem_url("The policy in BEM 123A has been updated.")
- == "https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/123A.pdf"
- )
- with pytest.raises(ValueError):
- get_bem_url("This is not a valid case: BEM123.")
-
-
-def test__replace_bem_with_link():
- assert (
- replace_bem_with_link("Please review BEM 123.")
-        == 'Please review <a href="https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/123.pdf">BEM 123</a>.'
- )
- assert (
- replace_bem_with_link("The policy in BEM 123A has been updated.")
-        == 'The policy in <a href="https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/123A.pdf">BEM 123A</a> has been updated.'
- )
- assert (
- replace_bem_with_link("Check both BEM 123 and BEM 500C.")
-        == 'Check both <a href="https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/123.pdf">BEM 123</a> and <a href="https://dhhs.michigan.gov/OLMWeb/ex/BP/Public/BEM/500C.pdf">BEM 500C</a>.'
- )
- assert (
- replace_bem_with_link("There is no matching pattern here.")
- == "There is no matching pattern here."
- )
- assert (
- replace_bem_with_link("This is not a valid case: BEM123.")
- == "This is not a valid case: BEM123."
- )