1717
1818import java .util .List ;
1919import java .util .concurrent .TimeUnit ;
20+ import java .util .concurrent .atomic .AtomicBoolean ;
2021
2122import rx .Observable ;
2223import rx .Observable .OnSubscribeFunc ;
2324import rx .Observer ;
2425import rx .Scheduler ;
2526import rx .Subscription ;
2627import rx .concurrency .Schedulers ;
28+ import rx .subscriptions .CompositeSubscription ;
2729import rx .util .functions .Func0 ;
2830import rx .util .functions .Func1 ;
2931
@@ -65,11 +67,14 @@ public static <T, TClosing> OnSubscribeFunc<List<T>> buffer(final Observable<T>
6567 public Subscription onSubscribe (Observer <? super List <T >> observer ) {
6668 NonOverlappingChunks <T , List <T >> buffers = new NonOverlappingChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker ());
6769 ChunkCreator creator = new ObservableBasedSingleChunkCreator <T , List <T >, TClosing >(buffers , bufferClosingSelector );
68- return source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ));
70+ return new CompositeSubscription (
71+ new ChunkToSubscription (creator ),
72+ source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ))
73+ );
6974 }
7075 };
7176 }
72-
77+
7378 /**
7479 * <p>This method creates a {@link Func1} object which represents the buffer operation. This operation takes
7580 * values from the specified {@link Observable} source and stores them in the currently active chunks. Initially
@@ -101,7 +106,10 @@ public static <T, TOpening, TClosing> OnSubscribeFunc<List<T>> buffer(final Obse
101106 public Subscription onSubscribe (final Observer <? super List <T >> observer ) {
102107 OverlappingChunks <T , List <T >> buffers = new OverlappingChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker ());
103108 ChunkCreator creator = new ObservableBasedMultiChunkCreator <T , List <T >, TOpening , TClosing >(buffers , bufferOpenings , bufferClosingSelector );
104- return source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ));
109+ return new CompositeSubscription (
110+ new ChunkToSubscription (creator ),
111+ source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ))
112+ );
105113 }
106114 };
107115 }
@@ -156,7 +164,10 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
156164 public Subscription onSubscribe (final Observer <? super List <T >> observer ) {
157165 Chunks <T , List <T >> chunks = new SizeBasedChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker (), count );
158166 ChunkCreator creator = new SkippingChunkCreator <T , List <T >>(chunks , skip );
159- return source .subscribe (new ChunkObserver <T , List <T >>(chunks , observer , creator ));
167+ return new CompositeSubscription (
168+ new ChunkToSubscription (creator ),
169+ source .subscribe (new ChunkObserver <T , List <T >>(chunks , observer , creator ))
170+ );
160171 }
161172 };
162173 }
@@ -211,7 +222,10 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
211222 public Subscription onSubscribe (final Observer <? super List <T >> observer ) {
212223 NonOverlappingChunks <T , List <T >> buffers = new NonOverlappingChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker ());
213224 ChunkCreator creator = new TimeBasedChunkCreator <T , List <T >>(buffers , timespan , unit , scheduler );
214- return source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ));
225+ return new CompositeSubscription (
226+ new ChunkToSubscription (creator ),
227+ source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ))
228+ );
215229 }
216230 };
217231 }
@@ -270,9 +284,13 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
270284 return new OnSubscribeFunc <List <T >>() {
271285 @ Override
272286 public Subscription onSubscribe (final Observer <? super List <T >> observer ) {
273- Chunks <T , List <T >> chunks = new TimeAndSizeBasedChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker (), count , timespan , unit , scheduler );
287+ TimeAndSizeBasedChunks <T , List <T >> chunks = new TimeAndSizeBasedChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker (), count , timespan , unit , scheduler );
274288 ChunkCreator creator = new SingleChunkCreator <T , List <T >>(chunks );
275- return source .subscribe (new ChunkObserver <T , List <T >>(chunks , observer , creator ));
289+ return new CompositeSubscription (
290+ chunks ,
291+ new ChunkToSubscription (creator ),
292+ source .subscribe (new ChunkObserver <T , List <T >>(chunks , observer , creator ))
293+ );
276294 }
277295 };
278296 }
@@ -331,9 +349,13 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
331349 return new OnSubscribeFunc <List <T >>() {
332350 @ Override
333351 public Subscription onSubscribe (final Observer <? super List <T >> observer ) {
334- OverlappingChunks <T , List <T >> buffers = new TimeBasedChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker (), timespan , unit , scheduler );
352+ TimeBasedChunks <T , List <T >> buffers = new TimeBasedChunks <T , List <T >>(observer , OperationBuffer .<T > bufferMaker (), timespan , unit , scheduler );
335353 ChunkCreator creator = new TimeBasedChunkCreator <T , List <T >>(buffers , timeshift , unit , scheduler );
336- return source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ));
354+ return new CompositeSubscription (
355+ buffers ,
356+ new ChunkToSubscription (creator ),
357+ source .subscribe (new ChunkObserver <T , List <T >>(buffers , observer , creator ))
358+ );
337359 }
338360 };
339361 }
@@ -355,4 +377,24 @@ public List<T> getContents() {
355377 return contents ;
356378 }
357379 }
380+
381+ /**
382+ * Converts a chunk creator into a subscription which stops the chunk.
383+ */
384+ private static class ChunkToSubscription implements Subscription {
385+ private ChunkCreator cc ;
386+ private final AtomicBoolean done ;
387+ public ChunkToSubscription (ChunkCreator cc ) {
388+ this .cc = cc ;
389+ this .done = new AtomicBoolean ();
390+ }
391+ @ Override
392+ public void unsubscribe () {
393+ if (done .compareAndSet (false , true )) {
394+ ChunkCreator cc0 = cc ;
395+ cc = null ;
396+ cc0 .stop ();
397+ }
398+ }
399+ }
358400}
0 commit comments