Skip to content

Commit

Permalink
small refactor (make it inner class)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 9, 2025
1 parent 9b88a2d commit 1796ec8
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 55 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

### Bug Fixes
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
- Fix issue introduced in 24.12.1 with peer stability when the upperbound is set to a high number
- Fix issue (introduced in `24.12.1`) with peer stability when the upperbound is set to a high number
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.ThrottlingRpcHandler;
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.networking.p2p.peer.DisconnectRequestHandler;
Expand All @@ -43,6 +43,7 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeer implements Peer {
private static final Logger LOG = LogManager.getLogger();
Expand Down Expand Up @@ -239,4 +240,25 @@ public void adjustReputation(final ReputationAdjustment adjustment) {
disconnectCleanly(DisconnectReason.REMOTE_FAULT).ifExceptionGetsHereRaiseABug();
}
}

private static class ThrottlingRpcHandler<
TOutgoingHandler extends RpcRequestHandler,
TRequest,
TRespHandler extends RpcResponseHandler<?>> {

private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;
private final ThrottlingTaskQueue requestsQueue =
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);

public ThrottlingRpcHandler(
final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
this.delegate = delegate;
}

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return requestsQueue.queueTask(
() -> delegate.sendRequest(connection, request, responseHandler));
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,49 @@
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.p2p.libp2p.rpc;
package tech.pegasys.teku.networking.p2p.libp2p;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.libp2p.core.Connection;
import io.libp2p.core.security.SecureChannel.Session;
import java.util.List;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class ThrottlingRpcHandlerTest {
public class LibP2PPeerTest {

private final Connection connection = mock(Connection.class);

@SuppressWarnings("unchecked")
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> delegate =
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcHandler =
mock(RpcHandler.class);

@SuppressWarnings("unchecked")
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
mock(RpcMethod.class);

private ThrottlingRpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>>
throttlingRpcHandler;
private LibP2PPeer libP2PPeer;

@BeforeEach
public void init() {
when(delegate.getRpcMethod()).thenReturn(rpcMethod);
throttlingRpcHandler = new ThrottlingRpcHandler<>(delegate);
when(rpcHandler.getRpcMethod()).thenReturn(rpcMethod);
final Session secureSession = mock(Session.class);
when(connection.secureSession()).thenReturn(secureSession);
when(connection.closeFuture()).thenReturn(new SafeFuture<>());
libP2PPeer =
new LibP2PPeer(connection, List.of(rpcHandler), ReputationManager.NOOP, peer -> 0.0);
}

@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
Expand All @@ -61,17 +67,17 @@ public void sendRequest_throttlesRequests() {
__ -> {
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
new SafeFuture<>();
when(delegate.sendRequest(connection, null, null)).thenReturn(future);
throttlingRpcHandler.sendRequest(connection, null, null);
when(rpcHandler.sendRequest(connection, null, null)).thenReturn(future);
libP2PPeer.sendRequest(rpcMethod, null, null);
return future;
})
.toList();

when(throttlingRpcHandler.sendRequest(connection, null, null))
when(rpcHandler.sendRequest(connection, null, null))
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
throttlingRpcHandler.sendRequest(connection, null, null);
libP2PPeer.sendRequest(rpcMethod, null, null);

// completed request should be throttled
assertThat(throttledRequest).isNotDone();
Expand Down

0 comments on commit 1796ec8

Please sign in to comment.