diff --git a/migrations/tenant/0059-search-functions-user-metadata.sql b/migrations/tenant/0059-search-functions-user-metadata.sql
new file mode 100644
index 000000000..4549b5acf
--- /dev/null
+++ b/migrations/tenant/0059-search-functions-user-metadata.sql
@@ -0,0 +1,765 @@
+-- Add user_metadata to all search/list SQL functions
+-- Each function's RETURNS TABLE gains a user_metadata jsonb column,
+-- and the query bodies include user_metadata in selects.
+-- Folders return NULL for user_metadata.
+--
+-- DROP is required because CREATE OR REPLACE cannot change return types.
+
+-- Drop old function signatures (return type is changing)
+-- Each DROP runs inside a DO block whose EXCEPTION handler swallows every error,
+-- so a DROP that fails does not abort the migration at this point.
+DO $$ BEGIN DROP FUNCTION IF EXISTS storage.list_objects_with_delimiter(text,text,text,integer,text,text,text); EXCEPTION WHEN OTHERS THEN NULL; END; $$;
+DO $$ BEGIN DROP FUNCTION IF EXISTS storage.search(text,text,integer,integer,integer,text,text,text); EXCEPTION WHEN OTHERS THEN NULL; END; $$;
+DO $$ BEGIN DROP FUNCTION IF EXISTS storage.search_v2(text,text,integer,integer,text,text,text,text); EXCEPTION WHEN OTHERS THEN NULL; END; $$;
+DO $$ BEGIN DROP FUNCTION IF EXISTS storage.search_by_timestamp(text,text,integer,integer,text,text,text,text); EXCEPTION WHEN OTHERS THEN NULL; END; $$;
+
+-- ============================================================================
+-- list_objects_with_delimiter: Add user_metadata support
+--
+-- Returns up to max_keys rows for one bucket, ordered by name (COLLATE "C").
+-- Names for which storage.get_common_prefix() yields a prefix are collapsed
+-- into one folder row: only name is set (trailing delimiter characters
+-- trimmed) and every other column, including user_metadata, is NULL.
+-- All other names are returned as file rows with their stored columns.
+-- A non-empty next_token takes precedence over start_after as the cursor.
+-- ============================================================================
+CREATE OR REPLACE FUNCTION storage.list_objects_with_delimiter(
+    _bucket_id text,
+    prefix_param text,
+    delimiter_param text,
+    max_keys integer DEFAULT 100,
+    start_after text DEFAULT '',
+    next_token text DEFAULT '',
+    sort_order text DEFAULT 'asc'
+)
+RETURNS TABLE (
+    name text,
+    id uuid,
+    metadata jsonb,
+    updated_at timestamptz,
+    created_at timestamptz,
+    last_accessed_at timestamptz,
+    user_metadata jsonb
+)
+SECURITY INVOKER
+LANGUAGE plpgsql STABLE
+AS $func$
+DECLARE
+    v_peek_name TEXT;
+    v_current RECORD;
+    v_common_prefix TEXT;
+
+    -- Configuration
+    v_is_asc BOOLEAN;
+    v_prefix TEXT;
+    v_start TEXT;
+    v_upper_bound TEXT;
+    v_file_batch_size INT;
+
+    -- Seek state
+    v_next_seek TEXT;
+    v_count INT := 0;
+
+    -- Dynamic SQL for batch query only
+    v_batch_query TEXT;
+
+BEGIN
+    -- ========================================================================
+    -- INITIALIZATION
+    -- ========================================================================
+    v_is_asc := lower(coalesce(sort_order, 'asc')) = 'asc';
+    v_prefix := coalesce(prefix_param, '');
+    v_start := CASE WHEN coalesce(next_token, '') <> '' THEN next_token ELSE coalesce(start_after, '') END;
+    v_file_batch_size := LEAST(GREATEST(max_keys * 2, 100), 1000);
+
+    -- Calculate upper bound for prefix filtering (bytewise, using COLLATE "C")
+    IF v_prefix = '' THEN
+        v_upper_bound := NULL;
+    ELSIF right(v_prefix, 1) = delimiter_param THEN
+        v_upper_bound := left(v_prefix, -1) || chr(ascii(delimiter_param) + 1);
+    ELSE
+        v_upper_bound := left(v_prefix, -1) || chr(ascii(right(v_prefix, 1)) + 1);
+    END IF;
+
+    -- Build batch query (dynamic SQL - called infrequently, amortized over many rows)
+    IF v_is_asc THEN
+        IF v_upper_bound IS NOT NULL THEN
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" >= $2 ' ||
+                'AND o.name COLLATE "C" < $3 ORDER BY o.name COLLATE "C" ASC LIMIT $4';
+        ELSE
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" >= $2 ' ||
+                'ORDER BY o.name COLLATE "C" ASC LIMIT $4';
+        END IF;
+    ELSE
+        IF v_upper_bound IS NOT NULL THEN
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" < $2 ' ||
+                'AND o.name COLLATE "C" >= $3 ORDER BY o.name COLLATE "C" DESC LIMIT $4';
+        ELSE
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" < $2 ' ||
+                'ORDER BY o.name COLLATE "C" DESC LIMIT $4';
+        END IF;
+    END IF;
+
+    -- ========================================================================
+    -- SEEK INITIALIZATION: Determine starting position
+    -- ========================================================================
+    IF v_start = '' THEN
+        IF v_is_asc THEN
+            v_next_seek := v_prefix;
+        ELSE
+            -- DESC without cursor: find the last item in range
+            IF v_upper_bound IS NOT NULL THEN
+                SELECT o.name INTO v_next_seek FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_prefix AND o.name COLLATE "C" < v_upper_bound
+                ORDER BY o.name COLLATE "C" DESC LIMIT 1;
+            ELSIF v_prefix <> '' THEN
+                SELECT o.name INTO v_next_seek FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_prefix
+                ORDER BY o.name COLLATE "C" DESC LIMIT 1;
+            ELSE
+                SELECT o.name INTO v_next_seek FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id
+                ORDER BY o.name COLLATE "C" DESC LIMIT 1;
+            END IF;
+
+            IF v_next_seek IS NOT NULL THEN
+                v_next_seek := v_next_seek || delimiter_param;
+            ELSE
+                RETURN;
+            END IF;
+        END IF;
+    ELSE
+        -- Cursor provided: determine if it refers to a folder or leaf
+        IF EXISTS (
+            SELECT 1 FROM storage.objects o
+            WHERE o.bucket_id = _bucket_id
+              AND o.name COLLATE "C" LIKE v_start || delimiter_param || '%'
+            LIMIT 1
+        ) THEN
+            -- Cursor refers to a folder
+            IF v_is_asc THEN
+                v_next_seek := v_start || chr(ascii(delimiter_param) + 1);
+            ELSE
+                v_next_seek := v_start || delimiter_param;
+            END IF;
+        ELSE
+            -- Cursor refers to a leaf object
+            IF v_is_asc THEN
+                v_next_seek := v_start || delimiter_param;
+            ELSE
+                v_next_seek := v_start;
+            END IF;
+        END IF;
+    END IF;
+
+    -- ========================================================================
+    -- MAIN LOOP: Hybrid peek-then-batch algorithm
+    -- Uses STATIC SQL for peek (hot path) and DYNAMIC SQL for batch
+    -- ========================================================================
+    LOOP
+        EXIT WHEN v_count >= max_keys;
+
+        -- STEP 1: PEEK using STATIC SQL (plan cached, very fast)
+        IF v_is_asc THEN
+            IF v_upper_bound IS NOT NULL THEN
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_next_seek AND o.name COLLATE "C" < v_upper_bound
+                ORDER BY o.name COLLATE "C" ASC LIMIT 1;
+            ELSE
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_next_seek
+                ORDER BY o.name COLLATE "C" ASC LIMIT 1;
+            END IF;
+        ELSE
+            IF v_upper_bound IS NOT NULL THEN
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" < v_next_seek AND o.name COLLATE "C" >= v_prefix
+                ORDER BY o.name COLLATE "C" DESC LIMIT 1;
+            ELSIF v_prefix <> '' THEN
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" < v_next_seek AND o.name COLLATE "C" >= v_prefix
+                ORDER BY o.name COLLATE "C" DESC LIMIT 1;
+            ELSE
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" < v_next_seek
+                ORDER BY o.name COLLATE "C" DESC LIMIT 1;
+            END IF;
+        END IF;
+
+        EXIT WHEN v_peek_name IS NULL;
+
+        -- STEP 2: Check if this is a FOLDER or FILE
+        v_common_prefix := storage.get_common_prefix(v_peek_name, v_prefix, delimiter_param);
+
+        IF v_common_prefix IS NOT NULL THEN
+            -- FOLDER: Emit and skip to next folder (no heap access needed)
+            name := rtrim(v_common_prefix, delimiter_param);
+            id := NULL;
+            updated_at := NULL;
+            created_at := NULL;
+            last_accessed_at := NULL;
+            metadata := NULL;
+            user_metadata := NULL;
+            RETURN NEXT;
+            v_count := v_count + 1;
+
+            -- Advance seek past the folder range
+            IF v_is_asc THEN
+                v_next_seek := left(v_common_prefix, -1) || chr(ascii(delimiter_param) + 1);
+            ELSE
+                v_next_seek := v_common_prefix;
+            END IF;
+        ELSE
+            -- FILE: Batch fetch using DYNAMIC SQL (overhead amortized over many rows)
+            -- For ASC: upper_bound is the exclusive upper limit (< condition)
+            -- For DESC: prefix is the inclusive lower limit (>= condition)
+            FOR v_current IN EXECUTE v_batch_query USING _bucket_id, v_next_seek,
+                CASE WHEN v_is_asc THEN COALESCE(v_upper_bound, v_prefix) ELSE v_prefix END, v_file_batch_size
+            LOOP
+                v_common_prefix := storage.get_common_prefix(v_current.name, v_prefix, delimiter_param);
+
+                IF v_common_prefix IS NOT NULL THEN
+                    -- Hit a folder: exit batch, let peek handle it
+                    v_next_seek := v_current.name;
+                    EXIT;
+                END IF;
+
+                -- Emit file
+                name := v_current.name;
+                id := v_current.id;
+                updated_at := v_current.updated_at;
+                created_at := v_current.created_at;
+                last_accessed_at := v_current.last_accessed_at;
+                metadata := v_current.metadata;
+                user_metadata := v_current.user_metadata;
+                RETURN NEXT;
+                v_count := v_count + 1;
+
+                -- Advance seek past this file
+                IF v_is_asc THEN
+                    v_next_seek := v_current.name || delimiter_param;
+                ELSE
+                    v_next_seek := v_current.name;
+                END IF;
+
+                EXIT WHEN v_count >= max_keys;
+            END LOOP;
+        END IF;
+    END LOOP;
+END;
+$func$;
+
+
+-- ============================================================================
+-- search: Add user_metadata support
+--
+-- Offset-based listing of one bucket using '/' as the delimiter.
+-- Sorting by name uses the peek-then-batch scan over lower(name); any other
+-- accepted sort column uses a single path_tokens query. Unknown sort columns
+-- fall back to name. The effective limit is capped at 1500 rows.
+-- Folder rows carry NULL in every column except name.
+-- ============================================================================
+CREATE OR REPLACE FUNCTION storage.search(
+    prefix text,
+    bucketname text,
+    limits int DEFAULT 100,
+    levels int DEFAULT 1,
+    offsets int DEFAULT 0,
+    search text DEFAULT '',
+    sortcolumn text DEFAULT 'name',
+    sortorder text DEFAULT 'asc'
+)
+RETURNS TABLE (
+    name text,
+    id uuid,
+    updated_at timestamptz,
+    created_at timestamptz,
+    last_accessed_at timestamptz,
+    metadata jsonb,
+    user_metadata jsonb
+)
+SECURITY INVOKER
+LANGUAGE plpgsql STABLE
+AS $func$
+DECLARE
+    v_peek_name TEXT;
+    v_current RECORD;
+    v_common_prefix TEXT;
+    v_delimiter CONSTANT TEXT := '/';
+
+    -- Configuration
+    v_limit INT;
+    v_prefix TEXT;
+    v_prefix_lower TEXT;
+    v_is_asc BOOLEAN;
+    v_order_by TEXT;
+    v_sort_order TEXT;
+    v_upper_bound TEXT;
+    v_file_batch_size INT;
+
+    -- Dynamic SQL for batch query only
+    v_batch_query TEXT;
+
+    -- Seek state
+    v_next_seek TEXT;
+    v_count INT := 0;
+    v_skipped INT := 0;
+BEGIN
+    -- ========================================================================
+    -- INITIALIZATION
+    -- ========================================================================
+    v_limit := LEAST(coalesce(limits, 100), 1500);
+    v_prefix := coalesce(prefix, '') || coalesce(search, '');
+    v_prefix_lower := lower(v_prefix);
+    v_is_asc := lower(coalesce(sortorder, 'asc')) = 'asc';
+    v_file_batch_size := LEAST(GREATEST(v_limit * 2, 100), 1000);
+
+    -- Validate sort column
+    CASE lower(coalesce(sortcolumn, 'name'))
+        WHEN 'name' THEN v_order_by := 'name';
+        WHEN 'updated_at' THEN v_order_by := 'updated_at';
+        WHEN 'created_at' THEN v_order_by := 'created_at';
+        WHEN 'last_accessed_at' THEN v_order_by := 'last_accessed_at';
+        ELSE v_order_by := 'name';
+    END CASE;
+
+    v_sort_order := CASE WHEN v_is_asc THEN 'asc' ELSE 'desc' END;
+
+    -- ========================================================================
+    -- NON-NAME SORTING: Use path_tokens approach (unchanged)
+    -- ========================================================================
+    IF v_order_by != 'name' THEN
+        RETURN QUERY EXECUTE format(
+            $sql$
+            WITH folders AS (
+                SELECT path_tokens[$1] AS folder
+                FROM storage.objects
+                WHERE objects.name ILIKE $2 || '%%'
+                  AND bucket_id = $3
+                  AND array_length(objects.path_tokens, 1) <> $1
+                GROUP BY folder
+                ORDER BY folder %s
+            )
+            (SELECT folder AS "name",
+                   NULL::uuid AS id,
+                   NULL::timestamptz AS updated_at,
+                   NULL::timestamptz AS created_at,
+                   NULL::timestamptz AS last_accessed_at,
+                   NULL::jsonb AS metadata,
+                   NULL::jsonb AS user_metadata FROM folders)
+            UNION ALL
+            (SELECT path_tokens[$1] AS "name",
+                   id, updated_at, created_at, last_accessed_at, metadata, user_metadata
+             FROM storage.objects
+             WHERE objects.name ILIKE $2 || '%%'
+               AND bucket_id = $3
+               AND array_length(objects.path_tokens, 1) = $1
+             ORDER BY %I %s)
+            LIMIT $4 OFFSET $5
+            $sql$, v_sort_order, v_order_by, v_sort_order
+        ) USING levels, v_prefix, bucketname, v_limit, offsets;
+        RETURN;
+    END IF;
+
+    -- ========================================================================
+    -- NAME SORTING: Hybrid skip-scan with batch optimization
+    -- ========================================================================
+
+    -- Calculate upper bound for prefix filtering
+    IF v_prefix_lower = '' THEN
+        v_upper_bound := NULL;
+    ELSIF right(v_prefix_lower, 1) = v_delimiter THEN
+        v_upper_bound := left(v_prefix_lower, -1) || chr(ascii(v_delimiter) + 1);
+    ELSE
+        v_upper_bound := left(v_prefix_lower, -1) || chr(ascii(right(v_prefix_lower, 1)) + 1);
+    END IF;
+
+    -- Build batch query (dynamic SQL - called infrequently, amortized over many rows)
+    IF v_is_asc THEN
+        IF v_upper_bound IS NOT NULL THEN
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" >= $2 ' ||
+                'AND lower(o.name) COLLATE "C" < $3 ORDER BY lower(o.name) COLLATE "C" ASC LIMIT $4';
+        ELSE
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" >= $2 ' ||
+                'ORDER BY lower(o.name) COLLATE "C" ASC LIMIT $4';
+        END IF;
+    ELSE
+        IF v_upper_bound IS NOT NULL THEN
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" < $2 ' ||
+                'AND lower(o.name) COLLATE "C" >= $3 ORDER BY lower(o.name) COLLATE "C" DESC LIMIT $4';
+        ELSE
+            v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata, o.user_metadata ' ||
+                'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" < $2 ' ||
+                'ORDER BY lower(o.name) COLLATE "C" DESC LIMIT $4';
+        END IF;
+    END IF;
+
+    -- Initialize seek position
+    IF v_is_asc THEN
+        v_next_seek := v_prefix_lower;
+    ELSE
+        -- DESC: find the last item in range first (static SQL)
+        IF v_upper_bound IS NOT NULL THEN
+            SELECT o.name INTO v_peek_name FROM storage.objects o
+            WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_prefix_lower AND lower(o.name) COLLATE "C" < v_upper_bound
+            ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
+        ELSIF v_prefix_lower <> '' THEN
+            SELECT o.name INTO v_peek_name FROM storage.objects o
+            WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_prefix_lower
+            ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
+        ELSE
+            SELECT o.name INTO v_peek_name FROM storage.objects o
+            WHERE o.bucket_id = bucketname
+            ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
+        END IF;
+
+        IF v_peek_name IS NOT NULL THEN
+            v_next_seek := lower(v_peek_name) || v_delimiter;
+        ELSE
+            RETURN;
+        END IF;
+    END IF;
+
+    -- ========================================================================
+    -- MAIN LOOP: Hybrid peek-then-batch algorithm
+    -- Uses STATIC SQL for peek (hot path) and DYNAMIC SQL for batch
+    -- ========================================================================
+    LOOP
+        EXIT WHEN v_count >= v_limit;
+
+        -- STEP 1: PEEK using STATIC SQL (plan cached, very fast)
+        IF v_is_asc THEN
+            IF v_upper_bound IS NOT NULL THEN
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_next_seek AND lower(o.name) COLLATE "C" < v_upper_bound
+                ORDER BY lower(o.name) COLLATE "C" ASC LIMIT 1;
+            ELSE
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_next_seek
+                ORDER BY lower(o.name) COLLATE "C" ASC LIMIT 1;
+            END IF;
+        ELSE
+            IF v_upper_bound IS NOT NULL THEN
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" < v_next_seek AND lower(o.name) COLLATE "C" >= v_prefix_lower
+                ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
+            ELSIF v_prefix_lower <> '' THEN
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" < v_next_seek AND lower(o.name) COLLATE "C" >= v_prefix_lower
+                ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
+            ELSE
+                SELECT o.name INTO v_peek_name FROM storage.objects o
+                WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" < v_next_seek
+                ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
+            END IF;
+        END IF;
+
+        EXIT WHEN v_peek_name IS NULL;
+
+        -- STEP 2: Check if this is a FOLDER or FILE
+        v_common_prefix := storage.get_common_prefix(lower(v_peek_name), v_prefix_lower, v_delimiter);
+
+        IF v_common_prefix IS NOT NULL THEN
+            -- FOLDER: Handle offset, emit if needed, skip to next folder
+            IF v_skipped < offsets THEN
+                v_skipped := v_skipped + 1;
+            ELSE
+                name := split_part(rtrim(storage.get_common_prefix(v_peek_name, v_prefix, v_delimiter), v_delimiter), v_delimiter, levels);
+                id := NULL;
+                updated_at := NULL;
+                created_at := NULL;
+                last_accessed_at := NULL;
+                metadata := NULL;
+                user_metadata := NULL;
+                RETURN NEXT;
+                v_count := v_count + 1;
+            END IF;
+
+            -- Advance seek past the folder range
+            IF v_is_asc THEN
+                v_next_seek := lower(left(v_common_prefix, -1)) || chr(ascii(v_delimiter) + 1);
+            ELSE
+                v_next_seek := lower(v_common_prefix);
+            END IF;
+        ELSE
+            -- FILE: Batch fetch using DYNAMIC SQL (overhead amortized over many rows)
+            -- For ASC: upper_bound is the exclusive upper limit (< condition)
+            -- For DESC: prefix_lower is the inclusive lower limit (>= condition)
+            FOR v_current IN EXECUTE v_batch_query
+                USING bucketname, v_next_seek,
+                    CASE WHEN v_is_asc THEN COALESCE(v_upper_bound, v_prefix_lower) ELSE v_prefix_lower END, v_file_batch_size
+            LOOP
+                v_common_prefix := storage.get_common_prefix(lower(v_current.name), v_prefix_lower, v_delimiter);
+
+                IF v_common_prefix IS NOT NULL THEN
+                    -- Hit a folder: exit batch, let peek handle it
+                    v_next_seek := lower(v_current.name);
+                    EXIT;
+                END IF;
+
+                -- Handle offset skipping
+                IF v_skipped < offsets THEN
+                    v_skipped := v_skipped + 1;
+                ELSE
+                    -- Emit file
+                    name := split_part(v_current.name, v_delimiter, levels);
+                    id := v_current.id;
+                    updated_at := v_current.updated_at;
+                    created_at := v_current.created_at;
+                    last_accessed_at := v_current.last_accessed_at;
+                    metadata := v_current.metadata;
+                    user_metadata := v_current.user_metadata;
+                    RETURN NEXT;
+                    v_count := v_count + 1;
+                END IF;
+
+                -- Advance seek past this file
+                IF v_is_asc THEN
+                    v_next_seek := lower(v_current.name) || v_delimiter;
+                ELSE
+                    v_next_seek := lower(v_current.name);
+                END IF;
+
+                EXIT WHEN v_count >= v_limit;
+            END LOOP;
+        END IF;
+    END LOOP;
+END;
+$func$;
+
+
+-- ============================================================================
+-- search_v2: Add user_metadata support
+--
+-- Cursor-based listing. Name sorting delegates to
+-- storage.list_objects_with_delimiter; updated_at/created_at sorting
+-- delegates to storage.search_by_timestamp. Invalid sort_order or
+-- sort_column values fall back to 'asc' and 'name' respectively.
+-- ============================================================================
+CREATE OR REPLACE FUNCTION storage.search_v2(
+    prefix text,
+    bucket_name text,
+    limits int DEFAULT 100,
+    levels int DEFAULT 1,
+    start_after text DEFAULT '',
+    sort_order text DEFAULT 'asc',
+    sort_column text DEFAULT 'name',
+    sort_column_after text DEFAULT ''
+)
+RETURNS TABLE (
+    key text,
+    name text,
+    id uuid,
+    updated_at timestamptz,
+    created_at timestamptz,
+    last_accessed_at timestamptz,
+    metadata jsonb,
+    user_metadata jsonb
+)
+SECURITY INVOKER
+LANGUAGE plpgsql STABLE
+AS $func$
+DECLARE
+    v_sort_col text;
+    v_sort_ord text;
+    v_limit int;
+BEGIN
+    -- Cap limit to maximum of 1500 records
+    v_limit := LEAST(coalesce(limits, 100), 1500);
+
+    -- Validate and normalize sort_order
+    v_sort_ord := lower(coalesce(sort_order, 'asc'));
+    IF v_sort_ord NOT IN ('asc', 'desc') THEN
+        v_sort_ord := 'asc';
+    END IF;
+
+    -- Validate and normalize sort_column
+    v_sort_col := lower(coalesce(sort_column, 'name'));
+    IF v_sort_col NOT IN ('name', 'updated_at', 'created_at') THEN
+        v_sort_col := 'name';
+    END IF;
+
+    -- Route to appropriate implementation
+    IF v_sort_col = 'name' THEN
+        -- Use list_objects_with_delimiter for name sorting (most efficient: O(k * log n))
+        RETURN QUERY
+        SELECT
+            split_part(l.name, '/', levels) AS key,
+            l.name AS name,
+            l.id,
+            l.updated_at,
+            l.created_at,
+            l.last_accessed_at,
+            l.metadata,
+            l.user_metadata
+        FROM storage.list_objects_with_delimiter(
+            bucket_name,
+            coalesce(prefix, ''),
+            '/',
+            v_limit,
+            start_after,
+            '',
+            v_sort_ord
+        ) l;
+    ELSE
+        -- Use aggregation approach for timestamp sorting
+        -- Not efficient for large datasets but supports correct pagination
+        RETURN QUERY SELECT * FROM storage.search_by_timestamp(
+            prefix, bucket_name, v_limit, levels, start_after,
+            v_sort_ord, v_sort_col, sort_column_after
+        );
+    END IF;
+END;
+$func$;
+
+
+-- ============================================================================
+-- search_by_timestamp: Add user_metadata support
+--
+-- Lists folders and files under p_prefix ordered by the timestamp column
+-- named in p_sort_column (truncated to milliseconds) with name as tie
+-- breaker. p_start_after / p_sort_column_after form the keyset cursor.
+-- p_sort_order is normalized to 'asc' or 'desc' before it is interpolated
+-- into the dynamic SQL; any other value is treated as 'asc'.
+-- ============================================================================
+CREATE OR REPLACE FUNCTION storage.search_by_timestamp(
+    p_prefix text,
+    p_bucket_id text,
+    p_limit int,
+    p_level int,
+    p_start_after text,
+    p_sort_order text,
+    p_sort_column text,
+    p_sort_column_after text
+)
+RETURNS TABLE (
+    key text,
+    name text,
+    id uuid,
+    updated_at timestamptz,
+    created_at timestamptz,
+    last_accessed_at timestamptz,
+    metadata jsonb,
+    user_metadata jsonb
+)
+SECURITY INVOKER
+LANGUAGE plpgsql STABLE
+AS $func$
+DECLARE
+    v_cursor_op text;
+    v_query text;
+    v_prefix text;
+    v_sort_order text;
+BEGIN
+    v_prefix := coalesce(p_prefix, '');
+
+    -- p_sort_order is spliced into the query text with %s below, so it must be
+    -- restricted to the two keywords it may legally be. Normalizing the case
+    -- also keeps the cursor operator and the ORDER BY direction in agreement.
+    v_sort_order := lower(coalesce(p_sort_order, 'asc'));
+    IF v_sort_order NOT IN ('asc', 'desc') THEN
+        v_sort_order := 'asc';
+    END IF;
+
+    IF v_sort_order = 'asc' THEN
+        v_cursor_op := '>';
+    ELSE
+        v_cursor_op := '<';
+    END IF;
+
+    v_query := format($sql$
+        WITH raw_objects AS (
+            SELECT
+                o.name AS obj_name,
+                o.id AS obj_id,
+                o.updated_at AS obj_updated_at,
+                o.created_at AS obj_created_at,
+                o.last_accessed_at AS obj_last_accessed_at,
+                o.metadata AS obj_metadata,
+                o.user_metadata AS obj_user_metadata,
+                storage.get_common_prefix(o.name, $1, '/') AS common_prefix
+            FROM storage.objects o
+            WHERE o.bucket_id = $2
+              AND o.name COLLATE "C" LIKE $1 || '%%'
+        ),
+        -- Aggregate common prefixes (folders)
+        -- Both created_at and updated_at use MIN(obj_created_at) to match the old prefixes table behavior
+        aggregated_prefixes AS (
+            SELECT
+                rtrim(common_prefix, '/') AS name,
+                NULL::uuid AS id,
+                MIN(obj_created_at) AS updated_at,
+                MIN(obj_created_at) AS created_at,
+                NULL::timestamptz AS last_accessed_at,
+                NULL::jsonb AS metadata,
+                NULL::jsonb AS user_metadata,
+                TRUE AS is_prefix
+            FROM raw_objects
+            WHERE common_prefix IS NOT NULL
+            GROUP BY common_prefix
+        ),
+        leaf_objects AS (
+            SELECT
+                obj_name AS name,
+                obj_id AS id,
+                obj_updated_at AS updated_at,
+                obj_created_at AS created_at,
+                obj_last_accessed_at AS last_accessed_at,
+                obj_metadata AS metadata,
+                obj_user_metadata AS user_metadata,
+                FALSE AS is_prefix
+            FROM raw_objects
+            WHERE common_prefix IS NULL
+        ),
+        combined AS (
+            SELECT * FROM aggregated_prefixes
+            UNION ALL
+            SELECT * FROM leaf_objects
+        ),
+        filtered AS (
+            SELECT *
+            FROM combined
+            WHERE (
+                $5 = ''
+                OR ROW(
+                    date_trunc('milliseconds', %I),
+                    name COLLATE "C"
+                ) %s ROW(
+                    COALESCE(NULLIF($6, '')::timestamptz, 'epoch'::timestamptz),
+                    $5
+                )
+            )
+        )
+        SELECT
+            split_part(name, '/', $3) AS key,
+            name,
+            id,
+            updated_at,
+            created_at,
+            last_accessed_at,
+            metadata,
+            user_metadata
+        FROM filtered
+        ORDER BY
+            COALESCE(date_trunc('milliseconds', %I), 'epoch'::timestamptz) %s,
+            name COLLATE "C" %s
+        LIMIT $4
+    $sql$,
+        p_sort_column,
+        v_cursor_op,
+        p_sort_column,
+        v_sort_order,
+        v_sort_order
+    );
+
+    RETURN QUERY EXECUTE v_query
+    USING v_prefix, p_bucket_id, p_level, p_limit, p_start_after, p_sort_column_after;
+END;
+$func$;
diff --git a/src/internal/database/migrations/types.ts b/src/internal/database/migrations/types.ts
index 651d4e091..2c0cda601 100644
--- a/src/internal/database/migrations/types.ts
+++ b/src/internal/database/migrations/types.ts
@@ -58,4 +58,5 @@ export const DBMigration = {
   'fix-optimized-search-function': 56,
   's3-multipart-uploads-metadata': 57,
   'operation-ergonomics': 58,
+  'search-functions-user-metadata': 59,
 } as const
diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts
index 5e50fa510..1401f27ab 100644
--- a/src/storage/database/knex.ts
+++ b/src/storage/database/knex.ts
@@ -380,7 +380,15 @@ export class StorageKnexDB implements Database {
       const query = knex
         .table('objects')
         .where('bucket_id', bucketId)
-        .select(['id', 'name', 'metadata', 'updated_at', 'created_at', 'last_accessed_at'])
+        .select([
+          'id',
+          'name',
+          'metadata',
+          'user_metadata',
+          'updated_at',
+          'created_at',
+          'last_accessed_at',
+        ])
         .limit(options?.maxKeys || 100)
 
       // only allow these values for sort columns, "name" is excluded intentionally as it is the default and used as tie breaker when sorting by other columns
diff --git a/src/test/object.test.ts b/src/test/object.test.ts
index dcd5397df..16bb5d35f 100644
--- a/src/test/object.test.ts
+++ b/src/test/object.test.ts
@@ -2779,6 +2779,52 @@ describe('testing list objects', () => {
       tnx = undefined
     }
   })
+
+  test('list returns user_metadata for files uploaded with custom metadata', async () => {
+    // Upload a file with custom user_metadata
+    const file = fs.createReadStream(`./src/test/assets/sadcat.jpg`)
+    const uploadResponse = await appInstance.inject({
+      method: 'POST',
+      url: '/object/bucket2/metadata-list-test.jpg',
+      headers: {
+        authorization: `Bearer ${await serviceKeyAsync}`,
+        'x-upsert': 'true',
+        'x-metadata': Buffer.from(
+          JSON.stringify({ custom_field: 'hello', another: 'world' })
+        ).toString('base64'),
+      },
+      payload: file,
+    })
+    expect(uploadResponse.statusCode).toBe(200)
+
+    // List files and find our uploaded file
+    const listResponse = await appInstance.inject({
+      method: 'POST',
+      url: '/object/list/bucket2',
+      headers: {
+        authorization: `Bearer ${await serviceKeyAsync}`,
+      },
+      payload: {
+        prefix: '',
+        limit: 100,
+        offset: 0,
+      },
+    })
+    expect(listResponse.statusCode).toBe(200)
+    const results = JSON.parse(listResponse.body) as Array<{
+      id: string | null
+      name: string
+      user_metadata: Record<string, unknown> | null
+    }>
+    const found = results.find((r) => r.name === 'metadata-list-test.jpg')
+    expect(found).toBeDefined()
+    expect(found?.user_metadata).toEqual({ custom_field: 'hello', another: 'world' })
+
+    // Folders (rows without an id) should have null user_metadata
+    for (const folder of results.filter((r) => r.id === null)) {
+      expect(folder.user_metadata).toBeNull()
+    }
+  })
 })
 
 describe('x-robots-tag header', () => {