Skip to content

Commit

Permalink
fix container starup
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Jan 10, 2025
1 parent 618c5e2 commit 15fb5f3
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 61 deletions.
9 changes: 4 additions & 5 deletions tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ pub async fn main() -> Result<()> {
run_http_client(args.clone()).await?;
}
HANDLER_HYBRID => {
let docker = Docker::connect_with_local_defaults().unwrap();
run_hybrid_client(args.clone(), &docker, &mut containers).await?;
run_hybrid_client(args.clone(), &mut containers).await?;
}
_ => {
return Err(format!("Unknown test handler: {handler}").into());
Expand All @@ -148,19 +147,19 @@ async fn run_http_client(args: SqlLogicTestArgs) -> Result<()> {

async fn run_hybrid_client(
args: SqlLogicTestArgs,
docker: &Docker,
cs: &mut Vec<ContainerAsync<GenericImage>>,
) -> Result<()> {
println!("Hybird client starts to run with: {:?}", args);

// preparse docker envs
let mut port_start = TTC_PORT_START;

let docker = Docker::connect_with_local_defaults().unwrap();
for (c, _) in HYBRID_CONFIGS.iter() {
match c.as_ref() {
ClientType::MySQL | ClientType::Http => {}
ClientType::Ttc(image, _) => {
run_ttc_container(docker, image, port_start, cs).await?;
run_ttc_container(&docker, image, port_start, cs).await?;
port_start += 1;
}
ClientType::Hybird => panic!("Can't run hybrid client in hybrid client"),
Expand Down Expand Up @@ -222,7 +221,6 @@ async fn create_databend(client_type: &ClientType, filename: &str) -> Result<Dat

async fn run_suits(args: SqlLogicTestArgs, client_type: ClientType) -> Result<()> {
// Todo: set validator to process regex
let mut tasks = vec![];
let mut num_of_tests = 0;
let mut lazy_dirs = HashSet::new();
let mut files = vec![];
Expand Down Expand Up @@ -297,6 +295,7 @@ async fn run_suits(args: SqlLogicTestArgs, client_type: ClientType) -> Result<()
.unwrap();
}
} else {
let mut tasks = Vec::with_capacity(files.len());
for file in files {
let client_type = client_type.clone();
tasks.push(async move { run_file_async(&client_type, file.unwrap().path()).await });
Expand Down
179 changes: 123 additions & 56 deletions tests/sqllogictests/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;

use bollard::Docker;
use clap::Parser;
Expand All @@ -40,6 +41,10 @@ use crate::arg::SqlLogicTestArgs;
use crate::error::DSqlLogicTestError;
use crate::error::Result;

const CONTAINER_RETRY_TIMES: usize = 3;
const CONTAINER_STARTUP_TIMEOUT_SECONDS: u64 = 120;
const CONTAINER_TIMEOUT_SECONDS: u64 = 300;

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
pub struct ServerInfo {
pub id: String,
Expand Down Expand Up @@ -246,36 +251,54 @@ pub async fn run_ttc_container(
let mut images = image.split(":");
let image = images.next().unwrap();
let tag = images.next().unwrap_or("latest");

let container_name = format!("databend-ttc-{}", port);
println!("Start {container_name}");

let start = Instant::now();
println!("Start container {container_name}");

// Stop the container
let _ = docker.stop_container(&container_name, None).await;
let _ = docker.remove_container(&container_name, None).await;

let container_res = GenericImage::new(image, tag)
.with_exposed_port(port.tcp())
.with_wait_for(WaitFor::Duration {
length: Duration::from_secs(300),
})
.with_network("host")
.with_env_var(
"DATABEND_DSN",
"databend://root:@127.0.0.1:8000?sslmode=disable",
)
.with_env_var("TTC_PORT", format!("{port}"))
.with_container_name(&container_name)
.start()
.await;
match container_res {
Ok(container) => {
println!("Started container: {}", container.id());
cs.push(container);
Ok(())
let mut i = 1;
loop {
let container_res = GenericImage::new(image, tag)
.with_exposed_port(port.tcp())
.with_wait_for(WaitFor::message_on_stdout("Ready to accept connections"))
.with_network("host")
.with_env_var(
"DATABEND_DSN",
"databend://root:@127.0.0.1:8000?sslmode=disable",
)
.with_env_var("TTC_PORT", format!("{port}"))
.with_container_name(&container_name)
.start()
.await;
let duration = start.elapsed().as_secs();
match container_res {
Ok(container) => {
println!(
"Start container {} using {} secs success",
container.id(),
duration
);
cs.push(container);
return Ok(());
}
Err(err) => {
println!(
"Start container {} using {} secs failed: {}",
container_name, duration, err
);
if i == CONTAINER_RETRY_TIMES || duration >= CONTAINER_TIMEOUT_SECONDS {
break;
} else {
i += 1;
}
}
}
Err(e) => Err(format!("Start {container_name} failed: {e}").into()),
}
Err(format!("Start {container_name} failed").into())
}

#[allow(dead_code)]
Expand All @@ -290,8 +313,8 @@ pub async fn lazy_run_dictionary_containers(
if !lazy_dirs.contains(&LazyDir::Dictionaries) {
return Ok(None);
}
println!("Start run dictionary source server container");
let docker = Docker::connect_with_local_defaults().unwrap();
println!("run dictionary source server container");
let redis = run_redis_server(&docker).await?;
let mysql = run_mysql_server(&docker).await?;
let dict_container = DictionaryContainer { redis, mysql };
Expand All @@ -300,40 +323,63 @@ pub async fn lazy_run_dictionary_containers(
}

async fn run_redis_server(docker: &Docker) -> Result<ContainerAsync<Redis>> {
let start = Instant::now();
let container_name = "redis".to_string();
println!("Start container {container_name}");

// Stop the container
let _ = docker.stop_container(&container_name, None).await;
let _ = docker.remove_container(&container_name, None).await;

let redis_res = Redis::default()
.with_network("host")
.with_startup_timeout(Duration::from_secs(300))
.with_container_name(&container_name)
.start()
.await;
let mut i = 1;
loop {
let redis_res = Redis::default()
.with_network("host")
.with_startup_timeout(Duration::from_secs(CONTAINER_STARTUP_TIMEOUT_SECONDS))
.with_container_name(&container_name)
.start()
.await;

match redis_res {
Ok(redis) => {
let host_ip = redis.get_host().await.unwrap();
let url = format!("redis://{}:{}", host_ip, REDIS_PORT);
let client = redis::Client::open(url.as_ref()).unwrap();
let mut con = client.get_connection().unwrap();
let duration = start.elapsed().as_secs();
match redis_res {
Ok(redis) => {
let host_ip = redis.get_host().await.unwrap();
let url = format!("redis://{}:{}", host_ip, REDIS_PORT);
let client = redis::Client::open(url.as_ref()).unwrap();
let mut con = client.get_connection().unwrap();

// Add some key values for test.
let keys = vec!["a", "b", "c", "1", "2"];
for key in keys {
let val = format!("{}_value", key);
con.set::<_, _, ()>(key, val).unwrap();
// Add some key values for test.
let keys = vec!["a", "b", "c", "1", "2"];
for key in keys {
let val = format!("{}_value", key);
con.set::<_, _, ()>(key, val).unwrap();
}
println!(
"Start container {} using {} secs success",
container_name, duration
);
return Ok(redis);
}
Err(err) => {
println!(
"Start container {} using {} secs failed: {}",
container_name, duration, err
);
if i == CONTAINER_RETRY_TIMES || duration >= CONTAINER_TIMEOUT_SECONDS {
break;
} else {
i += 1;
}
}
Ok(redis)
}
Err(e) => Err(format!("Start {container_name} failed: {e}").into()),
}
Err(format!("Start {container_name} failed").into())
}

async fn run_mysql_server(docker: &Docker) -> Result<ContainerAsync<Mysql>> {
let container_name = "mysqld".to_string();
let start = Instant::now();
let container_name = "mysql".to_string();
println!("Start container {container_name}");

// Stop the container
let _ = docker.stop_container(&container_name, None).await;
Expand All @@ -357,19 +403,40 @@ async fn run_mysql_server(docker: &Docker) -> Result<ContainerAsync<Mysql>> {
// | 4 | Tom | 55 | 3000.55 | 0 |
// | 5 | NULL | NULL | NULL | NULL |
// +------+-------+------+---------+--------+
let mysql_res = Mysql::default()
.with_init_sql(
"CREATE TABLE test.user(id INT, name VARCHAR(100), age SMALLINT UNSIGNED, salary DOUBLE, active BOOL); INSERT INTO test.user VALUES(1, 'Alice', 24, 100, true), (2, 'Bob', 35, 200.1, false), (3, 'Lily', 41, 1000.2, true), (4, 'Tom', 55, 3000.55, false), (5, NULL, NULL, NULL, NULL);"
.to_string()
.into_bytes(),
)
.with_network("host")
.with_startup_timeout(Duration::from_secs(300))
.with_container_name(&container_name)
.start().await;
let mut i = 1;
loop {
let mysql_res = Mysql::default()
.with_init_sql(
"CREATE TABLE test.user(id INT, name VARCHAR(100), age SMALLINT UNSIGNED, salary DOUBLE, active BOOL); INSERT INTO test.user VALUES(1, 'Alice', 24, 100, true), (2, 'Bob', 35, 200.1, false), (3, 'Lily', 41, 1000.2, true), (4, 'Tom', 55, 3000.55, false), (5, NULL, NULL, NULL, NULL);"
.to_string()
.into_bytes(),
)
.with_network("host")
.with_startup_timeout(Duration::from_secs(CONTAINER_STARTUP_TIMEOUT_SECONDS))
.with_container_name(&container_name)
.start().await;

match mysql_res {
Ok(mysql) => Ok(mysql),
Err(e) => Err(format!("Start {container_name} failed: {e}").into()),
let duration = start.elapsed().as_secs();
match mysql_res {
Ok(mysql) => {
println!(
"Start container {} using {} secs success",
container_name, duration
);
return Ok(mysql);
}
Err(err) => {
println!(
"Start container {} using {} secs failed: {}",
container_name, duration, err
);
if i == CONTAINER_RETRY_TIMES || duration >= CONTAINER_TIMEOUT_SECONDS {
break;
} else {
i += 1;
}
}
}
}
Err(format!("Start {container_name} failed").into())
}

0 comments on commit 15fb5f3

Please sign in to comment.