Skip to content

Commit

Permalink
Use POST search endpoints for contacts and conversations (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
viktigpetterr authored Oct 6, 2023
1 parent 2c8d9cb commit a5fcfbb
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ tap_intercom/.vscode/settings.json
.DS_Store
test_configuration.py
tap_target_commands.sh
*/.idea/*
3 changes: 1 addition & 2 deletions tap_intercom/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
import json
import singer
from singer import utils
from tap_intercom.stream import Stream
Expand All @@ -19,7 +18,7 @@ def sync(config: Dict, state: Optional[Dict] = None):
stream = Stream(config)

for tap_stream_id in STREAMS:
LOGGER.info(f"syncing {stream}")
LOGGER.info(f"syncing {tap_stream_id}")
stream.do_sync(tap_stream_id=tap_stream_id, state=state)


Expand Down
101 changes: 89 additions & 12 deletions tap_intercom/intercom.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ def scroll_companies(self):
def call_scroll_api(self, url, params={}):
response = self.SESSION.get(
url,
headers={
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
},
headers=self.__make_header(),
params=params,
)
LOGGER.debug(response.url)
Expand All @@ -91,11 +88,8 @@ def get_records(self, tap_stream_id):
page_size = 60
pagination_path = ["pages", "next"]

if tap_stream_id in ["conversations", "segments"]:
if tap_stream_id == "conversations":
data_field = tap_stream_id
if tap_stream_id == "contacts":
pagination_path.append("starting_after")
page_size = 150

yield from self.__get(
page_size=page_size,
Expand Down Expand Up @@ -183,12 +177,95 @@ def paginate(
def call_api(self, url, params={}):
response = self.SESSION.get(
url,
headers={
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
},
headers=self.__make_header(),
params=params,
)
LOGGER.debug(response.url)
response.raise_for_status()
return response.json()

@backoff.on_exception(
backoff.expo,
(
requests.exceptions.RequestException,
requests.exceptions.HTTPError,
ratelimit.exception.RateLimitException,
JSONDecodeError,
),
max_tries=20,
factor=5,
max_time=60 * 10,
giveup=_is_internal_server_error,
)
@limits(calls=1000, period=ONE_MINUTE)
def call_search_api(self, url, params={}, json=None):
response = self.SESSION.post(
url,
headers=self.__make_header(),
params=params,
json=json,
)
LOGGER.debug(response.url)
response.raise_for_status()
return response.json()

def search(self, tap_stream_id, start_date, end_date):
url = f"{self.BASE_URL}/{tap_stream_id}/search"

data_field = tap_stream_id
if tap_stream_id == "contacts":
data_field = "data"

# For more options:
# https://developers.intercom.com/docs/references/rest-api/api.intercom.io/Contacts/SearchContacts/
per_page = 150
pagination = {"page": 0, "per_page": per_page}
json = {
"query": {
"operator": "OR",
"value": [
{
"operator": "AND",
"value": [
{
"field": "updated_at",
"operator": ">",
"value": start_date.timestamp(),
},
{
"field": "updated_at",
"operator": "<",
"value": end_date.timestamp(),
},
],
},
{
"field": "updated_at",
"operator": "=",
"value": start_date.timestamp(),
},
],
},
"sort": {"field": "updated_at", "order": "ascending"},
"pagination": pagination,
}
pagination_path = ["pages", "next"]
replication_path = ["updated_at"]
while True:
response_data = self.call_search_api(url, json=json)
if response_data:
records = self.get_value(response_data, [data_field])
for record in records:
replication_value = self.get_value(record, replication_path)
yield record, self.unixseconds_to_datetime(replication_value)
pagination = self.get_value(response_data, pagination_path)
if not pagination or pagination == "null":
return
pagination["per_page"] = per_page
json["pagination"] = pagination

def __make_header(self):
return {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
}
6 changes: 4 additions & 2 deletions tap_intercom/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(self, config: Dict):
self.intercom = Intercom(config["access_token"])

def do_sync(self, tap_stream_id: str, state: Optional[datetime] = None):

latest_bookmark = None
start_date, end_date = self.__get_start_end(
state=state, tap_stream_id=tap_stream_id
Expand All @@ -33,11 +32,14 @@ def do_sync(self, tap_stream_id: str, state: Optional[datetime] = None):
# special "scroll" endpoint.
if tap_stream_id == "companies":
data = self.intercom.scroll_companies()
# To increase performance of fetching, a POST search
# endpoint is used because it offers more fine-grained filtering.
elif tap_stream_id in ["contacts", "conversations"]:
data = self.intercom.search(tap_stream_id, start_date, end_date)
else:
data = self.intercom.get_records(tap_stream_id)

for record, replication_value in data:

if replication_value and (
start_date >= replication_value or end_date <= replication_value
):
Expand Down

0 comments on commit a5fcfbb

Please sign in to comment.