11package ng .appserver .wointegration ;
22
3- import java .io .IOException ;
4- import java .io .InputStream ;
5- import java .io .OutputStream ;
6- import java .net .DatagramPacket ;
7- import java .net .DatagramSocket ;
8- import java .net .InetAddress ;
9- import java .net .Socket ;
10- import java .net .SocketException ;
11- import java .nio .charset .StandardCharsets ;
3+ import java .net .URI ;
4+ import java .net .http .HttpClient ;
5+ import java .net .http .HttpRequest ;
6+ import java .net .http .HttpResponse ;
7+ import java .time .Duration ;
128import java .util .Objects ;
9+ import java .util .concurrent .Executors ;
10+ import java .util .concurrent .ScheduledExecutorService ;
11+ import java .util .concurrent .TimeUnit ;
1312
1413import org .slf4j .Logger ;
1514import org .slf4j .LoggerFactory ;
1615
1716/**
18- * Lives alongside the application and sends regular "lifebeats" (as in "Hello, I'm still here) to the wotaskd).
19- *
20- * Also manages other communication with wotaskd
21- * - hasStarted
22- * - willStop
23- * - willCrash
17+ * Sends regular "lifebeats" to wotaskd to report application liveness.
2418 */
2519
26- public class NGLifebeatThread extends Thread {
20+ public class NGLifebeatThread {
2721
2822 private static final Logger logger = LoggerFactory .getLogger ( NGLifebeatThread .class );
2923
30- /**
31- * Stores the value of the response to the last sent lifebeat (FIXME: I think)
32- */
33- private final byte [] lifebeatResponseBuffer = new byte ["HTTP/1.X XXX" .length () + " Apple WebObjects\r \n " .length () + "\r \n \r \n " .length ()];
34-
35- private final InetAddress _localAddress ;
36- private final int _lifebeatDestinationPort ;
24+ private final HttpClient _httpClient ;
25+ private final ScheduledExecutorService _scheduler ;
26+ private final String _baseUrl ;
3727 private final long _lifebeatIntervalMS ;
38- private final MessageGenerator _messageGenerator ;
39-
40- private int _deathCounter ;
41- private Socket lifebeatSocket ;
42- private OutputStream lifebeatOS ;
43- private InputStream lifebeatIS ;
44- private DatagramSocket datagramSocket ;
45- private final byte [] _buffer = new byte [1000 ];
46- private DatagramPacket incomingDatagramPacket ;
47- private DatagramPacket outgoingDatagramPacket ;
48-
49- /**
50- * The standard messages we will send
51- */
52- private static class MessageGenerator {
53- private final byte [] _hasStarted ;
54- private final byte [] _lifebeat ;
55- private final byte [] _willStop ;
56- private final byte [] _willCrash ;
57- private final byte [] _versionRequest ;
58-
59- private MessageGenerator ( final String appName , final String localhostName , final int appPort ) {
60- final String preString = "GET /cgi-bin/WebObjects/wotaskd.woa/wlb?" ;
61- final String postString = "&" + appName + "&" + localhostName + "&" + appPort + " HTTP/1.1\r \n \r \n " ;
62- final String versionString = WOMPRequestHandler .KEY + "://queryVersion" ;
63-
64- _hasStarted = (preString + "hasStarted" + postString ).getBytes ();
65- _lifebeat = (preString + "lifebeat" + postString ).getBytes ();
66- _willStop = (preString + "willStop" + postString ).getBytes ();
67- _willCrash = (preString + "willCrash" + postString ).getBytes ();
68- _versionRequest = versionString .getBytes ();
69- }
70- }
7128
72- public NGLifebeatThread ( final String appName , final int appPort , final InetAddress appHost , final int lifebeatDestinationPort , final long lifebeatIntervalMS ) {
73- logger .info ( "Attempting to create LifebeatThread: {}, {}, {}, {}, {} " , appName , appPort , appHost , lifebeatDestinationPort , lifebeatIntervalMS );
29+ public NGLifebeatThread ( final String appName , final int appPort , final String wotaskdHost , final int wotaskdPort , final long lifebeatIntervalMS ) {
30+ logger .info ( "Creating LifebeatThread: appName= {}, appPort= {}, wotaskdHost= {}, wotaskdPort= {}, intervalMS={} " , appName , appPort , wotaskdHost , wotaskdPort , lifebeatIntervalMS );
7431
75- Objects .requireNonNull ( appName );
32+ Objects .requireNonNull ( appName , "appName" );
33+ Objects .requireNonNull ( wotaskdHost , "wotaskdHost" );
7634
7735 if ( appPort < 1 ) {
7836 throw new IllegalArgumentException ( "appPort must be a positive number" );
7937 }
8038
81- Objects .requireNonNull ( appHost );
82-
83- if ( lifebeatDestinationPort < 1 ) {
84- throw new IllegalArgumentException ( "lifebeatDestinationPort must be a positive number" );
39+ if ( wotaskdPort < 1 ) {
40+ throw new IllegalArgumentException ( "wotaskdPort must be a positive number" );
8541 }
8642
8743 if ( lifebeatIntervalMS < 1 ) {
8844 throw new IllegalArgumentException ( "lifebeatIntervalMS must be a positive number" );
8945 }
9046
91- _lifebeatDestinationPort = lifebeatDestinationPort ;
9247 _lifebeatIntervalMS = lifebeatIntervalMS ;
9348
94- _localAddress = appHost ;
49+ // Base URL: http://wotaskdHost:wotaskdPort/cgi-bin/WebObjects/wotaskd.woa/wlb?{action}&{appName}&{wotaskdHost}&{appPort}
50+ _baseUrl = "http://" + wotaskdHost + ":" + wotaskdPort + "/cgi-bin/WebObjects/wotaskd.woa/wlb?%s&" + appName + "&" + wotaskdHost + "&" + appPort ;
9551
96- setName ( "LifebeatSendReceiveThread" );
52+ _httpClient = HttpClient .newBuilder ()
53+ .connectTimeout ( Duration .ofSeconds ( 5 ) )
54+ .build ();
9755
98- _messageGenerator = new MessageGenerator ( appName , appHost .getHostName (), appPort );
56+ _scheduler = Executors .newSingleThreadScheduledExecutor ( r -> {
57+ final Thread t = new Thread ( r , "LifebeatThread" );
58+ t .setDaemon ( true );
59+ return t ;
60+ } );
9961 }
10062
101- public void sendWillStop () {
102- sendMessage ( _messageGenerator ._willStop );
103- }
63+ public void start () {
64+ sendMessage ( "hasStarted" );
10465
105- public void sendMessage ( byte [] aMessage ) {
106- Objects .requireNonNull ( aMessage );
107-
108- try {
109- if ( lifebeatSocket == null ) {
110- logger .debug ( "Creating new lifebeat socket" );
111-
112- lifebeatSocket = new Socket ( _localAddress , _lifebeatDestinationPort , _localAddress , 0 );
113- lifebeatSocket .setTcpNoDelay ( true );
114- lifebeatSocket .setSoLinger ( false , 0 );
115- lifebeatIS = lifebeatSocket .getInputStream ();
116- lifebeatOS = lifebeatSocket .getOutputStream ();
117- }
118-
119- // write the message
120- lifebeatOS .write ( aMessage );
121- lifebeatOS .flush ();
122-
123- // read response
124- // 200 == OK, 400 == Bad Request, 500 == Force Quit
125- int fetched = 0 ;
126- int thisFetch = -1 ;
127- while ( fetched < lifebeatResponseBuffer .length ) {
128- thisFetch = lifebeatIS .read ( lifebeatResponseBuffer , fetched , lifebeatResponseBuffer .length - fetched );
129- if ( thisFetch != -1 ) {
130- fetched += thisFetch ;
131- }
132- else {
133- break ;
134- }
135- }
136-
137- if ( (thisFetch == -1 ) || (lifebeatResponseBuffer [9 ] == '4' ) ) {
138- // Trash this connection and create a new one
139- // Note this means that we don't support 5.2 apps talking to 5.1 wotaskd
140- // we'll increment the deathCounter each time!
141- _closeLifebeatSocket ();
142- }
143- else if ( lifebeatResponseBuffer [9 ] == '5' ) {
144- try {
145- logger .info ( "Force Quit received. Exiting now..." );
146- // Send a crash message if we can
147- lifebeatSocket = new Socket ( _localAddress , _lifebeatDestinationPort , _localAddress , 0 );
148- lifebeatOS = lifebeatSocket .getOutputStream ();
149- lifebeatOS .write ( _messageGenerator ._willCrash );
150- lifebeatOS .flush ();
151- _closeLifebeatSocket ();
152- }
153- finally {
154- // OK to exit - code unused in Servlet Containers
155- System .exit ( 1 );
156- }
157- }
158- else {
159- _deathCounter = 0 ;
160- }
161- }
162- catch ( final java .io .IOException e ) {
163- logger .debug ( "Exception sending lifebeat to wotaskd: " + e );
164- _closeLifebeatSocket ();
165- }
66+ _scheduler .scheduleAtFixedRate (
67+ () -> sendMessage ( "lifebeat" ),
68+ _lifebeatIntervalMS ,
69+ _lifebeatIntervalMS ,
70+ TimeUnit .MILLISECONDS );
16671 }
16772
168- private void _closeLifebeatSocket () {
169- // Closing everything
170- lifebeatOS = null ;
171- lifebeatIS = null ;
172- if ( lifebeatSocket != null ) {
173- try {
174- lifebeatSocket .close ();
175- }
176- catch ( final IOException ioe ) {
177- logger .debug ( "Exception closing lifebeat socket: " + ioe );
178- }
179- lifebeatSocket = null ;
180- }
181-
182- _deathCounter ++;
183- }
184-
185- private void udpMessage () {
186- try {
187- datagramSocket .send ( outgoingDatagramPacket );
188- incomingDatagramPacket .setLength ( _buffer .length );
189- datagramSocket .receive ( incomingDatagramPacket );
190- final String reply = new String ( incomingDatagramPacket .getData (), StandardCharsets .UTF_8 );
191- if ( reply .startsWith ( WOMPRequestHandler .KEY ) ) {
192- _deathCounter = 0 ;
193- }
194- }
195- catch ( final Throwable e ) {
196- logger .debug ( "Exception checking for wotaskd using UDP: " + e );
197- }
73+ public void sendWillStop () {
74+ sendMessage ( "willStop" );
19875 }
19976
200- @ Override
201- public void run () {
202-
203- boolean _udpSocketNotAvailable = false ;
77+ private void sendMessage ( final String action ) {
78+ final String url = String .format ( _baseUrl , action );
20479
20580 try {
206- datagramSocket = new DatagramSocket ( 0 , _localAddress );
207- datagramSocket .setSoTimeout ( 5000 );
208- outgoingDatagramPacket = new DatagramPacket ( _messageGenerator ._versionRequest , _messageGenerator ._versionRequest .length , _localAddress , _lifebeatDestinationPort );
209- incomingDatagramPacket = new DatagramPacket ( _buffer , _buffer .length );
210- }
211- catch ( final SocketException e ) {
212- logger .error ( "<_LifebeatThread> Exception creating datagramSocket " , e );
213- _udpSocketNotAvailable = true ;
214- }
215-
216- sendMessage ( _messageGenerator ._hasStarted );
81+ final HttpRequest request = HttpRequest .newBuilder ()
82+ .uri ( URI .create ( url ) )
83+ .timeout ( Duration .ofSeconds ( 5 ) )
84+ .GET ()
85+ .build ();
21786
218- try {
219- Thread .sleep ( _lifebeatIntervalMS );
220- }
221- catch ( final InterruptedException ex ) {
222- logger .debug ( "Comms failure" , ex );
223- }
87+ final HttpResponse <Void > response = _httpClient .send ( request , HttpResponse .BodyHandlers .discarding () );
88+ final int status = response .statusCode ();
22489
225- while ( true ) {
226- // FIXME: Document what exactly we're doing here
227- if ( _deathCounter < 10 || _udpSocketNotAvailable ) {
228- sendMessage ( _messageGenerator ._lifebeat );
90+ if ( status == 200 ) {
91+ logger .debug ( "Lifebeat '{}' acknowledged" , action );
22992 }
230- else {
231- udpMessage ();
232- }
233-
234- try {
235- Thread .sleep ( _lifebeatIntervalMS );
93+ else if ( status == 500 ) {
94+ logger .info ( "Force quit received from wotaskd. Exiting." );
95+ sendMessage ( "willCrash" );
96+ System .exit ( 1 );
23697 }
237- catch ( final InterruptedException ex ) {
238- logger .debug ( "Thread interrupted " , ex );
98+ else {
99+ logger .debug ( "Lifebeat '{}' returned status {} " , action , status );
239100 }
240101 }
102+ catch ( final Exception e ) {
103+ logger .debug ( "Failed to send lifebeat '{}': {}" , action , e .getMessage () );
104+ }
241105 }
242- }
106+ }
0 commit comments