From c6bfa7a16046d434a3a1e3cc7c823c31cc52de55 Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Thu, 11 Jun 2015 17:45:15 -0400 Subject: [PATCH] CDK-476 oozie UriHandler for Kite URIs --- kite-data/kite-data-oozie/pom.xml | 194 +++++++++ .../data/oozie/KiteLauncherURIHandler.java | 46 ++ .../kitesdk/data/oozie/KiteURIHandler.java | 157 +++++++ .../data/oozie/TestKiteURIHandler.java | 392 ++++++++++++++++++ kite-data/pom.xml | 2 +- 5 files changed, 790 insertions(+), 1 deletion(-) create mode 100644 kite-data/kite-data-oozie/pom.xml create mode 100644 kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteLauncherURIHandler.java create mode 100644 kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteURIHandler.java create mode 100644 kite-data/kite-data-oozie/src/test/java/org/kitesdk/data/oozie/TestKiteURIHandler.java diff --git a/kite-data/kite-data-oozie/pom.xml b/kite-data/kite-data-oozie/pom.xml new file mode 100644 index 0000000000..6691ac2d2e --- /dev/null +++ b/kite-data/kite-data-oozie/pom.xml @@ -0,0 +1,194 @@ + + + + + 4.0.0 + kite-data-oozie + + + org.kitesdk + kite-data + 1.0.1-SNAPSHOT + + + Kite Data Oozie Module + + The Kite Data Oozie module provides Oozie support for working with Kite datasets. + + + + 4.1.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-source-plugin + + + org.apache.maven.plugins + maven-javadoc-plugin + + org.kitesdk.data.oozie + + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.apache.rat + apache-rat-plugin + + + org.codehaus.mojo + findbugs-maven-plugin + + + org.apache.maven.plugins + maven-antrun-plugin + + + compile + + run + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-project-info-reports-plugin + + + false + + index + summary + dependency-info + dependencies + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + + + + + org.kitesdk + kite-data-core + ${project.parent.version} + + + + + org.kitesdk + ${artifact.hadoop-deps} + pom + provided + + + + tomcat + jasper-runtime + + + + + org.kitesdk + kite-hadoop-compatibility + + + + + org.apache.oozie + oozie-core + ${vers.oozie4} + + + + + com.google.code.findbugs + jsr305 + provided + true + + + com.google.code.findbugs + annotations + provided + + + + + junit + junit + test + + + org.kitesdk + kite-data-core + ${project.parent.version} + test-jar + test + + + org.kitesdk + ${artifact.hadoop-test-deps} + pom + test + + + + + + cdh5 + + + hadoop.profile + cdh5 + + + + ${vers.oozie} + + + + + diff --git a/kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteLauncherURIHandler.java b/kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteLauncherURIHandler.java new file mode 100644 index 0000000000..72f4b65dcc --- /dev/null +++ b/kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteLauncherURIHandler.java @@ -0,0 +1,46 @@ +/** + * Copyright 2015 Cloudera Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kitesdk.data.oozie; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.action.hadoop.LauncherException; +import org.apache.oozie.action.hadoop.LauncherURIHandler; + +public class KiteLauncherURIHandler implements LauncherURIHandler { + + @Override + public boolean create(final URI uri, final Configuration conf) + throws LauncherException { + throw new UnsupportedOperationException( + "Creation of resources is not supported for Kite URIs."); + } + + @Override + public boolean delete(final URI uri, final Configuration conf) + throws LauncherException { + throw new UnsupportedOperationException( + "Deletion of resources is not supported for Kite URIs."); + } + + @Override + public List> getClassesForLauncher() { + return Collections.emptyList(); + } + +} diff --git a/kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteURIHandler.java b/kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteURIHandler.java new file mode 100644 index 0000000000..bce6798b23 --- /dev/null +++ b/kite-data/kite-data-oozie/src/main/java/org/kitesdk/data/oozie/KiteURIHandler.java @@ -0,0 +1,157 @@ +/** + * Copyright 2015 Cloudera Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kitesdk.data.oozie; + +import java.net.URI; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.XException; +import org.apache.oozie.action.hadoop.LauncherURIHandler; +import org.apache.oozie.dependency.URIHandler; +import org.apache.oozie.dependency.URIHandlerException; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.URIHandlerService; +import org.apache.oozie.util.XLog; +import org.kitesdk.data.DatasetException; +import org.kitesdk.data.DatasetNotFoundException; +import org.kitesdk.data.Datasets; +import org.kitesdk.data.Signalable; +import org.kitesdk.data.URIBuilder; +import org.kitesdk.data.View; + +/** + * A Kite URI handler that works with {@link Signalable} views. + * + * To be considered as {@link #exists(URI, Configuration, String) existing} the + * view must have been signaled as ready. + */ +public class KiteURIHandler implements URIHandler { + + private static final XLog LOG = XLog.getLog(KiteURIHandler.class); + + private Set supportedSchemes; + private List> classesToShip; + + @Override + public void init(final Configuration conf) { + supportedSchemes = new HashSet(); + final String[] schemes = conf.getStrings( + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX + + this.getClass().getSimpleName() + + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "view", "dataset"); + supportedSchemes.addAll(Arrays.asList(schemes)); + classesToShip = new KiteLauncherURIHandler().getClassesForLauncher(); + } + + @Override + public Set getSupportedSchemes() { + return supportedSchemes; + } + + @Override + public Class getLauncherURIHandlerClass() { + return KiteLauncherURIHandler.class; + } + + @Override + public List> getClassesForLauncher() { + return classesToShip; + } + + @Override + public DependencyType getDependencyType(final URI uri) + throws URIHandlerException { + return DependencyType.PULL; + } + + @Override + public void registerForNotification(final URI uri, final Configuration conf, + final String user, final String actionID) throws URIHandlerException { + throw new UnsupportedOperationException( + "Notifications are not supported for " + uri); + } + + @Override + public boolean unregisterFromNotification(final URI uri, final String actionID) { + throw new UnsupportedOperationException( + "Notifications are not supported for " + uri); + } + + @Override + public Context getContext(final URI uri, final Configuration conf, + final String user) throws URIHandlerException { + return null; + } + + @SuppressWarnings("rawtypes") + @Override + public boolean exists(final URI uri, final Context context) throws URIHandlerException { + try { + View view = Datasets.load(uri); + if(view instanceof Signalable) { + return ((Signalable)view).isReady(); + } + } catch (IllegalArgumentException ex) { + LOG.error("the URI " + uri + " was not a view or dataset"); + } catch (DatasetNotFoundException ex) { + LOG.error("the dataset for the URI "+ uri +" did not actually exist"); + } catch (DatasetException e) { + throw new HadoopAccessorException(ErrorCode.E0902, e); + } + // didn't meet all the requirements for a URI/view that we want to use with oozie + return false; + } + + @Override + public boolean exists(final URI uri, final Configuration conf, final String user) + throws URIHandlerException { + // currently does not handle access limitations between the oozie user and + // a potential dataset owner + return exists(uri, null); + } + + @Override + public String getURIWithDoneFlag(final String uri, final String doneFlag) + throws URIHandlerException { + return uri; + } + + @Override + public void validate(final String uri) throws URIHandlerException { + try { + URI uriParsed = URI.create(uri); + String scheme = uriParsed.getScheme(); + if(!(URIBuilder.VIEW_SCHEME.equals(scheme) || URIBuilder.DATASET_SCHEME.equals(scheme))) { + LOG.error("Unexpected scheme: view, uri was "+ uri); + XException xException = new XException(ErrorCode.E0904, scheme, uri); + throw new URIHandlerException(xException); + } + } catch (IllegalArgumentException iae) { + XException xException = new XException(ErrorCode.E0906, iae); + throw new URIHandlerException(xException); + } + } + + @Override + public void destroy() { + } + +} diff --git a/kite-data/kite-data-oozie/src/test/java/org/kitesdk/data/oozie/TestKiteURIHandler.java b/kite-data/kite-data-oozie/src/test/java/org/kitesdk/data/oozie/TestKiteURIHandler.java new file mode 100644 index 0000000000..af33ea9a70 --- /dev/null +++ b/kite-data/kite-data-oozie/src/test/java/org/kitesdk/data/oozie/TestKiteURIHandler.java @@ -0,0 +1,392 @@ +/** + * Copyright 2015 Cloudera Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kitesdk.data.oozie; + +import org.kitesdk.data.PartitionView; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.XException; +import org.apache.oozie.dependency.URIHandlerException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.DatasetReader; +import org.kitesdk.data.DatasetWriter; +import org.kitesdk.data.Formats; +import org.kitesdk.data.MiniDFSTest; +import org.kitesdk.data.PartitionStrategy; +import org.kitesdk.data.Signalable; +import org.kitesdk.data.RefinableView; +import org.kitesdk.data.View; +import org.kitesdk.data.oozie.KiteURIHandler; +import org.kitesdk.data.spi.DatasetRepository; +import org.kitesdk.data.spi.OptionBuilder; +import org.kitesdk.data.spi.Registration; +import org.kitesdk.data.spi.URIPattern; +import org.kitesdk.data.spi.filesystem.FileSystemDatasetRepository; + +@RunWith(Parameterized.class) +public class TestKiteURIHandler extends MiniDFSTest { + + protected static final String NAMESPACE = "ns1"; + protected static final String NAME = "provider_test1"; + + @Parameterized.Parameters + public static Collection data() { + Object[][] data = new Object[][] { + { false }, // default to local FS + { true } }; // default to distributed FS + return Arrays.asList(data); + } + + // whether this should use the DFS provided by MiniDFSTest + protected boolean distributed; + + protected Configuration conf; + protected DatasetDescriptor testDescriptor; + protected FileSystem fs; + + public TestKiteURIHandler(boolean distributed) { + this.distributed = distributed; + } + + public DatasetRepository newRepo() { + return new FileSystemDatasetRepository.Builder() + .configuration(conf) + .rootDirectory(URI.create("target/data")) + .build(); + } + + @After + public void removeDataPath() throws IOException { + fs.delete(new Path("target/data"), true); + } + + @Before + public void setUp() throws IOException, URISyntaxException { + this.conf = (distributed ? + MiniDFSTest.getConfiguration() : + new Configuration()); + + this.fs = FileSystem.get(conf); + + this.testDescriptor = new DatasetDescriptor.Builder() + .format(Formats.AVRO) + .schema(SchemaBuilder.record("Event").fields() + .requiredLong("timestamp") + .requiredString("message") + .endRecord()) + .partitionStrategy(new PartitionStrategy.Builder() + .year("timestamp") + .month("timestamp") + .day("timestamp") + .build()) + .build(); + + uriHandler = new KiteURIHandler(); + + } + + private KiteURIHandler uriHandler; + + @Test + public void uriToNonExistantDatasetsView() throws URIHandlerException, URISyntaxException { + URI uri = new URI("view:file:target/data/data/nomailbox?message=hello"); + Assert.assertFalse("URIs to datasets that don't exist should return false", + uriHandler.exists(uri, null)); + } + + @Test + public void supportedSchemes() { + uriHandler.init(new Configuration()); + Set scheme = new HashSet(); + scheme.add("view"); + scheme.add("dataset"); + Assert.assertEquals(scheme, uriHandler.getSupportedSchemes()); + } + + @Test (expected = UnsupportedOperationException.class) + public void registerNotifications() throws URISyntaxException, URIHandlerException { + URI uri = new URI("view:hdfs://localhost:9083/default/person?version=201404240000"); + + uriHandler.registerForNotification(uri,new Configuration(), "user","234"); + } + + @Test (expected = UnsupportedOperationException.class) + public void unregisterNotifications() throws URISyntaxException { + URI uri = new URI("view:hdfs://localhost:9083/default/person?version=201404240000"); + + uriHandler.unregisterFromNotification(uri, "2324"); + } + + @Test + public void checkURIDoesNotExist() throws URIHandlerException, IOException{ + DatasetRepository repository = newRepo(); + Dataset dataset = repository.create("data","notreadymailbox", testDescriptor); + + RefinableView view = dataset.with("message", "hello"); + + Assert.assertFalse(uriHandler.exists(view.getUri(), null)); + } + + @Test + public void checkURIExistsView() throws URIHandlerException, IOException{ + DatasetRepository repository = newRepo(); + Dataset dataset = repository.create("data","readymailbox", testDescriptor); + + View view = dataset.with("message", "hello"); + ((Signalable)view).signalReady(); + + Assert.assertTrue(uriHandler.exists(view.getUri(), null)); + } + + @Test + public void checkURIExistsDataset() throws URIHandlerException, IOException{ + DatasetRepository repository = newRepo(); + Dataset dataset = repository.create("data","readymailbox", testDescriptor); + + ((Signalable)((View)dataset)).signalReady(); + + Assert.assertTrue(uriHandler.exists(dataset.getUri(), null)); + } + + @Test + public void validateInvalidScheme() throws URIHandlerException { + String uri = "repo:hdfs:/default/cloudera/users?favoriteColor=pink"; + try { + uriHandler.validate(uri); + Assert.fail("Validate with an invalid schema should have thrown an exception"); + } catch (XException ex) { + Assert.assertEquals(ErrorCode.E0904, ex.getErrorCode()); + } + } + + @Test + public void validateNotAURI() throws URIHandlerException { + String uri = "clearly not a uri"; + try { + uriHandler.validate(uri); + Assert.fail("Validate with an invalid URI should have thrown an exception"); + } catch (XException ex) { + Assert.assertEquals(ErrorCode.E0906, ex.getErrorCode()); + } + } + + @Test + public void existsForNonReadiableView() throws URIHandlerException, URISyntaxException { + Registration.register( + new URIPattern("unreadiable?absolute=true"), + new URIPattern("unreadiable::namespace/:dataset?absolute=true"), + new UnreadiableDatasetBuilder()); + + URI uri = new URI("view:unreadiable:default/person?version=201404240000"); + + Assert.assertFalse(uriHandler.exists(uri, null)); + } + + //minimal implementation of the dataset stack to get an non-readiable view to load in the handler + private static final class UnreadiableDatasetRepository implements DatasetRepository { + + @Override + public Dataset load(String namespace, String name) { + return null; + } + + @Override + public Dataset load(String namespace, String name, Class type) { + return new UnreadiableDataset(); + } + + @Override + public Dataset create(String namespace, String name, DatasetDescriptor descriptor) { + return null; + } + + @Override + public Dataset create(String namespace, String name, DatasetDescriptor descriptor, Class type) { + return null; + } + + @Override + public Dataset update(String namespace, String name, DatasetDescriptor descriptor) { + return null; + } + + @Override + public Dataset update(String namespace, String name, DatasetDescriptor descriptor, Class type) { + return null; + } + + @Override + public boolean delete(String namespace, String name) { + return false; + } + + @Override + public boolean exists(String namespace, String name) { + return false; + } + + @Override + public Collection namespaces() { + return null; + } + + @Override + public Collection datasets(String namespace) { + return null; + } + + @Override + public URI getUri() { + return null; + } + + } + + @SuppressWarnings("rawtypes") + private static final class UnreadiableDataset implements Dataset { + + @Override + public RefinableView with(String name, Object... values) { + return null; + } + + @Override + public RefinableView from(String name, Comparable value) { + return null; + } + + @Override + public RefinableView fromAfter(String name, Comparable value) { + return null; + } + + @Override + public RefinableView to(String name, Comparable value) { + return null; + } + + @Override + public RefinableView toBefore(String name, Comparable value) { + return null; + } + + @Override + public Dataset getDataset() { + return null; + } + + @Override + public DatasetReader newReader() { + return null; + } + + @Override + public DatasetWriter newWriter() { + return null; + } + + @Override + public boolean includes(E entity) { + return false; + } + + @Override + public boolean deleteAll() { + return false; + } + + @Override + public Iterable> getCoveringPartitions() { + return null; + } + + @Override + public Class getType() { + return null; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public String getName() { + return null; + } + + @Override + public String getNamespace() { + return null; + } + + @Override + public DatasetDescriptor getDescriptor() { + return null; + } + + @Override + public URI getUri() { + return null; + } + + @Override + public Schema getSchema() { + return null; + } + + @Override + public View asSchema(Schema schema) { + return null; + } + + @Override + public View asType(Class type) { + return null; + } + + } + + private static final class UnreadiableDatasetBuilder implements OptionBuilder { + + @Override + public DatasetRepository getFromOptions(Map options) { + return new UnreadiableDatasetRepository(); + } + + } + +} diff --git a/kite-data/pom.xml b/kite-data/pom.xml index 05f264c2eb..94f5d792ee 100644 --- a/kite-data/pom.xml +++ b/kite-data/pom.xml @@ -24,6 +24,7 @@ kite-data-core + kite-data-oozie kite-data-hive kite-data-s3 kite-data-crunch @@ -119,5 +120,4 @@ -