From 9a3cb0a817d2615dce5dd25310dd34c86fab323f Mon Sep 17 00:00:00 2001 From: Ryan Dunckel Date: Tue, 3 Jan 2017 23:18:50 -0600 Subject: [PATCH 1/6] support Path based watching and cross-platform tests --- pom.xml | 15 ++ .../github/davidmoten/rx/FileObservable.java | 153 ++++++++--- .../operators/OperatorFileTailer.java | 51 ++-- .../davidmoten/rx/FileObservableTest.java | 250 +++++++++++------- 4 files changed, 308 insertions(+), 161 deletions(-) diff --git a/pom.xml b/pom.xml index d6d1edf..5771093 100644 --- a/pom.xml +++ b/pom.xml @@ -117,6 +117,21 @@ 1.9.5 test + + + com.google.jimfs + jimfs + 1.1 + test + + + + org.awaitility + awaitility + 2.0.0 + test + + diff --git a/src/main/java/com/github/davidmoten/rx/FileObservable.java b/src/main/java/com/github/davidmoten/rx/FileObservable.java index 77c9197..37688f2 100644 --- a/src/main/java/com/github/davidmoten/rx/FileObservable.java +++ b/src/main/java/com/github/davidmoten/rx/FileObservable.java @@ -2,8 +2,10 @@ import java.io.File; import java.nio.charset.Charset; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchEvent.Kind; @@ -37,6 +39,14 @@ private FileObservable() { // prevent instantiation } + /** + * @see {@link #tailFile(Path, long, long, int)} + */ + public final static Observable tailFile(File file, long startPosition, + long sampleTimeMs, int chunkSize) { + return tailFile(file.toPath(), startPosition, sampleTimeMs, chunkSize); + } + /** * Returns an {@link Observable} that uses NIO {@link WatchService} (and a * dedicated thread) to push modified events to an observable that reads and @@ -59,7 +69,7 @@ private FileObservable() { * don't know what to put here. * @return observable of byte arrays */ - public final static Observable tailFile(File file, long startPosition, + public final static Observable tailFile(Path file, long startPosition, long sampleTimeMs, int chunkSize) { Preconditions.checkNotNull(file); Observable events = from(file, StandardWatchEventKinds.ENTRY_CREATE, @@ -73,6 +83,14 @@ public final static Observable tailFile(File file, long startPosition, return tailFile(file, startPosition, sampleTimeMs, chunkSize, events); } + /** + * @see {@link #tailFile(Path, long, long, int, Observable) + */ + public final static Observable tailFile(File file, long startPosition, + long sampleTimeMs, int chunkSize, Observable events) { + return tailFile(file.toPath(), startPosition, sampleTimeMs, chunkSize, events); + } + /** * Returns an {@link Observable} that uses given given observable to push * modified events to an observable that reads and reports new sequences of @@ -99,7 +117,7 @@ public final static Observable tailFile(File file, long startPosition, * {@link Observable#interval(long, TimeUnit)} for example. * @return observable of byte arrays */ - public final static Observable tailFile(File file, long startPosition, + public final static Observable tailFile(Path file, long startPosition, long sampleTimeMs, int chunkSize, Observable events) { Preconditions.checkNotNull(file); return sampleModifyOrOverflowEventsOnly(events, sampleTimeMs) @@ -107,6 +125,14 @@ public final static Observable tailFile(File file, long startPosition, .lift(new OperatorFileTailer(file, startPosition, chunkSize)); } + /** + * @see {@link #tailTextFile(Path, long, long, Charset)} + */ + public final static Observable tailTextFile(File file, long startPosition, + long sampleTimeMs, Charset charset) { + return tailTextFile(file.toPath(), startPosition, sampleTimeMs, charset); + } + /** * Returns an {@link Observable} that uses NIO {@link WatchService} (and a * dedicated thread) to push modified events to an observable that reads and @@ -127,12 +153,20 @@ public final static Observable tailFile(File file, long startPosition, * the character set to use to decode the bytes to a string * @return observable of strings */ - public final static Observable tailTextFile(File file, long startPosition, + public final static Observable tailTextFile(Path file, long startPosition, long sampleTimeMs, Charset charset) { return toLines(tailFile(file, startPosition, sampleTimeMs, DEFAULT_MAX_BYTES_PER_EMISSION), charset); } + /** + * @see {@link #tailTextFile(Path, long, int, Charset, Observable)} + */ + public final static Observable tailTextFile(File file, long startPosition, + int chunkSize, Charset charset, Observable events) { + return tailTextFile(file.toPath(), startPosition, chunkSize, charset, events); + } + /** * Returns an {@link Observable} of String that uses the given events stream * to trigger checks on file change so that new lines can be read and @@ -154,7 +188,7 @@ public final static Observable tailTextFile(File file, long startPositio * {@link Observable#interval(long, TimeUnit)} for example. * @return observable of strings */ - public final static Observable tailTextFile(File file, long startPosition, + public final static Observable tailTextFile(Path file, long startPosition, int chunkSize, Charset charset, Observable events) { Preconditions.checkNotNull(file); Preconditions.checkNotNull(charset); @@ -216,6 +250,15 @@ public final static Observable> from(WatchService watchService) { TimeUnit.MILLISECONDS, 0, TimeUnit.SECONDS, BackpressureStrategy.BUFFER); } + + /** + * @see {@link #from(Path, Kind...)} + */ + @SafeVarargs + public final static Observable> from(final File file, Kind... kinds) { + return from(file.toPath(), kinds); + } + /** * If file does not exist at subscribe time then is assumed to not be a * directory. If the file is not a directory (bearing in mind the aforesaid @@ -231,11 +274,17 @@ public final static Observable> from(WatchService watchService) { * event kinds to watch for and emit * @return observable of watch events */ - @SafeVarargs - public final static Observable> from(final File file, Kind... kinds) { + public final static Observable> from(final Path file, Kind... kinds) { return from(file, null, kinds); } + /** + * @see {@link #from(Path, List)} + */ + public final static Observable> from(final File file, List> kinds) { + return from(file.toPath(), kinds); + } + /** * If file does not exist at subscribe time then is assumed to not be a * directory. If the file is not a directory (bearing in mind the aforesaid @@ -251,10 +300,18 @@ public final static Observable> from(final File file, Kind... k * event kinds to watch for and emit * @return observable of watch events */ - public final static Observable> from(final File file, List> kinds) { + public final static Observable> from(final Path file, List> kinds) { return from(file, null, kinds.toArray(new Kind[] {})); } + /** + * @see {@link #from(Path, Action0, Kind...)} + */ + public final static Observable> from(final File file, + final Action0 onWatchStarted, Kind... kinds) { + return from(file.toPath(), onWatchStarted, kinds); + } + /** * If file does not exist at subscribe time then is assumed to not be a * directory. If the file is not a directory (bearing in mind the aforesaid @@ -272,7 +329,7 @@ public final static Observable> from(final File file, List * kinds of watch events to register for * @return observable of watch events */ - public final static Observable> from(final File file, + public final static Observable> from(final Path file, final Action0 onWatchStarted, Kind... kinds) { return watchService(file, kinds) // when watch service created call onWatchStarted @@ -289,6 +346,16 @@ public void call(WatchService w) { .filter(onlyRelatedTo(file)); } + /** + * @see {@link #watchService(Path, Kind...)} + */ + @SafeVarargs + public final static Observable watchService(final File file, + final Kind... kinds) { + return watchService(file.toPath(), kinds); + + } + /** * Creates a {@link WatchService} on subscribe for the given file and event * kinds. @@ -299,17 +366,16 @@ public void call(WatchService w) { * event kinds to watch for * @return observable of watch events */ - @SafeVarargs - public final static Observable watchService(final File file, + public final static Observable watchService(final Path file, final Kind... kinds) { return Observable.defer(new Func0>() { @Override public Observable call() { try { - final Path path = getBasePath(file); - WatchService watchService = path.getFileSystem().newWatchService(); - path.register(watchService, kinds); + final Path basePath = getBasePath(file); + WatchService watchService = basePath.getFileSystem().newWatchService(); + basePath.register(watchService, kinds); return Observable.just(watchService); } catch (Exception e) { return Observable.error(e); @@ -318,14 +384,11 @@ public Observable call() { }); } - - private final static Path getBasePath(final File file) { - final Path path; - if (file.exists() && file.isDirectory()) - path = Paths.get(file.toURI()); - else - path = Paths.get(file.getParentFile().toURI()); - return path; + + private final static Path getBasePath(final Path path) { + return Files.exists(path) && Files.isDirectory(path) + ? path + : path.getParent(); } /** @@ -337,14 +400,14 @@ private final static Path getBasePath(final File file) { * the file to restrict events to * @return predicate */ - private final static Func1, Boolean> onlyRelatedTo(final File file) { + private final static Func1, Boolean> onlyRelatedTo(final Path path) { return new Func1, Boolean>() { @Override public Boolean call(WatchEvent event) { final boolean ok; - if (file.isDirectory()) + if (Files.isDirectory(path)) ok = true; else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind())) ok = true; @@ -352,9 +415,9 @@ else if (StandardWatchEventKinds.OVERFLOW.equals(event.kind())) Object context = event.context(); if (context != null && context instanceof Path) { Path p = (Path) context; - Path basePath = getBasePath(file); - File pFile = new File(basePath.toFile(), p.toString()); - ok = pFile.getAbsolutePath().equals(file.getAbsolutePath()); + Path basePath = getBasePath(path); + Path pPath = basePath.resolve(p); + ok = pPath.toAbsolutePath().equals(path.toAbsolutePath()); } else ok = false; } @@ -417,11 +480,15 @@ public Boolean call(Object event) { }; public static WatchEventsBuilder from(File file) { - return new WatchEventsBuilder(file); + return from(file.toPath()); + } + + public static WatchEventsBuilder from(Path path) { + return new WatchEventsBuilder(path); } public static final class WatchEventsBuilder { - private final File file; + private final Path path; private Optional scheduler = Optional.absent(); private long pollInterval = 0; private TimeUnit pollIntervalUnit = TimeUnit.MILLISECONDS; @@ -430,8 +497,8 @@ public static final class WatchEventsBuilder { private final List> kinds = new ArrayList<>(); private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER; - private WatchEventsBuilder(File file) { - this.file = file; + private WatchEventsBuilder(Path path) { + this.path = path; } public WatchEventsBuilder scheduler(Scheduler scheduler) { @@ -471,7 +538,7 @@ public WatchEventsBuilder onBackpressure(BackpressureStrategy strategy) { } public Observable> events() { - return watchService(file, kinds.toArray(new Kind[] {})) + return watchService(path, kinds.toArray(new Kind[] {})) .flatMap(new Func1>>() { @Override public Observable> call(WatchService watchService) { @@ -499,7 +566,7 @@ public static TailerBuilder tailer() { public static final class TailerBuilder { - private File file = null; + private Path path = null; private long startPosition = 0; private long sampleTimeMs = 500; private int chunkSize = 8192; @@ -511,10 +578,18 @@ public void call() { // do nothing } }; + private FileSystem fileSystem = FileSystems.getDefault(); private TailerBuilder() { } + /** + * @see {@link #file(Path)} + */ + public TailerBuilder file(File file) { + return file(file.toPath()); + } + /** * The file to tail. * @@ -522,13 +597,13 @@ private TailerBuilder() { * file to tail * @return the builder (this) */ - public TailerBuilder file(File file) { - this.file = file; + public TailerBuilder file(Path file) { + this.path = file; return this; } public TailerBuilder file(String filename) { - return file(new File(filename)); + return file(fileSystem.getPath(filename)); } public TailerBuilder onWatchStarted(Action0 onWatchStarted) { @@ -611,16 +686,16 @@ public TailerBuilder source(Observable source) { public Observable tail() { - return tailFile(file, startPosition, sampleTimeMs, chunkSize, getSource()); + return tailFile(path, startPosition, sampleTimeMs, chunkSize, getSource()); } public Observable tailText() { - return tailTextFile(file, startPosition, chunkSize, charset, getSource()); + return tailTextFile(path, startPosition, chunkSize, charset, getSource()); } private Observable getSource() { if (source == null) - return from(file, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE, + return from(path, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW); else return source; diff --git a/src/main/java/com/github/davidmoten/rx/internal/operators/OperatorFileTailer.java b/src/main/java/com/github/davidmoten/rx/internal/operators/OperatorFileTailer.java index 5ac2b5d..2d8f69e 100644 --- a/src/main/java/com/github/davidmoten/rx/internal/operators/OperatorFileTailer.java +++ b/src/main/java/com/github/davidmoten/rx/internal/operators/OperatorFileTailer.java @@ -1,11 +1,13 @@ package com.github.davidmoten.rx.internal.operators; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import com.github.davidmoten.rx.Bytes; @@ -25,10 +27,22 @@ */ public class OperatorFileTailer implements Operator { - private final File file; + private final Path path; private final AtomicLong currentPosition = new AtomicLong(); private final int maxBytesPerEmission; + /** + * Constructor. Emits byte arrays of up to 8*1024 bytes. + * + * @param file + * file to tail + * @param startPosition + * start position in bytes + */ + public OperatorFileTailer(File file, long startPosition) { + this(file, startPosition, 8192); + } + /** * Constructor. * @@ -40,25 +54,15 @@ public class OperatorFileTailer implements Operator { * maximum number of bytes per emission */ public OperatorFileTailer(File file, long startPosition, int maxBytesPerEmission) { - if (file == null) - throw new NullPointerException("file cannot be null"); - this.file = file; + this(Objects.requireNonNull(file, "file cannot be null").toPath(), startPosition, maxBytesPerEmission); + } + + public OperatorFileTailer(Path path, long startPosition, int maxBytesPerEmission) { + this.path = Objects.requireNonNull(path, "file cannot be null"); this.currentPosition.set(startPosition); this.maxBytesPerEmission = maxBytesPerEmission; } - /** - * Constructor. Emits byte arrays of up to 8*1024 bytes. - * - * @param file - * file to tail - * @param startPosition - * start position in bytes - */ - public OperatorFileTailer(File file, long startPosition) { - this(file, startPosition, 8192); - } - @Override public Subscriber call(Subscriber child) { final PublishSubjectSingleSubscriber subject = PublishSubjectSingleSubscriber @@ -67,13 +71,13 @@ public Subscriber call(Subscriber child) { child.add(parent); subject // report new lines for each event - .concatMap(reportNewLines(file, currentPosition, maxBytesPerEmission)) + .concatMap(reportNewLines(path, currentPosition, maxBytesPerEmission)) // subscribe .unsafeSubscribe(child); return parent; } - private static Func1> reportNewLines(final File file, + private static Func1> reportNewLines(final Path path, final AtomicLong currentPosition, final int maxBytesPerEmission) { return new Func1>() { @Override @@ -86,10 +90,15 @@ public Observable call(Object event) { currentPosition.set(0); } } - long length = file.length(); + long length; + try { + length = Files.size(path); + } catch (IOException e) { + return Observable.error(e); + } if (length > currentPosition.get()) { try { - final FileInputStream fis = new FileInputStream(file); + final InputStream fis = Files.newInputStream(path); fis.skip(currentPosition.get()); // apply using method to ensure fis is closed on // termination or unsubscription diff --git a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java index f8a63ba..600a1fb 100644 --- a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java +++ b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java @@ -2,16 +2,19 @@ import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.PrintStream; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchEvent.Kind; @@ -20,12 +23,20 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; +import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; +import com.google.common.jimfs.WatchServiceConfiguration; + import rx.Observable; import rx.Observer; import rx.Subscription; @@ -34,12 +45,26 @@ import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; + public class FileObservableTest { + private FileSystem fileSystem; + + @Before + public void setup() throws IOException { + WatchServiceConfiguration watchServiceConfig = WatchServiceConfiguration.polling(10, MILLISECONDS); + Configuration fileSystemConfig = Configuration.osX().toBuilder().setWatchServiceConfiguration(watchServiceConfig) + .build(); + this.fileSystem = Jimfs.newFileSystem(fileSystemConfig); + Path target = fileSystem.getPath("target"); + Files.createDirectories(target); + } + @Test public void testNoEventsThrownIfFileDoesNotExist() throws InterruptedException { - File file = new File("target/does-not-exist"); + Path file = fileSystem.getPath("target", "does-not-exist"); Observable> events = FileObservable.from(file, ENTRY_MODIFY); + final CountDownLatch latch = new CountDownLatch(1); Subscription sub = events.subscribeOn(Schedulers.io()) .subscribe(new Observer>() { @@ -60,6 +85,7 @@ public void onNext(WatchEvent arg0) { latch.countDown(); } }); + assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); sub.unsubscribe(); } @@ -67,45 +93,52 @@ public void onNext(WatchEvent arg0) { @Test public void testCreateAndModifyEventsForANonDirectoryFileBlockForever() throws InterruptedException, IOException { - File file = new File("target/f"); + Path file = fileSystem.getPath("target", "f"); Observable> events = FileObservable.from(file).kind(ENTRY_MODIFY) .kind(ENTRY_CREATE).events(); + checkCreateAndModifyEvents(file, events); } @Test public void testCreateAndModifyEventsForANonDirectoryFilePollEveryInterval() throws InterruptedException, IOException { - File file = new File("target/f"); + Path file = fileSystem.getPath("target", "f"); Observable> events = FileObservable.from(file).kind(ENTRY_MODIFY) .kind(ENTRY_CREATE).pollInterval(100, TimeUnit.MILLISECONDS).events(); + checkCreateAndModifyEvents(file, events); } @Test public void testCreateAndModifyEventsForANonDirectoryFileBlockingPollEveryInterval() throws InterruptedException, IOException { - File file = new File("target/f"); + Path file = fileSystem.getPath("target", "f"); Observable> events = FileObservable.from(file).kind(ENTRY_MODIFY) .kind(ENTRY_CREATE).pollInterval(100, TimeUnit.MILLISECONDS) .pollDuration(100, TimeUnit.MILLISECONDS).events(); + checkCreateAndModifyEvents(file, events); } - private void checkCreateAndModifyEvents(File file, Observable> events) + private void checkCreateAndModifyEvents(Path file, Observable> events) throws InterruptedException, IOException, FileNotFoundException { - file.delete(); - final CountDownLatch latch = new CountDownLatch(1); + Files.deleteIfExists(file); + @SuppressWarnings("unchecked") final List> eventKinds = Mockito.mock(List.class); InOrder inOrder = Mockito.inOrder(eventKinds); - final AtomicInteger errorCount = new AtomicInteger(0); + + final AtomicBoolean isComplete = new AtomicBoolean(); + final AtomicInteger eventCount = new AtomicInteger(); + final AtomicInteger errorCount = new AtomicInteger(); + Subscription sub = events.subscribeOn(Schedulers.io()) .subscribe(new Observer>() { @Override public void onCompleted() { - System.out.println("completed"); + isComplete.set(true); } @Override @@ -115,34 +148,40 @@ public void onError(Throwable e) { @Override public void onNext(WatchEvent event) { - System.out.println("event=" + event); eventKinds.add(event.kind()); - latch.countDown(); + eventCount.incrementAndGet(); } }); - // sleep long enough for WatchService to start - Thread.sleep(1000); - file.createNewFile(); - FileOutputStream fos = new FileOutputStream(file, true); - fos.write("hello there".getBytes()); - fos.close(); - // give the WatchService time to register the change - Thread.sleep(100); - assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)); + + sleepForWatchEvent(); + + Files.createFile(file); + await().untilAtomic(eventCount, equalTo(1)); + + Files.write(file, "hello there".getBytes(),StandardOpenOption.CREATE, StandardOpenOption.APPEND); + await().untilAtomic(eventCount, equalTo(2)); + inOrder.verify(eventKinds).add(StandardWatchEventKinds.ENTRY_CREATE); inOrder.verify(eventKinds).add(StandardWatchEventKinds.ENTRY_MODIFY); inOrder.verifyNoMoreInteractions(); - sub.unsubscribe(); - Thread.sleep(100); + sub.unsubscribe(); assertEquals(0, errorCount.get()); } + private ConditionFactory await() { + return Awaitility.await().with().pollInterval(10, MILLISECONDS).atMost(200, MILLISECONDS); + } + + private void sleepForWatchEvent() throws InterruptedException { + Thread.sleep(20); + } + @Test public void testFileTailingFromStartOfFile() throws InterruptedException, IOException { - final File log = new File("target/test.log"); - log.delete(); - log.createNewFile(); + final Path log = fileSystem.getPath("target", "test.log"); + Files.deleteIfExists(log); + Files.createFile(log); append(log, "a0"); Observable tailer = FileObservable.tailer().file(log).onWatchStarted(new Action0() { @@ -152,136 +191,145 @@ public void call() { append(log, "a2"); } }).sampleTimeMs(50).utf8().tailText(); - final List list = new ArrayList(); - final CountDownLatch latch = new CountDownLatch(3); + + final List list = new ArrayList<>(); + final AtomicInteger eventCount = new AtomicInteger(); + Subscription sub = tailer.subscribeOn(Schedulers.io()).subscribe(new Action1() { @Override public void call(String line) { - System.out.println("received: '" + line + "'"); list.add(line); - latch.countDown(); + eventCount.incrementAndGet(); } }); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + + await().untilAtomic(eventCount, equalTo(3)); assertEquals(Arrays.asList("a0", "a1", "a2"), list); + sub.unsubscribe(); } - + @Test public void testFileTailingWhenFileIsCreatedAfterSubscription() throws InterruptedException, IOException { - final File log = new File("target/test.log"); - log.delete(); - + final Path log = fileSystem.getPath("target", "test.log"); + Files.deleteIfExists(log); append(log, "a0"); - Observable tailer = FileObservable.tailer().file(log).startPosition(0) - .sampleTimeMs(50).utf8().onWatchStarted(new Action0() { - @Override - public void call() { - try { - log.createNewFile(); - } catch (IOException e) { - throw new RuntimeException(e); - } - append(log, "a1"); - append(log, "a2"); - } - }).tailText(); + + Observable tailer = FileObservable.tailer().file(log).startPosition(0).sampleTimeMs(50).utf8() + .onWatchStarted(new Action0() { + @Override + public void call() { + try { + if (!Files.exists(log)) { + Files.createFile(log); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + append(log, "a1"); + append(log, "a2"); + } + }).tailText(); final List list = new ArrayList(); - final CountDownLatch latch = new CountDownLatch(3); + final AtomicInteger eventCount = new AtomicInteger(); + Subscription sub = tailer.subscribeOn(Schedulers.io()).subscribe(new Action1() { @Override public void call(String line) { - System.out.println("received: '" + line + "'"); list.add(line); - latch.countDown(); + eventCount.incrementAndGet(); } }); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + + await().untilAtomic(eventCount, equalTo(3)); assertEquals(Arrays.asList("a0", "a1", "a2"), list); + sub.unsubscribe(); } - - private static void append(File file, String line) { - try { - FileOutputStream fos = new FileOutputStream(file, true); - fos.write(line.getBytes(Charset.forName("UTF-8"))); - fos.write('\n'); - fos.close(); - } catch (FileNotFoundException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - + @Test public void testTailTextFileStreamsFromEndOfFileIfSpecified() - throws FileNotFoundException, InterruptedException { - File file = new File("target/test1.txt"); - file.delete(); - try (PrintStream out = new PrintStream(file)) { - out.println("line 1"); - } - final List list = new ArrayList(); + throws InterruptedException, IOException { + Path file = fileSystem.getPath("target", "test1.txt"); + Files.deleteIfExists(file); + append(file, "line 1"); + + final List list = new ArrayList<>(); TestSubscriber ts = TestSubscriber.create(); - FileObservable.tailer().file(file).startPosition(file.length()).sampleTimeMs(10).utf8() + final AtomicInteger eventCount = new AtomicInteger(); + + FileObservable.tailer().file(file).startPosition(Files.size(file)).sampleTimeMs(10).utf8() .tailText() // for each .doOnNext(new Action1() { - @Override public void call(String line) { - System.out.println(line); list.add(line); + eventCount.incrementAndGet(); } }).subscribeOn(Schedulers.newThread()).subscribe(ts); - Thread.sleep(1100); + + sleepForWatchEvent(); + assertTrue(list.isEmpty()); - try (PrintStream out = new PrintStream(new FileOutputStream(file, true))) { - out.println("line 2"); - } - Thread.sleep(1100); + + append(file, "line 2"); + await().untilAtomic(eventCount, equalTo(1)); assertEquals(1, list.size()); assertEquals("line 2", list.get(0).trim()); + ts.unsubscribe(); } @Test public void testTailTextFileStreamsFromEndOfFileIfDeleteOccurs() throws InterruptedException, IOException { - File file = new File("target/test2.txt"); - file.delete(); - try (PrintStream out = new PrintStream(file)) { - out.println("line 1"); - } + Path file = fileSystem.getPath("target", "test2.txt"); + Files.deleteIfExists(file); + append(file, "hello there"); + final List list = new ArrayList(); - Subscription sub = FileObservable.tailer().file(file).startPosition(file.length()) + final AtomicInteger eventCount = new AtomicInteger(); + + Subscription sub = FileObservable.tailer().file(file).startPosition(Files.size(file)) .sampleTimeMs(10).utf8().tailText() // for each .doOnNext(new Action1() { - @Override public void call(String line) { - System.out.println(line); list.add(line); + eventCount.incrementAndGet(); } }).subscribeOn(Schedulers.newThread()).subscribe(); - // delay must be long enough for last update timestamp to change on - // windows (resolution to the second) - Thread.sleep(1100); + + sleepForWatchEvent(); + assertTrue(list.isEmpty()); + // delete file then make it bigger than it was - assertTrue(file.delete()); - try (PrintStream out = new PrintStream(new FileOutputStream(file, true))) { - out.println("line 2"); - out.println("line 3"); - } - Thread.sleep(1100); + assertTrue(Files.deleteIfExists(file)); + sleepForWatchEvent(); + + append(file, "line 2"); + await().untilAtomic(eventCount, equalTo(1)); + + append(file, "line 3"); + await().untilAtomic(eventCount, equalTo(2)); + assertEquals(2, list.size()); assertEquals("line 2", list.get(0).trim()); assertEquals("line 3", list.get(1).trim()); + sub.unsubscribe(); } + + private static void append(Path file, String line) { + try { + Files.write(file, (line + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } From bbd77ae78649ef9bedb55e8d112ea5fe0db1b484 Mon Sep 17 00:00:00 2001 From: Ryan Dunckel Date: Tue, 3 Jan 2017 23:26:23 -0600 Subject: [PATCH 2/6] Test across filesystems --- .../davidmoten/rx/FileObservableTest.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java index 600a1fb..4859244 100644 --- a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java +++ b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java @@ -20,6 +20,7 @@ import java.nio.file.WatchEvent.Kind; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -30,6 +31,9 @@ import org.awaitility.core.ConditionFactory; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.InOrder; import org.mockito.Mockito; @@ -45,17 +49,27 @@ import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; - +@RunWith(Parameterized.class) public class FileObservableTest { private FileSystem fileSystem; + private Configuration fileSystemConfig; + + public FileObservableTest(Configuration fileSystemConifg) { + this.fileSystemConfig = fileSystemConifg; + } + + @Parameters + public static Collection data() { + return Arrays + .asList(new Configuration[] { Configuration.windows(), Configuration.unix(), Configuration.osX() }); + } @Before public void setup() throws IOException { WatchServiceConfiguration watchServiceConfig = WatchServiceConfiguration.polling(10, MILLISECONDS); - Configuration fileSystemConfig = Configuration.osX().toBuilder().setWatchServiceConfiguration(watchServiceConfig) - .build(); - this.fileSystem = Jimfs.newFileSystem(fileSystemConfig); + Configuration config = fileSystemConfig.toBuilder().setWatchServiceConfiguration(watchServiceConfig).build(); + this.fileSystem = Jimfs.newFileSystem(config); Path target = fileSystem.getPath("target"); Files.createDirectories(target); } From b5ed0b9632815d0654cbea9feb7a03ba725559ce Mon Sep 17 00:00:00 2001 From: Ryan Dunckel Date: Tue, 3 Jan 2017 23:34:40 -0600 Subject: [PATCH 3/6] increase await timeout to possibly help slow ci --- src/test/java/com/github/davidmoten/rx/FileObservableTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java index 4859244..7706f05 100644 --- a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java +++ b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java @@ -184,7 +184,7 @@ public void onNext(WatchEvent event) { } private ConditionFactory await() { - return Awaitility.await().with().pollInterval(10, MILLISECONDS).atMost(200, MILLISECONDS); + return Awaitility.await().with().pollInterval(10, MILLISECONDS).atMost(2000, MILLISECONDS); } private void sleepForWatchEvent() throws InterruptedException { From c88b3d586c3dc531498a8a1b87ef1dddcfe2a6ec Mon Sep 17 00:00:00 2001 From: Ryan Dunckel Date: Tue, 3 Jan 2017 23:46:06 -0600 Subject: [PATCH 4/6] adjust timeout for watch service event --- .../davidmoten/rx/FileObservableTest.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java index 7706f05..3cc1320 100644 --- a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java +++ b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java @@ -78,7 +78,7 @@ public void setup() throws IOException { public void testNoEventsThrownIfFileDoesNotExist() throws InterruptedException { Path file = fileSystem.getPath("target", "does-not-exist"); Observable> events = FileObservable.from(file, ENTRY_MODIFY); - + final CountDownLatch latch = new CountDownLatch(1); Subscription sub = events.subscribeOn(Schedulers.io()) .subscribe(new Observer>() { @@ -100,6 +100,8 @@ public void onNext(WatchEvent arg0) { } }); + sleepForWatchEvent(); + assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); sub.unsubscribe(); } @@ -146,7 +148,7 @@ private void checkCreateAndModifyEvents(Path file, Observable> eve final AtomicBoolean isComplete = new AtomicBoolean(); final AtomicInteger eventCount = new AtomicInteger(); final AtomicInteger errorCount = new AtomicInteger(); - + Subscription sub = events.subscribeOn(Schedulers.io()) .subscribe(new Observer>() { @@ -166,7 +168,7 @@ public void onNext(WatchEvent event) { eventCount.incrementAndGet(); } }); - + sleepForWatchEvent(); Files.createFile(file); @@ -205,10 +207,12 @@ public void call() { append(log, "a2"); } }).sampleTimeMs(50).utf8().tailText(); - + final List list = new ArrayList<>(); final AtomicInteger eventCount = new AtomicInteger(); + sleepForWatchEvent(); + Subscription sub = tailer.subscribeOn(Schedulers.io()).subscribe(new Action1() { @Override public void call(String line) { @@ -216,7 +220,7 @@ public void call(String line) { eventCount.incrementAndGet(); } }); - + await().untilAtomic(eventCount, equalTo(3)); assertEquals(Arrays.asList("a0", "a1", "a2"), list); @@ -245,6 +249,8 @@ public void call() { append(log, "a2"); } }).tailText(); + + sleepForWatchEvent(); final List list = new ArrayList(); final AtomicInteger eventCount = new AtomicInteger(); @@ -256,7 +262,7 @@ public void call(String line) { eventCount.incrementAndGet(); } }); - + await().untilAtomic(eventCount, equalTo(3)); assertEquals(Arrays.asList("a0", "a1", "a2"), list); @@ -273,7 +279,7 @@ public void testTailTextFileStreamsFromEndOfFileIfSpecified() final List list = new ArrayList<>(); TestSubscriber ts = TestSubscriber.create(); final AtomicInteger eventCount = new AtomicInteger(); - + FileObservable.tailer().file(file).startPosition(Files.size(file)).sampleTimeMs(10).utf8() .tailText() // for each @@ -286,7 +292,7 @@ public void call(String line) { }).subscribeOn(Schedulers.newThread()).subscribe(ts); sleepForWatchEvent(); - + assertTrue(list.isEmpty()); append(file, "line 2"); @@ -307,6 +313,8 @@ public void testTailTextFileStreamsFromEndOfFileIfDeleteOccurs() final List list = new ArrayList(); final AtomicInteger eventCount = new AtomicInteger(); + sleepForWatchEvent(); + Subscription sub = FileObservable.tailer().file(file).startPosition(Files.size(file)) .sampleTimeMs(10).utf8().tailText() // for each @@ -317,8 +325,6 @@ public void call(String line) { eventCount.incrementAndGet(); } }).subscribeOn(Schedulers.newThread()).subscribe(); - - sleepForWatchEvent(); assertTrue(list.isEmpty()); From 8c33e8b302fc1980ada35f36d16e3bef8a7b494f Mon Sep 17 00:00:00 2001 From: Ryan Dunckel Date: Wed, 4 Jan 2017 19:58:34 -0600 Subject: [PATCH 5/6] allow onWatchStarted in WatchEventBuilder --- .../github/davidmoten/rx/FileObservable.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/main/java/com/github/davidmoten/rx/FileObservable.java b/src/main/java/com/github/davidmoten/rx/FileObservable.java index 37688f2..5a4f3a6 100644 --- a/src/main/java/com/github/davidmoten/rx/FileObservable.java +++ b/src/main/java/com/github/davidmoten/rx/FileObservable.java @@ -496,10 +496,21 @@ public static final class WatchEventsBuilder { private TimeUnit pollDurationUnit = TimeUnit.MILLISECONDS; private final List> kinds = new ArrayList<>(); private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER; + private Action0 onWatchStarted = new Action0() { + @Override + public void call() { + // do nothing + } + }; private WatchEventsBuilder(Path path) { this.path = path; } + + public WatchEventsBuilder onWatchStarted(Action0 onWatchStarted) { + this.onWatchStarted = onWatchStarted; + return this; + } public WatchEventsBuilder scheduler(Scheduler scheduler) { this.scheduler = Optional.of(scheduler); @@ -539,6 +550,14 @@ public WatchEventsBuilder onBackpressure(BackpressureStrategy strategy) { public Observable> events() { return watchService(path, kinds.toArray(new Kind[] {})) + // when watch service created call onWatchStarted + .doOnNext(new Action1() { + @Override + public void call(WatchService w) { + if (onWatchStarted != null) + onWatchStarted.call(); + } + }) .flatMap(new Func1>>() { @Override public Observable> call(WatchService watchService) { From 393102d6fcff9d6b40ba83bc2aaa07970ae0fbc3 Mon Sep 17 00:00:00 2001 From: Ryan Dunckel Date: Wed, 4 Jan 2017 19:58:55 -0600 Subject: [PATCH 6/6] make async tests less flakey --- .../davidmoten/rx/FileObservableTest.java | 120 +++++++++++------- 1 file changed, 77 insertions(+), 43 deletions(-) diff --git a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java index 3cc1320..e00bb14 100644 --- a/src/test/java/com/github/davidmoten/rx/FileObservableTest.java +++ b/src/test/java/com/github/davidmoten/rx/FileObservableTest.java @@ -2,6 +2,7 @@ import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; @@ -37,6 +38,7 @@ import org.mockito.InOrder; import org.mockito.Mockito; +import com.github.davidmoten.rx.FileObservable.WatchEventsBuilder; import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; import com.google.common.jimfs.WatchServiceConfiguration; @@ -67,7 +69,7 @@ public static Collection data() { @Before public void setup() throws IOException { - WatchServiceConfiguration watchServiceConfig = WatchServiceConfiguration.polling(10, MILLISECONDS); + WatchServiceConfiguration watchServiceConfig = WatchServiceConfiguration.polling(1, MILLISECONDS); Configuration config = fileSystemConfig.toBuilder().setWatchServiceConfiguration(watchServiceConfig).build(); this.fileSystem = Jimfs.newFileSystem(config); Path target = fileSystem.getPath("target"); @@ -99,9 +101,7 @@ public void onNext(WatchEvent arg0) { latch.countDown(); } }); - - sleepForWatchEvent(); - + assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); sub.unsubscribe(); } @@ -110,34 +110,34 @@ public void onNext(WatchEvent arg0) { public void testCreateAndModifyEventsForANonDirectoryFileBlockForever() throws InterruptedException, IOException { Path file = fileSystem.getPath("target", "f"); - Observable> events = FileObservable.from(file).kind(ENTRY_MODIFY) - .kind(ENTRY_CREATE).events(); + WatchEventsBuilder eventsBuilder = FileObservable.from(file).kind(ENTRY_MODIFY) + .kind(ENTRY_CREATE); - checkCreateAndModifyEvents(file, events); + checkCreateAndModifyEvents(file, eventsBuilder); } @Test public void testCreateAndModifyEventsForANonDirectoryFilePollEveryInterval() throws InterruptedException, IOException { Path file = fileSystem.getPath("target", "f"); - Observable> events = FileObservable.from(file).kind(ENTRY_MODIFY) - .kind(ENTRY_CREATE).pollInterval(100, TimeUnit.MILLISECONDS).events(); + WatchEventsBuilder eventsBuilder = FileObservable.from(file).kind(ENTRY_MODIFY) + .kind(ENTRY_CREATE).pollInterval(10, TimeUnit.MILLISECONDS); - checkCreateAndModifyEvents(file, events); + checkCreateAndModifyEvents(file, eventsBuilder); } @Test public void testCreateAndModifyEventsForANonDirectoryFileBlockingPollEveryInterval() throws InterruptedException, IOException { Path file = fileSystem.getPath("target", "f"); - Observable> events = FileObservable.from(file).kind(ENTRY_MODIFY) - .kind(ENTRY_CREATE).pollInterval(100, TimeUnit.MILLISECONDS) - .pollDuration(100, TimeUnit.MILLISECONDS).events(); + WatchEventsBuilder eventsBuilder = FileObservable.from(file).kind(ENTRY_MODIFY) + .kind(ENTRY_CREATE).pollInterval(10, TimeUnit.MILLISECONDS) + .pollDuration(10, TimeUnit.MILLISECONDS); - checkCreateAndModifyEvents(file, events); + checkCreateAndModifyEvents(file, eventsBuilder); } - private void checkCreateAndModifyEvents(Path file, Observable> events) + private void checkCreateAndModifyEvents(Path file, WatchEventsBuilder eventsBuilder) throws InterruptedException, IOException, FileNotFoundException { Files.deleteIfExists(file); @@ -145,16 +145,22 @@ private void checkCreateAndModifyEvents(Path file, Observable> eve final List> eventKinds = Mockito.mock(List.class); InOrder inOrder = Mockito.inOrder(eventKinds); - final AtomicBoolean isComplete = new AtomicBoolean(); + final AtomicBoolean isStarted = new AtomicBoolean(); final AtomicInteger eventCount = new AtomicInteger(); final AtomicInteger errorCount = new AtomicInteger(); - - Subscription sub = events.subscribeOn(Schedulers.io()) + + Subscription sub = eventsBuilder + .onWatchStarted(new Action0() { + @Override + public void call() { + isStarted.set(true); + } + }) + .events().subscribeOn(Schedulers.io()) .subscribe(new Observer>() { @Override public void onCompleted() { - isComplete.set(true); } @Override @@ -169,7 +175,7 @@ public void onNext(WatchEvent event) { } }); - sleepForWatchEvent(); + await().untilTrue(isStarted); Files.createFile(file); await().untilAtomic(eventCount, equalTo(1)); @@ -185,34 +191,29 @@ public void onNext(WatchEvent event) { assertEquals(0, errorCount.get()); } - private ConditionFactory await() { - return Awaitility.await().with().pollInterval(10, MILLISECONDS).atMost(2000, MILLISECONDS); - } - - private void sleepForWatchEvent() throws InterruptedException { - Thread.sleep(20); - } - @Test public void testFileTailingFromStartOfFile() throws InterruptedException, IOException { final Path log = fileSystem.getPath("target", "test.log"); Files.deleteIfExists(log); Files.createFile(log); append(log, "a0"); - + Observable tailer = FileObservable.tailer().file(log).onWatchStarted(new Action0() { @Override public void call() { append(log, "a1"); append(log, "a2"); } - }).sampleTimeMs(50).utf8().tailText(); + }).sampleTimeMs(50).utf8().tailText().doOnSubscribe(new Action0() { + @Override + public void call() { + sleepForWatchEvent(); + } + }); final List list = new ArrayList<>(); final AtomicInteger eventCount = new AtomicInteger(); - - sleepForWatchEvent(); - + Subscription sub = tailer.subscribeOn(Schedulers.io()).subscribe(new Action1() { @Override public void call(String line) { @@ -220,7 +221,7 @@ public void call(String line) { eventCount.incrementAndGet(); } }); - + await().untilAtomic(eventCount, equalTo(3)); assertEquals(Arrays.asList("a0", "a1", "a2"), list); @@ -234,10 +235,13 @@ public void testFileTailingWhenFileIsCreatedAfterSubscription() Files.deleteIfExists(log); append(log, "a0"); + final AtomicBoolean isStarted = new AtomicBoolean(); + Observable tailer = FileObservable.tailer().file(log).startPosition(0).sampleTimeMs(50).utf8() .onWatchStarted(new Action0() { @Override public void call() { + isStarted.set(true); try { if (!Files.exists(log)) { Files.createFile(log); @@ -248,10 +252,13 @@ public void call() { append(log, "a1"); append(log, "a2"); } - }).tailText(); + }).tailText().doOnSubscribe(new Action0() { + @Override + public void call() { + sleepForWatchEvent(); + } + }); - sleepForWatchEvent(); - final List list = new ArrayList(); final AtomicInteger eventCount = new AtomicInteger(); @@ -278,9 +285,16 @@ public void testTailTextFileStreamsFromEndOfFileIfSpecified() final List list = new ArrayList<>(); TestSubscriber ts = TestSubscriber.create(); + final AtomicBoolean isStarted = new AtomicBoolean(); final AtomicInteger eventCount = new AtomicInteger(); FileObservable.tailer().file(file).startPosition(Files.size(file)).sampleTimeMs(10).utf8() + .onWatchStarted(new Action0() { + @Override + public void call() { + isStarted.set(true); + } + }) .tailText() // for each .doOnNext(new Action1() { @@ -289,9 +303,10 @@ public void call(String line) { list.add(line); eventCount.incrementAndGet(); } - }).subscribeOn(Schedulers.newThread()).subscribe(ts); + }) + .subscribeOn(Schedulers.newThread()).subscribe(ts); - sleepForWatchEvent(); + await().untilTrue(isStarted); assertTrue(list.isEmpty()); @@ -312,9 +327,7 @@ public void testTailTextFileStreamsFromEndOfFileIfDeleteOccurs() final List list = new ArrayList(); final AtomicInteger eventCount = new AtomicInteger(); - - sleepForWatchEvent(); - + Subscription sub = FileObservable.tailer().file(file).startPosition(Files.size(file)) .sampleTimeMs(10).utf8().tailText() // for each @@ -328,9 +341,18 @@ public void call(String line) { assertTrue(list.isEmpty()); + final AtomicBoolean isDeleted = new AtomicBoolean(); + + FileObservable.from(file).kind(ENTRY_DELETE).events().subscribe(new Action1>() { + @Override + public void call(WatchEvent t) { + isDeleted.set(true); + } + }); + // delete file then make it bigger than it was assertTrue(Files.deleteIfExists(file)); - sleepForWatchEvent(); + await().untilTrue(isDeleted); append(file, "line 2"); await().untilAtomic(eventCount, equalTo(1)); @@ -345,6 +367,18 @@ public void call(String line) { sub.unsubscribe(); } + private ConditionFactory await() { + return Awaitility.await().with().pollInterval(1, MILLISECONDS).atMost(200, MILLISECONDS); + } + + private void sleepForWatchEvent() { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + private static void append(Path file, String line) { try { Files.write(file, (line + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);