-
Notifications
You must be signed in to change notification settings - Fork 532
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2325 from lukas-krecan/s3
feat: add support for Amazon S3 as a lock storage
- Loading branch information
Showing
9 changed files
with
454 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>shedlock-parent</artifactId> | ||
<groupId>net.javacrumbs.shedlock</groupId> | ||
<version>6.0.3-SNAPSHOT</version> | ||
<relativePath>../../../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>shedlock-provider-s3</artifactId> | ||
<version>6.0.3-SNAPSHOT</version> | ||
|
||
<properties> | ||
<aws-java-sdk-s3.version>1.12.747</aws-java-sdk-s3.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>net.javacrumbs.shedlock</groupId> | ||
<artifactId>shedlock-core</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.amazonaws</groupId> | ||
<artifactId>aws-java-sdk-s3</artifactId> | ||
<version>${aws-java-sdk-s3.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>junit-jupiter</artifactId> | ||
<version>${test-containers.ver}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>localstack</artifactId> | ||
<version>${test-containers.ver}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>net.javacrumbs.shedlock</groupId> | ||
<artifactId>shedlock-test-support</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>ch.qos.logback</groupId> | ||
<artifactId>logback-classic</artifactId> | ||
<version>${logback.ver}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-jar-plugin</artifactId> | ||
<configuration> | ||
<archive> | ||
<manifestEntries> | ||
<Automatic-Module-Name> | ||
net.javacrumbs.shedlock.provider.s3 | ||
</Automatic-Module-Name> | ||
</manifestEntries> | ||
</archive> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> |
5 changes: 5 additions & 0 deletions
5
...iders/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/Lock.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package net.javacrumbs.shedlock.provider.s3; | ||
|
||
import java.time.Instant; | ||
|
||
record Lock(Instant lockUntil, Instant lockedAt, String lockedBy, String eTag) {} |
31 changes: 31 additions & 0 deletions
31
...hedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3LockProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package net.javacrumbs.shedlock.provider.s3; | ||
|
||
import com.amazonaws.services.s3.AmazonS3; | ||
import net.javacrumbs.shedlock.support.StorageBasedLockProvider; | ||
|
||
/** | ||
* Lock provider implementation for S3. | ||
*/ | ||
public class S3LockProvider extends StorageBasedLockProvider { | ||
|
||
/** | ||
* Constructs an S3LockProvider. | ||
* | ||
* @param s3Client Amazon S3 client used to interact with the S3 bucket. | ||
* @param bucketName The name of the S3 bucket where locks are stored. | ||
* @param objectPrefix The prefix of the S3 object lock. | ||
*/ | ||
public S3LockProvider(AmazonS3 s3Client, String bucketName, String objectPrefix) { | ||
super(new S3StorageAccessor(s3Client, bucketName, objectPrefix)); | ||
} | ||
|
||
/** | ||
* Constructs an S3LockProvider. | ||
* | ||
* @param s3Client Amazon S3 client used to interact with the S3 bucket. | ||
* @param bucketName The name of the S3 bucket where locks are stored. | ||
*/ | ||
public S3LockProvider(AmazonS3 s3Client, String bucketName) { | ||
this(s3Client, bucketName, "shedlock/"); | ||
} | ||
} |
189 changes: 189 additions & 0 deletions
189
...lock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3StorageAccessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
package net.javacrumbs.shedlock.provider.s3; | ||
|
||
import static net.javacrumbs.shedlock.core.ClockProvider.now; | ||
|
||
import com.amazonaws.AmazonServiceException; | ||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.amazonaws.services.s3.model.ObjectMetadata; | ||
import com.amazonaws.services.s3.model.PutObjectRequest; | ||
import com.amazonaws.services.s3.model.PutObjectResult; | ||
import java.io.ByteArrayInputStream; | ||
import java.nio.ByteBuffer; | ||
import java.time.Instant; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import net.javacrumbs.shedlock.core.LockConfiguration; | ||
import net.javacrumbs.shedlock.support.AbstractStorageAccessor; | ||
|
||
/** | ||
* Implementation of StorageAccessor for S3 as a lock storage backend. | ||
* Manages locks using S3 objects with metadata for expiration and conditional writes. | ||
*/ | ||
class S3StorageAccessor extends AbstractStorageAccessor { | ||
|
||
private static final String LOCK_UNTIL = "lockUntil"; | ||
private static final String LOCKED_AT = "lockedAt"; | ||
private static final String LOCKED_BY = "lockedBy"; | ||
private static final int PRECONDITION_FAILED = 412; | ||
|
||
private final AmazonS3 s3Client; | ||
private final String bucketName; | ||
private final String objectPrefix; | ||
|
||
public S3StorageAccessor(AmazonS3 s3Client, String bucketName, String objectPrefix) { | ||
this.s3Client = s3Client; | ||
this.bucketName = bucketName; | ||
this.objectPrefix = objectPrefix; | ||
} | ||
|
||
/** | ||
* Finds the lock in the S3 bucket. | ||
*/ | ||
Optional<Lock> find(String name, String action) { | ||
try { | ||
ObjectMetadata metadata = s3Client.getObjectMetadata(bucketName, objectName(name)); | ||
Instant lockUntil = Instant.parse(metadata.getUserMetaDataOf(LOCK_UNTIL)); | ||
Instant lockedAt = Instant.parse(metadata.getUserMetaDataOf(LOCKED_AT)); | ||
String lockedBy = metadata.getUserMetaDataOf(LOCKED_BY); | ||
String eTag = metadata.getETag(); | ||
|
||
logger.debug("Lock found. action: {}, name: {}, lockUntil: {}, e-tag: {}", action, name, lockUntil, eTag); | ||
return Optional.of(new Lock(lockUntil, lockedAt, lockedBy, eTag)); | ||
} catch (AmazonServiceException e) { | ||
if (e.getStatusCode() == 404) { | ||
logger.debug("Lock not found. action: {}, name: {}", action, name); | ||
return Optional.empty(); | ||
} | ||
throw e; | ||
} | ||
} | ||
|
||
@Override | ||
public boolean insertRecord(LockConfiguration lockConfiguration) { | ||
String name = lockConfiguration.getName(); | ||
if (find(name, "insertRecord").isPresent()) { | ||
logger.debug("Lock already exists. name: {}", name); | ||
return false; | ||
} | ||
|
||
try { | ||
var lockContent = getLockContent(); | ||
ObjectMetadata metadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname()); | ||
metadata.setContentLength(lockContent.length); | ||
|
||
PutObjectRequest request = | ||
new PutObjectRequest(bucketName, objectName(name), new ByteArrayInputStream(lockContent), metadata); | ||
request.putCustomRequestHeader("If-None-Match", "*"); | ||
|
||
s3Client.putObject(request); | ||
logger.debug("Lock created successfully. name: {}, metadata: {}", name, metadata.getUserMetadata()); | ||
return true; | ||
} catch (AmazonServiceException e) { | ||
if (e.getStatusCode() == PRECONDITION_FAILED) { | ||
logger.debug("Lock already in use. name: {}", name); | ||
} else { | ||
logger.warn("Failed to create lock. name: {}", name, e); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
@Override | ||
public boolean updateRecord(LockConfiguration lockConfiguration) { | ||
Optional<Lock> lock = find(lockConfiguration.getName(), "updateRecord"); | ||
if (lock.isEmpty()) { | ||
logger.warn("Update skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock); | ||
return false; | ||
} | ||
if (lock.get().lockUntil().isAfter(now())) { | ||
logger.debug("Update skipped. Lock still valid. name: {}, lock: {}", lockConfiguration.getName(), lock); | ||
return false; | ||
} | ||
|
||
ObjectMetadata newMetadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname()); | ||
return replaceObjectMetadata( | ||
lockConfiguration.getName(), newMetadata, lock.get().eTag(), "updateRecord"); | ||
} | ||
|
||
@Override | ||
public void unlock(LockConfiguration lockConfiguration) { | ||
Optional<Lock> lock = find(lockConfiguration.getName(), "unlock"); | ||
if (lock.isEmpty()) { | ||
logger.warn("Unlock skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock); | ||
return; | ||
} | ||
|
||
updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getUnlockTime(), "unlock"); | ||
} | ||
|
||
@Override | ||
public boolean extend(LockConfiguration lockConfiguration) { | ||
Optional<Lock> lock = find(lockConfiguration.getName(), "extend"); | ||
if (lock.isEmpty() | ||
|| lock.get().lockUntil().isBefore(now()) | ||
|| !lock.get().lockedBy().equals(getHostname())) { | ||
logger.debug( | ||
"Extend skipped. Lock invalid or not owned by host. name: {}, lock: {}", | ||
lockConfiguration.getName(), | ||
lock); | ||
return false; | ||
} | ||
|
||
return updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getLockAtMostUntil(), "extend"); | ||
} | ||
|
||
private boolean updateUntil(String name, Lock lock, Instant until, String action) { | ||
ObjectMetadata existingMetadata = s3Client.getObjectMetadata(bucketName, objectName(name)); | ||
ObjectMetadata newMetadata = | ||
createMetadata(until, Instant.parse(existingMetadata.getUserMetaDataOf(LOCKED_AT)), getHostname()); | ||
|
||
return replaceObjectMetadata(name, newMetadata, lock.eTag(), action); | ||
} | ||
|
||
private boolean replaceObjectMetadata(String name, ObjectMetadata newMetadata, String eTag, String action) { | ||
var lockContent = getLockContent(); | ||
newMetadata.setContentLength(lockContent.length); | ||
|
||
PutObjectRequest request = | ||
new PutObjectRequest(bucketName, objectName(name), new ByteArrayInputStream(lockContent), newMetadata); | ||
request.putCustomRequestHeader("If-Match", eTag); | ||
|
||
try { | ||
PutObjectResult response = s3Client.putObject(request); | ||
logger.debug( | ||
"Lock {} successfully. name: {}, old e-tag: {}, new e-tag: {}", | ||
action, | ||
name, | ||
eTag, | ||
response.getETag()); | ||
return true; | ||
} catch (AmazonServiceException e) { | ||
if (e.getStatusCode() == PRECONDITION_FAILED) { | ||
logger.debug("Lock not exists to {}. name: {}, e-tag {}", action, name, eTag); | ||
} else { | ||
logger.warn("Failed to {} lock. name: {}", action, name, e); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
private static byte[] getLockContent() { | ||
var uuid = UUID.randomUUID(); | ||
ByteBuffer bb = ByteBuffer.wrap(new byte[16]); | ||
bb.putLong(uuid.getMostSignificantBits()); | ||
bb.putLong(uuid.getLeastSignificantBits()); | ||
return bb.array(); | ||
} | ||
|
||
private ObjectMetadata createMetadata(Instant lockUntil, Instant lockedAt, String lockedBy) { | ||
ObjectMetadata metadata = new ObjectMetadata(); | ||
metadata.addUserMetadata(LOCK_UNTIL, lockUntil.toString()); | ||
metadata.addUserMetadata(LOCKED_AT, lockedAt.toString()); | ||
metadata.addUserMetadata(LOCKED_BY, lockedBy); | ||
return metadata; | ||
} | ||
|
||
private String objectName(String name) { | ||
return objectPrefix + name; | ||
} | ||
} |
Oops, something went wrong.