44import org .junit .jupiter .api .Test ;
55
66import java .io .BufferedReader ;
7+ import java .io .Closeable ;
78import java .io .IOException ;
89import java .io .InputStreamReader ;
910import java .net .ServerSocket ;
1011import java .net .Socket ;
12+ import java .util .Collections ;
1113import java .util .HashMap ;
1214import java .util .List ;
1315import java .util .Map ;
1416import java .util .concurrent .Semaphore ;
15- import java .util .regex .Pattern ;
1617
1718import static org .junit .jupiter .api .Assertions .*;
1819
1920public class ProxyTest {
20- private final int NUM_THREADS = 10 ;
21- private final int NUM_ITERATIONS = 1000 ;
21+ private static final int NUM_THREADS = 10 ;
22+ private static final int NUM_ITERATIONS = 1000 ;
23+ private static final int THREAD_WAIT_ITERATIONS = 10 ;
24+ private static final List <String > EXPECTED_METRIC_LINE =
25+ Collections .singletonList ("^\" dummy\" 1\\ .0 [0-9]+ source=\" a-host\" $" );
2226
23- private final int THREAD_WAIT_ITERATIONS = 10 ;
27+ private static class MockServer implements Runnable , Closeable {
28+ public static final int MOCK_PROXY_PORT = 12345 ;
29+ private ServerSocket s ;
2430
25- private final static Pattern linePattern = Pattern .compile ("\" dummy\" 1\\ .0 [0-9]+ source=\" dummy\" " );
31+ static MockServer start () throws InterruptedException {
32+ MockServer server = new MockServer ();
33+ Thread thread = new Thread (server );
34+ thread .setDaemon (false );
35+ thread .start ();
36+ for (int i = 0 ; i < 10 ; i ++) {
37+ if (server .isBound ())
38+ return server ;
39+ Thread .sleep (100 );
40+ }
41+ fail ("Timed out waiting for MockServer to bind" );
42+ return null ;
43+ }
2644
27- private final class MockServer implements Runnable {
2845 public void run () {
2946 try {
30- ServerSocket s = new ServerSocket (12345 );
47+ s = new ServerSocket (MOCK_PROXY_PORT );
48+
3149 Socket cs = s .accept ();
3250 BufferedReader in = new BufferedReader (new InputStreamReader (cs .getInputStream ()));
3351 int n = 0 ;
3452 String line ;
3553 while ((line = in .readLine ()) != null ) {
36- assertTrue (linePattern .matcher (line ).matches (), "Unexpected data from sender" );
54+ if (line .charAt (1 ) == '~' || line .charAt (2 ) == '~' ) {
55+ System .out .println ("Ignoring internal metric:" + line );
56+ continue ;
57+ }
58+ assertLinesMatch (EXPECTED_METRIC_LINE , Collections .singletonList (line ));
3759 n ++;
3860 }
3961 assertEquals (NUM_ITERATIONS * NUM_THREADS , n , "Wrong number of messages received" );
4062 } catch (IOException e ) {
4163 fail (e );
4264 }
4365 }
66+
67+ public boolean isBound () {
68+ return (s != null ) && s .isBound () && !s .isClosed ();
69+ }
70+
71+ @ Override
72+ public void close () throws IOException {
73+ s .close ();
74+ }
4475 }
4576
46- private static final Thread [] getAllThreads () {
77+ private static Thread [] getAllThreads () {
4778 ThreadGroup rootGroup = Thread .currentThread ().getThreadGroup ();
4879 ThreadGroup parentGroup ;
4980 while ((parentGroup = rootGroup .getParent ()) != null ) {
@@ -58,7 +89,7 @@ private static final Thread[] getAllThreads() {
5889
5990 @ Test
6091 public void testProxyRoundtrip () {
61- try {
92+ try ( MockServer mockProxy = MockServer . start ()) {
6293 // Take a snapshot of active threads before we start the client
6394 Thread [] threads = getAllThreads ();
6495 Map <Long , Thread > tMap = new HashMap <>();
@@ -68,18 +99,16 @@ public void testProxyRoundtrip() {
6899 }
69100 tMap .put (t .getId (), t );
70101 }
71- Thread server = new Thread (new MockServer ());
72- server .setDaemon (false );
73- server .start ();
102+
74103 WavefrontProxyClient .Builder b = new WavefrontProxyClient .Builder ("localhost" );
75- b .metricsPort (12345 );
76- final WavefrontSender s = b .build ();
104+ b .metricsPort (MockServer . MOCK_PROXY_PORT );
105+ final WavefrontSender wfSender = b .build ();
77106 final Semaphore semaphore = new Semaphore (0 );
78107 for (int i = 0 ; i < NUM_THREADS ; ++i ) {
79108 new Thread (() -> {
80109 for (int j = 0 ; j < NUM_ITERATIONS ; ++j ) {
81110 try {
82- s .sendMetric ("dummy" , 1.0 , System .currentTimeMillis (), "dummy " , new HashMap <> ());
111+ wfSender .sendMetric ("dummy" , 1.0 , System .currentTimeMillis (), "a-host " , Collections . emptyMap ());
83112 } catch (IOException e ) {
84113 fail (e );
85114 }
@@ -88,8 +117,8 @@ public void testProxyRoundtrip() {
88117 }).start ();
89118 }
90119
91- semaphore .acquire (NUM_THREADS );
92- s .close ();
120+ semaphore .acquire (NUM_THREADS );
121+ wfSender .close ();
93122
94123 // Wait for all new non-daemon threads to terminate (or timeout)
95124 int n = 0 ;
@@ -102,7 +131,7 @@ public void testProxyRoundtrip() {
102131 }
103132 if (!tMap .containsKey (t .getId ()) && !t .isDaemon ()) {
104133 ++newT ;
105- System .out .println ("Non-daemon thread still running: " + t . toString () + ". Waiting for it to finish" );
134+ System .out .println ("Non-daemon thread still running: " + t + ". Waiting for it to finish" );
106135 }
107136 }
108137 if (newT > 0 ) {
@@ -115,10 +144,7 @@ public void testProxyRoundtrip() {
115144 break ;
116145 }
117146 }
118- } catch (IOException e ) {
119- fail (e );
120- }
121- catch (InterruptedException e ) {
147+ } catch (IOException | InterruptedException e ) {
122148 fail (e );
123149 }
124150 }
0 commit comments