From a511080854cc6d75bde27f2b5d92a0271974a6e5 Mon Sep 17 00:00:00 2001 From: Yoom Lam Date: Fri, 10 Jan 2025 13:35:22 -0600 Subject: [PATCH] feat: Export markdown files during ingestion (#171) --- README.md | 11 + app/.gitignore | 6 +- app/Makefile | 17 + app/src/ingest_edd_web.py | 298 ++++++++++-------- app/src/ingest_la_county_policy.py | 13 +- app/src/ingestion/imagine_la/ingest.py | 43 ++- app/src/ingestion/scrape_la_policy.py | 2 +- app/src/ingestion/scrapy_runner.py | 2 +- app/src/util/ingest_utils.py | 61 +++- .../src/ingestion/imagine-la/test_ingest.py | 6 +- app/tests/src/test_ingest_edd_web.py | 26 +- app/tests/src/test_ingest_la_county_policy.py | 32 +- app/tests/src/test_ingest_utils.py | 10 +- 13 files changed, 354 insertions(+), 173 deletions(-) diff --git a/README.md b/README.md index e44256d3..8a3d0bdf 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,17 @@ poetry run scrape-edd-web To load into the vector database, see sections above but use `make ingest-edd-web DATASET_ID="CA EDD" BENEFIT_PROGRAM=employment BENEFIT_REGION=California FILEPATH=src/ingestion/edd_scrapings.json`. + +### To Skip Access to the DB + +For dry-runs or exporting of markdown files, avoid reading and writing to the DB during ingestion by adding the `--skip_db` argument like so: +``` +make ingest-edd-web DATASET_ID="CA EDD test" BENEFIT_PROGRAM=employment BENEFIT_REGION=California FILEPATH=src/ingestion/edd_scrapings.json INGEST_ARGS="--skip_db" +``` + +See PR #171 for other examples. + + ## Batch processing To have answers generated for multiple questions at once, create a .csv file with a `question` column, for example: diff --git a/app/.gitignore b/app/.gitignore index e74a30c5..b89eb804 100644 --- a/app/.gitignore +++ b/app/.gitignore @@ -43,5 +43,7 @@ documents/ .chainlit/translations/*.json !.chainlit/translations/en-US.json -/chunks-log/ -/src/ingestion/imagine_la/scrape/pages/* \ No newline at end of file +/src/ingestion/imagine_la/scrape/pages/* + +# intermediate markdown files during ingestion +/*_md/ diff --git a/app/Makefile b/app/Makefile index cbb4535a..d5d58e91 100644 --- a/app/Makefile +++ b/app/Makefile @@ -248,11 +248,28 @@ ifndef FILEPATH $(error FILEPATH is undefined) endif +scrape-edd-web: + $(PY_RUN_CMD) scrape-edd-web + ingest-edd-web: check-ingest-arguments $(PY_RUN_CMD) ingest-edd-web "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)" $(INGEST_ARGS) + +scrape-imagine-la: + cd src/ingestion/imagine_la/scrape; uv run --no-project scrape_content_hub.py https://socialbenefitsnavigator25.web.app/contenthub $(CONTENTHUB_PASSWORD) + ingest-imagine-la: check-ingest-arguments $(PY_RUN_CMD) ingest-imagine-la "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)" + +scrape-la-county-policy: + # Use playwright to scrape dynamic la_policy_nav_bar.html, required for the next step + cd src/ingestion/la_policy/scrape; uv run --no-project scrape_la_policy_nav_bar.py + + # Now that we have the expanded nav bar, scrape all the links in the nav bar + # Either should work: + # DEBUG_SCRAPINGS=true uv run --no-project scrape_la_policy.py &> out.log + $(PY_RUN_CMD) scrape-la-policy 2>&1 | tee out.log + ingest-la-county-policy: check-ingest-arguments $(PY_RUN_CMD) ingest-la-policy "$(DATASET_ID)" "$(BENEFIT_PROGRAM)" "$(BENEFIT_REGION)" "$(FILEPATH)" $(INGEST_ARGS) diff --git a/app/src/ingest_edd_web.py b/app/src/ingest_edd_web.py index 56e766f9..8a291655 100644 --- a/app/src/ingest_edd_web.py +++ b/app/src/ingest_edd_web.py @@ -6,7 +6,6 @@ from pathlib import Path from typing import Callable, Optional, Sequence -from nutree import Tree from smart_open import open as smart_open from src.adapters import db @@ -17,9 +16,11 @@ from src.util.ingest_utils import ( DefaultChunkingConfig, add_embeddings, + create_file_path, deconstruct_list, deconstruct_table, document_exists, + load_or_save_doc_markdown, process_and_ingest_sys_args, reconstruct_list, reconstruct_table, @@ -30,7 +31,7 @@ logger = logging.getLogger(__name__) -class SplitWithContextText: +class Split: def __init__( self, @@ -46,9 +47,26 @@ def __init__( else: self.text_to_encode = f"{context_str.strip()}\n\n" + remove_links(text) self.token_count = len(tokenize(self.text_to_encode)) + + # For debugging tree-based chunking self.chunk_id = "" self.data_ids = "" + @staticmethod + def from_dict(split_dict: dict[str, str]) -> "Split": + split = Split( + split_dict["headings"], + split_dict["text"], # text used for citations + "", + split_dict["text_to_encode"], # text used for embeddings + ) + split.chunk_id = split_dict.get("chunk_id", "") + split.data_ids = split_dict.get("data_ids", "") + return split + + +class HeadingBasedSplit(Split): + def add_if_within_limit(self, paragraph: str, delimiter: str = "\n\n") -> bool: new_text_to_encode = f"{self.text_to_encode}{delimiter}{remove_links(paragraph)}" token_count = len(tokenize(new_text_to_encode)) @@ -70,6 +88,8 @@ def _ingest_edd_web( db_session: db.Session, json_filepath: str, doc_attribs: dict[str, str], + md_base_dir: str = "edd_md", + skip_db: bool = False, resume: bool = False, ) -> None: def prep_json_item(item: dict[str, str]) -> dict[str, str]: @@ -79,37 +99,81 @@ def prep_json_item(item: dict[str, str]) -> dict[str, str]: return item common_base_url = "https://edd.ca.gov/en/" - ingest_json(db_session, json_filepath, doc_attribs, common_base_url, resume, prep_json_item) + ingest_json( + db_session, + json_filepath, + doc_attribs, + md_base_dir, + common_base_url, + skip_db, + resume, + prep_json_item, + ) + + +def _fix_input_markdown(markdown: str) -> str: + # Fix ellipsis text that causes markdown parsing errors + # '. . .' is parsed as sublists on the same line + # in https://edd.ca.gov/en/uibdg/total_and_partial_unemployment_tpu_5/ + markdown = markdown.replace(". . .", "...") + + # Nested sublist '* + California's New Application' created without parent list + # in https://edd.ca.gov/en/about_edd/eddnext + markdown = markdown.replace("* + ", " + ") + + # Blank sublist '* ###" in https://edd.ca.gov/en/unemployment/Employer_Information/ + # Tab labels are parsed into list items with headings; remove them + markdown = re.sub(r"^\s*\* #+", "", markdown, flags=re.MULTILINE) + + # Blank sublist '* +" in https://edd.ca.gov/en/unemployment/Employer_Information/ + # Empty sublist '4. * ' in https://edd.ca.gov/en/about_edd/your-benefit-payment-options/ + # Remove empty nested sublists + markdown = re.sub( + r"^\s*(\w+\.|\*|\+|\-) (\w+\.|\*|\+|\-)\s*$", "", markdown, flags=re.MULTILINE + ) + return markdown def ingest_json( db_session: db.Session, json_filepath: str, doc_attribs: dict[str, str], + md_base_dir: str, common_base_url: str, + skip_db: bool = False, resume: bool = False, prep_json_item: Callable[[dict[str, str]], dict[str, str]] = lambda x: x, ) -> None: - json_items = load_json_items(db_session, json_filepath, doc_attribs, resume) + json_items = load_json_items(db_session, json_filepath, doc_attribs, skip_db, resume) for item in json_items: item = prep_json_item(item) - # First, split all json_items into chunks (fast) to debug any issues quickly - all_chunks = _create_chunks(json_items, doc_attribs, common_base_url) - # Then save to DB, which is slow since embeddings are computed - save_to_db(db_session, resume, all_chunks) + # First, chunk all json_items into splits (fast) to debug any issues quickly + all_splits = _chunk_into_splits_from_json(md_base_dir, json_items, doc_attribs, common_base_url) + + if skip_db: + logger.info("Skip saving to DB") + else: + # Then save to DB, which is slow since embeddings are computed + save_to_db(db_session, resume, all_splits) def load_json_items( - db_session: db.Session, json_filepath: str, doc_attribs: dict[str, str], resume: bool = False + db_session: db.Session, + json_filepath: str, + doc_attribs: dict[str, str], + skip_db: bool = False, + resume: bool = False, ) -> Sequence[dict[str, str]]: with smart_open(json_filepath, "r", encoding="utf-8") as json_file: json_items = json.load(json_file) def verbose_document_exists(item: dict[str, str]) -> bool: - if document_exists(db_session, item["url"], doc_attribs): - logger.info("Skipping -- document already exists: %s", item["url"]) + if skip_db: + logger.debug("Skip DB lookup for %s", item["url"]) + elif document_exists(db_session, item["url"], doc_attribs): + logger.info("Skipping -- document already exists in DB: %s", item["url"]) return True return False @@ -122,17 +186,28 @@ def verbose_document_exists(item: dict[str, str]) -> bool: def save_to_db( db_session: db.Session, resume: bool, - all_chunks: Sequence[tuple[Document, Sequence[Chunk], Sequence[SplitWithContextText]]], + all_splits: Sequence[tuple[Document, Sequence[Split]]], ) -> None: - for document, chunks, splits in all_chunks: - if not chunks: + for document, splits in all_splits: + if not splits: logger.warning("No chunks for %r", document.source) continue logger.info("Adding embeddings for %r", document.source) - # Next, add embeddings to each chunk (slow) + chunks = [ + Chunk( + document=document, + content=split.text, + headings=split.headings, + num_splits=len(splits), + split_index=index, + tokens=split.token_count, + ) + for index, split in enumerate(splits) + ] + # Add embedding of text_to_encode to each chunk (slow) add_embeddings(chunks, [s.text_to_encode for s in splits]) - logger.info("Embedded webpage across %d chunks: %r", len(chunks), document.name) + logger.info(" Embedded webpage across %d chunks: %r", len(chunks), document.name) # Then, add to the database db_session.add(document) @@ -141,35 +216,48 @@ def save_to_db( db_session.commit() -def _create_chunks( - json_items: Sequence[dict[str, str]], doc_attribs: dict[str, str], common_base_url: str -) -> Sequence[tuple[Document, Sequence[Chunk], Sequence[SplitWithContextText]]]: +def _chunk_into_splits_from_json( + md_base_dir: str, + json_items: Sequence[dict[str, str]], + doc_attribs: dict[str, str], + common_base_url: str, +) -> Sequence[tuple[Document, Sequence[Split]]]: urls_processed: set[str] = set() result = [] for item in json_items: - if item["url"] in urls_processed: + assert "url" in item, f"Item {item['url']} has no url" + url = item["url"] + if url in urls_processed: # Workaround for duplicate items from web scraping - logger.warning("Skipping duplicate URL: %s", item["url"]) + logger.warning("Skipping duplicate URL: %s", url) continue - logger.info("Processing: %s", item["url"]) - urls_processed.add(item["url"]) + logger.info("Processing: %s", url) + urls_processed.add(url) + + assert "title" in item, f"Item {url} has no title" + assert "markdown" in item, f"Item {url} has no markdown content" + document = Document(name=item["title"], content=item["markdown"], source=url, **doc_attribs) - assert "markdown" in item, f"Item {item['url']} has no markdown content" - content = item["markdown"] + file_path = create_file_path(md_base_dir, common_base_url, url) + load_or_save_doc_markdown(file_path, document) + + chunks_file_path = f"{file_path}.splits.json" + if os.path.exists(chunks_file_path): + # Load the splits from the file in case they've been manually edited + splits_dicts = json.loads(Path(chunks_file_path).read_text(encoding="utf-8")) + splits: Sequence[Split] = [Split.from_dict(split_dict) for split_dict in splits_dicts] + logger.info(" Loaded %d splits from file: %r", len(splits), chunks_file_path) + else: + splits = _chunk_page(document) + logger.info(" Chunked into %d splits: %r", len(splits), document.name) + _save_splits_to_files(chunks_file_path, url, splits) - assert "title" in item, f"Item {item['url']} has no title" - title = item["title"] - document = Document(name=title, content=content, source=item["url"], **doc_attribs) - if os.path.exists("SAVE_CHUNKS"): - _save_markdown_to_file(document, common_base_url) - chunks, splits = _chunk_page(document, common_base_url) - logger.info("Split into %d chunks: %s", len(chunks), title) - result.append((document, chunks, splits)) + result.append((document, splits)) logger.info( - "Done splitting %d webpages into %d chunks", + "=== DONE splitting all %d webpages into a total of %d chunks", len(json_items), - sum(len(chunks) for _, chunks, _ in result), + sum(len(splits) for _, splits in result), ) return result @@ -177,39 +265,15 @@ def _create_chunks( USE_MARKDOWN_TREE = True -def _chunk_page( - document: Document, common_base_url: str -) -> tuple[Sequence[Chunk], Sequence[SplitWithContextText]]: +def _chunk_page(document: Document) -> Sequence[Split]: if USE_MARKDOWN_TREE: - splits = _create_splits_using_markdown_tree(document, common_base_url) + return _create_splits_using_markdown_tree(document) else: - splits = [] - assert document.content - for headings, text in split_markdown_by_heading( - f"# {document.name}\n\n" + document.content - ): - # Start a new split for each heading - section_splits = _split_heading_section(headings, text) - splits.extend(section_splits) - - chunks = [ - Chunk( - document=document, - content=split.text, - headings=split.headings, - num_splits=len(splits), - split_index=index, - tokens=split.token_count, - ) - for index, split in enumerate(splits) - ] - return chunks, splits + return _create_splits_using_headings(document) -def _create_splits_using_markdown_tree( - document: Document, common_base_url: str -) -> list[SplitWithContextText]: - splits: list[SplitWithContextText] = [] +def _create_splits_using_markdown_tree(document: Document) -> list[Split]: + splits: list[Split] = [] chunking_config = DefaultChunkingConfig() try: assert document.content @@ -218,20 +282,20 @@ def _create_splits_using_markdown_tree( ) tree_chunks = chunk_tree(tree, chunking_config) + # For debugging, save the tree to a file + if os.path.exists("DEBUG_TREE"): + assert document.source + tree_file_path = f"{document.source.rsplit('/', 1)[-1]}.tree" + Path(tree_file_path).write_text(tree.format(), encoding="utf-8") + for chunk in tree_chunks: - split = SplitWithContextText( - chunk.headings, chunk.markdown, chunk.context_str, chunk.embedding_str - ) + split = Split(chunk.headings, chunk.markdown, chunk.context_str, chunk.embedding_str) assert split.token_count == chunk.length splits.append(split) - if os.path.exists("SAVE_CHUNKS"): - # Add some extra info for debugging - split.chunk_id = chunk.id - split.data_ids = ", ".join(chunk.data_ids) - if os.path.exists("SAVE_CHUNKS"): - assert document.source - path = "chunks-log/" + document.source.removeprefix(common_base_url).rstrip("/") - _save_splits_to_files(f"{path}.json", document.source, document.content, splits, tree) + + # Add extra info for debugging + split.chunk_id = chunk.id + split.data_ids = ", ".join(chunk.data_ids) except (Exception, KeyboardInterrupt) as e: # pragma: no cover logger.error("Error chunking %s (%s): %s", document.name, document.source, e) logger.error(tree.format()) @@ -239,57 +303,36 @@ def _create_splits_using_markdown_tree( return splits -def _fix_input_markdown(markdown: str) -> str: - # Fix ellipsis text that causes markdown parsing errors - # '. . .' is parsed as sublists on the same line - # in https://edd.ca.gov/en/uibdg/total_and_partial_unemployment_tpu_5/ - markdown = markdown.replace(". . .", "...") +def _save_splits_to_files(file_path: str, uri: str, splits: Sequence[Split]) -> None: + logger.info(" Saving splits to %r", file_path) + splits_json = json.dumps([split.__dict__ for split in splits], indent=2) + Path(file_path).write_text(splits_json, encoding="utf-8") - # Nested sublist '* + California's New Application' created without parent list - # in https://edd.ca.gov/en/about_edd/eddnext - markdown = markdown.replace("* + ", " + ") + # Save prettified splits to a markdown file for manual inspection + with open(f"{os.path.splitext(file_path)[0]}.md", "w", encoding="utf-8") as file: + file.write(f"{uri} => {len(splits)} splits\n") + file.write("\n") + for split in splits: + file.write( + f"---\nchunk_id: {split.chunk_id}\nlength: {split.token_count}\nheadings: {split.headings}\n---\n" + ) + file.write(split.text) + file.write("\n====================================\n") + file.write("\n\n") - # Blank sublist '* ###" in https://edd.ca.gov/en/unemployment/Employer_Information/ - # Tab labels are parsed into list items with headings; remove them - markdown = re.sub(r"^\s*\* #+", "", markdown, flags=re.MULTILINE) - # Blank sublist '* +" in https://edd.ca.gov/en/unemployment/Employer_Information/ - # Empty sublist '4. * ' in https://edd.ca.gov/en/about_edd/your-benefit-payment-options/ - # Remove empty nested sublists - markdown = re.sub( - r"^\s*(\w+\.|\*|\+|\-) (\w+\.|\*|\+|\-)\s*$", "", markdown, flags=re.MULTILINE - ) - return markdown +# endregion +# region Splitting Markdown by Headings -def _save_markdown_to_file(document: Document, common_base_url: str) -> None: - assert document.source - file_path = "chunks-log/" + document.source.removeprefix(common_base_url).rstrip("/") - os.makedirs(os.path.dirname(file_path), exist_ok=True) - logger.info("Saving markdown to %r", f"{file_path}.md") +def _create_splits_using_headings(document: Document) -> Sequence[Split]: + splits = [] assert document.content - Path(f"{file_path}.md").write_text(document.content, encoding="utf-8") - - -def _save_splits_to_files( - file_path: str, uri: str, content: str, splits: list[SplitWithContextText], tree: Tree -) -> None: # pragma: no cover - logger.info("Saving chunks to %r", file_path) - os.makedirs(os.path.dirname(file_path), exist_ok=True) - with open(file_path, "w", encoding="utf-8") as file: - file.write(f"{uri} => {len(splits)} chunks\n") - file.write("\n") - for split in splits: - file.write(f">>> {split.chunk_id!r} (length {split.token_count}) {split.headings}\n") - file.write(split.text) - file.write("\n--------------------------------------------------\n") - file.write("\n\n") - json_str = json.dumps([split.__dict__ for split in splits], indent=2) - file.write(json_str) - file.write("\n\n") - file.write(tree.format()) - file.write("\n\n") - file.write(content) + for headings, text in split_markdown_by_heading(f"# {document.name}\n\n" + document.content): + # Start a new split for each heading + section_splits = _split_heading_section(headings, text) + splits.extend(section_splits) + return splits # MarkdownHeaderTextSplitter splits text by "\n" then calls aggregate_lines_to_chunks() to reaggregate @@ -297,7 +340,7 @@ def _save_splits_to_files( MarkdownHeaderTextSplitter_DELIMITER = " \n" -def _split_heading_section(headings: Sequence[str], text: str) -> list[SplitWithContextText]: +def _split_heading_section(headings: Sequence[str], text: str) -> list[HeadingBasedSplit]: # Add headings to the context_str; other context can also be added context_str = "\n".join(headings) logger.debug("New heading: %s", headings) @@ -321,7 +364,7 @@ def _split_heading_section(headings: Sequence[str], text: str) -> list[SplitWith flags=re.MULTILINE, ) - splits: list[SplitWithContextText] = [] + splits: list[HeadingBasedSplit] = [] # Split content by MarkdownHeaderTextSplitter_DELIMITER, then gather into the largest chunks # that tokenize to less than the max_seq_length paragraphs = text.split(MarkdownHeaderTextSplitter_DELIMITER) @@ -340,11 +383,11 @@ def _split_heading_section(headings: Sequence[str], text: str) -> list[SplitWith def _create_splits( headings: Sequence[str], context_str: str, - splits: list[SplitWithContextText], + splits: list[HeadingBasedSplit], paragraphs: Sequence[str], delimiter: str = "\n\n", ) -> None: - splits.append(SplitWithContextText(headings, paragraphs[0], context_str)) + splits.append(HeadingBasedSplit(headings, paragraphs[0], context_str)) logger.debug("Paragraph0: %r", paragraphs[0]) for paragraph in paragraphs[1:]: logger.debug("Paragraph: %r", paragraph) @@ -357,7 +400,7 @@ def _create_splits( else: logger.info("Split %i has %i tokens", len(splits), splits[-1].token_count) # Start new split since longer_split will exceed max_seq_length - splits.append(SplitWithContextText(headings, paragraph, context_str)) + splits.append(HeadingBasedSplit(headings, paragraph, context_str)) _split_large_text_block(headings, context_str, splits) for split in splits: @@ -367,7 +410,7 @@ def _create_splits( def _split_large_text_block( - headings: Sequence[str], context_str: str, splits: list[SplitWithContextText] + headings: Sequence[str], context_str: str, splits: list[HeadingBasedSplit] ) -> None: split = splits[-1] context_token_count = len(tokenize(f"{context_str}\n\n")) @@ -408,5 +451,8 @@ def _split_large_text_block( ) +# endregion + + def main() -> None: process_and_ingest_sys_args(sys.argv, logger, _ingest_edd_web) diff --git a/app/src/ingest_la_county_policy.py b/app/src/ingest_la_county_policy.py index b5815178..42242353 100644 --- a/app/src/ingest_la_county_policy.py +++ b/app/src/ingest_la_county_policy.py @@ -12,6 +12,8 @@ def _ingest_la_county_policy( db_session: db.Session, json_filepath: str, doc_attribs: dict[str, str], + md_base_dir: str = "la_policy_md", + skip_db: bool = False, resume: bool = False, ) -> None: def prep_json_item(item: dict[str, str]) -> dict[str, str]: @@ -20,7 +22,16 @@ def prep_json_item(item: dict[str, str]) -> dict[str, str]: return item common_base_url = "https://epolicy.dpss.lacounty.gov/epolicy/epolicy/server/general/projects_responsive/ePolicyMaster/mergedProjects/" - ingest_json(db_session, json_filepath, doc_attribs, common_base_url, resume, prep_json_item) + ingest_json( + db_session, + json_filepath, + doc_attribs, + md_base_dir, + common_base_url, + skip_db, + resume, + prep_json_item, + ) def main() -> None: diff --git a/app/src/ingestion/imagine_la/ingest.py b/app/src/ingestion/imagine_la/ingest.py index 0cd19fc3..02460a8c 100644 --- a/app/src/ingestion/imagine_la/ingest.py +++ b/app/src/ingestion/imagine_la/ingest.py @@ -1,6 +1,6 @@ import logging import sys -from typing import BinaryIO +from typing import Sequence from bs4 import BeautifulSoup from markdownify import markdownify as md @@ -12,7 +12,14 @@ from src.ingestion.markdown_chunking import ChunkingConfig, chunk_tree from src.ingestion.markdown_tree import create_markdown_tree from src.util.file_util import get_files -from src.util.ingest_utils import add_embeddings, process_and_ingest_sys_args, save_json, tokenize +from src.util.ingest_utils import ( + add_embeddings, + create_file_path, + load_or_save_doc_markdown, + process_and_ingest_sys_args, + save_json, + tokenize, +) from src.util.string_utils import remove_links logger = logging.getLogger(__name__) @@ -30,10 +37,11 @@ def text_length(self, text: str) -> int: def _parse_html( - file: BinaryIO, file_path: str, doc_attribs: dict[str, str] -) -> tuple[Document, list[Chunk], list[str]]: + md_base_dir: str, common_base_url: str, file_path: str, doc_attribs: dict[str, str] +) -> tuple[Document, Sequence[Chunk], Sequence[str]]: - file_contents = file.read() + with open(file_path, "r") as file: + file_contents = file.read() soup = BeautifulSoup(file_contents, "html.parser") doc_attribs["name"] = soup.find("h2").text.strip() @@ -61,6 +69,10 @@ def _parse_html( content += f"## {heading}\n\n{md(body)}\n\n" document.content = content + assert document.source + file_path = create_file_path(md_base_dir, common_base_url, document.source) + load_or_save_doc_markdown(file_path, document) + # Convert markdown to chunks tree = create_markdown_tree(content, doc_name=document.name, doc_source=document.source) tree_chunks = chunk_tree(tree, ImagineLaChunkingConfig()) @@ -70,6 +82,10 @@ def _parse_html( ] chunk_texts_to_encode = [remove_links(chunk.markdown) for chunk in tree_chunks] + chunks_file_path = f"{file_path}.chunks.json" + logger.info(" Saving chunks to %r", chunks_file_path) + save_json(chunks_file_path, chunks) + return document, chunks, chunk_texts_to_encode @@ -77,7 +93,8 @@ def _ingest_content_hub( db_session: db.Session, html_file_dir: str, doc_attribs: dict[str, str], - should_save_json: bool = True, + md_base_dir: str = "imagine_la_md", + skip_db: bool = False, ) -> None: file_list = sorted(get_files(html_file_dir)) @@ -87,18 +104,24 @@ def _ingest_content_hub( app_config.embedding_model, doc_attribs, ) + common_base_url = "https://socialbenefitsnavigator25.web.app/contenthub/" + + all_chunks: list[tuple[Document, Sequence[Chunk], Sequence[str]]] = [] for file_path in file_list: if not file_path.endswith(".html"): continue logger.info("Processing file: %s", file_path) - with open(file_path, "r") as file: - document, chunks, chunk_texts_to_encode = _parse_html(file, file_path, doc_attribs) + result = _parse_html(md_base_dir, common_base_url, file_path, doc_attribs) + all_chunks.append(result) + + if skip_db: + logger.info("Skip saving to DB") + else: + for document, chunks, chunk_texts_to_encode in all_chunks: add_embeddings(chunks, chunk_texts_to_encode) db_session.add(document) db_session.add_all(chunks) - if should_save_json: - save_json(file_path, chunks) def main() -> None: diff --git a/app/src/ingestion/scrape_la_policy.py b/app/src/ingestion/scrape_la_policy.py index e530b887..4cd6f808 100644 --- a/app/src/ingestion/scrape_la_policy.py +++ b/app/src/ingestion/scrape_la_policy.py @@ -22,7 +22,7 @@ def main() -> None: from .scrapy_runner import run - run(SPIDER_NAME, OUTPUT_JSON, debug=bool(os.environ["DEBUG_SCRAPINGS"])) + run(SPIDER_NAME, OUTPUT_JSON, debug=bool(os.environ.get("DEBUG_SCRAPINGS", False))) if __name__ == "__main__": diff --git a/app/src/ingestion/scrapy_runner.py b/app/src/ingestion/scrapy_runner.py index 8d6b5cd8..419faa6c 100644 --- a/app/src/ingestion/scrapy_runner.py +++ b/app/src/ingestion/scrapy_runner.py @@ -33,7 +33,7 @@ def run_spider(spider_name: str, output_json_filename: str) -> None: process.crawl(spider_name) process.start() # the script will block here until the crawling is finished - logger.info("Scraping results saved to %s", output_json_filename) + logger.info("Scraping results saved to %s", os.path.realpath(output_json_filename)) def postprocess_json(input_filename: str) -> None: diff --git a/app/src/util/ingest_utils.py b/app/src/util/ingest_utils.py index a37180ca..a1a82d08 100644 --- a/app/src/util/ingest_utils.py +++ b/app/src/util/ingest_utils.py @@ -2,11 +2,13 @@ import inspect import json import logging +import os import re from logging import Logger +from pathlib import Path from typing import Callable, Optional, Sequence -from smart_open import open +from smart_open import open as smart_open from sqlalchemy import and_, delete, select from sqlalchemy.sql import exists @@ -53,15 +55,22 @@ def process_and_ingest_sys_args(argv: list[str], logger: Logger, ingestion_call: parser.add_argument("benefit_region") parser.add_argument("file_path") parser.add_argument("--resume", action="store_true") + parser.add_argument("--skip_db", action="store_true") args = parser.parse_args(argv[1:]) + params = inspect.signature(ingestion_call).parameters if args.resume: - params = inspect.signature(ingestion_call).parameters if "resume" not in params: raise NotImplementedError( f"Ingestion function does not support `resume`: {ingestion_call}" ) logger.info("Enabled resuming from previous run.") + if args.skip_db: + if "skip_db" not in params: + raise NotImplementedError( + f"Ingestion function does not support `skip_db`: {ingestion_call}" + ) + logger.info("Skipping reading or writing to the DB.") doc_attribs = { "dataset": args.dataset_id, @@ -72,13 +81,15 @@ def process_and_ingest_sys_args(argv: list[str], logger: Logger, ingestion_call: with app_config.db_session() as db_session: if args.resume: - ingestion_call(db_session, args.file_path, doc_attribs, resume=args.resume) + ingestion_call( + db_session, args.file_path, doc_attribs, skip_db=args.skip_db, resume=args.resume + ) else: dropped = _drop_existing_dataset(db_session, args.dataset_id) if dropped: logger.warning("Dropped existing dataset %s", args.dataset_id) db_session.commit() - ingestion_call(db_session, args.file_path, doc_attribs) + ingestion_call(db_session, args.file_path, doc_attribs, skip_db=args.skip_db) db_session.commit() logger.info("Finished ingesting") @@ -249,12 +260,50 @@ def _join_up_to_max_seq_length( return chunks +def create_file_path(base_dir: str, common_base_url: str, source_url: str) -> str: + assert common_base_url.endswith("/") + relative_path = source_url.removeprefix(common_base_url) + assert not relative_path.startswith("/") + file_path = os.path.join(base_dir, relative_path) + + if file_path == "" or file_path.endswith("/"): + # Ensure that the file_path ends with a filename + file_path += "_index" + return file_path + + +def load_or_save_doc_markdown(file_path: str, document: Document) -> str: + md_file_path = f"{file_path}.md" + if os.path.exists(md_file_path): + # Load the markdown content from the file in case it's been manually edited + logger.info(" Loading markdown from file: %r", md_file_path) + document.content = Path(md_file_path).read_text(encoding="utf-8") + else: + logger.info(" Saving markdown to %r", md_file_path) + assert document.content + os.makedirs(os.path.dirname(md_file_path), exist_ok=True) + Path(md_file_path).write_text(document.content, encoding="utf-8") + return file_path + + def save_json(file_path: str, chunks: list[Chunk]) -> None: chunks_as_json = [chunk.to_json() for chunk in chunks] - - with open(file_path + ".json", "w") as file: + with smart_open(file_path, "w") as file: file.write(json.dumps(chunks_as_json)) + # Save prettified chunks to a markdown file for manual inspection + with smart_open(f"{os.path.splitext(file_path)[0]}.md", "w", encoding="utf-8") as file: + file.write(f"{len(chunks)} chunks\n") + file.write("\n") + for chunk in chunks: + if not chunk.tokens: + chunk.tokens = len(tokenize(chunk.content)) + + file.write(f"---\nlength: {chunk.tokens}\nheadings: {chunk.headings}\n---\n") + file.write(chunk.content) + file.write("\n====================================\n") + file.write("\n\n") + class DefaultChunkingConfig(ChunkingConfig): def __init__(self) -> None: diff --git a/app/tests/src/ingestion/imagine-la/test_ingest.py b/app/tests/src/ingestion/imagine-la/test_ingest.py index cf69a399..c7b9b241 100644 --- a/app/tests/src/ingestion/imagine-la/test_ingest.py +++ b/app/tests/src/ingestion/imagine-la/test_ingest.py @@ -49,11 +49,9 @@ def test__ingest_content_hub(caplog, app_config, db_session, s3_html, file_locat with caplog.at_level(logging.INFO): if file_location == "local": - _ingest_content_hub( - db_session, "/app/tests/docs/imagine_la/", doc_attribs, should_save_json=False - ) + _ingest_content_hub(db_session, "/app/tests/docs/imagine_la/", doc_attribs) else: - _ingest_content_hub(db_session, s3_html, doc_attribs, should_save_json=False) + _ingest_content_hub(db_session, s3_html, doc_attribs) assert any(text.startswith("Processing file: ") for text in caplog.messages) diff --git a/app/tests/src/test_ingest_edd_web.py b/app/tests/src/test_ingest_edd_web.py index db540e4d..67b8c36c 100644 --- a/app/tests/src/test_ingest_edd_web.py +++ b/app/tests/src/test_ingest_edd_web.py @@ -1,5 +1,6 @@ import json import logging +from tempfile import TemporaryDirectory import pytest from sqlalchemy import delete, select @@ -110,11 +111,11 @@ def test__ingest_edd( db_session.execute(delete(Document)) - with caplog.at_level(logging.WARNING): + with TemporaryDirectory(suffix="edd_md") as md_base_dir, caplog.at_level(logging.WARNING): if file_location == "local": - _ingest_edd_web(db_session, edd_web_local_file, doc_attribs) + _ingest_edd_web(db_session, edd_web_local_file, doc_attribs, md_base_dir=md_base_dir) else: - _ingest_edd_web(db_session, edd_web_s3_file, doc_attribs) + _ingest_edd_web(db_session, edd_web_s3_file, doc_attribs, md_base_dir=md_base_dir) documents = db_session.execute(select(Document).order_by(Document.name)).scalars().all() assert len(documents) == 4 @@ -224,17 +225,22 @@ def test__ingest_edd_using_md_tree(caplog, app_config, db_session, edd_web_local db_session.execute(delete(Document)) - with caplog.at_level(logging.WARNING): - _ingest_edd_web(db_session, edd_web_local_file, doc_attribs, resume=True) + with TemporaryDirectory(suffix="edd_md") as md_base_dir: + with caplog.at_level(logging.WARNING): + _ingest_edd_web( + db_session, edd_web_local_file, doc_attribs, md_base_dir=md_base_dir, resume=True + ) - check_database_contents(db_session, caplog) + check_database_contents(db_session, caplog) - # Re-ingesting the same data should not add any new documents - with caplog.at_level(logging.INFO): - _ingest_edd_web(db_session, edd_web_local_file, doc_attribs, resume=True) + # Re-ingesting the same data should not add any new documents + with caplog.at_level(logging.INFO): + _ingest_edd_web( + db_session, edd_web_local_file, doc_attribs, md_base_dir=md_base_dir, resume=True + ) skipped_logs = { - msg for msg in caplog.messages if msg.startswith("Skipping -- document already exists:") + msg for msg in caplog.messages if msg.startswith("Skipping -- document already exists") } assert len(skipped_logs) == 4 assert db_session.query(Document.id).count() == 4 diff --git a/app/tests/src/test_ingest_la_county_policy.py b/app/tests/src/test_ingest_la_county_policy.py index e767b431..9cd15ae3 100644 --- a/app/tests/src/test_ingest_la_county_policy.py +++ b/app/tests/src/test_ingest_la_county_policy.py @@ -1,5 +1,6 @@ import json import logging +from tempfile import TemporaryDirectory import pytest from sqlalchemy import delete @@ -40,17 +41,30 @@ def test_ingestion(caplog, app_config, db_session, la_county_policy_local_file): db_session.execute(delete(Document)) - with caplog.at_level(logging.WARNING): - _ingest_la_county_policy(db_session, la_county_policy_local_file, doc_attribs, resume=True) - - check_database_contents(db_session, caplog) - - # Re-ingesting the same data should not add any new documents - with caplog.at_level(logging.INFO): - _ingest_la_county_policy(db_session, la_county_policy_local_file, doc_attribs, resume=True) + with TemporaryDirectory(suffix="la_policy_md") as md_base_dir: + with caplog.at_level(logging.WARNING): + _ingest_la_county_policy( + db_session, + la_county_policy_local_file, + doc_attribs, + md_base_dir=md_base_dir, + resume=True, + ) + + check_database_contents(db_session, caplog) + + # Re-ingesting the same data should not add any new documents + with caplog.at_level(logging.INFO): + _ingest_la_county_policy( + db_session, + la_county_policy_local_file, + doc_attribs, + md_base_dir=md_base_dir, + resume=True, + ) skipped_logs = { - msg for msg in caplog.messages if msg.startswith("Skipping -- document already exists:") + msg for msg in caplog.messages if msg.startswith("Skipping -- document already exists") } assert len(skipped_logs) == 4 assert db_session.query(Document.id).count() == 4 diff --git a/app/tests/src/test_ingest_utils.py b/app/tests/src/test_ingest_utils.py index af20cc16..eb0a6529 100644 --- a/app/tests/src/test_ingest_utils.py +++ b/app/tests/src/test_ingest_utils.py @@ -83,6 +83,7 @@ def test_process_and_ingest_sys_args_calls_ingest(caplog): "program": "SNAP", "region": "Michigan", }, + skip_db=False, ) @@ -153,7 +154,9 @@ def test_process_and_ingest_sys_args_resume(db_session, caplog, enable_factory_c ) # Use an unmocked function so that the resume parameter is detectable by process_and_ingest_sys_args() - def ingest_with_resume(db_session, json_filepath, doc_attribs, resume=False) -> None: + def ingest_with_resume( + db_session, json_filepath, doc_attribs, skip_db=False, resume=False + ) -> None: logger.info("Ingesting with resume: %r", resume) DocumentFactory.create(dataset="CA EDD") @@ -278,8 +281,9 @@ def test__save_json(file_location, mock_s3_bucket_resource): if file_location == "s3" else os.path.join(tempfile.mkdtemp(), "test.pdf") ) - save_json(file_path, chunks) - saved_json = json.loads(open(file_path + ".json", "r").read()) + json_file = f"{file_path}.json" + save_json(json_file, chunks) + saved_json = json.loads(open(json_file, "r").read()) assert saved_json == [ { "id": str(chunks[0].id),