Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for deleting empty namespace parent folders #5699

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions core/src/main/java/io/kestra/core/storages/Namespace.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ default boolean delete(NamespaceFile file) throws IOException {
return delete(Path.of(file.path()));
}

/**
* Deletes namespaces directories at the given path.
*
* @param file the {@link NamespaceFile} to be deleted.
* @throws IOException if an error happens while performing the delete operation.
*/
default boolean deleteDirectory(NamespaceFile file) throws IOException {
return delete(Path.of(file.path()));
}

/**
* Deletes any namespaces files at the given path.
*
Expand All @@ -126,6 +136,21 @@ default boolean delete(NamespaceFile file) throws IOException {
*/
boolean delete(Path path) throws IOException;

/**
* Checks if a directory is empty.
*
* @param path the directory path to check
* @return true if the directory is empty or doesn't exist, false otherwise
* @throws IOException if an error occurs while checking the directory
*/
default boolean isDirectoryEmpty(String path) throws IOException {
List<NamespaceFile> files = findAllFilesMatching(
List.of(path + "/**"),
List.of()
);
return files.isEmpty();
}

enum Conflicts {
OVERWRITE,
ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.utils.PathMatcherPredicate;
import io.kestra.core.utils.Rethrow;
import io.kestra.plugin.core.namespace.DeleteFiles.Output;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
Expand All @@ -19,10 +20,13 @@
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

@SuperBuilder
@Getter
Expand Down Expand Up @@ -66,7 +70,7 @@
)
}
)
public class DeleteFiles extends Task implements RunnableTask<DeleteFiles.Output> {
public class DeleteFiles extends Task implements RunnableTask<Output> {
@NotNull
@Schema(
title = "The namespace from which the files should be deleted."
Expand All @@ -83,6 +87,15 @@ public class DeleteFiles extends Task implements RunnableTask<DeleteFiles.Output
@PluginProperty(dynamic = true)
private Object files;

@Schema(
title = "Whether to delete empty parent folders after deleting files.",
description = "If true, parent folders that become empty after file deletion will also be removed.",
defaultValue = "false"
)
@PluginProperty(dynamic = false)
@Builder.Default
private Boolean deleteParentFolder = false;

@SuppressWarnings("unchecked")
@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -101,22 +114,64 @@ public Output run(RunContext runContext) throws Exception {
}

List<NamespaceFile> matched = namespace.findAllFilesMatching(PathMatcherPredicate.matches(renderedFiles));
Set<String> parentFolders = Boolean.TRUE.equals(deleteParentFolder) ? new TreeSet<>() : null;
long count = matched
.stream()
.map(Rethrow.throwFunction(file -> {
if (namespace.delete(NamespaceFile.of(renderedNamespace, Path.of(file.path().replace("\\","/"))).storagePath())) {
logger.debug(String.format("Deleted %s", (file.path())));

if (Boolean.TRUE.equals(deleteParentFolder)) {
trackParentFolder(file, parentFolders);
}
return true;
}
return false;
}))
.filter(Boolean::booleanValue)
.count();

// Handle folder deletion if enabled
if (parentFolders != null && !parentFolders.isEmpty()) {
deleteEmptyFolders(namespace, parentFolders, logger);
}

runContext.metric(Counter.of("deleted", count));
return Output.builder().build();
}

private void deleteEmptyFolders(Namespace namespace, Set<String> folders, Logger logger) {
folders.stream()
.sorted((a, b) -> b.split("/").length - a.split("/").length)
.forEach(folderPath -> {
try {
if (namespace.isDirectoryEmpty(folderPath)) {
// Create proper NamespaceFile for folder with trailing slash
NamespaceFile folder = NamespaceFile.of(
namespace.namespace(),
URI.create(folderPath + "/")
);

if (namespace.deleteDirectory(folder)) {
logger.debug("Deleted empty folder: {}", folderPath);
}
}
} catch (IOException e) {
logger.warn("Failed to delete folder: " + folderPath, e);
}
});
}

private void trackParentFolder(NamespaceFile file, Set<String> parentFolders) {
String path = file.path();
int lastSlash = path.lastIndexOf('/');
while (lastSlash > 0) {
path = path.substring(0, lastSlash);
parentFolders.add(path);
lastSlash = path.lastIndexOf('/');
}
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package io.kestra.plugin.core.namespace;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

Expand All @@ -22,9 +21,6 @@

@KestraTest
public class DeleteFilesTest {
@Inject
StorageInterface storageInterface;

@Inject
RunContextFactory runContextFactory;

Expand Down Expand Up @@ -54,4 +50,89 @@ void shouldDeleteNamespaceFilesForMatchingExpression() throws Exception {
// Then
assertThat(namespace.all("/a/b/", false).size(), is(1));
}

@Test
void shouldDeleteParentFolder() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();

DeleteFiles deleteFiles = DeleteFiles.builder()
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**/file.txt"))
.namespace("{{ inputs.namespace }}")
.deleteParentFolder(true)
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
final Namespace namespace = runContext.storage().namespace(namespaceId);

namespace.putFile(Path.of("/folder/file.txt"), new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)));

assertThat(namespace.all("/folder/", false).size(), is(1));

// When
assertThat(deleteFiles.run(runContext), notNullValue());

// Then
assertThat(namespace.all("/folder/", false).size(), is(0));
assertThat(namespace.all("/", false).size(), is(0));
}

@Test
void shouldNotDeleteParentFolderWhenFlagIsFalse() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();

DeleteFiles deleteFiles = DeleteFiles.builder()
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**/file.txt"))
.namespace("{{ inputs.namespace }}")
.deleteParentFolder(false)
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
final Namespace namespace = runContext.storage().namespace(namespaceId);

namespace.putFile(Path.of("/folder/file.txt"), new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)));

assertThat(namespace.all("/folder/", false).size(), is(1));

// When
assertThat(deleteFiles.run(runContext), notNullValue());

// Then
assertThat(namespace.all("/folder/", false).size(), is(0));
assertThat(namespace.all("/", true).size(), is(1)); // Folder should still exist
}

@Test
void shouldNotDeleteParentFolderWhenMultipleFilesExist() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();

DeleteFiles deleteFiles = DeleteFiles.builder()
.id(DeleteFiles.class.getSimpleName())
.type(DeleteFiles.class.getName())
.files(List.of("**/file1.txt"))
.namespace("{{ inputs.namespace }}")
.deleteParentFolder(true)
.build();

final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId));
final Namespace namespace = runContext.storage().namespace(namespaceId);

namespace.putFile(Path.of("/folder/file1.txt"), new ByteArrayInputStream("content1".getBytes(StandardCharsets.UTF_8)));
namespace.putFile(Path.of("/folder/file2.txt"), new ByteArrayInputStream("content2".getBytes(StandardCharsets.UTF_8)));

assertThat(namespace.all("/folder/", false).size(), is(2));

// When
assertThat(deleteFiles.run(runContext), notNullValue());

// Then
assertThat(namespace.all("/folder/", false).size(), is(1)); // One file should still exist
assertThat(namespace.all("/", false).size(), is(1)); // Folder should still exist
}
}
Loading