diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..1804c8c --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1 @@ +This project has merged with datadog/java-dogstatsd-client and now lives in a new home at: http://datadog/java-dogstatsd-client. This repository primarily exists for historical purposes. Please file new issues and pull requests at datadog/java-dogstatsd-client. \ No newline at end of file diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..1804c8c --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1 @@ +This project has merged with datadog/java-dogstatsd-client and now lives in a new home at: http://datadog/java-dogstatsd-client. This repository primarily exists for historical purposes. Please file new issues and pull requests at datadog/java-dogstatsd-client. \ No newline at end of file diff --git a/.gitignore b/.gitignore index c745919..74e5664 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ bin build +*.iml +/target/ +.idea diff --git a/README.md b/README.md index be7a600..734fa1f 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,26 @@ -java-statsd-client +# Project Moved + +This project has merged with [datadog/java-dogstatsd-client](http://github.com/datadog/java-dogstatsd-client). This repository primarily exists for historical purposes. Please file issues and pull requests at [datadog/java-dogstatsd-client](http://github.com/datadog/java-dogstatsd-client). + + +java-dogstatsd-client ================== A statsd client library implemented in Java. Allows for Java applications to easily communicate with statsd. +This version is forked from the upstream [java-statsd-client](https://github.com/youdevise/java-statsd-client) project, adding support for [Datadog](http://datadoghq.com/) extensions for use with [dogstatsd](http://docs.datadoghq.com/guides/dogstatsd/). + +This version also adds support for empty or null prefixes, to allow a client to send arbitrary statistic names. + Downloads --------- -The client jar is distributed via maven central, and can be downloaded [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3Acom.timgroup%20a%3Ajava-statsd-client). +The client jar is distributed via maven central, and can be downloaded [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3Acom.indeed%20a%3Ajava-dogstatsd-client). ```xml - com.timgroup - java-statsd-client - 2.0.0 + com.indeed + java-dogstatsd-client + 2.0.16 ``` @@ -22,13 +31,24 @@ import com.timgroup.statsd.StatsDClient; import com.timgroup.statsd.NonBlockingStatsDClient; public class Foo { - private static final StatsDClient statsd = new NonBlockingStatsDClient("my.prefix", "statsd-host", 8125); + + private static final StatsDClient statsd = new NonBlockingStatsDClient( + "my.prefix", /* prefix to any stats; may be null or empty string */ + "statsd-host", /* common case: localhost */ + 8125, /* port */ + new String[] {"tag:value"} /* Datadog extension: Constant tags, always applied */ + ); public static final void main(String[] args) { - statsd.incrementCounter("bar"); - statsd.recordGaugeValue("baz", 100); - statsd.recordExecutionTime("bag", 25); + statsd.incrementCounter("foo"); + statsd.recordGaugeValue("bar", 100); + statsd.recordGaugeValue("baz", 0.01); /* Datadog extension: support for floating-point gauges */ + statsd.recordHistogramValue("qux", 15); /* Datadog extension: histograms */ + statsd.recordHistogramValue("qux", 15.5); /* ...also floating-point */ + + /* expects times in milliseconds + */ + statsd.recordExecutionTime("bag", 25, "cluster:foo"); /* Datadog extension: cluster tag */ } } ``` - diff --git a/build.xml b/build.xml deleted file mode 100644 index 474b92b..0000000 --- a/build.xml +++ /dev/null @@ -1,98 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/java-statsd-client.pom b/java-statsd-client.pom deleted file mode 100644 index d6d8ad0..0000000 --- a/java-statsd-client.pom +++ /dev/null @@ -1,29 +0,0 @@ - - 4.0.0 - com.timgroup - java-statsd-client - jar - java-statsd-client - @VERSION@ - A tiny library allowing Java applications to communicate with statsd instances easily. - http://github.com/youdevise/java-statsd-client - - - The MIT License (MIT) - http://opensource.org/licenses/MIT - repo - - - - http://github.com/youdevise/java-statsd-client - scm:git:git://github.com/youdevise/java-statsd-client.git - scm:git:git@github.com:youdevise/java-statsd-client.git - - - - scarytom - Tom Denley - tom.denley@timgroup.com - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..36f1061 --- /dev/null +++ b/pom.xml @@ -0,0 +1,67 @@ + + 4.0.0 + + + com.indeed + oss-parent + 13 + + + com.indeed + java-dogstatsd-client + jar + java-dogstatsd-client + 2.0.17-SNAPSHOT + A tiny library allowing Java applications to communicate with DataDog statsd instances easily. + https://github.com/indeedeng/java-dogstatsd-client + + + 1.6 + + + + + The MIT License (MIT) + http://opensource.org/licenses/MIT + repo + + + + + http://github.com/indeedeng/java-dogstatsd-client + scm:git:git://github.com/indeedeng/java-dogstatsd-client.git + scm:git:git@github.com:indeedeng/java-dogstatsd-client.git + + + + + duffy + Charles Duffy + duffy@indeed.com + + + scarytom + Tom Denley + tom.denley@timgroup.com + + + + + + org.hamcrest + hamcrest-core + test + + + org.hamcrest + hamcrest-library + test + + + junit + junit + test + + + + diff --git a/src/main/java/com/timgroup/statsd/Event.java b/src/main/java/com/timgroup/statsd/Event.java new file mode 100644 index 0000000..4507425 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/Event.java @@ -0,0 +1,166 @@ +package com.timgroup.statsd; + +import java.util.Date; + +/** + * An event to send + * @see http://docs.datadoghq.com/guides/dogstatsd/#events + */ +public class Event { + private String title; + private String text; + private long millisSinceEpoch = -1; + private String hostname; + private String aggregationKey; + private String priority; + private String sourceTypeName; + private String alertType; + + public String getTitle() { + return title; + } + + public String getText() { + return text; + } + + /** + * @return -1 if not set + */ + public long getMillisSinceEpoch() { + return millisSinceEpoch; + } + + public String getHostname() { + return hostname; + } + + public String getAggregationKey() { + return aggregationKey; + } + + public String getPriority() { + return priority; + } + + public String getSourceTypeName() { + return sourceTypeName; + } + + public String getAlertType() { + return alertType; + } + + public static Builder builder() { + return new Builder(); + } + + private Event(){} + + public enum Priority { + LOW, NORMAL + } + + public enum AlertType { + ERROR, WARNING, INFO, SUCCESS + } + + @SuppressWarnings({"AccessingNonPublicFieldOfAnotherObject", "PrivateMemberAccessBetweenOuterAndInnerClass", "ParameterHidesMemberVariable"}) + public static class Builder { + private final Event event = new Event(); + private Builder() {} + + public Event build() { + if ((event.title == null) || event.title.isEmpty()) { + throw new IllegalStateException("event title must be set"); + } + if ((event.text == null) || event.text.isEmpty()) { + throw new IllegalStateException("event text must be set"); + } + return event; + } + + /** + * @param title + * Event title ; mandatory + */ + public Builder withTitle(final String title) { + event.title = title; + return this; + } + + /** + * @param text + * Event text ; supports line breaks ; mandatory + */ + public Builder withText(final String text) { + event.text = text; + return this; + } + + /** + * @param date + * Assign a timestamp to the event ; Default: none (Default is the current Unix epoch timestamp when not sent) + */ + public Builder withDate(final Date date) { + event.millisSinceEpoch = date.getTime(); + return this; + } + + /** + * @param millisSinceEpoch + * Assign a timestamp to the event ; Default: none (Default is the current Unix epoch timestamp when not sent) + */ + public Builder withDate(final long millisSinceEpoch) { + event.millisSinceEpoch = millisSinceEpoch; + return this; + } + + /** + * @param hostname + * Assign a hostname to the event ; Default: none + */ + public Builder withHostname(final String hostname) { + event.hostname = hostname; + return this; + } + + /** + * @param aggregationKey + * Assign an aggregation key to the event, to group it with some others ; Default: none + */ + public Builder withAggregationKey(final String aggregationKey) { + event.aggregationKey = aggregationKey; + return this; + } + + /** + * @param priority + * Can be "normal" or "low" ; Default: "normal" + */ + public Builder withPriority(final Priority priority) { + //noinspection StringToUpperCaseOrToLowerCaseWithoutLocale + event.priority = priority.name().toLowerCase(); + return this; + } + + /** + * @param sourceTypeName + * Assign a source type to the event ; Default: none + */ + public Builder withSourceTypeName(final String sourceTypeName) { + event.sourceTypeName = sourceTypeName; + return this; + } + + /** + * @param alertType + * Can be "error", "warning", "info" or "success" ; Default: "info" + */ + public Builder withAlertType(final AlertType alertType) { + //noinspection StringToUpperCaseOrToLowerCaseWithoutLocale + event.alertType = alertType.name().toLowerCase(); + return this; + } + } +} diff --git a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java index 2f6455a..0c9ea45 100644 --- a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java @@ -3,19 +3,29 @@ /** * A No-Op StatsDClient, which can be substituted in when metrics are not * required. - * + * * @author Tom Denley * */ public final class NoOpStatsDClient implements StatsDClient { @Override public void stop() { } - @Override public void count(String aspect, int delta) { } - @Override public void incrementCounter(String aspect) { } - @Override public void increment(String aspect) { } - @Override public void decrementCounter(String aspect) { } - @Override public void decrement(String aspect) { } - @Override public void recordGaugeValue(String aspect, int value) { } - @Override public void gauge(String aspect, int value) { } - @Override public void recordExecutionTime(String aspect, int timeInMs) { } - @Override public void time(String aspect, int value) { } + @Override public void count(String aspect, long delta, String... tags) { } + @Override public void incrementCounter(String aspect, String... tags) { } + @Override public void increment(String aspect, String... tags) { } + @Override public void decrementCounter(String aspect, String... tags) { } + @Override public void decrement(String aspect, String... tags) { } + @Override public void recordGaugeValue(String aspect, double value, String... tags) { } + @Override public void gauge(String aspect, double value, String... tags) { } + @Override public void recordGaugeValue(String aspect, long value, String... tags) { } + @Override public void gauge(String aspect, long value, String... tags) { } + @Override public void recordExecutionTime(String aspect, long timeInMs, String... tags) { } + @Override public void time(String aspect, long value, String... tags) { } + @Override public void recordHistogramValue(String aspect, double value, String... tags) { } + @Override public void histogram(String aspect, double value, String... tags) { } + @Override public void recordHistogramValue(String aspect, long value, String... tags) { } + @Override public void histogram(String aspect, long value, String... tags) { } + @Override public void recordEvent(final Event event, final String... tags) { } + @Override public void recordServiceCheckRun(ServiceCheck sc) { } + @Override public void serviceCheck(ServiceCheck sc) { } + @Override public void recordSetValue(String aspect, String value, String... tags) { } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index aafd383..cd0c0ae 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -1,58 +1,106 @@ package com.timgroup.statsd; -import java.net.DatagramPacket; -import java.net.DatagramSocket; +import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.charset.Charset; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.text.NumberFormat; +import java.util.Locale; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** * A simple StatsD client implementation facilitating metrics recording. - * + * *

Upon instantiation, this client will establish a socket connection to a StatsD instance * running on the specified host and port. Metrics are then sent over this connection as they are * received by the client. *

- * + * *

Three key methods are provided for the submission of data-points for the application under * scrutiny: *

* From the perspective of the application, these methods are non-blocking, with the resulting * IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed * not to throw an exception which may disrupt application execution. *

- * + * *

As part of a clean system shutdown, the {@link #stop()} method should be invoked * on any StatsD clients.

- * + * * @author Tom Denley * */ public final class NonBlockingStatsDClient implements StatsDClient { + private static final int PACKET_SIZE_BYTES = 1500; + private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() { - @Override public void handle(Exception e) { /* No-op */ } + @Override public void handle(final Exception e) { /* No-op */ } + }; + + /** + * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to + * create one pre thread as this seems to offer a significant performance improvement over creating one per-thread: + * http://stackoverflow.com/a/1285297/2648 + * https://github.com/indeedeng/java-dogstatsd-client/issues/4 + */ + private static final ThreadLocal NUMBER_FORMATTERS = new ThreadLocal() { + @Override + protected NumberFormat initialValue() { + + // Always create the formatter for the US locale in order to avoid this bug: + // https://github.com/indeedeng/java-dogstatsd-client/issues/3 + final NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); + numberFormatter.setGroupingUsed(false); + numberFormatter.setMaximumFractionDigits(6); + + // we need to specify a value for Double.NaN that is recognized by dogStatsD + if (numberFormatter instanceof DecimalFormat) { // better safe than a runtime error + final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; + final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); + symbols.setNaN("NaN"); + decimalFormat.setDecimalFormatSymbols(symbols); + } + + return numberFormatter; + } }; private final String prefix; - private final DatagramSocket clientSocket; + private final DatagramChannel clientChannel; private final StatsDClientErrorHandler handler; + private final String constantTagsRendered; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); - @Override public Thread newThread(Runnable r) { - Thread result = delegate.newThread(r); + @Override public Thread newThread(final Runnable r) { + final Thread result = delegate.newThread(r); result.setName("StatsD-" + result.getName()); + result.setDaemon(true); return result; } }); + private final BlockingQueue queue; + /** * Create a new StatsD client communicating with a StatsD instance on the * specified host and port. All messages send via this client will have @@ -62,7 +110,7 @@ public final class NonBlockingStatsDClient implements StatsDClient { * be established. Once a client has been instantiated in this way, all * exceptions thrown during subsequent usage are consumed, guaranteeing * that failures in metrics will not affect normal code execution. - * + * * @param prefix * the prefix to apply to keys sent via this client * @param hostname @@ -72,10 +120,87 @@ public final class NonBlockingStatsDClient implements StatsDClient { * @throws StatsDClientException * if the client could not be started */ - public NonBlockingStatsDClient(String prefix, String hostname, int port) throws StatsDClientException { - this(prefix, hostname, port, NO_OP_HANDLER); + public NonBlockingStatsDClient(final String prefix, final String hostname, final int port) throws StatsDClientException { + this(prefix, hostname, port, Integer.MAX_VALUE); } - + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are consumed, guaranteeing + * that failures in metrics will not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @param queueSize + * the maximum amount of unprocessed messages in the BlockingQueue. + * @throws StatsDClientException + * if the client could not be started + */ + public NonBlockingStatsDClient(final String prefix, final String hostname, final int port, final int queueSize) throws StatsDClientException { + this(prefix, hostname, port, queueSize, null, null); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are consumed, guaranteeing + * that failures in metrics will not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @param constantTags + * tags to be added to all content sent + * @throws StatsDClientException + * if the client could not be started + */ + public NonBlockingStatsDClient(final String prefix, final String hostname, final int port, final String... constantTags) throws StatsDClientException { + this(prefix, hostname, port, Integer.MAX_VALUE, constantTags, null); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are consumed, guaranteeing + * that failures in metrics will not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @param constantTags + * tags to be added to all content sent + * @param queueSize + * the maximum amount of unprocessed messages in the BlockingQueue. + * @throws StatsDClientException + * if the client could not be started + */ + public NonBlockingStatsDClient(final String prefix, final String hostname, final int port, final int queueSize, final String... constantTags) throws StatsDClientException { + this(prefix, hostname, port, queueSize, constantTags, null); + } + /** * Create a new StatsD client communicating with a StatsD instance on the * specified host and port. All messages send via this client will have @@ -86,28 +211,112 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws * exceptions thrown during subsequent usage are passed to the specified * handler and then consumed, guaranteeing that failures in metrics will * not affect normal code execution. - * + * * @param prefix * the prefix to apply to keys sent via this client * @param hostname * the host name of the targeted StatsD server * @param port * the port of the targeted StatsD server + * @param constantTags + * tags to be added to all content sent * @param errorHandler - * handler to use when an exception occurs during usage + * handler to use when an exception occurs during usage, may be null to indicate noop * @throws StatsDClientException * if the client could not be started */ - public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException { - this.prefix = prefix; - this.handler = errorHandler; - + public NonBlockingStatsDClient(final String prefix, final String hostname, final int port, + final String[] constantTags, final StatsDClientErrorHandler errorHandler) throws StatsDClientException { + this(prefix, Integer.MAX_VALUE, constantTags, errorHandler, staticStatsDAddressResolution(hostname, port)); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are passed to the specified + * handler and then consumed, guaranteeing that failures in metrics will + * not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param hostname + * the host name of the targeted StatsD server + * @param port + * the port of the targeted StatsD server + * @param constantTags + * tags to be added to all content sent + * @param errorHandler + * handler to use when an exception occurs during usage, may be null to indicate noop + * @param queueSize + * the maximum amount of unprocessed messages in the BlockingQueue. + * @throws StatsDClientException + * if the client could not be started + */ + public NonBlockingStatsDClient(final String prefix, final String hostname, final int port, final int queueSize, + final String[] constantTags, final StatsDClientErrorHandler errorHandler) throws StatsDClientException { + this(prefix, queueSize, constantTags, errorHandler, staticStatsDAddressResolution(hostname, port)); + } + + /** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are passed to the specified + * handler and then consumed, guaranteeing that failures in metrics will + * not affect normal code execution. + * + * @param prefix + * the prefix to apply to keys sent via this client + * @param constantTags + * tags to be added to all content sent + * @param errorHandler + * handler to use when an exception occurs during usage, may be null to indicate noop + * @param addressLookup + * yields the IP address and socket of the StatsD server + * @param queueSize + * the maximum amount of unprocessed messages in the BlockingQueue. + * @throws StatsDClientException + * if the client could not be started + */ + public NonBlockingStatsDClient(final String prefix, final int queueSize, String[] constantTags, final StatsDClientErrorHandler errorHandler, + final Callable addressLookup) throws StatsDClientException { + if((prefix != null) && (!prefix.isEmpty())) { + this.prefix = String.format("%s.", prefix); + } else { + this.prefix = ""; + } + if(errorHandler == null) { + handler = NO_OP_HANDLER; + } + else { + handler = errorHandler; + } + + /* Empty list should be null for faster comparison */ + if((constantTags != null) && (constantTags.length == 0)) { + constantTags = null; + } + + if(constantTags != null) { + constantTagsRendered = tagString(constantTags, null); + } else { + constantTagsRendered = null; + } + try { - this.clientSocket = new DatagramSocket(); - this.clientSocket.connect(new InetSocketAddress(hostname, port)); - } catch (Exception e) { + clientChannel = DatagramChannel.open(); + } catch (final Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); } + queue = new LinkedBlockingQueue(queueSize); + executor.submit(new QueueConsumer(addressLookup)); } /** @@ -120,139 +329,472 @@ public void stop() { executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); } - catch (Exception e) { + catch (final Exception e) { handler.handle(e); } finally { - if (clientSocket != null) { - clientSocket.close(); + if (clientChannel != null) { + try { + clientChannel.close(); + } + catch (final IOException e) { + handler.handle(e); + } } } } + /** + * Generate a suffix conveying the given tag list to the client + */ + static String tagString(final String[] tags, final String tagPrefix) { + final StringBuilder sb; + if(tagPrefix != null) { + if((tags == null) || (tags.length == 0)) { + return tagPrefix; + } + sb = new StringBuilder(tagPrefix); + sb.append(","); + } else { + if((tags == null) || (tags.length == 0)) { + return ""; + } + sb = new StringBuilder("|#"); + } + + for(int n=tags.length - 1; n>=0; n--) { + sb.append(tags[n]); + if(n > 0) { + sb.append(","); + } + } + return sb.toString(); + } + + /** + * Generate a suffix conveying the given tag list to the client + */ + String tagString(final String[] tags) { + return tagString(tags, constantTagsRendered); + } + /** * Adjusts the specified counter by a given delta. - * + * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the counter to adjust * @param delta * the amount to adjust the counter by + * @param tags + * array of tags to be added to the data */ @Override - public void count(String aspect, int delta) { - send(String.format("%s.%s:%d|c", prefix, aspect, delta)); + public void count(final String aspect, final long delta, final String... tags) { + send(String.format("%s%s:%d|c%s", prefix, aspect, delta, tagString(tags))); } /** * Increments the specified counter by one. - * + * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the counter to increment + * @param tags + * array of tags to be added to the data */ @Override - public void incrementCounter(String aspect) { - count(aspect, 1); + public void incrementCounter(final String aspect, final String... tags) { + count(aspect, 1, tags); } /** - * Convenience method equivalent to {@link #incrementCounter(String)}. + * Convenience method equivalent to {@link #incrementCounter(String, String[])}. */ @Override - public void increment(String aspect) { - incrementCounter(aspect); + public void increment(final String aspect, final String... tags) { + incrementCounter(aspect, tags); } /** * Decrements the specified counter by one. - * + * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the counter to decrement + * @param tags + * array of tags to be added to the data + */ + @Override + public void decrementCounter(final String aspect, final String... tags) { + count(aspect, -1, tags); + } + + /** + * Convenience method equivalent to {@link #decrementCounter(String, String[])}. + */ + @Override + public void decrement(final String aspect, final String... tags) { + decrementCounter(aspect, tags); + } + + /** + * Records the latest fixed value for the specified named gauge. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param tags + * array of tags to be added to the data */ @Override - public void decrementCounter(String aspect) { - count(aspect, -1); + public void recordGaugeValue(final String aspect, final double value, final String... tags) { + /* Intentionally using %s rather than %f here to avoid + * padding with extra 0s to represent precision */ + send(String.format("%s%s:%s|g%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); } /** - * Convenience method equivalent to {@link #decrementCounter(String)}. + * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. */ @Override - public void decrement(String aspect) { - decrementCounter(aspect); + public void gauge(final String aspect, final double value, final String... tags) { + recordGaugeValue(aspect, value, tags); } + /** * Records the latest fixed value for the specified named gauge. - * + * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the gauge * @param value * the new reading of the gauge + * @param tags + * array of tags to be added to the data */ @Override - public void recordGaugeValue(String aspect, int value) { - send(String.format("%s.%s:%d|g", prefix, aspect, value)); + public void recordGaugeValue(final String aspect, final long value, final String... tags) { + send(String.format("%s%s:%d|g%s", prefix, aspect, value, tagString(tags))); } /** - * Convenience method equivalent to {@link #recordGaugeValue(String, int)}. + * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. */ @Override - public void gauge(String aspect, int value) { - recordGaugeValue(aspect, value); + public void gauge(final String aspect, final long value, final String... tags) { + recordGaugeValue(aspect, value, tags); } /** * Records an execution time in milliseconds for the specified named operation. - * + * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the timed operation * @param timeInMs * the time in milliseconds + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordExecutionTime(final String aspect, final long timeInMs, final String... tags) { + send(String.format("%s%s:%d|ms%s", prefix, aspect, timeInMs, tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordExecutionTime(String, long, String[])}. + */ + @Override + public void time(final String aspect, final long value, final String... tags) { + recordExecutionTime(aspect, value, tags); + } + + /** + * Records a value for the specified named histogram. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + @Override + public void recordHistogramValue(final String aspect, final double value, final String... tags) { + /* Intentionally using %s rather than %f here to avoid + * padding with extra 0s to represent precision */ + send(String.format("%s%s:%s|h%s", prefix, aspect, NUMBER_FORMATTERS.get().format(value), tagString(tags))); + } + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, double, String[])}. + */ + @Override + public void histogram(final String aspect, final double value, final String... tags) { + recordHistogramValue(aspect, value, tags); + } + + /** + * Records a value for the specified named histogram. + * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data */ @Override - public void recordExecutionTime(String aspect, int timeInMs) { - send(String.format("%s.%s:%d|ms", prefix, aspect, timeInMs)); + public void recordHistogramValue(final String aspect, final long value, final String... tags) { + send(String.format("%s%s:%d|h%s", prefix, aspect, value, tagString(tags))); } /** - * Convenience method equivalent to {@link #recordExecutionTime(String, int)}. + * Convenience method equivalent to {@link #recordHistogramValue(String, long, String[])}. */ @Override - public void time(String aspect, int value) { - recordExecutionTime(aspect, value); + public void histogram(final String aspect, final long value, final String... tags) { + recordHistogramValue(aspect, value, tags); + } + + private String eventMap(final Event event) { + final StringBuilder res = new StringBuilder(""); + + final long millisSinceEpoch = event.getMillisSinceEpoch(); + if (millisSinceEpoch != -1) { + res.append("|d:").append(millisSinceEpoch / 1000); + } + + final String hostname = event.getHostname(); + if (hostname != null) { + res.append("|h:").append(hostname); + } + + final String aggregationKey = event.getAggregationKey(); + if (aggregationKey != null) { + res.append("|k:").append(aggregationKey); + } + + final String priority = event.getPriority(); + if (priority != null) { + res.append("|p:").append(priority); + } + + final String alertType = event.getAlertType(); + if (alertType != null) { + res.append("|t:").append(alertType); + } + + return res.toString(); + } + + /** + * Records an event + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param event + * The event to record + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#events-1 + */ + @Override + public void recordEvent(final Event event, final String... tags) { + final String title = escapeEventString(prefix + event.getTitle()); + final String text = escapeEventString(event.getText()); + send(String.format("_e{%d,%d}:%s|%s%s%s", + title.length(), text.length(), title, text, eventMap(event), tagString(tags))); + } + + private String escapeEventString(final String title) { + return title.replace("\n", "\\n"); + } + + /** + * Records a run status for the specified named service check. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param sc + * the service check object + */ + @Override + public void recordServiceCheckRun(final ServiceCheck sc) { + send(toStatsDString(sc)); + } + + private String toStatsDString(final ServiceCheck sc) { + // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks + final StringBuilder sb = new StringBuilder(); + sb.append(String.format("_sc|%s|%d", sc.getName(), sc.getStatus())); + if (sc.getTimestamp() > 0) { + sb.append(String.format("|d:%d", sc.getTimestamp())); + } + if (sc.getHostname() != null) { + sb.append(String.format("|h:%s", sc.getHostname())); + } + sb.append(tagString(sc.getTags())); + if (sc.getMessage() != null) { + sb.append(String.format("|m:%s", sc.getEscapedMessage())); + } + return sb.toString(); + } + + /** + * Convenience method equivalent to {@link #recordServiceCheckRun(ServiceCheck sc)}. + */ + @Override + public void serviceCheck(final ServiceCheck sc) { + recordServiceCheckRun(sc); + } + + + /** + * Records a value for the specified set. + * + * Sets are used to count the number of unique elements in a group. If you want to track the number of + * unique visitor to your site, sets are a great way to do that. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the set + * @param value + * the value to track + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#sets + */ + @Override + public void recordSetValue(final String aspect, final String value, final String... tags) { + // documentation is light, but looking at dogstatsd source, we can send string values + // here instead of numbers + send(String.format("%s%s:%s|s%s", prefix, aspect, value, tagString(tags))); } private void send(final String message) { - try { - executor.execute(new Runnable() { - @Override public void run() { - blockingSend(message); + queue.offer(message); + } + + public static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8"); + + + private class QueueConsumer implements Runnable { + private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES); + + private final Callable addressLookup; + + QueueConsumer(final Callable addressLookup) { + this.addressLookup = addressLookup; + } + + @Override public void run() { + while(!executor.isShutdown()) { + try { + final String message = queue.poll(1, TimeUnit.SECONDS); + if(null != message) { + final InetSocketAddress address = addressLookup.call(); + final byte[] data = message.getBytes(MESSAGE_CHARSET); + if(sendBuffer.remaining() < (data.length + 1)) { + blockingSend(address); + } + if(sendBuffer.position() > 0) { + sendBuffer.put( (byte) '\n'); + } + sendBuffer.put(data); + if(null == queue.peek()) { + blockingSend(address); + } + } + } catch (final Exception e) { + handler.handle(e); } - }); + } } - catch (Exception e) { - handler.handle(e); + + private void blockingSend(final InetSocketAddress address) throws IOException { + final int sizeOfBuffer = sendBuffer.position(); + sendBuffer.flip(); + + final int sentBytes = clientChannel.send(sendBuffer, address); + sendBuffer.limit(sendBuffer.capacity()); + sendBuffer.rewind(); + + if (sizeOfBuffer != sentBytes) { + handler.handle( + new IOException( + String.format( + "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", + sendBuffer.toString(), + address.getHostName(), + address.getPort(), + sentBytes, + sizeOfBuffer))); + } } } - private void blockingSend(String message) { + /** + * Create dynamic lookup for the given host name and port. + * + * @param hostname the host name of the targeted StatsD server + * @param port the port of the targeted StatsD server + * @return a function to perform the lookup + * @see NonBlockingStatsDClient#NonBlockingStatsDClient(String, String[], StatsDClientErrorHandler, Callable) + */ + public static Callable volatileAddressResolution(final String hostname, final int port) { + return new Callable() { + @Override public InetSocketAddress call() throws UnknownHostException { + return new InetSocketAddress(InetAddress.getByName(hostname), port); + } + }; + } + + /** + * Lookup the address for the given host name and cache the result. + * + * @param hostname the host name of the targeted StatsD server + * @param port the port of the targeted StatsD server + * @return a function that cached the result of the lookup + * @throws Exception if the lookup fails, i.e. {@link UnknownHostException} + */ + public static Callable staticAddressResolution(final String hostname, final int port) throws Exception { + final InetSocketAddress address = volatileAddressResolution(hostname, port).call(); + return new Callable() { + @Override public InetSocketAddress call() { + return address; + } + }; + } + + private static Callable staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException { try { - final byte[] sendData = message.getBytes(); - final DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length); - clientSocket.send(sendPacket); - } catch (Exception e) { - handler.handle(e); + return staticAddressResolution(hostname, port); + } catch (final Exception e) { + throw new StatsDClientException("Failed to lookup StatsD host", e); } } } diff --git a/src/main/java/com/timgroup/statsd/ServiceCheck.java b/src/main/java/com/timgroup/statsd/ServiceCheck.java new file mode 100644 index 0000000..ab223c5 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/ServiceCheck.java @@ -0,0 +1,103 @@ +package com.timgroup.statsd; + +/** + * A service check model, which is used to format a service check message + * sent to the datadog agent + */ +public class ServiceCheck { + + public enum Status { + OK(0), WARNING(1), CRITICAL(2), UNKNOWN(3); + + private final int val; + Status(final int val) { + this.val = val; + } + } + + private String name, hostname, message; + + private int checkRunId, timestamp; + + private Status status; + + private String[] tags; + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + final ServiceCheck res = new ServiceCheck(); + + public Builder withName(final String name) { + res.name = name; + return this; + } + + public Builder withHostname(final String hostname) { + res.hostname = hostname; + return this; + } + + public Builder withMessage(final String message) { + res.message = message; + return this; + } + + public Builder withCheckRunId(final int checkRunId) { + res.checkRunId = checkRunId; + return this; + } + + public Builder withTimestamp(final int timestamp) { + res.timestamp = timestamp; + return this; + } + + public Builder withStatus(final Status status) { + res.status = status; + return this; + } + + public Builder withTags(final String[] tags) { + res.tags = tags; + return this; + } + + public ServiceCheck build() { + return res; + } + } + + private ServiceCheck() { + } + + public String getName() { + return name; + } + + public int getStatus() { + return status.val; + } + + public String getMessage() { + return message; + } + + public String getEscapedMessage() { + return message.replace("\n", "\\n").replace("m:", "m\\:"); + } + + public String getHostname() { + return hostname; + } + + public int getTimestamp() { + return timestamp; + } + + public String[] getTags() { + return tags; + } +} diff --git a/src/main/java/com/timgroup/statsd/StatsDClient.java b/src/main/java/com/timgroup/statsd/StatsDClient.java index ef8bd2a..383f216 100644 --- a/src/main/java/com/timgroup/statsd/StatsDClient.java +++ b/src/main/java/com/timgroup/statsd/StatsDClient.java @@ -3,7 +3,7 @@ /** * Describes a client connection to a StatsD server, which may be used to post metrics * in the form of counters, timers, and gauges. - * + * *

Three key methods are provided for the submission of data-points for the application under * scrutiny: *

    @@ -11,7 +11,7 @@ *
  • {@link #recordGaugeValue} - records the latest fixed value for the specified named gauge
  • *
  • {@link #recordExecutionTime} - records an execution time in milliseconds for the specified named operation
  • *
- * + * * @author Tom Denley * */ @@ -25,78 +25,207 @@ public interface StatsDClient { /** * Adjusts the specified counter by a given delta. - * + * + *

This method is a DataDog extension, and may not work with other servers.

+ * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the counter to adjust * @param delta * the amount to adjust the counter by + * @param tags + * array of tags to be added to the data */ - void count(String aspect, int delta); + void count(String aspect, long delta, String... tags); /** * Increments the specified counter by one. - * + * + *

This method is a DataDog extension, and may not work with other servers.

+ * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the counter to increment + * @param tags + * array of tags to be added to the data */ - void incrementCounter(String aspect); + void incrementCounter(String aspect, String... tags); /** - * Convenience method equivalent to {@link #incrementCounter(String)}. + * Convenience method equivalent to {@link #incrementCounter(String, String[])}. */ - void increment(String aspect); + void increment(String aspect, String... tags); /** * Decrements the specified counter by one. - * + * + *

This method is a DataDog extension, and may not work with other servers.

+ * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the counter to decrement + * @param tags + * array of tags to be added to the data + */ + void decrementCounter(String aspect, String... tags); + + /** + * Convenience method equivalent to {@link #decrementCounter(String, String[])}. + */ + void decrement(String aspect, String... tags); + + /** + * Records the latest fixed value for the specified named gauge. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge */ - void decrementCounter(String aspect); + void recordGaugeValue(String aspect, double value, String... tags); /** - * Convenience method equivalent to {@link #decrementCounter(String)}. + * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. */ - void decrement(String aspect); + void gauge(String aspect, double value, String... tags); /** * Records the latest fixed value for the specified named gauge. - * + * + *

This method is a DataDog extension, and may not work with other servers.

+ * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the gauge * @param value * the new reading of the gauge */ - void recordGaugeValue(String aspect, int value); + void recordGaugeValue(String aspect, long value, String... tags); /** - * Convenience method equivalent to {@link #recordGaugeValue(String, int)}. + * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. */ - void gauge(String aspect, int value); + void gauge(String aspect, long value, String... tags); /** * Records an execution time in milliseconds for the specified named operation. - * + * + *

This method is a DataDog extension, and may not work with other servers.

+ * *

This method is non-blocking and is guaranteed not to throw an exception.

- * + * * @param aspect * the name of the timed operation * @param timeInMs * the time in milliseconds + * @param tags + * array of tags to be added to the data + */ + void recordExecutionTime(String aspect, long timeInMs, String... tags); + + /** + * Convenience method equivalent to {@link #recordExecutionTime(String, long, String[])}. + */ + void time(String aspect, long value, String... tags); + + /** + * Records a value for the specified named histogram. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + void recordHistogramValue(String aspect, double value, String... tags); + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, double, String[])}. + */ + void histogram(String aspect, double value, String... tags); + + /** + * Records a value for the specified named histogram. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the histogram + * @param value + * the value to be incorporated in the histogram + * @param tags + * array of tags to be added to the data + */ + void recordHistogramValue(String aspect, long value, String... tags); + + /** + * Convenience method equivalent to {@link #recordHistogramValue(String, long, String[])}. + */ + void histogram(String aspect, long value, String... tags); + + /** + * Records an event + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param event + * The event to record + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#events-1 + */ + void recordEvent(Event event, String... tags); + + /** + * Records a run status for the specified named service check. + * + * @param sc + * the service check object */ - void recordExecutionTime(String aspect, int timeInMs); + void recordServiceCheckRun(ServiceCheck sc); /** - * Convenience method equivalent to {@link #recordExecutionTime(String, int)}. + * Convenience method equivalent to {@link #recordServiceCheckRun(ServiceCheck sc)}. + */ + void serviceCheck(ServiceCheck sc); + + /** + * Records a value for the specified set. + * + * Sets are used to count the number of unique elements in a group. If you want to track the number of + * unique visitor to your site, sets are a great way to do that. + * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the set + * @param value + * the value to track + * @param tags + * array of tags to be added to the data + * + * @see http://docs.datadoghq.com/guides/dogstatsd/#sets */ - void time(String aspect, int value); + void recordSetValue(String aspect, String value, String... tags); -} \ No newline at end of file +} diff --git a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java new file mode 100644 index 0000000..fa3f0e2 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java @@ -0,0 +1,54 @@ + +package com.timgroup.statsd; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; + + +final class DummyStatsDServer { + private final List messagesReceived = new ArrayList(); + private final DatagramSocket server; + + public DummyStatsDServer(int port) throws SocketException { + server = new DatagramSocket(port); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while(!server.isClosed()) { + try { + final DatagramPacket packet = new DatagramPacket(new byte[1500], 1500); + server.receive(packet); + for(String msg : new String(packet.getData(), NonBlockingStatsDClient.MESSAGE_CHARSET).split("\n")) { + messagesReceived.add(msg.trim()); + } + } catch (IOException e) { + } + } + } + }); + thread.setDaemon(true); + thread.start(); + } + + public void waitForMessage() { + while (messagesReceived.isEmpty()) { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + } + } + } + + public List messagesReceived() { + return new ArrayList(messagesReceived); + } + + public void close() { + server.close(); + } + +} diff --git a/src/test/java/com/timgroup/statsd/EventTest.java b/src/test/java/com/timgroup/statsd/EventTest.java new file mode 100644 index 0000000..7ff1f57 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/EventTest.java @@ -0,0 +1,83 @@ +package com.timgroup.statsd; + +import org.junit.Test; + +import java.util.Date; + +import static org.junit.Assert.assertEquals; + +public class EventTest { + @Test + public void builds() { + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + + assertEquals("title1", event.getTitle()); + assertEquals("text1", event.getText()); + assertEquals(1234, event.getMillisSinceEpoch()); + assertEquals("host1", event.getHostname()); + assertEquals("low", event.getPriority()); + assertEquals("key1", event.getAggregationKey()); + assertEquals("error", event.getAlertType()); + } + + @Test + public void builds_with_defaults() { + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .build(); + + assertEquals("title1", event.getTitle()); + assertEquals("text1", event.getText()); + assertEquals(-1, event.getMillisSinceEpoch()); + assertEquals(null, event.getHostname()); + assertEquals(null, event.getPriority()); + assertEquals(null, event.getAggregationKey()); + assertEquals(null, event.getAlertType()); + } + + @Test (expected = IllegalStateException.class) + public void fails_without_title() { + Event.builder().withText("text1") + .withDate(1234) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + } + + @Test (expected = IllegalStateException.class) + public void fails_without_text() { + Event.builder().withTitle("title1") + .withDate(1234) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + } + + @Test + public void builds_with_date() { + final long expectedMillis = 1234567000; + final Date date = new Date(expectedMillis); + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(date) + .build(); + + assertEquals("title1", event.getTitle()); + assertEquals("text1", event.getText()); + assertEquals(expectedMillis, event.getMillisSinceEpoch()); + } +} diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java new file mode 100644 index 0000000..70c1219 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java @@ -0,0 +1,59 @@ +package com.timgroup.statsd; + + +import java.net.SocketException; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public final class NonBlockingStatsDClientPerfTest { + + + private static final int STATSD_SERVER_PORT = 17255; + private static final Random RAND = new Random(); + private final NonBlockingStatsDClient client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT); + private final ExecutorService executor = Executors.newFixedThreadPool(20); + private DummyStatsDServer server; + + @Before + public void start() throws SocketException { + server = new DummyStatsDServer(STATSD_SERVER_PORT); + } + + @After + public void stop() throws Exception { + client.stop(); + server.close(); + } + + @Test(timeout=30000) + public void perf_test() throws Exception { + + int testSize = 10000; + for(int i = 0; i < testSize; ++i) { + executor.submit(new Runnable() { + public void run() { + client.count("mycount", RAND.nextInt()); + } + }); + + } + + executor.shutdown(); + executor.awaitTermination(20, TimeUnit.SECONDS); + + for(int i = 0; i < 20000 && server.messagesReceived().size() < testSize; i += 50) { + try { + Thread.sleep(50); + } catch (InterruptedException ex) {} + } + + assertEquals(testSize, server.messagesReceived().size()); + } +} diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 4e879ef..95799d1 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1,104 +1,432 @@ package com.timgroup.statsd; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; -import java.net.DatagramPacket; -import java.net.DatagramSocket; import java.net.SocketException; -import java.util.ArrayList; -import java.util.List; +import java.util.Locale; -import org.junit.After; -import org.junit.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; public class NonBlockingStatsDClientTest { private static final int STATSD_SERVER_PORT = 17254; private final NonBlockingStatsDClient client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT); + private DummyStatsDServer server; + + @Before + public void start() throws SocketException { + server = new DummyStatsDServer(STATSD_SERVER_PORT); + } @After public void stop() throws Exception { client.stop(); + server.close(); } @Test(timeout=5000L) public void sends_counter_value_to_statsd() throws Exception { - final DummyStatsDServer server = new DummyStatsDServer(STATSD_SERVER_PORT); - + + + client.count("mycount", 24); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); + } + + @Test(timeout=5000L) public void + sends_counter_value_to_statsd_with_null_tags() throws Exception { + + + client.count("mycount", 24, (java.lang.String[]) null); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); + } + + @Test(timeout=5000L) public void + sends_counter_value_to_statsd_with_empty_tags() throws Exception { + + client.count("mycount", 24); server.waitForMessage(); - + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c")); } + @Test(timeout=5000L) public void + sends_counter_value_to_statsd_with_tags() throws Exception { + + + client.count("mycount", 24, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mycount:24|c|#baz,foo:bar")); + } + @Test(timeout=5000L) public void sends_counter_increment_to_statsd() throws Exception { - final DummyStatsDServer server = new DummyStatsDServer(STATSD_SERVER_PORT); - + + client.incrementCounter("myinc"); server.waitForMessage(); - + assertThat(server.messagesReceived(), contains("my.prefix.myinc:1|c")); } + @Test(timeout=5000L) public void + sends_counter_increment_to_statsd_with_tags() throws Exception { + + + client.incrementCounter("myinc", "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myinc:1|c|#baz,foo:bar")); + } + @Test(timeout=5000L) public void sends_counter_decrement_to_statsd() throws Exception { - final DummyStatsDServer server = new DummyStatsDServer(STATSD_SERVER_PORT); - + + client.decrementCounter("mydec"); server.waitForMessage(); - + assertThat(server.messagesReceived(), contains("my.prefix.mydec:-1|c")); } + @Test(timeout=5000L) public void + sends_counter_decrement_to_statsd_with_tags() throws Exception { + + + client.decrementCounter("mydec", "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mydec:-1|c|#baz,foo:bar")); + } + @Test(timeout=5000L) public void sends_gauge_to_statsd() throws Exception { - final DummyStatsDServer server = new DummyStatsDServer(STATSD_SERVER_PORT); - + + client.recordGaugeValue("mygauge", 423); server.waitForMessage(); - + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:423|g")); } + @Test(timeout=5000L) public void + sends_large_double_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 123456789012345.67890); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:123456789012345.67|g")); + } + + @Test(timeout=5000L) public void + sends_exact_double_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 123.45678901234567890); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:123.456789|g")); + } + + @Test(timeout=5000L) public void + sends_double_gauge_to_statsd() throws Exception { + + + client.recordGaugeValue("mygauge", 0.423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:0.423|g")); + } + + @Test(timeout=5000L) public void + sends_gauge_to_statsd_with_tags() throws Exception { + + + client.recordGaugeValue("mygauge", 423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:423|g|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_double_gauge_to_statsd_with_tags() throws Exception { + + + client.recordGaugeValue("mygauge", 0.423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:0.423|g|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_histogram_to_statsd() throws Exception { + + + client.recordHistogramValue("myhistogram", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:423|h")); + } + + @Test(timeout=5000L) public void + sends_double_histogram_to_statsd() throws Exception { + + + client.recordHistogramValue("myhistogram", 0.423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:0.423|h")); + } + + @Test(timeout=5000L) public void + sends_histogram_to_statsd_with_tags() throws Exception { + + + client.recordHistogramValue("myhistogram", 423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:423|h|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_double_histogram_to_statsd_with_tags() throws Exception { + + + client.recordHistogramValue("myhistogram", 0.423, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myhistogram:0.423|h|#baz,foo:bar")); + } + @Test(timeout=5000L) public void sends_timer_to_statsd() throws Exception { - final DummyStatsDServer server = new DummyStatsDServer(STATSD_SERVER_PORT); - + + client.recordExecutionTime("mytime", 123); server.waitForMessage(); - + assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms")); } - private static final class DummyStatsDServer { - private final List messagesReceived = new ArrayList(); - private final DatagramSocket server; - - public DummyStatsDServer(int port) throws SocketException { - server = new DatagramSocket(port); - new Thread(new Runnable() { - @Override public void run() { - try { - final DatagramPacket packet = new DatagramPacket(new byte[256], 256); - server.receive(packet); - messagesReceived.add(new String(packet.getData()).trim()); - server.close(); - } catch (Exception e) { } - } - }).start(); - } - - public void waitForMessage() { - while (messagesReceived.isEmpty()) { - try { - Thread.sleep(50L); - } catch (InterruptedException e) {}} - } - - public List messagesReceived() { - return new ArrayList(messagesReceived); + /** + * A regression test for this i18n number formatting bug + * @throws Exception + */ + @Test public void + sends_timer_to_statsd_from_locale_with_unamerican_number_formatting() throws Exception { + + Locale originalDefaultLocale = Locale.getDefault(); + + // change the default Locale to one that uses something other than a '.' as the decimal separator (Germany uses a comma) + Locale.setDefault(Locale.GERMANY); + + try { + + + client.recordExecutionTime("mytime", 123, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms|#baz,foo:bar")); + } finally { + // reset the default Locale in case changing it has side-effects + Locale.setDefault(originalDefaultLocale); } } -} \ No newline at end of file + + + @Test(timeout=5000L) public void + sends_timer_to_statsd_with_tags() throws Exception { + + + client.recordExecutionTime("mytime", 123, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mytime:123|ms|#baz,foo:bar")); + } + + + @Test(timeout=5000L) public void + sends_gauge_mixed_tags() throws Exception { + + final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT, Integer.MAX_VALUE, "instance:foo", "app:bar"); + empty_prefix_client.gauge("value", 423, "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.value:423|g|#app:bar,instance:foo,baz")); + } + + @Test(timeout=5000L) public void + sends_gauge_constant_tags_only() throws Exception { + + final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT, Integer.MAX_VALUE, "instance:foo", "app:bar"); + empty_prefix_client.gauge("value", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.value:423|g|#app:bar,instance:foo")); + } + + @Test(timeout=5000L) public void + sends_gauge_empty_prefix() throws Exception { + + final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("", "localhost", STATSD_SERVER_PORT); + empty_prefix_client.gauge("top.level.value", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("top.level.value:423|g")); + } + + @Test(timeout=5000L) public void + sends_gauge_null_prefix() throws Exception { + + final NonBlockingStatsDClient null_prefix_client = new NonBlockingStatsDClient(null, "localhost", STATSD_SERVER_PORT); + null_prefix_client.gauge("top.level.value", 423); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("top.level.value:423|g")); + } + + @Test(timeout=5000L) public void + sends_event() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1\nline2") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + client.recordEvent(event); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,12}:my.prefix.title1|text1\\nline2|d:1234567|h:host1|k:key1|p:low|t:error")); + } + + @Test(timeout=5000L) public void + sends_partial_event() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .build(); + client.recordEvent(event); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567")); + } + + @Test(timeout=5000L) public void + sends_event_with_tags() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + client.recordEvent(event, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|h:host1|k:key1|p:low|t:error|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_partial_event_with_tags() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .build(); + client.recordEvent(event, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{16,5}:my.prefix.title1|text1|d:1234567|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_event_empty_prefix() throws Exception { + + final NonBlockingStatsDClient empty_prefix_client = new NonBlockingStatsDClient("", "localhost", STATSD_SERVER_PORT); + final Event event = Event.builder() + .withTitle("title1") + .withText("text1") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .build(); + empty_prefix_client.recordEvent(event, "foo:bar", "baz"); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("_e{6,5}:title1|text1|d:1234567|h:host1|k:key1|p:low|t:error|#baz,foo:bar")); + } + + @Test(timeout=5000L) public void + sends_service_check() throws Exception { + final String inputMessage = "\u266c \u2020\u00f8U \n\u2020\u00f8U \u00a5\u00bau|m: T0\u00b5 \u266a"; // "♬ †øU \n†øU ¥ºu|m: T0µ ♪" + final String outputMessage = "\u266c \u2020\u00f8U \\n\u2020\u00f8U \u00a5\u00bau|m\\: T0\u00b5 \u266a"; // note the escaped colon + final String[] tags = {"key1:val1", "key2:val2"}; + final ServiceCheck sc = ServiceCheck.builder() + .withName("my_check.name") + .withStatus(ServiceCheck.Status.WARNING) + .withMessage(inputMessage) + .withHostname("i-abcd1234") + .withTags(tags) + .withTimestamp(1420740000) + .build(); + + assertEquals(outputMessage, sc.getEscapedMessage()); + + client.serviceCheck(sc); + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains(String.format("_sc|my_check.name|1|d:1420740000|h:i-abcd1234|#key2:val2,key1:val1|m:%s", + outputMessage))); + } + + @Test(timeout=5000L) public void + sends_nan_gauge_to_statsd() throws Exception { + client.recordGaugeValue("mygauge", Double.NaN); + + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.mygauge:NaN|g")); + } + + @Test(timeout=5000L) public void + sends_set_to_statsd() throws Exception { + client.recordSetValue("myset", "myuserid"); + + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myset:myuserid|s")); + + } + + @Test(timeout=5000L) public void + sends_set_to_statsd_with_tags() throws Exception { + client.recordSetValue("myset", "myuserid", "foo:bar", "baz"); + + server.waitForMessage(); + + assertThat(server.messagesReceived(), contains("my.prefix.myset:myuserid|s|#baz,foo:bar")); + + } +}