Skip to content

Commit a4ff3a3

Browse files
ebyhrmrendi29
andcommitted
Core: Add support for HashiCorp Vault KMS client
Co-Authored-By: Endi Caushi <42871239+mrendi29@users.noreply.github.com>
1 parent 6ce5026 commit a4ff3a3

10 files changed

Lines changed: 654 additions & 1 deletion

File tree

build.gradle

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,26 @@ project(':iceberg-gcp') {
802802
}
803803
}
804804

805+
project(':iceberg-hashicorp') {
806+
test {
807+
useJUnitPlatform()
808+
}
809+
810+
dependencies {
811+
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
812+
implementation project(':iceberg-core')
813+
implementation libs.httpcomponents.httpclient5
814+
implementation libs.jackson.databind
815+
816+
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
817+
testImplementation libs.esotericsoftware.kryo
818+
testImplementation libs.jackson.databind
819+
testImplementation libs.testcontainers
820+
testImplementation libs.testcontainers.junit.jupiter
821+
testImplementation libs.testcontainers.vault
822+
}
823+
}
824+
805825
project(':iceberg-hive-metastore') {
806826
test {
807827
useJUnitPlatform()

core/src/main/java/org/apache/iceberg/CatalogProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ private CatalogProperties() {}
174174
public static final String ENCRYPTION_KMS_TYPE_AWS = "aws";
175175
public static final String ENCRYPTION_KMS_TYPE_AZURE = "azure";
176176
public static final String ENCRYPTION_KMS_TYPE_GCP = "gcp";
177+
public static final String ENCRYPTION_KMS_TYPE_HASHICORP = "hashicorp";
177178

178179
public static final String ENCRYPTION_KMS_IMPL = "encryption.kms-impl";
179180
public static final String ENCRYPTION_KMS_IMPL_AWS =
@@ -182,4 +183,6 @@ private CatalogProperties() {}
182183
"org.apache.iceberg.azure.keymanagement.AzureKeyManagementClient";
183184
public static final String ENCRYPTION_KMS_IMPL_GCP =
184185
"org.apache.iceberg.gcp.GcpKeyManagementClient";
186+
public static final String ENCRYPTION_KMS_IMPL_HASHICORP =
187+
"org.apache.iceberg.hashicorp.VaultKeyManagementClient";
185188
}

core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public static KeyManagementClient createKmsClient(Map<String, String> catalogPro
6363
CatalogProperties.ENCRYPTION_KMS_IMPL_AZURE;
6464
case CatalogProperties.ENCRYPTION_KMS_TYPE_GCP ->
6565
CatalogProperties.ENCRYPTION_KMS_IMPL_GCP;
66+
case CatalogProperties.ENCRYPTION_KMS_TYPE_HASHICORP ->
67+
CatalogProperties.ENCRYPTION_KMS_IMPL_HASHICORP;
6668
default -> throw new IllegalStateException("Unsupported KMS type: " + kmsType);
6769
};
6870
}

docs/docs/encryption.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Currently, encryption is supported in the Hive and REST catalogs for tables with
2828

2929
Two parameters are required to activate encryption of a table:
3030

31-
1. Catalog property that specifies the KMS ("key management service"). It can be either `encryption.kms-type` for pre-defined KMS clients (`aws`, `azure` or `gcp`) or `encryption.kms-impl` with the client class path for custom KMS clients.
31+
1. Catalog property that specifies the KMS ("key management service"). It can be either `encryption.kms-type` for pre-defined KMS clients (`aws`, `azure`, `gcp` or `hashicorp`) or `encryption.kms-impl` with the client class path for custom KMS clients.
3232
2. Table property `encryption.key-id`, that specifies the ID of a master key used to encrypt and decrypt the table. Master keys are stored and managed in the KMS.
3333

3434
For more details on table encryption, see the "Appendix: Internals Overview" [subsection](#appendix-internals-overview).

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,5 +229,6 @@ sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
229229
testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
230230
testcontainers-junit-jupiter = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" }
231231
testcontainers-minio = { module = "org.testcontainers:testcontainers-minio", version.ref = "testcontainers" }
232+
testcontainers-vault = { module = "org.testcontainers:testcontainers-vault", version.ref = "testcontainers" }
232233
tez08-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez08" }
233234
tez08-mapreduce = { module = "org.apache.tez:tez-mapreduce", version.ref = "tez08" }
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.hashicorp;
20+
21+
import com.fasterxml.jackson.databind.JsonNode;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.node.ObjectNode;
24+
import java.io.Closeable;
25+
import java.io.IOException;
26+
import java.io.UncheckedIOException;
27+
import java.util.OptionalLong;
28+
import org.apache.hc.client5.http.classic.methods.HttpPost;
29+
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
30+
import org.apache.hc.client5.http.impl.classic.HttpClients;
31+
import org.apache.hc.client5.http.protocol.HttpClientContext;
32+
import org.apache.hc.core5.http.ClassicHttpResponse;
33+
import org.apache.hc.core5.http.ContentType;
34+
import org.apache.hc.core5.http.ParseException;
35+
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
36+
import org.apache.hc.core5.http.io.entity.EntityUtils;
37+
import org.apache.hc.core5.http.io.entity.StringEntity;
38+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
39+
40+
/**
41+
* HTTP client for interacting with HashiCorp Vault REST API.
42+
*
43+
* @see <a href=https://developer.hashicorp.com/vault/api-docs>HashiCorp Vault HTTP API</a>
44+
*/
45+
class VaultClient implements Closeable {
46+
private static final ObjectMapper MAPPER = new ObjectMapper();
47+
private static final String VAULT_TOKEN_HEADER = "X-Vault-Token";
48+
49+
private final String address;
50+
private final String transitMount;
51+
private final String appRolePath;
52+
53+
private transient volatile CloseableHttpClient httpClient;
54+
55+
VaultClient(String address, String transitMount, String appRolePath) {
56+
this.address = address;
57+
this.transitMount = transitMount;
58+
this.appRolePath = appRolePath;
59+
}
60+
61+
AuthResult authenticate(String roleId, String secretId) {
62+
ObjectNode requestBody = MAPPER.createObjectNode();
63+
requestBody.put("role_id", roleId);
64+
requestBody.put("secret_id", secretId);
65+
66+
JsonNode response = post("/v1/auth/" + appRolePath + "/login", null, requestBody);
67+
JsonNode authNode = response.get("auth");
68+
if (authNode == null) {
69+
throw new RuntimeException("Failed to authenticate: no auth section in response");
70+
}
71+
72+
String clientToken = authNode.get("client_token").asText();
73+
OptionalLong leaseDuration =
74+
authNode.has("lease_duration")
75+
? OptionalLong.of(authNode.get("lease_duration").asLong())
76+
: OptionalLong.empty();
77+
return new AuthResult(clientToken, leaseDuration);
78+
}
79+
80+
String encrypt(String vaultToken, String wrappingKeyId, String plaintext) {
81+
ObjectNode requestBody = MAPPER.createObjectNode();
82+
requestBody.put("plaintext", plaintext);
83+
84+
JsonNode response =
85+
post("/v1/" + transitMount + "/encrypt/" + wrappingKeyId, vaultToken, requestBody);
86+
87+
JsonNode dataNode = response.get("data");
88+
if (dataNode == null || !dataNode.has("ciphertext")) {
89+
throw new RuntimeException("Failed to wrap key: no ciphertext returned");
90+
}
91+
92+
return dataNode.get("ciphertext").asText();
93+
}
94+
95+
String decrypt(String vaultToken, String wrappingKeyId, String ciphertext) {
96+
ObjectNode requestBody = MAPPER.createObjectNode();
97+
requestBody.put("ciphertext", ciphertext);
98+
99+
JsonNode response =
100+
post("/v1/" + transitMount + "/decrypt/" + wrappingKeyId, vaultToken, requestBody);
101+
102+
JsonNode dataNode = response.get("data");
103+
if (dataNode == null || !dataNode.has("plaintext")) {
104+
throw new RuntimeException("Failed to unwrap key: no plaintext returned");
105+
}
106+
107+
return dataNode.get("plaintext").asText();
108+
}
109+
110+
DataKey generateKey(String vaultToken, String wrappingKeyId) {
111+
ObjectNode requestBody = MAPPER.createObjectNode();
112+
113+
JsonNode response =
114+
post(
115+
"/v1/" + transitMount + "/datakey/plaintext/" + wrappingKeyId, vaultToken, requestBody);
116+
117+
JsonNode dataNode = response.get("data");
118+
if (dataNode == null || !dataNode.has("plaintext") || !dataNode.has("ciphertext")) {
119+
throw new RuntimeException("Failed to generate key: missing plaintext or ciphertext");
120+
}
121+
122+
String plaintext = dataNode.get("plaintext").asText();
123+
String ciphertext = dataNode.get("ciphertext").asText();
124+
return new DataKey(plaintext, ciphertext);
125+
}
126+
127+
private JsonNode post(String path, String token, ObjectNode requestBody) {
128+
HttpPost request = new HttpPost(address + path);
129+
if (token != null) {
130+
request.setHeader(VAULT_TOKEN_HEADER, token);
131+
}
132+
133+
try {
134+
request.setEntity(
135+
new StringEntity(MAPPER.writeValueAsString(requestBody), ContentType.APPLICATION_JSON));
136+
} catch (IOException e) {
137+
throw new UncheckedIOException("Failed to serialize request body", e);
138+
}
139+
140+
try {
141+
return httpClient().execute(request, HttpClientContext.create(), new VaultResponseHandler());
142+
} catch (IOException e) {
143+
throw new UncheckedIOException("Failed to execute Vault request to " + path, e);
144+
}
145+
}
146+
147+
private static class VaultResponseHandler implements HttpClientResponseHandler<JsonNode> {
148+
@Override
149+
public JsonNode handleResponse(ClassicHttpResponse response) {
150+
int statusCode = response.getCode();
151+
Preconditions.checkState(statusCode == 200, "Status must be 200: %d", statusCode);
152+
153+
try {
154+
String responseBody = EntityUtils.toString(response.getEntity());
155+
return MAPPER.readTree(responseBody);
156+
} catch (ParseException e) {
157+
throw new RuntimeException("Failed to parse Vault error response", e);
158+
} catch (IOException e) {
159+
throw new UncheckedIOException("Failed to read response", e);
160+
}
161+
}
162+
}
163+
164+
private CloseableHttpClient httpClient() {
165+
if (httpClient == null) {
166+
synchronized (this) {
167+
if (httpClient == null) {
168+
httpClient = HttpClients.createDefault();
169+
}
170+
}
171+
}
172+
return httpClient;
173+
}
174+
175+
@Override
176+
public void close() {
177+
if (httpClient != null) {
178+
try {
179+
httpClient.close();
180+
} catch (IOException e) {
181+
throw new UncheckedIOException("Failed to close HTTP client", e);
182+
}
183+
}
184+
}
185+
186+
static class AuthResult {
187+
private final String clientToken;
188+
private final OptionalLong leaseDuration;
189+
190+
AuthResult(String clientToken, OptionalLong leaseDuration) {
191+
this.clientToken = clientToken;
192+
this.leaseDuration = leaseDuration;
193+
}
194+
195+
public String clientToken() {
196+
return clientToken;
197+
}
198+
199+
public OptionalLong leaseDuration() {
200+
return leaseDuration;
201+
}
202+
}
203+
204+
static class DataKey {
205+
private final String plaintext;
206+
private final String ciphertext;
207+
208+
DataKey(String plaintext, String ciphertext) {
209+
this.plaintext = plaintext;
210+
this.ciphertext = ciphertext;
211+
}
212+
213+
public String plaintext() {
214+
return plaintext;
215+
}
216+
217+
public String ciphertext() {
218+
return ciphertext;
219+
}
220+
}
221+
}

0 commit comments

Comments
 (0)