-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
split up the Dune Client class into multiple API components
- Loading branch information
Showing
9 changed files
with
574 additions
and
498 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
"""" | ||
Basic Dune Client Class responsible for refreshing Dune Queries | ||
Framework built on Dune's API Documentation | ||
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a | ||
""" | ||
from __future__ import annotations | ||
|
||
import logging.config | ||
from json import JSONDecodeError | ||
from typing import Dict, Optional, Any | ||
|
||
import requests | ||
from requests import Response | ||
|
||
|
||
# pylint: disable=too-few-public-methods | ||
class BaseDuneClient: | ||
""" | ||
A Base Client for Dune which sets up default values | ||
and provides some convenient functions to use in other clients | ||
""" | ||
|
||
BASE_URL = "https://api.dune.com" | ||
DEFAULT_TIMEOUT = 10 | ||
|
||
def __init__( | ||
self, api_key: str, client_version: str = "v1", performance: str = "medium" | ||
): | ||
self.token = api_key | ||
self.client_version = client_version | ||
self.performance = performance | ||
self.logger = logging.getLogger(__name__) | ||
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s") | ||
|
||
@property | ||
def api_version(self) -> str: | ||
"""Returns client version string""" | ||
return f"/api/{self.client_version}" | ||
|
||
def default_headers(self) -> Dict[str, str]: | ||
"""Return default headers containing Dune Api token""" | ||
return {"x-dune-api-key": self.token} | ||
|
||
|
||
class BaseRouter(BaseDuneClient): | ||
"""Extending the Base Client with elementary api routing""" | ||
|
||
def _handle_response(self, response: Response) -> Any: | ||
"""Generic response handler utilized by all Dune API routes""" | ||
try: | ||
# Some responses can be decoded and converted to DuneErrors | ||
response_json = response.json() | ||
self.logger.debug(f"received response {response_json}") | ||
return response_json | ||
except JSONDecodeError as err: | ||
# Others can't. Only raise HTTP error for not decodable errors | ||
response.raise_for_status() | ||
raise ValueError("Unreachable since previous line raises") from err | ||
|
||
def _route_url(self, route: str) -> str: | ||
return f"{self.BASE_URL}{self.api_version}{route}" | ||
|
||
def _get( | ||
self, | ||
route: str, | ||
params: Optional[Any] = None, | ||
raw: bool = False, | ||
) -> Any: | ||
"""Generic interface for the GET method of a Dune API request""" | ||
url = self._route_url(route) | ||
self.logger.debug(f"GET received input url={url}") | ||
response = requests.get( | ||
url=url, | ||
headers=self.default_headers(), | ||
timeout=self.DEFAULT_TIMEOUT, | ||
params=params, | ||
) | ||
if raw: | ||
return response | ||
return self._handle_response(response) | ||
|
||
def _post(self, route: str, params: Optional[Any] = None) -> Any: | ||
"""Generic interface for the POST method of a Dune API request""" | ||
url = self._route_url(route) | ||
self.logger.debug(f"POST received input url={url}, params={params}") | ||
response = requests.post( | ||
url=url, | ||
json=params, | ||
headers=self.default_headers(), | ||
timeout=self.DEFAULT_TIMEOUT, | ||
) | ||
return self._handle_response(response) | ||
|
||
def _patch(self, route: str, params: Any) -> Any: | ||
"""Generic interface for the PATCH method of a Dune API request""" | ||
url = self._route_url(route) | ||
self.logger.debug(f"PATCH received input url={url}, params={params}") | ||
response = requests.request( | ||
method="PATCH", | ||
url=url, | ||
json=params, | ||
headers={"x-dune-api-key": self.token}, | ||
timeout=self.DEFAULT_TIMEOUT, | ||
) | ||
return self._handle_response(response) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
""" | ||
Implementation of all Dune API query execution and get results routes. | ||
Further Documentation: | ||
execution: https://dune.com/docs/api/api-reference/execute-queries/ | ||
get results: https://dune.com/docs/api/api-reference/get-results/ | ||
""" | ||
from io import BytesIO | ||
from typing import Optional | ||
|
||
from deprecated import deprecated | ||
|
||
from dune_client.api.base import BaseRouter | ||
from dune_client.models import ( | ||
ExecutionResponse, | ||
ExecutionStatusResponse, | ||
ResultsResponse, | ||
ExecutionResultCSV, | ||
DuneError, | ||
) | ||
from dune_client.query import QueryBase | ||
|
||
|
||
class ExecutionAPI(BaseRouter): | ||
""" | ||
Query execution and result fetching functions. | ||
""" | ||
|
||
def execute_query( | ||
self, query: QueryBase, performance: Optional[str] = None | ||
) -> ExecutionResponse: | ||
"""Post's to Dune API for execute `query`""" | ||
params = query.request_format() | ||
params["performance"] = performance or self.performance | ||
|
||
self.logger.info( | ||
f"executing {query.query_id} on {performance or self.performance} cluster" | ||
) | ||
response_json = self._post( | ||
route=f"/query/{query.query_id}/execute", | ||
params=params, | ||
) | ||
try: | ||
return ExecutionResponse.from_dict(response_json) | ||
except KeyError as err: | ||
raise DuneError(response_json, "ExecutionResponse", err) from err | ||
|
||
def cancel_execution(self, job_id: str) -> bool: | ||
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)""" | ||
response_json = self._post( | ||
route=f"/execution/{job_id}/cancel", | ||
params=None, | ||
) | ||
try: | ||
# No need to make a dataclass for this since it's just a boolean. | ||
success: bool = response_json["success"] | ||
return success | ||
except KeyError as err: | ||
raise DuneError(response_json, "CancellationResponse", err) from err | ||
|
||
def get_execution_status(self, job_id: str) -> ExecutionStatusResponse: | ||
"""GET status from Dune API for `job_id` (aka `execution_id`)""" | ||
response_json = self._get(route=f"/execution/{job_id}/status") | ||
try: | ||
return ExecutionStatusResponse.from_dict(response_json) | ||
except KeyError as err: | ||
raise DuneError(response_json, "ExecutionStatusResponse", err) from err | ||
|
||
def get_execution_results(self, job_id: str) -> ResultsResponse: | ||
"""GET results from Dune API for `job_id` (aka `execution_id`)""" | ||
response_json = self._get(route=f"/execution/{job_id}/results") | ||
try: | ||
return ResultsResponse.from_dict(response_json) | ||
except KeyError as err: | ||
raise DuneError(response_json, "ResultsResponse", err) from err | ||
|
||
def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV: | ||
""" | ||
GET results in CSV format from Dune API for `job_id` (aka `execution_id`) | ||
this API only returns the raw data in CSV format, it is faster & lighterweight | ||
use this method for large results where you want lower CPU and memory overhead | ||
if you need metadata information use get_results() or get_status() | ||
""" | ||
route = f"/execution/{job_id}/results/csv" | ||
url = self._route_url(f"/execution/{job_id}/results/csv") | ||
self.logger.debug(f"GET CSV received input url={url}") | ||
response = self._get(route=route, raw=True) | ||
response.raise_for_status() | ||
return ExecutionResultCSV(data=BytesIO(response.content)) | ||
|
||
####################### | ||
# Deprecated Functions: | ||
####################### | ||
@deprecated(version="1.2.1", reason="Please use execute_query") | ||
def execute( | ||
self, query: QueryBase, performance: Optional[str] = None | ||
) -> ExecutionResponse: | ||
"""Post's to Dune API for execute `query`""" | ||
return self.execute_query(query, performance) | ||
|
||
@deprecated(version="1.2.1", reason="Please use get_execution_status") | ||
def get_status(self, job_id: str) -> ExecutionStatusResponse: | ||
"""GET status from Dune API for `job_id` (aka `execution_id`)""" | ||
return self.get_execution_status(job_id) | ||
|
||
@deprecated(version="1.2.1", reason="Please use get_execution_results") | ||
def get_result(self, job_id: str) -> ResultsResponse: | ||
"""GET results from Dune API for `job_id` (aka `execution_id`)""" | ||
return self.get_execution_results(job_id) | ||
|
||
@deprecated(version="1.2.1", reason="Please use get_execution_results_csv") | ||
def get_result_csv(self, job_id: str) -> ExecutionResultCSV: | ||
""" | ||
GET results in CSV format from Dune API for `job_id` (aka `execution_id`) | ||
this API only returns the raw data in CSV format, it is faster & lighterweight | ||
use this method for large results where you want lower CPU and memory overhead | ||
if you need metadata information use get_results() or get_status() | ||
""" | ||
return self.get_execution_results_csv(job_id) |
Oops, something went wrong.