Update driver for time changes in 0.34
relferreira committed Apr 7, 2020
1 parent 3711971 commit 38968cd
Showing 2 changed files with 163 additions and 109 deletions.
168 changes: 102 additions & 66 deletions src/metabase/driver/hive_like.clj
@@ -1,27 +1,37 @@
(ns metabase.driver.hive-like
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as str]
[honeysql
[core :as hsql]
[format :as hformat]]
[metabase
[driver :as driver]
[util :as u]]
[metabase.driver.sql-jdbc.sync :as sql-jdbc.sync]
(:require [clojure.string :as str]
[honeysql.core :as hsql]
[java-time :as t]
[metabase.driver :as driver]
[metabase.driver.sql-jdbc
[connection :as sql-jdbc.conn]
[execute :as sql-jdbc.execute]
[sync :as sql-jdbc.sync]]
[metabase.driver.sql-jdbc.execute.legacy-impl :as legacy]
[metabase.driver.sql.query-processor :as sql.qp]
[metabase.driver.sql.util.unprepare :as unprepare]
[metabase.models
[field :refer [Field]]
[table :refer [Table]]]
[metabase.models.table :refer [Table]]
[metabase.util
[date :as du]
[date-2 :as u.date]
[honeysql-extensions :as hx]]
[toucan.db :as db])
(:import [java.sql PreparedStatement Time] java.util.Date))

(driver/register! :hive-like, :parent :sql-jdbc, :abstract? true)

(defmethod sql-jdbc.sync/database-type->base-type :hive-like [_ database-type]
(:import [java.sql ResultSet Types]
[java.time LocalDate OffsetDateTime ZonedDateTime]))

(driver/register! :hive-like
:parent #{:sql-jdbc ::legacy/use-legacy-classes-for-read-and-set}
:abstract? true)

(defmethod sql-jdbc.conn/data-warehouse-connection-pool-properties :hive-like
[driver]
;; The Hive JDBC driver doesn't support `Connection.isValid()`, so we need to supply a test query for c3p0 to use to
;; validate connections upon checkout.
(merge
((get-method sql-jdbc.conn/data-warehouse-connection-pool-properties :sql-jdbc) driver)
{"preferredTestQuery" "SELECT 1"}))

(defmethod sql-jdbc.sync/database-type->base-type :hive-like
[_ database-type]
(condp re-matches (name database-type)
#"boolean" :type/Boolean
#"tinyint" :type/Integer
@@ -44,9 +54,10 @@
#"map" :type/Dictionary
#".*" :type/*))

(defmethod sql.qp/current-datetime-fn :hive-like [_] :%now)
(defmethod sql.qp/current-datetime-honeysql-form :hive-like [_] :%now)

(defmethod sql.qp/unix-timestamp->timestamp [:hive-like :seconds] [_ _ expr]
(defmethod sql.qp/unix-timestamp->honeysql [:hive-like :seconds]
[_ _ expr]
(hx/->timestamp (hsql/call :from_unixtime expr)))
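
As a rough illustration of the SQL this form renders to (plain HoneySQL plus Metabase's `hx` cast helper):

(require '[honeysql.core :as hsql]
         '[metabase.util.honeysql-extensions :as hx])

;; 1586246400 is 2020-04-07T08:00:00Z in Unix seconds
(hsql/format (hx/->timestamp (hsql/call :from_unixtime 1586246400)))
;; => roughly ["CAST(from_unixtime(1586246400) AS timestamp)"]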

(defn- date-format [format-str expr]
@@ -61,6 +72,7 @@
(defn- trunc-with-format [format-str expr]
(str-to-date format-str (date-format format-str expr)))

(defmethod sql.qp/date [:hive-like :default] [_ _ expr] (hx/->timestamp expr))
(defmethod sql.qp/date [:hive-like :minute] [_ _ expr] (trunc-with-format "yyyy-MM-dd HH:mm" (hx/->timestamp expr)))
(defmethod sql.qp/date [:hive-like :minute-of-hour] [_ _ expr] (hsql/call :minute (hx/->timestamp expr)))
(defmethod sql.qp/date [:hive-like :hour] [_ _ expr] (trunc-with-format "yyyy-MM-dd HH" (hx/->timestamp expr)))
@@ -74,72 +86,96 @@
(defmethod sql.qp/date [:hive-like :quarter-of-year] [_ _ expr] (hsql/call :quarter (hx/->timestamp expr)))
(defmethod sql.qp/date [:hive-like :year] [_ _ expr] (hsql/call :trunc (hx/->timestamp expr) (hx/literal :year)))

(defmethod sql.qp/date [:hive-like :day-of-week] [_ _ expr]
(defmethod sql.qp/date [:hive-like :day-of-week]
[_ _ expr]
(hx/->integer (date-format "u"
(hx/+ (hx/->timestamp expr)
(hsql/raw "interval '1' day")))))

(defmethod sql.qp/date [:hive-like :week] [_ _ expr]
(defmethod sql.qp/date [:hive-like :week]
[_ _ expr]
(hsql/call :date_sub
(hx/+ (hx/->timestamp expr)
(hsql/raw "interval '1' day"))
(date-format "u"
(hx/+ (hx/->timestamp expr)
(hsql/raw "interval '1' day")))))

(defmethod sql.qp/date [:hive-like :quarter] [_ _ expr]
(defmethod sql.qp/date [:hive-like :quarter]
[_ _ expr]
(hsql/call :add_months
(hsql/call :trunc (hx/->timestamp expr) (hx/literal :year))
(hx/* (hx/- (hsql/call :quarter (hx/->timestamp expr))
1)
3)))
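
A worked example of the `:quarter` arithmetic for a timestamp in Q3:

;; expr = 2020-08-15
;;   quarter(expr)             => 3
;;   (3 - 1) * 3               => 6
;;   trunc(expr, 'year')       => 2020-01-01
;;   add_months(2020-01-01, 6) => 2020-07-01, the first day of Q3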

(defmethod driver/date-add :hive-like [_ dt amount unit]
(hx/+ (hx/->timestamp dt) (hsql/raw (format "(INTERVAL '%d' %s)" (int amount) (name unit)))))
(defmethod sql.qp/->honeysql [:hive-like :replace]
[driver [_ arg pattern replacement]]
(hsql/call :regexp_replace (sql.qp/->honeysql driver arg) (sql.qp/->honeysql driver pattern) (sql.qp/->honeysql driver replacement)))

(defmethod sql.qp/->honeysql [:hive-like :regex-match-first]
[driver [_ arg pattern]]
(hsql/call :regexp_extract (sql.qp/->honeysql driver arg) (sql.qp/->honeysql driver pattern)))

(defmethod sql.qp/->honeysql [:hive-like :median]
[driver [_ arg]]
(hsql/call :percentile (sql.qp/->honeysql driver arg) 0.5))

(defmethod sql.qp/->honeysql [:hive-like :percentile]
[driver [_ arg p]]
(hsql/call :percentile (sql.qp/->honeysql driver arg) (sql.qp/->honeysql driver p)))

(defmethod sql.qp/add-interval-honeysql-form :hive-like
[_ hsql-form amount unit]
(hx/+ (hx/->timestamp hsql-form) (hsql/raw (format "(INTERVAL '%d' %s)" (int amount) (name unit)))))
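
A rough sketch of what this renders to (the `:created_at` column is hypothetical):

(hsql/format (sql.qp/add-interval-honeysql-form :hive-like :created_at 30 :day))
;; => roughly ["CAST(created_at AS timestamp) + (INTERVAL '30' day)"]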

;; ignore the schema when producing the identifier
(defn qualified-name-components
"Return the pieces that represent a path to FIELD, of the form `[table-name parent-fields-name* field-name]`.
This function should be used by databases where schemas do not make much sense."
[{field-name :name, table-id :table_id, parent-id :parent_id}]
;; TODO - we are making too many DB calls here!
;; (At least this is only used for SQL parameters, which is why we can't currently use the Store)
(conj (vec (if-let [parent (Field parent-id)]
(qualified-name-components parent)
(let [{table-name :name, schema :schema} (db/select-one [Table :name :schema], :id table-id)]
[table-name])))
field-name))

(defmethod sql.qp/field->identifier :hive-like [_ field]
(apply hsql/qualify (qualified-name-components field)))

(defn- run-query
"Run the query itself."
[{sql :query, :keys [params remark max-rows]} connection]
(let [sql (str "-- " remark "\n" sql)
options {:identifiers identity
:as-arrays? true
:max-rows max-rows}]
(with-open [connection (jdbc/get-connection connection)]
(with-open [^PreparedStatement statement (jdbc/prepare-statement connection sql options)]
(let [statement (into [statement] params)
[columns & rows] (jdbc/query connection statement options)]
{:rows (or rows [])
:columns (map u/qualified-name columns)})))))

(defn run-query-without-timezone
"Runs the given query without trying to set a timezone"
[_ _ connection query]
(run-query query connection))

(defmethod unprepare/unprepare-value [:hive-like Date] [_ value]
(hformat/to-sql
(hsql/call :from_unixtime
(hsql/call :unix_timestamp
(hx/literal (du/date->iso-8601 value))
(hx/literal "yyyy-MM-dd\\\\'T\\\\'HH:mm:ss.SSS\\\\'Z\\\\'")))))
"Return the pieces that represent a path to `field`, of the form `[table-name parent-fields-name* field-name]`."
[{field-name :name, table-id :table_id}]
[(db/select-one-field :name Table, :id table-id) field-name])

(prefer-method unprepare/unprepare-value [:sql Time] [:hive-like Date])
(defmethod sql.qp/field->identifier :hive-like
[_ field]
(apply hsql/qualify (qualified-name-components field)))
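
For illustration, a hypothetical field `total` on a table `orders` would qualify and render like this under HoneySQL's MySQL quoting (which this driver family uses):

(hsql/format {:select [(hsql/qualify "orders" "total")]} :quoting :mysql)
;; => roughly ["SELECT `orders`.`total`"]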

(defmethod unprepare/unprepare-value [:hive-like String] [_ value]
(defmethod unprepare/unprepare-value [:hive-like String]
[_ value]
(str \' (str/replace value "'" "\\\\'") \'))

;; Hive/Spark SQL doesn't seem to like DATEs, so convert them to DATETIMEs first
(defmethod unprepare/unprepare-value [:hive-like LocalDate]
[driver t]
(unprepare/unprepare-value driver (t/local-date-time t (t/local-time 0))))

(defmethod unprepare/unprepare-value [:hive-like OffsetDateTime]
[_ t]
(format "to_utc_timestamp('%s', '%s')" (u.date/format-sql (t/local-date-time t)) (t/zone-offset t)))

(defmethod unprepare/unprepare-value [:hive-like ZonedDateTime]
[_ t]
(format "to_utc_timestamp('%s', '%s')" (u.date/format-sql (t/local-date-time t)) (t/zone-id t)))

;; Hive/Spark SQL doesn't seem to like DATEs, so convert them to DATETIMEs first
(defmethod sql-jdbc.execute/set-parameter [:hive-like LocalDate]
[driver ps i t]
(sql-jdbc.execute/set-parameter driver ps i (t/local-date-time t (t/local-time 0))))

;; TIMEZONE FIXME — not sure what timezone the results actually come back as
(defmethod sql-jdbc.execute/read-column-thunk [:hive-like Types/TIME]
[_ ^ResultSet rs rsmeta ^Integer i]
(fn []
(when-let [t (.getTimestamp rs i)]
(t/offset-time (t/local-time t) (t/zone-offset 0)))))

(defmethod sql-jdbc.execute/read-column-thunk [:hive-like Types/DATE]
[_ ^ResultSet rs rsmeta ^Integer i]
(fn []
(when-let [t (.getDate rs i)]
(t/zoned-date-time (t/local-date t) (t/local-time 0) (t/zone-id "UTC")))))

(defmethod sql-jdbc.execute/read-column-thunk [:hive-like Types/TIMESTAMP]
[_ ^ResultSet rs rsmeta ^Integer i]
(fn []
(when-let [t (.getTimestamp rs i)]
(t/zoned-date-time (t/local-date-time t) (t/zone-id "UTC")))))
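
A simplified sketch of how the execute layer consumes these thunks; `rs`, `rsmeta`, and `process-row!` are hypothetical stand-ins:

(let [thunk (sql-jdbc.execute/read-column-thunk :hive-like rs rsmeta 1)]
  (while (.next rs)
    (process-row! (thunk))))
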
104 changes: 61 additions & 43 deletions src/metabase/driver/sparksql_databricks.clj
@@ -22,23 +22,11 @@
[metabase.query-processor
[store :as qp.store]
[util :as qputil]]
[metabase.util.honeysql-extensions :as hx]))
[metabase.util.honeysql-extensions :as hx])
(:import [java.sql Connection ResultSet]))

(driver/register! :sparksql-databricks, :parent :hive-like)

;;; +----------------------------------------------------------------------------------------------------------------+
;;; | metabase.driver.sql-jdbc impls |
;;; +----------------------------------------------------------------------------------------------------------------+

;; (defmethod sql-jdbc.conn/connection-details->spec :sparksql-databricks [_ {:keys [host port db jdbc-flags], :as opts}]
;; (merge
;; {:classname "com.simba.spark.jdbc41.Driver" ; must be in classpath
;; :subprotocol "spark"
;; :subname (str "//" host ":" port "/" db jdbc-flags)
;; :ssl true}
;; (dissoc opts :host :port :db :jdbc-flags)))


;;; ------------------------------------------ Custom HoneySQL Clause Impls ------------------------------------------

(def ^:private source-table-alias
@@ -81,12 +69,13 @@
:or {host "localhost", port 10000, db "", jdbc-flags ""}
:as opts}]
(merge
{:classname "metabase.driver.FixedHiveDriver"
:subprotocol "spark"
:subname (str "//" host ":" port "/" db jdbc-flags)}
{:classname "metabase.driver.FixedHiveDriver"
:subprotocol "spark"
:subname (str "//" host ":" port "/" db jdbc-flags)}
(dissoc opts :host :port :jdbc-flags)))

(defmethod sql-jdbc.conn/connection-details->spec :sparksql-databricks [_ details]
(defmethod sql-jdbc.conn/connection-details->spec :sparksql-databricks
[_ details]
(-> details
(update :port (fn [port]
(if (string? port)
@@ -136,31 +125,60 @@
:database-type data-type
:base-type (sql-jdbc.sync/database-type->base-type :hive-like (keyword data-type))}))))})


;; we need this because transactions are not supported in Hive 1.2.1
;; bound variables are not supported in Spark SQL (maybe not Hive either, haven't checked)
(defmethod driver/execute-query :sparksql-databricks
[driver {:keys [database settings], query :native, :as outer-query}]
(let [query (-> (assoc query
:remark (qputil/query->remark outer-query)
:query (if (seq (:params query))
(unprepare/unprepare driver (cons (:query query) (:params query)))
(:query query))
:max-rows (mbql.u/query->max-rows-limit outer-query))
(dissoc :params))]
(sql-jdbc.execute/do-with-try-catch
(fn []
(let [db-connection (sql-jdbc.conn/db->pooled-connection-spec database)]
(hive-like/run-query-without-timezone driver settings db-connection query))))))

(defmethod driver/supports? [:sparksql-databricks :basic-aggregations] [_ _] true)
(defmethod driver/supports? [:sparksql-databricks :binning] [_ _] true)
(defmethod driver/supports? [:sparksql-databricks :expression-aggregations] [_ _] true)
(defmethod driver/supports? [:sparksql-databricks :expressions] [_ _] true)
(defmethod driver/supports? [:sparksql-databricks :native-parameters] [_ _] true)
(defmethod driver/supports? [:sparksql-databricks :nested-queries] [_ _] true)
(defmethod driver/supports? [:sparksql-databricks :standard-deviation-aggregations] [_ _] true)

(defmethod driver/supports? [:sparksql-databricks :foreign-keys] [_ _] true)
(defmethod driver/execute-reducible-query :sparksql-databricks
[driver {:keys [database settings], {sql :query, :keys [params], :as inner-query} :native, :as outer-query} context respond]
(let [inner-query (-> (assoc inner-query
:remark (qputil/query->remark outer-query)
:query (if (seq params)
(unprepare/unprepare driver (cons sql params))
sql)
:max-rows (mbql.u/query->max-rows-limit outer-query))
(dissoc :params))
query (assoc outer-query :native inner-query)]
((get-method driver/execute-reducible-query :sql-jdbc) driver query context respond)))
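
A hypothetical before/after of the parameter inlining (Spark SQL cannot bind parameters, so `unprepare` splices them into the SQL as literals before delegating to the stock `:sql-jdbc` implementation):

;; before: {:query "SELECT * FROM venues WHERE name = ?", :params ["Taco Bell"]}
;; after:  {:query "SELECT * FROM venues WHERE name = 'Taco Bell'"}  ; :params removed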

;; 1. SparkSQL doesn't support `.supportsTransactionIsolationLevel`
;; 2. SparkSQL doesn't support session timezones (at least our driver doesn't support it)
;; 3. SparkSQL doesn't support making connections read-only
;; 4. SparkSQL doesn't support setting the default result set holdability
(defmethod sql-jdbc.execute/connection-with-timezone :sparksql-databricks
[driver database ^String timezone-id]
(let [conn (.getConnection (sql-jdbc.execute/datasource database))]
(try
(.setTransactionIsolation conn Connection/TRANSACTION_READ_UNCOMMITTED)
conn
(catch Throwable e
(.close conn)
(throw e)))))

;; 1. SparkSQL doesn't support setting holdability type to `CLOSE_CURSORS_AT_COMMIT`
(defmethod sql-jdbc.execute/prepared-statement :sparksql-databricks
[driver ^Connection conn ^String sql params]
(let [stmt (.prepareStatement conn sql
ResultSet/TYPE_FORWARD_ONLY
ResultSet/CONCUR_READ_ONLY)]
(try
(.setFetchDirection stmt ResultSet/FETCH_FORWARD)
(sql-jdbc.execute/set-parameters! driver stmt params)
stmt
(catch Throwable e
(.close stmt)
(throw e)))))


(doseq [feature [:basic-aggregations
:binning
:expression-aggregations
:expressions
:native-parameters
:nested-queries
:standard-deviation-aggregations]]
(defmethod driver/supports? [:sparksql-databricks feature] [_ _] true))

;; only define an implementation for `:foreign-keys` if none exists already. In test extensions we define an alternate
;; implementation, and we don't want to stomp over that if it was loaded already
(when-not (get (methods driver/supports?) [:sparksql-databricks :foreign-keys])
(defmethod driver/supports? [:sparksql-databricks :foreign-keys] [_ _] true))

(defmethod sql.qp/quote-style :sparksql-databricks [_] :mysql)
