Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 228 additions & 65 deletions cmd/entire/cli/checkpoint/committed_reader_resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package checkpoint
import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/entireio/cli/cmd/entire/cli/checkpoint/id"
Expand All @@ -16,92 +17,254 @@ type CommittedReader interface {
ReadSessionContent(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (*SessionContent, error)
}

// ResolveCommittedReaderForCheckpoint resolves which committed checkpoint reader
// should be used for a specific checkpoint ID.
//
// Fallback behavior:
// - Try v2 first when preferCheckpointsV2 is true
// - Fall back to v1 for any v2 failure except context cancellation
// - During the v2 migration period, a valid v1 copy should never be blocked
// by a corrupt or unreadable v2 copy
func ResolveCommittedReaderForCheckpoint(
ctx context.Context,
checkpointID id.CheckpointID,
v1Store *GitStore,
v2Store *V2GitStore,
preferCheckpointsV2 bool,
) (CommittedReader, *CheckpointSummary, error) {
if err := ctx.Err(); err != nil {
return nil, nil, err //nolint:wrapcheck // Propagating context cancellation
// CommittedListReader provides read and list access to committed checkpoint data.
// GitStore, V2GitStore, and DualCheckpointReader implement this interface.
type CommittedListReader interface {
CommittedReader
ListCommitted(ctx context.Context) ([]CommittedInfo, error)
ReadSessionMetadata(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (*CommittedMetadata, error)
}

type CommittedReadMode int

const (
CommittedReadV1 CommittedReadMode = iota
CommittedReadDual
CommittedReadV2
)

func CommittedReadModeForOptions(checkpointsV2Enabled bool, checkpointsVersion int) CommittedReadMode {
if checkpointsVersion == 2 {
return CommittedReadV2
}
if checkpointsV2Enabled {
return CommittedReadDual
}
return CommittedReadV1
}

func NewCommittedReader(v1Store *GitStore, v2Store *V2GitStore, mode CommittedReadMode) (CommittedListReader, error) { //nolint:ireturn // Factory selects among v1, v2, and dual reader implementations.
switch mode {
case CommittedReadV2:
if v2Store == nil {
return nil, errors.New("v2 committed checkpoint reader unavailable")
}
return v2Store, nil
case CommittedReadDual:
switch {
case v2Store == nil && v1Store == nil:
return nil, errors.New("committed checkpoint reader unavailable")
case v2Store == nil:
return v1Store, nil
case v1Store == nil:
return v2Store, nil
default:
return &DualCheckpointReader{v2: v2Store, v1: v1Store}, nil
}
case CommittedReadV1:
if v1Store == nil {
return nil, errors.New("v1 committed checkpoint reader unavailable")
}
return v1Store, nil
default:
return nil, fmt.Errorf("unknown committed checkpoint read mode: %d", mode)
}
}

type DualCheckpointReader struct {
v2 *V2GitStore
v1 *GitStore
}

func (r *DualCheckpointReader) ReadCommitted(ctx context.Context, checkpointID id.CheckpointID) (*CheckpointSummary, error) {
summary, err := r.v2.ReadCommitted(ctx, checkpointID)
if err == nil && summary != nil {
return summary, nil
}
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
if err != nil {
logV2Fallback(ctx, "v2 ReadCommitted failed, falling back to v1", checkpointID, err)
}
return r.v1.ReadCommitted(ctx, checkpointID)
}

func (r *DualCheckpointReader) ReadSessionContent(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (*SessionContent, error) {
content, err := r.v2.ReadSessionContent(ctx, checkpointID, sessionIndex)
if err == nil {
return content, nil
}
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
if errors.Is(err, ErrNoTranscript) {
fallbackContent, fallbackErr := r.readFallbackSessionContent(ctx, checkpointID, sessionIndex)
if fallbackErr == nil {
return fallbackContent, nil
}
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
return nil, fallbackReadError(err, "read v1 fallback session content", fallbackErr)
}
return r.readV1SessionContentByIndex(ctx, checkpointID, sessionIndex, err)
}

func (r *DualCheckpointReader) ReadSessionMetadata(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (*CommittedMetadata, error) {
metadata, err := r.v2.ReadSessionMetadata(ctx, checkpointID, sessionIndex)
if err == nil {
return metadata, nil
}
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
return r.readV1SessionMetadataByIndex(ctx, checkpointID, sessionIndex, err)
}

// ReadSessionMetadataAndPrompts is intentionally v2-only because callers pair
// this metadata with the v2 compact transcript. Returning v1 raw content here
// would bypass the checkpoint transcript offset handling in ReadSessionContent.
func (r *DualCheckpointReader) ReadSessionMetadataAndPrompts(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (*SessionContent, error) {
return r.v2.ReadSessionMetadataAndPrompts(ctx, checkpointID, sessionIndex)
}

func (r *DualCheckpointReader) ReadSessionCompactTranscript(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) ([]byte, error) {
return r.v2.ReadSessionCompactTranscript(ctx, checkpointID, sessionIndex)
}

if preferCheckpointsV2 && v2Store != nil {
summary, err := v2Store.ReadCommitted(ctx, checkpointID)
if err == nil && summary != nil {
return v2Store, summary, nil
func (r *DualCheckpointReader) ListCommitted(ctx context.Context) ([]CommittedInfo, error) {
v2Committed, v2Err := r.v2.ListCommitted(ctx)
v1Committed, v1Err := r.v1.ListCommitted(ctx)

if v2Err != nil {
logging.Debug(ctx, "v2 ListCommitted failed, using v1 only",
slog.String("error", v2Err.Error()),
)
if v1Err != nil {
return nil, fmt.Errorf("listing checkpoints: %w", v1Err)
}
if err != nil && ctx.Err() != nil {
return nil, nil, ctx.Err() //nolint:wrapcheck // Propagating context cancellation
return v1Committed, nil
}

if v1Err != nil {
logging.Debug(ctx, "v1 ListCommitted failed, returning v2 only",
slog.String("error", v1Err.Error()),
)
return v2Committed, nil
}

seen := make(map[id.CheckpointID]struct{}, len(v2Committed))
for _, c := range v2Committed {
seen[c.CheckpointID] = struct{}{}
}
committed := make([]CommittedInfo, 0, len(v2Committed)+len(v1Committed))
committed = append(committed, v2Committed...)
for _, c := range v1Committed {
if _, ok := seen[c.CheckpointID]; !ok {
committed = append(committed, c)
}
if err != nil && !errors.Is(err, ErrCheckpointNotFound) && !errors.Is(err, ErrNoTranscript) {
logging.Debug(ctx, "v2 ReadCommitted failed, falling back to v1",
slog.String("checkpoint_id", checkpointID.String()),
slog.String("error", err.Error()),
)
}
return committed, nil
}

func (r *DualCheckpointReader) readFallbackSessionContent(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (*SessionContent, error) {
metadata, err := r.v2.ReadSessionMetadata(ctx, checkpointID, sessionIndex)
if err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
return nil, err
}
if metadata == nil || metadata.SessionID == "" {
return nil, ErrNoTranscript
}
return r.v1.ReadSessionContentByID(ctx, checkpointID, metadata.SessionID)
}

if v1Store == nil {
return nil, nil, ErrCheckpointNotFound
func (r *DualCheckpointReader) readV1SessionContentByIndex(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int, originalErr error) (*SessionContent, error) {
content, err := r.v1.ReadSessionContent(ctx, checkpointID, sessionIndex)
if err == nil {
return content, nil
}
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
return nil, fallbackReadError(originalErr, "read v1 session content", err)
}

summary, err := v1Store.ReadCommitted(ctx, checkpointID)
func (r *DualCheckpointReader) readV1SessionMetadataByIndex(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int, originalErr error) (*CommittedMetadata, error) {
metadata, err := r.v1.ReadSessionMetadata(ctx, checkpointID, sessionIndex)
if err == nil {
return metadata, nil
}
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr //nolint:wrapcheck // Propagating context cancellation
}
return nil, fallbackReadError(originalErr, "read v1 session metadata", err)
}

func fallbackReadError(primaryErr error, fallbackOperation string, fallbackErr error) error {
if fallbackErr == nil {
return primaryErr
}
return errors.Join(primaryErr, fmt.Errorf("%s: %w", fallbackOperation, fallbackErr))
}

func logV2Fallback(ctx context.Context, message string, checkpointID id.CheckpointID, err error) {
if errors.Is(err, ErrCheckpointNotFound) || errors.Is(err, ErrNoTranscript) {
return
}
logging.Debug(ctx, message,
slog.String("checkpoint_id", checkpointID.String()),
slog.String("error", err.Error()),
)
}

// ReadCommittedCheckpoint reads a committed checkpoint summary and normalizes
// a nil store response into ErrCheckpointNotFound.
func ReadCommittedCheckpoint(ctx context.Context, reader CommittedReader, checkpointID id.CheckpointID) (*CheckpointSummary, error) {
if err := ctx.Err(); err != nil {
return nil, err //nolint:wrapcheck // Propagating context cancellation
}

summary, err := reader.ReadCommitted(ctx, checkpointID)
if err != nil {
return nil, nil, err
return nil, fmt.Errorf("read committed checkpoint: %w", err)
}
if summary == nil {
return nil, nil, ErrCheckpointNotFound
return nil, ErrCheckpointNotFound
}
return summary, nil
}

return v1Store, summary, nil
// ReadLatestSessionContent reads the latest session from an already-resolved
// committed reader and summary.
func ReadLatestSessionContent(ctx context.Context, reader CommittedReader, checkpointID id.CheckpointID, summary *CheckpointSummary) (*SessionContent, error) {
if summary == nil || len(summary.Sessions) == 0 {
return nil, ErrCheckpointNotFound
}
latestIndex := len(summary.Sessions) - 1
content, err := reader.ReadSessionContent(ctx, checkpointID, latestIndex)
if err != nil {
return nil, fmt.Errorf("read session %d content: %w", latestIndex, err)
}
return content, nil
}

// ResolveRawSessionLogForCheckpoint resolves the raw transcript log bytes for a
// checkpoint with v2-first, v1-fallback behavior.
//
// Fallback behavior:
// - Try v2 first when preferCheckpointsV2 is true
// - Fall back to v1 for any v2 failure except context cancellation
func ResolveRawSessionLogForCheckpoint(
ctx context.Context,
checkpointID id.CheckpointID,
v1Store *GitStore,
v2Store *V2GitStore,
preferCheckpointsV2 bool,
) ([]byte, string, error) {
func ReadRawSessionLogForCheckpoint(ctx context.Context, reader CommittedReader, checkpointID id.CheckpointID) ([]byte, string, error) {
if err := ctx.Err(); err != nil {
return nil, "", err //nolint:wrapcheck // Propagating context cancellation
}

if preferCheckpointsV2 && v2Store != nil {
content, sessionID, err := v2Store.GetSessionLog(ctx, checkpointID)
if err == nil && len(content) > 0 {
return content, sessionID, nil
}
if err != nil && ctx.Err() != nil {
return nil, "", ctx.Err() //nolint:wrapcheck // Propagating context cancellation
}
if err != nil && !errors.Is(err, ErrCheckpointNotFound) && !errors.Is(err, ErrNoTranscript) {
logging.Debug(ctx, "v2 GetSessionLog failed, falling back to v1",
slog.String("checkpoint_id", checkpointID.String()),
slog.String("error", err.Error()),
)
}
summary, err := ReadCommittedCheckpoint(ctx, reader, checkpointID)
if err != nil {
return nil, "", err
}

if v1Store == nil {
return nil, "", ErrCheckpointNotFound
content, err := ReadLatestSessionContent(ctx, reader, checkpointID, summary)
if err != nil {
return nil, "", err
}

return v1Store.GetSessionLog(ctx, checkpointID)
return content.Transcript, content.Metadata.SessionID, nil
}
Loading
Loading