diff --git a/kite-examples-oozie/README.md b/kite-examples-oozie/README.md new file mode 100644 index 0000000..5964ee0 --- /dev/null +++ b/kite-examples-oozie/README.md @@ -0,0 +1,76 @@ +# Kite Oozie Versioned Datasets Example + +This example demonstrates creating Oozie applications using versioned Datasets as described in https://groups.google.com/a/cloudera.org/d/msg/cdk-dev/uUm-wOv1B3o/Sm6cDVBMusoJ. + +This example uses some different Entity models than discussed in the thread. This example uses 3 entity models: + +* Person +* PersonOutcomes +* PersonSummary + +For each entity model there is a corresponding Oozie coordinator that produces "nominal_time" partitions in a Dataset +oriented around that model. + +The data flow is Person -> PersonOutcomes -> PersonSummary. The model is inspired by a processing system in the healthcare +arena where raw person data is processed into a form containing a number of outcomes for each person. Later, each +person's outcomes might be reduced into a record representing a high-level summary of the person. + +The entity models used in the example are bare skeletons and realistic processing logic to transform between the models +is absent. What those might look like in a more realistic system is left to the reader's imagination. + +Person is the first dataset in this example. The input used to produce that Dataset is a simple text file. + +## Prerequisites + +The later steps below are done purely to contend with the limited resources of running Hadoop in a VM. Executing this test on a normally sized cluster would only require the steps through restarting Oozie. + +* Running instance of [CDH5 Quickstart VM](http://www.cloudera.com/content/support/en/downloads/download-components/download-products.html?productID=F6mO278Rvo). The example's POM was setup and tested with a CDH5.2 VM. + +* Copy persons.txt from src/main/resources to /user/cloudera directory in HDFS + +* Copy datasets.xml from src/main/resources to /user/cloudera/apps directory in HDFS + +* Drop the following jars in /var/lib/oozie + * kite-data-core-1.0.1-SNAPSHOT.jar + * kite-data-oozie-1.0.1-SNAPSHOT.jar + * kite-hadoop-compatibility-1.0.1-SNAPSHOT.jar + * kite-data-hive-1.0.1-SNAPSHOT.jar + * commons-jexl-2.1.1.jar + * jackson-core-2.3.1.jar + * jackson-databind-2.3.1.jar + +* Add the following to oozie-site.xml Safety Valve: + + + oozie.service.URIHandlerService.uri.handlers + org.apache.oozie.dependency.FSURIHandler,org.apache.oozie.dependency.HCatURIHandler,org.kitesdk.data.oozie.KiteURIHandler + + +* Restart oozie + +* Tweak YARN config (in service yarn -> Gateway Base Group -> Resource Management) in Cloudera Manager. + * ApplicationMaster Memory: 128 + * ApplicationMaster Java Maximum Heap Size: 100 + * Map Task Memory: 128 + * Reduce Task Memory: 128 + * Map Task Max Heap: 100 + * Reduce Task Max Heap: 100 + +* Deploy YARN client configuration via Service -> yarn -> Actions -> Deploy Client Configuration + +* Tweak YARN service configurations: + * Service -> yarn -> Configuration -> ResourceManager Base Group -> Resource Management -> Container Memory Increment: 256 + * Service -> yarn -> Configuration -> NodeManager Base Group -> Resource Management -> Container Virtual CPU Cores: 16 + +* Restart yarn + +* Consider stopping Cloudera Management Services in Cloudera Manager to free more resources on the VM + +## Running +* Run "mvn clean package -Pdeploy-example" + * this will create the Person, PersonOutcomes, and PersonSummary base Datasets and deploy and start the associated + coordinators that populate "nominalTime" partitions of the datasets. Look at the coordinators and workflows in Hue to + see them progress. + +## Re-deploying +After making any code/configuration changes, kill the running coordinators from Hue and re-run "mvn clean package -Pdeploy-example" diff --git a/kite-examples-oozie/pom.xml b/kite-examples-oozie/pom.xml new file mode 100644 index 0000000..c929473 --- /dev/null +++ b/kite-examples-oozie/pom.xml @@ -0,0 +1,350 @@ + + + + 4.0.0 + + org.kitesdk.examples + kite-examples-oozie + 1.0.1-SNAPSHOT + jar + + Kite Oozie Examples + + + + 1.1.0 + UTF-8 + + + + + cdh.repo + https://repository.cloudera.com/artifactory/cloudera-repos + Cloudera Repositories + + false + + + + + + cdh.repo + https://repository.cloudera.com/artifactory/cloudera-repos + Cloudera Repositories + + false + + + + + + + org.kitesdk + kite-data-core + ${kite-version} + + + org.kitesdk + kite-data-crunch + ${kite-version} + + + + org.kitesdk + kite-data-hive + ${kite-version} + + + + org.apache.hive + hive-exec + 0.12.0-cdh5.0.0 + + + org.apache.crunch + crunch-core + 0.9.0-hadoop2 + + + org.apache.avro + avro-mapred + + + + + + org.apache.avro + avro-mapred + hadoop2 + 1.7.5 + + + + org.apache.avro + avro-ipc + + + + + + + + org.apache.hadoop + hadoop-client + 2.3.0-cdh5.0.0 + provided + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.5.1 + + 1.6 + 1.6 + true + true + + + + org.apache.avro + avro-maven-plugin + 1.7.5 + + String + false + private + + + + generate-sources + + idl-protocol + + + + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.9 + + true + + + + + + + + + deploy-example + + + + org.kitesdk + kite-maven-plugin + ${kite-version} + + + org.kitesdk + kite-hadoop-dependencies + pom + ${kite-version} + + + org.apache.hive + hive-exec + 0.12.0-cdh5.0.0 + + + org.apache.hive + hive-serde + 0.12.0-cdh5.0.0 + + + org.kitesdk + kite-data-hive + ${kite-version} + + + org.apache.oozie + oozie-client + 4.0.0-cdh5.0.0 + + + + + create-person-dataset + package + + delete-dataset + create-dataset + + false + + repo:hive://quickstart.cloudera:9083 + Person + default + org.kitesdk.examples.oozie.Person + provided("nominal_time", "string") + + + + person-workflow + package + + package-app + deploy-app + run-app + + false + + kite-examples-oozie-person-app + coordinator + Person-wf + hdfs://quickstart.cloudera + org.kitesdk.examples.oozie.PersonTool + true + + $${inputPathUri} + $${outputDatasetUri} + + ${basedir}/src/main/resources/person-coordinator.xml + + hdfs://quickstart.cloudera + quickstart.cloudera:8032 + + + /user/${user.name}/apps + view:hive://quickstart.cloudera:9083 + /user/${user.name}/apps/datasets.xml + /user/cloudera/persons.txt + + http://quickstart.cloudera:11000/oozie + + + + create-personoutcomes-dataset + package + + delete-dataset + create-dataset + + false + + repo:hive://quickstart.cloudera:9083 + PersonOutcomes + default + org.kitesdk.examples.oozie.PersonOutcomes + provided("nominal_time", "string") + + + + personoutcomes-workflow + package + + package-app + deploy-app + run-app + + false + + kite-examples-oozie-personoutcomes-app + coordinator + PersonOutcomes-wf + hdfs://quickstart.cloudera + org.kitesdk.examples.oozie.PersonOutcomesTool + true + + $${inputDatasetUri} + $${outputDatasetUri} + + ${basedir}/src/main/resources/personoutcomes-coordinator.xml + + hdfs://quickstart.cloudera + quickstart.cloudera:8032 + + + /user/${user.name}/apps + view:hive://quickstart.cloudera:9083 + /user/${user.name}/apps/datasets.xml + + http://quickstart.cloudera:11000/oozie + + + + create-personsummary-dataset + package + + delete-dataset + create-dataset + + false + + repo:hive://quickstart.cloudera:9083 + PersonSummary + default + org.kitesdk.examples.oozie.PersonSummary + provided("nominal_time", "string") + + + + personsummary-workflow + package + + package-app + deploy-app + run-app + + false + + kite-examples-oozie-personsummary-app + coordinator + PersonSummary-wf + hdfs://quickstart.cloudera + org.kitesdk.examples.oozie.PersonSummaryTool + true + + $${inputDatasetUri} + $${outputDatasetUri} + + ${basedir}/src/main/resources/personsummary-coordinator.xml + + hdfs://quickstart.cloudera + quickstart.cloudera:8032 + + + /user/${user.name}/apps + view:hive://quickstart.cloudera:9083 + /user/${user.name}/apps/datasets.xml + + http://quickstart.cloudera:11000/oozie + + + + + + + + + diff --git a/kite-examples-oozie/src/main/avro/person.avdl b/kite-examples-oozie/src/main/avro/person.avdl new file mode 100644 index 0000000..351fe4c --- /dev/null +++ b/kite-examples-oozie/src/main/avro/person.avdl @@ -0,0 +1,9 @@ +@namespace("org.kitesdk.examples.oozie") +protocol PersonProtocol { + record Person { + string id; + string first_name; + string last_name; + int age; + } +} \ No newline at end of file diff --git a/kite-examples-oozie/src/main/avro/personoutcomes.avdl b/kite-examples-oozie/src/main/avro/personoutcomes.avdl new file mode 100644 index 0000000..b31ba23 --- /dev/null +++ b/kite-examples-oozie/src/main/avro/personoutcomes.avdl @@ -0,0 +1,8 @@ +@namespace("org.kitesdk.examples.oozie") +protocol PersonOutcomesProtocol { + record PersonOutcomes { + string person_id; + + // a real model would have some more interesting attributes + } +} diff --git a/kite-examples-oozie/src/main/avro/personsummary.avdl b/kite-examples-oozie/src/main/avro/personsummary.avdl new file mode 100644 index 0000000..48e34f6 --- /dev/null +++ b/kite-examples-oozie/src/main/avro/personsummary.avdl @@ -0,0 +1,8 @@ +@namespace("org.kitesdk.examples.oozie") +protocol PersonSummaryProtocol { + record PersonSummary { + string person_id; + + // a real model would have some more interesting attributes + } +} diff --git a/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonOutcomesTool.java b/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonOutcomesTool.java new file mode 100644 index 0000000..b8d6e9e --- /dev/null +++ b/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonOutcomesTool.java @@ -0,0 +1,54 @@ +package org.kitesdk.examples.oozie; + +import java.net.URI; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.util.CrunchTool; +import org.apache.hadoop.util.ToolRunner; +import org.kitesdk.data.Datasets; +import org.kitesdk.data.View; +import org.kitesdk.data.crunch.CrunchDatasets; + +public class PersonOutcomesTool extends CrunchTool { + + @Override + public int run(String[] args) throws Exception { + final URI inputDatasetUri = new URI(args[0]); + final URI outputDatasetUri = new URI(args[1]); + + View inputDataset = Datasets.load(inputDatasetUri, Person.class); + + final PCollection persons = read(CrunchDatasets.asSource(inputDataset)); + + final PCollection personOutcmes = doSomeProcessing(persons); + + write(personOutcmes, CrunchDatasets.asTarget(outputDatasetUri)); + + final PipelineResult result = run(); + if(result == null) { + throw new RuntimeException("Result of the run was null!"); + } else if (!result.succeeded()) { + throw new RuntimeException("Pipeline run failed!"); + } + return 0; + } + + private PCollection doSomeProcessing(final PCollection persons) { + // a real example would have some more interesting logic here + return persons.parallelDo(new MapFn() { + + @Override + public PersonOutcomes map(final Person input) { + return PersonOutcomes.newBuilder().setPersonId(input.getId()).build(); + } + }, Avros.records(PersonOutcomes.class)); + } + + + public static void main(final String[] args) throws Exception { + ToolRunner.run(new PersonOutcomesTool(), args); + } + +} diff --git a/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonSummaryTool.java b/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonSummaryTool.java new file mode 100644 index 0000000..0aa1e2c --- /dev/null +++ b/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonSummaryTool.java @@ -0,0 +1,56 @@ +package org.kitesdk.examples.oozie; + +import java.net.URI; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.util.CrunchTool; +import org.apache.hadoop.util.ToolRunner; +import org.kitesdk.data.Datasets; +import org.kitesdk.data.View; +import org.kitesdk.data.crunch.CrunchDatasets; + +public class PersonSummaryTool extends CrunchTool { + + @Override + public int run(String[] args) throws Exception { + final URI inputDatasetUri = new URI(args[0]); + final URI outputDatasetUri = new URI(args[1]); + + View inputDataset = Datasets.load(inputDatasetUri, PersonOutcomes.class); + + final PCollection personOutcomes = read(CrunchDatasets.asSource(inputDataset)); + + final PCollection personSummaries = doSomeProcessing(personOutcomes); + + write(personSummaries, CrunchDatasets.asTarget(outputDatasetUri)); + + final PipelineResult result = run(); + if(result == null) { + throw new RuntimeException("Result of the run was null!"); + } else if (!result.succeeded()) { + throw new RuntimeException("Pipeline run failed!"); + } + return 0; + } + + private PCollection doSomeProcessing(final PCollection personOutcomes) { + // a real example would have some more interesting logic here + return personOutcomes.parallelDo( + new MapFn() { + + @Override + public PersonSummary map(final PersonOutcomes input) { + return PersonSummary.newBuilder().setPersonId(input.getPersonId()) + .build(); + } + }, Avros.records(PersonSummary.class)); + } + + + public static void main(final String[] args) throws Exception { + ToolRunner.run(new PersonSummaryTool(), args); + } + +} diff --git a/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonTool.java b/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonTool.java new file mode 100644 index 0000000..7abf6be --- /dev/null +++ b/kite-examples-oozie/src/main/java/org/kitesdk/examples/oozie/PersonTool.java @@ -0,0 +1,62 @@ +package org.kitesdk.examples.oozie; + +import java.net.URI; +import java.util.StringTokenizer; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.io.From; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.util.CrunchTool; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.kitesdk.data.crunch.CrunchDatasets; + +public class PersonTool extends CrunchTool { + + @Override + public int run(String[] args) throws Exception { + final Path inputPath = new Path(args[0]); + final URI outputDatasetUri = new URI(args[1]); + + final PCollection rawPersons = read(From.textFile(inputPath)); + + final PCollection processedPersons = doSomeProcessing(rawPersons); + + write(processedPersons, CrunchDatasets.asTarget(outputDatasetUri)); + + final PipelineResult result = run(); + if(result == null) { + throw new RuntimeException("Result of the run was null!"); + } else if (!result.succeeded()) { + throw new RuntimeException("Pipeline run failed!"); + } + return 0; + } + + private static class PersonProcessing extends MapFn { + private static final long serialVersionUID = -4763189327890300396L; + + @Override + public Person map(final String input) { + final StringTokenizer tokenizer = new StringTokenizer(input, ","); + final String id = tokenizer.nextToken(); + final String firstName = tokenizer.nextToken(); + final String lastName = tokenizer.nextToken(); + final int age = Integer.parseInt(tokenizer.nextToken()); + Person person = Person.newBuilder().setId(id).setFirstName(firstName) + .setLastName(lastName).setAge(age).build(); + return person; + } + } + + private PCollection doSomeProcessing(final PCollection rawPersons) { + PersonProcessing mapFn = new PersonProcessing(); + return rawPersons.parallelDo(mapFn, Avros.records(Person.class)); + } + + public static void main(final String[] args) throws Exception { + ToolRunner.run(new PersonTool(), args); + } + +} diff --git a/kite-examples-oozie/src/main/resources/datasets.xml b/kite-examples-oozie/src/main/resources/datasets.xml new file mode 100644 index 0000000..1dadc5d --- /dev/null +++ b/kite-examples-oozie/src/main/resources/datasets.xml @@ -0,0 +1,14 @@ + + + ${datasetRepositoryUri}/default/Person?nominal_time=${YEAR}${MONTH}${DAY}${HOUR}${MINUTE} + + + + ${datasetRepositoryUri}/default/PersonOutcomes?nominal_time=${YEAR}${MONTH}${DAY}${HOUR}${MINUTE} + + + + ${datasetRepositoryUri}/default/PersonSummary?nominal_time=${YEAR}${MONTH}${DAY}${HOUR}${MINUTE} + + + \ No newline at end of file diff --git a/kite-examples-oozie/src/main/resources/person-coordinator.xml b/kite-examples-oozie/src/main/resources/person-coordinator.xml new file mode 100644 index 0000000..7b47cc2 --- /dev/null +++ b/kite-examples-oozie/src/main/resources/person-coordinator.xml @@ -0,0 +1,26 @@ + + + ${datasetDefinitionsPath} + + + + ${coord:current(0)} + + + + + ${workflowsPath}/kite-examples-oozie-person-app + + + inputPathUri + ${inputPathUri} + + + outputDatasetUri + ${coord:dataOut('output')} + + + + + \ No newline at end of file diff --git a/kite-examples-oozie/src/main/resources/personoutcomes-coordinator.xml b/kite-examples-oozie/src/main/resources/personoutcomes-coordinator.xml new file mode 100644 index 0000000..cf2635d --- /dev/null +++ b/kite-examples-oozie/src/main/resources/personoutcomes-coordinator.xml @@ -0,0 +1,31 @@ + + + ${datasetDefinitionsPath} + + + + ${coord:current(0)} + + + + + ${coord:current(0)} + + + + + ${workflowsPath}/kite-examples-oozie-personoutcomes-app + + + inputDatasetUri + ${coord:dataIn('input')} + + + outputDatasetUri + ${coord:dataOut('output')} + + + + + \ No newline at end of file diff --git a/kite-examples-oozie/src/main/resources/persons.txt b/kite-examples-oozie/src/main/resources/persons.txt new file mode 100644 index 0000000..436eae2 --- /dev/null +++ b/kite-examples-oozie/src/main/resources/persons.txt @@ -0,0 +1,2 @@ +1,Johh,Doe,55 +2,Jim,Johnson,12 \ No newline at end of file diff --git a/kite-examples-oozie/src/main/resources/personsummary-coordinator.xml b/kite-examples-oozie/src/main/resources/personsummary-coordinator.xml new file mode 100644 index 0000000..45fe56b --- /dev/null +++ b/kite-examples-oozie/src/main/resources/personsummary-coordinator.xml @@ -0,0 +1,31 @@ + + + ${datasetDefinitionsPath} + + + + ${coord:current(0)} + + + + + ${coord:current(0)} + + + + + ${workflowsPath}/kite-examples-oozie-personsummary-app + + + inputDatasetUri + ${coord:dataIn('input')} + + + outputDatasetUri + ${coord:dataOut('output')} + + + + + \ No newline at end of file