Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres #1

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 101 additions & 4 deletions src/dbc.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use serde_json;

mod mysql;
mod sqlite;
mod postgres;
mod sqlite;

pub type Error = Box<dyn std::error::Error + Send + Sync>;

Expand Down Expand Up @@ -47,15 +46,23 @@ impl Database {
Ok(serde_json::to_vec(&result)?)
}

pub fn execute_query_with_params(&mut self, query: &str, params: &[&str]) -> Result<QueryResult, Error> {
pub fn execute_query_with_params(
&mut self,
query: &str,
params: &[&str],
) -> Result<QueryResult, Error> {
let mut query = query.to_string();
for param in params {
query = query.replace("?", param);
}
self.execute_query(&query)
}

pub fn execute_query_with_params_and_serialize(&mut self, query: &str, params: &[&str]) -> Result<String, Error> {
pub fn execute_query_with_params_and_serialize(
&mut self,
query: &str,
params: &[&str],
) -> Result<String, Error> {
let result = self.execute_query_with_params(query, params)?;
Ok(serde_json::to_string(&result)?)
}
Expand Down Expand Up @@ -97,24 +104,114 @@ pub struct QueryResult {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ColumnType {
NULL,
NUMERIC,
DECIMAL,
DECIMAL_NEW,
INT,
INT2,
INT4,
INT8,
INT24,
TINY,
SHORT,
LONG,
LONGLONG,
FLOAT,
FLOAT4,
FLOAT8,
BIT,
BOOL,
DOUBLE,
STRING,
VARCHAR,
TEXT,
CHAR,
BYTEA,
TIMESTAMP,
TIMESTAMPTZ,
DATE,
TIME,
TIMETZ,
INTERVAL,
YEAR,
DATETIME,
JSON,
JSONB,
ENUM,
SET,
BLOB,
BLOB_TINY,
BLOB_MEDIUM,
BLOB_LONG,
GEOMETRY,
UUID,
OID,
XML,
CIDR,
INET,
MACADDR,
VARBIT,
REFCURSOR,
}

macro_rules! add_postgres_types {
($( $existing_variant:ident ),* ) => {
pub enum ColumnType {
$(
$existing_variant,
)*
Postgres(postgres::types::Type),
}
};
}

add_postgres_types!(
NULL,
NUMERIC,
DECIMAL,
DECIMAL_NEW,
INT,
INT2,
INT4,
INT8,
INT24,
TINY,
SHORT,
LONG,
LONGLONG,
FLOAT,
FLOAT4,
FLOAT8,
BIT,
BOOL,
DOUBLE,
STRING,
VARCHAR,
TEXT,
CHAR,
BYTEA,
TIMESTAMP,
TIMESTAMPTZ,
DATE,
TIME,
TIMETZ,
INTERVAL,
YEAR,
DATETIME,
JSON,
JSONB,
ENUM,
SET,
BLOB,
BLOB_TINY,
BLOB_MEDIUM,
BLOB_LONG,
GEOMETRY,
UUID,
OID,
XML,
CIDR,
INET,
MACADDR,
VARBIT,
);
70 changes: 37 additions & 33 deletions src/dbc/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;
use crate::dbc;
use mysql;
use mysql::prelude::Queryable;
use mysql_common::constants::ColumnType;
use crate::dbc;
use std::sync::Arc;

pub(crate) struct MySQLConnection {
connection: mysql::Conn,
Expand All @@ -19,37 +19,37 @@ impl MySQLConnection {
impl dbc::Connection for MySQLConnection {
fn execute(&mut self, query: &str) -> Result<dbc::QueryResult, dbc::Error> {
let result = self.connection.query_iter(query)?;
let columns = result.columns().as_ref().iter().map(
|column| {
dbc::Column {
name: column.name_str().to_string(),
column_type: column.column_type().into(),
}
}
).collect::<Vec<dbc::Column>>();
let columns = result
.columns()
.as_ref()
.iter()
.map(|column| dbc::Column {
name: column.name_str().to_string(),
column_type: column.column_type().into(),
})
.collect::<Vec<dbc::Column>>();
let columns = Arc::from(columns);


let mut rows: Vec<dbc::Row> = Vec::new();
for row in result {
let row = row?;
let values: Vec<dbc::Value> = row.unwrap_raw().iter().map(
|value| {
let values: Vec<dbc::Value> = row
.unwrap_raw()
.iter()
.map(|value| {
if value.is_none() {
dbc::Value::NULL
} else {
value.as_ref().unwrap().into()
}
}
).collect();
})
.collect();
rows.push(dbc::Row {
values,
columns: Arc::clone(&columns),
});
}
Ok(dbc::QueryResult{
rows,
})
Ok(dbc::QueryResult { rows })
}
}

Expand All @@ -62,8 +62,12 @@ impl From<&mysql::Value> for dbc::Value {
mysql::Value::UInt(uint) => dbc::Value::UInt(*uint),
mysql::Value::Float(float) => dbc::Value::Float(*float),
mysql::Value::Double(double) => dbc::Value::Double(*double),
mysql::Value::Date(year, month, day, hour, minute, second, microsecond) => dbc::Value::Date(*year, *month, *day, *hour, *minute, *second, *microsecond),
mysql::Value::Time(negative, days, hours, minutes, seconds, microseconds) => dbc::Value::Time(*negative, *days, *hours, *minutes, *seconds, *microseconds),
mysql::Value::Date(year, month, day, hour, minute, second, microsecond) => {
dbc::Value::Date(*year, *month, *day, *hour, *minute, *second, *microsecond)
}
mysql::Value::Time(negative, days, hours, minutes, seconds, microseconds) => {
dbc::Value::Time(*negative, *days, *hours, *minutes, *seconds, *microseconds)
}
}
}
}
Expand All @@ -80,31 +84,31 @@ impl From<ColumnType> for dbc::ColumnType {
ColumnType::MYSQL_TYPE_NULL => dbc::ColumnType::NULL,
ColumnType::MYSQL_TYPE_TIMESTAMP => dbc::ColumnType::TIMESTAMP,
ColumnType::MYSQL_TYPE_LONGLONG => dbc::ColumnType::LONGLONG,
ColumnType::MYSQL_TYPE_INT24 => dbc::ColumnType::INT,
ColumnType::MYSQL_TYPE_INT24 => dbc::ColumnType::INT24,
ColumnType::MYSQL_TYPE_DATE => dbc::ColumnType::DATE,
ColumnType::MYSQL_TYPE_TIME => dbc::ColumnType::TIME,
ColumnType::MYSQL_TYPE_DATETIME => dbc::ColumnType::TIMESTAMP,
ColumnType::MYSQL_TYPE_YEAR => dbc::ColumnType::INT,
ColumnType::MYSQL_TYPE_DATETIME => dbc::ColumnType::DATETIME,
ColumnType::MYSQL_TYPE_YEAR => dbc::ColumnType::YEAR,
ColumnType::MYSQL_TYPE_NEWDATE => dbc::ColumnType::DATE, // Internal? do we need this?
ColumnType::MYSQL_TYPE_VARCHAR => dbc::ColumnType::STRING,
ColumnType::MYSQL_TYPE_VARCHAR => dbc::ColumnType::VARCHAR,
ColumnType::MYSQL_TYPE_BIT => dbc::ColumnType::BIT,
ColumnType::MYSQL_TYPE_TIMESTAMP2 => dbc::ColumnType::TIMESTAMP,
ColumnType::MYSQL_TYPE_DATETIME2 => dbc::ColumnType::DATETIME,
ColumnType::MYSQL_TYPE_TIME2 => dbc::ColumnType::TIME,
ColumnType::MYSQL_TYPE_TIMESTAMP2 => dbc::ColumnType::TIMESTAMP, // Internal? do we need this?
ColumnType::MYSQL_TYPE_DATETIME2 => dbc::ColumnType::DATETIME, // Internal? do we need this?
ColumnType::MYSQL_TYPE_TIME2 => dbc::ColumnType::TIME, // Internal? do we need this?
ColumnType::MYSQL_TYPE_JSON => dbc::ColumnType::JSON,
ColumnType::MYSQL_TYPE_NEWDECIMAL => dbc::ColumnType::DECIMAL,
ColumnType::MYSQL_TYPE_NEWDECIMAL => dbc::ColumnType::DECIMAL_NEW,
ColumnType::MYSQL_TYPE_ENUM => dbc::ColumnType::ENUM,
ColumnType::MYSQL_TYPE_SET => dbc::ColumnType::SET,
ColumnType::MYSQL_TYPE_TINY_BLOB => dbc::ColumnType::BLOB,
ColumnType::MYSQL_TYPE_MEDIUM_BLOB => dbc::ColumnType::BLOB,
ColumnType::MYSQL_TYPE_LONG_BLOB => dbc::ColumnType::BLOB,
ColumnType::MYSQL_TYPE_TINY_BLOB => dbc::ColumnType::BLOB_TINY,
ColumnType::MYSQL_TYPE_MEDIUM_BLOB => dbc::ColumnType::BLOB_MEDIUM,
ColumnType::MYSQL_TYPE_LONG_BLOB => dbc::ColumnType::BLOB_LONG,
ColumnType::MYSQL_TYPE_BLOB => dbc::ColumnType::BLOB,
ColumnType::MYSQL_TYPE_VAR_STRING => dbc::ColumnType::STRING,
ColumnType::MYSQL_TYPE_VAR_STRING => dbc::ColumnType::VARCHAR,
ColumnType::MYSQL_TYPE_STRING => dbc::ColumnType::STRING,
ColumnType::MYSQL_TYPE_GEOMETRY => dbc::ColumnType::GEOMETRY,
_ => {
panic!("Unknown column type: {:?}", column_type);
}
}
}
}
}
87 changes: 86 additions & 1 deletion src/dbc/postgres.rs
Original file line number Diff line number Diff line change
@@ -1 +1,86 @@
use postgres;
use std::sync::Arc;

use postgres;

use crate::dbc;

pub struct PostgresConnection {
pub(crate) connection: postgres::Client,
}

impl PostgresConnection {
pub(crate) fn get_connection(url: &str) -> Result<Box<dyn dbc::Connection>, dbc::Error> {
Ok(Box::new(PostgresConnection {
connection: postgres::Client::connect(url, postgres::NoTls)?,
}) as Box<dyn dbc::Connection>)
}
}

impl dbc::Connection for PostgresConnection {
fn execute(&mut self, query: &str) -> Result<dbc::QueryResult, dbc::Error> {
let result = self.connection.query(query, &[])?;
let columns = result
.columns()
.iter()
.map(|column| dbc::Column {
name: column.name().to_string(),
column_type: column.type_().into(),
})
.collect::<Vec<dbc::Column>>();
let columns = Arc::from(columns);

let mut rows: Vec<dbc::Row> = Vec::new();
for row in result {
let values: Vec<dbc::Value> = row
.iter()
.map(|value| {
if value.is_none() {
dbc::Value::NULL
} else {
value.unwrap().into()
}
})
.collect();
rows.push(dbc::Row {
values,
columns: Arc::clone(&columns),
});
}
Ok(dbc::QueryResult { rows })
}
}

impl From<postgres::types::Type> for dbc::ColumnType {
fn from(value: postgres::types::Type) -> Self {
match value {
postgres::types::Type::BOOL => dbc::ColumnType::BOOL,
postgres::types::Type::INT2 => dbc::ColumnType::INT2,
postgres::types::Type::INT4 => dbc::ColumnType::INT4,
postgres::types::Type::INT8 => dbc::ColumnType::INT8,
postgres::types::Type::FLOAT4 => dbc::ColumnType::FLOAT4,
postgres::types::Type::FLOAT8 => dbc::ColumnType::FLOAT8,
postgres::types::Type::NUMERIC => dbc::ColumnType::NUMERIC,
postgres::types::Type::TIMESTAMP => dbc::ColumnType::TIMESTAMP,
postgres::types::Type::TIMESTAMPTZ => dbc::ColumnType::TIMESTAMPTZ,
postgres::types::Type::DATE => dbc::ColumnType::DATE,
postgres::types::Type::TIME => dbc::ColumnType::TIME,
postgres::types::Type::TIMETZ => dbc::ColumnType::TIMETZ,
postgres::types::Type::INTERVAL => dbc::ColumnType::INTERVAL,
postgres::types::Type::TEXT => dbc::ColumnType::TEXT,
postgres::types::Type::CHAR => dbc::ColumnType::CHAR,
postgres::types::Type::VARCHAR => dbc::ColumnType::VARCHAR,
postgres::types::Type::BYTEA => dbc::ColumnType::BYTEA,
postgres::types::Type::UUID => dbc::ColumnType::UUID,
postgres::types::Type::JSON => dbc::ColumnType::JSON,
postgres::types::Type::JSONB => dbc::ColumnType::JSONB,
postgres::types::Type::XML => dbc::ColumnType::XML,
postgres::types::Type::OID => dbc::ColumnType::OID,
postgres::types::Type::CIDR => dbc::ColumnType::CIDR,
postgres::types::Type::INET => dbc::ColumnType::INET,
postgres::types::Type::MACADDR => dbc::ColumnType::MACADDR,
postgres::types::Type::BIT => dbc::ColumnType::BIT,
postgres::types::Type::VARBIT => dbc::ColumnType::VARBIT,
_ => dbc::ColumnType::UNKNOWN,
}
}
}