Skip to content

Commit

Permalink
Use spring web client instead of java http client
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanBorislavovDimitrov committed Oct 19, 2023
1 parent 13b5628 commit 59dc7e9
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ private Messages() {
public static final String SERVICE_PLAN_WITH_GUID_0_NOT_AVAILABLE_FOR_SERVICE_INSTANCE_1 = "Service plan with guid \"{0}\" is not available for service instance \"{1}\".";
public static final String SERVICE_OFFERING_WITH_GUID_0_IS_NOT_AVAILABLE = "Service offering with guid \"{0}\" is not available.";
public static final String SERVICE_OFFERING_WITH_GUID_0_NOT_FOUND = "Service offering with guid \"{0}\" not found.";
public static final String CANT_READ_APP_LOGS_RESPONSE = "Failed to read application recent logs response: %s";
public static final String CANT_DESERIALIZE_APP_LOGS_RESPONSE = "Failed to deserialize application recent logs response: %s";

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package com.sap.cloudfoundry.client.facade.adapters;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.function.Function;

import org.cloudfoundry.client.CloudFoundryClient;
Expand All @@ -25,31 +21,31 @@
import org.immutables.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;

import com.sap.cloudfoundry.client.facade.CloudException;
import com.sap.cloudfoundry.client.facade.CloudOperationException;
import com.sap.cloudfoundry.client.facade.Messages;
import com.sap.cloudfoundry.client.facade.oauth2.OAuthClient;
import com.sap.cloudfoundry.client.facade.rest.CloudSpaceClient;
import com.sap.cloudfoundry.client.facade.util.CloudUtil;
import com.sap.cloudfoundry.client.facade.util.JsonUtil;

import io.netty.channel.ChannelOption;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

@Value.Immutable
public abstract class CloudFoundryClientFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(CloudFoundryClientFactory.class);

static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
.executor(Executors.newSingleThreadExecutor())
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofMinutes(10))
.build();

private final Map<String, ConnectionContext> connectionContextCache = new ConcurrentHashMap<>();

static final WebClient WEB_CLIENT = buildWebClient();

public abstract Optional<Duration> getSslHandshakeTimeout();

public abstract Optional<Duration> getConnectTimeout();
Expand Down Expand Up @@ -85,30 +81,38 @@ public LogCacheClient createLogCacheClient(URL controllerUrl, OAuthClient oAuthC

@SuppressWarnings("unchecked")
private Map<String, Object> callCfRoot(URL controllerUrl, Map<String, String> requestTags) {
HttpResponse<String> response;
LOGGER.info(MessageFormat.format(Messages.CALLING_CF_ROOT_0_TO_ACCESS_LOG_CACHE_URL, controllerUrl));
String response = WEB_CLIENT.get()
.uri(getControllerUri(controllerUrl))
.headers(httpHeaders -> httpHeaders.addAll(getAdditionalRequestHeaders(requestTags)))
.exchangeToMono(this::handleClientResponse)
.block();
LOGGER.info(Messages.CF_ROOT_REQUEST_FINISHED);
var map = JsonUtil.convertJsonToMap(response);
return (Map<String, Object>) map.get("links");
}

private URI getControllerUri(URL controllerUrl) {
try {
HttpRequest request = buildCfRootRequest(controllerUrl, requestTags);
LOGGER.info(MessageFormat.format(Messages.CALLING_CF_ROOT_0_TO_ACCESS_LOG_CACHE_URL, controllerUrl));
response = HTTP_CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() / 100 != 2) {
var status = HttpStatus.valueOf(response.statusCode());
throw new CloudOperationException(status, status.getReasonPhrase(), response.body());
}
LOGGER.info(Messages.CF_ROOT_REQUEST_FINISHED);
} catch (InterruptedException | URISyntaxException | IOException e) {
return controllerUrl.toURI();
} catch (URISyntaxException e) {
throw new CloudException(e.getMessage(), e);
}
var map = JsonUtil.convertJsonToMap(response.body());
return (Map<String, Object>) map.get("links");
}

private HttpRequest buildCfRootRequest(URL controllerUrl, Map<String, String> requestTags) throws URISyntaxException {
var requestBuilder = HttpRequest.newBuilder()
.GET()
.uri(controllerUrl.toURI())
.timeout(Duration.ofMinutes(5));
requestTags.forEach(requestBuilder::header);
return requestBuilder.build();
private LinkedMultiValueMap<String, String> getAdditionalRequestHeaders(Map<String, String> requestTags) {
LinkedMultiValueMap<String, String> additionalHeaders = new LinkedMultiValueMap<>();
requestTags.forEach(additionalHeaders::add);
return additionalHeaders;
}

private Mono<String> handleClientResponse(ClientResponse clientResponse) {
if (clientResponse.statusCode()
.is2xxSuccessful()) {
return clientResponse.bodyToMono(String.class);
}
return clientResponse.createException()
.flatMap(Mono::error);
}

public CloudSpaceClient createSpaceClient(URL controllerUrl, OAuthClient oAuthClient, Map<String, String> requestTags) {
Expand Down Expand Up @@ -165,4 +169,15 @@ private reactor.netty.http.client.HttpClient getAdditionalHttpClientConfiguratio
return clientWithOptions;
}

private static WebClient buildWebClient() {
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) Duration.ofMinutes(10)
.toMillis())
.responseTimeout(Duration.ofMinutes(5));
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();

}

}
Original file line number Diff line number Diff line change
@@ -1,40 +1,45 @@
package com.sap.cloudfoundry.client.facade.adapters;

import static com.sap.cloudfoundry.client.facade.adapters.CloudFoundryClientFactory.WEB_CLIENT;

import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponentsBuilder;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.cloudfoundry.client.facade.CloudException;
import com.sap.cloudfoundry.client.facade.CloudOperationException;
import com.sap.cloudfoundry.client.facade.Messages;
import com.sap.cloudfoundry.client.facade.domain.ApplicationLog;
import com.sap.cloudfoundry.client.facade.domain.ImmutableApplicationLog;
import com.sap.cloudfoundry.client.facade.oauth2.OAuthClient;
import com.sap.cloudfoundry.client.facade.util.CloudUtil;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LogCacheClient {

private static final ObjectMapper MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
Expand All @@ -53,40 +58,21 @@ public LogCacheClient(String logCacheApi, OAuthClient oAuthClient, Map<String, S
}

public List<ApplicationLog> getRecentLogs(UUID applicationGuid, LocalDateTime offset) {
HttpResponse<InputStream> response = CloudUtil.executeWithRetry(() -> executeRequest(applicationGuid, offset));

return parseBody(response.body()).getLogs()
.stream()
.map(this::mapToAppLog)
// we use a linked list so that the log messages can be a LIFO sequence
// that way, we avoid unnecessary sorting and copying to and from another collection/array
.collect(LinkedList::new, LinkedList::addFirst, LinkedList::addAll);
}

private HttpResponse<InputStream> executeRequest(UUID applicationGuid, LocalDateTime offset) {
try {
HttpRequest request = buildGetLogsRequest(applicationGuid, offset);
LOGGER.info(Messages.CALLING_LOG_CACHE_ENDPOINT_TO_GET_APP_LOGS);
var response = CloudFoundryClientFactory.HTTP_CLIENT.send(request, BodyHandlers.ofInputStream());
if (response.statusCode() / 100 != 2) {
var status = HttpStatus.valueOf(response.statusCode());
throw new CloudOperationException(status, status.getReasonPhrase(), parseBodyToString(response.body()));
}
LOGGER.info(Messages.APP_LOGS_WERE_FETCHED_SUCCESSFULLY);
return response;
} catch (IOException | InterruptedException e) {
throw new CloudException(e.getMessage(), e);
}
ApplicationLogsResponse applicationLogsResponse = CloudUtil.executeWithRetry(() -> getApplicationLogsResponse(applicationGuid,
offset));
return applicationLogsResponse.getLogs()
.stream()
.map(this::mapToAppLog)
// we use a linked list so that the log messages can be a LIFO sequence
// that way, we avoid unnecessary sorting and copying to and from another collection/array
.collect(LinkedList::new, LinkedList::addFirst, LinkedList::addAll);
}

private HttpRequest buildGetLogsRequest(UUID applicationGuid, LocalDateTime offset) {
var requestBuilder = HttpRequest.newBuilder()
.GET()
.uri(buildGetLogsUrl(applicationGuid, offset))
.timeout(Duration.ofMinutes(5))
.header(HttpHeaders.AUTHORIZATION, oAuthClient.getAuthorizationHeaderValue());
requestTags.forEach(requestBuilder::header);
return requestBuilder.build();
private ApplicationLogsResponse getApplicationLogsResponse(UUID applicationGuid, LocalDateTime offset) {
LOGGER.info(Messages.CALLING_LOG_CACHE_ENDPOINT_TO_GET_APP_LOGS);
InputStream responseBodyStream = getResponseBodyAsStream(buildGetLogsUrl(applicationGuid, offset));
LOGGER.info(Messages.APP_LOGS_WERE_FETCHED_SUCCESSFULLY);
return parseBody(responseBodyStream);
}

private URI buildGetLogsUrl(UUID applicationGuid, LocalDateTime offset) {
Expand All @@ -103,18 +89,37 @@ private URI buildGetLogsUrl(UUID applicationGuid, LocalDateTime offset) {
.toUri();
}

private String parseBodyToString(InputStream is) {
try (InputStream wrapped = is) {
return IOUtils.toString(wrapped, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new CloudException(String.format(Messages.CANT_READ_APP_LOGS_RESPONSE, e.getMessage()), e);
private InputStream getResponseBodyAsStream(URI logsUrl) {
List<InputStream> responseStreams = WEB_CLIENT.get()
.uri(logsUrl)
.headers(httpHeaders -> httpHeaders.addAll(getAdditionalRequestHeaders(requestTags)))
.header(HttpHeaders.AUTHORIZATION, oAuthClient.getAuthorizationHeaderValue())
.exchangeToFlux(this::handleClientResponse)
.map(DataBuffer::asInputStream)
.collectList()
.block();
return new SequenceInputStream(Collections.enumeration(responseStreams));
}

private LinkedMultiValueMap<String, String> getAdditionalRequestHeaders(Map<String, String> requestTags) {
LinkedMultiValueMap<String, String> additionalHeaders = new LinkedMultiValueMap<>();
requestTags.forEach(additionalHeaders::add);
return additionalHeaders;
}

private Flux<DataBuffer> handleClientResponse(ClientResponse clientResponse) {
if (clientResponse.statusCode()
.is2xxSuccessful()) {
return clientResponse.body(BodyExtractors.toDataBuffers());
}
return clientResponse.createException()
.flatMapMany(Mono::error);
}

private ApplicationLogsResponse parseBody(InputStream is) {
private ApplicationLogsResponse parseBody(InputStream inputStream) {
LOGGER.info(Messages.STARTED_READING_LOG_RESPONSE_INPUT_STREAM);
try (InputStream wrapped = is) {
var appLogsResponse = MAPPER.readValue(wrapped, ApplicationLogsResponse.class);
try (inputStream) {
var appLogsResponse = MAPPER.readValue(inputStream, ApplicationLogsResponse.class);
LOGGER.info(Messages.ENDED_READING_LOG_RESPONSE_INPUT_STREAM);
return appLogsResponse;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.sap.cloudfoundry.client.facade.domain;

import java.util.Objects;

import org.immutables.value.Value;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.sap.cloudfoundry.client.facade.Nullable;

import java.util.Objects;

@Value.Immutable
@JsonSerialize(as = ImmutableCloudRoute.class)
@JsonDeserialize(as = ImmutableCloudRoute.class)
Expand Down Expand Up @@ -58,10 +58,8 @@ public boolean equals(Object object) {
var thisDomain = getDomain().getName();
var otherDomain = otherRoute.getDomain()
.getName();
return thisDomain.equals(otherDomain)
&& areEmptyOrEqual(getHost(), otherRoute.getHost())
&& areEmptyOrEqual(getPath(), otherRoute.getPath())
&& Objects.equals(getPort(), otherRoute.getPort());
return thisDomain.equals(otherDomain) && areEmptyOrEqual(getHost(), otherRoute.getHost())
&& areEmptyOrEqual(getPath(), otherRoute.getPath()) && Objects.equals(getPort(), otherRoute.getPort());
}

private static boolean areEmptyOrEqual(String lhs, String rhs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Set;
import java.util.UUID;

import com.sap.cloudfoundry.client.facade.dto.ApplicationToCreateDto;
import org.cloudfoundry.client.v3.Metadata;

import com.sap.cloudfoundry.client.facade.UploadStatusCallback;
Expand All @@ -34,6 +33,7 @@
import com.sap.cloudfoundry.client.facade.domain.Staging;
import com.sap.cloudfoundry.client.facade.domain.Upload;
import com.sap.cloudfoundry.client.facade.domain.UserRole;
import com.sap.cloudfoundry.client.facade.dto.ApplicationToCreateDto;

/**
* Interface defining operations available for the cloud controller REST client implementations
Expand Down
1 change: 1 addition & 0 deletions src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
requires org.apache.commons.logging;
requires org.reactivestreams;
requires java.net.http;
requires io.netty.transport;

requires static java.compiler;
requires static org.immutables.value;
Expand Down

0 comments on commit 59dc7e9

Please sign in to comment.