Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Java SDK "append" API uses enormous and uncontrollable amounts of memory #43640

Open
wheezil opened this issue Dec 26, 2024 · 5 comments
Open
Labels
customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-more-info Issue needs more information to triage question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Storage Storage Service (Queues, Blobs, Files)

Comments

@wheezil
Copy link

wheezil commented Dec 26, 2024

Describe the bug
We are using the DataLakeFileSystemClient.appendWithResponse() to upload multiple parts in parallel. Despite making our own input stream which does not buffer yet still satisfies the "markable" property, the SDK wants to buffer an arbitrary amount of data in memory. We see no way to control this. Furthermore, uploading multiples of such files concurrently expands the amount of memory being used, leading to OOM at some point.

Exception or Stack Trace

To Reproduce
Make a maven project out of the attached code snippets. This should be simple. Sorry, I tried to just attach a ZIP archive with the entire project, but it was rejected.

Build and run project with arguments 8 200, which uploads 8 parts in parallel of 200MB each.
You can see the heap logged:

Creating temp file
Uploading 8 parts of size 209715200 bytes
2024-12-26T10:56:39.132 [Thread-0] MemoryLogger.logMemory:35 INFO - JVM MEMORY: used=38MB, total=1,024MB, free=985MB
2024-12-26T10:56:40.146 [Thread-0] MemoryLogger.logMemory:35 INFO - JVM MEMORY: used=16MB, total=80MB, free=63MB
2024-12-26T10:56:41.160 [Thread-0] MemoryLogger.logMemory:35 INFO - JVM MEMORY: used=1,292MB, total=4,584MB, free=3,291MB
2024-12-26T10:56:42.163 [Thread-0] MemoryLogger.logMemory:35 INFO - JVM MEMORY: used=1,304MB, total=4,584MB, free=3,279MB

Since we are not buffering data in memory, why is the SDK doing it? Our stream is markable and the SDK should just read from the stream and rewind it if needed for a retry.

Code Snippet
See attached

Expected behavior
Use no more than a reasonable amount of memory for in-flight data transfer, just enough to get good buffering performance from the local disk file-read, such as 128K per upload thread.

OR, have an alterative API which uses less memory.

Screenshots

Setup (please complete the following information):

  • OS: Windows 11
  • IDE: IntelliJ
  • Library/Libraries: See maven dependencies
  • Java version: OpenJDK 17
@github-actions github-actions bot added customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Dec 26, 2024
@jairmyree jairmyree added needs-more-info Issue needs more information to triage Storage Storage Service (Queues, Blobs, Files) labels Dec 31, 2024
@github-actions github-actions bot removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Dec 31, 2024
@jairmyree
Copy link
Member

jairmyree commented Dec 31, 2024

@wheezil Thank you for reaching out. For security reasons, we cannot accept .zip files of your code so I have removed the download link. Please instead upload a code snippet and reproduction steps so we can look into the issue.

@ibrahimrabab when additional context is provided, can you look into this issue?

@wheezil
Copy link
Author

wheezil commented Dec 31, 2024

Main test class

package net.redpoint.test;

import com.azure.core.util.Context;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import reactor.core.publisher.Hooks;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test2 {
  // Fill in these values
  static final String ADL2_ACCOUNT_NAME = "";
  static final String ADL2_ACCESS_KEY = "";
  static final String ADL2_CONTAINER_NAME = "";

  static final int TIMEOUT_SECONDS = 600;
  static final String AZURE_STORAGE_HOST_SUFFIX = ".dfs.core.windows.net/";
  static final int adl2_max_tries = 12;
  static final int adl2_try_timeout_in_seconds = 100000;
  static final long adl2_retry_delay_in_ms = 60;
  static final long adl2_max_retry_delay_in_ms = 60000;

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.println("Requires two arguments: <threads> and <size_in_mb>");
      System.exit(1);
    }
    final int THREADS = Integer.parseInt(args[0]);
    final long CHUNK_SIZE = Integer.parseInt(args[1]) * 1024L * 1024;
    System.out.println("Creating temp file");
    Random r = new Random();
    File temp = File.createTempFile("adl2test", ".tmp");
    try (OutputStream os = new FileOutputStream(temp)) {
      byte[] chunk = new byte[1024];
      for (long l = 0; l < CHUNK_SIZE; l += chunk.length) {
        r.nextBytes(chunk);
        os.write(chunk);
      }
    }

    System.out.println("Uploading " + THREADS + " parts of size " + CHUNK_SIZE + " bytes");
    MemoryLogger.start(1000);
    DataLakeServiceClientBuilder serviceClientBuilder = new DataLakeServiceClientBuilder()
      .endpoint("https://" + ADL2_ACCOUNT_NAME + AZURE_STORAGE_HOST_SUFFIX)
      .retryOptions( new RequestRetryOptions(
        RetryPolicyType.EXPONENTIAL
        , adl2_max_tries        // Maximum number of attempts an operation will be retried, default is 4
        , adl2_try_timeout_in_seconds  // Maximum time allowed before a request is cancelled and assumed failed, default is Integer.MAX_VALUE
        , adl2_retry_delay_in_ms    // Amount of delay to use before retrying an operation, default value is 4ms when retryPolicyType is EXPONENTIAL
        , adl2_max_retry_delay_in_ms  // Maximum delay allowed before retrying an operation, default value is 120ms
        , null              // secondaryHost - Secondary Storage account to retry requests against, default is none
      ));
    serviceClientBuilder.credential( new StorageSharedKeyCredential(ADL2_ACCOUNT_NAME, ADL2_ACCESS_KEY));
    DataLakeServiceClient serviceClient = serviceClientBuilder.buildClient();
    DataLakeFileSystemClient fileSystemClient = serviceClient.getFileSystemClient(ADL2_CONTAINER_NAME);
    String cloudPath = UUID.randomUUID().toString();
    DataLakeFileClient fileClient = fileSystemClient.getFileClient(cloudPath);
    fileClient.create();
    ExecutorService exec = Executors.newFixedThreadPool(THREADS);
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 0; i < THREADS; i++) {
      final long offset = i * CHUNK_SIZE;
      futures.add(exec.submit(() -> {
        try (InputStream uploadStream = new MarkableFileInputStream(new FileInputStream(temp))) {
          fileClient.appendWithResponse(uploadStream, offset, CHUNK_SIZE,
          null, null, Duration.ofSeconds(TIMEOUT_SECONDS), Context.NONE);
        }
        return null;
      }));
    }
    for (Future<Void> future : futures) {
      future.get();
    }
    DataLakeRequestConditions requestConditions = new DataLakeRequestConditions();
    Duration timeout = Duration.ofSeconds(TIMEOUT_SECONDS);
    fileClient.flushWithResponse(THREADS * CHUNK_SIZE, false, true, null, requestConditions, timeout, Context.NONE);
    System.out.println("upload complete");
    System.exit(0);
  }

}

MemoryLogger util class

package net.redpoint.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryLogger {
  private static final Logger LOG = LoggerFactory.getLogger(MemoryLogger.class);
  private static final long MB = 1024 * 1024;
  private static volatile Thread thread;
  private static Runtime runtime;

  public static void start() { start(10000);}

  public static synchronized void start(long millis) {
    if (thread != null) {
      return;
    }
    runtime = Runtime.getRuntime();
    thread = new Thread(() -> {
      do {
        logMemory();
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          break;
        }
      } while (thread == thread.currentThread());
    });
    thread.start();
  }

  public static void logMemory() {
    long total = runtime.totalMemory();
    long free = runtime.freeMemory();
    LOG.info(String.format("JVM MEMORY: used=%,dMB, total=%,dMB, free=%,dMB", (total-free)/MB, total / MB, free / MB));
  }

  public static synchronized void stop() {
    thread = null;
  }
}

MarkableFileINputStream class, used to read from file w/o buffering entire file

package net.redpoint.test;


import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class MarkableFileInputStream extends FilterInputStream {
  private FileChannel myFileChannel;
  private long mark = -1;

  public MarkableFileInputStream(FileInputStream fis) {
    super(fis);
    myFileChannel = fis.getChannel();
  }

  @Override
  public boolean markSupported() {
    return true;
  }

  @Override
  public synchronized void mark(int readlimit) {
    try {
      mark = myFileChannel.position();
    }
    catch (IOException ex) {
      mark = -1;
    }
  }

  @Override
  public synchronized void reset() throws IOException {
    if (mark == -1) {
      throw new IOException("not marked");
    }
    myFileChannel.position(mark);
  }
}

@wheezil
Copy link
Author

wheezil commented Dec 31, 2024

pom.xml file to build using maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>redpoint</groupId>
    <version>10.0.1-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>multipartupload</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <main-class>net.redpoint.test.Test2</main-class>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- BOMs -->
            <dependency>
                <groupId>com.azure</groupId>
                <artifactId>azure-sdk-bom</artifactId>
                <version>1.1.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                    <shadedClassifierName>shaded</shadedClassifierName>
                    <transformers>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.handlers</resource>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.schemas</resource>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>${main-class}</mainClass>
                        </transformer>
                    </transformers>
                    <filters>
                        <filter>
                            <!-- filter out signature files from signed dependencies, else repackaging fails with security ex -->
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-file-datalake</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.16.0</version>
        </dependency>

    </dependencies>

</project>

@wheezil
Copy link
Author

wheezil commented Dec 31, 2024

log4j2.properties file, put in resources to enable logger output

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSS} [%t] %c{1}.%M:%L %p - %m%ex%n

logger.app.name = com.azure.core
logger.app.level = ERROR

rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT

@wheezil
Copy link
Author

wheezil commented Jan 2, 2025

FYI I've been advised on stackoverflow to simply use the raw REST API, but this is a much less desirable solution, as we'll always be chasing security updates and other changes, which we'd really prefer the SDK do for us.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-more-info Issue needs more information to triage question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Storage Storage Service (Queues, Blobs, Files)
Projects
None yet
Development

No branches or pull requests

2 participants