diff --git a/airbyte-integrations/connectors/source-elastic-search-v2/requirements.txt b/airbyte-integrations/connectors/source-elastic-search-v2/requirements.txt index b7144ed32cb4..3724dd5b0b6d 100644 --- a/airbyte-integrations/connectors/source-elastic-search-v2/requirements.txt +++ b/airbyte-integrations/connectors/source-elastic-search-v2/requirements.txt @@ -1,3 +1,3 @@ --e ../../bases/source-acceptance-test +-e ../../bases/connector-acceptance-test -e . elasticsearch \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-elastic-search-v2/source_elastic_search_v2/source.py b/airbyte-integrations/connectors/source-elastic-search-v2/source_elastic_search_v2/source.py index 84b1c4073591..1334ea71259a 100644 --- a/airbyte-integrations/connectors/source-elastic-search-v2/source_elastic_search_v2/source.py +++ b/airbyte-integrations/connectors/source-elastic-search-v2/source_elastic_search_v2/source.py @@ -7,15 +7,13 @@ from datetime import datetime, timedelta from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +import elasticsearch.exceptions from airbyte_cdk.sources.streams.http.auth import NoAuth, HttpAuthenticator from elasticsearch import Elasticsearch import requests from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.streams.core import IncrementalMixin -from airbyte_cdk.models import SyncMode -from requests.auth import AuthBase """ TODO: Most comments in this class are instructive and should be deleted after the source is implemented. @@ -35,7 +33,9 @@ # Basic full refresh stream class ElasticSearchV2Stream(HttpStream, ABC): url_base = "http://aes-statistic01.prod.dld:9200" + client: Elasticsearch = Elasticsearch(url_base) date_start = "" + pit = None def __init__(self, date_start): super().__init__() @@ -60,6 +60,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, search_after = docs[len(docs) - 1].get("sort") return {"search_after": search_after, "pit_id": pit_id} else: + # Case when no more pages + try: + self.client.close_point_in_time(id=self.pit.body["id"]) + except elasticsearch.exceptions.NotFoundError as e: + logging.info("Not PIT found") return None def request_headers( @@ -93,7 +98,9 @@ def request_body_data( else: date_filter_start = stream_state.get("date") + # If this is a new sync if next_page_token is None: + self.pit = self.client.open_point_in_time(index=self.index, keep_alive="1m") payload = { "query": { "bool": { @@ -110,7 +117,7 @@ def request_body_data( }, "pit": { "id": self.pit.body["id"], - "keep_alive": "5m" + "keep_alive": "1m" }, "size": 10000, "sort": [ @@ -120,6 +127,7 @@ def request_body_data( ] } + # If this is the next page of a sync else: pit_id = next_page_token["pit_id"] search_after = next_page_token["search_after"] @@ -161,12 +169,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp data["_id"] = hit["_id"] yield data - # Basic incremental stream class IncrementalElasticSearchV2Stream(ElasticSearchV2Stream, ABC): # point in time pit = "" - client: Elasticsearch = Elasticsearch("http://aes-statistic01.prod.dld:9200") date_start = "" def __init__(self, date_start): @@ -229,19 +235,13 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: date_start = config["date_start"] - return [Creatives(date_start), Campaigns(date_start), Accounts(date_start)] - + return [Campaigns(date_start), Accounts(date_start), Creatives(date_start)] class Creatives(IncrementalElasticSearchV2Stream): cursor_field = "date" primary_key = "_id" date_start = "" - - def __init__(self, date_start): - - super().__init__(date_start) - pit = self.client.open_point_in_time(index="statistics_ad_creative*", keep_alive="1m") - self.pit = pit + index = "statistics_ad_creative*" def request_body_data( self, @@ -266,12 +266,7 @@ class Campaigns(IncrementalElasticSearchV2Stream): primary_key = "_id" pit = "" client: Elasticsearch = Elasticsearch("http://aes-statistic01.prod.dld:9200") - - def __init__(self, date_start): - - super().__init__(date_start) - pit = self.client.open_point_in_time(index="statistics_campaign*", keep_alive="1m") - self.pit = pit + index = "statistics_campaign*" def request_body_data( self, @@ -295,12 +290,7 @@ class Accounts(IncrementalElasticSearchV2Stream): primary_key = "_id" pit = "" client: Elasticsearch = Elasticsearch("http://aes-statistic01.prod.dld:9200") - - def __init__(self, date_start): - - super().__init__(date_start) - pit = self.client.open_point_in_time(index="statistics_account*", keep_alive="1m") - self.pit = pit + index = "statistics_account*" def request_body_data( self,