diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 16e9f8bd..fadefd17 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,51 +5,75 @@ updates: target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 - package-ecosystem: "cargo" directory: "/crates/app/tests/http_search_filter" target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 + - package-ecosystem: "cargo" directory: "/crates/wasm-blueprints/rust/" target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 + - package-ecosystem: "github-actions" directory: "/" target-branch: "develop" schedule: interval: "monthly" + cooldown: + default-days: 7 + - package-ecosystem: "docker" directory: "/" target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 - package-ecosystem: "gomod" directory: "/crates/wasm-blueprints/golang/" target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 + - package-ecosystem: "rust-toolchain" directory: "/" target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 + - package-ecosystem: "npm" directory: "/crates/wasm-blueprints/js/" target-branch: "develop" schedule: interval: "weekly" + cooldown: + default-days: 7 + - package-ecosystem: "npm" directory: "/docs" target-branch: "develop" schedule: - interval: "weekly" \ No newline at end of file + interval: "weekly" + cooldown: + default-days: 7 diff --git a/.github/workflows/actionlint.yml b/.github/workflows/actionlint.yml index 87e959eb..d61c0bd9 100644 --- a/.github/workflows/actionlint.yml +++ b/.github/workflows/actionlint.yml @@ -17,8 +17,24 @@ jobs: contents: read steps: - name: Checkout - uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: actionlint - uses: raven-actions/actionlint@v2.1.0 + uses: 
raven-actions/actionlint@963d4779ef039e217e5d0e6fd73ce9ab7764e493 + continue-on-error: true + + zizmor: + name: Run zizmor + runs-on: ubuntu-latest + permissions: + security-events: write # Required for upload-sarif (used by zizmor-action) to upload SARIF files. + contents: read # Only needed for private repos. Needed to clone the repo. + actions: read # Only needed for private repos. Needed for upload-sarif to read workflow run info. + steps: + - name: Checkout repository + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + - name: Run zizmor + uses: zizmorcore/zizmor-action@71321a20a9ded102f6e9ce5718a2fcec2c4f70d8 # v0.5.2 \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index efc03634..ac798cd5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,6 +15,7 @@ env: RUST_BACKTRACE: 1 RUSTDOCFLAGS: "--deny warnings" MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 + RUST_CHANNEL: stable concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -27,13 +28,15 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - name: Install Rust + run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 with: cache-all-crates: true - name: Run cargo check @@ -46,14 +49,16 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: 
Swatinem/rust-cache@v2 - - uses: taiki-e/install-action@cargo-hack + - name: Install Rust + run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 + - uses: taiki-e/install-action@f0dca4a997b86923eb83a82ddcbeb2af1989065c - run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev - run: cargo hack check --feature-powerset --no-dev-deps @@ -70,13 +75,13 @@ jobs: - beta - nightly steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{ matrix.rust }} - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ matrix.rust }} + rustup default ${{ matrix.rust }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Run cargo build run: cargo build @@ -109,10 +114,10 @@ jobs: target: x86_64-pc-windows-msvc features: "--no-default-features --features ssl-vendored,libz-static" steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: houseabsolute/actions-rust-cross@v1 + - uses: houseabsolute/actions-rust-cross@576730dbfbb705690d43bd285987a3094fd84874 with: target: ${{ matrix.platforms.target }} args: "--locked ${{ matrix.platforms.features }}" @@ -126,13 +131,14 @@ jobs: contents: read if: ${{ github.actor != 'dependabot[bot]' }} steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL 
}} rustfmt rust-src + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Check lockfile is updated run: cargo update --locked @@ -143,14 +149,14 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - components: clippy - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src clippy + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Run cargo clippy run: cargo clippy --all-targets --all-features -- --deny warnings @@ -161,13 +167,14 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Install git-cliff run: cargo install --locked cargo-deny - name: Run cargo deny @@ -180,14 +187,14 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - components: rustfmt - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt + rustup default ${{ env.RUST_CHANNEL }} + - 
uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Run cargo fmt run: cargo fmt --all -- --check @@ -198,11 +205,10 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: Swatinem/rust-cache@v2 - - uses: bnjbvr/cargo-machete@main + - uses: bnjbvr/cargo-machete@b81ce1560c5fbd0210cb66d88bf210329ff04266 tests: needs: check @@ -211,13 +217,13 @@ jobs: contents: read runs-on: ubuntu-latest steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src + rustup default ${{ env.RUST_CHANNEL }} - run: cargo install cargo-nextest --locked - run: cargo nextest run --locked env: @@ -231,17 +237,18 @@ jobs: contents: read runs-on: ubuntu-latest steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src clippy + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Build documentation run: cargo doc --no-deps --document-private-items --verbose - name: Setup Node - uses: actions/setup-node@v6 + uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f with: node-version: 24 cache-dependency-path: docs/package-lock.json @@ -258,7 +265,7 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: 
actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - run: cargo install --locked cargo-shear @@ -272,10 +279,10 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: lycheeverse/lychee-action@v2 + - uses: lycheeverse/lychee-action@8646ba30535128ac92d33dfc9133794bfdd9b411 name: Link Checker # https://github.com/lycheeverse/lychee/issues/1405 with: @@ -293,13 +300,13 @@ jobs: id-token: write steps: - name: Checkout - uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 + uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f - name: Build - uses: docker/build-push-action@v6 + uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 with: context: . push: false @@ -314,11 +321,11 @@ jobs: contents: read steps: - name: Checkout Actions Repository - uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Check spelling of the project - uses: crate-ci/typos@master + uses: crate-ci/typos@02ea592e44b3a53c302f697cddca7641cd051c3d cargo-deny: name: Cargo deny @@ -327,10 +334,10 @@ jobs: contents: read needs: [check] steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: EmbarkStudios/cargo-deny-action@v2 + - uses: EmbarkStudios/cargo-deny-action@3fd3802e88374d3fe9159b834c7714ec57d6c979 # https://github.com/orhun/PKGBUILDs/tree/master/yozefu archlinux: @@ -339,13 +346,15 @@ jobs: permissions: contents: read container: - image: archlinux:base-20250928.0.426921 + image: index.docker.io/library/archlinux@sha256:9a72b5e3c1675683016cb065f513deea7c65836cb5bd22b88c89353098faa40f 
volumes: - /tmp/yozefu-read-only:/tmp/yozefu-readonly:ro needs: [check] steps: - name: Checkout Actions Repository - uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd + with: + persist-credentials: false - name: Install dependencies run: pacman -Sy --noconfirm --needed cargo devtools gcc-libs openssl cmake gcc clang base-devel # - name: Use GCC 14 @@ -378,8 +387,10 @@ jobs: - os: ubuntu-24.04-arm target: aarch64-unknown-linux-gnu steps: - - uses: actions/checkout@v6 - - uses: cachix/install-nix-action@v31 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd + with: + persist-credentials: false + - uses: cachix/install-nix-action@616559265b40713947b9c190a8ff4b507b5df49b with: nix_path: nixpkgs=channel:nixos-unstable - run: nix build diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml index 82326259..1f27bf55 100644 --- a/.github/workflows/changelog.yml +++ b/.github/workflows/changelog.yml @@ -4,6 +4,7 @@ env: CARGO_TERM_COLOR: always RUST_BACKTRACE: 1 RUSTDOCFLAGS: '--deny warnings' + RUST_CHANNEL: stable on: workflow_dispatch: @@ -20,14 +21,16 @@ jobs: pull-requests: write steps: - name: Check out repository - uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: 'true' # needed to push the new changelog fetch-tags: 'true' fetch-depth: 0 - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} cargo + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - id: release run: echo "version=v$(cargo pkgid --manifest-path crates/bin/Cargo.toml | cut -d '@' -f2)" >> "$GITHUB_OUTPUT" - name: Install git-cliff @@ -57,7 +60,7 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Check out repository - 
uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: ref: "changelog/${{ env.VERSION }}" persist-credentials: false @@ -70,6 +73,6 @@ jobs: branch=$(git branch --show-current) gh pr create --head "$branch" --title "Changelog for ${VERSION}" --body "This PR updates the changelog for version ${VERSION}." fi - env: + env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} VERSION: ${{ steps.release.outputs.version }} diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml index aec9cdfa..387b5d15 100644 --- a/.github/workflows/code-coverage.yml +++ b/.github/workflows/code-coverage.yml @@ -13,6 +13,7 @@ env: RUST_BACKTRACE: 1 RUSTDOCFLAGS: '--deny warnings' MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 + RUST_CHANNEL: stable concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -25,18 +26,19 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rustfmt rust-src clippy + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Install cargo-tarpaulin run: cargo install --locked cargo-tarpaulin - name: Run cargo tarpaulin run: cargo tarpaulin --all-features --out Xml - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v5 + uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/developer-experience.yml b/.github/workflows/developer-experience.yml index 283c6e51..04e9c440 100644 --- 
a/.github/workflows/developer-experience.yml +++ b/.github/workflows/developer-experience.yml @@ -7,6 +7,7 @@ env: MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 GOLANG_VERSION: 1.23.3 JS_VERSION: 22 + RUST_CHANNEL: stable on: schedule: @@ -23,23 +24,23 @@ concurrency: jobs: try-it: - runs-on: ubuntu-latest - permissions: - contents: read - steps: - - uses: actions/checkout@v6 + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Setup JBang - uses: jbangdev/setup-jbang@main + uses: jbangdev/setup-jbang@312c53a4221283f9c8e4aa84d8261b82f0c680fb - name: Setup JDK - uses: actions/setup-java@v5 + uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 with: distribution: 'temurin' java-version: '21' - name: Run try-it.sh - run: bash docs/try-it.sh - env: + run: bash docs/try-it.sh + env: YOZEFU_API_URL: http://localhost:8081/schemas/types wasm-rust: @@ -47,19 +48,19 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable + - uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8 with: toolchain: stable - name: Setup Go ${{ env.GOLANG_VERSION }} - uses: actions/setup-go@v6 + uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c with: go-version: ${{ env.GOLANG_VERSION }} - name: Install extism CLI run: go install github.com/extism/cli/extism@latest - - uses: Swatinem/rust-cache@v2 + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Temporary fix in Cargo.toml run: sed -i -E 's#git = "ssh.+#path = "../../wasm-types" }#g' crates/wasm-blueprints/rust/Cargo.toml - name: Build a Rust search filter @@ -70,16 +71,16 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: 
actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Setup Go ${{ env.GOLANG_VERSION }} - uses: actions/setup-go@v6 + uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c with: go-version: ${{ env.GOLANG_VERSION }} - name: Install extism CLI run: go install github.com/extism/cli/extism@latest - - uses: acifani/setup-tinygo@v2 + - uses: acifani/setup-tinygo@db56321a62b9a67922bb9ac8f9d085e218807bb3 with: tinygo-version: '0.34.0' - name: Build a Golang search filter @@ -90,15 +91,15 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Setup JS ${{ env.JS_VERSION }} - uses: actions/setup-node@v6 + uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f with: node-version: ${{ env.JS_VERSION }} - name: Setup Go ${{ env.GOLANG_VERSION }} - uses: actions/setup-go@v6 + uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c with: go-version: ${{ env.GOLANG_VERSION }} - name: Install extism CLI @@ -123,8 +124,8 @@ jobs: - name: js directory: ./crates/wasm-blueprints/js/ steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Run docker build - run: docker build ${{ matrix.package.directory }} \ No newline at end of file + run: docker build ${{ matrix.package.directory }} diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml index ea55bf17..49821d06 100644 --- a/.github/workflows/documentation.yml +++ b/.github/workflows/documentation.yml @@ -7,8 +7,6 @@ on: permissions: contents: read - pages: write - id-token: write concurrency: group: pages @@ -17,25 +15,29 @@ concurrency: jobs: build: runs-on: ubuntu-latest + permissions: + contents: read + id-token: write steps: - name: Checkout - uses: actions/checkout@v6 + uses: 
actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: fetch-depth: 0 + persist-credentials: false - name: Setup Node - uses: actions/setup-node@v6 + uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f with: node-version: 24 cache: npm cache-dependency-path: docs/package-lock.json - name: Setup Pages - uses: actions/configure-pages@v5 + uses: actions/configure-pages@983d7736d9b0ae728b81ab479565c72886d7745b - name: Install dependencies run: npm install --prefix docs - name: Build with VitePress run: npm run build --prefix docs - name: Upload artifact - uses: actions/upload-pages-artifact@v4 + uses: actions/upload-pages-artifact@7b1f4a764d45c48632c6b24a0339c27f5614fb0b with: path: docs/.vitepress/dist @@ -47,7 +49,10 @@ jobs: needs: build runs-on: ubuntu-latest name: Deploy + permissions: + pages: write + id-token: write steps: - name: Deploy to GitHub Pages id: deployment - uses: actions/deploy-pages@v4 \ No newline at end of file + uses: actions/deploy-pages@d6db90164ac5ed86f2b6aed7e0febac5b3c0c03e diff --git a/.github/workflows/mutant.yml b/.github/workflows/mutant.yml index a903ffa4..ae82baa9 100644 --- a/.github/workflows/mutant.yml +++ b/.github/workflows/mutant.yml @@ -11,6 +11,7 @@ env: RUST_BACKTRACE: 1 RUSTDOCFLAGS: '--deny warnings' MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 + RUST_CHANNEL: stable concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -23,14 +24,15 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rust-src + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Install cargo-mutants run: cargo install --locked 
cargo-mutants - name: Run cargo mutants - run: cargo mutants --manifest-path ./crates/lib/Cargo.toml --all-features \ No newline at end of file + run: cargo mutants --manifest-path ./crates/lib/Cargo.toml --all-features diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index f8b36567..edf26de0 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -6,6 +6,7 @@ env: RUSTDOCFLAGS: '--deny warnings' MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 DOCKER_REGISTRY: ghcr.io + RUST_CHANNEL: stable permissions: id-token: write @@ -32,12 +33,14 @@ jobs: outputs: version: ${{ steps.release.outputs.version }} steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} cargo + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - id: release run: echo "version=$(cargo pkgid --manifest-path crates/bin/Cargo.toml | cut -d '@' -f2)" >> "$GITHUB_OUTPUT" @@ -46,7 +49,7 @@ jobs: name: Create release needs: version steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Create github release @@ -87,17 +90,17 @@ jobs: target: x86_64-pc-windows-msvc features: "--no-default-features --features ssl-vendored" steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev if: matrix.platforms.os == 'ubuntu-latest' || matrix.platforms.os == 'ubuntu-24.04-arm' - - uses: houseabsolute/actions-rust-cross@v1 + - uses: 
houseabsolute/actions-rust-cross@576730dbfbb705690d43bd285987a3094fd84874 with: target: ${{ matrix.platforms.target }} args: "--verbose --locked --release ${{ matrix.platforms.features }}" strip: true - + - name: Setup variables id: variables shell: bash @@ -125,7 +128,7 @@ jobs: VERSION: ${{ needs.version.outputs.version }} - name: Attest - uses: actions/attest-build-provenance@v3 + uses: actions/attest-build-provenance@977bb373ede98d70efdf65b84cb5f73e068dcc2a with: subject-path: "${{ steps.variables.outputs.source }}" subject-name: "${{ steps.variables.outputs.archive }}-${{ needs.version.outputs.version }}" @@ -153,86 +156,85 @@ jobs: if: matrix.platforms.os != 'windows-latest' shell: bash working-directory: ./dist - run: | - tar cvzf "${{ steps.variables.outputs.archive }}.tar.gz" "${{ steps.variables.outputs.binaryName }}" + run: "tar cvzf \"${{ steps.variables.outputs.archive }}.tar.gz\" \"${{ steps.variables.outputs.binaryName }}\" \n" - name: Clean release directory shell: bash run: rm -f "dist/${{ steps.variables.outputs.binaryName }}" - - name: cargo install cargo-cyclonedx - run: cargo install --locked cargo-cyclonedx + run: cargo install --locked cargo-cyclonedx - name: Generate SBOM - run: cargo cyclonedx --describe binaries --format json ${{ matrix.platforms.features }} --target ${{ matrix.platforms.target }} + run: cargo cyclonedx --describe binaries --format json ${{ matrix.platforms.features }} --target ${{ matrix.platforms.target }} - name: Rename SBOM + shell: bash run: mv crates/bin/${{ steps.variables.outputs.name }}_bin.cdx.json "dist/${{ steps.variables.outputs.archive }}.cdx.json" - - name: Upload binary + shell: bash run: gh release upload "v${{ needs.version.outputs.version }}" dist/* --clobber env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # https://docs.github.com/en/actions/use-cases-and-examples/publishing-packages/publishing-docker-images -# 
https://docs.github.com/en/actions/use-cases-and-examples/publishing-packages/publishing-docker-images publish-docker-image: name: Docker image runs-on: ubuntu-latest needs: [version, create-release] permissions: - contents: read - packages: write - attestations: write - id-token: write + contents: read + packages: write + attestations: write + id-token: write steps: - - name: Checkout - uses: actions/checkout@v6 - with: - persist-credentials: false - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Extract metadata - id: meta - uses: docker/metadata-action@v5 - with: - tags: | - type=raw,value=latest,enable={{is_default_branch}} - type=raw,value=${{ needs.version.outputs.version }} - images: ${{ env.DOCKER_REGISTRY }}/${{ github.repository }} - labels: | - org.opencontainers.image.description=Yozefu is a CLI tool for Apache kafka. It allows you to navigate topics and search Kafka records. - org.opencontainers.image.vendor=Yann Prono - org.opencontainers.image.licenses=Apache-2.0 - env: - DOCKER_METADATA_ANNOTATIONS_LEVELS: manifest,index - - name: Login to GitHub Container Registry - uses: docker/login-action@v3 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - name: Build and push - uses: docker/build-push-action@v6 - id: push - with: - context: . 
- push: true - tags: ${{ steps.meta.outputs.tags }} - cache-from: type=gha - cache-to: type=gha,mode=max - labels: ${{ steps.meta.outputs.labels }} - sbom: true - annotations: ${{ steps.meta.outputs.annotations }} - # https://github.com/actions/attest-build-provenance - - name: Generate artifact attestation - uses: actions/attest-build-provenance@v3 - with: - subject-name: ${{ env.DOCKER_REGISTRY }}/${{ github.repository }} - subject-digest: ${{ steps.push.outputs.digest }} - push-to-registry: true - + - name: Checkout + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd + with: + persist-credentials: false + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f + - name: Extract metadata + id: meta + uses: docker/metadata-action@c299e40c65443455700f0fdfc63efafe5b349051 + with: + tags: | + type=raw,value=latest,enable={{is_default_branch}} + type=raw,value=${{ needs.version.outputs.version }} + images: ${{ env.DOCKER_REGISTRY }}/${{ github.repository }} + labels: | + org.opencontainers.image.description=Yozefu is a CLI tool for Apache kafka. It allows you to navigate topics and search Kafka records. + org.opencontainers.image.vendor=Yann Prono + org.opencontainers.image.licenses=Apache-2.0 + env: + DOCKER_METADATA_ANNOTATIONS_LEVELS: manifest,index + - name: Login to GitHub Container Registry + uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Build and push + uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 + id: push + with: + context: . 
+ push: true + tags: ${{ steps.meta.outputs.tags }} + cache-from: type=gha + cache-to: type=gha,mode=max + labels: ${{ steps.meta.outputs.labels }} + sbom: true + annotations: ${{ steps.meta.outputs.annotations }} + # https://github.com/actions/attest-build-provenance + - name: Generate artifact attestation + uses: actions/attest-build-provenance@977bb373ede98d70efdf65b84cb5f73e068dcc2a + with: + subject-name: ${{ env.DOCKER_REGISTRY }}/${{ github.repository }} + subject-digest: ${{ steps.push.outputs.digest }} + push-to-registry: true + publish-to-registry: runs-on: ubuntu-latest name: Publish to registry @@ -242,16 +244,18 @@ jobs: permissions: id-token: write steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false # https://crates.io/docs/trusted-publishing - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - - uses: Swatinem/rust-cache@v2 - - uses: rust-lang/crates-io-auth-action@v1 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} cargo + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 + - uses: rust-lang/crates-io-auth-action@b7e9a28eded4986ec6b1fa40eeee8f8f165559ec id: auth - run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev - run: cargo publish --workspace env: - CARGO_REGISTRY_TOKEN: ${{ steps.auth.outputs.token }} \ No newline at end of file + CARGO_REGISTRY_TOKEN: ${{ steps.auth.outputs.token }} diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index d89a7d36..55b04735 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -5,7 +5,7 @@ env: RUST_BACKTRACE: 1 RUSTDOCFLAGS: '--deny warnings' MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 - + RUST_CHANNEL: stable on: schedule: - cron: "0 10 * * MON" # every monday, at 10AM @@ -26,13 +26,14 @@ jobs: permissions: 
contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - uses: Swatinem/rust-cache@v2 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} cargo + rustup default ${{ env.RUST_CHANNEL }} - name: Install cargo-edit run: cargo install --locked cargo-edit - name: Check for outdated dependencies @@ -43,14 +44,38 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - uses: Swatinem/rust-cache@v2 - - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rust-src + rustup default ${{ env.RUST_CHANNEL }} - name: Install cargo-outdated run: cargo install --locked cargo-audit - name: Cargo audit - run: cargo audit \ No newline at end of file + run: cargo audit + + docker: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - name: Checkout + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd + with: + persist-credentials: false + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f + - name: Build + uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 + with: + context: . 
+ push: false + tags: maif/yozefu:latest + load: true + - name: Docker image + uses: anchore/scan-action@522bcc5cc7e62e4ce7cbe30f22372b991b432751 + with: + image: "maif/yozefu:latest" \ No newline at end of file diff --git a/.github/workflows/semver-checks.yml b/.github/workflows/semver-checks.yml index 4f8db852..6f678d15 100644 --- a/.github/workflows/semver-checks.yml +++ b/.github/workflows/semver-checks.yml @@ -5,6 +5,7 @@ env: RUST_BACKTRACE: 1 RUSTDOCFLAGS: '--deny warnings' MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 + RUST_CHANNEL: stable on: pull_request: @@ -48,7 +49,7 @@ jobs: contents: write runs-on: ubuntu-latest steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false fetch-depth: 0 @@ -59,10 +60,11 @@ jobs: xargs -I {} gh api -X DELETE "/repos/${{ github.repository }}/issues/comments/{}" < comments.txt env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - uses: dtolnay/rust-toolchain@master - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} cargo + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: List the releases on GitHub id: current run: echo "version=$(git tag --sort=-creatordate | head -n 1)" >> "$GITHUB_OUTPUT" @@ -90,14 +92,15 @@ jobs: runs-on: ubuntu-latest if: ${{ needs.setup.outputs.nextStep != 'compute' }} steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false fetch-depth: 0 - - uses: dtolnay/rust-toolchain@master - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rust-src + rustup default ${{ env.RUST_CHANNEL }} + - uses: 
Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Install cargo-semver-checks run: cargo install --locked cargo-semver-checks - name: Show release type @@ -145,14 +148,15 @@ jobs: if: ${{ needs.setup.outputs.nextStep == 'compute' }} runs-on: ubuntu-latest steps: - - uses: actions/checkout@v6 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false fetch-depth: 0 - - uses: dtolnay/rust-toolchain@master - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} cargo + rustup default ${{ env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Install cargo-semver-checks run: cargo install --locked cargo-semver-checks - name: Determine the next version @@ -186,4 +190,4 @@ jobs: gh pr comment ${{ github.event.number }} --body-file ./report.md env: GH_TOKEN: ${{ github.token }} - branch : '${{ toJSON(github.head_ref) }}' \ No newline at end of file + branch: '${{ toJSON(github.head_ref) }}' diff --git a/.github/workflows/tag.yml b/.github/workflows/tag.yml index c9de6cea..087d07d1 100644 --- a/.github/workflows/tag.yml +++ b/.github/workflows/tag.yml @@ -5,6 +5,7 @@ env: RUST_BACKTRACE: 1 RUSTDOCFLAGS: '--deny warnings' MINIMUM_SUPPORTED_RUST_VERSION: 1.85.0 + RUST_CHANNEL: stable permissions: contents: write @@ -26,16 +27,16 @@ jobs: contents: write steps: - name: Checkout - uses: actions/checkout@v6 + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: persist-credentials: false - name: Get tags run: git fetch --tags origin - - name: Install Rust - uses: dtolnay/rust-toolchain@stable - with: - components: rustc, cargo - - uses: Swatinem/rust-cache@v2 + - run: | + rustup update --no-self-update ${{ env.RUST_CHANNEL }} + rustup component add --toolchain ${{ env.RUST_CHANNEL }} rust-src,cargo + rustup default ${{ 
env.RUST_CHANNEL }} + - uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 - name: Configure git run: | git config --global user.name "${ACTOR}" @@ -50,7 +51,7 @@ jobs: run: echo "version=$(cargo pkgid --manifest-path crates/bin/Cargo.toml | cut -d '@' -f2)" >> "$GITHUB_OUTPUT" - name: Create git tag id: tag - run: | + run: | if git tag -a "v${VERSION}" -m "${VERSION}"; then echo "created=true" >> "$GITHUB_OUTPUT" else diff --git a/Cargo.lock b/Cargo.lock index 2be2356a..c8515ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloca" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7d05ea6aea7e9e64d25b9156ba2fee3fdd659e34e41063cd2fc7cd020d7f4" +dependencies = [ + "cc", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -47,6 +56,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "1.0.0" @@ -638,6 +653,12 @@ dependencies = [ "winx", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "castaway" version = "0.2.4" @@ -704,6 +725,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.0", +] + [[package]] name = "chrono" version = "0.4.44" @@ -718,6 +750,33 @@ dependencies = [ 
"windows-link", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "circular-buffer" version = "1.1.0" @@ -951,6 +1010,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "cranelift-assembler-x64" version = "0.128.4" @@ -1098,6 +1166,41 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "950046b2aa2492f9a536f5f4f9a3de7b9e2476e575e05bd6c333371add4d98f3" +dependencies = [ + "alloca", + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "itertools 0.13.0", + "num-traits", + "oorandom", + "page_size", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8d80a2f4f5b554395e47b5d8305bc3d27813bacb73493eb1001e8f76dae29ea" +dependencies = [ + "cast", + "itertools 0.13.0", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -1169,6 +1272,12 @@ dependencies = [ 
"winapi", ] +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.7" @@ -1333,6 +1442,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "deunicode" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04" + [[package]] name = "difflib" version = "0.4.0" @@ -1623,6 +1738,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "fake" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6be833b323a56361118a747470a45a1bcd5c52a2ec9b1e40c83dafe687e453" +dependencies = [ + "deunicode", + "either", + "rand 0.10.0", + "uuid", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1936,6 +2063,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", + "rand_core 0.10.0", "wasip2", "wasip3", ] @@ -1989,6 +2117,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "zerocopy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2845,6 +2984,16 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.8.0" @@ -2918,6 +3067,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mock_json" +version = 
"0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d9cedfa3802606b21f6d48afca77b0bafe4205b6f3a4ab59bc044e92319fca" +dependencies = [ + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", +] + [[package]] name = "mockito" version = "1.7.2" @@ -3257,6 +3418,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "open" version = "5.3.3" @@ -3321,6 +3488,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -3543,6 +3720,34 @@ dependencies = [ "time", ] +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polling" version = "3.11.0" @@ -3890,6 +4095,17 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.0", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -3928,6 +4144,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" + [[package]] name = "rand_xorshift" version = "0.4.0" @@ -4727,6 +4949,12 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -4734,7 +4962,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -5361,6 +5589,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -5835,7 +6073,9 @@ dependencies = [ "atomic", "getrandom 0.4.2", "js-sys", + "md-5", "serde_core", + "sha1_smol", "wasm-bindgen", ] @@ -7287,10 +7527,15 @@ name = "yozefu-app" version = "0.0.28" dependencies = [ "chrono", + "criterion", "directories", "extism", + "fake", + "futures", + "futures-batch", "indexmap 2.13.0", "itertools 0.14.0", + "mock_json", "rdkafka", "resolve-path", "schemars 1.2.1", @@ -7365,7 +7610,6 @@ dependencies = [ "copypasta", "crossterm 0.29.0", "futures", - "futures-batch", "indexmap 2.13.0", "insta", "itertools 0.14.0", diff --git a/Cargo.toml 
b/Cargo.toml index 5db28ed3..7f721cb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ serde_json = { version = "1.0.149", features = ["preserve_order"] } serde = { version = "1.0.228", features = ["derive"] } strum = {version = "0.28.0" } tracing = "0.1.44" +tokio = { version = "1.50.0" } +futures = "0.3.32" schemars = { version = "1.2.1" } @@ -92,3 +94,4 @@ invalid_from_utf8 = "deny" never_type_fallback_flowing_into_unsafe = "deny" ptr_to_integer_transmute_in_consts = "deny" static_mut_refs = "deny" + diff --git a/benches/search_engine.rs b/benches/search_engine.rs new file mode 100644 index 00000000..c266401f --- /dev/null +++ b/benches/search_engine.rs @@ -0,0 +1,9 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use std::hint::black_box; + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("fib 20", |b| b.iter(|| fibonacci(black_box(20)))); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 3b524b64..bcca06df 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -27,13 +27,21 @@ resolve-path = "0.1.0" directories = "6.0.0" chrono = "0.4.44" tracing = { workspace = true } +futures-batch = "0.7.0" +futures = { workspace = true } + [dev-dependencies] testing_logger = "0.1.1" -schemars = { workspace = true } - - +schemars = { workspace = true, features = ["indexmap2", "url2", "chrono04"] } +criterion = { version = "0.8.2", features = [ "html_reports" ] } +mock_json = "0.1.8" +fake = { version = "5.1.0", features = ["uuid"] } [features] ssl-vendored = ["rdkafka/ssl-vendored"] gssapi-vendored = ["rdkafka/gssapi-vendored"] + +[[bench]] +name = "search_engine" +harness = false diff --git a/crates/app/benches/search_engine.rs b/crates/app/benches/search_engine.rs new file mode 100644 index 00000000..dee89b70 --- /dev/null +++ b/crates/app/benches/search_engine.rs @@ -0,0 +1,118 @@ +use 
criterion::{Criterion, criterion_group, criterion_main}; +use fake::{Fake, uuid::UUIDv7}; +use lib::{DataType, KafkaRecord, SearchQuery}; +use mock_json::mock; +use serde_json::json; +use std::hint::black_box; +use std::{collections::BTreeMap, env::temp_dir}; +use yozefu_app::search::{Search, SearchContext}; + +fn generate_mock_value() -> serde_json::Value { + mock(&json!({ + "code":0, + "msg":"just text", + "data":[{ + "id":"@Id|10", + "title": "@Title", + "datetime":"@DateTime", + "author":{ + "name":"@Name", + "id":"@Guid", + "email":"@Email", + "id_number":"@IdNumber", + "ip":"@Ip", + "phones":["@Phone", 1, 3], + "blog":"@Url", + "avatar":"@Image|80X80|f7f7f7|fff" + } + }, 10, 50] + })) +} + +#[allow(dead_code)] +fn generate_mock_key() -> serde_json::Value { + mock(&json!({ + "code":0, + "msg":"just text", + "data":[{ + "id":"@Id|10", + "title": "@Title", + "datetime":"@DateTime", + "author":{ + "name":"@Name", + "id":"@Guid", + "email":"@Email", + "id_number":"@IdNumber", + "ip":"@Ip", + "phones":["@Phone", 1, 3], + "blog":"@Url", + "avatar":"@Image|80X80|f7f7f7|fff" + } + }, 10, 50] + })) +} + +fn generate_mock_kafka_record() -> KafkaRecord { + let topic: String = UUIDv7.fake(); + let key: String = UUIDv7.fake(); + let value = generate_mock_value(); + let value_as_string = serde_json::to_string(&value).unwrap(); + + KafkaRecord { + key: DataType::String(key.clone()), + value: DataType::Json(value), + timestamp: Some((0..i64::MAX).fake::()), + topic, + partition: (0..16).fake::(), + offset: (0..100_000_000).fake::(), + headers: BTreeMap::new(), + key_schema: None, + value_schema: None, + size: (0..18000).fake::(), + key_as_string: serde_json::to_string(&key).unwrap(), + value_as_string, + } +} + +fn read_all(c: &mut Criterion) { + let filter_dir = temp_dir(); + let (_, search_query) = SearchQuery::parse("from begin").unwrap(); + c.bench_function("read all records", |b| { + b.iter(|| { + //thread::sleep(std::time::Duration::from_millis(2000)); + let record = 
generate_mock_kafka_record(); + let context = SearchContext::new(&record, &filter_dir); + let _ = search_query.matches(black_box(&context)); + }) + }); +} + +fn even_offset(c: &mut Criterion) { + let filter_dir = temp_dir(); + let (_, search_query) = SearchQuery::parse("from begin partition == 10").unwrap(); + c.bench_function("partition == 10", |b| { + b.iter(|| { + //thread::sleep(std::time::Duration::from_millis(2000)); + let record = generate_mock_kafka_record(); + let context = SearchContext::new(&record, &filter_dir); + let _ = search_query.matches(black_box(&context)); + }) + }); +} + +fn value_contains_string(c: &mut Criterion) { + let filter_dir = temp_dir(); + let (_, search_query) = + SearchQuery::parse("from begin value contains 'fff' or key contains '34'").unwrap(); + c.bench_function("value contains 'fff' or value key contains '34'", |b| { + b.iter(|| { + //thread::sleep(std::time::Duration::from_millis(2000)); + let record = generate_mock_kafka_record(); + let context = SearchContext::new(&record, &filter_dir); + let _ = search_query.matches(black_box(&context)); + }) + }); +} + +criterion_group!(benches, read_all, even_offset, value_contains_string); +criterion_main!(benches); diff --git a/crates/app/src/admin/mod.rs b/crates/app/src/admin/mod.rs index c906a4af..3b1efb1a 100644 --- a/crates/app/src/admin/mod.rs +++ b/crates/app/src/admin/mod.rs @@ -1,21 +1,34 @@ -use lib::{Error, TopicConfig}; +use std::{collections::HashSet, time::Duration}; + +use itertools::Itertools; +use lib::{ConsumerGroupDetail, Error, TopicConfig, TopicDetail}; use rdkafka::{ - ClientConfig, + Offset, TopicPartitionList, admin::{AdminClient as RDAdminClient, AdminOptions, ResourceSpecifier}, client::DefaultClientContext, config::FromClientConfigAndContext, + consumer::{BaseConsumer, Consumer, StreamConsumer}, }; +use tracing::warn; + +use crate::configuration::{Configuration, InternalConfig}; pub struct AdminClient { client: RDAdminClient, + config: InternalConfig, options: 
AdminOptions, } impl AdminClient { - pub fn new(config: ClientConfig) -> Result { - let client = RDAdminClient::from_config_and_context(&config, DefaultClientContext)?; + pub fn new(config: InternalConfig) -> Result { + let client = + RDAdminClient::from_config_and_context(&config.client_config(), DefaultClientContext)?; let options = AdminOptions::new(); - Ok(Self { client, options }) + Ok(Self { + client, + config, + options, + }) } /// Loads the configuration details for the specified topic from the Kafka cluster. pub async fn topic_config(&self, topic: &str) -> Result, Error> { @@ -35,4 +48,99 @@ impl AdminClient { None => Ok(None), } } + + /// Returns the topics details for a given list topics + /// This function is not ready yet + pub fn topic_details(&self, topics: HashSet) -> Result, Error> { + let mut results = vec![]; + for topic in topics { + let consumer: BaseConsumer = self.config.create_kafka_consumer()?; + let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?; + let metadata = metadata.topics().first().unwrap(); + let mut detail = TopicDetail { + name: topic.clone(), + replicas: metadata.partitions().first().unwrap().replicas().len(), + partitions: metadata.partitions().len(), + consumer_groups: vec![], + count: self.count_records_in_topic(&topic)?, + config: None, + }; + let mut consumer_groups = vec![]; + let metadata = consumer.fetch_group_list(None, Duration::from_secs(10))?; + for g in metadata.groups() { + consumer_groups.push(ConsumerGroupDetail { + name: g.name().to_string(), + members: vec![], //Self::parse_members(g, g.members())?, + state: g.state().parse()?, + }); + } + detail.consumer_groups = consumer_groups; + results.push(detail); + } + + Ok(results) + } + + pub fn estimate_number_of_records_to_read( + &self, + topic_partition_list: &TopicPartitionList, + ) -> Result { + let client: StreamConsumer = self.config.create_kafka_consumer()?; + let mut count = 0; + for t in topic_partition_list.elements() { + // 
this function call be very slow + let watermarks: (i64, i64) = + match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) { + Ok(i) => i, + Err(e) => { + warn!( + "I was not able to fetch watermarks of topic '{}', partition {}: {}", + t.topic(), + t.partition(), + e + ); + (0, 0) + } + }; + count += match t.offset() { + Offset::Beginning => watermarks.1 - watermarks.0, + Offset::End => 0, + Offset::Stored => 1, + Offset::Invalid => 1, + Offset::Offset(o) => watermarks.1 - o, + Offset::OffsetTail(o) => o, + } + } + Ok(count) + } + + fn count_records_in_topic(&self, topic: &str) -> Result { + let mut count = 0; + let consumer: BaseConsumer = self.config.create_kafka_consumer()?; + let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?; + let metadata_topic = metadata.topics().first(); + if metadata_topic.is_none() { + return Ok(0); + } + + let metadata_topic = metadata_topic.unwrap(); + for partition in metadata_topic.partitions() { + let watermarks = + consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?; + count += watermarks.1 - watermarks.0; + } + + Ok(count) + } + + pub fn list_topics(&self) -> Result, Error> { + let consumer: StreamConsumer = self.config.create_kafka_consumer()?; + let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?; + let topics = metadata + .topics() + .iter() + .map(|t| t.name().to_string()) + .collect_vec(); + Ok(topics) + } } diff --git a/crates/app/src/app.rs b/crates/app/src/app.rs index 62694910..7ef75ff3 100644 --- a/crates/app/src/app.rs +++ b/crates/app/src/app.rs @@ -1,14 +1,13 @@ //! This app is both a kafka consumer and a kafka admin client. 
use lib::{ - ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicConfig, TopicDetail, - kafka::SchemaRegistryClient, search::offset::FromOffset, + Error, ExportedKafkaRecord, KafkaRecord, TopicConfig, TopicDetail, kafka::SchemaRegistryClient, }; use rdkafka::{ - Offset, TopicPartitionList, - consumer::{BaseConsumer, Consumer, StreamConsumer}, + TopicPartitionList, + consumer::{Consumer as AA, StreamConsumer}, }; use thousands::Separable; -use tracing::{info, warn}; +use tracing::info; use std::{collections::HashSet, fs, time::Duration}; @@ -17,7 +16,8 @@ use itertools::Itertools; use crate::{ AdminClient, configuration::{Configuration, ConsumerConfig, InternalConfig, YozefuConfig}, - search::{Search, ValidSearchQuery}, + consumer::Consumer, + search::ValidSearchQuery, }; /// Struct exposing different functions for consuming kafka records. @@ -26,21 +26,14 @@ pub struct App { pub cluster: String, pub config: InternalConfig, pub search_query: ValidSearchQuery, - //pub output_file: PathBuf, } impl App { - pub fn new( - cluster: String, - config: InternalConfig, - search_query: ValidSearchQuery, - // output_file: PathBuf, - ) -> Self { + pub fn new(cluster: String, config: InternalConfig, search_query: ValidSearchQuery) -> Self { Self { cluster, config, search_query, - // output_file, } } @@ -51,31 +44,18 @@ impl App { } } + pub fn create_consumer_2(&self, topics: &Vec) -> Result { + Consumer::new( + self.config.specific.clone(), + self.consumer_config(), + self.search_query.query().clone(), + topics, + ) + } + /// Create a kafka consumer pub fn create_consumer(&self, topics: &Vec) -> Result { - let offset = self.search_query.offset().unwrap_or(FromOffset::End); - match offset { - FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning), - FromOffset::End => self.assign_partitions(topics, Offset::End), - FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)), - FromOffset::OffsetTail(o) => 
self.assign_partitions(topics, Offset::OffsetTail(o)), - FromOffset::Timestamp(timestamp) => { - let consumer: StreamConsumer = self.config.create_kafka_consumer()?; - let mut tp = TopicPartitionList::new(); - for t in topics { - let metadata = consumer.fetch_metadata(Some(t), Duration::from_secs(10))?; - for m in metadata.topics() { - for p in m.partitions() { - tp.add_partition(m.name(), p.id()); - } - } - } - tp.set_all_offsets(Offset::Offset(timestamp))?; - let tt = consumer.offsets_for_times(tp, Duration::from_secs(60))?; - consumer.assign(&tt)?; - Ok(consumer) - } - } + Ok(self.create_consumer_2(topics)?.stream_consumer()) } pub fn consumer_config(&self) -> ConsumerConfig { @@ -124,33 +104,9 @@ impl App { &self, topic_partition_list: &TopicPartitionList, ) -> Result { - let client: StreamConsumer = self.create_assigned_consumer()?; - let mut count = 0; - for t in topic_partition_list.elements() { - // this function call be very slow - let watermarks: (i64, i64) = - match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) { - Ok(i) => i, - Err(e) => { - warn!( - "I was not able to fetch watermarks of topic '{}', partition {}: {}", - t.partition(), - t.topic(), - e - ); - (0, 0) - } - }; - count += match t.offset() { - Offset::Beginning => watermarks.1 - watermarks.0, - Offset::End => 0, - Offset::Stored => 1, - Offset::Invalid => 1, - Offset::Offset(o) => watermarks.1 - o, - Offset::OffsetTail(o) => o, - } - } - + let count = self + .admin_client()? 
+ .estimate_number_of_records_to_read(topic_partition_list)?; info!( "{} records are about to be consumed from the following topic partitions: [{}]", count.separate_with_underscores(), @@ -163,98 +119,21 @@ impl App { Ok(count) } - fn create_assigned_consumer(&self) -> Result { - self.config.create_kafka_consumer() - } - - /// Assigns topics to a consumer - fn assign_partitions( - &self, - topics: &Vec, - offset: Offset, - ) -> Result { - let consumer = self.create_assigned_consumer()?; - let mut assignments = TopicPartitionList::new(); - for topic in topics { - let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?; - for t in metadata.topics() { - for p in t.partitions() { - assignments.add_partition_offset(topic, p.id(), offset)?; - } - } - } - consumer.assign(&assignments)?; - info!("New Consumer created, about to consume {topics:?}"); - Ok(consumer) - } - - /// Returns the topics details for a given list topics - /// This function is not ready yet pub fn topic_details(&self, topics: HashSet) -> Result, Error> { - let mut results = vec![]; - for topic in topics { - let consumer: BaseConsumer = self.config.create_kafka_consumer()?; - let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?; - let metadata = metadata.topics().first().unwrap(); - let mut detail = TopicDetail { - name: topic.clone(), - replicas: metadata.partitions().first().unwrap().replicas().len(), - partitions: metadata.partitions().len(), - consumer_groups: vec![], - count: self.count_records_in_topic(&topic)?, - config: None, - }; - let mut consumer_groups = vec![]; - let metadata = consumer.fetch_group_list(None, Duration::from_secs(10))?; - for g in metadata.groups() { - consumer_groups.push(ConsumerGroupDetail { - name: g.name().to_string(), - members: vec![], //Self::parse_members(g, g.members())?, - state: g.state().parse()?, - }); - } - detail.consumer_groups = consumer_groups; - results.push(detail); - } - - Ok(results) + 
self.admin_client()?.topic_details(topics) } pub async fn topic_config_of(&self, topic: &str) -> Result, Error> { - AdminClient::new(self.config.client_config())? - .topic_config(topic) - .await + self.admin_client()?.topic_config(topic).await } - pub fn count_records_in_topic(&self, topic: &str) -> Result { - let mut count = 0; - let consumer: BaseConsumer = self.config.create_kafka_consumer()?; - let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?; - let metadata_topic = metadata.topics().first(); - if metadata_topic.is_none() { - return Ok(0); - } - - let metadata_topic = metadata_topic.unwrap(); - for partition in metadata_topic.partitions() { - let watermarks = - consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?; - count += watermarks.1 - watermarks.0; - } - - Ok(count) + pub fn admin_client(&self) -> Result { + AdminClient::new(self.config.clone()) } /// Lists available kafka topics on the cluster. pub fn list_topics(&self) -> Result, Error> { - let consumer: StreamConsumer = self.create_assigned_consumer()?; - let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?; - let topics = metadata - .topics() - .iter() - .map(|t| t.name().to_string()) - .collect_vec(); - Ok(topics) + self.admin_client()?.list_topics() } // TODO https://github.com/fede1024/rust-rdkafka/pull/680 diff --git a/crates/app/src/configuration/yozefu_config.rs b/crates/app/src/configuration/yozefu_config.rs index 1c02d071..6ac46644 100644 --- a/crates/app/src/configuration/yozefu_config.rs +++ b/crates/app/src/configuration/yozefu_config.rs @@ -15,7 +15,7 @@ pub struct YozefuConfig { } impl YozefuConfig { - pub(super) fn new(cluster: &str, cluster_config: ClusterConfig) -> Self { + pub fn new(cluster: &str, cluster_config: ClusterConfig) -> Self { Self { cluster: cluster.to_string(), cluster_config, diff --git a/crates/app/src/consumer.rs b/crates/app/src/consumer.rs new file mode 100644 index 00000000..7a86a068 --- /dev/null 
+++ b/crates/app/src/consumer.rs @@ -0,0 +1,135 @@ +//! A custom Kafka consumer for Yozefu. +//! this module wraps the rdkafka consumer and provides additional functionalities. + +use std::time::Duration; + +use futures::{StreamExt, future}; +use futures_batch::TryChunksTimeoutStreamExt; +use lib::{Error, SearchQuery, search::offset::FromOffset}; +use rdkafka::{ + Offset, TopicPartitionList, + consumer::{Consumer as _, stream_consumer::StreamConsumer}, + message::BorrowedMessage, +}; + +use crate::{ + configuration::{Configuration, ConsumerConfig, YozefuConfig}, + search::Search, +}; + +pub struct Consumer { + consumer_config: ConsumerConfig, + consumer: StreamConsumer, +} + +impl Consumer { + pub fn new( + config: YozefuConfig, + consumer_config: ConsumerConfig, + query: SearchQuery, + topics: &Vec, + ) -> Result { + let consumer: StreamConsumer = config.create_kafka_consumer()?; + let assignments = Self::create_assignments(&config, query, topics)?; + consumer.assign(&assignments)?; + + Ok(Self { + consumer_config, + consumer, + }) + } + + pub async fn consume( + &self, + mut process_records_closure: impl FnMut( + Result>, rdkafka::error::KafkaError>, + ), + ) -> Result<(), Error> { + let future = self + .consumer + .stream() + .try_chunks_timeout( + self.consumer_config.buffer_capacity, + Duration::from_millis(self.consumer_config.timeout_in_ms), + ) + .for_each(|bulk_of_records| { + process_records_closure(bulk_of_records); + future::ready(()) + }); + + let _: () = future.await; + Ok(()) + } + + pub fn stream_consumer(self) -> StreamConsumer { + self.consumer + } + + pub fn assignment(&self) -> Result { + self.consumer.assignment() + } + + fn create_assignments( + config: &YozefuConfig, + query: SearchQuery, + topics: &Vec, + ) -> Result { + let offset = query.offset().unwrap_or(FromOffset::End); + let mut assignments = TopicPartitionList::new(); + for topic in topics { + let consumer: StreamConsumer = config.create_kafka_consumer()?; + let metadata = 
consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?; + let assignments_for_topic = match offset { + FromOffset::Beginning => { + Self::assign_partitions(topic, &metadata, Offset::Beginning) + } + FromOffset::End => Self::assign_partitions(topic, &metadata, Offset::End), + FromOffset::Offset(o) => { + Self::assign_partitions(topic, &metadata, Offset::Offset(o)) + } + FromOffset::OffsetTail(o) => { + Self::assign_partitions(topic, &metadata, Offset::OffsetTail(o)) + } + FromOffset::Timestamp(timestamp) => { + let mut assignments = TopicPartitionList::new(); + for m in metadata.topics() { + for p in m.partitions() { + assignments.add_partition(m.name(), p.id()); + } + } + assignments.set_all_offsets(Offset::Offset(timestamp))?; + consumer.offsets_for_times(assignments, Duration::from_secs(60))? + } + }; + + for elem in assignments_for_topic.elements() { + assignments + .add_partition_offset(elem.topic(), elem.partition(), elem.offset()) + .expect( + "Failed to add partition to assignment in 'create_assignments' function", + ); + } + } + + Ok(assignments) + } + + /// Assigns topics to a consumer + fn assign_partitions( + topic: &str, + metadata: &rdkafka::metadata::Metadata, + offset: Offset, + ) -> TopicPartitionList { + let mut assignments = TopicPartitionList::new(); + for m in metadata.topics() { + for p in m.partitions() { + assignments + .add_partition_offset(topic, p.id(), offset) + .expect( + "Failed to add partition to assignment in 'assign_partitions' function", + ); + } + } + assignments + } +} diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index c2b3ee00..b0d19a9a 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -5,6 +5,7 @@ pub mod admin; mod app; pub mod configuration; +pub mod consumer; pub mod search; pub use admin::*; diff --git a/crates/app/src/search/mod.rs b/crates/app/src/search/mod.rs index 6d95d987..4abaae8b 100644 --- a/crates/app/src/search/mod.rs +++ b/crates/app/src/search/mod.rs @@ -4,7 +4,7 @@ use 
extism::{Manifest, Plugin, Wasm}; use filter::{CACHED_FILTERS, PARSE_PARAMETERS_FUNCTION_NAME}; use itertools::Itertools; use lib::{ - KafkaRecord, SearchQuery, parse_search_query, + KafkaRecord, SearchQuery, search::{ filter::{Filter, Parameter}, offset::FromOffset, @@ -76,7 +76,7 @@ impl ValidSearchQuery { impl ValidSearchQuery { pub fn from(input: &str, filters_directory: &Path) -> Result { - let query = parse_search_query(input).map_err(lib::Error::Search)?.1; + let query = SearchQuery::parse(input).map_err(lib::Error::Search)?.1; let filters = query.filters(); for filter in filters { let name = filter.name; diff --git a/crates/bin/Cargo.toml b/crates/bin/Cargo.toml index af5aa676..e9df16a8 100644 --- a/crates/bin/Cargo.toml +++ b/crates/bin/Cargo.toml @@ -22,7 +22,7 @@ name = "yozf" path = "src/main.rs" [dependencies] -tokio = { version = "1", features = ["full", "tracing"] } +tokio = { workspace = true, features = ["full", "tracing"] } command = { workspace = true } [features] diff --git a/crates/command/Cargo.toml b/crates/command/Cargo.toml index 3634932b..42e59d02 100644 --- a/crates/command/Cargo.toml +++ b/crates/command/Cargo.toml @@ -31,9 +31,9 @@ strum = { workspace = true, features = ["derive", "strum_macros"] } indicatif = { version = "0.18.4", features = ["tokio"] } tempfile = "3.27.0" tokio-util = "0.7.18" -futures = "0.3.32" +futures = { workspace = true } itertools = "0.14.0" -tokio = { version = "1", features = ["full", "tracing"] } +tokio = { workspace = true, features = ["full", "tracing"] } rdkafka = { version = "0.39.0", features = [ "cmake-build", "curl-static", diff --git a/crates/command/src/headless/mod.rs b/crates/command/src/headless/mod.rs index aabec8d0..ffe87c07 100644 --- a/crates/command/src/headless/mod.rs +++ b/crates/command/src/headless/mod.rs @@ -80,7 +80,7 @@ impl Headless { return; }, Some(message) = rx_dd.recv() => { - let record = KafkaRecord::parse(message, &mut schema_registry).await; + let record = 
KafkaRecord::parse_with_schema_registry(message, &mut schema_registry).await; let context = SearchContext::new(&record, &filters_directory); if search_query.matches(&context) { records_channel.0.send(record).unwrap(); diff --git a/crates/lib/src/kafka/kafka_record.rs b/crates/lib/src/kafka/kafka_record.rs index e5dbd72a..6a1f5518 100644 --- a/crates/lib/src/kafka/kafka_record.rs +++ b/crates/lib/src/kafka/kafka_record.rs @@ -71,25 +71,43 @@ impl KafkaRecord { #[cfg(feature = "native")] impl KafkaRecord { - pub async fn parse( - owned_message: OwnedMessage, - schema_registry: &mut Option, - ) -> Self { - let mut headers: BTreeMap = BTreeMap::new(); - if let Some(old_headers) = owned_message.headers() { - for header in old_headers.iter() { - headers.insert( - header.key.to_string(), - header - .value - .map(|e| { - String::from_utf8(e.to_vec()).unwrap_or("".to_string()) - }) - .unwrap_or_default(), - ); + pub fn parse(owned_message: OwnedMessage) -> Self { + let headers = Self::extract_headers(&owned_message); + let size = owned_message.payload().map_or(0, <[u8]>::len) + + owned_message.key().map_or(0, <[u8]>::len); + + let (key, key_schema, value, value_schema) = match owned_message.topic() { + "__consumer_offsets" => { + extract_key_and_value_from_consumer_offsets_topics(&owned_message) + } + _ => { + let key = Self::extract_data(owned_message.key()); + let value = Self::extract_data(owned_message.payload()); + (key, None, value, None) } + }; + + Self { + value_as_string: value.to_string(), + value, + key_as_string: key.to_string(), + key, + topic: owned_message.topic().to_string(), + timestamp: owned_message.timestamp().to_millis(), + partition: owned_message.partition(), + offset: owned_message.offset(), + headers, + key_schema, + value_schema, + size, } + } + pub async fn parse_with_schema_registry( + owned_message: OwnedMessage, + schema_registry: &mut Option, + ) -> Self { + let headers = Self::extract_headers(&owned_message); let size = 
owned_message.payload().map_or(0, <[u8]>::len) + owned_message.key().map_or(0, <[u8]>::len); @@ -278,6 +296,33 @@ impl KafkaRecord { } } } + + fn extract_data(payload: Option<&[u8]>) -> DataType { + let schema_id = SchemaId::parse(payload); + if schema_id.is_none() { + return Self::payload_to_data_type(payload, None); + } + Self::payload_to_data_type(payload, None) + } + + fn extract_headers(owned_message: &OwnedMessage) -> BTreeMap { + let mut headers: BTreeMap = BTreeMap::new(); + if let Some(old_headers) = owned_message.headers() { + for header in old_headers.iter() { + headers.insert( + header.key.to_string(), + header + .value + .map(|e| { + String::from_utf8(e.to_vec()).unwrap_or("".to_string()) + }) + .unwrap_or_default(), + ); + } + } + + headers + } } #[test] diff --git a/crates/lib/src/kafka/kafka_record_test.rs b/crates/lib/src/kafka/kafka_record_test.rs index 1d3209ad..f60b4d39 100644 --- a/crates/lib/src/kafka/kafka_record_test.rs +++ b/crates/lib/src/kafka/kafka_record_test.rs @@ -8,8 +8,8 @@ use crate::{ kafka::{SchemaId, schema::Schema}, }; -#[tokio::test] -async fn test_kafka_record_deserialization() { +#[test] +fn test_kafka_record_deserialization() { let payload = b"\x00\x00\x00\x00\x01{\"key\":\"value\"}"; let message = OwnedMessage::new( Some(payload.to_vec()), @@ -20,7 +20,7 @@ async fn test_kafka_record_deserialization() { 313, None, ); - let record = KafkaRecord::parse(message, &mut None).await; + let record = KafkaRecord::parse(message); assert_eq!(record.size, 20); assert_eq!( record.timestamp_as_local_date_time(), diff --git a/crates/lib/src/lib.rs b/crates/lib/src/lib.rs index b596ec1b..e388c125 100644 --- a/crates/lib/src/lib.rs +++ b/crates/lib/src/lib.rs @@ -5,10 +5,7 @@ pub mod error; #[cfg(feature = "native")] -pub use { - error::Error, kafka::ExportedKafkaRecord, kafka::topic::*, search::SearchQuery, - search::parse_search_query, -}; +pub use {error::Error, kafka::ExportedKafkaRecord, kafka::topic::*, search::SearchQuery}; pub mod 
kafka; pub mod search; diff --git a/crates/lib/src/search/atom.rs b/crates/lib/src/search/atom.rs index 42086393..7b01badf 100644 --- a/crates/lib/src/search/atom.rs +++ b/crates/lib/src/search/atom.rs @@ -29,7 +29,6 @@ pub(crate) fn parse_atom(input: &str) -> IResult<&str, Atom> { delimited(wsi(tag("(")), parse_or_expression, wsi(tag(")"))), |expr: Expression| Atom::Parenthesis(Box::new(expr)), ), - //map(parse_symbol, Atom::Symbol), )) .parse(input) } diff --git a/crates/lib/src/search/compare/mod_test.rs b/crates/lib/src/search/compare/mod_test.rs index 6309f739..1f580eb9 100644 --- a/crates/lib/src/search/compare/mod_test.rs +++ b/crates/lib/src/search/compare/mod_test.rs @@ -1,10 +1,10 @@ -use crate::search::{compare::parse_compare, parse_search_query}; +use crate::{SearchQuery, search::compare::parse_compare}; #[test] fn test_parse_compare() { assert!(parse_compare(r#"timestamp between "2024-05-28T17:55:08.145+02:00" and now"#).is_ok()); assert!( - parse_search_query( + SearchQuery::parse( r#"timestamp between "2024-05-28T17:55:08.145+02:00" and now from begin"# ) .is_ok() @@ -14,7 +14,7 @@ fn test_parse_compare() { #[test] fn test_parse_search_query() { assert!( - parse_search_query( + SearchQuery::parse( r#"timestamp between "2024-05-28T17:55:08.145+02:00" and now from begin"# ) .is_ok() diff --git a/crates/lib/src/search/mod.rs b/crates/lib/src/search/mod.rs index 1572952d..eab3a504 100644 --- a/crates/lib/src/search/mod.rs +++ b/crates/lib/src/search/mod.rs @@ -65,8 +65,6 @@ pub use order::Order; pub use order::OrderBy; #[cfg(feature = "native")] pub use search_query::SearchQuery; -#[cfg(feature = "native")] -pub use search_query::parse_search_query; use serde::Deserialize; use serde::Serialize; @@ -78,6 +76,8 @@ pub mod filter_test; pub mod number_test; #[cfg(test)] pub mod offset_test; +#[cfg(test)] +pub mod symbol_test; /// Result of a search filter evaluation. 
#[derive(Debug, PartialEq, Clone, Default, Deserialize, Serialize)] diff --git a/crates/lib/src/search/search_query.rs b/crates/lib/src/search/search_query.rs index 21b4af27..ba2daa6b 100644 --- a/crates/lib/src/search/search_query.rs +++ b/crates/lib/src/search/search_query.rs @@ -32,6 +32,43 @@ impl SearchQuery { pub fn is_empty(&self) -> bool { self.limit.is_none() && self.from.is_none() && self.expression.is_empty() } + + pub fn parse(input: &str) -> Result<(&str, SearchQuery), SearchError> { + map( + many_till( + alt(( + parse_from_offset_clause, + parse_limit, + parse_expression, + parse_order_by, + )), + wsi(eof), + ), + |clauses| { + let mut s = SearchQuery::default(); + for c in clauses.0 { + match c { + SearchClause::Limit(i) => s.limit = Some(i), + SearchClause::From(f) => s.from = Some(f), + SearchClause::Expression(u) => s.expression = u, + SearchClause::OrderBy(order, k) => { + s.order_by = OrderBy::new(order, k.unwrap_or(OrderKeyword::Asc)); + } //SearchClause::GroupByKey => s.group_by_key = true, + } + } + s + }, + ) + .parse(input) + .map_err(|e| { + let remaining = match e { + nom::Err::Incomplete(_) => input.to_string(), + nom::Err::Error(s) => s.input.to_string(), + nom::Err::Failure(s) => s.input.to_string(), + }; + SearchError::Parse(remaining) + }) + } } impl std::fmt::Display for SearchQuery { @@ -67,49 +104,12 @@ impl Default for SearchQuery { } } -pub fn parse_search_query(input: &str) -> Result<(&str, SearchQuery), SearchError> { - map( - many_till( - alt(( - parse_from_offset_clause, - parse_limit, - parse_expression, - parse_order_by, - )), - wsi(eof), - ), - |clauses| { - let mut s = SearchQuery::default(); - for c in clauses.0 { - match c { - SearchClause::Limit(i) => s.limit = Some(i), - SearchClause::From(f) => s.from = Some(f), - SearchClause::Expression(u) => s.expression = u, - SearchClause::OrderBy(order, k) => { - s.order_by = OrderBy::new(order, k.unwrap_or(OrderKeyword::Asc)); - } //SearchClause::GroupByKey => s.group_by_key 
= true, - } - } - s - }, - ) - .parse(input) - .map_err(|e| { - let remaining = match e { - nom::Err::Incomplete(_) => input.to_string(), - nom::Err::Error(s) => s.input.to_string(), - nom::Err::Failure(s) => s.input.to_string(), - }; - SearchError::Parse(remaining) - }) -} - #[test] fn test_parse_search_query() { - assert!(parse_search_query(r#" from end - 10"#).is_ok()); + assert!(SearchQuery::parse(r#" from end - 10"#).is_ok()); } #[test] fn test_parse_search_query_with_json_path() { - assert!(parse_search_query(r#"from end - 10 value.sequenceNum == "115568969""#).is_ok()); + assert!(SearchQuery::parse(r#"from end - 10 value.sequenceNum == "115568969""#).is_ok()); } diff --git a/crates/lib/src/search/symbol.rs b/crates/lib/src/search/symbol.rs index a499a0d1..05cc656e 100644 --- a/crates/lib/src/search/symbol.rs +++ b/crates/lib/src/search/symbol.rs @@ -25,21 +25,6 @@ pub enum Symbol { Header(String), } -// pub(crate) fn parse_symbol(input: &str) -> IResult<&str, Symbol> { -// alt(( -// parse_offset, -// parse_timestamp_symbol, -// parse_topic, -// parse_partition, -// parse_key, -// parse_size, -// parse_value, -// map(parse_value_symbol, |e| e.0), -// map(parse_header_symbol, |e| e.0), -// )) -// .parse(input) -// } - pub(crate) fn parse_offset(input: &str) -> IResult<&str, Symbol> { value(Symbol::Offset, wsi(alt((tag("offset"), tag("o"))))).parse(input) } diff --git a/crates/lib/src/search/symbol_test.rs b/crates/lib/src/search/symbol_test.rs index 972e3e8c..a436813e 100644 --- a/crates/lib/src/search/symbol_test.rs +++ b/crates/lib/src/search/symbol_test.rs @@ -1,43 +1,49 @@ -use crate::search::symbol::{parse_symbol, Symbol}; +use crate::search::symbol::{ + Symbol, parse_key, parse_offset, parse_partition, parse_size, parse_timestamp_symbol, + parse_topic, parse_value, +}; #[test] fn test_parse_value() { - assert_eq!(parse_symbol(r#"value"#), Ok(("", Symbol::Value(None)))); - assert_eq!(parse_symbol(r#"v"#), Ok(("", Symbol::Value(None)))); + 
assert_eq!(parse_value(r#"value"#), Ok(("", Symbol::Value(None)))); + assert_eq!(parse_value(r#"v"#), Ok(("", Symbol::Value(None)))); } #[test] fn test_parse_topic() { - assert_eq!(parse_symbol(r#"topic"#), Ok(("", Symbol::Topic))); - assert_eq!(parse_symbol(r#"t"#), Ok(("", Symbol::Topic))); + assert_eq!(parse_topic(r#"topic"#), Ok(("", Symbol::Topic))); + assert_eq!(parse_topic(r#"t"#), Ok(("", Symbol::Topic))); } #[test] fn test_parse_key() { - assert_eq!(parse_symbol(r#"key"#), Ok(("", Symbol::Key))); - assert_eq!(parse_symbol(r#"k"#), Ok(("", Symbol::Key))); + assert_eq!(parse_key(r#"key"#), Ok(("", Symbol::Key))); + assert_eq!(parse_key(r#"k"#), Ok(("", Symbol::Key))); } #[test] fn test_parse_partition() { - assert_eq!(parse_symbol(r#"partition"#), Ok(("", Symbol::Partition))); - assert_eq!(parse_symbol(r#"p"#), Ok(("", Symbol::Partition))); + assert_eq!(parse_partition(r#"partition"#), Ok(("", Symbol::Partition))); + assert_eq!(parse_partition(r#"p"#), Ok(("", Symbol::Partition))); } #[test] fn test_parse_offset() { - assert_eq!(parse_symbol(r#"offset"#), Ok(("", Symbol::Offset))); - assert_eq!(parse_symbol(r#"o"#), Ok(("", Symbol::Offset))); + assert_eq!(parse_offset(r#"offset"#), Ok(("", Symbol::Offset))); + assert_eq!(parse_offset(r#"o"#), Ok(("", Symbol::Offset))); } #[test] fn test_parse_timestamp() { - assert_eq!(parse_symbol(r#"timestamp"#), Ok(("", Symbol::Timestamp))); - assert_eq!(parse_symbol(r#"ts"#), Ok(("", Symbol::Timestamp))); + assert_eq!( + parse_timestamp_symbol(r#"timestamp"#), + Ok(("", Symbol::Timestamp)) + ); + assert_eq!(parse_timestamp_symbol(r#"ts"#), Ok(("", Symbol::Timestamp))); } #[test] fn test_parse_size() { - assert_eq!(parse_symbol(r#"size"#), Ok(("", Symbol::Size))); - assert_eq!(parse_symbol(r#"si"#), Ok(("", Symbol::Size))); + assert_eq!(parse_size(r#"size"#), Ok(("", Symbol::Size))); + assert_eq!(parse_size(r#"si"#), Ok(("", Symbol::Size))); } diff --git a/crates/lib/tests/deserializers/avro/mod.rs 
b/crates/lib/tests/deserializers/avro/mod.rs index b2414b7c..80d9edd8 100644 --- a/crates/lib/tests/deserializers/avro/mod.rs +++ b/crates/lib/tests/deserializers/avro/mod.rs @@ -47,7 +47,8 @@ async fn test_avro_record() { "/schemas/ids/2" => "./inputs/schemas/value.json" }}; - let record = KafkaRecord::parse(owned_message, &mut Some(schema_client)).await; + let record = + KafkaRecord::parse_with_schema_registry(owned_message, &mut Some(schema_client)).await; insta::with_settings!({sort_maps => true}, { assert_json_snapshot!(record); }); @@ -67,7 +68,8 @@ async fn test_avro_record_unknown_primitive_type() { "/schemas/ids/1" => "./inputs/schemas/key.json", "/schemas/ids/3" => "./inputs/schemas/value-with-reference.json" }}; - let record = KafkaRecord::parse(owned_message, &mut Some(schema_client)).await; + let record = + KafkaRecord::parse_with_schema_registry(owned_message, &mut Some(schema_client)).await; insta::with_settings!({sort_maps => true}, { assert_json_snapshot!(record); }); @@ -89,7 +91,8 @@ async fn test_avro_record_with_schema_reference() { "/schemas/ids/3" => "./inputs/schemas/value-with-reference.json" }}; - let record = KafkaRecord::parse(owned_message, &mut Some(schema_client)).await; + let record = + KafkaRecord::parse_with_schema_registry(owned_message, &mut Some(schema_client)).await; insta::with_settings!({sort_maps => true}, { assert_json_snapshot!(record); }); @@ -114,7 +117,8 @@ async fn test_avro_record_with_multiple_schema_references() { "/schemas/ids/4" => "./inputs/schemas/value-with-multiple-references.json" }}; - let record = KafkaRecord::parse(owned_message, &mut Some(schema_client)).await; + let record = + KafkaRecord::parse_with_schema_registry(owned_message, &mut Some(schema_client)).await; insta::with_settings!({sort_maps => true}, { assert_json_snapshot!(record); }); diff --git a/crates/lib/tests/deserializers/text/mod.rs b/crates/lib/tests/deserializers/text/mod.rs index 97be8838..248d6216 100644 --- 
a/crates/lib/tests/deserializers/text/mod.rs +++ b/crates/lib/tests/deserializers/text/mod.rs @@ -1,6 +1,5 @@ use insta::{assert_debug_snapshot, glob}; use std::fs; -use tokio::runtime::Runtime; use yozefu_lib::{ExportedKafkaRecord, KafkaRecord}; use crate::{KeyValue, fix_timezone}; @@ -17,13 +16,25 @@ fn test_exported_record() { #[test] fn test_parse_records() { - let rt: Runtime = Runtime::new().unwrap(); + glob!("inputs/record*.json", |path| { + let input = fs::read_to_string(path).unwrap(); + let key_value: KeyValue = serde_json::from_str(&input).unwrap(); + let owned_message = key_value.into_owned_message(); + assert_debug_snapshot!(KafkaRecord::parse(owned_message)); + }); +} + +#[test] +fn test_parse_records_with_schema_registry() { + let rt = tokio::runtime::Runtime::new().unwrap(); glob!("inputs/record*.json", |path| { let input = fs::read_to_string(path).unwrap(); let key_value: KeyValue = serde_json::from_str(&input).unwrap(); let owned_message = key_value.into_owned_message(); rt.block_on(async { - assert_debug_snapshot!(KafkaRecord::parse(owned_message, &mut None).await); + assert_debug_snapshot!( + KafkaRecord::parse_with_schema_registry(owned_message, &mut None).await + ); }); }); } diff --git a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-2.json.snap b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-2.json.snap index 01638f0e..a20b716f 100644 --- a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-2.json.snap +++ b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-2.json.snap @@ -1,6 +1,6 @@ --- source: crates/lib/tests/deserializers/text/mod.rs -expression: "KafkaRecord::parse(owned_message, &mut None).await" +expression: "KafkaRecord::parse(owned_message)" input_file: crates/lib/tests/deserializers/text/inputs/record-2.json --- KafkaRecord { @@ -11,29 +11,15 @@ 
KafkaRecord { partition: 0, offset: 0, headers: {}, - key_schema: Some( - Schema { - id: SchemaId( - 1, - ), - schema_type: None, - }, - ), - value_schema: Some( - Schema { - id: SchemaId( - 2, - ), - schema_type: None, - }, - ), + key_schema: None, + value_schema: None, size: 10, key: String( - "Yozefu was not able to retrieve the schema 1 because there is no schema registry configured. Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 1]\n String: \0\0\0\0\u{1}", + "\0\0\0\0\u{1}", ), - key_as_string: "Yozefu was not able to retrieve the schema 1 because there is no schema registry configured. Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 1]\n String: \0\0\0\0\u{1}", + key_as_string: "\0\0\0\0\u{1}", value: String( - "Yozefu was not able to retrieve the schema 2 because there is no schema registry configured. Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 2]\n String: \0\0\0\0\u{2}", + "\0\0\0\0\u{2}", ), - value_as_string: "Yozefu was not able to retrieve the schema 2 because there is no schema registry configured. 
Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 2]\n String: \0\0\0\0\u{2}", + value_as_string: "\0\0\0\0\u{2}", } diff --git a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-3.json.snap b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-3.json.snap index 66efedc4..31071559 100644 --- a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-3.json.snap +++ b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records@record-3.json.snap @@ -1,6 +1,6 @@ --- source: crates/lib/tests/deserializers/text/mod.rs -expression: "KafkaRecord::parse(owned_message, &mut None).await" +expression: "KafkaRecord::parse(owned_message)" input_file: crates/lib/tests/deserializers/text/inputs/record-3.json --- KafkaRecord { @@ -11,29 +11,15 @@ KafkaRecord { partition: 0, offset: 0, headers: {}, - key_schema: Some( - Schema { - id: SchemaId( - 1, - ), - schema_type: None, - }, - ), - value_schema: Some( - Schema { - id: SchemaId( - 2, - ), - schema_type: None, - }, - ), + key_schema: None, + value_schema: None, size: 14, - key: Json( - Object {}, + key: String( + "\0\0\0\0\u{1}{}", ), - key_as_string: "{}", - value: Json( - Object {}, + key_as_string: "\0\0\0\0\u{1}{}", + value: String( + "\0\0\0\0\u{2}{}", ), - value_as_string: "{}", + value_as_string: "\0\0\0\0\u{2}{}", } diff --git a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-1.json.snap b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-1.json.snap new file mode 100644 index 00000000..a582d2d7 --- /dev/null +++ b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-1.json.snap @@ -0,0 +1,25 @@ +--- +source: 
crates/lib/tests/deserializers/text/mod.rs +expression: "KafkaRecord::parse_with_schema_registry(owned_message, &mut None).await" +input_file: crates/lib/tests/deserializers/text/inputs/record-1.json +--- +KafkaRecord { + topic: "my-topic", + timestamp: Some( + 0, + ), + partition: 0, + offset: 0, + headers: {}, + key_schema: None, + value_schema: None, + size: 2, + key: String( + "A", + ), + key_as_string: "A", + value: String( + "A", + ), + value_as_string: "A", +} diff --git a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-2.json.snap b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-2.json.snap new file mode 100644 index 00000000..383183c9 --- /dev/null +++ b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-2.json.snap @@ -0,0 +1,39 @@ +--- +source: crates/lib/tests/deserializers/text/mod.rs +expression: "KafkaRecord::parse_with_schema_registry(owned_message, &mut None).await" +input_file: crates/lib/tests/deserializers/text/inputs/record-2.json +--- +KafkaRecord { + topic: "my-topic", + timestamp: Some( + 0, + ), + partition: 0, + offset: 0, + headers: {}, + key_schema: Some( + Schema { + id: SchemaId( + 1, + ), + schema_type: None, + }, + ), + value_schema: Some( + Schema { + id: SchemaId( + 2, + ), + schema_type: None, + }, + ), + size: 10, + key: String( + "Yozefu was not able to retrieve the schema 1 because there is no schema registry configured. Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 1]\n String: \0\0\0\0\u{1}", + ), + key_as_string: "Yozefu was not able to retrieve the schema 1 because there is no schema registry configured. 
Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 1]\n String: \0\0\0\0\u{1}", + value: String( + "Yozefu was not able to retrieve the schema 2 because there is no schema registry configured. Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 2]\n String: \0\0\0\0\u{2}", + ), + value_as_string: "Yozefu was not able to retrieve the schema 2 because there is no schema registry configured. Please visit https://maif.github.io/yozefu/schema-registry/ for more details.\nPayload: [0, 0, 0, 0, 2]\n String: \0\0\0\0\u{2}", +} diff --git a/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-3.json.snap b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-3.json.snap new file mode 100644 index 00000000..f46ac675 --- /dev/null +++ b/crates/lib/tests/deserializers/text/snapshots/r#mod__deserializers__text__parse_records_with_schema_registry@record-3.json.snap @@ -0,0 +1,39 @@ +--- +source: crates/lib/tests/deserializers/text/mod.rs +expression: "KafkaRecord::parse_with_schema_registry(owned_message, &mut None).await" +input_file: crates/lib/tests/deserializers/text/inputs/record-3.json +--- +KafkaRecord { + topic: "my-topic", + timestamp: Some( + 0, + ), + partition: 0, + offset: 0, + headers: {}, + key_schema: Some( + Schema { + id: SchemaId( + 1, + ), + schema_type: None, + }, + ), + value_schema: Some( + Schema { + id: SchemaId( + 2, + ), + schema_type: None, + }, + ), + size: 14, + key: Json( + Object {}, + ), + key_as_string: "{}", + value: Json( + Object {}, + ), + value_as_string: "{}", +} diff --git a/crates/lib/tests/search/mod.rs b/crates/lib/tests/search/mod.rs index d35c9486..33a9f09b 100644 --- a/crates/lib/tests/search/mod.rs +++ b/crates/lib/tests/search/mod.rs @@ -1,6 +1,6 @@ use insta::{assert_debug_snapshot, glob}; use std::fs; -use 
yozefu_lib::parse_search_query; +use yozefu_lib::SearchQuery; #[test] fn test_search_queries() { @@ -13,7 +13,7 @@ fn test_search_queries() { filters => vec![ ("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{6,}\\+[0-9]{2}:[0-9]{2}", "[datetime]"), ]}, { - assert_debug_snapshot!(parse_search_query(input)); + assert_debug_snapshot!(SearchQuery::parse(input)); }); }); } diff --git a/crates/lib/tests/search/snapshots/r#mod__search__search_queries@1.sql.snap b/crates/lib/tests/search/snapshots/r#mod__search__search_queries@1.sql.snap index 48d4b2c6..2b34b949 100644 --- a/crates/lib/tests/search/snapshots/r#mod__search__search_queries@1.sql.snap +++ b/crates/lib/tests/search/snapshots/r#mod__search__search_queries@1.sql.snap @@ -1,7 +1,7 @@ --- source: crates/lib/tests/search/mod.rs description: "from beginning where my-wasm-filter(\"cool\", \"cat\") or (timestamp between \"2024-05-28T17:55:08.145+02:00\" and now and value contains \"foundation\") order by key desc limit 1_000" -expression: parse_search_query(input) +expression: "SearchQuery::parse(input)" input_file: crates/lib/tests/search/inputs/1.sql --- Ok( diff --git a/crates/tui/Cargo.toml b/crates/tui/Cargo.toml index 3b0ea605..94454600 100644 --- a/crates/tui/Cargo.toml +++ b/crates/tui/Cargo.toml @@ -14,10 +14,12 @@ repository.workspace = true rust-version.workspace = true [dependencies] -tokio = { version = "1", features = ["full", "tracing"] } +tokio = { workspace = true, features = ["full", "tracing"] } serde = { workspace = true } serde_json = { workspace = true } -tui-input = { version = "0.15.0", features = ["crossterm"], default-features = false } +tui-input = { version = "0.15.0", features = [ + "crossterm", +], default-features = false } chrono = "0.4.44" strum = { workspace = true, features = ["derive", "strum_macros"] } ratatui = { version = "0.30.0", features = [ @@ -29,7 +31,7 @@ itertools = "0.14.0" bytesize = { version = "2.3.1" } nom = "8.0.0" throbber-widgets-tui = "0.11.0" 
-futures = "0.3.32" +futures = { workspace = true } open = "5.3.3" tokio-util = "0.7.18" thousands = "0.2.0" @@ -41,7 +43,6 @@ app = { workspace = true } tracing = { workspace = true } rdkafka = { version = "0.39.0", features = ["cmake-build"] } timeago = "0.6.0" -futures-batch = "0.7.0" syntect = "5.3.0" resolve-path = "0.1.0" diff --git a/crates/tui/src/component/ui.rs b/crates/tui/src/component/ui.rs index 76dca1ac..9f8c9742 100644 --- a/crates/tui/src/component/ui.rs +++ b/crates/tui/src/component/ui.rs @@ -1,16 +1,14 @@ //! Module gathering the code to run the terminal user interface. -use app::App; +use app::consumer::Consumer; use app::search::{Search, SearchContext}; +use app::{AdminClient, App}; use chrono::DateTime; use crossterm::event::KeyEvent; -use futures::{StreamExt, future}; -use futures_batch::TryChunksTimeoutStreamExt; use itertools::Itertools; use lib::KafkaRecord; use ratatui::prelude::Rect; use rdkafka::Message; -use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::message::OwnedMessage; use std::collections::HashSet; use std::time::Duration; @@ -42,7 +40,6 @@ pub struct Ui { impl Ui { pub fn new(app: App, query: &str, selected_topics: Vec, state: State) -> Self { let (records_sender, records_receiver) = tokio::sync::mpsc::unbounded_channel(); - info!("hello from tui ui"); Self { should_quit: false, worker: CancellationToken::new(), @@ -58,8 +55,8 @@ impl Ui { app: &App, topics: Vec, tx: UnboundedSender, - ) -> Result { - match app.create_consumer(&topics) { + ) -> Result { + match app.create_consumer_2(&topics) { Ok(c) => Ok(c), Err(e) => { tx.send(Action::Notification(Notification::new( @@ -103,7 +100,6 @@ impl Ui { let token = self.worker.clone(); let search_query = self.app.search_query.query().clone(); let app = self.app.clone(); - let txx = tx.clone(); let topics = self.topics.clone(); let (tx_dd, mut rx_dd) = mpsc::unbounded_channel::(); @@ -122,7 +118,7 @@ impl Ui { return; }, Some(message) = rx_dd.recv() => { - let record = 
KafkaRecord::parse(message, &mut schema_registry).await; + let record = KafkaRecord::parse_with_schema_registry(message, &mut schema_registry).await; let context = SearchContext::new(&record, &filters_directory); let span = trace_span!("matching", offset = %record.offset, partition = %record.partition, topic = %record.topic); let search_span = span.enter(); @@ -157,12 +153,11 @@ impl Ui { } }).unwrap(); - let consumer_config = self.app.consumer_config(); tokio::task::Builder::new() .name("kafka-consumer") .spawn(async move { let _ = tx.send(Action::Consuming); - let consumer = match Self::create_consumer(&app, topics.clone(), txx.clone()) { + let consumer = match Self::create_consumer(&app, topics.clone(), tx.clone()) { Ok(c) => c, Err(e) => { let _ = tx.send(Action::StopConsuming()); @@ -184,15 +179,8 @@ impl Ui { .unwrap(); let mut current_time = Instant::now(); - let _ = consumer - .stream() - .take_until(token.cancelled()) - .try_chunks_timeout( - consumer_config.buffer_capacity, - Duration::from_millis(consumer_config.timeout_in_ms), - ) - .for_each(|bulk_of_records| { - // For example, TopicAuthorizationFailed + consumer + .consume(|bulk_of_records| { if let Err(ref e) = bulk_of_records { let _ = tx.send(Action::Notification(Notification::new( Level::Error, @@ -225,10 +213,9 @@ impl Ui { ))) .unwrap(); } - future::ready(()) }) - .await; - consumer.unassign().unwrap(); + .await + .unwrap(); info!("Consumer is terminated"); token.cancel(); let _ = tx.send(Action::StopConsuming()); @@ -279,12 +266,13 @@ impl Ui { } pub(crate) fn load_topics(&mut self, action_tx: UnboundedSender) { - let app = self.app.clone(); + let admin_client = AdminClient::new(self.app.config.clone()).unwrap(); + let cluster = self.app.config.cluster().to_string(); tokio::task::Builder::new() .name("topics-loader") .spawn(async move { - info!("Listing topics from the cluster"); - match app.list_topics() { + info!("Listing topics of the '{}' cluster", cluster); + match 
admin_client.list_topics() { Ok(topics) => { action_tx.send(Action::Topics(topics)).unwrap(); } diff --git a/docs/json-schemas/kafka-record.json b/docs/json-schemas/kafka-record.json index e41d9c00..98c065af 100644 --- a/docs/json-schemas/kafka-record.json +++ b/docs/json-schemas/kafka-record.json @@ -119,23 +119,9 @@ }, "DataType": { "anyOf": [ + true, { "type": "string" - }, - { - "type": "number" - }, - { - "type": "null" - }, - { - "type": "boolean" - }, - { - "type": "array" - }, - { - "type": "object" } ] }