From 085040c7f3ba2c8d0843718a1a7913ec7244a1b0 Mon Sep 17 00:00:00 2001 From: Dokyung Lee <112409928+zipdoki@users.noreply.github.com> Date: Sun, 7 Jun 2026 20:38:45 +0900 Subject: [PATCH 1/2] Add prune API skeleton --- .../core/edge/payload/EdgePrunePayload.kt | 38 +++++++++++++++++++ .../engine/service/MutationService.kt | 10 +++++ .../api/graph/v3/EdgeMutationController.kt | 12 ++++++ 3 files changed, 60 insertions(+) create mode 100644 core/src/main/kotlin/com/kakao/actionbase/core/edge/payload/EdgePrunePayload.kt diff --git a/core/src/main/kotlin/com/kakao/actionbase/core/edge/payload/EdgePrunePayload.kt b/core/src/main/kotlin/com/kakao/actionbase/core/edge/payload/EdgePrunePayload.kt new file mode 100644 index 00000000..9bb4ffd7 --- /dev/null +++ b/core/src/main/kotlin/com/kakao/actionbase/core/edge/payload/EdgePrunePayload.kt @@ -0,0 +1,38 @@ +package com.kakao.actionbase.core.edge.payload + +import com.kakao.actionbase.core.metadata.common.Direction + +enum class PruneType { + CACHE, +} + +enum class PruneStatus { + PRUNED, + SKIPPED, +} + +data class EdgePruneRequest( + val type: PruneType, + val targets: List, +) + +data class PruneTarget( + val start: Any, + val direction: Direction, +) + +/** + * Prune response body — one [PruneResult] row per structure matched by a target. + */ +data class DataFramePrunePayload( + val results: List, +) + +/** One prune result row, emitted per structure ([name]) matched by a [PruneTarget]. */ +data class PruneResult( + val start: Any, + val direction: Direction, + val type: PruneType, + val name: String, + val status: PruneStatus, +) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt index c111c45d..65343ec0 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt @@ -4,6 +4,9 @@ import com.kakao.actionbase.core.edge.MutationEvent import com.kakao.actionbase.core.edge.MutationKey import com.kakao.actionbase.core.edge.UnresolvedEvent import com.kakao.actionbase.core.edge.payload.MutationResult +import com.kakao.actionbase.core.edge.payload.PruneResult +import com.kakao.actionbase.core.edge.payload.PruneTarget +import com.kakao.actionbase.core.edge.payload.PruneType import com.kakao.actionbase.core.state.EventType import com.kakao.actionbase.core.state.transit import com.kakao.actionbase.engine.Audit @@ -76,6 +79,13 @@ class MutationService( .runEvenIfCancelled() } + fun prune( + database: String, + table: String, + type: PruneType, + targets: List, + ): Mono> = Mono.just(emptyList()) + /** * For `system=ASYNC + label=SYNC` (no force), return the SYNC-shaped status derived from * the highest-version event's EventType so clients keep their contract; mutations are diff --git a/server/src/main/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeMutationController.kt b/server/src/main/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeMutationController.kt index db47890d..f0b35ae5 100644 --- a/server/src/main/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeMutationController.kt +++ b/server/src/main/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeMutationController.kt @@ -1,7 +1,9 @@ package com.kakao.actionbase.server.api.graph.v3 +import com.kakao.actionbase.core.edge.payload.DataFramePrunePayload import com.kakao.actionbase.core.edge.payload.EdgeBulkMutationRequest import com.kakao.actionbase.core.edge.payload.EdgeMutationResponse +import com.kakao.actionbase.core.edge.payload.EdgePruneRequest import com.kakao.actionbase.engine.context.RequestContext import com.kakao.actionbase.engine.metadata.MutationMode import com.kakao.actionbase.engine.service.MutationService @@ -43,4 +45,14 @@ class EdgeMutationController( mutationService .mutate(database, table, request.mutations, lock, syncMode = MutationMode.SYNC, forceSyncMode = force, requestContext = requestContext) .map { ResponseEntity.ok(EdgeMutationResponse.from(it)) } + + @PostMapping("/graph/v3/databases/{database}/tables/{table}/edges/prune") + fun prune( + @PathVariable database: String, + @PathVariable table: String, + @RequestBody request: EdgePruneRequest, + ): Mono> = + mutationService + .prune(database, table, request.type, request.targets) + .map { results -> ResponseEntity.ok(DataFramePrunePayload(results)) } } From 7c8679d923cf436e52e012e3487f93a2f5150271 Mon Sep 17 00:00:00 2001 From: Dokyung Lee <112409928+zipdoki@users.noreply.github.com> Date: Sun, 7 Jun 2026 21:56:35 +0900 Subject: [PATCH 2/2] Add E2E and Spec tests for cache prune --- .../v2/engine/v3/EdgeCachePruneSpec.kt | 306 ++++++++++++++++++ .../api/graph/v3/EdgeCachePruneE2ETest.kt | 165 ++++++++++ 2 files changed, 471 insertions(+) create mode 100644 engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/EdgeCachePruneSpec.kt create mode 100644 server/src/test/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeCachePruneE2ETest.kt diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/EdgeCachePruneSpec.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/EdgeCachePruneSpec.kt new file mode 100644 index 00000000..7bab3869 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/EdgeCachePruneSpec.kt @@ -0,0 +1,306 @@ +package com.kakao.actionbase.v2.engine.v3 + +import com.kakao.actionbase.core.metadata.common.Direction as CoreDirection + +import com.kakao.actionbase.core.edge.payload.EdgeBulkMutationRequest +import com.kakao.actionbase.core.edge.payload.PruneStatus +import com.kakao.actionbase.core.edge.payload.PruneTarget +import com.kakao.actionbase.core.edge.payload.PruneType +import com.kakao.actionbase.core.java.codec.common.hbase.Order +import com.kakao.actionbase.core.metadata.common.Cache +import com.kakao.actionbase.core.metadata.common.CacheField +import com.kakao.actionbase.engine.service.MutationService +import com.kakao.actionbase.engine.service.QueryService +import com.kakao.actionbase.v2.core.metadata.Direction +import com.kakao.actionbase.v2.core.metadata.DirectionType +import com.kakao.actionbase.v2.core.metadata.LabelType +import com.kakao.actionbase.v2.engine.Graph +import com.kakao.actionbase.v2.engine.entity.EntityName +import com.kakao.actionbase.v2.engine.service.ddl.LabelCreateRequest +import com.kakao.actionbase.v2.engine.test.GraphFixtures + +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue + +import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.shouldBe +import reactor.kotlin.test.test + +class EdgeCachePruneSpec : + StringSpec({ + + lateinit var graph: Graph + lateinit var mutationService: MutationService + lateinit var queryService: QueryService + + beforeTest { + graph = GraphFixtures.create() + val engine = V2BackedEngine(graph) + mutationService = MutationService(engine) + queryService = QueryService(engine) + } + + afterTest { + graph.close() + } + + fun createTable( + database: String, + table: String, + caches: List, + ) { + graph.labelDdl + .create( + EntityName(database, table), + LabelCreateRequest( + desc = "cache prune test", + type = LabelType.INDEXED, + schema = GraphFixtures.sampleSchema, + dirType = DirectionType.BOTH, + storage = GraphFixtures.datastoreStorage, + indices = GraphFixtures.sampleIndices, + caches = caches, + ), + ).test() + .assertNext { it.status.name shouldBe "CREATED" } + .verifyComplete() + } + + fun insert( + database: String, + table: String, + json: String, + ) { + mutationService + .mutate(database, table, mapper.readValue(json).mutations) + .test() + .assertNext { } + .verifyComplete() + } + + /** + * Wiring only: the mocked [MutationService.prune] returns the empty contract shape. + * Becomes a real no-cache assertion once the service is wired to the prune data plane. + */ + "prune on a table without caches returns no results" { + val database = GraphFixtures.serviceName + val table = "prune_no_cache_test" + createTable(database, table, caches = emptyList()) + + mutationService + .prune(database, table, PruneType.CACHE, listOf(PruneTarget("1000", CoreDirection.OUT))) + .test() + .assertNext { results -> results.size shouldBe 0 } + .verifyComplete() + } + + /** + * EdgeCache (source=1000, direction=OUT, cache=created_at_desc) + * `limit=2, tolerance=1` + * + * Before: + * | row key | qualifier (DESC) | value | + * |----------------------|------------------|-----------------------------------------| + * | hash|1000|T|-6|OUT|C | ~500 | 2004 | version=1, permission=na, createdAt=500 | + * | | ~400 | 2003 | version=1, permission=na, createdAt=400 | ← limit + * | | ~300 | 2002 | version=1, permission=na, createdAt=300 | ← tolerance + * | | ~200 | 2001 | version=1, permission=na, createdAt=200 | ← over-limit + * | | ~100 | 2000 | version=1, permission=na, createdAt=100 | ← over-limit + * + * After: + * | row key | qualifier (DESC) | value | + * |----------------------|------------------|-----------------------------------------| + * | hash|1000|T|-6|OUT|C | ~500 | 2004 | version=1, permission=na, createdAt=500 | + * | | ~400 | 2003 | version=1, permission=na, createdAt=400 | + * | | ~300 | 2002 | version=1, permission=na, createdAt=300 | + * + * Expected: status=PRUNED, seek OUT → [2004, 2003, 2002] + */ + "prune evicts entries beyond limit + tolerance".config(enabled = false) { + val database = GraphFixtures.serviceName + val table = "prune_evict_test" + val cacheName = "created_at_desc" + createTable( + database, + table, + caches = listOf(Cache(cacheName, listOf(CacheField("createdAt", Order.DESC)), limit = 2, tolerance = 1)), + ) + + insert( + database, + table, + """ + { + "mutations": [ + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2000", "properties": {"permission": "na", "createdAt": 100}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2001", "properties": {"permission": "na", "createdAt": 200}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2002", "properties": {"permission": "na", "createdAt": 300}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2003", "properties": {"permission": "na", "createdAt": 400}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2004", "properties": {"permission": "na", "createdAt": 500}}} + ] + } + """.trimIndent(), + ) + + mutationService + .prune(database, table, PruneType.CACHE, listOf(PruneTarget("1000", CoreDirection.OUT))) + .test() + .assertNext { results -> + results.size shouldBe 1 + results.first().type shouldBe PruneType.CACHE + results.first().name shouldBe cacheName + results.first().status shouldBe PruneStatus.PRUNED + }.verifyComplete() + + // Retained top-3 by createdAt DESC: 500, 400, 300 + queryService + .seek(database, table, cacheName, listOf("1000"), Direction.OUT, 10) + .test() + .assertNext { payload -> + payload.count shouldBe 3 + payload.edges.map { it.target } shouldBe listOf(2004L, 2003L, 2002L) + }.verifyComplete() + } + + /** + * EdgeCache (source=1000, direction=OUT, cache=created_at_desc) — limit=2, tolerance=1 + * Exactly `limit + tolerance` entries (3) → nothing beyond the bound → SKIPPED, untouched. + * + * Before & after (identical — nothing exceeds): + * | row key | qualifier (DESC) | value | + * |----------------------|------------------|-----------------------------------------| + * | hash|1000|T|-6|OUT|C | ~300 | 2002 | version=1, permission=na, createdAt=300 | + * | | ~200 | 2001 | version=1, permission=na, createdAt=200 | + * | | ~100 | 2000 | version=1, permission=na, createdAt=100 | + * + * Expected: status=SKIPPED, seek OUT → 3 edges + */ + "prune is SKIPPED when within limit + tolerance".config(enabled = false) { + val database = GraphFixtures.serviceName + val table = "prune_skip_test" + val cacheName = "created_at_desc" + createTable( + database, + table, + caches = listOf(Cache(cacheName, listOf(CacheField("createdAt", Order.DESC)), limit = 2, tolerance = 1)), + ) + + insert( + database, + table, + """ + { + "mutations": [ + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2000", "properties": {"permission": "na", "createdAt": 100}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2001", "properties": {"permission": "na", "createdAt": 200}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2002", "properties": {"permission": "na", "createdAt": 300}}} + ] + } + """.trimIndent(), + ) + + mutationService + .prune(database, table, PruneType.CACHE, listOf(PruneTarget("1000", CoreDirection.OUT))) + .test() + .assertNext { results -> + results.size shouldBe 1 + results.first().status shouldBe PruneStatus.SKIPPED + }.verifyComplete() + + queryService + .seek(database, table, cacheName, listOf("1000"), Direction.OUT, 10) + .test() + .assertNext { payload -> payload.count shouldBe 3 } + .verifyComplete() + } + + /** + * Dimension fields define independent top-N buckets, so `limit` applies per bucket. + * (untouched). Status is PRUNED because at least one bucket was trimmed. + * + * EdgeCache (source=1000, direction=OUT, cache=permission_created_at_desc) + * `limit=1, tolerance=1` + * + * Before: + * | row key | qualifier (perm ASC, createdAt DESC) | value | + * |----------------------|--------------------------------------|---------------| + * | hash|1000|T|-6|OUT|C | me | ~300 | 2002 | createdAt=300 | + * | | me | ~200 | 2001 | createdAt=200 | + * | | me | ~100 | 2000 | createdAt=100 | ← over-limit in "me", pruned + * | | others | ~200 | 2004 | createdAt=200 | + * | | others | ~100 | 2003 | createdAt=100 | + * + * After ("me" trimmed to top 2; "others" within bound, untouched): + * | row key | qualifier (perm ASC, createdAt DESC) | value | + * |----------------------|--------------------------------------|---------------| + * | hash|1000|T|-6|OUT|C | me | ~300 | 2002 | createdAt=300 | + * | | me | ~200 | 2001 | createdAt=200 | + * | | others | ~200 | 2004 | createdAt=200 | + * | | others | ~100 | 2003 | createdAt=100 | + * + * Expected: status=PRUNED, seek (permission=me) → [2002, 2001]; seek (permission=others) → 2 edges + */ + "prune enforces limit per dimension bucket independently".config(enabled = false) { + val database = GraphFixtures.serviceName + val table = "prune_dimension_test" + val cacheName = "permission_created_at_desc" + createTable( + database, + table, + caches = + listOf( + Cache( + cacheName, + listOf( + CacheField("permission", Order.ASC, dimension = setOf("me", "others")), + CacheField("createdAt", Order.DESC), + ), + limit = 1, + tolerance = 1, + ), + ), + ) + + insert( + database, + table, + """ + { + "mutations": [ + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2000", "properties": {"permission": "me", "createdAt": 100}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2001", "properties": {"permission": "me", "createdAt": 200}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2002", "properties": {"permission": "me", "createdAt": 300}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2003", "properties": {"permission": "others", "createdAt": 100}}}, + {"type": "INSERT", "edge": {"version": 1, "source": "1000", "target": "2004", "properties": {"permission": "others", "createdAt": 200}}} + ] + } + """.trimIndent(), + ) + + mutationService + .prune(database, table, PruneType.CACHE, listOf(PruneTarget("1000", CoreDirection.OUT))) + .test() + .assertNext { results -> results.first().status shouldBe PruneStatus.PRUNED } + .verifyComplete() + + // "me" bucket trimmed to top-2 (300, 200); the lowest (100) evicted. + queryService + .seek(database, table, cacheName, listOf("1000"), Direction.OUT, 10, ranges = "permission=me") + .test() + .assertNext { payload -> + payload.count shouldBe 2 + payload.edges.map { it.target } shouldBe listOf(2002L, 2001L) + }.verifyComplete() + + // "others" bucket within bound — untouched. + queryService + .seek(database, table, cacheName, listOf("1000"), Direction.OUT, 10, ranges = "permission=others") + .test() + .assertNext { payload -> payload.count shouldBe 2 } + .verifyComplete() + } + }) { + companion object { + val mapper = jacksonObjectMapper() + } +} diff --git a/server/src/test/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeCachePruneE2ETest.kt b/server/src/test/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeCachePruneE2ETest.kt new file mode 100644 index 00000000..bffb95d5 --- /dev/null +++ b/server/src/test/kotlin/com/kakao/actionbase/server/api/graph/v3/EdgeCachePruneE2ETest.kt @@ -0,0 +1,165 @@ +package com.kakao.actionbase.server.api.graph.v3 + +import com.kakao.actionbase.server.test.E2ETestBase + +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.springframework.http.MediaType + +/** + * E2E coverage for the prune endpoint `POST .../edges/prune`. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class EdgeCachePruneE2ETest : E2ETestBase() { + private val db = "prune-test-db" + private val edgeTable = "wishlist" + + @BeforeAll + fun setup() { + client + .post() + .uri("/graph/v3/databases") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue("""{"database": "$db", "comment": "prune test db"}""") + .exchange() + .expectStatus() + .isOk + + // Small limit/tolerance so the disabled round-trip can exercise eviction (retain 3). + client + .post() + .uri("/graph/v3/databases/$db/tables") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue( + """ + { + "table": "$edgeTable", + "schema": { + "type": "EDGE", + "source": {"type": "long", "comment": "src"}, + "target": {"type": "long", "comment": "tgt"}, + "properties": [ + {"name": "createdAt", "type": "long", "comment": "ts", "nullable": true} + ], + "direction": "BOTH", + "indexes": [], + "groups": [], + "caches": [ + { + "cache": "recent_wishlist", + "fields": [{"field": "createdAt", "order": "DESC"}], + "limit": 2, + "tolerance": 1 + } + ] + }, + "storage": "datastore://test_namespace/wishlist", + "mode": "SYNC", + "comment": "edge with cache" + } + """.trimIndent(), + ).exchange() + .expectStatus() + .isOk + + client + .post() + .uri("/graph/v3/databases/$db/tables/$edgeTable/edges") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue( + """ + { + "mutations": [ + {"type": "INSERT", "edge": {"version": 1, "source": 1000, "target": 2000, "properties": {"createdAt": 100}}}, + {"type": "INSERT", "edge": {"version": 1, "source": 1000, "target": 2001, "properties": {"createdAt": 200}}}, + {"type": "INSERT", "edge": {"version": 1, "source": 1000, "target": 2002, "properties": {"createdAt": 300}}}, + {"type": "INSERT", "edge": {"version": 1, "source": 1000, "target": 2003, "properties": {"createdAt": 400}}}, + {"type": "INSERT", "edge": {"version": 1, "source": 1000, "target": 2004, "properties": {"createdAt": 500}}} + ] + } + """.trimIndent(), + ).exchange() + .expectStatus() + .isOk + } + + /** Valid request routes end-to-end and returns the contract-shaped body (mocked: empty). */ + @Test + fun `prune routes and returns results envelope`() { + client + .post() + .uri("/graph/v3/databases/$db/tables/$edgeTable/edges/prune") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue("""{"type": "CACHE", "targets": [{"start": 1000, "direction": "OUT"}]}""") + .exchange() + .expectStatus() + .isOk + .expectBody() + .jsonPath("$.results") + .isArray + } + + /** + * Acceptance test for the prune data plane (enable with the implementation PR). + * + * + * EdgeCache (source=1000, direction=OUT, cache=recent_wishlist) + * recent_wishlist: limit=2, tolerance=1 + * + * Before: + * | row key | qualifier (DESC) | value | + * |----------------------|------------------|--------------------------| + * | hash|1000|T|-6|OUT|C | ~500 | 2004 | version=1, createdAt=500 | + * | | ~400 | 2003 | version=1, createdAt=400 | ← limit + * | | ~300 | 2002 | version=1, createdAt=300 | ← tolerance + * | | ~200 | 2001 | version=1, createdAt=200 | ← over-limit + * | | ~100 | 2000 | version=1, createdAt=100 | ← over-limit + * + * After: + * | row key | qualifier (DESC) | value | + * |----------------------|------------------|--------------------------| + * | hash|1000|T|-6|OUT|C | ~500 | 2004 | version=1, createdAt=500 | + * | | ~400 | 2003 | version=1, createdAt=400 | + * | | ~300 | 2002 | version=1, createdAt=300 | + * + * Expected: results=[PRUNED], seek OUT → [2004, 2003, 2002] + */ + @Test + @Disabled("enable when EdgeCache prune data plane is implemented") + fun `prune evicts entries beyond limit + tolerance`() { + client + .post() + .uri("/graph/v3/databases/$db/tables/$edgeTable/edges/prune") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue("""{"type": "CACHE", "targets": [{"start": 1000, "direction": "OUT"}]}""") + .exchange() + .expectStatus() + .isOk + .expectBody() + .jsonPath("$.results.length()") + .isEqualTo(1) + .jsonPath("$.results[0].type") + .isEqualTo("CACHE") + .jsonPath("$.results[0].name") + .isEqualTo("recent_wishlist") + .jsonPath("$.results[0].status") + .isEqualTo("PRUNED") + + // Retained top-3 by createdAt DESC: 2004(500), 2003(400), 2002(300). + client + .get() + .uri("/graph/v3/databases/$db/tables/$edgeTable/edges/seek/recent_wishlist?start=1000&direction=OUT&limit=10") + .exchange() + .expectStatus() + .isOk + .expectBody() + .jsonPath("$.count") + .isEqualTo(3) + .jsonPath("$.edges[0].target") + .isEqualTo(2004) + .jsonPath("$.edges[2].target") + .isEqualTo(2002) + } +}