Skip to content

Commit

Permalink
Merge pull request #6471 from osamahammad21/dst-boost
Browse files Browse the repository at this point in the history
DST: update to Boost 1.87
  • Loading branch information
maliberty authored Jan 4, 2025
2 parents 0ab6013 + 36e8385 commit 21623fa
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 47 deletions.
12 changes: 6 additions & 6 deletions src/dst/src/BalancerConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ using namespace dst;
BOOST_CLASS_EXPORT(dst::BalancerJobDescription)
BOOST_CLASS_EXPORT(dst::BroadcastJobDescription)

BalancerConnection::BalancerConnection(asio::io_service& io_service,
BalancerConnection::BalancerConnection(asio::io_context& service,
LoadBalancer* owner,
utl::Logger* logger)
: sock_(io_service), logger_(logger), owner_(owner)
: sock_(service), logger_(logger), owner_(owner)
{
}
// socket creation
Expand Down Expand Up @@ -112,8 +112,8 @@ void BalancerConnection::handle_read(boost::system::error_code const& err,
owner_->dist_->sendResult(reply, sock_);
sock_.close();
} else {
asio::io_service io_service;
tcp::socket socket(io_service);
asio::io_context service;
tcp::socket socket(service);
int failed_workers_trials = 0;
asio::streambuf receive_buffer;
bool failure = true;
Expand Down Expand Up @@ -181,8 +181,8 @@ void BalancerConnection::handle_read(boost::system::error_code const& err,
pool,
[worker, data, &failed_workers, &broadcast_failure_mutex]() {
try {
asio::io_service io_service;
tcp::socket socket(io_service);
asio::io_context service;
tcp::socket socket(service);
socket.connect(tcp::endpoint(worker.ip, worker.port));
asio::write(socket, asio::buffer(data));
asio::streambuf receive_buffer;
Expand Down
6 changes: 3 additions & 3 deletions src/dst/src/BalancerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ class BalancerConnection
{
public:
using pointer = boost::shared_ptr<BalancerConnection>;
BalancerConnection(asio::io_service& io_service,
BalancerConnection(asio::io_context& service,
LoadBalancer* owner,
utl::Logger* logger);
static pointer create(asio::io_service& io_service,
static pointer create(asio::io_context& service,
LoadBalancer* owner,
utl::Logger* logger)
{
return boost::make_shared<BalancerConnection>(io_service, owner, logger);
return boost::make_shared<BalancerConnection>(service, owner, logger);
}
tcp::socket& socket();
void start();
Expand Down
18 changes: 9 additions & 9 deletions src/dst/src/Distributed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ void Distributed::runLoadBalancer(const char* ip,
const char* workers_domain)
{
try {
asio::io_service io_service;
LoadBalancer balancer(this, io_service, logger_, ip, workers_domain, port);
asio::io_context service;
LoadBalancer balancer(this, service, logger_, ip, workers_domain, port);
if (std::strcmp(workers_domain, "") == 0) {
for (const auto& worker : end_points_) {
balancer.addWorker(worker.ip, worker.port);
}
}
io_service.run();
service.run();
} catch (std::exception& e) {
logger_->error(utl::DST, 9, "LoadBalancer error: {}", e.what());
}
Expand Down Expand Up @@ -167,10 +167,10 @@ bool Distributed::sendJob(JobMessage& msg,
}
std::string resultStr;
while (tries++ < MAX_TRIES) {
asio::io_service io_service;
dst::socket sock(io_service);
asio::io_context service;
dst::socket sock(service);
try {
sock.connect(tcp::endpoint(ip::address::from_string(ip), port));
sock.connect(tcp::endpoint(ip::make_address(ip), port));
} catch (const boost::system::system_error& ex) {
logger_->warn(utl::DST,
113,
Expand Down Expand Up @@ -228,10 +228,10 @@ bool Distributed::sendJobMultiResult(JobMessage& msg,
}
std::string resultStr;
while (tries++ < MAX_TRIES) {
asio::io_service io_service;
dst::socket sock(io_service);
asio::io_context service;
dst::socket sock(service);
try {
sock.connect(tcp::endpoint(ip::address::from_string(ip), port));
sock.connect(tcp::endpoint(ip::make_address(ip), port));
} catch (const boost::system::system_error& ex) {
logger_->warn(utl::DST,
13,
Expand Down
28 changes: 13 additions & 15 deletions src/dst/src/LoadBalancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void LoadBalancer::start_accept()
}
jobs_++;
BalancerConnection::pointer connection
= BalancerConnection::create(*service, this, logger_);
= BalancerConnection::create(*service_, this, logger_);
acceptor_.async_accept(connection->socket(),
boost::bind(&LoadBalancer::handle_accept,
this,
Expand All @@ -62,18 +62,18 @@ void LoadBalancer::start_accept()
}

LoadBalancer::LoadBalancer(Distributed* dist,
asio::io_service& io_service,
asio::io_context& service,
utl::Logger* logger,
const char* ip,
const char* workers_domain,
unsigned short port)
: dist_(dist),
acceptor_(io_service, tcp::endpoint(ip::address::from_string(ip), port)),
acceptor_(service, tcp::endpoint(ip::make_address(ip), port)),
logger_(logger),
jobs_(0)
{
// pool_ = std::make_unique<asio::thread_pool>();
service = &io_service;
service_ = &service;
start_accept();
if (std::strcmp(workers_domain, "") != 0) {
workers_lookup_thread = boost::thread(
Expand All @@ -96,9 +96,9 @@ bool LoadBalancer::addWorker(const std::string& ip, unsigned short port)
if (!broadcastData.empty()) {
for (auto data : broadcastData) {
try {
asio::io_service io_service;
tcp::socket socket(io_service);
socket.connect(tcp::endpoint(ip::address::from_string(ip), port));
asio::io_context service;
tcp::socket socket(service);
socket.connect(tcp::endpoint(ip::make_address(ip), port));
asio::write(socket, asio::buffer(data));
asio::streambuf receive_buffer;
asio::read(socket, receive_buffer, asio::transfer_all());
Expand All @@ -114,7 +114,7 @@ bool LoadBalancer::addWorker(const std::string& ip, unsigned short port)
}
}
if (validWorkerState) {
workers_.push(worker(ip::address::from_string(ip), port, 0));
workers_.push(worker(ip::make_address(ip), port, 0));
}
return validWorkerState;
}
Expand Down Expand Up @@ -186,15 +186,14 @@ void LoadBalancer::removeWorker(const ip::address& ip,

void LoadBalancer::lookUpWorkers(const char* domain, unsigned short port)
{
asio::io_service ios;
asio::io_context ios;
std::vector<worker> workers_set;
udp::resolver::query resolver_query(
domain, std::to_string(port), udp::resolver::query::numeric_service);
udp::resolver resolver(ios);
while (alive) {
std::vector<worker> new_workers;
boost::system::error_code ec;
auto it = resolver.resolve(resolver_query, ec);
udp::resolver::results_type results
= resolver.resolve(domain, std::to_string(port), ec);
if (ec) {
logger_->warn(utl::DST,
203,
Expand All @@ -204,9 +203,8 @@ void LoadBalancer::lookUpWorkers(const char* domain, unsigned short port)
ec.message());
}
int new_workers_count = 0;
udp::resolver::iterator it_end;
for (; it != it_end; ++it) {
auto discovered_worker = worker(it->endpoint().address(), port, 0);
for (const auto& entry : results) {
auto discovered_worker = worker(entry.endpoint().address(), port, 0);
if (std::find(workers_set.begin(), workers_set.end(), discovered_worker)
== workers_set.end()) {
workers_set.push_back(discovered_worker);
Expand Down
4 changes: 2 additions & 2 deletions src/dst/src/LoadBalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class LoadBalancer
public:
// constructor for accepting connection from client
LoadBalancer(Distributed* dist,
asio::io_service& io_service,
asio::io_context& service,
utl::Logger* logger,
const char* ip,
const char* workers_domain,
Expand Down Expand Up @@ -90,7 +90,7 @@ class LoadBalancer

Distributed* dist_;
tcp::acceptor acceptor_;
asio::io_service* service;
asio::io_context* service_;
utl::Logger* logger_;
std::priority_queue<worker, std::vector<worker>, CompareWorker> workers_;
std::mutex workers_mutex_;
Expand Down
2 changes: 1 addition & 1 deletion src/dst/src/Worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Worker::Worker(Distributed* dist,
utl::Logger* logger,
const char* ip,
unsigned short port)
: acceptor_(service_, tcp::endpoint(ip::address::from_string(ip), port)),
: acceptor_(service_, tcp::endpoint(ip::make_address(ip), port)),
dist_(dist),
logger_(logger)
{
Expand Down
2 changes: 1 addition & 1 deletion src/dst/src/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Worker
~Worker();

private:
asio::io_service service_;
asio::io_context service_;
tcp::acceptor acceptor_;
Distributed* dist_;
utl::Logger* logger_;
Expand Down
4 changes: 2 additions & 2 deletions src/dst/src/WorkerConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
#include "utl/Logger.h"
namespace dst {

WorkerConnection::WorkerConnection(asio::io_service& io_service,
WorkerConnection::WorkerConnection(asio::io_context& service,
Distributed* dist,
utl::Logger* logger,
Worker* worker)
: sock_(io_service),
: sock_(service),
dist_(dist),
logger_(logger),
msg_(JobMessage::NONE),
Expand Down
2 changes: 1 addition & 1 deletion src/dst/src/WorkerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Worker;
class WorkerConnection : public boost::enable_shared_from_this<WorkerConnection>
{
public:
WorkerConnection(asio::io_service& io_service,
WorkerConnection(asio::io_context& service,
Distributed* dist,
utl::Logger* logger,
Worker* worker);
Expand Down
12 changes: 5 additions & 7 deletions src/dst/test/cpp/TestBalancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ BOOST_AUTO_TEST_CASE(test_default)
unsigned short worker_port_2 = 5557;
unsigned short worker_port_3 = 5558;
unsigned short worker_port_4 = 5559;
asio::io_service io_service;
asio::io_context service;
LoadBalancer* balancer = new LoadBalancer(
dist, io_service, logger, local_ip.c_str(), "", balancer_port);
dist, service, logger, local_ip.c_str(), "", balancer_port);

// Checking simple interface functions
balancer->addWorker(local_ip, worker_port_1);
Expand All @@ -42,14 +42,13 @@ BOOST_AUTO_TEST_CASE(test_default)
balancer->getNextWorker(address, port);
BOOST_TEST(address.to_string() == local_ip);
BOOST_TEST(port == worker_port_2);
balancer->updateWorker(asio::ip::address::from_string(local_ip),
worker_port_2);
balancer->updateWorker(asio::ip::make_address(local_ip), worker_port_2);
balancer->getNextWorker(address, port);
BOOST_TEST(address.to_string() == local_ip);
BOOST_TEST(port == worker_port_2);

// Checking if balancer is up and responding
boost::thread t(boost::bind(&asio::io_service::run, &io_service));
boost::thread t(boost::bind(&asio::io_context::run, &service));
JobMessage msg(JobMessage::JobType::BALANCER);
JobMessage result;
BOOST_TEST(dist->sendJob(msg, local_ip.c_str(), balancer_port, result));
Expand All @@ -58,8 +57,7 @@ BOOST_AUTO_TEST_CASE(test_default)
// Checking if a balancer can relay a message to a worker and send the result
// correctly. note we make worker 2, which is not running, the next
// worker. That should be handled correctly by balancer.
balancer->updateWorker(asio::ip::address::from_string(local_ip),
worker_port_2);
balancer->updateWorker(asio::ip::make_address(local_ip), worker_port_2);
dist->addCallBack(new HelperCallBack(dist));
dist->runWorker(local_ip.c_str(), worker_port_1, true);
msg.setJobType(JobMessage::JobType::ROUTING);
Expand Down

0 comments on commit 21623fa

Please sign in to comment.