Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/cli/commands/data_gc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ export const dataGcCommand = new Command()
.option("-f, --force", "Skip confirmation prompt")
.action(async function (options: AnyOptions) {
const ctx = createContext(options as GlobalOptions, ["data", "gc"]);
const { repoDir, repoContext } = await requireInitializedRepo({
const { repoContext } = await requireInitializedRepo({
repoDir: options.repoDir ?? ".",
outputMode: ctx.outputMode,
});

const service = new DefaultDataLifecycleService(
repoContext.unifiedDataRepo,
repoContext.workflowRunRepo,
repoDir,
);

// If interactive and no force, prompt for confirmation
Expand Down
147 changes: 46 additions & 101 deletions src/domain/data/data_lifecycle_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@
// You should have received a copy of the GNU Affero General Public License
// along with Swamp. If not, see <https://www.gnu.org/licenses/>.

import { join } from "@std/path";
import type { Data } from "./data.ts";
import type { Lifetime } from "./data_metadata.ts";
import {
SWAMP_SUBDIRS,
swampPath,
} from "../../infrastructure/persistence/paths.ts";
import type { UnifiedDataRepository } from "../../infrastructure/persistence/unified_data_repository.ts";
import type { WorkflowRunRepository } from "../workflows/repositories.ts";
import { ModelType } from "../models/model_type.ts";
import type { ModelType } from "../models/model_type.ts";
import {
createWorkflowId,
createWorkflowRunId,
Expand Down Expand Up @@ -109,7 +104,6 @@ export class DefaultDataLifecycleService implements DataLifecycleService {
constructor(
private readonly dataRepo: UnifiedDataRepository,
private readonly workflowRunRepo: WorkflowRunRepository,
private readonly repoDir: string,
) {}

calculateExpiration(lifetime: Lifetime, createdAt: Date): Date | null {
Expand Down Expand Up @@ -180,75 +174,36 @@ export class DefaultDataLifecycleService implements DataLifecycleService {

async findExpiredData(): Promise<ExpiredDataInfo[]> {
const expired: ExpiredDataInfo[] = [];
const dataDir = swampPath(this.repoDir, SWAMP_SUBDIRS.data);

try {
// Scan model types
for await (const typeEntry of Deno.readDir(dataDir)) {
if (!typeEntry.isDirectory) continue;
const type = ModelType.create(typeEntry.name);

const typeDir = join(dataDir, type.toDirectoryPath());

// Scan model IDs
for await (const modelEntry of Deno.readDir(typeDir)) {
if (!modelEntry.isDirectory) continue;
const modelId = modelEntry.name;

const modelDir = join(typeDir, modelId);

// Scan data names
for await (const dataEntry of Deno.readDir(modelDir)) {
if (!dataEntry.isDirectory) continue;
const dataName = dataEntry.name;

// Load the latest version metadata
try {
const data = await this.dataRepo.findByName(
type,
modelId,
dataName,
);
if (!data) continue;

const isExpired = await this.isExpired(data);
if (isExpired) {
let reason: ExpiredDataInfo["reason"];
if (
data.lifetime === "workflow" ||
data.lifetime === "job"
) {
reason = data.lifetime === "workflow"
? "workflow-deleted"
: "job-deleted";
} else {
reason = "duration-expired";
}

expired.push({
type,
modelId,
dataName,
data,
reason,
});
}
} catch (error) {
console.error(
`Error checking data ${type}/${modelId}/${dataName}:`,
error,
);
// Continue with other data
}
const allData = await this.dataRepo.findAllGlobal();
for (const { data, modelType, modelId } of allData) {
try {
const isExpired = await this.isExpired(data);
if (isExpired) {
let reason: ExpiredDataInfo["reason"];
if (data.lifetime === "workflow" || data.lifetime === "job") {
reason = data.lifetime === "workflow"
? "workflow-deleted"
: "job-deleted";
} else {
reason = "duration-expired";
}

expired.push({
type: modelType,
modelId,
dataName: data.name,
data,
reason,
});
}
} catch (error) {
console.error(
`Error checking data ${modelType.toDirectoryPath()}/${modelId}/${data.name}:`,
error,
);
// Continue with other data
}
} catch (error) {
if (error instanceof Deno.errors.NotFound) {
// Data directory doesn't exist yet
return [];
}
throw error;
}

return expired;
Expand Down Expand Up @@ -291,35 +246,25 @@ export class DefaultDataLifecycleService implements DataLifecycleService {
// Now run version-based garbage collection on all models
// This hard-deletes old versions based on garbageCollection policy
if (!dryRun) {
const dataDir = swampPath(this.repoDir, SWAMP_SUBDIRS.data);
try {
for await (const typeEntry of Deno.readDir(dataDir)) {
if (!typeEntry.isDirectory) continue;
const type = ModelType.create(typeEntry.name);
const typeDir = join(dataDir, type.toDirectoryPath());

for await (const modelEntry of Deno.readDir(typeDir)) {
if (!modelEntry.isDirectory) continue;
const modelId = modelEntry.name;

try {
const result = await this.dataRepo.collectGarbage(
type,
modelId,
);
versionsDeleted += result.versionsRemoved;
bytesReclaimed += result.bytesReclaimed;
} catch (error) {
console.error(
`Error running GC on ${type.toDirectoryPath()}/${modelId}:`,
error,
);
}
}
}
} catch (error) {
if (!(error instanceof Deno.errors.NotFound)) {
console.error("Error during version GC:", error);
const allData = await this.dataRepo.findAllGlobal();
const seen = new Set<string>();
for (const { modelType, modelId } of allData) {
const key = `${modelType.toDirectoryPath()}/${modelId}`;
if (seen.has(key)) continue;
seen.add(key);

try {
const result = await this.dataRepo.collectGarbage(
modelType,
modelId,
);
versionsDeleted += result.versionsRemoved;
bytesReclaimed += result.bytesReclaimed;
} catch (error) {
console.error(
`Error running GC on ${modelType.toDirectoryPath()}/${modelId}:`,
error,
);
}
}
}
Expand Down
Loading