Skip to content

chore: adds concurrent sink#201

Merged
KeranYang merged 3 commits intonumaproj:mainfrom
ankiiitraj:sink-examples
Sep 9, 2025
Merged

chore: adds concurrent sink#201
KeranYang merged 3 commits intonumaproj:mainfrom
ankiiitraj:sink-examples

Conversation

@ankiiitraj
Copy link
Copy Markdown
Contributor

@ankiiitraj ankiiitraj commented Aug 23, 2025

closes #179

@codecov
Copy link
Copy Markdown

codecov bot commented Aug 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
⚠️ Please upload report for BASE (main@958d5d5). Learn more about missing BASE report.

Additional details and impacted files
@@           Coverage Diff           @@
##             main     #201   +/-   ##
=======================================
  Coverage        ?   59.05%           
  Complexity      ?      477           
=======================================
  Files           ?      151           
  Lines           ?     3417           
  Branches        ?      235           
=======================================
  Hits            ?     2018           
  Misses          ?     1229           
  Partials        ?      170           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Member

@KeranYang KeranYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for contributing! @ankiiitraj

@ankiiitraj
Copy link
Copy Markdown
Contributor Author

updated the tests to better reflect the concurrent implementation.
Removed performance tests.

Copy link
Copy Markdown
Member

@KeranYang KeranYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! @yhl25 Please review.

@Slf4j
public class ConcurrentSink extends Sinker {

private static final int DEFAULT_THREAD_POOL_SIZE = 10;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use number of available cores.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken care

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "ConcurrentSink-Worker-" + (++counter));
t.setDaemon(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be a daemon?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken care, shouldn't be daemon thread.

Server server = new Server(concurrentSink);
server.start();
server.awaitTermination();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invoke shutdown after the server terminates?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Signed-off-by: Ankit Raj <ankitatiiitr@gmail.com>
Signed-off-by: Ankit Raj <ankitatiiitr@gmail.com>
Signed-off-by: Ankit Raj <ankitatiiitr@gmail.com>
@KeranYang KeranYang merged commit 4adc44b into numaproj:main Sep 9, 2025
3 checks passed
@KeranYang
Copy link
Copy Markdown
Member

I merged the change. Thank you @ankiiitraj !

@KeranYang
Copy link
Copy Markdown
Member

Hey @ankiiitraj , I just want to share a good news that here at Intuit, one of our internal Numaflow users is able to refer to your example and successfully support their concurrency use case. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Fork/Join example for concurrent processing in sink

3 participants