Skip to content

Commit

Permalink
add support for es-tags-as-fields through env
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Montoya <[email protected]>
  • Loading branch information
jam01 committed Jan 26, 2021
1 parent d518715 commit ec9e957
Show file tree
Hide file tree
Showing 3 changed files with 799 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.jaegertracing.spark.dependencies.Utils;
import io.jaegertracing.spark.dependencies.model.KeyValue;
import io.jaegertracing.spark.dependencies.model.Process;
import io.jaegertracing.spark.dependencies.model.Reference;
import io.jaegertracing.spark.dependencies.model.Span;
import org.apache.parquet.Files;

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -38,9 +43,28 @@ public class SpanDeserializer extends StdDeserializer<Span> {

// TODO Spark incorrectly serializes object mapper, therefore reinitializing here
private ObjectMapper objectMapper = JsonHelper.configure(new ObjectMapper());
private final boolean tagsAsFieldsAll;
private final List<String> tagsAsFields;
private final String tagsAsFieldsDotReplacement;

public SpanDeserializer() {
public SpanDeserializer() throws IOException {
super(Span.class);

tagsAsFieldsAll = Boolean.parseBoolean(Utils.getEnv("ES_TAGS_AS_FIELDS_ALL", "false"));
tagsAsFieldsDotReplacement = Utils.getEnv("ES_TAGS_AS_FIELDS_DOT_REPLACEMENT", "@");
tagsAsFields = new ArrayList<>();

if (!tagsAsFieldsAll) {
String configFile = Utils.getEnv("ES_TAGS_AS_FIELDS_CONFIG_FILE", null);
if (configFile != null) {
tagsAsFields.addAll(Files.readAllLines(new File(configFile), Charset.defaultCharset()));
}

String include = Utils.getEnv("ES_TAGS_AS_FIELDS_INCLUDE", null);
if (include != null) {
tagsAsFields.addAll(Arrays.asList(include.split(",")));
}
}
}

@Override
Expand Down Expand Up @@ -78,14 +102,26 @@ private List<KeyValue> addTagFields(List<KeyValue> tags, Map<String, Object> tag
result.addAll(tags);
List<KeyValue> collect = tagFields.entrySet().stream().map(stringObjectEntry -> {
KeyValue kv = new KeyValue();
kv.setKey(stringObjectEntry.getKey());
kv.setKey(mapTag(stringObjectEntry.getKey()));
kv.setValueString(stringObjectEntry.getValue().toString());
return kv;
}).collect(Collectors.toList());
result.addAll(collect);
return result;
}

private String mapTag(String value) {
if (tagsAsFieldsAll) {
return value.replaceAll(tagsAsFieldsDotReplacement, ".");
}

if (tagsAsFields.contains(value)) {
return value.replaceAll(tagsAsFieldsDotReplacement, ".");
}

return value;
}

private List<Reference> deserializeReferences(JsonNode node) throws JsonProcessingException {
List<Reference> references = new ArrayList<>();
JsonNode parentSpanID = node.get("parentSpanID");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2017 The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.jaegertracing.spark.dependencies.elastic;

import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.jaegertracing.spark.dependencies.DependenciesSparkHelper;
import io.jaegertracing.spark.dependencies.elastic.json.JsonHelper;
import io.jaegertracing.spark.dependencies.model.Dependency;
import io.jaegertracing.spark.dependencies.model.Span;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

public class TagsAsFieldsTest {
@Test
public void shouldMapAllTags() throws IOException, ReflectiveOperationException {
ObjectMapper objectMapper = JsonHelper.configure(new ObjectMapper());
updateEnv("ES_TAGS_AS_FIELDS_ALL", "true");

List<Span> spans = objectMapper.readValue(this.getClass().getClassLoader().getResource("spans.json"), new TypeReference<List<Span>>(){});

JavaSparkContext context = new JavaSparkContext("local[*]", "test");
JavaPairRDD<String, Iterable<Span>> traces = context.parallelize(spans)
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces, "placeholder");

context.stop();

assertEquals(10, dependencyLinks.size());
}

@SuppressWarnings("unchecked")
private static void updateEnv(String name, String val) throws ReflectiveOperationException {
Map<String, String> env = System.getenv();
Field field = env.getClass().getDeclaredField("m");
field.setAccessible(true);
((Map<String, String>) field.get(env)).put(name, val);
}
}
Loading

0 comments on commit ec9e957

Please sign in to comment.