Skip to content

Commit

Permalink
rpcdaemon: replace UnaryRpc with rpc::unary_rpc (#2636)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Jan 8, 2025
1 parent 499dbed commit 567168d
Show file tree
Hide file tree
Showing 17 changed files with 141 additions and 311 deletions.
4 changes: 2 additions & 2 deletions cmd/dev/grpc_toolbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ int ethbackend_coroutines(const std::string& target) {
const auto channel = grpc::CreateChannel(target, grpc::InsecureChannelCredentials());

// Etherbase
ethbackend::RemoteBackEnd eth_backend{*ioc, channel, *grpc_context};
ethbackend::RemoteBackEnd eth_backend{channel, *grpc_context};
boost::asio::co_spawn(*ioc, ethbackend_etherbase(eth_backend), [&](std::exception_ptr) {
context_pool.stop();
});
Expand Down Expand Up @@ -995,7 +995,7 @@ int execute_temporal_kv_query(const std::string& target, KVQueryFunc<Q> query_fu
};

// ETHBACKEND
ethbackend::RemoteBackEnd eth_backend{*ioc, channel_factory(), *grpc_context};
ethbackend::RemoteBackEnd eth_backend{channel_factory(), *grpc_context};
// DB KV API client
CoherentStateCache state_cache;
db::kv::grpc::client::RemoteClient client{channel_factory,
Expand Down
2 changes: 1 addition & 1 deletion examples/get_latest_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int main(int argc, char* argv[]) {

kv::api::CoherentStateCache state_cache;
auto channel{::grpc::CreateChannel(target, ::grpc::InsecureChannelCredentials())};
auto backend{std::make_unique<rpc::ethbackend::RemoteBackEnd>(*ioc, channel, *grpc_context)};
auto backend{std::make_unique<rpc::ethbackend::RemoteBackEnd>(channel, *grpc_context)};
auto database = std::make_unique<ethdb::kv::RemoteDatabase>(backend.get(), &state_cache, *grpc_context, channel);

auto context_pool_thread = std::thread([&]() { context_pool.run(); });
Expand Down
116 changes: 0 additions & 116 deletions silkworm/infra/grpc/client/unary_rpc.hpp

This file was deleted.

2 changes: 1 addition & 1 deletion silkworm/rpc/commands/net_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ TEST_CASE("NetRpcApi::NetRpcApi", "[rpc][erigon_api]") {
agrpc::GrpcContext grpc_context;
add_private_service<ethbackend::BackEnd>(
ioc,
std::make_unique<ethbackend::RemoteBackEnd>(ioc, grpc_channel, grpc_context));
std::make_unique<ethbackend::RemoteBackEnd>(grpc_channel, grpc_context));
CHECK_NOTHROW(NetRpcApi{ioc});
}
#endif // SILKWORM_SANITIZE
Expand Down
5 changes: 1 addition & 4 deletions silkworm/rpc/commands/trace_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <silkworm/infra/concurrency/private_service.hpp>
#include <silkworm/infra/concurrency/shared_service.hpp>
#include <silkworm/rpc/common/worker_pool.hpp>
#include <silkworm/rpc/ethbackend/backend.hpp>
#include <silkworm/rpc/ethdb/database.hpp>
#include <silkworm/rpc/json/stream.hpp>

Expand All @@ -43,8 +42,7 @@ class TraceRpcApi {
block_cache_{must_use_shared_service<BlockCache>(ioc_)},
state_cache_{must_use_shared_service<db::kv::api::StateCache>(ioc_)},
database_{must_use_private_service<ethdb::Database>(ioc_)},
workers_{workers},
backend_{must_use_private_service<ethbackend::BackEnd>(ioc_)} {}
workers_{workers} {}

virtual ~TraceRpcApi() = default;

Expand All @@ -70,7 +68,6 @@ class TraceRpcApi {
db::kv::api::StateCache* state_cache_;
ethdb::Database* database_;
WorkerPool& workers_;
ethbackend::BackEnd* backend_;

friend class silkworm::rpc::json_rpc::RequestHandler;
};
Expand Down
6 changes: 3 additions & 3 deletions silkworm/rpc/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ void Daemon::add_private_services() {

auto* state_cache{must_use_shared_service<db::kv::api::StateCache>(ioc)};

auto backend{std::make_unique<rpc::ethbackend::RemoteBackEnd>(ioc, grpc_channel, grpc_context)};
auto tx_pool{std::make_unique<txpool::TransactionPool>(ioc, grpc_channel, grpc_context)};
auto miner{std::make_unique<txpool::Miner>(ioc, grpc_channel, grpc_context)};
auto backend{std::make_unique<rpc::ethbackend::RemoteBackEnd>(grpc_channel, grpc_context)};
auto tx_pool{std::make_unique<txpool::TransactionPool>(grpc_channel, grpc_context)};
auto miner{std::make_unique<txpool::Miner>(grpc_channel, grpc_context)};
std::unique_ptr<ethdb::Database> database;
if (data_store_) {
database = std::make_unique<ethdb::file::LocalDatabase>(*data_store_, state_cache);
Expand Down
62 changes: 27 additions & 35 deletions silkworm/rpc/ethbackend/remote_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,25 @@
#include <silkworm/core/types/address.hpp>
#include <silkworm/infra/common/clock_time.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/grpc/client/unary_rpc.hpp>
#include <silkworm/infra/grpc/client/call.hpp>
#include <silkworm/infra/grpc/common/conversion.hpp>
#include <silkworm/rpc/json/types.hpp>

namespace silkworm::rpc::ethbackend {

RemoteBackEnd::RemoteBackEnd(
boost::asio::io_context& ioc,
const std::shared_ptr<grpc::Channel>& channel,
agrpc::GrpcContext& grpc_context)
: RemoteBackEnd(ioc.get_executor(), ::remote::ETHBACKEND::NewStub(channel), grpc_context) {}
namespace proto = ::remote;
using Stub = proto::ETHBACKEND::StubInterface;

RemoteBackEnd::RemoteBackEnd(boost::asio::io_context::executor_type executor,
std::unique_ptr<::remote::ETHBACKEND::StubInterface> stub,
agrpc::GrpcContext& grpc_context)
: executor_(std::move(executor)), stub_(std::move(stub)), grpc_context_(grpc_context) {}
RemoteBackEnd::RemoteBackEnd(const std::shared_ptr<grpc::Channel>& channel, agrpc::GrpcContext& grpc_context)
: RemoteBackEnd(proto::ETHBACKEND::NewStub(channel), grpc_context) {}

RemoteBackEnd::RemoteBackEnd(std::unique_ptr<Stub> stub, agrpc::GrpcContext& grpc_context)
: stub_(std::move(stub)), grpc_context_(grpc_context) {}

Task<evmc::address> RemoteBackEnd::etherbase() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncEtherbase> eb_rpc{*stub_, grpc_context_};
const auto reply = co_await eb_rpc.finish_on(executor_, ::remote::EtherbaseRequest{});
const proto::EtherbaseRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncEtherbase, *stub_, request, grpc_context_);
evmc::address evmc_address;
if (reply.has_address()) {
const auto& h160_address = reply.address();
Expand All @@ -58,35 +56,35 @@ Task<evmc::address> RemoteBackEnd::etherbase() {

Task<uint64_t> RemoteBackEnd::protocol_version() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncProtocolVersion> pv_rpc{*stub_, grpc_context_};
const auto reply = co_await pv_rpc.finish_on(executor_, ::remote::ProtocolVersionRequest{});
const proto::ProtocolVersionRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncProtocolVersion, *stub_, request, grpc_context_);
const auto pv = reply.id();
SILK_TRACE << "RemoteBackEnd::protocol_version version=" << pv << " t=" << clock_time::since(start_time);
co_return pv;
}

Task<BlockNum> RemoteBackEnd::net_version() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncNetVersion> nv_rpc{*stub_, grpc_context_};
const auto reply = co_await nv_rpc.finish_on(executor_, ::remote::NetVersionRequest{});
const proto::NetVersionRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncNetVersion, *stub_, request, grpc_context_);
const auto nv = reply.id();
SILK_TRACE << "RemoteBackEnd::net_version version=" << nv << " t=" << clock_time::since(start_time);
co_return nv;
}

Task<std::string> RemoteBackEnd::client_version() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncClientVersion> cv_rpc{*stub_, grpc_context_};
const auto reply = co_await cv_rpc.finish_on(executor_, ::remote::ClientVersionRequest{});
const proto::ClientVersionRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncClientVersion, *stub_, request, grpc_context_);
const auto& cv = reply.node_name();
SILK_TRACE << "RemoteBackEnd::client_version version=" << cv << " t=" << clock_time::since(start_time);
co_return cv;
}

Task<uint64_t> RemoteBackEnd::net_peer_count() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncNetPeerCount> npc_rpc{*stub_, grpc_context_};
const auto reply = co_await npc_rpc.finish_on(executor_, ::remote::NetPeerCountRequest{});
const proto::NetPeerCountRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncNetPeerCount, *stub_, request, grpc_context_);
const auto count = reply.count();
SILK_TRACE << "RemoteBackEnd::net_peer_count count=" << count << " t=" << clock_time::since(start_time);
co_return count;
Expand All @@ -95,8 +93,8 @@ Task<uint64_t> RemoteBackEnd::net_peer_count() {
Task<NodeInfos> RemoteBackEnd::engine_node_info() {
NodeInfos node_info_list;
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncNodeInfo> ni_rpc{*stub_, grpc_context_};
const auto reply = co_await ni_rpc.finish_on(executor_, ::remote::NodesInfoRequest{});
const proto::NodesInfoRequest request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncNodeInfo, *stub_, request, grpc_context_);
for (int i = 0; i < reply.nodes_info_size(); ++i) {
NodeInfo node_info;
const auto& backend_node_info = reply.nodes_info(i);
Expand All @@ -119,9 +117,8 @@ Task<NodeInfos> RemoteBackEnd::engine_node_info() {

Task<PeerInfos> RemoteBackEnd::peers() {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncPeers> peers_rpc{*stub_, grpc_context_};
::google::protobuf::Empty request;
const auto reply = co_await peers_rpc.finish_on(executor_, request);
const ::google::protobuf::Empty request;
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncPeers, *stub_, request, grpc_context_);
PeerInfos peer_infos;
peer_infos.reserve(static_cast<size_t>(reply.peers_size()));
for (const auto& peer : reply.peers()) {
Expand All @@ -145,11 +142,10 @@ Task<PeerInfos> RemoteBackEnd::peers() {

Task<bool> RemoteBackEnd::get_block(BlockNum block_num, const HashAsSpan& hash, bool read_senders, silkworm::Block& block) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncBlock> get_block_rpc{*stub_, grpc_context_};
::remote::BlockRequest request;
request.set_block_height(block_num);
request.set_allocated_block_hash(h256_from_bytes(hash).release());
const auto reply = co_await get_block_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncBlock, *stub_, request, grpc_context_);
ByteView block_rlp{string_view_to_byte_view(reply.block_rlp())};
if (const auto decode_result{rlp::decode(block_rlp, block)}; !decode_result) {
co_return false;
Expand All @@ -171,10 +167,9 @@ Task<bool> RemoteBackEnd::get_block(BlockNum block_num, const HashAsSpan& hash,

Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_txn_hash(const HashAsSpan& hash) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncTxnLookup> txn_lookup_rpc{*stub_, grpc_context_};
::remote::TxnLookupRequest request;
request.set_allocated_txn_hash(h256_from_bytes(hash).release());
const auto reply = co_await txn_lookup_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncTxnLookup, *stub_, request, grpc_context_);
if (reply.block_number() == 0) {
co_return std::nullopt;
}
Expand All @@ -185,10 +180,9 @@ Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_txn_hash(const H

Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_hash(const HashAsSpan& hash) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncHeaderNumber> header_number_rpc{*stub_, grpc_context_};
::remote::HeaderNumberRequest request;
request.set_allocated_hash(h256_from_bytes(hash).release());
const auto reply = co_await header_number_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncHeaderNumber, *stub_, request, grpc_context_);
if (!reply.has_number()) {
co_return std::nullopt;
}
Expand All @@ -199,10 +193,9 @@ Task<std::optional<BlockNum>> RemoteBackEnd::get_block_num_from_hash(const HashA

Task<std::optional<evmc::bytes32>> RemoteBackEnd::get_block_hash_from_block_num(BlockNum block_num) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncCanonicalHash> canonical_hsh_rpc{*stub_, grpc_context_};
::remote::CanonicalHashRequest request;
request.set_block_number(block_num);
const auto reply = co_await canonical_hsh_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncCanonicalHash, *stub_, request, grpc_context_);
evmc::bytes32 hash;
if (reply.has_hash() == 0) {
co_return std::nullopt;
Expand All @@ -215,10 +208,9 @@ Task<std::optional<evmc::bytes32>> RemoteBackEnd::get_block_hash_from_block_num(

Task<std::optional<Bytes>> RemoteBackEnd::canonical_body_for_storage(BlockNum block_num) {
const auto start_time = clock_time::now();
UnaryRpc<&::remote::ETHBACKEND::StubInterface::AsyncCanonicalBodyForStorage> canonical_body_for_storage_rpc{*stub_, grpc_context_};
::remote::CanonicalBodyForStorageRequest request;
request.set_blocknumber(block_num);
const auto reply = co_await canonical_body_for_storage_rpc.finish_on(executor_, request);
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncCanonicalBodyForStorage, *stub_, request, grpc_context_);
SILK_TRACE << "RemoteBackEnd::canonical_body_for_storage block_num=" << block_num
<< " t=" << clock_time::since(start_time);
if (reply.body().empty()) {
Expand Down
10 changes: 2 additions & 8 deletions silkworm/rpc/ethbackend/remote_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <vector>

#include <agrpc/grpc_context.hpp>
#include <boost/asio/io_context.hpp>
#include <evmc/evmc.hpp>

#include <silkworm/interfaces/remote/ethbackend.grpc.pb.h>
Expand All @@ -33,12 +32,8 @@ namespace silkworm::rpc::ethbackend {

class RemoteBackEnd final : public BackEnd {
public:
RemoteBackEnd(boost::asio::io_context& ioc, const std::shared_ptr<grpc::Channel>& channel,
agrpc::GrpcContext& grpc_context);
RemoteBackEnd(boost::asio::io_context::executor_type executor,
std::unique_ptr<::remote::ETHBACKEND::StubInterface> stub,
agrpc::GrpcContext& grpc_context);
~RemoteBackEnd() override = default;
RemoteBackEnd(const std::shared_ptr<grpc::Channel>& channel, agrpc::GrpcContext& grpc_context);
RemoteBackEnd(std::unique_ptr<::remote::ETHBACKEND::StubInterface> stub, agrpc::GrpcContext& grpc_context);

Task<evmc::address> etherbase() override;
Task<uint64_t> protocol_version() override;
Expand All @@ -57,7 +52,6 @@ class RemoteBackEnd final : public BackEnd {
static std::vector<Bytes> decode(const ::google::protobuf::RepeatedPtrField<std::string>& grpc_txs);
static std::vector<Withdrawal> decode(const ::google::protobuf::RepeatedPtrField<::types::Withdrawal>& grpc_withdrawals);

boost::asio::io_context::executor_type executor_;
std::unique_ptr<::remote::ETHBACKEND::StubInterface> stub_;
agrpc::GrpcContext& grpc_context_;
};
Expand Down
Loading

0 comments on commit 567168d

Please sign in to comment.