Skip to content

Commit

Permalink
Merge pull request #5 from dolead/rd-7634/fix_stat_crea_pipeline
Browse files Browse the repository at this point in the history
fix point in time
  • Loading branch information
NahidOulmi authored Oct 5, 2023
2 parents 8ac37e7 + e73387d commit 1884c7d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-e ../../bases/source-acceptance-test
-e ../../bases/connector-acceptance-test
-e .
elasticsearch
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
from datetime import datetime, timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import elasticsearch.exceptions
from airbyte_cdk.sources.streams.http.auth import NoAuth, HttpAuthenticator
from elasticsearch import Elasticsearch
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.core import IncrementalMixin
from airbyte_cdk.models import SyncMode
from requests.auth import AuthBase

"""
TODO: Most comments in this class are instructive and should be deleted after the source is implemented.
Expand All @@ -35,7 +33,9 @@
# Basic full refresh stream
class ElasticSearchV2Stream(HttpStream, ABC):
url_base = "http://aes-statistic01.prod.dld:9200"
client: Elasticsearch = Elasticsearch(url_base)
date_start = ""
pit = None

def __init__(self, date_start):
super().__init__()
Expand All @@ -60,6 +60,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
search_after = docs[len(docs) - 1].get("sort")
return {"search_after": search_after, "pit_id": pit_id}
else:
# No more pages: close the point in time and stop paginating
try:
self.client.close_point_in_time(id=self.pit.body["id"])
except elasticsearch.exceptions.NotFoundError as e:
logging.info("Not PIT found")
return None

def request_headers(
Expand Down Expand Up @@ -93,7 +98,9 @@ def request_body_data(
else:
date_filter_start = stream_state.get("date")

# If this is a new sync
if next_page_token is None:
self.pit = self.client.open_point_in_time(index=self.index, keep_alive="1m")
payload = {
"query": {
"bool": {
Expand All @@ -110,7 +117,7 @@ def request_body_data(
},
"pit": {
"id": self.pit.body["id"],
"keep_alive": "5m"
"keep_alive": "1m"
},
"size": 10000,
"sort": [
Expand All @@ -120,6 +127,7 @@ def request_body_data(
]
}

# If this is the next page of a sync
else:
pit_id = next_page_token["pit_id"]
search_after = next_page_token["search_after"]
Expand Down Expand Up @@ -161,12 +169,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
data["_id"] = hit["_id"]
yield data


# Basic incremental stream
class IncrementalElasticSearchV2Stream(ElasticSearchV2Stream, ABC):
# point in time
pit = ""
client: Elasticsearch = Elasticsearch("http://aes-statistic01.prod.dld:9200")
date_start = ""

def __init__(self, date_start):
Expand Down Expand Up @@ -229,19 +235,13 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:

date_start = config["date_start"]

return [Creatives(date_start), Campaigns(date_start), Accounts(date_start)]

return [Campaigns(date_start), Accounts(date_start), Creatives(date_start)]

class Creatives(IncrementalElasticSearchV2Stream):
cursor_field = "date"
primary_key = "_id"
date_start = ""

def __init__(self, date_start):

super().__init__(date_start)
pit = self.client.open_point_in_time(index="statistics_ad_creative*", keep_alive="1m")
self.pit = pit
index = "statistics_ad_creative*"

def request_body_data(
self,
Expand All @@ -266,12 +266,7 @@ class Campaigns(IncrementalElasticSearchV2Stream):
primary_key = "_id"
pit = ""
client: Elasticsearch = Elasticsearch("http://aes-statistic01.prod.dld:9200")

def __init__(self, date_start):

super().__init__(date_start)
pit = self.client.open_point_in_time(index="statistics_campaign*", keep_alive="1m")
self.pit = pit
index = "statistics_campaign*"

def request_body_data(
self,
Expand All @@ -295,12 +290,7 @@ class Accounts(IncrementalElasticSearchV2Stream):
primary_key = "_id"
pit = ""
client: Elasticsearch = Elasticsearch("http://aes-statistic01.prod.dld:9200")

def __init__(self, date_start):

super().__init__(date_start)
pit = self.client.open_point_in_time(index="statistics_account*", keep_alive="1m")
self.pit = pit
index = "statistics_account*"

def request_body_data(
self,
Expand Down

0 comments on commit 1884c7d

Please sign in to comment.