Skip to content

Commit

Permalink
Add CF logs client
Browse files Browse the repository at this point in the history
  • Loading branch information
I562548 committed Oct 9, 2024
1 parent 2af5f44 commit d7911cd
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ private Messages() {
// INFO messages
public static final String CALLING_CF_ROOT_0_TO_ACCESS_LOG_CACHE_URL = "Calling CF root: {0} to access log-cache URL";
public static final String CF_ROOT_REQUEST_FINISHED = "CF root request finished";
public static final String CALLING_LOG_CACHE_ENDPOINT_TO_GET_APP_LOGS = "Calling log-cache endpoint to get app logs";
public static final String TRYING_TO_GET_APP_LOGS = "Trying to get app logs";
public static final String APP_LOGS_WERE_FETCHED_SUCCESSFULLY = "App logs were fetched successfully";
public static final String STARTED_READING_LOG_RESPONSE_INPUT_STREAM = "Started reading log response input stream";
public static final String ENDED_READING_LOG_RESPONSE_INPUT_STREAM = "Ended reading log response input stream";

// WARN messages
public static final String RETRYING_OPERATION = "Retrying operation that failed with: {0}";
Expand All @@ -25,7 +23,6 @@ private Messages() {
public static final String SERVICE_PLAN_WITH_GUID_0_NOT_AVAILABLE_FOR_SERVICE_INSTANCE_1 = "Service plan with guid \"{0}\" is not available for service instance \"{1}\".";
public static final String SERVICE_OFFERING_WITH_GUID_0_IS_NOT_AVAILABLE = "Service offering with guid \"{0}\" is not available.";
public static final String SERVICE_OFFERING_WITH_GUID_0_NOT_FOUND = "Service offering with guid \"{0}\" not found.";
public static final String CANT_READ_APP_LOGS_RESPONSE = "Failed to read application recent logs response: %s";
public static final String CANT_DESERIALIZE_APP_LOGS_RESPONSE = "Failed to deserialize application recent logs response: %s";
public static final String FAILED_TO_FETCH_APP_LOGS_FOR_APP = "Failed to fetch app logs for app: %s";

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,7 @@ public CloudFoundryClient createClient(URL controllerUrl, OAuthClient oAuthClien
}

public LogCacheClient createLogCacheClient(URL controllerUrl, OAuthClient oAuthClient, Map<String, String> requestTags) {
String logCacheApi;
try {
var links = CloudUtil.executeWithRetry(() -> callCfRoot(controllerUrl, requestTags));
@SuppressWarnings("unchecked")
var logCache = (Map<String, Object>) links.get("log_cache");
logCacheApi = (String) logCache.get("href");
} catch (CloudException e) {
LOGGER.warn(MessageFormat.format(Messages.CALL_TO_0_FAILED_WITH_1, controllerUrl.toString(), e.getMessage()), e);
logCacheApi = controllerUrl.toString()
.replace("api", "log-cache");
}
return new LogCacheClient(logCacheApi, oAuthClient, requestTags);
return new LogCacheClient(oAuthClient, requestTags, getOrCreateConnectionContext(controllerUrl.getHost()));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package com.sap.cloudfoundry.client.facade.adapters;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import com.sap.cloudfoundry.client.facade.CloudException;
import com.sap.cloudfoundry.client.facade.Messages;
import com.sap.cloudfoundry.client.facade.domain.ApplicationLog;
import com.sap.cloudfoundry.client.facade.domain.ImmutableApplicationLog;
import com.sap.cloudfoundry.client.facade.oauth2.OAuthClient;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.EnvelopeType;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.logcache.v1.ReactorLogCacheClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
Expand All @@ -18,120 +27,75 @@
import java.util.Map;
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.util.UriComponentsBuilder;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.cloudfoundry.client.facade.CloudException;
import com.sap.cloudfoundry.client.facade.CloudOperationException;
import com.sap.cloudfoundry.client.facade.Messages;
import com.sap.cloudfoundry.client.facade.domain.ApplicationLog;
import com.sap.cloudfoundry.client.facade.domain.ImmutableApplicationLog;
import com.sap.cloudfoundry.client.facade.oauth2.OAuthClient;
import com.sap.cloudfoundry.client.facade.util.CloudUtil;

public class LogCacheClient {

private static final ObjectMapper MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, true);

private static final Logger LOGGER = LoggerFactory.getLogger(LogCacheClient.class);

private final String logCacheApi;
private static final String SOURCE_TYPE_KEY_NAME = "source_type";
private static final int MAX_LOG_COUNT = 1000;
private final OAuthClient oAuthClient;
private final Map<String, String> requestTags;
private final ConnectionContext connectionContext;

public LogCacheClient(String logCacheApi, OAuthClient oAuthClient, Map<String, String> requestTags) {
this.logCacheApi = logCacheApi;
public LogCacheClient(OAuthClient oAuthClient, Map<String, String> requestTags, ConnectionContext connectionContext) {
this.oAuthClient = oAuthClient;
this.requestTags = requestTags;
this.connectionContext = connectionContext;
}

public List<ApplicationLog> getRecentLogs(UUID applicationGuid, LocalDateTime offset) {
HttpResponse<InputStream> response = CloudUtil.executeWithRetry(() -> executeRequest(applicationGuid, offset));

return parseBody(response.body()).getLogs()
.stream()
.map(this::mapToAppLog)
// we use a linked list so that the log messages can be a LIFO sequence
// that way, we avoid unnecessary sorting and copying to and from another collection/array
.collect(LinkedList::new, LinkedList::addFirst, LinkedList::addAll);
}
LOGGER.info(Messages.TRYING_TO_GET_APP_LOGS);
org.cloudfoundry.logcache.v1.LogCacheClient logCacheClient = createReactorLogCacheClient();
ReadResponse applicationLogsResponse = readApplicationLogs(logCacheClient, applicationGuid, offset);

private HttpResponse<InputStream> executeRequest(UUID applicationGuid, LocalDateTime offset) {
try {
HttpRequest request = buildGetLogsRequest(applicationGuid, offset);
LOGGER.info(Messages.CALLING_LOG_CACHE_ENDPOINT_TO_GET_APP_LOGS);
var response = CloudFoundryClientFactory.HTTP_CLIENT.send(request, BodyHandlers.ofInputStream());
if (response.statusCode() / 100 != 2) {
var status = HttpStatus.valueOf(response.statusCode());
throw new CloudOperationException(status, status.getReasonPhrase(), parseBodyToString(response.body()));
}
if (applicationLogsResponse != null) {
LOGGER.info(Messages.APP_LOGS_WERE_FETCHED_SUCCESSFULLY);
return response;
} catch (IOException | InterruptedException e) {
throw new CloudException(e.getMessage(), e);
return applicationLogsResponse.getEnvelopes()
.getBatch()
.stream()
.map(this::mapToAppLog)
// we use a linked list so that the log messages can be a LIFO sequence
// that way, we avoid unnecessary sorting and copying to and from another collection/array
.collect(LinkedList::new, LinkedList::addFirst, LinkedList::addAll);
} else {
throw new CloudException(MessageFormat.format(Messages.FAILED_TO_FETCH_APP_LOGS_FOR_APP, applicationGuid));
}

}

private HttpRequest buildGetLogsRequest(UUID applicationGuid, LocalDateTime offset) {
var requestBuilder = HttpRequest.newBuilder()
.GET()
.uri(buildGetLogsUrl(applicationGuid, offset))
.timeout(Duration.ofMinutes(5))
.header(HttpHeaders.AUTHORIZATION, oAuthClient.getAuthorizationHeaderValue());
requestTags.forEach(requestBuilder::header);
return requestBuilder.build();
private ReactorLogCacheClient createReactorLogCacheClient() {
return ReactorLogCacheClient.builder()
.requestTags(requestTags)
.connectionContext(connectionContext)
.tokenProvider(oAuthClient.getTokenProvider())
.build();
}

private URI buildGetLogsUrl(UUID applicationGuid, LocalDateTime offset) {
private ReadResponse readApplicationLogs(org.cloudfoundry.logcache.v1.LogCacheClient logCacheClient, UUID applicationGuid,
LocalDateTime offset) {
var instant = offset.toInstant(ZoneOffset.UTC);
var secondsInNanos = Duration.ofSeconds(instant.getEpochSecond())
.toNanos();
return UriComponentsBuilder.fromHttpUrl(logCacheApi)
.pathSegment("api", "v1", "read", applicationGuid.toString())
.queryParam("envelope_types", "LOG")
.queryParam("descending", "true")
.queryParam("limit", "1000")
.queryParam("start_time", Long.toString(secondsInNanos + instant.getNano() + 1))
.build()
.toUri();
}

private String parseBodyToString(InputStream is) {
try (InputStream wrapped = is) {
return IOUtils.toString(wrapped, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new CloudException(String.format(Messages.CANT_READ_APP_LOGS_RESPONSE, e.getMessage()), e);
}
}

private ApplicationLogsResponse parseBody(InputStream is) {
LOGGER.info(Messages.STARTED_READING_LOG_RESPONSE_INPUT_STREAM);
try (InputStream wrapped = is) {
var appLogsResponse = MAPPER.readValue(wrapped, ApplicationLogsResponse.class);
LOGGER.info(Messages.ENDED_READING_LOG_RESPONSE_INPUT_STREAM);
return appLogsResponse;
} catch (IOException e) {
throw new CloudException(String.format(Messages.CANT_DESERIALIZE_APP_LOGS_RESPONSE, e.getMessage()), e);
}
return logCacheClient.read(ReadRequest.builder()
.envelopeType(EnvelopeType.LOG)
.sourceId(applicationGuid.toString())
.descending(Boolean.TRUE)
.limit(MAX_LOG_COUNT)
.startTime(secondsInNanos + instant.getNano() + 1)
.build())
.block();
}

private ApplicationLog mapToAppLog(ApplicationLogEntity log) {
private ApplicationLog mapToAppLog(Envelope envelope) {
return ImmutableApplicationLog.builder()
.applicationGuid(log.getSourceId())
.message(decodeLogPayload(log.getLogBody()
.getMessage()))
.timestamp(fromLogTimestamp(log.getTimestampInNanoseconds()))
.messageType(fromLogMessageType(log.getLogBody()
.getMessageType()))
.sourceName(log.getTags()
.get("source_type"))
.applicationGuid(envelope.getSourceId())
.message(decodeLogPayload(envelope.getLog()
.getPayload()))
.timestamp(fromLogTimestamp(envelope.getTimestamp()))
.messageType(fromLogMessageType(envelope.getLog()
.getType()
.getValue()))
.sourceName(envelope.getTags()
.get(SOURCE_TYPE_KEY_NAME))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.sap.cloudfoundry.client.facade.oauth2.OAuthClient;
import com.sap.cloudfoundry.client.facade.util.RestUtil;
import org.cloudfoundry.client.CloudFoundryClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.reactor.ConnectionContext;
import org.immutables.value.Value;
import org.springframework.util.StringUtils;
Expand Down

0 comments on commit d7911cd

Please sign in to comment.