feat: export as parquet #3

Draft: wants to merge 6 commits into master
@@ -66,6 +66,17 @@ export default function QueryControlDropdown(props) {
<FileExcelOutlinedIcon /> Download as Excel File
</QueryResultsLink>
</Menu.Item>
<Menu.Item>
<QueryResultsLink
fileType="parquet"
disabled={props.queryExecuting || !props.queryResult.getData || !props.queryResult.getData()}
query={props.query}
queryResult={props.queryResult}
embed={props.embed}
apiKey={props.apiKey}>
<FileOutlinedIcon /> Download as Parquet File
</QueryResultsLink>
</Menu.Item>
</Menu>
);

@@ -58,6 +58,15 @@ function visualizationWidgetMenuOptions({ widget, canEditDashboard, onParameters
"Download as Excel File"
)}
</Menu.Item>,
<Menu.Item key="download_parquet" disabled={isQueryResultEmpty}>
{!isQueryResultEmpty ? (
<Link href={downloadLink("parquet")} download={downloadName("parquet")} target="_self">
Download as Parquet File
</Link>
) : (
"Download as Parquet File"
)}
</Menu.Item>,
(canViewQuery || canEditParameters) && <Menu.Divider key="divider" />,
canViewQuery && (
<Menu.Item key="view_query">
11 changes: 11 additions & 0 deletions client/app/pages/queries/VisualizationEmbed.jsx
@@ -100,6 +100,17 @@ function VisualizationEmbedFooter({
<FileExcelOutlinedIcon /> Download as Excel File
</QueryResultsLink>
</Menu.Item>
<Menu.Item>
<QueryResultsLink
fileType="parquet"
query={query}
queryResult={queryResults}
apiKey={apiKey}
disabled={!queryResults || !queryResults.getData || !queryResults.getData()}
embed>
<FileOutlinedIcon /> Download as Parquet File
</QueryResultsLink>
</Menu.Item>
</Menu>
);

63 changes: 39 additions & 24 deletions redash/handlers/query_results.py
@@ -1,40 +1,42 @@
import logging
import time

import unicodedata

from flask import make_response, request
from flask_login import current_user
from flask_restful import abort
from werkzeug.urls import url_quote

from redash import models, settings
from redash.handlers.base import BaseResource, get_object_or_404, record_event
from redash.models.parameterized_query import (
InvalidParameterError,
ParameterizedQuery,
QueryDetachedFromDataSourceError,
dropdown_values,
)
from redash.permissions import (
has_access,
not_view_only,
require_access,
require_permission,
require_any_of_permission,
require_permission,
view_only,
)
from redash.serializers import (
serialize_job,
serialize_query_result,
serialize_query_result_to_dsv,
serialize_query_result_to_parquet,
serialize_query_result_to_xlsx,
)
from redash.tasks import Job
from redash.tasks.queries import enqueue_query
from redash.utils import (
collect_parameters_from_request,
json_dumps,
utcnow,
to_filename,
)
from redash.models.parameterized_query import (
ParameterizedQuery,
InvalidParameterError,
QueryDetachedFromDataSourceError,
dropdown_values,
)
from redash.serializers import (
serialize_query_result,
serialize_query_result_to_dsv,
serialize_query_result_to_xlsx,
serialize_job,
utcnow,
)


@@ -119,9 +121,11 @@ def run_query(
current_user.id,
current_user.is_api_user(),
metadata={
"Username": repr(current_user)
if current_user.is_api_user()
else current_user.email,
"Username": (
repr(current_user)
if current_user.is_api_user()
else current_user.email
),
"query_id": query_id,
},
)
@@ -260,14 +264,14 @@ def options(self, query_id=None, query_result_id=None, filetype="json"):
self.add_cors_headers(headers)

if settings.ACCESS_CONTROL_REQUEST_METHOD:
headers[
"Access-Control-Request-Method"
] = settings.ACCESS_CONTROL_REQUEST_METHOD
headers["Access-Control-Request-Method"] = (
settings.ACCESS_CONTROL_REQUEST_METHOD
)

if settings.ACCESS_CONTROL_ALLOW_HEADERS:
headers[
"Access-Control-Allow-Headers"
] = settings.ACCESS_CONTROL_ALLOW_HEADERS
headers["Access-Control-Allow-Headers"] = (
settings.ACCESS_CONTROL_ALLOW_HEADERS
)

return make_response("", 200, headers)

@@ -402,6 +406,7 @@ def get(self, query_id=None, query_result_id=None, filetype="json"):
"xlsx": self.make_excel_response,
"csv": self.make_csv_response,
"tsv": self.make_tsv_response,
"parquet": self.make_parquet_response,
}
response = response_builders[filetype](query_result)

@@ -450,6 +455,16 @@ def make_excel_response(query_result):
}
return make_response(serialize_query_result_to_xlsx(query_result), 200, headers)

@staticmethod
def make_parquet_response(query_result):
headers = {
# https://issues.apache.org/jira/browse/PARQUET-1889
# "Content-Type": "application/parquet"
}
return make_response(
serialize_query_result_to_parquet(query_result), 200, headers
)


class JobResource(BaseResource):
def get(self, job_id, query_id=None):
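With the "parquet" entry added to response_builders above, the existing query-results download routes can serve Parquet alongside csv, tsv, and xlsx. Below is a minimal client-side sketch, assuming the same endpoint pattern and Authorization header used by the other download formats; the base URL, query id, and API key are placeholders.

import requests

REDASH_URL = "https://redash.example.com"  # placeholder
QUERY_ID = 42                              # placeholder
API_KEY = "your-api-key"                   # placeholder

# Request the result in the new filetype; the handler dispatches to
# make_parquet_response and returns the raw Parquet bytes.
response = requests.get(
    f"{REDASH_URL}/api/queries/{QUERY_ID}/results.parquet",
    headers={"Authorization": f"Key {API_KEY}"},
)
response.raise_for_status()

with open("results.parquet", "wb") as f:
    f.write(response.content)

Note that make_parquet_response deliberately leaves Content-Type unset, since no media type is registered for Parquet yet (see the PARQUET-1889 reference in the diff).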
11 changes: 10 additions & 1 deletion redash/serializers/__init__.py
@@ -3,6 +3,7 @@
classes we have. This will ensure cleaner code and better
separation of concerns.
"""

from funcy import project

from flask_login import current_user
@@ -19,6 +20,7 @@
serialize_query_result,
serialize_query_result_to_dsv,
serialize_query_result_to_xlsx,
serialize_query_result_to_parquet,
)


@@ -55,7 +57,14 @@ def public_widget(widget):
def public_dashboard(dashboard):
dashboard_dict = project(
serialize_dashboard(dashboard, with_favorite_state=False),
("name", "layout", "dashboard_filters_enabled", "updated_at", "created_at", "options"),
(
"name",
"layout",
"dashboard_filters_enabled",
"updated_at",
"created_at",
"options",
),
)

widget_list = (
104 changes: 99 additions & 5 deletions redash/serializers/query_result.py
@@ -1,11 +1,24 @@
import io
import csv
import io
from typing import Optional

import pyarrow
import pyarrow.compute
import pyarrow.parquet
import xlsxwriter
from funcy import rpartial, project
from dateutil.parser import isoparse as parse_date
from redash.utils import json_loads, UnicodeWriter
from redash.query_runner import TYPE_BOOLEAN, TYPE_DATE, TYPE_DATETIME
from funcy import project, rpartial

from redash.authentication.org_resolving import current_org
from redash.query_runner import (
TYPE_BOOLEAN,
TYPE_DATE,
TYPE_DATETIME,
TYPE_FLOAT,
TYPE_INTEGER,
TYPE_STRING,
)
from redash.utils import UnicodeWriter, json_loads


def _convert_format(fmt):
@@ -86,7 +99,9 @@ def serialize_query_result_to_dsv(query_result, delimiter):

fieldnames, special_columns = _get_column_lists(query_data["columns"] or [])

writer = csv.DictWriter(s, extrasaction="ignore", fieldnames=fieldnames, delimiter=delimiter)
writer = csv.DictWriter(
s, extrasaction="ignore", fieldnames=fieldnames, delimiter=delimiter
)
writer.writeheader()

for row in query_data["rows"]:
@@ -121,3 +136,82 @@ def serialize_query_result_to_xlsx(query_result):
book.close()

return output.getvalue()


def serialize_query_result_to_parquet(query_result):
output = io.BytesIO()
query_data = query_result.data

def redash_datetime_to_pyarrow_timestamp(
table: "pyarrow.Table",
field: "pyarrow.Field",
conversion: Optional[dict] = None,
) -> "pyarrow.Table":
column_index: int = table.schema.get_field_index(field.name)
column_data = table.column(column_index)

formats = conversion["redash_formats"]
for datetime_format in formats:
try:
column_data = pyarrow.compute.strptime(
column_data,
format=datetime_format,
unit="s",
)
break
except pyarrow.lib.ArrowInvalid:
continue

new_table = table.set_column(column_index, field.name, column_data)
return new_table

conversions = [
{"pyarrow_type": pyarrow.bool_(), "redash_type": TYPE_BOOLEAN},
{
"pyarrow_type": pyarrow.date32(),
"redash_type": TYPE_DATE,
"redash_formats": [r"%Y-%m-%d"],
"redash_to_pyarrow": redash_datetime_to_pyarrow_timestamp,
},
{
"pyarrow_type": pyarrow.timestamp("s"),
"redash_type": TYPE_DATETIME,
"redash_formats": [r"%Y-%m-%dT%H:%M:%S", r"%Y-%m-%d %H:%M:%S"],
"redash_to_pyarrow": redash_datetime_to_pyarrow_timestamp,
},
{"pyarrow_type": pyarrow.float64(), "redash_type": TYPE_FLOAT},
{"pyarrow_type": pyarrow.int64(), "redash_type": TYPE_INTEGER},
{"pyarrow_type": pyarrow.string(), "redash_type": TYPE_STRING},
]

table = pyarrow.Table.from_pylist(query_data["rows"])
fields = []

for column in query_data["columns"]:
for conversion in conversions:
if column["type"] == conversion["redash_type"]:
field = pyarrow.field(
name=column["name"],
type=conversion["pyarrow_type"],
metadata={"friendly_name": column["friendly_name"]},
)
fields.append(field)
converter = conversion.get("redash_to_pyarrow")
if converter:
table = converter(
table=table,
field=field,
conversion=conversion,
)
break

target_schema = pyarrow.schema(fields)
table = table.cast(target_schema=target_schema)

with pyarrow.parquet.ParquetWriter(
where=output,
schema=target_schema,
) as writer:
writer.write_table(table)

return output.getvalue()
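
To illustrate what serialize_query_result_to_parquet emits, here is a small round-trip sketch (not part of the PR). The fake result object only mimics the shape the serializer reads: an object whose data dict holds "columns" and "rows", with column types matching Redash's TYPE_* constants.

import io
from types import SimpleNamespace

import pyarrow.parquet

from redash.serializers.query_result import serialize_query_result_to_parquet

# Stand-in for a Redash query result: .data carries column metadata and rows.
fake_result = SimpleNamespace(
    data={
        "columns": [
            {"name": "id", "friendly_name": "ID", "type": "integer"},
            {"name": "created_at", "friendly_name": "Created At", "type": "datetime"},
        ],
        "rows": [
            {"id": 1, "created_at": "2024-01-01T00:00:00"},
            {"id": 2, "created_at": "2024-01-02T12:30:00"},
        ],
    }
)

parquet_bytes = serialize_query_result_to_parquet(fake_result)

# Read the bytes back to confirm the schema: id maps to int64 and
# created_at is parsed into a timestamp[s] column.
table = pyarrow.parquet.read_table(io.BytesIO(parquet_bytes))
print(table.schema)
print(table.num_rows)  # 2

Columns without a converter (integers, floats, booleans, strings) are cast directly to the target Arrow type, while date and datetime strings go through pyarrow.compute.strptime using the formats listed in conversions.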
2 changes: 1 addition & 1 deletion requirements_all_ds.txt
@@ -47,4 +47,4 @@ nzpy>=1.15
nzalchemy
python-arango==6.1.0
pinotdb>=0.4.5
pyarrow==10.0.0
pyarrow==10.0.0