Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating a way to set a custom thread preffix #54

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
bin
build

.idea/
39 changes: 25 additions & 14 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

/**
* A simple StatsD client implementation facilitating metrics recording.
*
*
* <p>Upon instantiation, this client will establish a socket connection to a StatsD instance
* running on the specified host and port. Metrics are then sent over this connection as they are
* received by the client.
* </p>
*
*
* <p>Three key methods are provided for the submission of data-points for the application under
* scrutiny:
* <ul>
Expand All @@ -23,10 +23,10 @@
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
* not to throw an exception which may disrupt application execution.
* </p>
*
*
* <p>As part of a clean system shutdown, the {@link #stop()} method should be invoked
* on any StatsD clients.</p>
*
*
* @author Tom Denley
*
*/
Expand All @@ -50,7 +50,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
* be established. Once a client has been instantiated in this way, all
* exceptions thrown during subsequent usage are consumed, guaranteeing
* that failures in metrics will not affect normal code execution.
*
*
* @param prefix
* the prefix to apply to keys sent via this client (can be null or empty for no prefix)
* @param hostname
Expand All @@ -74,7 +74,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws
* exceptions thrown during subsequent usage are passed to the specified
* handler and then consumed, guaranteeing that failures in metrics will
* not affect normal code execution.
*
*
* @param prefix
* the prefix to apply to keys sent via this client (can be null or empty for no prefix)
* @param hostname
Expand All @@ -96,6 +96,17 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC
}
}

public NonBlockingStatsDClient(String prefix, String hostname, String threadPreffix,
int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException {
this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + ".");

try {
this.sender = new NonBlockingUdpSender(hostname, threadPreffix, port, STATS_D_ENCODING, errorHandler);
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
}

/**
* Cleanly shut down this StatsD client. This method may throw an exception if
* the socket cannot be closed.
Expand All @@ -107,9 +118,9 @@ public void stop() {

/**
* Adjusts the specified counter by a given delta.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the counter to adjust
* @param delta
Expand All @@ -125,9 +136,9 @@ public void count(String aspect, long delta, double sampleRate) {

/**
* Records the latest fixed value for the specified named gauge.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the gauge
* @param value
Expand Down Expand Up @@ -165,9 +176,9 @@ private void recordGaugeCommon(String aspect, String value, boolean negative, bo
/**
* StatsD supports counting unique occurrences of events between flushes, Call this method to records an occurrence
* of the specified named event.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the set
* @param eventName
Expand All @@ -180,9 +191,9 @@ public void recordSetEvent(String aspect, String eventName) {

/**
* Records an execution time in milliseconds for the specified named operation.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the timed operation
* @param timeInMs
Expand Down
39 changes: 26 additions & 13 deletions src/main/java/com/timgroup/statsd/NonBlockingUdpSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,52 @@ public final class NonBlockingUdpSender {
private final Charset encoding;
private final DatagramChannel clientSocket;
private final ExecutorService executor;
private StatsDClientErrorHandler handler;
private final StatsDClientErrorHandler handler;

public NonBlockingUdpSender(String hostname, int port, Charset encoding, StatsDClientErrorHandler handler) throws IOException {
this.encoding = encoding;
this.handler = handler;
this.clientSocket = DatagramChannel.open();
this.clientSocket.connect(new InetSocketAddress(hostname, port));

this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
this.executor = createExecutorService("StatsD-");
}

public NonBlockingUdpSender(String hostname, String threadPreffix, int port, Charset encoding, StatsDClientErrorHandler handler) throws IOException {
this.encoding = encoding;
this.handler = handler;
this.clientSocket = DatagramChannel.open();
this.clientSocket.connect(new InetSocketAddress(hostname, port));

this.executor = createExecutorService(threadPreffix);
}

private static ExecutorService createExecutorService(String threadPreffix) {
return Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {

@Override
public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName("StatsD-" + result.getName());
result.setName(threadPreffix + result.getName());
result.setDaemon(true);
return result;
}
});
}


public void stop() {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
catch (Exception e) {
} catch (Exception e) {
handler.handle(e);
}
finally {
} finally {
if (clientSocket != null) {
try {
clientSocket.close();
}
catch (Exception e) {
} catch (Exception e) {
handler.handle(e);
}
}
Expand All @@ -56,12 +69,12 @@ public void stop() {
public void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
@Override
public void run() {
blockingSend(message);
}
});
}
catch (Exception e) {
} catch (Exception e) {
handler.handle(e);
}
}
Expand Down