This module provides an example of processing event data using Apache Spark.
## Getting started

This example assumes that you're running a CDH5.1 or later cluster (such as the [Cloudera Quickstart VM][getvm]) that has Spark configured. It requires the `spark-submit` command to execute the Spark job on the cluster. If you're using the Quickstart VM, run this example from the VM rather than the host computer.

[getvm]: http://www.cloudera.com/content/support/en/downloads/quickstart_vms.html

On the cluster, check out a copy of the code and navigate to the `spark` directory using the following commands in a terminal window:

```bash
git clone https://github.com/kite-sdk/kite-examples.git
cd kite-examples
cd spark
```

## Building the Application

To build the project, enter the following command in a terminal window:

```bash
mvn install
```

## Creating and Populating the Events Dataset

In this example, you store raw events in a Hive-backed dataset so that you can process the results using Hive. Use `CreateEvents`, provided with the demo, to both create and populate random event records. Execute the following command from a terminal window in the `kite-examples/spark` directory:

```bash
mvn exec:java -Dexec.mainClass="org.kitesdk.examples.spark.CreateEvents"
```

You can browse the generated events using [Hue on the Quickstart VM](http://localhost:8888/metastore/table/default/events/read).

## Using Spark to Correlate Events

In this example, you use Spark to correlate events generated from the same IP address within a five-minute window. Begin by configuring Spark to use the Kryo serialization library.

Register your Avro classes with the following Scala class to use Avro's specific binary serialization for both the `StandardEvent` and `CorrelatedEvents` classes.

### AvroKyroRegistrator.scala

```scala
class AvroKyroRegistrator extends KryoRegistrator {
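  // The body of this class is elided in the diff above. Based on the
  // surrounding text, it registers Avro's specific binary serialization for
  // the StandardEvent and CorrelatedEvents classes. A minimal sketch, assuming
  // the chill-avro helpers are available (not confirmed by this diff):
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[StandardEvent],
      AvroSerializer.SpecificRecordBinarySerializer[StandardEvent])
    kryo.register(classOf[CorrelatedEvents],
      AvroSerializer.SpecificRecordBinarySerializer[CorrelatedEvents])
  }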
}
```

### Highlights from CorrelateEventsTask.class

The following snippets show examples of the code used to configure and invoke Spark tasks.

Configure Kryo to automatically serialize Avro objects:

```java
// Create the Spark configuration and get a Java context
SparkConf sparkConf = new SparkConf()
    .setAppName("Correlate Events")
    // Configure the use of Kryo serialization, including the Avro registrator
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.kitesdk.examples.spark.AvroKyroRegistrator");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
```

To access Hive-backed datasets from remote Spark tasks, register the required JARs in Spark's equivalent of the Hadoop DistributedCache:

```java
// Register classes needed for remote Spark tasks
addJarFromClass(sparkContext, getClass());
addJars(sparkContext, System.getenv("HIVE_HOME"), "lib");
sparkContext.addFile(System.getenv("HIVE_HOME") + "/conf/hive-site.xml");
```
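
The `addJarFromClass` and `addJars` helpers are not shown in this diff. The sketch below illustrates what such helpers might do; the class name, signatures, and lookup logic are assumptions for illustration, not the project's actual code.

```java
import java.io.File;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical helpers, sketched for illustration only.
class SparkJarUtil {

  // Ship the JAR that contains the given class to the Spark executors.
  static void addJarFromClass(JavaSparkContext context, Class<?> klass) {
    String jar = klass.getProtectionDomain().getCodeSource().getLocation().getPath();
    context.addJar(jar);
  }

  // Add every JAR in a subdirectory (for example, $HIVE_HOME/lib) so that
  // Hive classes resolve inside remote Spark tasks.
  static void addJars(JavaSparkContext context, String home, String subDir) {
    File[] jars = new File(home, subDir).listFiles((dir, name) -> name.endsWith(".jar"));
    if (jars != null) {
      for (File jar : jars) {
        context.addJar(jar.getAbsolutePath());
      }
    }
  }
}
```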

Configure the MapReduce `DatasetKeyInputFormat` to enable the application to read from the `events` dataset. Then use Spark's built-in support to generate an RDD (Resilient Distributed Dataset) from the input format:

```java
Configuration conf = new Configuration();
// (Lines elided in this diff most likely configure DatasetKeyInputFormat to
// read from the events dataset, mirroring the output configuration shown below.)
JavaPairRDD<StandardEvent, Void> events = sparkContext.newAPIHadoopRDD(conf,
    DatasetKeyInputFormat.class, StandardEvent.class, Void.class);
```
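
The records arrive as the keys of the (`StandardEvent`, `Void`) pairs. A minimal sketch (not code from this diff) of pulling them out before further processing:

```java
// The dataset records are the keys of the pair RDD; the values are unused.
JavaRDD<StandardEvent> eventRecords = events.keys();
```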

The application can now process the events as needed. Once you have the final RDD, configure `DatasetKeyOutputFormat` in the same way and use `saveAsNewAPIHadoopFile` to persist the data to the output dataset:

```java
DatasetKeyOutputFormat.configure(conf).writeTo(correlatedEventsUri).withType(CorrelatedEvents.class);
// (Lines elided in this diff build the `matches` RDD of correlated events
// that is saved below.)
matches.saveAsNewAPIHadoopFile("dummy", CorrelatedEvents.class, Void.class,
    DatasetKeyOutputFormat.class, conf);
```
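
Note that `matches` must be a pair RDD keyed by `CorrelatedEvents` to match the output format's key and value types. Assuming a `correlated` RDD of results (a variable not shown in this diff), it could be wrapped along these lines:

```java
// Hypothetical: pair each correlated-events record with a null value so that
// DatasetKeyOutputFormat can write it as a dataset key.
JavaPairRDD<CorrelatedEvents, Void> matches = correlated
    .mapToPair(record -> new Tuple2<CorrelatedEvents, Void>(record, null));
```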

In a terminal window, run the Spark job using the following command. The embedded `mvn dependency:build-classpath` pipeline produces a comma-separated list of the project's dependency JARs for the `--jars` option:

```bash
spark-submit --class org.kitesdk.examples.spark.CorrelateEvents --jars $(mvn dependency:build-classpath | grep -v '^\[' | sed -e 's/:/,/g') target/kite-spark-demo-*.jar
```

You can browse the correlated events using [Hue on the Quickstart VM](http://localhost:8888/metastore/table/default/correlated_events/read). A count of correlated events per IP address looks something like this:
ip | numCorrelations |
---|---|
192.168.121.116 | 32 |
192.168.106.157 | 30 |
192.168.64.16 | 28 |
192.168.148.78 | 26 |
192.168.28.19 | 26 |
192.168.89.91 | 24 |
192.168.128.124 | 24 |
192.168.137.101 | 24 |
192.168.137.188 | 24 |
192.168.161.107 | 24 |

---

**DennisDawson** (Feb 19, 2015): Obviously, it would be better if CorrelatedEventsTask.java compiled and ran the first time.

**joey** (Member, Feb 20, 2015): Ok, I figured out the cause of this problem. Unfortunately, the solution is to make the ugly command line more ugly. If we get to a consensus on how we want to handle that (script versus one-liner), we can get the right solution that doesn't require running it twice.

**DennisDawson** (Feb 19, 2015): The access control issue is something in the VM, so I don't think we can do anything to fix it, unless we ran it as part of the `mvn install`. I don't think it hurts to run it if it's already configured, but I don't know.

**joey** (Member, Feb 20, 2015): This should be fixed in the 5.3 version of the VM. I think that we only require the 5.2 VM, so we should just add this as a prep step rather than as a troubleshooting step.

I can't view the dataset in Impala: it tells me it doesn't support map types for events, or record types for correlated events. This does mean that I have to view it in Hive, which is slow.