-
Notifications
You must be signed in to change notification settings - Fork 262
CDK-476 oozie UriHandler for Kite URIs #364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,194 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <!-- | ||
| ~ Copyright 2015 Cloudera Inc. | ||
| ~ | ||
| ~ Licensed under the Apache License, Version 2.0 (the "License"); | ||
| ~ you may not use this file except in compliance with the License. | ||
| ~ You may obtain a copy of the License at | ||
| ~ | ||
| ~ http://www.apache.org/licenses/LICENSE-2.0 | ||
| ~ | ||
| ~ Unless required by applicable law or agreed to in writing, software | ||
| ~ distributed under the License is distributed on an "AS IS" BASIS, | ||
| ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| ~ See the License for the specific language governing permissions and | ||
| ~ limitations under the License. | ||
| --> | ||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
|
|
||
| <modelVersion>4.0.0</modelVersion> | ||
| <artifactId>kite-data-oozie</artifactId> | ||
|
|
||
| <parent> | ||
| <groupId>org.kitesdk</groupId> | ||
| <artifactId>kite-data</artifactId> | ||
| <version>1.0.1-SNAPSHOT</version> | ||
| </parent> | ||
|
|
||
| <name>Kite Data Oozie Module</name> | ||
| <description> | ||
| The Kite Data Oozie module provides Oozie support for working with Kite datasets. | ||
| </description> | ||
|
|
||
| <properties> | ||
| <vers.oozie4>4.1.0</vers.oozie4> | ||
| </properties> | ||
|
|
||
| <build> | ||
| <plugins> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-compiler-plugin</artifactId> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-source-plugin</artifactId> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-javadoc-plugin</artifactId> | ||
| <configuration> | ||
| <excludePackageNames>org.kitesdk.data.oozie</excludePackageNames> | ||
| </configuration> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-surefire-plugin</artifactId> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.apache.rat</groupId> | ||
| <artifactId>apache-rat-plugin</artifactId> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.codehaus.mojo</groupId> | ||
| <artifactId>findbugs-maven-plugin</artifactId> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-antrun-plugin</artifactId> | ||
| <executions> | ||
| <execution> | ||
| <phase>compile</phase> | ||
| <goals> | ||
| <goal>run</goal> | ||
| </goals> | ||
| <configuration> | ||
| <tasks> | ||
| <echo message="Create empty javadoc JAR to satisfy Maven central" /> | ||
| <mkdir dir="target/apidocs" /> | ||
| </tasks> | ||
| </configuration> | ||
| </execution> | ||
| </executions> | ||
| </plugin> | ||
| </plugins> | ||
| </build> | ||
|
|
||
| <reporting> | ||
| <plugins> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-project-info-reports-plugin</artifactId> | ||
| <reportSets> | ||
| <reportSet> | ||
| <inherited>false</inherited> | ||
| <reports> | ||
| <report>index</report> | ||
| <report>summary</report> | ||
| <report>dependency-info</report> | ||
| <report>dependencies</report> | ||
| </reports> | ||
| </reportSet> | ||
| </reportSets> | ||
| </plugin> | ||
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-javadoc-plugin</artifactId> | ||
| </plugin> | ||
| </plugins> | ||
| </reporting> | ||
|
|
||
| <dependencies> | ||
| <!-- Kite --> | ||
| <dependency> | ||
| <groupId>org.kitesdk</groupId> | ||
| <artifactId>kite-data-core</artifactId> | ||
| <version>${project.parent.version}</version> | ||
| </dependency> | ||
|
|
||
| <!-- Hadoop --> | ||
| <dependency> | ||
| <groupId>org.kitesdk</groupId> | ||
| <artifactId>${artifact.hadoop-deps}</artifactId> | ||
| <type>pom</type> | ||
| <scope>provided</scope> | ||
| <exclusions> | ||
| <exclusion> | ||
| <!-- conflicts with jetty included with HBase --> | ||
| <groupId>tomcat</groupId> | ||
| <artifactId>jasper-runtime</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.kitesdk</groupId> | ||
| <artifactId>kite-hadoop-compatibility</artifactId> | ||
| </dependency> | ||
|
|
||
| <!-- custom URI handlers require Oozie 4.x --> | ||
| <dependency> | ||
| <groupId>org.apache.oozie</groupId> | ||
| <artifactId>oozie-core</artifactId> | ||
| <version>${vers.oozie4}</version> | ||
| </dependency> | ||
|
|
||
| <!-- Misc --> | ||
| <dependency> | ||
| <groupId>com.google.code.findbugs</groupId> | ||
| <artifactId>jsr305</artifactId> | ||
| <scope>provided</scope> | ||
| <optional>true</optional> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>com.google.code.findbugs</groupId> | ||
| <artifactId>annotations</artifactId> | ||
| <scope>provided</scope> | ||
| </dependency> | ||
|
|
||
| <!-- Test Dependencies --> | ||
| <dependency> | ||
| <groupId>junit</groupId> | ||
| <artifactId>junit</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.kitesdk</groupId> | ||
| <artifactId>kite-data-core</artifactId> | ||
| <version>${project.parent.version}</version> | ||
| <type>test-jar</type> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.kitesdk</groupId> | ||
| <artifactId>${artifact.hadoop-test-deps}</artifactId> | ||
| <type>pom</type> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| </dependencies> | ||
|
|
||
| <profiles> | ||
| <profile> | ||
| <id>cdh5</id> | ||
| <activation> | ||
| <property> | ||
| <name>hadoop.profile</name> | ||
| <value>cdh5</value> | ||
| </property> | ||
| </activation> | ||
| <properties> | ||
| <vers.oozie4>${vers.oozie}</vers.oozie4> | ||
| </properties> | ||
| </profile> | ||
| </profiles> | ||
|
|
||
| </project> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /** | ||
| * Copyright 2015 Cloudera Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.kitesdk.data.oozie; | ||
|
|
||
| import java.net.URI; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.oozie.action.hadoop.LauncherException; | ||
| import org.apache.oozie.action.hadoop.LauncherURIHandler; | ||
|
|
||
| public class KiteLauncherURIHandler implements LauncherURIHandler { | ||
|
|
||
| @Override | ||
| public boolean create(final URI uri, final Configuration conf) | ||
| throws LauncherException { | ||
| throw new UnsupportedOperationException( | ||
| "Creation of resources is not supported for Kite URIs."); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean delete(final URI uri, final Configuration conf) | ||
| throws LauncherException { | ||
| throw new UnsupportedOperationException( | ||
| "Deletion of resources is not supported for Kite URIs."); | ||
| } | ||
|
|
||
| @Override | ||
| public List<Class<?>> getClassesForLauncher() { | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| /** | ||
| * Copyright 2015 Cloudera Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.kitesdk.data.oozie; | ||
|
|
||
| import java.net.URI; | ||
| import java.util.Arrays; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import org.apache.avro.generic.GenericRecord; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.oozie.ErrorCode; | ||
| import org.apache.oozie.XException; | ||
| import org.apache.oozie.action.hadoop.LauncherURIHandler; | ||
| import org.apache.oozie.dependency.URIHandler; | ||
| import org.apache.oozie.dependency.URIHandlerException; | ||
| import org.apache.oozie.service.HadoopAccessorException; | ||
| import org.apache.oozie.service.URIHandlerService; | ||
| import org.apache.oozie.util.XLog; | ||
| import org.kitesdk.data.DatasetException; | ||
| import org.kitesdk.data.DatasetNotFoundException; | ||
| import org.kitesdk.data.Datasets; | ||
| import org.kitesdk.data.Signalable; | ||
| import org.kitesdk.data.URIBuilder; | ||
| import org.kitesdk.data.View; | ||
|
|
||
| /** | ||
| * A Kite URI handler that works with {@link Signalable} views. | ||
| * | ||
| * To be considered as {@link #exists(URI, Configuration, String) existing} the | ||
| * view must have been signaled as ready. | ||
| */ | ||
| public class KiteURIHandler implements URIHandler { | ||
|
|
||
| private static final XLog LOG = XLog.getLog(KiteURIHandler.class); | ||
|
|
||
| private Set<String> supportedSchemes; | ||
| private List<Class<?>> classesToShip; | ||
|
|
||
| @Override | ||
| public void init(final Configuration conf) { | ||
| supportedSchemes = new HashSet<String>(); | ||
| final String[] schemes = conf.getStrings( | ||
| URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX | ||
| + this.getClass().getSimpleName() | ||
| + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "view", "dataset"); | ||
| supportedSchemes.addAll(Arrays.asList(schemes)); | ||
| classesToShip = new KiteLauncherURIHandler().getClassesForLauncher(); | ||
| } | ||
|
|
||
| @Override | ||
| public Set<String> getSupportedSchemes() { | ||
| return supportedSchemes; | ||
| } | ||
|
|
||
| @Override | ||
| public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { | ||
| return KiteLauncherURIHandler.class; | ||
| } | ||
|
|
||
| @Override | ||
| public List<Class<?>> getClassesForLauncher() { | ||
| return classesToShip; | ||
| } | ||
|
|
||
| @Override | ||
| public DependencyType getDependencyType(final URI uri) | ||
| throws URIHandlerException { | ||
| return DependencyType.PULL; | ||
| } | ||
|
|
||
| @Override | ||
| public void registerForNotification(final URI uri, final Configuration conf, | ||
| final String user, final String actionID) throws URIHandlerException { | ||
| throw new UnsupportedOperationException( | ||
| "Notifications are not supported for " + uri); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean unregisterFromNotification(final URI uri, final String actionID) { | ||
| throw new UnsupportedOperationException( | ||
| "Notifications are not supported for " + uri); | ||
| } | ||
|
|
||
| @Override | ||
| public Context getContext(final URI uri, final Configuration conf, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably should be implementing this method. The method is a mechanism to optimize a series of calls to exist(). Its usage can be seen in CoordELFucntions.coord_futureRange_sync and coord_latestRange_sync. The coord_latestRange_sync method is used to look for the latest complete instance (or instances) of a dataset when coord:future() is used in the coordinator definition. Oozie will loop backwards from the latest nominal time all the way back to the initial instance of the dataset looking for an instance that "exists" (is complete). In the worst case you have a dataset that has been around for along time and the latest instance is very old relative to the frequency of the dataset. For example, if the dataset is declared with a frequency of once per day with initial instance declared as 1 year ago and the only instance that is complete is actually one year ago then latest(0) will have to loop through 365 URIs checking exists() before returning that instance. The coord_futureRange_sync method supports coord:future() and loops forward looking for the desired number of compete instances. FSURIHandler uses getContext() to cache the Filesystem instance. HCatURIHandler caches the HCatClient. I think we could cache the View instance so we don't have as heavy of a hit on each exists() check to load up all the Dataset metadata. Separately from talking about this specific method, I had talked to @bbrownz about the idea that perhaps we might want to follow up later with a JIRA to introduce some other caching for optimization. In an environment with lots of Oozie coordinators using Kite datasets and this URI handler, the Oozie server could be making very frequent calls to exists(). Having each call hit HDFS and/or the Hive metastore to load dataset metadata (on Datasets.load()) might take a toll that could be decreased with some more caching of the Dataset instances. There is also the possibility that we could follow up later with an attempt to support a push model (DependencyType.PUSH) instead of a polling model for Hive backed datasets. In any case, I think what is here works and is probably worth merging in as-is and following up with the optimizations in separate later JIRAs. Perhaps we implement the getContext() piece now though as it is the simplest although it only comes into play in certain specific use cases (using coord:latest() or coord:future() - neither of which I expect my organization will actually use anytime soon as our use cases prefer coord:current()).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bbrownz or @ben-roling, could you file the follow-up JIRA issues before we merge this one? That way we keep track. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I'll log the JIRA and post a link here in a minute. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| final String user) throws URIHandlerException { | ||
| return null; | ||
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| @Override | ||
| public boolean exists(final URI uri, final Context context) throws URIHandlerException { | ||
| try { | ||
| View<GenericRecord> view = Datasets.load(uri); | ||
| if(view instanceof Signalable) { | ||
| return ((Signalable)view).isReady(); | ||
| } | ||
| } catch (IllegalArgumentException ex) { | ||
| LOG.error("the URI " + uri + " was not a view or dataset"); | ||
| } catch (DatasetNotFoundException ex) { | ||
| LOG.error("the dataset for the URI "+ uri +" did not actually exist"); | ||
| } catch (DatasetException e) { | ||
| throw new HadoopAccessorException(ErrorCode.E0902, e); | ||
| } | ||
| // didn't meet all the requirements for a URI/view that we want to use with oozie | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean exists(final URI uri, final Configuration conf, final String user) | ||
| throws URIHandlerException { | ||
| // currently does not handle access limitations between the oozie user and | ||
| // a potential dataset owner | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this should attempt to read the signal files and catch any IOExceptions that are thrown. Then this would have at least basic support.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Were you thinking along the route of reading the signal files directly (so we'd be able to flex the configuration and user used for the FileSystem access)? That seems like it would bind us very directly to the current implementation, although it would work.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I meant try to do the operation like you do here, but if it fails with a permissions-related IOException, return false instead of propagating the exception.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I follow, thats an interesting trade off. I think returning false would make the most sense if we were approaching it from the perspective that the (oozie process) user without permissions shouldn't know that the dataset exists or not. Propagating the exception through helps to reveal the issue that the permissions are blocking the operation they were hoping would work. I think propagating the exception makes the most sense in that alert a consumer that their permission setup isn't going to work with the URI handler as is. If we were to create documentation around the usage we would definitely want to note that case (outside of the example project I didn't create any, and perhaps should have).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds fine to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is precedent for catching IOException and rethrowing as HadoopAccessorException in FSURIHandler. It seems reasonable to me to be consistent with that.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a catch in the exists method for a DatasetException to re-throw that exception as a HadoopAccessorException. I'd considered the more specific DatasetIOException that IOExceptions are wrapped in but this seemed like it would cover our bases a little better. It also occurred to me that HadoopAccessorException might be less appropriate in the future (for example if we added a signal implementation that was HBase specific) but that felt like I had started to nitpick myself. I'm open to further opinions though 😄 |
||
| return exists(uri, null); | ||
| } | ||
|
|
||
| @Override | ||
| public String getURIWithDoneFlag(final String uri, final String doneFlag) | ||
| throws URIHandlerException { | ||
| return uri; | ||
| } | ||
|
|
||
| @Override | ||
| public void validate(final String uri) throws URIHandlerException { | ||
| try { | ||
| URI uriParsed = URI.create(uri); | ||
| String scheme = uriParsed.getScheme(); | ||
| if(!(URIBuilder.VIEW_SCHEME.equals(scheme) || URIBuilder.DATASET_SCHEME.equals(scheme))) { | ||
| LOG.error("Unexpected scheme: view, uri was "+ uri); | ||
| XException xException = new XException(ErrorCode.E0904, scheme, uri); | ||
| throw new URIHandlerException(xException); | ||
| } | ||
| } catch (IllegalArgumentException iae) { | ||
| XException xException = new XException(ErrorCode.E0906, iae); | ||
| throw new URIHandlerException(xException); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void destroy() { | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit confusing to me. Why is this ant task necessary? I presume this is copy/paste from one of the other sub-projects. It looks like it is in kite-data-hbase, kite-data-hive, kite-data-s3, and kite-hadoop-compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because the module does not have any public classes so there's no javadoc for it. This task generates an empty javadoc JAR, which is needed for Maven central.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, that explanation makes sense in the context of the other projects but this project does have public classes and with this ant task removed it does still generate a javadoc JAR.
I think maybe this task just needs to be removed from this POM. Alternatively we can update the package excludes filter for the javadoc plugin (defined in the parent POM) to exclude org.kitesdk.data.oozie directly indicating we choose not to publish javadoc. I don't expect anyone is really going to be reading the javadoc as it's not really a Kite consumer oriented project.
@tomwhite or @rdblue - do you have an opinion about whether or not we should publish the javadoc for these classes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should publish javadoc for this module as it doesn't contain classes that users will program to in their programs. So we should update the package excludes filters and leave the ant task as it is in this patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Tom. Most of the code here is implementing a public API documented in Oozie, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct and I agree it would be fine not to publish javadoc. These classes will not be consumed by "users" - just by Oozie.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the exclusion to the javadoc plugin 32186c3