Skip to content

Commit bae1321

Browse files
authored
Merge pull request #1491 from nats-io/os-purge-on-put-replace
Object Store purge on re-put
2 parents c77dc66 + cf7e4ef commit bae1321

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

src/main/java/io/nats/client/impl/NatsObjectStore.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public ObjectInfo put(ObjectMeta meta, InputStream inputStream) throws IOExcepti
8989
throw OsLinkNotAllowOnPut.instance();
9090
}
9191

92+
ObjectInfo newInfo;
93+
ObjectInfo oldInfo = getInfo(meta.getObjectName());
94+
9295
String nuid = NUID.nextGlobal();
9396
String chunkSubject = rawChunkSubject(nuid);
9497

@@ -122,7 +125,7 @@ public ObjectInfo put(ObjectMeta meta, InputStream inputStream) throws IOExcepti
122125
red = inputStream.read(buffer);
123126
}
124127

125-
return publishMeta(ObjectInfo.builder(bucketName, meta)
128+
newInfo = publishMeta(ObjectInfo.builder(bucketName, meta)
126129
.size(totalSize)
127130
.chunks(chunks)
128131
.nuid(nuid)
@@ -135,12 +138,20 @@ public ObjectInfo put(ObjectMeta meta, InputStream inputStream) throws IOExcepti
135138
jsm.purgeStream(streamName, PurgeOptions.subject(rawChunkSubject(nuid)));
136139
}
137140
catch (Exception ignore) {}
138-
139141
throw e;
140142
}
141143
finally {
142144
try { inputStream.close(); } catch (IOException ignore) {}
143145
}
146+
147+
if (oldInfo != null) {
148+
try {
149+
jsm.purgeStream(streamName, PurgeOptions.builder().subject(rawChunkSubject(oldInfo.getNuid())).build());
150+
}
151+
catch (IOException | JetStreamApiException ignore) {}
152+
}
153+
154+
return newInfo;
144155
}
145156

146157
/**

src/test/java/io/nats/client/impl/ObjectStoreTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ private static Object[] getInput(int size) {
238238
if (f.isFile()) {
239239
long flen = f.length();
240240
if (flen == size) {
241+
foundLen = flen;
241242
found = f;
242243
break;
243244
}
@@ -506,6 +507,42 @@ public void testCompression() throws Exception {
506507
});
507508
}
508509

510+
@Test
511+
public void testOverwrite() throws Exception {
512+
jsServer.run(TestBase::atLeast2_10, nc -> {
513+
JetStreamManagement jsm = nc.jetStreamManagement();
514+
JetStream js = jsm.jetStream();
515+
516+
String bucket = bucket();
517+
String objName = variant();
518+
519+
ObjectStoreManagement osm = nc.objectStoreManagement();
520+
ObjectStoreStatus oss = osm.create(ObjectStoreConfiguration.builder(bucket)
521+
.storageType(StorageType.Memory)
522+
.build());
523+
String realStreamName = oss.getConfiguration().getBackingConfig().getName();
524+
525+
ObjectStore os = nc.objectStore(bucket);
526+
527+
Object[] input = getInput(4096 * 10);
528+
File file = (File)input[1];
529+
530+
try (InputStream in = Files.newInputStream(file.toPath())) {
531+
os.put(objName, in);
532+
}
533+
StreamInfo si = jsm.getStreamInfo(realStreamName, StreamInfoOptions.allSubjects());
534+
long byteCount1 = si.getStreamState().getByteCount();
535+
536+
try (InputStream in = Files.newInputStream(file.toPath())) {
537+
os.put(objName, in);
538+
}
539+
si = jsm.getStreamInfo(realStreamName, StreamInfoOptions.allSubjects());
540+
long byteCount2 = si.getStreamState().getByteCount();
541+
542+
assertEquals(byteCount1, byteCount2);
543+
});
544+
}
545+
509546
static class TestObjectStoreWatcher implements ObjectStoreWatcher {
510547
public String name;
511548
public List<ObjectInfo> entries = new ArrayList<>();

0 commit comments

Comments
 (0)