From 0ab2f2d9dafd2a0cb12738d35e29ae501fe16035 Mon Sep 17 00:00:00 2001 From: Sean Freitag Date: Mon, 8 Feb 2016 10:43:22 -0600 Subject: [PATCH 1/6] Moving the NO_OP_HANDLER to StatsDClientErrorHandler. --- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 6 +----- .../java/com/timgroup/statsd/StatsDClientErrorHandler.java | 3 +++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 42d9980..2595b5b 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -52,10 +52,6 @@ public final class NonBlockingStatsDClient implements StatsDClient { private static final int PACKET_SIZE_BYTES = 1500; - private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() { - @Override public void handle(final Exception e) { /* No-op */ } - }; - /** * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to * create one pre thread as this seems to offer a significant performance improvement over creating one per-thread: @@ -208,7 +204,7 @@ public NonBlockingStatsDClient(final String prefix, String[] constantTags, final this.prefix = ""; } if(errorHandler == null) { - handler = NO_OP_HANDLER; + handler = StatsDClientErrorHandler.NO_OP_HANDLER; } else { handler = errorHandler; diff --git a/src/main/java/com/timgroup/statsd/StatsDClientErrorHandler.java b/src/main/java/com/timgroup/statsd/StatsDClientErrorHandler.java index 1832b33..7460007 100644 --- a/src/main/java/com/timgroup/statsd/StatsDClientErrorHandler.java +++ b/src/main/java/com/timgroup/statsd/StatsDClientErrorHandler.java @@ -16,4 +16,7 @@ public interface StatsDClientErrorHandler { */ void handle(Exception exception); + StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() { + @Override public void handle(final Exception e) { /* No-op */ } + }; } From 6811407e47ede75220bb373f6a84c80e0337570a Mon Sep 17 00:00:00 2001 From: Sean Freitag Date: Mon, 8 Feb 2016 12:08:53 -0600 Subject: [PATCH 2/6] Splitting NonBlockingStatsDClient into its blocking and non-blocking parts w/ tests. --- .../timgroup/statsd/BlockingStatsDClient.java | 654 ++++++++++++++++++ .../statsd/NonBlockingStatsDClient.java | 522 +------------- .../statsd/BlockingStatsDClientTest.java | 11 + .../statsd/NonBlockingStatsDClientTest.java | 435 +----------- .../com/timgroup/statsd/StatsDClientTest.java | 441 ++++++++++++ 5 files changed, 1126 insertions(+), 937 deletions(-) create mode 100644 src/main/java/com/timgroup/statsd/BlockingStatsDClient.java create mode 100644 src/test/java/com/timgroup/statsd/BlockingStatsDClientTest.java create mode 100644 src/test/java/com/timgroup/statsd/StatsDClientTest.java diff --git a/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java new file mode 100644 index 0000000..97773f1 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java @@ -0,0 +1,654 @@ +package com.timgroup.statsd; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.charset.Charset; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.text.NumberFormat; +import java.util.Locale; +import java.util.concurrent.Callable; + +/** + * A simple StatsD client implementation facilitating metrics recording. + * + *

Upon instantiation, this client will establish a socket connection to a StatsD instance + * running on the specified host and port. Metrics are then sent over this connection as they are + * received by the client. + *

+ * + *

Three key methods are provided for the submission of data-points for the application under + * scrutiny: + *

+ * From the perspective of the application, these methods are non-blocking, with the resulting + * IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed + * not to throw an exception which may disrupt application execution. + *

+ * + *

As part of a clean system shutdown, the {@link #stop()} method should be invoked + * on any StatsD clients.

+ * + * @author Tom Denley + * + */ +public class BlockingStatsDClient implements StatsDClient { + + private static final int PACKET_SIZE_BYTES = 1500; + + /** + * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to + * create one pre thread as this seems to offer a significant performance improvement over creating one per-thread: + * http://stackoverflow.com/a/1285297/2648 + * https://github.com/indeedeng/java-dogstatsd-client/issues/4 + */ + private static final ThreadLocal NUMBER_FORMATTERS = new ThreadLocal() { + @Override + protected NumberFormat initialValue() { + + // Always create the formatter for the US locale in order to avoid this bug: + // https://github.com/indeedeng/java-dogstatsd-client/issues/3 + final NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); + numberFormatter.setGroupingUsed(false); + numberFormatter.setMaximumFractionDigits(6); + + // we need to specify a value for Double.NaN that is recognized by dogStatsD + if (numberFormatter instanceof DecimalFormat) { // better safe than a runtime error + final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; + final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); + symbols.setNaN("NaN"); + decimalFormat.setDecimalFormatSymbols(symbols); + } + + return numberFormatter; + } + }; + + private final String prefix; + private final DatagramChannel clientChannel; + protected final StatsDClientErrorHandler handler; + private final String constantTagsRendered; + private final Callable addressLookup; + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are consumed, guaranteeing + * that failures in metrics will not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @throws StatsDClientException + * if the client could not be started + */ + public BlockingStatsDClient(final String prefix, final String hostname, final int port) throws StatsDClientException { + this(prefix, hostname, port, null, null); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are consumed, guaranteeing + * that failures in metrics will not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @param constantTags + * tags to be added to all content sent + * @throws StatsDClientException + * if the client could not be started + */ + public BlockingStatsDClient(final String prefix, final String hostname, final int port, final String... constantTags) throws StatsDClientException { + this(prefix, hostname, port, constantTags, null); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are passed to the specified + * handler and then consumed, guaranteeing that failures in metrics will + * not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @param constantTags + * tags to be added to all content sent + * @param errorHandler + * handler to use when an exception occurs during usage, may be null to indicate noop + * @throws StatsDClientException + * if the client could not be started + */ + public BlockingStatsDClient(final String prefix, final String hostname, final int port, final String[] constantTags, + final StatsDClientErrorHandler errorHandler) throws StatsDClientException { + this(prefix, constantTags, errorHandler, staticStatsDAddressResolution(hostname, port)); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are passed to the specified + * handler and then consumed, guaranteeing that failures in metrics will + * not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param constantTags + * tags to be added to all content sent + * @param errorHandler + * handler to use when an exception occurs during usage, may be null to indicate noop + * @param addressLookup + * yields the IP address and socket of the StatsD server + * @throws StatsDClientException + * if the client could not be started + */ + public BlockingStatsDClient(final String prefix, String[] constantTags, final StatsDClientErrorHandler errorHandler, + final Callable addressLookup) throws StatsDClientException { + if((prefix != null) && (!prefix.isEmpty())) { + this.prefix = String.format("%s.", prefix); + } else { + this.prefix = ""; + } + if(errorHandler == null) { + handler = StatsDClientErrorHandler.NO_OP_HANDLER; + } + else { + handler = errorHandler; + } + + /* Empty list should be null for faster comparison */ + if((constantTags != null) && (constantTags.length == 0)) { + constantTags = null; + } + + if(constantTags != null) { + constantTagsRendered = tagString(constantTags, null); + } else { + constantTagsRendered = null; + } + + try { + clientChannel = DatagramChannel.open(); + } catch (final Exception e) { + throw new StatsDClientException("Failed to start StatsD client", e); + } + this.addressLookup = addressLookup; + } + + /** + * Cleanly shut down this StatsD client. This method may throw an exception if + * the socket cannot be closed. + */ + @Override + public void stop() { + if (clientChannel != null) { + try { + clientChannel.close(); + } + catch (IOException e) { + handler.handle(e); + } + } + } + + /** + * Generate a suffix conveying the given tag list to the client + */ + static String tagString(final String[] tags, final String tagPrefix) { + final StringBuilder sb; + if(tagPrefix != null) { + if((tags == null) || (tags.length == 0)) { + return tagPrefix; + } + sb = new StringBuilder(tagPrefix); + sb.append(","); + } else { + if((tags == null) || (tags.length == 0)) { + return ""; + } + sb = new StringBuilder("|#"); + } + + for(int n=tags.length - 1; n>=0; n--) { + sb.append(tags[n]); + if(n > 0) { + sb.append(","); + } + } + return sb.toString(); + } + + /** + * Generate a suffix conveying the given tag list to the client + */ + String tagString(final String[] tags) { + return tagString(tags, constantTagsRendered); + } + + /** + * Adjusts the specified counter by a given delta. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to adjust + * @param delta + * the amount to adjust the counter by + * @param tags + * array of tags to be added to the data + */ + @Override + public void count(final String aspect, final long delta, final String... tags) { + send(String.format("%s%s:%d|c%s", prefix, aspect, delta, tagString(tags))); + } + + /** + * Increments the specified counter by one. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to increment + * @param tags + * array of tags to be added to the data + */ + @Override + public void incrementCounter(final String aspect, final String... tags) { + count(aspect, 1, tags); + } + + /** + * Convenience method equivalent to {@link #incrementCounter(String, String[])}. + */ + @Override + public void increment(final String aspect, final String... tags) { + incrementCounter(aspect, tags); + } + + /** + * Decrements the specified counter by one. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to decrement + * @param tags + * array of tags to be added to the data + */ + @Override + public void decrementCounter(final String aspect, final String... tags) { + count(aspect, -1, tags); + } + + /** + * Convenience method equivalent to {@link #decrementCounter(String, String[])}. + */ + @Override + public void decrement(final String aspect, final String... tags) { + decrementCounter(aspect, tags); + } + + /** + * Records the latest fixed value for the specified named gauge. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordGaugeValue(final String aspect, final double value, final String... tags) { + /* Intentionally using %s rather than %f here to avoid + * padding with extra 0s to represent precision */ + send(String.format("%s%s:%s|g%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. + */ + @Override + public void gauge(final String aspect, final double value, final String... tags) { + recordGaugeValue(aspect, value, tags); + } + + + /** + * Records the latest fixed value for the specified named gauge. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordGaugeValue(final String aspect, final long value, final String... tags) { + send(String.format("%s%s:%d|g%s", prefix, aspect, value, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. + */ + @Override + public void gauge(final String aspect, final long value, final String... tags) { + recordGaugeValue(aspect, value, tags); + } + + /** + * Records an execution time in milliseconds for the specified named operation. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the timed operation + * @param timeInMs + * the time in milliseconds + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordExecutionTime(final String aspect, final long timeInMs, final String... tags) { + send(String.format("%s%s:%d|ms%s", prefix, aspect, timeInMs, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordExecutionTime(String, long, String[])}. + */ + @Override + public void time(final String aspect, final long value, final String... tags) { + recordExecutionTime(aspect, value, tags); + } + + /** + * Records a value for the specified named histogram. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordHistogramValue(final String aspect, final double value, final String... tags) { + /* Intentionally using %s rather than %f here to avoid + * padding with extra 0s to represent precision */ + send(String.format("%s%s:%s|h%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, double, String[])}. + */ + @Override + public void histogram(final String aspect, final double value, final String... tags) { + recordHistogramValue(aspect, value, tags); + } + + /** + * Records a value for the specified named histogram. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordHistogramValue(final String aspect, final long value, final String... tags) { + send(String.format("%s%s:%d|h%s", prefix, aspect, value, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, long, String[])}. + */ + @Override + public void histogram(final String aspect, final long value, final String... tags) { + recordHistogramValue(aspect, value, tags); + } + + private String eventMap(final Event event) { + final StringBuilder res = new StringBuilder(""); + + final long millisSinceEpoch = event.getMillisSinceEpoch(); + if (millisSinceEpoch != -1) { + res.append("|d:").append(millisSinceEpoch / 1000); + } + + final String hostname = event.getHostname(); + if (hostname != null) { + res.append("|h:").append(hostname); + } + + final String aggregationKey = event.getAggregationKey(); + if (aggregationKey != null) { + res.append("|k:").append(aggregationKey); + } + + final String priority = event.getPriority(); + if (priority != null) { + res.append("|p:").append(priority); + } + + final String alertType = event.getAlertType(); + if (alertType != null) { + res.append("|t:").append(alertType); + } + + return res.toString(); + } + + /** + * Records an event + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param event + * The event to record + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#events-1 + */ + @Override + public void recordEvent(final Event event, final String... tags) { + final String title = prefix + event.getTitle(); + final String text = event.getText(); + send(String.format("_e{%d,%d}:%s|%s%s%s", + title.length(), text.length(), title, text, eventMap(event), tagString(tags))); + } + + /** + * Records a run status for the specified named service check. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param sc + * the service check object + */ + @Override + public void recordServiceCheckRun(final ServiceCheck sc) { + send(toStatsDString(sc)); + } + + private String toStatsDString(final ServiceCheck sc) { + // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks + final StringBuilder sb = new StringBuilder(); + sb.append(String.format("_sc|%s|%d", sc.getName(), sc.getStatus())); + if (sc.getTimestamp() > 0) { + sb.append(String.format("|d:%d", sc.getTimestamp())); + } + if (sc.getHostname() != null) { + sb.append(String.format("|h:%s", sc.getHostname())); + } + sb.append(tagString(sc.getTags())); + if (sc.getMessage() != null) { + sb.append(String.format("|m:%s", sc.getEscapedMessage())); + } + return sb.toString(); + } + + /** + * Convenience method equivalent to {@link #recordServiceCheckRun(ServiceCheck sc)}. + */ + @Override + public void serviceCheck(final ServiceCheck sc) { + recordServiceCheckRun(sc); + } + + + /** + * Records a value for the specified set. + * + * Sets are used to count the number of unique elements in a group. If you want to track the number of + * unique visitor to your site, sets are a great way to do that. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the set + * @param value + * the value to track + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#sets + */ + @Override + public void recordSetValue(final String aspect, final String value, final String... tags) { + // documentation is light, but looking at dogstatsd source, we can send string values + // here instead of numbers + send(String.format("%s%s:%s|s%s", prefix, aspect, value, tagString(tags))); + } + + + public static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8"); + private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES); + + protected void send(String message) { + if (message == null) return; + + try { + final InetSocketAddress address = addressLookup.call(); + final byte[] data = message.getBytes(MESSAGE_CHARSET); + + sendBuffer.put(data); + + final int sizeOfBuffer = sendBuffer.position(); + sendBuffer.flip(); + + final int sentBytes = clientChannel.send(sendBuffer, address); + sendBuffer.limit(sendBuffer.capacity()); + sendBuffer.rewind(); + + if (sizeOfBuffer != sentBytes) { + handler.handle( + new IOException( + String.format( + "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", + sendBuffer.toString(), + address.getHostName(), + address.getPort(), + sentBytes, + sizeOfBuffer))); + } + } catch (Exception e) { + handler.handle(e); + } + } + + /** + * Create dynamic lookup for the given host name and port. + * + * @param hostname the host name of the targeted StatsD server + * @param port the port of the targeted StatsD server + * @return a function to perform the lookup + * @see BlockingStatsDClient#BlockingStatsDClient(String, String[], StatsDClientErrorHandler, Callable) + */ + public static Callable volatileAddressResolution(final String hostname, final int port) { + return new Callable() { + @Override public InetSocketAddress call() throws UnknownHostException { + return new InetSocketAddress(InetAddress.getByName(hostname), port); + } + }; + } + + /** + * Lookup the address for the given host name and cache the result. + * + * @param hostname the host name of the targeted StatsD server + * @param port the port of the targeted StatsD server + * @return a function that cached the result of the lookup + * @throws Exception if the lookup fails, i.e. {@link UnknownHostException} + */ + public static Callable staticAddressResolution(final String hostname, final int port) throws Exception { + final InetSocketAddress address = volatileAddressResolution(hostname, port).call(); + return new Callable() { + @Override public InetSocketAddress call() { + return address; + } + }; + } + + protected static Callable staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException { + try { + return staticAddressResolution(hostname, port); + } catch (final Exception e) { + throw new StatsDClientException("Failed to lookup StatsD host", e); + } + } +} diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 2595b5b..4312f98 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -48,42 +48,7 @@ * @author Tom Denley * */ -public final class NonBlockingStatsDClient implements StatsDClient { - - private static final int PACKET_SIZE_BYTES = 1500; - - /** - * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to - * create one pre thread as this seems to offer a significant performance improvement over creating one per-thread: - * http://stackoverflow.com/a/1285297/2648 - * https://github.com/indeedeng/java-dogstatsd-client/issues/4 - */ - private static final ThreadLocal NUMBER_FORMATTERS = new ThreadLocal() { - @Override - protected NumberFormat initialValue() { - - // Always create the formatter for the US locale in order to avoid this bug: - // https://github.com/indeedeng/java-dogstatsd-client/issues/3 - final NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); - numberFormatter.setGroupingUsed(false); - numberFormatter.setMaximumFractionDigits(6); - - // we need to specify a value for Double.NaN that is recognized by dogStatsD - if (numberFormatter instanceof DecimalFormat) { // better safe than a runtime error - final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; - final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); - symbols.setNaN("NaN"); - decimalFormat.setDecimalFormatSymbols(symbols); - } - - return numberFormatter; - } - }; - - private final String prefix; - private final DatagramChannel clientChannel; - private final StatsDClientErrorHandler handler; - private final String constantTagsRendered; +public final class NonBlockingStatsDClient extends BlockingStatsDClient { private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); @@ -198,35 +163,8 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final */ public NonBlockingStatsDClient(final String prefix, String[] constantTags, final StatsDClientErrorHandler errorHandler, final Callable addressLookup) throws StatsDClientException { - if((prefix != null) && (!prefix.isEmpty())) { - this.prefix = String.format("%s.", prefix); - } else { - this.prefix = ""; - } - if(errorHandler == null) { - handler = StatsDClientErrorHandler.NO_OP_HANDLER; - } - else { - handler = errorHandler; - } - - /* Empty list should be null for faster comparison */ - if((constantTags != null) && (constantTags.length == 0)) { - constantTags = null; - } - - if(constantTags != null) { - constantTagsRendered = tagString(constantTags, null); - } else { - constantTagsRendered = null; - } - - try { - clientChannel = DatagramChannel.open(); - } catch (final Exception e) { - throw new StatsDClientException("Failed to start StatsD client", e); - } - executor.submit(new QueueConsumer(addressLookup)); + super(prefix, constantTags, errorHandler, addressLookup); + this.executor.submit(new QueueConsumer()); } /** @@ -241,466 +179,32 @@ public void stop() { } catch (final Exception e) { handler.handle(e); + } finally { + super.stop(); } - finally { - if (clientChannel != null) { - try { - clientChannel.close(); - } - catch (final IOException e) { - handler.handle(e); - } - } - } - } - - /** - * Generate a suffix conveying the given tag list to the client - */ - static String tagString(final String[] tags, final String tagPrefix) { - final StringBuilder sb; - if(tagPrefix != null) { - if((tags == null) || (tags.length == 0)) { - return tagPrefix; - } - sb = new StringBuilder(tagPrefix); - sb.append(","); - } else { - if((tags == null) || (tags.length == 0)) { - return ""; - } - sb = new StringBuilder("|#"); - } - - for(int n=tags.length - 1; n>=0; n--) { - sb.append(tags[n]); - if(n > 0) { - sb.append(","); - } - } - return sb.toString(); - } - - /** - * Generate a suffix conveying the given tag list to the client - */ - String tagString(final String[] tags) { - return tagString(tags, constantTagsRendered); - } - - /** - * Adjusts the specified counter by a given delta. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the counter to adjust - * @param delta - * the amount to adjust the counter by - * @param tags - * array of tags to be added to the data - */ - @Override - public void count(final String aspect, final long delta, final String... tags) { - send(String.format("%s%s:%d|c%s", prefix, aspect, delta, tagString(tags))); - } - - /** - * Increments the specified counter by one. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the counter to increment - * @param tags - * array of tags to be added to the data - */ - @Override - public void incrementCounter(final String aspect, final String... tags) { - count(aspect, 1, tags); - } - - /** - * Convenience method equivalent to {@link #incrementCounter(String, String[])}. - */ - @Override - public void increment(final String aspect, final String... tags) { - incrementCounter(aspect, tags); - } - - /** - * Decrements the specified counter by one. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the counter to decrement - * @param tags - * array of tags to be added to the data - */ - @Override - public void decrementCounter(final String aspect, final String... tags) { - count(aspect, -1, tags); - } - - /** - * Convenience method equivalent to {@link #decrementCounter(String, String[])}. - */ - @Override - public void decrement(final String aspect, final String... tags) { - decrementCounter(aspect, tags); - } - - /** - * Records the latest fixed value for the specified named gauge. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the gauge - * @param value - * the new reading of the gauge - * @param tags - * array of tags to be added to the data - */ - @Override - public void recordGaugeValue(final String aspect, final double value, final String... tags) { - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - send(String.format("%s%s:%s|g%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); - } - - /** - * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. - */ - @Override - public void gauge(final String aspect, final double value, final String... tags) { - recordGaugeValue(aspect, value, tags); - } - - - /** - * Records the latest fixed value for the specified named gauge. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the gauge - * @param value - * the new reading of the gauge - * @param tags - * array of tags to be added to the data - */ - @Override - public void recordGaugeValue(final String aspect, final long value, final String... tags) { - send(String.format("%s%s:%d|g%s", prefix, aspect, value, tagString(tags))); - } - - /** - * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. - */ - @Override - public void gauge(final String aspect, final long value, final String... tags) { - recordGaugeValue(aspect, value, tags); - } - - /** - * Records an execution time in milliseconds for the specified named operation. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the timed operation - * @param timeInMs - * the time in milliseconds - * @param tags - * array of tags to be added to the data - */ - @Override - public void recordExecutionTime(final String aspect, final long timeInMs, final String... tags) { - send(String.format("%s%s:%d|ms%s", prefix, aspect, timeInMs, tagString(tags))); - } - - /** - * Convenience method equivalent to {@link #recordExecutionTime(String, long, String[])}. - */ - @Override - public void time(final String aspect, final long value, final String... tags) { - recordExecutionTime(aspect, value, tags); - } - - /** - * Records a value for the specified named histogram. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the histogram - * @param value - * the value to be incorporated in the histogram - * @param tags - * array of tags to be added to the data - */ - @Override - public void recordHistogramValue(final String aspect, final double value, final String... tags) { - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - send(String.format("%s%s:%s|h%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); - } - - /** - * Convenience method equivalent to {@link #recordHistogramValue(String, double, String[])}. - */ - @Override - public void histogram(final String aspect, final double value, final String... tags) { - recordHistogramValue(aspect, value, tags); - } - - /** - * Records a value for the specified named histogram. - * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the histogram - * @param value - * the value to be incorporated in the histogram - * @param tags - * array of tags to be added to the data - */ - @Override - public void recordHistogramValue(final String aspect, final long value, final String... tags) { - send(String.format("%s%s:%d|h%s", prefix, aspect, value, tagString(tags))); - } - - /** - * Convenience method equivalent to {@link #recordHistogramValue(String, long, String[])}. - */ - @Override - public void histogram(final String aspect, final long value, final String... tags) { - recordHistogramValue(aspect, value, tags); - } - - private String eventMap(final Event event) { - final StringBuilder res = new StringBuilder(""); - - final long millisSinceEpoch = event.getMillisSinceEpoch(); - if (millisSinceEpoch != -1) { - res.append("|d:").append(millisSinceEpoch / 1000); - } - - final String hostname = event.getHostname(); - if (hostname != null) { - res.append("|h:").append(hostname); - } - - final String aggregationKey = event.getAggregationKey(); - if (aggregationKey != null) { - res.append("|k:").append(aggregationKey); - } - - final String priority = event.getPriority(); - if (priority != null) { - res.append("|p:").append(priority); - } - - final String alertType = event.getAlertType(); - if (alertType != null) { - res.append("|t:").append(alertType); - } - - return res.toString(); } - /** - * Records an event - * - *

This method is a DataDog extension, and may not work with other servers.

- * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param event - * The event to record - * @param tags - * array of tags to be added to the data - * - * @see http://docs.datadoghq.com/guides/dogstatsd/#events-1 - */ - @Override - public void recordEvent(final Event event, final String... tags) { - final String title = prefix + event.getTitle(); - final String text = event.getText(); - send(String.format("_e{%d,%d}:%s|%s%s%s", - title.length(), text.length(), title, text, eventMap(event), tagString(tags))); - } - - /** - * Records a run status for the specified named service check. - * - *

This method is a DataDog extension, and may not work with other servers.

- * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param sc - * the service check object - */ - @Override - public void recordServiceCheckRun(final ServiceCheck sc) { - send(toStatsDString(sc)); - } - - private String toStatsDString(final ServiceCheck sc) { - // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks - final StringBuilder sb = new StringBuilder(); - sb.append(String.format("_sc|%s|%d", sc.getName(), sc.getStatus())); - if (sc.getTimestamp() > 0) { - sb.append(String.format("|d:%d", sc.getTimestamp())); - } - if (sc.getHostname() != null) { - sb.append(String.format("|h:%s", sc.getHostname())); - } - sb.append(tagString(sc.getTags())); - if (sc.getMessage() != null) { - sb.append(String.format("|m:%s", sc.getEscapedMessage())); - } - return sb.toString(); - } - - /** - * Convenience method equivalent to {@link #recordServiceCheckRun(ServiceCheck sc)}. - */ - @Override - public void serviceCheck(final ServiceCheck sc) { - recordServiceCheckRun(sc); - } - - - /** - * Records a value for the specified set. - * - * Sets are used to count the number of unique elements in a group. If you want to track the number of - * unique visitor to your site, sets are a great way to do that. - * - *

This method is a DataDog extension, and may not work with other servers.

- * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the set - * @param value - * the value to track - * @param tags - * array of tags to be added to the data - * - * @see http://docs.datadoghq.com/guides/dogstatsd/#sets - */ - @Override - public void recordSetValue(final String aspect, final String value, final String... tags) { - // documentation is light, but looking at dogstatsd source, we can send string values - // here instead of numbers - send(String.format("%s%s:%s|s%s", prefix, aspect, value, tagString(tags))); - } - - private void send(final String message) { + protected void send(String message) { queue.offer(message); } - public static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8"); - + private void blockingSend(String message) { + super.send(message); + } private class QueueConsumer implements Runnable { - private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES); - - private final Callable addressLookup; - - QueueConsumer(final Callable addressLookup) { - this.addressLookup = addressLookup; - } - @Override public void run() { while(!executor.isShutdown()) { try { - final String message = queue.poll(1, TimeUnit.SECONDS); - if(null != message) { - final InetSocketAddress address = addressLookup.call(); - final byte[] data = message.getBytes(MESSAGE_CHARSET); - if(sendBuffer.remaining() < (data.length + 1)) { - blockingSend(address); - } - if(sendBuffer.position() > 0) { - sendBuffer.put( (byte) '\n'); - } - sendBuffer.put(data); - if(null == queue.peek()) { - blockingSend(address); - } + if (queue.peek() == null) { + Thread.sleep(10); + } else { + blockingSend(queue.take()); } } catch (final Exception e) { handler.handle(e); } } } - - private void blockingSend(final InetSocketAddress address) throws IOException { - final int sizeOfBuffer = sendBuffer.position(); - sendBuffer.flip(); - - final int sentBytes = clientChannel.send(sendBuffer, address); - sendBuffer.limit(sendBuffer.capacity()); - sendBuffer.rewind(); - - if (sizeOfBuffer != sentBytes) { - handler.handle( - new IOException( - String.format( - "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", - sendBuffer.toString(), - address.getHostName(), - address.getPort(), - sentBytes, - sizeOfBuffer))); - } - } - } - - /** - * Create dynamic lookup for the given host name and port. - * - * @param hostname the host name of the targeted StatsD server - * @param port the port of the targeted StatsD server - * @return a function to perform the lookup - * @see NonBlockingStatsDClient#NonBlockingStatsDClient(String, String[], StatsDClientErrorHandler, Callable) - */ - public static Callable volatileAddressResolution(final String hostname, final int port) { - return new Callable() { - @Override public InetSocketAddress call() throws UnknownHostException { - return new InetSocketAddress(InetAddress.getByName(hostname), port); - } - }; - } - - /** - * Lookup the address for the given host name and cache the result. - * - * @param hostname the host name of the targeted StatsD server - * @param port the port of the targeted StatsD server - * @return a function that cached the result of the lookup - * @throws Exception if the lookup fails, i.e. {@link UnknownHostException} - */ - public static Callable staticAddressResolution(final String hostname, final int port) throws Exception { - final InetSocketAddress address = volatileAddressResolution(hostname, port).call(); - return new Callable() { - @Override public InetSocketAddress call() { - return address; - } - }; - } - - private static Callable staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException { - try { - return staticAddressResolution(hostname, port); - } catch (final Exception e) { - throw new StatsDClientException("Failed to lookup StatsD host", e); - } } } diff --git a/src/test/java/com/timgroup/statsd/BlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/BlockingStatsDClientTest.java new file mode 100644 index 0000000..6b497f0 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/BlockingStatsDClientTest.java @@ -0,0 +1,11 @@ +package com.timgroup.statsd; + +/** + * @author sfreitag + */ +public class BlockingStatsDClientTest extends StatsDClientTest { + @Override + public BlockingStatsDClient buildClient(String prefix, String[] constantTags) { + return new BlockingStatsDClient(prefix, STATSD_SERVER_HOST, STATSD_SERVER_PORT, constantTags); + } +} diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index a8c94dc..eeeed56 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1,432 +1,11 @@ package com.timgroup.statsd; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.net.SocketException; -import java.util.Locale; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertEquals; - -public class NonBlockingStatsDClientTest { - - private static final int STATSD_SERVER_PORT = 17254; - private final NonBlockingStatsDClient client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT); - private DummyStatsDServer server; - - @Before - public void start() throws SocketException { - server = new DummyStatsDServer(STATSD_SERVER_PORT); - } - - @After - public void stop() throws Exception { - client.stop(); - server.close(); - } - - @Test(timeout=5000L) public void - sends_counter_value_to_statsd() throws Exception { - - - client.count("mycount", 24); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); - } - - @Test(timeout=5000L) public void - sends_counter_value_to_statsd_with_null_tags() throws Exception { - - - client.count("mycount", 24, (java.lang.String[]) null); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); - } - - @Test(timeout=5000L) public void - sends_counter_value_to_statsd_with_empty_tags() throws Exception { - - - client.count("mycount", 24); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); - } - - @Test(timeout=5000L) public void - sends_counter_value_to_statsd_with_tags() throws Exception { - - - client.count("mycount", 24, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_counter_increment_to_statsd() throws Exception { - - - client.incrementCounter("myinc"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myinc:1|c")); - } - - @Test(timeout=5000L) public void - sends_counter_increment_to_statsd_with_tags() throws Exception { - - - client.incrementCounter("myinc", "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myinc:1|c|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_counter_decrement_to_statsd() throws Exception { - - - client.decrementCounter("mydec"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mydec:-1|c")); - } - - @Test(timeout=5000L) public void - sends_counter_decrement_to_statsd_with_tags() throws Exception { - - - client.decrementCounter("mydec", "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mydec:-1|c|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_gauge_to_statsd() throws Exception { - - - client.recordGaugeValue("mygauge", 423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:423|g")); - } - - @Test(timeout=5000L) public void - sends_large_double_gauge_to_statsd() throws Exception { - - - client.recordGaugeValue("mygauge", 123456789012345.67890); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:123456789012345.67|g")); - } - - @Test(timeout=5000L) public void - sends_exact_double_gauge_to_statsd() throws Exception { - - - client.recordGaugeValue("mygauge", 123.45678901234567890); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:123.456789|g")); - } - - @Test(timeout=5000L) public void - sends_double_gauge_to_statsd() throws Exception { - - - client.recordGaugeValue("mygauge", 0.423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:0.423|g")); - } - - @Test(timeout=5000L) public void - sends_gauge_to_statsd_with_tags() throws Exception { - - - client.recordGaugeValue("mygauge", 423, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:423|g|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_double_gauge_to_statsd_with_tags() throws Exception { - - - client.recordGaugeValue("mygauge", 0.423, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:0.423|g|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_histogram_to_statsd() throws Exception { - - - client.recordHistogramValue("myhistogram", 423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:423|h")); - } - - @Test(timeout=5000L) public void - sends_double_histogram_to_statsd() throws Exception { - - - client.recordHistogramValue("myhistogram", 0.423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:0.423|h")); - } - - @Test(timeout=5000L) public void - sends_histogram_to_statsd_with_tags() throws Exception { - - - client.recordHistogramValue("myhistogram", 423, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:423|h|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_double_histogram_to_statsd_with_tags() throws Exception { - - - client.recordHistogramValue("myhistogram", 0.423, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:0.423|h|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_timer_to_statsd() throws Exception { - - - client.recordExecutionTime("mytime", 123); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms")); - } - - /** - * A regression test for this i18n number formatting bug - * @throws Exception - */ - @Test public void - sends_timer_to_statsd_from_locale_with_unamerican_number_formatting() throws Exception { - - Locale originalDefaultLocale = Locale.getDefault(); - - // change the default Locale to one that uses something other than a '.' as the decimal separator (Germany uses a comma) - Locale.setDefault(Locale.GERMANY); - - try { - - - client.recordExecutionTime("mytime", 123, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms|#baz,foo:bar")); - } finally { - // reset the default Locale in case changing it has side-effects - Locale.setDefault(originalDefaultLocale); - } - } - - - @Test(timeout=5000L) public void - sends_timer_to_statsd_with_tags() throws Exception { - - - client.recordExecutionTime("mytime", 123, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms|#baz,foo:bar")); - } - - - @Test(timeout=5000L) public void - sends_gauge_mixed_tags() throws Exception { - - final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT, new String[] {"instance:foo", "app:bar"}); - empty_prefix_client.gauge("value", 423, "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.value:423|g|#app:bar,instance:foo,baz")); - } - - @Test(timeout=5000L) public void - sends_gauge_constant_tags_only() throws Exception { - - final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT, new String[] {"instance:foo", "app:bar"}); - empty_prefix_client.gauge("value", 423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.value:423|g|#app:bar,instance:foo")); - } - - @Test(timeout=5000L) public void - sends_gauge_empty_prefix() throws Exception { - - final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("", "localhost", STATSD_SERVER_PORT); - empty_prefix_client.gauge("top.level.value", 423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("top.level.value:423|g")); - } - - @Test(timeout=5000L) public void - sends_gauge_null_prefix() throws Exception { - - final NonBlockingStatsDClient null_prefix_client = new NonBlockingStatsDClient(null, "localhost", STATSD_SERVER_PORT); - null_prefix_client.gauge("top.level.value", 423); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("top.level.value:423|g")); - } - - @Test(timeout=5000L) public void - sends_event() throws Exception { - - final Event event = Event.builder() - .withTitle("title1") - .withText("text1") - .withDate(1234567000) - .withHostname("host1") - .withPriority(Event.Priority.LOW) - .withAggregationKey("key1") - .withAlertType(Event.AlertType.ERROR) - .build(); - client.recordEvent(event); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|h:host1|k:key1|p:low|t:error")); - } - - @Test(timeout=5000L) public void - sends_partial_event() throws Exception { - - final Event event = Event.builder() - .withTitle("title1") - .withText("text1") - .withDate(1234567000) - .build(); - client.recordEvent(event); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567")); - } - - @Test(timeout=5000L) public void - sends_event_with_tags() throws Exception { - - final Event event = Event.builder() - .withTitle("title1") - .withText("text1") - .withDate(1234567000) - .withHostname("host1") - .withPriority(Event.Priority.LOW) - .withAggregationKey("key1") - .withAlertType(Event.AlertType.ERROR) - .build(); - client.recordEvent(event, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|h:host1|k:key1|p:low|t:error|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_partial_event_with_tags() throws Exception { - - final Event event = Event.builder() - .withTitle("title1") - .withText("text1") - .withDate(1234567000) - .build(); - client.recordEvent(event, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_event_empty_prefix() throws Exception { - - final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("", "localhost", STATSD_SERVER_PORT); - final Event event = Event.builder() - .withTitle("title1") - .withText("text1") - .withDate(1234567000) - .withHostname("host1") - .withPriority(Event.Priority.LOW) - .withAggregationKey("key1") - .withAlertType(Event.AlertType.ERROR) - .build(); - empty_prefix_client.recordEvent(event, "foo:bar", "baz"); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("_e{6,5}:title1|text1|d:1234567|h:host1|k:key1|p:low|t:error|#baz,foo:bar")); - } - - @Test(timeout=5000L) public void - sends_service_check() throws Exception { - final String inputMessage = "\u266c \u2020\u00f8U \n\u2020\u00f8U \u00a5\u00bau|m: T0\u00b5 \u266a"; // "♬ †øU \n†øU ¥ºu|m: T0µ ♪" - final String outputMessage = "\u266c \u2020\u00f8U \\n\u2020\u00f8U \u00a5\u00bau|m\\: T0\u00b5 \u266a"; // note the escaped colon - final String[] tags = {"key1:val1", "key2:val2"}; - final ServiceCheck sc = ServiceCheck.builder() - .withName("my_check.name") - .withStatus(ServiceCheck.Status.WARNING) - .withMessage(inputMessage) - .withHostname("i-abcd1234") - .withTags(tags) - .withTimestamp(1420740000) - .build(); - - assertEquals(outputMessage, sc.getEscapedMessage()); - - client.serviceCheck(sc); - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains(String.format("_sc|my_check.name|1|d:1420740000|h:i-abcd1234|#key2:val2,key1:val1|m:%s", - outputMessage))); - } - - @Test(timeout=5000L) public void - sends_nan_gauge_to_statsd() throws Exception { - client.recordGaugeValue("mygauge", Double.NaN); - - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.mygauge:NaN|g")); - } - - @Test(timeout=5000L) public void - sends_set_to_statsd() throws Exception { - client.recordSetValue("myset", "myuserid"); - - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myset:myuserid|s")); - - } - - @Test(timeout=5000L) public void - sends_set_to_statsd_with_tags() throws Exception { - client.recordSetValue("myset", "myuserid", "foo:bar", "baz"); - - server.waitForMessage(); - - assertThat(server.messagesReceived(), contains("my.prefix.myset:myuserid|s|#baz,foo:bar")); - +/** + * @author sfreitag + */ +public class NonBlockingStatsDClientTest extends StatsDClientTest { + @Override + public NonBlockingStatsDClient buildClient(String prefix, String[] constantTags) { + return new NonBlockingStatsDClient(prefix, STATSD_SERVER_HOST, STATSD_SERVER_PORT, constantTags); } } diff --git a/src/test/java/com/timgroup/statsd/StatsDClientTest.java b/src/test/java/com/timgroup/statsd/StatsDClientTest.java new file mode 100644 index 0000000..e8deb01 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/StatsDClientTest.java @@ -0,0 +1,441 @@ +package com.timgroup.statsd; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.SocketException; +import java.util.Locale; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; + +public abstract class StatsDClientTest { + + protected static final String STATSD_SERVER_HOST = "localhost"; + protected static final int STATSD_SERVER_PORT = 17254; + + private DummyStatsDServer server; + + public T buildClient(String prefix) { + return buildClient(prefix, null); + } + + abstract public T buildClient(String prefix, String[] constantTags); + + private final T client = buildClient("my.prefix"); + + @Before + public void start() throws SocketException { + server = new DummyStatsDServer(STATSD_SERVER_PORT); + } + + @After + public void stop() throws Exception { + client.stop(); + server.close(); + } + + @Test(timeout=5000L) public void + sends_counter_value_to_statsd() throws Exception { + + + client.count("mycount", 24); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); + } + + @Test(timeout=5000L) public void + sends_counter_value_to_statsd_with_null_tags() throws Exception { + + + client.count("mycount", 24, (java.lang.String[]) null); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); + } + + @Test(timeout=5000L) public void + sends_counter_value_to_statsd_with_empty_tags() throws Exception { + + + client.count("mycount", 24); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); + } + + @Test(timeout=5000L) public void + sends_counter_value_to_statsd_with_tags() throws Exception { + + + client.count("mycount", 24, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_counter_increment_to_statsd() throws Exception { + + + client.incrementCounter("myinc"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myinc:1|c")); + } + + @Test(timeout=5000L) public void + sends_counter_increment_to_statsd_with_tags() throws Exception { + + + client.incrementCounter("myinc", "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myinc:1|c|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_counter_decrement_to_statsd() throws Exception { + + + client.decrementCounter("mydec"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mydec:-1|c")); + } + + @Test(timeout=5000L) public void + sends_counter_decrement_to_statsd_with_tags() throws Exception { + + + client.decrementCounter("mydec", "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mydec:-1|c|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:423|g")); + } + + @Test(timeout=5000L) public void + sends_large_double_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 123456789012345.67890); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:123456789012345.67|g")); + } + + @Test(timeout=5000L) public void + sends_exact_double_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 123.45678901234567890); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:123.456789|g")); + } + + @Test(timeout=5000L) public void + sends_double_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 0.423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:0.423|g")); + } + + @Test(timeout=5000L) public void + sends_gauge_to_statsd_with_tags() throws Exception { + + + client.recordGaugeValue("mygauge", 423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:423|g|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_double_gauge_to_statsd_with_tags() throws Exception { + + + client.recordGaugeValue("mygauge", 0.423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:0.423|g|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_histogram_to_statsd() throws Exception { + + + client.recordHistogramValue("myhistogram", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:423|h")); + } + + @Test(timeout=5000L) public void + sends_double_histogram_to_statsd() throws Exception { + + + client.recordHistogramValue("myhistogram", 0.423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:0.423|h")); + } + + @Test(timeout=5000L) public void + sends_histogram_to_statsd_with_tags() throws Exception { + + + client.recordHistogramValue("myhistogram", 423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:423|h|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_double_histogram_to_statsd_with_tags() throws Exception { + + + client.recordHistogramValue("myhistogram", 0.423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:0.423|h|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_timer_to_statsd() throws Exception { + + + client.recordExecutionTime("mytime", 123); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms")); + } + + /** + * A regression test for this i18n number formatting bug + * @throws Exception + */ + @Test public void + sends_timer_to_statsd_from_locale_with_unamerican_number_formatting() throws Exception { + + Locale originalDefaultLocale = Locale.getDefault(); + + // change the default Locale to one that uses something other than a '.' as the decimal separator (Germany uses a comma) + Locale.setDefault(Locale.GERMANY); + + try { + + + client.recordExecutionTime("mytime", 123, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms|#baz,foo:bar")); + } finally { + // reset the default Locale in case changing it has side-effects + Locale.setDefault(originalDefaultLocale); + } + } + + + @Test(timeout=5000L) public void + sends_timer_to_statsd_with_tags() throws Exception { + + + client.recordExecutionTime("mytime", 123, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms|#baz,foo:bar")); + } + + + @Test(timeout=5000L) public void + sends_gauge_mixed_tags() throws Exception { + + final T empty_prefix_client = buildClient("my.prefix", new String[] {"instance:foo", "app:bar"}); + empty_prefix_client.gauge("value", 423, "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.value:423|g|#app:bar,instance:foo,baz")); + } + + @Test(timeout=5000L) public void + sends_gauge_constant_tags_only() throws Exception { + + final T empty_prefix_client = buildClient("my.prefix", new String[] {"instance:foo", "app:bar"}); + empty_prefix_client.gauge("value", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.value:423|g|#app:bar,instance:foo")); + } + + @Test(timeout=5000L) public void + sends_gauge_empty_prefix() throws Exception { + + final T empty_prefix_client = buildClient(""); + empty_prefix_client.gauge("top.level.value", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("top.level.value:423|g")); + } + + @Test(timeout=5000L) public void + sends_gauge_null_prefix() throws Exception { + + final T null_prefix_client = buildClient(null); + null_prefix_client.gauge("top.level.value", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("top.level.value:423|g")); + } + + @Test(timeout=5000L) public void + sends_event() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + client.recordEvent(event); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|h:host1|k:key1|p:low|t:error")); + } + + @Test(timeout=5000L) public void + sends_partial_event() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .build(); + client.recordEvent(event); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567")); + } + + @Test(timeout=5000L) public void + sends_event_with_tags() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + client.recordEvent(event, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|h:host1|k:key1|p:low|t:error|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_partial_event_with_tags() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .build(); + client.recordEvent(event, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_event_empty_prefix() throws Exception { + + final T empty_prefix_client = buildClient(""); + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + empty_prefix_client.recordEvent(event, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{6,5}:title1|text1|d:1234567|h:host1|k:key1|p:low|t:error|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_service_check() throws Exception { + final String inputMessage = "\u266c \u2020\u00f8U \n\u2020\u00f8U \u00a5\u00bau|m: T0\u00b5 \u266a"; // "♬ †øU \n†øU ¥ºu|m: T0µ ♪" + final String outputMessage = "\u266c \u2020\u00f8U \\n\u2020\u00f8U \u00a5\u00bau|m\\: T0\u00b5 \u266a"; // note the escaped colon + final String[] tags = {"key1:val1", "key2:val2"}; + final ServiceCheck sc = ServiceCheck.builder() + .withName("my_check.name") + .withStatus(ServiceCheck.Status.WARNING) + .withMessage(inputMessage) + .withHostname("i-abcd1234") + .withTags(tags) + .withTimestamp(1420740000) + .build(); + + assertEquals(outputMessage, sc.getEscapedMessage()); + + client.serviceCheck(sc); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains(String.format("_sc|my_check.name|1|d:1420740000|h:i-abcd1234|#key2:val2,key1:val1|m:%s", + outputMessage))); + } + + @Test(timeout=5000L) public void + sends_nan_gauge_to_statsd() throws Exception { + client.recordGaugeValue("mygauge", Double.NaN); + + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:NaN|g")); + } + + @Test(timeout=5000L) public void + sends_set_to_statsd() throws Exception { + client.recordSetValue("myset", "myuserid"); + + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myset:myuserid|s")); + + } + + @Test(timeout=5000L) public void + sends_set_to_statsd_with_tags() throws Exception { + client.recordSetValue("myset", "myuserid", "foo:bar", "baz"); + + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myset:myuserid|s|#baz,foo:bar")); + + } +} From 69cf36e05cbff80362d53a4f7e54c0d01e9b9491 Mon Sep 17 00:00:00 2001 From: Sean Freitag Date: Mon, 8 Feb 2016 12:09:26 -0600 Subject: [PATCH 3/6] Optimizing imports for the project. --- .../statsd/NonBlockingStatsDClient.java | 18 +----------------- .../NonBlockingStatsDClientPerfTest.java | 7 ++++--- 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 4312f98..d38f8d3 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -1,23 +1,7 @@ package com.timgroup.statsd; -import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.charset.Charset; -import java.text.DecimalFormat; -import java.text.DecimalFormatSymbols; -import java.text.NumberFormat; -import java.util.Locale; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * A simple StatsD client implementation facilitating metrics recording. diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java index 70c1219..d12be3d 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java @@ -1,14 +1,15 @@ package com.timgroup.statsd; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + import java.net.SocketException; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; import static org.junit.Assert.assertEquals; From ebdbfbe977c03417e8b1de7954d64410ce199708 Mon Sep 17 00:00:00 2001 From: Sean Freitag Date: Mon, 8 Feb 2016 12:52:49 -0600 Subject: [PATCH 4/6] Fixing some inconsistent documentation. --- .../timgroup/statsd/BlockingStatsDClient.java | 26 +++++++++---------- .../statsd/NonBlockingStatsDClient.java | 6 ++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java index 97773f1..9bfeeac 100644 --- a/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java @@ -82,7 +82,7 @@ protected NumberFormat initialValue() { /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have + * specified host and port. All messages sent via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -105,7 +105,7 @@ public BlockingStatsDClient(final String prefix, final String hostname, final in /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have + * specified host and port. All messages sent via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -130,7 +130,7 @@ public BlockingStatsDClient(final String prefix, final String hostname, final in /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have + * specified host and port. All messages sent via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -265,7 +265,7 @@ String tagString(final String[] tags) { /** * Adjusts the specified counter by a given delta. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the counter to adjust @@ -282,7 +282,7 @@ public void count(final String aspect, final long delta, final String... tags) { /** * Increments the specified counter by one. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the counter to increment @@ -305,7 +305,7 @@ public void increment(final String aspect, final String... tags) { /** * Decrements the specified counter by one. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the counter to decrement @@ -328,7 +328,7 @@ public void decrement(final String aspect, final String... tags) { /** * Records the latest fixed value for the specified named gauge. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the gauge @@ -356,7 +356,7 @@ public void gauge(final String aspect, final double value, final String... tags) /** * Records the latest fixed value for the specified named gauge. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the gauge @@ -381,7 +381,7 @@ public void gauge(final String aspect, final long value, final String... tags) { /** * Records an execution time in milliseconds for the specified named operation. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the timed operation @@ -406,7 +406,7 @@ public void time(final String aspect, final long value, final String... tags) { /** * Records a value for the specified named histogram. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the histogram @@ -433,7 +433,7 @@ public void histogram(final String aspect, final double value, final String... t /** * Records a value for the specified named histogram. * - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param aspect * the name of the histogram @@ -491,7 +491,7 @@ private String eventMap(final Event event) { * *

This method is a DataDog extension, and may not work with other servers.

* - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param event * The event to record @@ -513,7 +513,7 @@ public void recordEvent(final Event event, final String... tags) { * *

This method is a DataDog extension, and may not work with other servers.

* - *

This method is non-blocking and is guaranteed not to throw an exception.

+ *

This method is guaranteed not to throw an exception.

* * @param sc * the service check object diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index d38f8d3..d67a520 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -48,7 +48,7 @@ public final class NonBlockingStatsDClient extends BlockingStatsDClient { /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have + * specified host and port. All messages sent via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -71,7 +71,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have + * specified host and port. All messages sent via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -96,7 +96,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have + * specified host and port. All messages sent via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot From 1e75b99e49bd871afafa1c595f91354bc997bc5d Mon Sep 17 00:00:00 2001 From: Sean Freitag Date: Tue, 12 Apr 2016 09:47:22 -0500 Subject: [PATCH 5/6] Fixing the concurrency efficiency of the NonBlockingStatsDClient. --- .../com/timgroup/statsd/BlockingStatsDClient.java | 2 +- .../timgroup/statsd/NonBlockingStatsDClient.java | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java index 9bfeeac..f8c7249 100644 --- a/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/BlockingStatsDClient.java @@ -44,7 +44,7 @@ */ public class BlockingStatsDClient implements StatsDClient { - private static final int PACKET_SIZE_BYTES = 1500; + protected static final int PACKET_SIZE_BYTES = 1500; /** * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index d67a520..e348595 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -177,13 +177,24 @@ private void blockingSend(String message) { } private class QueueConsumer implements Runnable { + private boolean canAddChunk(String message, String chunk) { + return message.getBytes(MESSAGE_CHARSET).length + chunk.getBytes(MESSAGE_CHARSET).length + "\n".getBytes(MESSAGE_CHARSET).length < PACKET_SIZE_BYTES; + } + @Override public void run() { while(!executor.isShutdown()) { try { if (queue.peek() == null) { Thread.sleep(10); } else { - blockingSend(queue.take()); + String message = ""; + do { + if (message.length() > 0) + message += "\n"; + message += queue.take(); + } while (queue.peek() != null && canAddChunk(message, queue.peek())); + + blockingSend(message); } } catch (final Exception e) { handler.handle(e); From 53ef4b4c962c9dd0f530e2b686efc830b2feb925 Mon Sep 17 00:00:00 2001 From: Sean Freitag Date: Tue, 12 Apr 2016 10:23:51 -0500 Subject: [PATCH 6/6] Reverting changes to NonBlockingStatsDClient made when adding the BlockingStatsDClient. --- .../statsd/NonBlockingStatsDClient.java | 557 +++++++++++++++++- 1 file changed, 531 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index e348595..42d9980 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -1,7 +1,23 @@ package com.timgroup.statsd; +import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.concurrent.*; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.charset.Charset; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.text.NumberFormat; +import java.util.Locale; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * A simple StatsD client implementation facilitating metrics recording. @@ -32,7 +48,46 @@ * @author Tom Denley * */ -public final class NonBlockingStatsDClient extends BlockingStatsDClient { +public final class NonBlockingStatsDClient implements StatsDClient { + + private static final int PACKET_SIZE_BYTES = 1500; + + private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() { + @Override public void handle(final Exception e) { /* No-op */ } + }; + + /** + * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to + * create one pre thread as this seems to offer a significant performance improvement over creating one per-thread: + * http://stackoverflow.com/a/1285297/2648 + * https://github.com/indeedeng/java-dogstatsd-client/issues/4 + */ + private static final ThreadLocal NUMBER_FORMATTERS = new ThreadLocal() { + @Override + protected NumberFormat initialValue() { + + // Always create the formatter for the US locale in order to avoid this bug: + // https://github.com/indeedeng/java-dogstatsd-client/issues/3 + final NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); + numberFormatter.setGroupingUsed(false); + numberFormatter.setMaximumFractionDigits(6); + + // we need to specify a value for Double.NaN that is recognized by dogStatsD + if (numberFormatter instanceof DecimalFormat) { // better safe than a runtime error + final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; + final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); + symbols.setNaN("NaN"); + decimalFormat.setDecimalFormatSymbols(symbols); + } + + return numberFormatter; + } + }; + + private final String prefix; + private final DatagramChannel clientChannel; + private final StatsDClientErrorHandler handler; + private final String constantTagsRendered; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); @@ -48,7 +103,7 @@ public final class NonBlockingStatsDClient extends BlockingStatsDClient { /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages sent via this client will have + * specified host and port. All messages send via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -71,7 +126,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages sent via this client will have + * specified host and port. All messages send via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -96,7 +151,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages sent via this client will have + * specified host and port. All messages send via this client will have * their keys prefixed with the specified string. The new client will * attempt to open a connection to the StatsD server immediately upon * instantiation, and may throw an exception if that a connection cannot @@ -147,8 +202,35 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final */ public NonBlockingStatsDClient(final String prefix, String[] constantTags, final StatsDClientErrorHandler errorHandler, final Callable addressLookup) throws StatsDClientException { - super(prefix, constantTags, errorHandler, addressLookup); - this.executor.submit(new QueueConsumer()); + if((prefix != null) && (!prefix.isEmpty())) { + this.prefix = String.format("%s.", prefix); + } else { + this.prefix = ""; + } + if(errorHandler == null) { + handler = NO_OP_HANDLER; + } + else { + handler = errorHandler; + } + + /* Empty list should be null for faster comparison */ + if((constantTags != null) && (constantTags.length == 0)) { + constantTags = null; + } + + if(constantTags != null) { + constantTagsRendered = tagString(constantTags, null); + } else { + constantTagsRendered = null; + } + + try { + clientChannel = DatagramChannel.open(); + } catch (final Exception e) { + throw new StatsDClientException("Failed to start StatsD client", e); + } + executor.submit(new QueueConsumer(addressLookup)); } /** @@ -163,43 +245,466 @@ public void stop() { } catch (final Exception e) { handler.handle(e); - } finally { - super.stop(); + } + finally { + if (clientChannel != null) { + try { + clientChannel.close(); + } + catch (final IOException e) { + handler.handle(e); + } + } } } - protected void send(String message) { - queue.offer(message); + /** + * Generate a suffix conveying the given tag list to the client + */ + static String tagString(final String[] tags, final String tagPrefix) { + final StringBuilder sb; + if(tagPrefix != null) { + if((tags == null) || (tags.length == 0)) { + return tagPrefix; + } + sb = new StringBuilder(tagPrefix); + sb.append(","); + } else { + if((tags == null) || (tags.length == 0)) { + return ""; + } + sb = new StringBuilder("|#"); + } + + for(int n=tags.length - 1; n>=0; n--) { + sb.append(tags[n]); + if(n > 0) { + sb.append(","); + } + } + return sb.toString(); + } + + /** + * Generate a suffix conveying the given tag list to the client + */ + String tagString(final String[] tags) { + return tagString(tags, constantTagsRendered); + } + + /** + * Adjusts the specified counter by a given delta. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to adjust + * @param delta + * the amount to adjust the counter by + * @param tags + * array of tags to be added to the data + */ + @Override + public void count(final String aspect, final long delta, final String... tags) { + send(String.format("%s%s:%d|c%s", prefix, aspect, delta, tagString(tags))); + } + + /** + * Increments the specified counter by one. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to increment + * @param tags + * array of tags to be added to the data + */ + @Override + public void incrementCounter(final String aspect, final String... tags) { + count(aspect, 1, tags); + } + + /** + * Convenience method equivalent to {@link #incrementCounter(String, String[])}. + */ + @Override + public void increment(final String aspect, final String... tags) { + incrementCounter(aspect, tags); + } + + /** + * Decrements the specified counter by one. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to decrement + * @param tags + * array of tags to be added to the data + */ + @Override + public void decrementCounter(final String aspect, final String... tags) { + count(aspect, -1, tags); + } + + /** + * Convenience method equivalent to {@link #decrementCounter(String, String[])}. + */ + @Override + public void decrement(final String aspect, final String... tags) { + decrementCounter(aspect, tags); + } + + /** + * Records the latest fixed value for the specified named gauge. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordGaugeValue(final String aspect, final double value, final String... tags) { + /* Intentionally using %s rather than %f here to avoid + * padding with extra 0s to represent precision */ + send(String.format("%s%s:%s|g%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. + */ + @Override + public void gauge(final String aspect, final double value, final String... tags) { + recordGaugeValue(aspect, value, tags); + } + + + /** + * Records the latest fixed value for the specified named gauge. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordGaugeValue(final String aspect, final long value, final String... tags) { + send(String.format("%s%s:%d|g%s", prefix, aspect, value, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. + */ + @Override + public void gauge(final String aspect, final long value, final String... tags) { + recordGaugeValue(aspect, value, tags); + } + + /** + * Records an execution time in milliseconds for the specified named operation. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the timed operation + * @param timeInMs + * the time in milliseconds + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordExecutionTime(final String aspect, final long timeInMs, final String... tags) { + send(String.format("%s%s:%d|ms%s", prefix, aspect, timeInMs, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordExecutionTime(String, long, String[])}. + */ + @Override + public void time(final String aspect, final long value, final String... tags) { + recordExecutionTime(aspect, value, tags); + } + + /** + * Records a value for the specified named histogram. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordHistogramValue(final String aspect, final double value, final String... tags) { + /* Intentionally using %s rather than %f here to avoid + * padding with extra 0s to represent precision */ + send(String.format("%s%s:%s|h%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, double, String[])}. + */ + @Override + public void histogram(final String aspect, final double value, final String... tags) { + recordHistogramValue(aspect, value, tags); + } + + /** + * Records a value for the specified named histogram. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordHistogramValue(final String aspect, final long value, final String... tags) { + send(String.format("%s%s:%d|h%s", prefix, aspect, value, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, long, String[])}. + */ + @Override + public void histogram(final String aspect, final long value, final String... tags) { + recordHistogramValue(aspect, value, tags); + } + + private String eventMap(final Event event) { + final StringBuilder res = new StringBuilder(""); + + final long millisSinceEpoch = event.getMillisSinceEpoch(); + if (millisSinceEpoch != -1) { + res.append("|d:").append(millisSinceEpoch / 1000); + } + + final String hostname = event.getHostname(); + if (hostname != null) { + res.append("|h:").append(hostname); + } + + final String aggregationKey = event.getAggregationKey(); + if (aggregationKey != null) { + res.append("|k:").append(aggregationKey); + } + + final String priority = event.getPriority(); + if (priority != null) { + res.append("|p:").append(priority); + } + + final String alertType = event.getAlertType(); + if (alertType != null) { + res.append("|t:").append(alertType); + } + + return res.toString(); + } + + /** + * Records an event + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param event + * The event to record + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#events-1 + */ + @Override + public void recordEvent(final Event event, final String... tags) { + final String title = prefix + event.getTitle(); + final String text = event.getText(); + send(String.format("_e{%d,%d}:%s|%s%s%s", + title.length(), text.length(), title, text, eventMap(event), tagString(tags))); + } + + /** + * Records a run status for the specified named service check. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param sc + * the service check object + */ + @Override + public void recordServiceCheckRun(final ServiceCheck sc) { + send(toStatsDString(sc)); + } + + private String toStatsDString(final ServiceCheck sc) { + // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks + final StringBuilder sb = new StringBuilder(); + sb.append(String.format("_sc|%s|%d", sc.getName(), sc.getStatus())); + if (sc.getTimestamp() > 0) { + sb.append(String.format("|d:%d", sc.getTimestamp())); + } + if (sc.getHostname() != null) { + sb.append(String.format("|h:%s", sc.getHostname())); + } + sb.append(tagString(sc.getTags())); + if (sc.getMessage() != null) { + sb.append(String.format("|m:%s", sc.getEscapedMessage())); + } + return sb.toString(); + } + + /** + * Convenience method equivalent to {@link #recordServiceCheckRun(ServiceCheck sc)}. + */ + @Override + public void serviceCheck(final ServiceCheck sc) { + recordServiceCheckRun(sc); + } + + + /** + * Records a value for the specified set. + * + * Sets are used to count the number of unique elements in a group. If you want to track the number of + * unique visitor to your site, sets are a great way to do that. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the set + * @param value + * the value to track + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#sets + */ + @Override + public void recordSetValue(final String aspect, final String value, final String... tags) { + // documentation is light, but looking at dogstatsd source, we can send string values + // here instead of numbers + send(String.format("%s%s:%s|s%s", prefix, aspect, value, tagString(tags))); } - private void blockingSend(String message) { - super.send(message); + private void send(final String message) { + queue.offer(message); } + public static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8"); + + private class QueueConsumer implements Runnable { - private boolean canAddChunk(String message, String chunk) { - return message.getBytes(MESSAGE_CHARSET).length + chunk.getBytes(MESSAGE_CHARSET).length + "\n".getBytes(MESSAGE_CHARSET).length < PACKET_SIZE_BYTES; + private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES); + + private final Callable addressLookup; + + QueueConsumer(final Callable addressLookup) { + this.addressLookup = addressLookup; } @Override public void run() { while(!executor.isShutdown()) { try { - if (queue.peek() == null) { - Thread.sleep(10); - } else { - String message = ""; - do { - if (message.length() > 0) - message += "\n"; - message += queue.take(); - } while (queue.peek() != null && canAddChunk(message, queue.peek())); - - blockingSend(message); + final String message = queue.poll(1, TimeUnit.SECONDS); + if(null != message) { + final InetSocketAddress address = addressLookup.call(); + final byte[] data = message.getBytes(MESSAGE_CHARSET); + if(sendBuffer.remaining() < (data.length + 1)) { + blockingSend(address); + } + if(sendBuffer.position() > 0) { + sendBuffer.put( (byte) '\n'); + } + sendBuffer.put(data); + if(null == queue.peek()) { + blockingSend(address); + } } } catch (final Exception e) { handler.handle(e); } } } + + private void blockingSend(final InetSocketAddress address) throws IOException { + final int sizeOfBuffer = sendBuffer.position(); + sendBuffer.flip(); + + final int sentBytes = clientChannel.send(sendBuffer, address); + sendBuffer.limit(sendBuffer.capacity()); + sendBuffer.rewind(); + + if (sizeOfBuffer != sentBytes) { + handler.handle( + new IOException( + String.format( + "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", + sendBuffer.toString(), + address.getHostName(), + address.getPort(), + sentBytes, + sizeOfBuffer))); + } + } + } + + /** + * Create dynamic lookup for the given host name and port. + * + * @param hostname the host name of the targeted StatsD server + * @param port the port of the targeted StatsD server + * @return a function to perform the lookup + * @see NonBlockingStatsDClient#NonBlockingStatsDClient(String, String[], StatsDClientErrorHandler, Callable) + */ + public static Callable volatileAddressResolution(final String hostname, final int port) { + return new Callable() { + @Override public InetSocketAddress call() throws UnknownHostException { + return new InetSocketAddress(InetAddress.getByName(hostname), port); + } + }; + } + + /** + * Lookup the address for the given host name and cache the result. + * + * @param hostname the host name of the targeted StatsD server + * @param port the port of the targeted StatsD server + * @return a function that cached the result of the lookup + * @throws Exception if the lookup fails, i.e. {@link UnknownHostException} + */ + public static Callable staticAddressResolution(final String hostname, final int port) throws Exception { + final InetSocketAddress address = volatileAddressResolution(hostname, port).call(); + return new Callable() { + @Override public InetSocketAddress call() { + return address; + } + }; + } + + private static Callable staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException { + try { + return staticAddressResolution(hostname, port); + } catch (final Exception e) { + throw new StatsDClientException("Failed to lookup StatsD host", e); + } } }