Skip to content

Commit

Permalink
Merge branch 'dx_fork' of github.ibm.com:DX/prod-infra-mesos-kafka into joel-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Joel Berta committed May 10, 2017
2 parents 4d400d0 + 78a391e commit 327fef4
Show file tree
Hide file tree
Showing 23 changed files with 265 additions and 64 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ project/*
*.ipr
.idea/*
todo.txt
classes
prod-infra-mesos-kafka-*

src/docker/.docker
vagrant/.vagrant
Expand All @@ -14,3 +16,4 @@ kafka-mesos*.properties
kafka-mesos*.jar
jre*.tar.gz
kafka*.tgz
.DS_Store
27 changes: 25 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
plugins {
  id 'scala'
  id 'idea'
  id 'distribution'
  // legacy maven plugin: provides uploadArchives/mavenDeployer used below
  id 'maven'
  id 'de.undercouch.download' version '2.1.0'
  id 'com.github.hierynomus.license' version '0.13.1'
}

def scalaVersion = "2.11.8"
Expand Down Expand Up @@ -55,7 +56,9 @@ dependencies {
compile "org.apache.mesos:mesos:$mesosVersion"
compile name: "util-mesos-0.1.0.0"
compile "com.google.protobuf:protobuf-java:2.5.0"
compile "log4j:log4j:1.2.17"
compile group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.7'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.7'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.7'
compile "net.sf.jopt-simple:jopt-simple:4.8"
compile "org.eclipse.jetty:jetty-http:$jettyVersion"
compile "org.eclipse.jetty:jetty-io:$jettyVersion"
Expand Down Expand Up @@ -131,3 +134,23 @@ task downloadKafka(type: Download) {
src "https://archive.apache.org/dist/kafka/${kafkaVersion}/kafka_${kafkaScalaVersion}-${kafkaVersion}.tgz"
dest rootDir
}

// DX added config below here

// Name of the published artifact: encodes the scheduler version plus the
// bundled Kafka Scala/Kafka versions (underscores in kafkaVersion are
// normalized to dots).
def artifact = "kafka-mesos-${version}-kafka_$kafkaScalaVersion-${kafkaVersion.replace('_', '.')}"
// Upload credentials are supplied via gradle properties
// (-PuploadUser=... -PuploadPassword=...) so nothing secret is checked in;
// they default to empty strings for local builds that never upload.
def uploadUser = project.hasProperty('uploadUser') ? project.getProperty('uploadUser') : ''
def uploadPassword = project.hasProperty('uploadPassword') ? project.getProperty('uploadPassword') : ''
// Publishes the build to the internal Artifactory release repository.
// NOTE(review): endpoint is plain http — confirm https is not available.
uploadArchives {
repositories {
mavenDeployer {
repository(url: 'http://dx-buildrepository.rtp.raleigh.ibm.com/artifactory/libs-release-local') {
authentication(userName: uploadUser, password: uploadPassword)
}
pom {
groupId = 'com.ibm.dx.kafka'
artifactId = artifact
// NOTE(review): hard-coded pom version; appears to duplicate the
// "0.9.5.6" SchedulerVersion value elsewhere in this commit — confirm
// the two are kept in sync.
version = '0.9.5.6'
}
}
}
}
48 changes: 48 additions & 0 deletions log4j.properties.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# log4j2-style properties template that ships application logs to a remote
# rsyslog server over a TCP socket using RFC 5424 framing.
# The <rsyslog-server> and <rsyslog-port> placeholders must be substituted
# before this file is used.
#

name=PropertiesConfig
property.filename=logs
appenders=socket

# setup the socket appender (one TCP connection to rsyslog)
appender.socket.type=Socket
appender.socket.name=SOCKET
appender.socket.host=<rsyslog-server>
appender.socket.port=<rsyslog-port>
appender.socket.protocol=tcp

# set the socket appender to use the Rfc5424Layout (structured syslog)
appender.socket.layout.type=Rfc5424Layout
appender.socket.layout.appName=MyApp
# NOTE(review): enterpriseNumber 2 is IBM's IANA Private Enterprise Number —
# confirm this is the intended value rather than a placeholder.
appender.socket.layout.enterpriseNumber=2
appender.socket.layout.facility=LOCAL0
appender.socket.layout.id=MyApp
appender.socket.layout.includeMDC=true
appender.socket.layout.newLine=false

# add the ability to add custom fields to each syslog record
appender.socket.layout.fields.type=LoggerFields

# add thread, logger, level, exception and broker id as structured fields
appender.socket.layout.fields.thread.type=KeyValuePair
appender.socket.layout.fields.thread.key=thread
appender.socket.layout.fields.thread.value=%t
appender.socket.layout.fields.logger.type=KeyValuePair
appender.socket.layout.fields.logger.key=logger
appender.socket.layout.fields.logger.value=%c
appender.socket.layout.fields.level.type=KeyValuePair
appender.socket.layout.fields.level.key=level
appender.socket.layout.fields.level.value=%p
appender.socket.layout.fields.exception.type=KeyValuePair
appender.socket.layout.fields.exception.key=exception
appender.socket.layout.fields.exception.value=%enc{%ex}
appender.socket.layout.fields.broker.type=KeyValuePair
appender.socket.layout.fields.broker.key=broker
appender.socket.layout.fields.broker.value=broker-local-0

# set the root logger to use the socket appender
# NOTE(review): the appenderRef id below is "stdout" while appenderRefs above
# declares "socket"; log4j2 2.7 ignores the appenderRefs list so this works,
# but using "socket" consistently would be clearer — confirm before renaming.
rootLogger.level=debug
rootLogger.appenderRefs=socket
rootLogger.appenderRef.stdout.ref=SOCKET
13 changes: 13 additions & 0 deletions rebase.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
#
# Sync the local master branch with the public upstream mesos/kafka repository
# and push the result to origin, then restore whichever branch was checked out
# when the script started.

# Remember the starting branch so it can be restored at the end.
CURRENT_BRANCH="$(git rev-parse --abbrev-ref HEAD)"

# Register the upstream remote. Ignore the error when it already exists so the
# script is safe to re-run (the original `git remote add` fails on reruns).
git remote add upstream git@github.com:mesos/kafka.git 2>/dev/null || true

# Fast-forward master from upstream and mirror it to origin; stop the chain at
# the first failure but still fall through to restore the original branch.
git fetch upstream && \
git checkout master && \
git pull upstream master && \
git push origin master

# Return to the branch we started on, regardless of sync success, and show
# the branch list as a final sanity check.
git checkout "$CURRENT_BRANCH"
git branch
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@
*/
package ly.stealth.mesos.kafka.interface.impl

import kafka.utils.{ZkUtils => KafkaZkUtils}
import kafka.admin.{BrokerMetadata, AdminUtils => KafkaAdminUtils}
import java.util.Properties

import kafka.admin.{BrokerMetadata, AdminUtils => KafkaAdminUtils}
import kafka.utils.{ZkUtils => KafkaZkUtils}
import ly.stealth.mesos.kafka.interface.{AdminUtilsProxy, FeatureSupport}

import scala.collection.Map


class AdminUtils(zkUrl: String) extends AdminUtilsProxy {


private val DEFAULT_TIMEOUT_MS = 30000
private val zkUtils = KafkaZkUtils(zkUrl, DEFAULT_TIMEOUT_MS, DEFAULT_TIMEOUT_MS, isZkSecurityEnabled = false)

override def addPartitions(name: String, partitions: Int) =
KafkaAdminUtils.addPartitions(zkUtils, name, partitions)

override def fetchAllTopicConfigs(): Map[String, Properties] = KafkaAdminUtils.fetchAllTopicConfigs(zkUtils)

override def createOrUpdateTopicPartitionAssignmentPathInZK(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ trait AdminUtilsProxy {

def fetchAllEntityConfigs(entityType: String): Map[String, Properties]

def addPartitions(name: String, partitions: Int)

def assignReplicasToBrokers(ids: Seq[Int], nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
Expand Down
46 changes: 30 additions & 16 deletions src/scala/main/ly/stealth/mesos/kafka/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Broker(val id: Int = 0) {
OfferResult.Accept(offer, this)
}

def getReservation(offer: Offer): Broker.Reservation = {
def getReservation(offer: Offer, now: Date = new Date()): Broker.Reservation = {
var sharedCpus: Double = 0
var roleCpus: Double = 0
var reservedSharedCpus: Double = 0
Expand Down Expand Up @@ -169,20 +169,32 @@ class Broker(val id: Int = 0) {
)
}

/**
 * Picks the port the broker should bind to from the port ranges available in
 * an offer.
 *
 * Selection order:
 *   1. ranges are filtered down to those overlapping the broker's configured
 *      `port` range (a null `port` means any offered port is acceptable);
 *   2. if the broker was previously running on a port (stickiness), that port
 *      is preferred when still available, keeping the endpoint stable across
 *      restarts;
 *   3. otherwise the lowest usable port is returned.
 *
 * @param availablePorts port ranges offered by Mesos
 * @return the chosen port, or -1 when no usable port exists
 */
private[kafka] def getSuitablePort(availablePorts: util.List[Range]): Int = {
  // no available ports to choose from
  if (availablePorts.isEmpty) return -1

  // compute allowed usable ports based on broker config and offer,
  // lowest range first; overlap() is evaluated once per range (the original
  // called it twice per range)
  val usablePorts: List[Range] =
    if (port == null) availablePorts.toList.sortBy(_.start)
    else availablePorts.toList
      .flatMap(range => Option(range.overlap(port)))
      .sortBy(_.start)

  // the offer has no port this broker is allowed to use
  if (usablePorts.isEmpty) return -1

  // try to stick to the previous port if possible
  if (stickiness.port != null) {
    val preferredPort = new Range(stickiness.port)
    for (range <- usablePorts) {
      val found = range.overlap(preferredPort)
      if (found != null)
        return found.start
    }
  }

  // else return the first (lowest) usable port
  usablePorts.head.start
}

/*
Expand All @@ -198,8 +210,8 @@ class Broker(val id: Int = 0) {

def shouldStop: Boolean = !active && task != null && !task.stopping

/**
 * Records a (re)start of the broker on the given endpoint: remembers the host
 * and port for stickiness — so subsequent placements prefer the same
 * endpoint — and resets the failover failure counter.
 *
 * @param port may be null when the task reported no endpoint
 */
def registerStart(hostname: String, port: Integer): Unit = {
  stickiness.registerStart(hostname, port)
  failover.resetFailures()
}

Expand Down Expand Up @@ -289,12 +301,14 @@ object Broker {
class Stickiness(_period: Period = new Period("10m")) {
var period: Period = _period
@volatile var hostname: String = null
@volatile var port: Integer = null
@volatile var stopTime: Date = null

def expires: Date = if (stopTime != null) new Date(stopTime.getTime + period.ms) else null

/**
 * Records that the broker started on the given host and port, and clears the
 * stop time so the stickiness period no longer counts toward expiry.
 *
 * @param port may be null when no endpoint was reported
 */
def registerStart(hostname: String, port: Integer): Unit = {
  this.hostname = hostname
  this.port = port
  stopTime = null
}

Expand Down
3 changes: 1 addition & 2 deletions src/scala/main/ly/stealth/mesos/kafka/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import ly.stealth.mesos.kafka.Util.BindAddress
import net.elodina.mesos.util.{Period, Version}

object SchedulerVersion {
val value = "0.10.1.0-SNAPSHOT"
val value = "0.9.5.6"
}

object Config {
Expand Down Expand Up @@ -87,7 +87,6 @@ object Config {

if (props.containsKey("reconciliation-timeout")) reconciliationTimeout = new Period(props.getProperty("reconciliation-timeout"))
if (props.containsKey("reconciliation-attempts")) reconciliationAttempts = Integer.valueOf("reconciliation-attempts")
if (props.containsKey("reconciliation-interval")) reconciliationInterval = new Period(props.getProperty("reconciliation-interval"))

if (props.containsKey("jre")) jre = new File(props.getProperty("jre"))
if (props.containsKey("log")) log = new File(props.getProperty("log"))
Expand Down
1 change: 1 addition & 0 deletions src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ trait BrokerCli {
var stickiness = "stickiness:"
stickiness += " period:" + broker.stickiness.period
if (broker.stickiness.hostname != null) stickiness += ", hostname:" + broker.stickiness.hostname
if (broker.stickiness.port != null) stickiness += ", port:" + broker.stickiness.port
if (broker.stickiness.stopTime != null) stickiness += ", expires:" + Repr.dateTime(broker.stickiness.expires)
printLine(stickiness, indent)

Expand Down
4 changes: 2 additions & 2 deletions src/scala/main/ly/stealth/mesos/kafka/cli/TopicCli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ trait TopicCli {
if (add) {
parser.accepts("broker", "<broker-expr>. Default - *. See below.").withRequiredArg()
.ofType(classOf[String])
parser.accepts("partitions", "partitions count. Default - 1").withRequiredArg()
.ofType(classOf[Integer])
parser.accepts("replicas", "replicas count. Default - 1").withRequiredArg()
.ofType(classOf[Integer])
parser
Expand All @@ -159,6 +157,8 @@ trait TopicCli {
.accepts("startPartitionId", "partition id to begin assignment at. Default - -1 (random)")
.withRequiredArg().ofType(classOf[Integer])
}
parser.accepts("partitions", "partitions count. Default - 1").withRequiredArg()
.ofType(classOf[Integer])
parser.accepts("options", "topic options. Example: flush.ms=60000,retention.ms=6000000")
.withRequiredArg().ofType(classOf[String])

Expand Down
17 changes: 15 additions & 2 deletions src/scala/main/ly/stealth/mesos/kafka/executor/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import java.net.{URL, URLClassLoader}
import java.util
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import ly.stealth.mesos.kafka.Util.BindAddress

import ly.stealth.mesos.kafka.Broker
import ly.stealth.mesos.kafka.Util.BindAddress
import net.elodina.mesos.util.{IO, Version}
import org.apache.log4j.Logger

import scala.collection.JavaConversions._

case class LaunchConfig(
Expand Down Expand Up @@ -239,6 +241,12 @@ object KafkaServer {
))
}

// overwrite the log4j.properties file via the log4jOptions uri parameter
if (config.log4jOptions.size > 0) {
IO.writeFile(new File(Distro.dir + "/config/log4j.properties"),
config.interpolatedLog4jOptions.map(key => s"${key._1}=${key._2}").mkString("\n"))
}

System.setProperty("kafka.logs.dir", "" + new File(dir, "log"))
val props: Properties = this.props(config.interpolatedLog4jOptions, "log4j.properties")

Expand All @@ -262,7 +270,7 @@ object KafkaServer {

private def init(): (File, Loader) = {
// find kafka dir
new File(".").listFiles().toSeq.find {
val dir = new File(".").listFiles().toSeq.find {
f => f.isDirectory && f.getName.startsWith("kafka")
}.map { d =>
val classPath =
Expand All @@ -271,6 +279,11 @@ object KafkaServer {

(d, new Loader(classPath))
}.getOrElse {throw new IllegalStateException("Kafka distribution dir not found") }

// set the log4j configuration file using the system environment properties
System.setProperty("log4j.configurationFile", new File(dir._1, "config/log4j.properties").getPath)

dir
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/scala/main/ly/stealth/mesos/kafka/json/Model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ class RangeDeserializer extends StdDeserializer[Range](classOf[Range]) {
}
}

case class StickinessModel(period: Period, stopTime: Date, hostname: String)
case class StickinessModel(period: Period, stopTime: Date, hostname: String, port: Integer)

/**
 * Jackson serializer for Stickiness: writes it through the intermediate
 * StickinessModel case class so that period, stopTime, hostname AND the
 * sticky port are all emitted (the port field was added alongside port
 * stickiness support).
 */
class StickinessSerializer extends StdSerializer[Stickiness](classOf[Stickiness]) {
  override def serialize(s: Stickiness, gen: JsonGenerator, provider: SerializerProvider): Unit = {
    provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname, s.port), gen)
  }
}

Expand All @@ -188,6 +188,7 @@ class StickinessDeserializer extends StdDeserializer[Stickiness](classOf[Stickin
val model = p.readValueAs(classOf[StickinessModel])
val s = new Stickiness()
s.hostname = model.hostname
s.port = model.port
s.stopTime = model.stopTime
s.period = model.period
s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ trait BrokerLifecycleManagerComponentImpl extends BrokerLifecycleManagerComponen
broker.task.state = Broker.State.RUNNING
if (status.hasData && status.getData.size() > 0)
broker.task.endpoint = new Broker.Endpoint(status.getData.toStringUtf8)
broker.registerStart(broker.task.hostname)

var port: Integer = null
if (broker.task.endpoint != null) port = broker.task.endpoint.port
logger.info(s"Registering broker at ${broker.task.hostname}:${port}")
broker.registerStart(broker.task.hostname, port)
}

private[this] def onStopped(broker: Broker, status: TaskStatus, failed: Boolean): Unit = {
Expand Down
11 changes: 8 additions & 3 deletions src/scala/main/ly/stealth/mesos/kafka/scheduler/Topics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,17 @@ class Topics {
getTopic(name)
}

/**
 * Updates an existing topic: grows the partition count when `partitions`
 * exceeds the topic's current count (Kafka only supports adding partitions,
 * never removing them) and applies the given per-topic config options.
 *
 * @param topic      the existing topic to modify
 * @param partitions desired total partition count; default 1 means no growth
 * @param options    topic-level config overrides; may be null for none
 * @return the topic re-read from ZooKeeper, reflecting the changes
 */
def updateTopic(topic: Topic, partitions: Int = 1, options: util.Map[String, String]): Topic = {
  val config: Properties = new Properties()
  if (options != null)
    for ((k, v) <- options) config.setProperty(k, v)

  // only grow; addPartitions fails if asked to shrink, so skip when <= current
  if (partitions > topic.partitions.size())
    AdminUtilsWrapper().addPartitions(topic.name, partitions)

  AdminUtilsWrapper().changeTopicConfig(topic.name, config)

  // re-read so the caller sees the post-update state rather than the stale input
  getTopic(topic.name)
}

def validateOptions(options: Map[String, String]): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ trait TopicApiComponentImpl extends TopicApiComponent {
options
))
} else {
topics.map(t => cluster.topics.updateTopic(cluster.topics.getTopic(t), options))
topics.map(t => cluster.topics.updateTopic(cluster.topics.getTopic(t), partitions, options))
}
Response.ok(ListTopicsResponse(result)).build()
}
Expand Down
Loading

0 comments on commit 327fef4

Please sign in to comment.