diff --git a/.github/actions/run-minio/action.yml b/.github/actions/run-minio/action.yml new file mode 100644 index 000000000..42fb34e79 --- /dev/null +++ b/.github/actions/run-minio/action.yml @@ -0,0 +1,36 @@ +name: run-minio +author: atimin +description: "Run MinIO server and provision a bucket" +inputs: + access_key: + description: "MinIO access key" + required: true + default: "minioadmin" + secret_key: + description: "MinIO secret key" + required: true + default: "minioadmin" + bucket_name: + description: "Name of the bucket to create" + required: true + default: "test-bucket" +runs: + using: "composite" + steps: + - name: Set up MinIO server + run: | + docker run -d --name minio-server -p 9000:9000 \ + -e "MINIO_ROOT_USER=${{ inputs.access_key }}" \ + -e "MINIO_ROOT_PASSWORD=${{ inputs.secret_key }}" \ + minio/minio server /data + sleep 5 # Wait for the server to start + docker logs minio-server + shell: bash + + - name: Create bucket + run: | + echo "Create bucket ${{ inputs.bucket_name }}" + + docker exec minio-server mc alias set local http://localhost:9000 ${{ inputs.access_key }} ${{ inputs.secret_key }} + docker exec minio-server mc mb local/${{ inputs.bucket_name }} + shell: bash diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 47b99aaaf..d1415cfc1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,7 @@ name: ci on: push: - branches: [main, stable] + branches: [ main, stable ] tags: - "v*" paths-ignore: @@ -10,7 +10,7 @@ on: - CHANGELOG.md pull_request: - branches: [main, stable] + branches: [ main, stable ] paths-ignore: - docs/** - README.md @@ -145,7 +145,7 @@ jobs: - build_binaries strategy: matrix: - os: [ubuntu-24.04, windows-2022, macos-14] + os: [ ubuntu-24.04, windows-2022, macos-14 ] include: - os: ubuntu-24.04 target: x86_64-unknown-linux-gnu @@ -188,6 +188,10 @@ jobs: env: CARGO_TERM_COLOR: always RUST_LOG: debug # expand logging + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin + MINIO_BUCKET: test + MINIO_ENDPOINT: http://127.0.0.1:9000 steps: - uses: actions/checkout@v4 @@ -202,10 +206,16 @@ jobs: version: "26.x" repo-token: ${{ secrets.ACTION_GITHUB_TOKEN }} + - uses: ./.github/actions/run-minio + with: + access_key: ${{ env.MINIO_ACCESS_KEY }} + secret_key: ${{ env.MINIO_SECRET_KEY }} + bucket_name: ${{ env.MINIO_BUCKET }} + - name: Generate code coverage env: ARTIFACT_SAS_URL: ${{ secrets.ARTIFACT_SAS_URL }} - run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info + run: cargo llvm-cov --features s3-backend,fs-backend,ci --workspace --lcov --output-path lcov.info - name: Upload coverage to Codecov continue-on-error: true @@ -223,15 +233,20 @@ jobs: - build strategy: matrix: - token: ["", "XXXX"] - cert_path: ["", "/misc/certificate.crt"] - license_path: ["", "/misc/license.lic"] + token: [ "", "XXXX" ] + cert_path: [ "", "/misc/certificate.crt" ] + license_path: [ "", "/misc/license.lic" ] + backend: [ "fs", "s3" ] include: - cert_path: "/misc/certificate.crt" url: https://127.0.0.1:8383 - cert_path: "" url: http://127.0.0.1:8383 timeout-minutes: 5 + env: + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin + MINIO_BUCKET: test steps: - uses: actions/checkout@v4 @@ -249,7 +264,8 @@ jobs: - name: Create license run: echo '${{secrets.LICENSE_KEY}}' > ./misc/license.lic - - name: Run Database + - name: Run ReductStore with FileSystem backend + if: ${{matrix.backend == 'fs'}} run: | docker run --network=host -v ${PWD}/misc:/misc --env RS_API_TOKEN=${{matrix.token}} \ 
--name reduct \ @@ -260,6 +276,31 @@ jobs: -d ${{github.repository}} sleep 5 + - name: Run MinIO server and create bucket + if: ${{matrix.backend == 's3'}} + uses: ./.github/actions/run-minio + with: + access_key: ${{ env.MINIO_ACCESS_KEY }} + secret_key: ${{ env.MINIO_SECRET_KEY }} + bucket_name: ${{ env.MINIO_BUCKET }} + + - name: Run ReductStore with S3 backend + if: ${{matrix.backend == 's3'}} + run: | + docker run --network=host -v ${PWD}/misc:/misc \ + --env RS_API_TOKEN=${{matrix.token}} \ + --env RS_CERT_PATH=${{matrix.cert_path}} \ + --env RS_LICENSE_PATH=${{matrix.license_path}} \ + --env RS_CERT_KEY_PATH=/misc/privateKey.key \ + --env RS_STORAGE_TYPE=s3 \ + --env RS_REMOTE_BACKEND_ENDPOINT=http://127.0.0.1:9000 \ + --env RS_REMOTE_ACCESS_KEY=${{ env.MINIO_ACCESS_KEY }} \ + --env RS_REMOTE_SECRET_KEY=${{ env.MINIO_SECRET_KEY }} \ + --env RS_REMOTE_BUCKET=${{ env.MINIO_BUCKET }} \ + --env RS_CORS_ALLOW_ORIGIN="https://first-allowed-origin.com, https://second-allowed-origin.com" \ + --name reduct -d ${{github.repository}} + sleep 5 + - name: Build API tests run: | docker login -u ${{ secrets.DOCKER_USER }} -p ${{ secrets.DOCKER_TOKEN }} @@ -286,6 +327,7 @@ jobs: version: [ "latest", + "v1.15.6", "v1.14.8", "v1.13.5", "v1.12.4", @@ -366,12 +408,16 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - cmd: ["stop", "kill"] + backend: [ "fs", "s3" ] + cmd: [ "stop", "kill" ] needs: - unit_tests - build env: RS_API_TOKEN: XXXX + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin + MINIO_BUCKET: test steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 @@ -389,12 +435,33 @@ jobs: docker load --input /tmp/image.tar docker image ls -a - - name: Run ReductStore + - name: Run ReductStore with FileSystem backend + if: ${{matrix.backend == 'fs'}} run: | docker run --network=host -v ./data:/data --env RS_API_TOKEN=${RS_API_TOKEN} --name reductstore -d ${{github.repository}} sleep 5 docker logs reductstore + - name: Run MinIO server and create bucket + if: ${{matrix.backend == 's3'}} + uses: ./.github/actions/run-minio + with: + access_key: ${{ env.MINIO_ACCESS_KEY }} + secret_key: ${{ env.MINIO_SECRET_KEY }} + bucket_name: ${{ env.MINIO_BUCKET }} + + - name: Run ReductStore with S3 backend + if: ${{matrix.backend == 's3'}} + run: | + docker run --network=host -v ./data:/data \ + --env RS_API_TOKEN=${RS_API_TOKEN} \ + --env RS_STORAGE_TYPE=s3 \ + --env RS_REMOTE_BACKEND_ENDPOINT=http://127.0.0.1:9000 \ + --env RS_REMOTE_ACCESS_KEY=${MINIO_ACCESS_KEY} \ + --env RS_REMOTE_SECRET_KEY=${MINIO_SECRET_KEY} \ + --env RS_REMOTE_BUCKET=${MINIO_BUCKET} \ + --name reductstore -d ${{github.repository}} + - name: Upload data run: | pip install -r ./integration_tests/data_check/requirements.txt @@ -407,7 +474,7 @@ jobs: sleep 5 docker logs reductstore-1 - - name: Check data after migraiton + - name: Check data after migration run: python3 ./integration_tests/data_check/checker.py - name: Save docker logs @@ -416,7 +483,7 @@ jobs: - uses: actions/upload-artifact@v4 if: always() with: - name: docker-log-recovery-${{matrix.cmd}} + name: docker-log-recovery-${{matrix.cmd}}-${{matrix.backend}} path: /tmp/docker-log.zip - name: Show replication report @@ -529,7 +596,7 @@ jobs: - unit_tests strategy: matrix: - branch: ["main", "latest"] + branch: [ "main", "latest" ] timeout-minutes: 5 steps: @@ -549,7 +616,6 @@ --name reductstore \ --env RS_DATA_PATH=/data \ --env RS_API_TOKEN=token \ - --env RS_LOG_LEVEL=DEBUG \ -p 8383:8383 \ -v ./data:/data \ ${{ github.repository }}:latest @@ 
-651,7 +717,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - arch: [amd64, arm64] + arch: [ amd64, arm64 ] needs: - build_snap - api_tests @@ -684,7 +750,7 @@ jobs: if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') strategy: matrix: - platform: [linux/amd64, linux/arm64] + platform: [ linux/amd64, linux/arm64 ] include: - platform: linux/amd64 cargo_target: x86_64-unknown-linux-gnu diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b9268fc6..0dc4c25d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Integrate S3 Storage Backend, [PR-919](https://github.com/reductstore/reductstore/pull/919) + +### Changed + +- Update extensions for Rust 1.89: ros-ext v0.3.0, select v0.5.0, [PR-921](https://github.com/reductstore/reductstore/pull/921) + +## [1.16.3] - 2025-08-22 + +### Fixed + +- Fix double synchronization of transaction log + integrity checks, [PR-912](https://github.com/reductstore/reductstore/pull/912) + +## [1.16.2] - 2025-08-22 + +### Fixed + +- Prevent data loss of unsynchronized files in case of power failure for unfinished blocks, [PR-909](https://github.com/reductstore/reductstore/pull/909) + +### Security + +- CVE-2025-55159: Update slab to 0.4.11, [PR-911](https://github.com/reductstore/reductstore/pull/911) + ## [1.16.1] - 2025-08-09 ### Fixed @@ -1089,7 +1113,9 @@ reduct-rs: `ReductClient.url`, `ReductClient.token`, `ReductCientBuilder.try_bui - Initial release with basic HTTP API and FIFO bucket quota -[Unreleased]: https://github.com/reductstore/reductstore/compare/v1.16.1...HEAD +[Unreleased]: https://github.com/reductstore/reductstore/compare/v1.16.3...HEAD +[1.16.3]: https://github.com/reductstore/reductstore/compare/v1.16.2...v1.16.3 +[1.16.2]: https://github.com/reductstore/reductstore/compare/v1.16.1...v1.16.2 [1.16.1]: https://github.com/reductstore/reductstore/compare/v1.16.0...v1.16.1 [1.16.0]: https://github.com/reductstore/reductstore/compare/v1.15.6...v1.16.0 [1.15.6]: https://github.com/reductstore/reductstore/compare/v1.15.5...v1.15.6 diff --git a/Cargo.lock b/Cargo.lock index cf45eddc6..b161ac9c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -54,9 +60,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" dependencies = [ "anstyle", "anstyle-parse", @@ -84,35 +90,35 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.9" +version = "3.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" [[package]] name = "arbitrary" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223" +checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1" dependencies = [ "derive_arbitrary", ] @@ -148,18 +154,18 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "async-trait" -version = "0.1.88" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -174,11 +180,53 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-config" +version = "1.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bc1b40fb26027769f16960d2f4a6bc20c4bb755d403e552c8c1a73af433c246" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.3.1", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d025db5d9f52cbc413b167136afb3d8aeea708c0d8884783cf6253be5e22f6f2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + [[package]] name = "aws-lc-rs" -version = "1.13.1" +version = "1.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fcc8f365936c834db5514fc45aee5b1202d677e6b40e48468aaaa8183ca8c7" +checksum = "5c953fe1ba023e6b7730c0d4b031d06f267f23a46167dcbd40316644b10a17ba" dependencies = [ "aws-lc-sys", "zeroize", @@ -186,9 +234,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61b1d86e7705efe1be1b569bab41d4fa1e14e220b60a160f78de2db687add079" +checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" dependencies = [ "bindgen", "cc", @@ -197,6 +245,371 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-runtime" +version = "1.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c034a1bc1d70e16e7f4e4caf7e9f7693e4c9c24cd91cf17c2a0b21abaebc7c8b" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + 
"aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.104.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38c488cd6abb0ec9811c401894191932e941c5f91dc226043edacd0afa1634bc" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "lru", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.83.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643cd43af212d2a1c4dedff6f044d7e1961e5d9e7cfe773d70f31d9842413886" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.84.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20ec4a95bd48e0db7a424356a161f8d87bd6a4f0af37204775f0da03d9e39fc3" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.85.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "410309ad0df4606bc721aff0d89c3407682845453247213a0ccc5ff8801ee107" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "084c34162187d39e3740cb635acd73c4e3a551a36146ad6fe8883c929c9f876c" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "p256", + "percent-encoding", + "ring", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e190749ea56f8c42bf15dd76c65e14f8f765233e6df9b0506d9d934ebef867c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.63.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56d2df0314b8e307995a3b86d44565dfe9de41f876901a7d71886c756a25979f" +dependencies = [ + "aws-smithy-http", + 
"aws-smithy-types", + "bytes", + "crc-fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "182b03393e8c677347fb5705a04a9392695d47d20ef0a2f8cfe28c8e6b9b9778" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.62.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c4dacf2d38996cf729f55e7a762b30918229917eca115de45dfa8dfb97796c9" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147e8eea63a40315d704b97bf9bc9b8c1402ae94f89d5ad6f7550d963309da1b" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.27", + "h2 0.4.12", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.7.0", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.7", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.31", + "rustls-native-certs 0.8.1", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.2", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaa31b350998e703e9826b2104dd6f63be0508666e1aba88137af060e8944047" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9364d5989ac4dd918e5cc4c4bdcc61c9be17dcd2586ea7f69e348fc7c6cab393" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3946acbe1ead1301ba6862e712c7903ca9bb230bdf1fbd1b5ac54158ef2ab1f" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07f5e0fc8a6b3f2303f331b94504bbf754d85488f402d6f1dd7a6080f99afe56" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.3.1", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d498595448e43de7f4296b7b7a18a8a02c61ec9349128c80a368f7c3b4ab11a8" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", 
+ "futures-core", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db87b96cb1b16c024980f133968d52882ca0daaee3a086c6decc500f6c99728" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b069d19bf01e46298eaedd7c6f283fe565a59263e53eebec945f3e6398f42390" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.8.4" @@ -208,10 +621,10 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.7.0", "hyper-util", "itoa", "matchit", @@ -240,8 +653,8 @@ checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -263,8 +676,8 @@ dependencies = [ "bytes", "futures-util", "headers", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -283,7 +696,7 @@ checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -295,16 +708,16 @@ dependencies = [ "arc-swap", "bytes", "fs-err", - "http", - "http-body", - "hyper", + "http 1.3.1", + "http-body 1.0.1", + "hyper 1.7.0", "hyper-util", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.23.31", + "rustls-pemfile 2.2.0", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", ] @@ -323,12 +736,40 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + +[[package]] +name = "base64ct" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" + [[package]] name = "bindgen" version = "0.69.5" @@ -348,15 +789,15 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.104", + "syn 2.0.106", "which", ] [[package]] name = "bitflags" -version = "2.9.1" +version = "2.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" 
+checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" [[package]] name = "block-buffer" @@ -369,9 +810,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.18.1" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" [[package]] name = "byteorder" @@ -385,6 +826,16 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "bytesize" version = "2.0.1" @@ -402,10 +853,11 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.27" +version = "1.2.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc" +checksum = "590f9024a68a8c40351881787f1934dc11afd69090f5edb6831464694d836ea3" dependencies = [ + "find-msvc-tools", "jobserver", "libc", "shlex", @@ -422,9 +874,9 @@ dependencies = [ [[package]] name = "cfg-if" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" [[package]] name = "cfg_aliases" @@ -483,12 +935,38 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "constant_time_eq" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -504,11 +982,39 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + +[[package]] +name = "crc-fast" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bf62af4cc77d8fe1c22dde4e721d87f2f54056139d8c412e1366b740305f56f" 
+dependencies = [ + "crc", + "digest", + "libc", + "rand", + "regex", +] + [[package]] name = "crc32fast" -version = "1.4.2" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" dependencies = [ "cfg-if", ] @@ -534,6 +1040,28 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -550,24 +1078,34 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da692b8d1080ea3045efaab14434d40468c3d8657e42abddfffca87b428f4c1b" +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "deranged" -version = "0.4.0" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" dependencies = [ "powerfmt", ] [[package]] name = "derive_arbitrary" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" +checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -589,7 +1127,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -612,7 +1150,7 @@ checksum = "788160fb30de9cdd857af31c6a2675904b16ece8fc2737b2c7127ba368c9d0f4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -627,12 +1165,44 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der", + "elliptic-curve", + "rfc6979", + "signature", +] + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" 
+dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der", + "digest", + "ff", + "generic-array", + "group", + "pkcs8", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -686,6 +1256,22 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e178e4fba8a2726903f6ba98a6d221e76f9c12c650d5dc0e6afdc50677b49650" + [[package]] name = "fixedbitset" version = "0.5.7" @@ -709,11 +1295,17 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" dependencies = [ "percent-encoding", ] @@ -796,7 +1388,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -868,7 +1460,7 @@ dependencies = [ "js-sys", "libc", "r-efi", - "wasi 0.14.2+wasi-0.2.4", + "wasi 0.14.3+wasi-0.2.4", "wasm-bindgen", ] @@ -880,22 +1472,52 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "glob" -version = "0.3.2" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + +[[package]] +name = "group" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", +] [[package]] name = "h2" -version = "0.4.10" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9421a676d1b147b16b82c9225157dc629087ef8ec4d5e2960f9437a90dac0a5" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" dependencies = [ "atomic-waker", "bytes", "fnv", "futures-core", "futures-sink", - "http", + "http 1.3.1", "indexmap", "slab", "tokio", @@ -905,9 +1527,14 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.4" +version = "0.15.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "headers" @@ -915,10 +1542,10 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "headers-core", - "http", + "http 1.3.1", "httpdate", "mime", "sha1", @@ -930,7 +1557,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.3.1", ] [[package]] @@ -963,6 +1590,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.3.1" @@ -974,6 +1612,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -981,7 +1630,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.3.1", ] [[package]] @@ -992,8 +1641,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "pin-project-lite", ] @@ -1011,61 +1660,104 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.6.0" +version = "0.14.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" dependencies = [ "bytes", "futures-channel", + "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "h2 0.4.12", + "http 1.3.1", + "http-body 1.0.1", "httparse", "httpdate", "itoa", "pin-project-lite", + "pin-utils", "smallvec", "tokio", "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] 
name = "hyper-rustls" version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.3.1", + "hyper 1.7.0", "hyper-util", - "rustls", + "rustls 0.23.31", + "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", "webpki-roots", ] [[package]] name = "hyper-util" -version = "0.1.14" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb" +checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.3.1", + "http-body 1.0.1", + "hyper 1.7.0", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.0", "tokio", "tower-service", "tracing", @@ -1183,9 +1875,9 @@ dependencies = [ [[package]] name = "idna" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" dependencies = [ "idna_adapter", "smallvec", @@ -1204,9 +1896,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", "hashbrown", @@ -1244,18 +1936,18 @@ dependencies = [ [[package]] name = "inventory" -version = "0.3.20" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab08d7cd2c5897f2c949e5383ea7c7db03fb19130ffcfbf7eda795137ae3cb83" +checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e" dependencies = [ "rustversion", ] [[package]] name = "io-uring" -version = "0.7.8" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" dependencies = [ "bitflags", "cfg-if", @@ -1310,9 +2002,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "jobserver" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" dependencies = [ "getrandom 0.3.3", "libc", @@ -1334,7 +2026,7 @@ version = "9.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" dependencies = [ - "base64", + "base64 0.22.1", "js-sys", "pem", "ring", @@ -1357,15 +2049,15 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libbz2-rs-sys" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775bf80d5878ab7c2b1080b5351a48b2f737d9f6f8b383574eebcc22be0dfccb" +checksum = 
"2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" -version = "0.2.174" +version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" [[package]] name = "libloading" @@ -1374,14 +2066,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.53.2", + "windows-targets 0.53.3", ] [[package]] name = "liblzma" -version = "0.4.2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0791ab7e08ccc8e0ce893f6906eb2703ed8739d8e89b57c0714e71bad09024c8" +checksum = "10bf66f4598dc77ff96677c8e763655494f00ff9c1cf79e2eb5bb07bc31f807d" dependencies = [ "liblzma-sys", ] @@ -1399,9 +2091,9 @@ dependencies = [ [[package]] name = "libz-rs-sys" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221" +checksum = "840db8cf39d9ec4dd794376f38acc40d0fc65eec2a8f484f7fd375b84602becd" dependencies = [ "zlib-rs", ] @@ -1440,6 +2132,15 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -1448,11 +2149,11 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "matchers" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" dependencies = [ - "regex-automata 0.1.10", + "regex-automata", ] [[package]] @@ -1461,6 +2162,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.5" @@ -1532,7 +2243,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -1541,6 +2252,12 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "needs_env_var" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3b406fd667619150b3ac88bfa5b2791311d7100c0b91eb6ed6488b82349856d" + [[package]] name = "nom" version = "7.1.3" @@ -1553,12 +2270,11 @@ dependencies = [ [[package]] name = "nu-ansi-term" -version = "0.46.0" +version = "0.50.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +checksum 
= "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" dependencies = [ - "overload", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -1617,10 +2333,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" [[package]] -name = "overload" -version = "0.1.1" +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + +[[package]] +name = "p256" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] [[package]] name = "parking_lot" @@ -1661,15 +2394,15 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" dependencies = [ - "base64", + "base64 0.22.1", "serde", ] [[package]] name = "percent-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "petgraph" @@ -1693,6 +2426,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -1701,9 +2444,9 @@ checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" [[package]] name = "potential_utf" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a" dependencies = [ "zerovec", ] @@ -1757,12 +2500,12 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.35" +version = "0.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061c1221631e079b26479d25bbf2275bfe5917ae8419cd7e34f13bfc2aa7539a" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -1786,18 +2529,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.95" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", "prost-derive", @@ -1805,9 +2548,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", "itertools 0.14.0", @@ -1819,37 +2562,37 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.104", + "syn 2.0.106", "tempfile", ] [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "prost-types" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ "prost", ] [[package]] name = "prost-wkt" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" +checksum = "655944d0ce015e71b3ec21279437e6a09e58433e50c7b0677901f3d5235e74f5" dependencies = [ "chrono", "inventory", @@ -1862,9 +2605,9 @@ dependencies = [ [[package]] name = "prost-wkt-build" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" +checksum = "f869f1443fee474b785e935d92e1007f57443e485f51668ed41943fc01a321a2" dependencies = [ "heck", "prost", @@ -1875,9 +2618,9 @@ dependencies = [ [[package]] name = "prost-wkt-types" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" +checksum = "eeeffd6b9becd4600dd461399f3f71aeda2ff0848802a9ed526cf12e8f42902a" dependencies = [ "chrono", "prost", @@ -1893,9 +2636,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", "cfg_aliases", @@ -1903,8 +2646,8 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls", - "socket2 0.5.10", + "rustls 0.23.31", + "socket2 0.6.0", "thiserror", "tokio", "tracing", @@ -1913,9 +2656,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.12" +version = "0.11.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" dependencies = [ "bytes", "getrandom 0.3.3", @@ -1923,7 +2666,7 @@ dependencies = [ "rand", "ring", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.31", 
"rustls-pki-types", "slab", "thiserror", @@ -1934,16 +2677,16 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.13" +version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.0", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -1968,7 +2711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ "rand_chacha", - "rand_core", + "rand_core 0.9.3", ] [[package]] @@ -1978,7 +2721,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -1992,22 +2744,22 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.13" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" +checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" dependencies = [ "bitflags", ] [[package]] name = "reduct-base" -version = "1.16.1" +version = "1.17.0" dependencies = [ "async-trait", "bytes", "chrono", "futures", - "http", + "http 1.3.1", "int-enum", "log", "rstest", @@ -2020,23 +2772,26 @@ dependencies = [ [[package]] name = "reduct-macros" -version = "1.16.1" +version = "1.17.0" dependencies = [ "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "reductstore" -version = "1.16.1" +version = "1.17.0" dependencies = [ "assert_matches", "async-stream", "async-trait", + "aws-config", + "aws-credential-types", + "aws-sdk-s3", "axum", "axum-extra", "axum-server", - "base64", + "base64 0.22.1", "byteorder", "bytes", "bytesize", @@ -2046,11 +2801,12 @@ dependencies = [ "dlopen2", "futures-util", "hex", - "hyper", + "hyper 1.7.0", "jsonwebtoken", "log", "mime_guess", "mockall", + "needs_env_var", "prost", "prost-build", "prost-wkt-types", @@ -2061,7 +2817,7 @@ dependencies = [ "reqwest", "ring", "rstest", - "rustls", + "rustls 0.23.31", "serde", "serde_json", "serial_test", @@ -2075,47 +2831,38 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] name = "regex-automata" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] -name = "regex-syntax" -version = "0.6.29" +name = "regex-lite" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +checksum = "943f41321c63ef1c92fd763bfe054d2668f7f225a5c29f0105903dc2fc04ba30" [[package]] name = "regex-syntax" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" [[package]] name = "relative-path" @@ -2125,34 +2872,34 @@ checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" [[package]] name = "reqwest" -version = "0.12.22" +version = "0.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" +checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-core", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.7.0", + "hyper-rustls 0.27.7", "hyper-util", "js-sys", "log", "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.31", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-util", "tower", "tower-http", @@ -2165,6 +2912,17 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "ring" version = "0.17.14" @@ -2204,15 +2962,15 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.104", + "syn 2.0.106", "unicode-ident", ] [[package]] name = "rustc-demangle" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" [[package]] name = "rustc-hash" @@ -2250,33 +3008,78 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.59.0", + "windows-sys 0.60.2", +] + +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", ] [[package]] name = "rustls" -version = "0.23.29" +version = "0.23.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1" +checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc" dependencies = [ "aws-lc-rs", "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.4", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework 2.11.1", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.3.0", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -2296,6 +3099,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.4" @@ -2310,9 +3123,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" @@ -2322,24 +3135,93 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" [[package]] name = "scc" -version = "2.3.4" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4" +checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sdd" -version = "3.0.8" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" + +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der", + "generic-array", + "pkcs8", + "subtle", + "zeroize", +] + +[[package]] 
+name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584e070911c7017da6cb2eb0788d09f43d789029b5877d3e5ecc8acf86ceee21" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" +dependencies = [ + "bitflags", + "core-foundation 0.10.1", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] [[package]] name = "semver" @@ -2364,14 +3246,14 @@ checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "serde_json" -version = "1.0.141" +version = "1.0.143" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b9eff21ebe718216c6ec64e1d9ac57087aad11efc64e32002bce4a0d4c03d3" +checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" dependencies = [ "indexmap", "itoa", @@ -2424,7 +3306,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -2438,6 +3320,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2455,13 +3348,23 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.5" +version = "1.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.7" @@ -2482,9 +3385,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" [[package]] name = "smallvec" @@ -2512,6 +3415,16 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ 
-2537,9 +3450,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.104" +version = "2.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" dependencies = [ "proc-macro2", "quote", @@ -2563,20 +3476,20 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "tempfile" -version = "3.20.0" +version = "3.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", - "rustix 1.0.7", - "windows-sys 0.59.0", + "rustix 1.0.8", + "windows-sys 0.60.2", ] [[package]] @@ -2604,27 +3517,27 @@ checksum = "451b374529930d7601b1eef8d32bc79ae870b6079b069401709c2a8bf9e75f36" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "thiserror" -version = "2.0.12" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.12" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -2648,12 +3561,11 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "8ca967379f9d8eb8058d86ed467d81d03e81acd45757e4ca341c24affbe8e8e3" dependencies = [ "deranged", - "itoa", "num-conv", "powerfmt", "serde", @@ -2663,15 +3575,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" +checksum = "a9108bb380861b07264b950ded55a44a14a4adc68b9f5efd85aafc3aa4d40a68" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = "7182799245a7264ce590b349d90338f1c1affad93d2639aed5f8f69c090b334c" dependencies = [ "num-conv", "time-core", @@ -2689,9 +3601,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" dependencies = [ "tinyvec_macros", ] @@ -2704,9 +3616,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.47.0" +version = "1.47.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", @@ -2730,7 +3642,17 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", +] + +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", ] [[package]] @@ -2739,15 +3661,15 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.31", "tokio", ] [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", @@ -2781,7 +3703,7 @@ checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ "indexmap", "toml_datetime", - "winnow 0.7.11", + "winnow 0.7.13", ] [[package]] @@ -2809,8 +3731,8 @@ dependencies = [ "bitflags", "bytes", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower", @@ -2838,9 +3760,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "tracing-core" version = "0.1.34" @@ -2864,14 +3798,14 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ "matchers", "nu-ansi-term", "once_cell", - "regex", + "regex-automata", "sharded-slab", "thread_local", "tracing", @@ -2918,7 +3852,7 @@ checksum = "35f5380909ffc31b4de4f4bdf96b877175a016aa2ca98cee39fcfd8c4d53d952" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -2941,9 +3875,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.4" +version = "2.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", "idna", @@ -2951,6 +3885,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" 
version = "1.0.4" @@ -2963,6 +3903,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -2975,6 +3925,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -2992,11 +3948,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" -version = "0.14.2+wasi-0.2.4" +version = "0.14.3+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95" dependencies = [ - "wit-bindgen-rt", + "wit-bindgen", ] [[package]] @@ -3021,7 +3977,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", "wasm-bindgen-shared", ] @@ -3056,7 +4012,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3105,9 +4061,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8782dd5a41a24eed3a4f40b606249b3e236ca61adf1f25ea4d45c73de122b502" +checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2" dependencies = [ "rustls-pki-types", ] @@ -3167,7 +4123,7 @@ checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -3178,7 +4134,7 @@ checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -3229,7 +4185,7 @@ version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" dependencies = [ - "windows-targets 0.53.2", + "windows-targets 0.53.3", ] [[package]] @@ -3250,10 +4206,11 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.2" +version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ + "windows-link", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", @@ -3371,21 +4328,18 @@ dependencies = [ [[package]] name = "winnow" -version = "0.7.11" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74c7b26e3480b707944fc872477815d29a8e429d2f93a1ce000f5fa84a15cbcd" +checksum = 
"21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" dependencies = [ "memchr", ] [[package]] -name = "wit-bindgen-rt" -version = "0.39.0" +name = "wit-bindgen" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" -dependencies = [ - "bitflags", -] +checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814" [[package]] name = "writeable" @@ -3393,6 +4347,12 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yoke" version = "0.8.0" @@ -3413,7 +4373,7 @@ checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", "synstructure", ] @@ -3434,7 +4394,7 @@ checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -3454,7 +4414,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", "synstructure", ] @@ -3475,7 +4435,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] @@ -3491,9 +4451,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.2" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" +checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" dependencies = [ "yoke", "zerofrom", @@ -3508,14 +4468,14 @@ checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.104", + "syn 2.0.106", ] [[package]] name = "zip" -version = "4.3.0" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aed4ac33e8eb078c89e6cbb1d5c4c7703ec6d299fc3e7c3695af8f8b423468b" +checksum = "c034aa6c54f654df20e7dc3713bc51705c12f280748fb6d7f40f87c696623e34" dependencies = [ "aes", "arbitrary", @@ -3540,9 +4500,9 @@ dependencies = [ [[package]] name = "zlib-rs" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a" +checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2" [[package]] name = "zopfli" diff --git a/Cargo.toml b/Cargo.toml index 8a4555501..1db4a1597 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ resolver = "2" [workspace.package] -version = "1.16.1" +version = "1.17.0" authors = ["Alexey Timin ", "ReductSoftware UG "] edition = "2021" -rust-version = "1.85.0" +rust-version = "1.89.0" diff --git a/README.md b/README.md index 0cdd71ab3..17a8ee5eb 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ Here are the client SDKs available: - [Python Client SDK](https://github.com/reductstore/reduct-py) - [JavaScript Client SDK](https://github.com/reductstore/reduct-js) - [C++ Client 
SDK](https://github.com/reductstore/reduct-cpp)
+- [Go Client SDK](https://github.com/reductstore/reduct-go)
 
 ## Tools
 
diff --git a/integration_tests/api/http_test.py b/integration_tests/api/http_test.py
index 3e17ccdcc..af4dad8d6 100644
--- a/integration_tests/api/http_test.py
+++ b/integration_tests/api/http_test.py
@@ -6,7 +6,7 @@ def test_api_version(base_url, session):
     resp = session.get(f"{base_url}/info")
     assert resp.status_code == 200
-    assert resp.headers["x-reduct-api"] == "1.16"
+    assert resp.headers["x-reduct-api"] == "1.17"
 
 
 def test_cors_allows_first_allowed_origin(base_url, session):
diff --git a/reduct_base/Cargo.toml b/reduct_base/Cargo.toml
index 228b8b6d5..7207bd119 100644
--- a/reduct_base/Cargo.toml
+++ b/reduct_base/Cargo.toml
@@ -27,14 +27,14 @@ all = ["io", "ext"]
 
 [dependencies]
 serde = { version = "1.0.219", features = ["derive"] }
-serde_json = { version = "1.0.141", features = ["preserve_order"] }
+serde_json = { version = "1.0.143", features = ["preserve_order"] }
 int-enum = "0.5.0"
 chrono = { version = "0.4.41", features = ["serde"] }
 url = "2.5.4"
 http = "1.2.0"
 bytes = "1.10.0"
-async-trait = { version = "0.1.87" , optional = true }
-tokio = { version = "1.47.0", optional = true, features = ["default", "rt", "time"] }
+async-trait = { version = "0.1.89" , optional = true }
+tokio = { version = "1.47.1", optional = true, features = ["default", "rt", "time"] }
 log = "0.4.0"
 thread-id = "5.0.0"
 futures = "0.3.31"
diff --git a/reduct_base/src/batch.rs b/reduct_base/src/batch.rs
index 5eca01f9c..1cf2d7dcf 100644
--- a/reduct_base/src/batch.rs
+++ b/reduct_base/src/batch.rs
@@ -8,7 +8,7 @@ use crate::{unprocessable_entity, Labels};
 use http::{HeaderMap, HeaderValue};
 
 pub struct RecordHeader {
-    pub content_length: usize,
+    pub content_length: u64,
     pub content_type: String,
     pub labels: Labels,
 }
@@ -30,7 +30,7 @@ pub fn parse_batched_header(header: &str) -> Result<RecordHeader, ReductError> {
         .ok_or(unprocessable_entity!("Invalid batched header"))?;
     let content_length = content_length
         .trim()
-        .parse::<usize>()
+        .parse::<u64>()
         .map_err(|_| unprocessable_entity!("Invalid content length"))?;
 
     let (content_type, rest) = rest
diff --git a/reduct_macros/Cargo.toml b/reduct_macros/Cargo.toml
index 1d4d10450..55929c117 100644
--- a/reduct_macros/Cargo.toml
+++ b/reduct_macros/Cargo.toml
@@ -15,5 +15,5 @@ proc-macro = true
 
 [dependencies]
-syn = { version = "2.0.103", features = ["derive"] }
+syn = { version = "2.0.106", features = ["derive"] }
 quote = "1.0.40"
diff --git a/reductstore/Cargo.toml b/reductstore/Cargo.toml
index 52ebf8e9d..1fc4885a1 100644
--- a/reductstore/Cargo.toml
+++ b/reductstore/Cargo.toml
@@ -17,10 +17,15 @@ categories = ["database-implementations", "command-line-utilities", "database"]
 include = ["src/**/*", "Cargo.toml", "Cargo.lock", "build.rs", "README.md", "LICENSE"]
 
 [features]
-default = ["web-console"]
+default = ["web-console", "fs-backend"]
 web-console = []
 select-ext = []
 ros-ext = []
+ci = [] # feature enables CI-specific tests
+
+
+fs-backend = []
+s3-backend = ["dep:aws-sdk-s3", "dep:aws-config", "dep:aws-credential-types"]
 
 [lib]
 crate-type = ["lib"]
@@ -32,24 +37,24 @@ reduct-base = { path = "../reduct_base", version = "1.15.0", features = ["ext"] 
 reduct-macros = { path = "../reduct_macros", version = "1.15.0" }
 
 chrono = { version = "0.4.41", features = ["serde"] }
-zip = "4.3.0"
+zip = "4.5.0"
 tempfile = "3.20.0"
 hex = "0.4.3"
-prost-wkt-types = "0.6.1"
+prost-wkt-types = "0.7.0"
 rand = "0.9.2"
{ version = "1.0.219", features = ["derive"] } -serde_json = { version = "1.0.141", features = ["preserve_order"] } +serde_json = { version = "1.0.143", features = ["preserve_order"] } regex = "1.11.1" bytes = "1.10.1" axum = { version = "0.8.4", features = ["default", "macros"] } axum-extra = { version = "0.10.0", features = ["default", "typed-header"] } -tokio = { version = "1.47.0", features = ["full"] } -hyper = { version = "1.6.0", features = ["full"] } +tokio = { version = "1.47.1", features = ["full"] } +hyper = { version = "1.7.0", features = ["full"] } futures-util = "0.3.31" axum-server = { version = "0.7.1", features = ["tls-rustls"] } mime_guess = "2.0.5" bytesize = "2.0.1" -async-trait = "0.1.88" +async-trait = "0.1.89" url = { version = "2.5.4", features = ["serde"] } jsonwebtoken = "9.3.1" base64 = "0.22.1" @@ -58,26 +63,33 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus async-stream = "0.3.6" tower-http = { version = "0.6.6", features = ["cors"] } crc64fast = "1.1.0" -rustls = "0.23.29" +rustls = "0.23.31" byteorder = "1.5.0" crossbeam-channel = "0.5.15" dlopen2 = "0.8.0" log = "0.4" -prost = "0.13.1" +prost = "0.14.1" + + +# optional dependencies for remote backend +aws-sdk-s3 = { version = "1.103.0", optional = true, features = ["rustls"] } +aws-config = { version = "1.8.5", optional = true, features = ["rustls"] } +aws-credential-types = { version = "1.2.5", optional = true, features = ["hardcoded-credentials"]} [build-dependencies] -prost-build = "0.13.1" -reqwest = { version = "0.12.22", default-features = false, features = ["rustls-tls", "blocking", "json"] } +prost-build = "0.14.1" +reqwest = { version = "0.12.23", default-features = false, features = ["rustls-tls", "blocking", "json"] } chrono = "0.4.41" -serde_json = "1.0.141" +serde_json = "1.0.143" [dev-dependencies] mockall = "0.13.1" rstest = "0.26.1" serial_test = "3.2.0" test-log = "0.2.18" -reqwest = { version = "0.12.22", default-features = false, features = ["rustls-tls", "blocking"] } +reqwest = { version = "0.12.23", default-features = false, features = ["rustls-tls", "blocking"] } assert_matches = "1.5" +needs_env_var = "1.1.0" [package.metadata.docs.rs] no-default-features = true diff --git a/reductstore/build.rs b/reductstore/build.rs index cec84e001..88e693b31 100644 --- a/reductstore/build.rs +++ b/reductstore/build.rs @@ -30,10 +30,10 @@ fn main() -> Result<(), Box> { download_web_console("v1.11.2"); #[cfg(feature = "select-ext")] - download_ext("select-ext", "v0.4.2"); + download_ext("select-ext", "v0.5.0"); #[cfg(feature = "ros-ext")] - download_ext("ros-ext", "v0.2.1"); + download_ext("ros-ext", "v0.3.0"); // get build time and commit let build_time = chrono::DateTime::::from(SystemTime::now()) diff --git a/reductstore/src/api.rs b/reductstore/src/api.rs index 87d7e3246..a8a2e657c 100644 --- a/reductstore/src/api.rs +++ b/reductstore/src/api.rs @@ -172,9 +172,12 @@ fn configure_cors(cors_allow_origin: &Vec) -> CorsLayer { #[cfg(test)] mod tests { + use super::*; use crate::asset::asset_manager::create_asset_manager; use crate::auth::token_repository::create_token_repository; + use crate::backend::Backend; use crate::cfg::replication::ReplicationConfig; + use crate::core::file_cache::FILE_CACHE; use crate::ext::ext_repository::create_ext_repository; use crate::replication::create_replication_repo; use axum::body::Body; @@ -189,8 +192,6 @@ mod tests { use rstest::fixture; use std::collections::HashMap; - use super::*; - mod http_error { use super::*; use 
         use axum::body::to_bytes;
@@ -267,6 +268,12 @@ mod tests {
     #[fixture]
     pub(crate) async fn components() -> Arc<Components> {
         let data_path = tempfile::tempdir().unwrap().keep();
+        FILE_CACHE.set_storage_backend(
+            Backend::builder()
+                .local_data_path(data_path.to_str().unwrap())
+                .try_build()
+                .unwrap(),
+        );
 
         let storage = Storage::load(data_path.clone(), None);
         let mut token_repo = create_token_repository(data_path.clone(), "init-token");
diff --git a/reductstore/src/api/entry/common.rs b/reductstore/src/api/entry/common.rs
index 3a6db291e..7c5ee2f5a 100644
--- a/reductstore/src/api/entry/common.rs
+++ b/reductstore/src/api/entry/common.rs
@@ -9,13 +9,13 @@ use reduct_base::unprocessable_entity;
 use std::collections::HashMap;
 use std::str::FromStr;
 
-pub(super) fn parse_content_length_from_header(headers: &HeaderMap) -> Result<usize, HttpError> {
+pub(super) fn parse_content_length_from_header(headers: &HeaderMap) -> Result<u64, HttpError> {
     let content_size = headers
         .get("content-length")
         .ok_or(unprocessable_entity!("content-length header is required"))?
         .to_str()
         .map_err(|_| unprocessable_entity!("content-length header must be a string",))?
-        .parse::<usize>()
+        .parse::<u64>()
         .map_err(|_| unprocessable_entity!("content-length header must be a number"))?;
     Ok(content_size)
 }
diff --git a/reductstore/src/api/entry/write_batched.rs b/reductstore/src/api/entry/write_batched.rs
index f18aa4951..df1b46e5e 100644
--- a/reductstore/src/api/entry/write_batched.rs
+++ b/reductstore/src/api/entry/write_batched.rs
@@ -199,15 +199,15 @@ async fn write_chunk(
     writer: &mut Box<dyn WriteRecord + Sync + Send>,
     chunk: Bytes,
     written: &mut usize,
-    content_size: usize,
+    content_size: u64,
 ) -> Result<Option<Bytes>, ReductError> {
-    let to_write = content_size - *written;
+    let to_write = content_size - *written as u64;
     *written += chunk.len();
-    let (chunk, rest) = if chunk.len() < to_write {
+    let (chunk, rest) = if (chunk.len() as u64) < to_write {
         (chunk, None)
     } else {
-        let chuck_to_write = chunk.slice(0..to_write);
-        (chuck_to_write, Some(chunk.slice(to_write..)))
+        let chunk_to_write = chunk.slice(0..to_write as usize);
+        (chunk_to_write, Some(chunk.slice(to_write as usize..)))
     };
 
     writer
@@ -220,7 +220,7 @@ fn check_content_length(
     headers: &HeaderMap,
     timed_headers: &Vec<(u64, RecordHeader)>,
 ) -> Result<(), ReductError> {
-    let total_content_length: usize = timed_headers
+    let total_content_length: u64 = timed_headers
         .iter()
         .map(|(_, header)| header.content_length)
         .sum();
@@ -231,7 +231,7 @@ fn check_content_length(
             .ok_or(unprocessable_entity!("content-length header is required",))?
             .to_str()
             .unwrap()
-            .parse::<usize>()
+            .parse::<u64>()
            .map_err(|_| unprocessable_entity!("Invalid content-length header"))?
{ return Err(unprocessable_entity!( diff --git a/reductstore/src/auth/token_repository.rs b/reductstore/src/auth/token_repository.rs index 0c41ad1d0..d931dceb3 100644 --- a/reductstore/src/auth/token_repository.rs +++ b/reductstore/src/auth/token_repository.rs @@ -3,6 +3,7 @@ use crate::auth::proto::token::Permissions as ProtoPermissions; use crate::auth::proto::{Token as ProtoToken, TokenRepo as ProtoTokenRepo, TokenRepo}; +use crate::core::file_cache::FILE_CACHE; use chrono::{DateTime, Utc}; use log::{debug, warn}; use prost::bytes::Bytes; @@ -16,6 +17,7 @@ use reduct_base::{ }; use regex::Regex; use std::collections::HashMap; +use std::io::{Read, SeekFrom, Write}; use std::path::PathBuf; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -208,13 +210,23 @@ impl TokenRepository { permission_regex, }; - match std::fs::read(&token_repository.config_path) { - Ok(data) => { + let lock = FILE_CACHE.read(&token_repository.config_path, SeekFrom::Start(0)); + match lock { + Ok(lock) => { debug!( "Loading token repository from {}", token_repository.config_path.as_path().display() ); - let toke_repository = ProtoTokenRepo::decode(&mut Bytes::from(data)) + + let mut buf = Vec::new(); + lock.upgrade() + .unwrap() + .write() + .unwrap() + .read_to_end(&mut buf) + .expect("Could not read token repository"); + + let toke_repository = ProtoTokenRepo::decode(&mut Bytes::from(buf)) .expect("Could not decode token repository"); for token in toke_repository.tokens { token_repository @@ -263,7 +275,14 @@ impl TokenRepository { let mut buf = Vec::new(); repo.encode(&mut buf) .map_err(|_| ReductError::internal_server_error("Could not encode token repository"))?; - std::fs::write(&self.config_path, buf).map_err(|err| { + + let lock = FILE_CACHE + .write_or_create(&self.config_path, SeekFrom::Start(0))? + .upgrade()?; + + let mut file = lock.write()?; + file.set_len(0)?; + file.write_all(&buf).map_err(|err| { internal_server_error!( "Could not write token repository to {}: {}", self.config_path.as_path().display(), @@ -271,6 +290,7 @@ impl TokenRepository { ) })?; + file.sync_all()?; Ok(()) } } diff --git a/reductstore/src/backend.rs b/reductstore/src/backend.rs new file mode 100644 index 000000000..e168fdca5 --- /dev/null +++ b/reductstore/src/backend.rs @@ -0,0 +1,605 @@ +// Copyright 2025 ReductSoftware UG +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+
+#[cfg(feature = "fs-backend")]
+pub(super) mod fs;
+
+pub(super) mod remote;
+
+pub(crate) mod file;
+mod noop;
+
+use crate::backend::file::{AccessMode, OpenOptions};
+use crate::backend::noop::NoopBackend;
+use reduct_base::error::ReductError;
+use reduct_base::internal_server_error;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+pub(crate) trait StorageBackend {
+    fn path(&self) -> &PathBuf;
+    fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()>;
+
+    fn remove(&self, path: &Path) -> std::io::Result<()>;
+
+    fn remove_dir_all(&self, path: &Path) -> std::io::Result<()>;
+
+    fn create_dir_all(&self, path: &Path) -> std::io::Result<()>;
+
+    fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>>;
+
+    fn try_exists(&self, _path: &Path) -> std::io::Result<bool>;
+
+    fn upload(&self, path: &Path) -> std::io::Result<()>;
+
+    fn download(&self, path: &Path) -> std::io::Result<()>;
+
+    fn update_local_cache(&self, path: &Path, mode: &AccessMode) -> std::io::Result<()>;
+
+    fn invalidate_locally_cached_files(&self) -> Vec<PathBuf>;
+}
+
+pub type BoxedBackend = Box<dyn StorageBackend + Sync + Send>;
+
+#[derive(Default, Clone, Debug, PartialEq)]
+pub enum BackendType {
+    #[cfg(feature = "fs-backend")]
+    #[default]
+    Filesystem,
+    #[cfg(feature = "s3-backend")]
+    S3,
+}
+
+#[derive(Default)]
+pub struct BackpackBuilder {
+    backend_type: BackendType,
+
+    local_data_path: Option<String>,
+    remote_bucket: Option<String>,
+    remote_cache_path: Option<String>,
+    remote_region: Option<String>,
+    remote_endpoint: Option<String>,
+    remote_access_key: Option<String>,
+    remote_secret_key: Option<String>,
+    remote_cache_size: Option<u64>,
+}
+
+impl BackpackBuilder {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn backend_type(mut self, backend_type: BackendType) -> Self {
+        self.backend_type = backend_type;
+        self
+    }
+
+    pub fn local_data_path(mut self, path: &str) -> Self {
+        self.local_data_path = Some(path.to_string());
+        self
+    }
+
+    pub fn remote_bucket(mut self, bucket: &str) -> Self {
+        self.remote_bucket = Some(bucket.to_string());
+        self
+    }
+
+    pub fn remote_cache_path(mut self, path: &str) -> Self {
+        self.remote_cache_path = Some(path.to_string());
+        self
+    }
+
+    pub fn cache_size(mut self, size: u64) -> Self {
+        self.remote_cache_size = Some(size);
+        self
+    }
+
+    pub fn remote_region(mut self, region: &str) -> Self {
+        self.remote_region = Some(region.to_string());
+        self
+    }
+
+    pub fn remote_endpoint(mut self, endpoint: &str) -> Self {
+        self.remote_endpoint = Some(endpoint.to_string());
+        self
+    }
+
+    pub fn remote_access_key(mut self, access_key: &str) -> Self {
+        self.remote_access_key = Some(access_key.to_string());
+        self
+    }
+
+    pub fn remote_secret_key(mut self, secret_key: &str) -> Self {
+        self.remote_secret_key = Some(secret_key.to_string());
+        self
+    }
+
+    pub fn try_build(self) -> Result<Backend, ReductError> {
+        let backend: BoxedBackend = match self.backend_type {
+            #[cfg(feature = "fs-backend")]
+            BackendType::Filesystem => {
+                let Some(data_path) = self.local_data_path else {
+                    Err(internal_server_error!(
+                        "local_data_path is required for Filesystem backend",
+                    ))?
+                };
+
+                Box::new(fs::FileSystemBackend::new(PathBuf::from(data_path)))
+            }
+
+            #[cfg(feature = "s3-backend")]
+            BackendType::S3 => {
+                let Some(bucket) = self.remote_bucket else {
+                    Err(internal_server_error!(
+                        "remote_bucket is required for S3 backend"
+                    ))?
+                };
+
+                let Some(cache_path) = self.remote_cache_path else {
+                    Err(internal_server_error!(
+                        "remote_cache_path is required for S3 backend"
+                    ))?
+                };
+
+                let Some(access_key) = self.remote_access_key else {
+                    Err(internal_server_error!(
+                        "remote_access_key is required for S3 backend"
+                    ))?
+                };
+
+                let Some(secret_key) = self.remote_secret_key else {
+                    Err(internal_server_error!(
+                        "remote_secret_key is required for S3 backend"
+                    ))?
+                };
+
+                let Some(cache_size) = self.remote_cache_size else {
+                    Err(internal_server_error!(
+                        "remote_cache_size is required for S3 backend"
+                    ))?
+                };
+
+                let settings = remote::RemoteBackendSettings {
+                    connector_type: BackendType::S3,
+                    cache_path: PathBuf::from(cache_path),
+                    endpoint: self.remote_endpoint,
+                    access_key,
+                    secret_key,
+                    region: self.remote_region,
+                    bucket,
+                    cache_size,
+                };
+
+                Box::new(remote::RemoteBackend::new(settings))
+            }
+        };
+
+        Ok(Backend {
+            backend: Arc::new(backend),
+        })
+    }
+}
+
+pub struct Backend {
+    backend: Arc<BoxedBackend>,
+}
+
+impl Default for Backend {
+    fn default() -> Self {
+        Self {
+            backend: Arc::new(Box::new(NoopBackend::new())),
+        }
+    }
+}
+
+impl Backend {
+    pub fn builder() -> BackpackBuilder {
+        BackpackBuilder::new()
+    }
+
+    /// Create a new instance of `fs::OpenOptions`.
+    pub fn open_options(&self) -> OpenOptions {
+        OpenOptions::new(Arc::clone(&self.backend))
+    }
+
+    pub fn rename<P: AsRef<Path>, Q: AsRef<Path>>(
+        &self,
+        from: P,
+        to: Q,
+    ) -> std::io::Result<()> {
+        self.backend.rename(from.as_ref(), to.as_ref())
+    }
+
+    pub fn remove<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
+        self.backend.remove(path.as_ref())
+    }
+
+    pub fn remove_dir_all<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
+        self.backend.remove_dir_all(path.as_ref())
+    }
+
+    pub fn create_dir_all<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
+        self.backend.create_dir_all(path.as_ref())
+    }
+
+    pub fn read_dir(&self, path: &PathBuf) -> std::io::Result<Vec<PathBuf>> {
+        self.backend.read_dir(path)
+    }
+
+    pub fn try_exists<P: AsRef<Path>>(&self, path: P) -> std::io::Result<bool> {
+        self.backend.try_exists(path.as_ref())
+    }
+
+    pub fn invalidate_locally_cached_files(&self) -> Vec<PathBuf> {
+        self.backend.invalidate_locally_cached_files()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use mockall::mock;
+    use rstest::*;
+    use tempfile::tempdir;
+
+    #[cfg(feature = "fs-backend")]
+    mod fs {
+        use super::*;
+        #[rstest]
+        fn test_backend_builder_fs() {
+            {
+                let backend = Backend::builder()
+                    .backend_type(BackendType::Filesystem)
+                    .local_data_path("/tmp/data")
+                    .try_build()
+                    .expect("Failed to build Filesystem backend");
+                assert_eq!(backend.backend.path(), &PathBuf::from("/tmp/data"));
+            }
+        }
+    }
+
+    #[cfg(feature = "s3-backend")]
+    mod s3 {
+        use super::*;
+        #[rstest]
+        fn test_backend_builder_s3() {
+            {
+                let backend = Backend::builder()
+                    .backend_type(BackendType::S3)
+                    .remote_bucket("my-bucket")
+                    .remote_cache_path("/tmp/cache")
+                    .remote_region("us-east-1")
+                    .remote_endpoint("http://localhost:9000")
+                    .remote_access_key("access_key")
+                    .remote_secret_key("secret_key")
+                    .cache_size(1024 * 1024 * 1024) // 1 GB
+                    .try_build()
+                    .expect("Failed to build S3 backend");
+                assert_eq!(backend.backend.path(), &PathBuf::from("/tmp/cache"));
+            }
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_bucket_missing() {
+            let err = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_cache_path("/tmp/cache")
+                .remote_region("us-east-1")
+                .remote_endpoint("http://localhost:9000")
+                .remote_access_key("access_key")
+                .remote_secret_key("secret_key")
+                .cache_size(1024 * 1024 * 1024) // 1 GB
+                .try_build()
+                .err()
+                .unwrap();
+
+            assert_eq!(
+                err,
+                internal_server_error!("remote_bucket is required for S3 backend")
+            );
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_cache_path_missing() {
+            let err = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_bucket("my-bucket")
+                .remote_region("us-east-1")
+                .remote_endpoint("http://localhost:9000")
+                .remote_access_key("access_key")
+                .remote_secret_key("secret_key")
+                .cache_size(1024 * 1024 * 1024) // 1 GB
+                .try_build()
+                .err()
+                .unwrap();
+
+            assert_eq!(
+                err,
+                internal_server_error!("remote_cache_path is required for S3 backend")
+            );
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_region_missing() {
+            let result = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_bucket("my-bucket")
+                .remote_cache_path("/tmp/cache")
+                .remote_region("us-east-1")
+                .remote_endpoint("http://localhost:9000")
+                .remote_access_key("access_key")
+                .remote_secret_key("secret_key")
+                .cache_size(1024 * 1024 * 1024) // 1 GB
+                .try_build();
+
+            assert!(
+                result.is_ok(),
+                "Region is not needed for MinIO and other S3-compatible storages"
+            );
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_endpoint_missing() {
+            let result = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_bucket("my-bucket")
+                .remote_cache_path("/tmp/cache")
+                .remote_region("us-east-1")
+                .remote_access_key("access_key")
+                .remote_secret_key("secret_key")
+                .cache_size(1024 * 1024 * 1024)
+                .try_build();
+
+            assert!(
+                result.is_ok(),
+                "Endpoint is not needed for AWS S3 and other S3-compatible storages"
+            );
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_access_key_missing() {
+            let err = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_bucket("my-bucket")
+                .remote_cache_path("/tmp/cache")
+                .remote_region("us-east-1")
+                .remote_endpoint("http://localhost:9000")
+                .remote_secret_key("secret_key")
+                .cache_size(1024 * 1024 * 1024)
+                .try_build()
+                .err()
+                .unwrap();
+
+            assert_eq!(
+                err,
+                internal_server_error!("remote_access_key is required for S3 backend")
+            );
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_secret_key_missing() {
+            let err = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_bucket("my-bucket")
+                .remote_cache_path("/tmp/cache")
+                .remote_region("us-east-1")
+                .remote_endpoint("http://localhost:9000")
+                .remote_access_key("access_key")
+                .cache_size(1024 * 1024 * 1024)
+                .try_build()
+                .err()
+                .unwrap();
+
+            assert_eq!(
+                err,
+                internal_server_error!("remote_secret_key is required for S3 backend")
+            );
+        }
+
+        #[rstest]
+        fn test_backend_builder_s3_cache_size_missing() {
+            let err = Backend::builder()
+                .backend_type(BackendType::S3)
+                .remote_bucket("my-bucket")
+                .remote_cache_path("/tmp/cache")
+                .remote_region("us-east-1")
+                .remote_endpoint("http://localhost:9000")
+                .remote_access_key("access_key")
+                .remote_secret_key("secret_key")
+                .try_build()
+                .err()
+                .unwrap();
+
+            assert_eq!(
+                err,
+                internal_server_error!("remote_cache_size is required for S3 backend")
+            );
+        }
+    }
+
+    mod open {
+        use super::*;
+        #[rstest]
+        fn test_backend_open_options(mut mock_backend: MockBackend) {
+            let path = mock_backend.path().join("test.txt").clone();
+
+            // download because it is not cached yet
+            mock_backend
+                .expect_try_exists()
+                .returning(move |_| Ok(true));
+            mock_backend.expect_download().returning(move |p| {
+                assert_eq!(path, p);
+                Ok(())
+            });
+
+            let backend = build_backend(mock_backend);
+            let file = backend
+                .open_options()
+                .create(true)
+                .write(true)
+                .open("test.txt")
+                .unwrap();
+
+            assert!(file.is_synced());
+            assert_eq!(file.mode(), &AccessMode::ReadWrite);
+            assert_eq!(file.metadata().unwrap().len(), 0);
+        }
+ } + + mod rename { + use super::*; + + #[rstest] + fn test_backend_rename(mut mock_backend: MockBackend) { + mock_backend + .expect_rename() + .returning(move |old_path, new_path| { + assert_eq!(old_path, Path::new("old_name.txt")); + assert_eq!(new_path, Path::new("new_name.txt")); + Ok(()) + }); + + let backend = build_backend(mock_backend); + backend.rename("old_name.txt", "new_name.txt").unwrap(); + } + } + + mod remove { + use super::*; + + #[rstest] + fn test_backend_remove(mut mock_backend: MockBackend) { + mock_backend.expect_remove().returning(move |path| { + assert_eq!(path, Path::new("temp_file.txt")); + Ok(()) + }); + + let backend = build_backend(mock_backend); + backend.remove("temp_file.txt").unwrap(); + } + } + + mod remove_dir_all { + use super::*; + + #[rstest] + fn test_backend_remove_dir_all(mut mock_backend: MockBackend) { + mock_backend.expect_remove_dir_all().returning(move |path| { + assert_eq!(path, Path::new("temp_dir")); + Ok(()) + }); + + let backend = build_backend(mock_backend); + backend.remove_dir_all("temp_dir").unwrap(); + } + } + + mod create_dir_all { + use super::*; + + #[rstest] + fn test_backend_create_dir_all(mut mock_backend: MockBackend) { + mock_backend.expect_create_dir_all().returning(move |path| { + assert_eq!(path, Path::new("new_dir")); + Ok(()) + }); + + let backend = build_backend(mock_backend); + backend.create_dir_all("new_dir").unwrap(); + } + } + + mod read_dir { + use super::*; + + #[rstest] + fn test_backend_read_dir(mut mock_backend: MockBackend) { + let expected_files = vec![PathBuf::from("file1.txt"), PathBuf::from("file2.txt")]; + let copy_of_expected = expected_files.clone(); + mock_backend.expect_read_dir().returning(move |path| { + assert_eq!(path, Path::new("some_dir")); + Ok(copy_of_expected.clone()) + }); + + let backend = build_backend(mock_backend); + let files = backend.read_dir(&PathBuf::from("some_dir")).unwrap(); + assert_eq!(files, expected_files); + } + } + + mod try_exists { + use super::*; + + #[rstest] + fn test_backend_try_exists(mut mock_backend: MockBackend) { + mock_backend.expect_try_exists().returning(move |path| { + assert_eq!(path, Path::new("existing_file.txt")); + Ok(true) + }); + + let backend = build_backend(mock_backend); + let exists = backend.try_exists("existing_file.txt").unwrap(); + assert!(exists); + } + } + + mod invalidate_locally_cached_files { + use super::*; + + #[rstest] + fn test_backend_invalidate_locally_cached_files(mut mock_backend: MockBackend) { + let expected_invalidated = vec![ + PathBuf::from("cached_file1.txt"), + PathBuf::from("cached_file2.txt"), + ]; + let copy_of_expected = expected_invalidated.clone(); + mock_backend + .expect_invalidate_locally_cached_files() + .returning(move || copy_of_expected.clone()); + + let backend = build_backend(mock_backend); + let invalidated = backend.invalidate_locally_cached_files(); + assert_eq!(invalidated, expected_invalidated); + } + } + + mock! 
{
+        pub Backend {}
+
+        impl StorageBackend for Backend {
+            fn path(&self) -> &PathBuf;
+            fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()>;
+            fn remove(&self, path: &Path) -> std::io::Result<()>;
+            fn remove_dir_all(&self, path: &Path) -> std::io::Result<()>;
+            fn create_dir_all(&self, path: &Path) -> std::io::Result<()>;
+            fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>>;
+            fn try_exists(&self, path: &Path) -> std::io::Result<bool>;
+            fn upload(&self, path: &Path) -> std::io::Result<()>;
+            fn download(&self, path: &Path) -> std::io::Result<()>;
+            fn update_local_cache(&self, path: &Path, mode: &AccessMode) -> std::io::Result<()>;
+            fn invalidate_locally_cached_files(&self) -> Vec<PathBuf>;
+        }
+
+    }
+
+    #[fixture]
+    fn mock_backend(path: PathBuf) -> MockBackend {
+        let mut mock = MockBackend::new();
+        mock.expect_path().return_const(path.clone());
+        mock
+    }
+
+    #[fixture]
+    fn path() -> PathBuf {
+        tempdir().unwrap().keep()
+    }
+
+    fn build_backend(mock_backend: MockBackend) -> Backend {
+        Backend {
+            backend: Arc::new(Box::new(mock_backend)),
+        }
+    }
+}
diff --git a/reductstore/src/backend/file.rs b/reductstore/src/backend/file.rs
new file mode 100644
index 000000000..f470e19af
--- /dev/null
+++ b/reductstore/src/backend/file.rs
@@ -0,0 +1,383 @@
+// Copyright 2025 ReductSoftware UG
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+use crate::backend::BoxedBackend;
+use log::debug;
+use std::fs::File as StdFile;
+use std::fs::OpenOptions as StdOpenOptions;
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+#[derive(PartialEq, Clone, Debug)]
+pub enum AccessMode {
+    Read,
+    ReadWrite,
+}
+
+pub struct File {
+    inner: StdFile,
+    backend: Arc<BoxedBackend>,
+    path: PathBuf,
+    last_synced: Instant,
+    is_synced: bool,
+    mode: AccessMode,
+}
+
+pub struct OpenOptions {
+    inner: StdOpenOptions,
+    backend: Arc<BoxedBackend>,
+    create: bool,
+    mode: AccessMode,
+}
+
+impl OpenOptions {
+    pub(crate) fn new(backend: Arc<BoxedBackend>) -> Self {
+        Self {
+            inner: StdOpenOptions::new(),
+            backend,
+            create: false,
+            mode: AccessMode::Read,
+        }
+    }
+
+    pub fn read(&mut self, read: bool) -> &mut Self {
+        self.inner.read(read);
+        self
+    }
+
+    pub fn write(&mut self, write: bool) -> &mut Self {
+        self.inner.write(write);
+        if write {
+            self.mode = AccessMode::ReadWrite;
+        }
+        self
+    }
+
+    pub fn create(&mut self, create: bool) -> &mut Self {
+        self.inner.create(create);
+        self.create = create;
+        if create {
+            self.mode = AccessMode::ReadWrite;
+        }
+        self
+    }
+
+    pub fn open<P: AsRef<std::path::Path>>(&self, path: P) -> std::io::Result<File> {
+        let full_path = self.backend.path().join(path.as_ref());
+        if !full_path.exists() {
+            // the call initiates downloading the file from remote storage if needed
+            if self.backend.try_exists(&full_path)? {
+                self.backend.download(&full_path)?;
+            } else if !self.create {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    format!("File {:?} does not exist", full_path),
+                ));
+            }
+        }
+
+        let file = self.inner.open(full_path.clone())?;
+        Ok(File {
+            inner: file,
+            backend: Arc::clone(&self.backend),
+            path: full_path,
+            last_synced: Instant::now(),
+            is_synced: true,
+            mode: self.mode.clone(),
+        })
+    }
+}
+
+impl File {
+    pub fn sync_all(&mut self) -> std::io::Result<()> {
+        if self.is_synced() {
+            return Ok(());
+        }
+
+        debug!("File {} synced to storage backend", self.path.display());
+
+        self.inner.sync_all()?;
+        self.backend.upload(&self.path)?;
+        self.last_synced = Instant::now();
+        self.is_synced = true;
+        Ok(())
+    }
+    pub fn metadata(&self) -> std::io::Result<std::fs::Metadata> {
+        self.inner.metadata()
+    }
+
+    pub fn set_len(&mut self, size: u64) -> std::io::Result<()> {
+        self.is_synced = false;
+        self.inner.set_len(size)
+    }
+
+    // Specifically for cache management
+    pub fn last_synced(&self) -> std::time::Instant {
+        self.last_synced
+    }
+
+    pub fn is_synced(&self) -> bool {
+        self.is_synced
+    }
+
+    pub fn path(&self) -> &PathBuf {
+        &self.path
+    }
+
+    pub fn mode(&self) -> &AccessMode {
+        &self.mode
+    }
+
+    pub fn access(&self) -> std::io::Result<()> {
+        self.backend.update_local_cache(&self.path, &self.mode)
+    }
+}
+
+impl Read for File {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.inner.read(buf)
+    }
+}
+
+impl Write for File {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.is_synced = false;
+        self.inner.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.inner.flush()
+    }
+}
+
+impl Seek for File {
+    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
+        self.inner.seek(pos)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::backend::StorageBackend;
+    use mockall::mock;
+    use rstest::*;
+    use std::fs;
+    use std::path::{Path, PathBuf};
+    use tempfile::tempdir;
+
+    mod open_options {
+        use super::*;
+        use std::fs;
+
+        #[rstest]
+        fn test_open_options_read(mut mock_backend: MockBackend) {
+            let path = mock_backend.path().to_path_buf();
+            let copy_path = path.clone();
+
+            // check if file exists
+            mock_backend
+                .expect_try_exists()
+                .times(1)
+                .returning(|_| Ok(true));
+            // download because it does not exist in cache
+            mock_backend.expect_download().times(1).returning(move |p| {
+                assert_eq!(p, copy_path.join("non-existing.txt").as_path());
+                fs::create_dir_all(p.parent().unwrap()).unwrap();
+                fs::write(&copy_path.join("non-existing.txt"), "content").unwrap();
+                Ok(())
+            });
+
+            let file = OpenOptions::new(Arc::new(Box::new(mock_backend)))
+                .read(true)
+                .open("non-existing.txt")
+                .unwrap();
+
+            assert_eq!(file.mode(), &AccessMode::Read);
+            assert!(file.is_synced());
+            assert_eq!(file.path(), &path.join("non-existing.txt"));
+            assert_eq!(file.metadata().unwrap().len(), 7);
+        }
+
+        #[rstest]
+        fn test_open_options_read_existing(mut mock_backend: MockBackend) {
+            let path = mock_backend.path().to_path_buf();
+
+            // no download because it exists in cache
+            mock_backend.expect_download().times(0);
+
+            let file = OpenOptions::new(Arc::new(Box::new(mock_backend)))
+                .read(true)
+                .open("test.txt")
+                .unwrap();
+
+            assert_eq!(file.mode(), &AccessMode::Read);
+            assert!(file.is_synced());
+            assert_eq!(file.path(), &path.join("test.txt"));
+            assert_eq!(file.metadata().unwrap().len(), 7);
+        }
+
+        #[rstest]
+        fn test_open_options_create_ignore_file_not_exist(mut mock_backend: MockBackend) {
+            let path =
mock_backend.path().to_path_buf(); + let copy_path = path.clone(); + + mock_backend + .expect_try_exists() + .times(1) + .returning(move |p| { + assert_eq!(p, copy_path.join("new_file.txt").as_path()); + Ok(false) + }); + mock_backend.expect_download().times(0); + + let file = OpenOptions::new(Arc::new(Box::new(mock_backend))) + .write(true) + .create(true) + .open("new_file.txt") + .unwrap(); + + assert_eq!(file.mode(), &AccessMode::ReadWrite); + assert!(file.is_synced()); + assert_eq!(file.path(), &path.join("new_file.txt")); + assert_eq!(file.metadata().unwrap().len(), 0); + } + } + + mod sync { + use super::*; + use std::io::Write; + + #[rstest] + fn test_file_sync_all(mut mock_backend: MockBackend) { + let path = mock_backend.path().to_path_buf(); + + // expect upload when syncing + mock_backend.expect_upload().times(1).returning(move |p| { + assert_eq!(p, path.join("test.txt").as_path()); + Ok(()) + }); + + let mut file = OpenOptions::new(Arc::new(Box::new(mock_backend))) + .write(true) + .open("test.txt") + .unwrap(); + + assert!(file.is_synced()); + file.write_all(b" more").unwrap(); + assert!(!file.is_synced()); + file.sync_all().unwrap(); + assert!(file.is_synced()); + } + + #[rstest] + fn test_is_sync_after_write(mock_backend: MockBackend) { + let mut file = OpenOptions::new(Arc::new(Box::new(mock_backend))) + .write(true) + .open("test.txt") + .unwrap(); + + assert!(file.is_synced()); + file.write_all(b" more").unwrap(); + assert!(!file.is_synced()); + } + + #[rstest] + fn test_is_sync_after_set_len(mock_backend: MockBackend) { + let mut file = OpenOptions::new(Arc::new(Box::new(mock_backend))) + .write(true) + .open("test.txt") + .unwrap(); + + assert!(file.is_synced()); + file.set_len(10).unwrap(); + assert!(!file.is_synced()); + } + } + + mod access { + use super::*; + + #[rstest] + fn test_file_access_read(mut mock_backend: MockBackend) { + let path = mock_backend.path().to_path_buf(); + + // expect update_local_cache when accessing + mock_backend + .expect_update_local_cache() + .times(1) + .returning(move |p, mode| { + assert_eq!(p, path.join("test.txt").as_path()); + assert_eq!(mode, &AccessMode::Read); + Ok(()) + }); + + let file = OpenOptions::new(Arc::new(Box::new(mock_backend))) + .read(true) + .open("test.txt") + .unwrap(); + + file.access().unwrap(); + } + + #[rstest] + fn test_file_access_read_write(mut mock_backend: MockBackend) { + let path = mock_backend.path().to_path_buf(); + + // expect update_local_cache when accessing + mock_backend + .expect_update_local_cache() + .times(1) + .returning(move |p, mode| { + assert_eq!(p, path.join("test.txt").as_path()); + assert_eq!(mode, &AccessMode::ReadWrite); + Ok(()) + }); + + let file = OpenOptions::new(Arc::new(Box::new(mock_backend))) + .write(true) + .open("test.txt") + .unwrap(); + + file.access().unwrap(); + } + } + + mock! 
{
+        pub Backend {}
+
+        impl StorageBackend for Backend {
+            fn path(&self) -> &PathBuf;
+            fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()>;
+            fn remove(&self, path: &Path) -> std::io::Result<()>;
+            fn remove_dir_all(&self, path: &Path) -> std::io::Result<()>;
+            fn create_dir_all(&self, path: &Path) -> std::io::Result<()>;
+            fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>>;
+            fn try_exists(&self, path: &Path) -> std::io::Result<bool>;
+            fn upload(&self, path: &Path) -> std::io::Result<()>;
+            fn download(&self, path: &Path) -> std::io::Result<()>;
+            fn update_local_cache(&self, path: &Path, mode: &AccessMode) -> std::io::Result<()>;
+            fn invalidate_locally_cached_files(&self) -> Vec<PathBuf>;
+        }
+
+    }
+
+    #[fixture]
+    fn mock_backend(path: PathBuf) -> MockBackend {
+        // create the file in cache
+        fs::create_dir_all(path.as_path()).unwrap();
+        fs::write(&path.join("test.txt"), "content").unwrap();
+
+        let mut backend = MockBackend::new();
+        backend.expect_path().return_const(path.clone());
+        backend
+    }
+
+    #[fixture]
+    fn path() -> PathBuf {
+        tempdir().unwrap().keep()
+    }
+}
diff --git a/reductstore/src/backend/fs.rs b/reductstore/src/backend/fs.rs
new file mode 100644
index 000000000..c72eb12d5
--- /dev/null
+++ b/reductstore/src/backend/fs.rs
@@ -0,0 +1,205 @@
+// Copyright 2025 ReductSoftware UG
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use crate::backend::file::AccessMode;
+use crate::backend::StorageBackend;
+use std::path::{Path, PathBuf};
+
+pub(crate) struct FileSystemBackend {
+    path: PathBuf,
+}
+
+impl FileSystemBackend {
+    pub fn new(path: PathBuf) -> Self {
+        FileSystemBackend { path }
+    }
+}
+
+impl StorageBackend for FileSystemBackend {
+    fn path(&self) -> &PathBuf {
+        &self.path
+    }
+
+    fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
+        std::fs::rename(from, to)
+    }
+
+    fn remove(&self, path: &Path) -> std::io::Result<()> {
+        std::fs::remove_file(path)
+    }
+
+    fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
+        std::fs::remove_dir_all(path)
+    }
+
+    fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
+        std::fs::create_dir_all(path)
+    }
+
+    fn read_dir(&self, path: &Path) -> std::io::Result<Vec<PathBuf>> {
+        std::fs::read_dir(path).map(|read_dir| {
+            read_dir
+                .filter_map(|entry| entry.ok().map(|e| e.path()))
+                .collect()
+        })
+    }
+
+    fn try_exists(&self, path: &Path) -> std::io::Result<bool> {
+        path.try_exists()
+    }
+
+    fn upload(&self, _path: &Path) -> std::io::Result<()> {
+        // do nothing because the file owner is responsible for syncing with fs
+        Ok(())
+    }
+
+    fn download(&self, _path: &Path) -> std::io::Result<()> {
+        // do nothing because filesystem backend does not need downloading
+        Ok(())
+    }
+
+    fn update_local_cache(&self, _path: &Path, _mode: &AccessMode) -> std::io::Result<()> {
+        // do nothing because filesystem backend does not need access tracking
+        Ok(())
+    }
+
+    fn invalidate_locally_cached_files(&self) -> Vec<PathBuf> {
+        // do nothing because filesystem backend does not have a cache
+        vec![]
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rstest::*;
+    use std::fs::OpenOptions;
+    use std::io::Write;
+    use tempfile::tempdir;
+
+    mod rename_file {
+        use super::*;
+        use std::io::Read;
+
+        #[rstest]
+        fn test_rename_file(fs_backend: FileSystemBackend) {
+            let path = fs_backend.path().join("old_name.txt");
+            let mut file = OpenOptions::new()
+                .write(true)
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rstest::*;
+    use std::fs::OpenOptions;
+    use std::io::Write;
+    use tempfile::tempdir;
+
+    mod rename_file {
+        use super::*;
+        use std::io::Read;
+
+        #[rstest]
+        fn test_rename_file(fs_backend: FileSystemBackend) {
+            let path = fs_backend.path().join("old_name.txt");
+            let mut file = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(&path)
+                .unwrap();
+            writeln!(file, "This is a test file.").unwrap();
+
+            let new_path = path.with_file_name("new_name.txt");
+            fs_backend.rename(&path, &new_path).unwrap();
+            assert!(!fs_backend.try_exists(&path).unwrap());
+            assert!(fs_backend.try_exists(&new_path).unwrap());
+
+            let mut read_file = OpenOptions::new().read(true).open(&new_path).unwrap();
+            let mut content = String::new();
+            read_file.read_to_string(&mut content).unwrap();
+            assert_eq!(content, "This is a test file.\n");
+        }
+    }
+
+    mod remove_file {
+        use super::*;
+        use std::fs::OpenOptions;
+
+        #[rstest]
+        fn test_remove_file(fs_backend: FileSystemBackend) {
+            let path = fs_backend.path().join("temp_file.txt");
+            let mut file = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(&path)
+                .unwrap();
+            writeln!(file, "Temporary file content.").unwrap();
+
+            assert!(fs_backend.try_exists(&path).unwrap());
+            fs_backend.remove(&path).unwrap();
+            assert!(!fs_backend.try_exists(&path).unwrap());
+        }
+    }
+
+    mod remove_dir_all {
+        use super::*;
+        use std::fs::OpenOptions;
+
+        #[rstest]
+        fn test_remove_dir_all(fs_backend: FileSystemBackend) {
+            let dir_path = fs_backend.path().join("temp_dir");
+            fs_backend.create_dir_all(&dir_path).unwrap();
+            let file_path = dir_path.join("file.txt");
+            let mut file = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(&file_path)
+                .unwrap();
+            writeln!(file, "File in temporary directory.").unwrap();
+            file.sync_all().unwrap();
+
+            fs_backend.remove_dir_all(&dir_path).unwrap();
+            assert!(!fs_backend.try_exists(&dir_path).unwrap());
+        }
+    }
+
+    mod create_dir_all {
+        use super::*;
+
+        #[rstest]
+        fn test_create_dir_all(fs_backend: FileSystemBackend) {
+            let dir_path = fs_backend.path().join("new_dir/sub_dir");
+            fs_backend.create_dir_all(&dir_path).unwrap();
+            assert!(fs_backend.try_exists(&dir_path).unwrap());
+        }
+    }
+
+    mod read_dir {
+        use super::*;
+        use std::fs::OpenOptions;
+
+        #[rstest]
+        fn test_read_dir(fs_backend: FileSystemBackend) {
+            let dir_path = fs_backend.path().join("read_dir_test");
+            fs_backend.create_dir_all(&dir_path).unwrap();
+
+            let file1_path = dir_path.join("file1.txt");
+            let _ = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(&file1_path)
+                .unwrap();
+
+            let file2_path = dir_path.join("file2.txt");
+            let _ = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .open(&file2_path)
+                .unwrap();
+
+            fs_backend.create_dir_all(&dir_path.join("child/")).unwrap();
+
+            let entries = fs_backend.read_dir(&dir_path).unwrap();
+            let entry_names: Vec<String> = entries
+                .iter()
+                .map(|p| p.file_name().unwrap().to_string_lossy().to_string())
+                .collect();
+
+            assert_eq!(entry_names.len(), 3);
+            assert!(entry_names.contains(&"file1.txt".to_string()));
+            assert!(entry_names.contains(&"file2.txt".to_string()));
+            assert!(entry_names.contains(&"child".to_string()));
+        }
+    }
+
+    #[fixture]
+    fn fs_backend() -> FileSystemBackend {
+        let dir = tempdir().unwrap().keep();
+        FileSystemBackend::new(dir.to_path_buf())
+    }
+}
diff --git a/reductstore/src/backend/noop.rs b/reductstore/src/backend/noop.rs
new file mode 100644
index 000000000..2d1c53d01
--- /dev/null
+++ b/reductstore/src/backend/noop.rs
@@ -0,0 +1,92 @@
+// Copyright 2025 ReductSoftware UG
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
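+
+//! A no-op implementation of `StorageBackend`: every operation succeeds
+//! without side effects, `read_dir` returns an empty list, `try_exists`
+//! reports `false`, and `path()` panics because there is no backing
+//! directory.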
+
+use crate::backend::file::AccessMode;
+use crate::backend::StorageBackend;
+use std::path::{Path, PathBuf};
+
+pub(super) struct NoopBackend;
+
+impl StorageBackend for NoopBackend {
+    fn path(&self) -> &PathBuf {
+        panic!("NoopBackend does not have a path");
+    }
+
+    fn rename(&self, _from: &Path, _to: &Path) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn remove(&self, _path: &Path) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn remove_dir_all(&self, _path: &Path) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn create_dir_all(&self, _path: &Path) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn read_dir(&self, _path: &Path) -> std::io::Result<Vec<PathBuf>> {
+        Ok(vec![])
+    }
+
+    fn try_exists(&self, _path: &Path) -> std::io::Result<bool> {
+        Ok(false)
+    }
+
+    fn upload(&self, _path: &Path) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn download(&self, _path: &Path) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn update_local_cache(&self, _path: &Path, _mode: &AccessMode) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    fn invalidate_locally_cached_files(&self) -> Vec<PathBuf> {
+        vec![]
+    }
+}
+
+impl NoopBackend {
+    pub fn new() -> Self {
+        NoopBackend
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rstest::*;
+
+    #[rstest]
+    fn test_noop_backend() {
+        let backend = NoopBackend::new();
+        assert!(!backend.try_exists(Path::new("some/path")).unwrap());
+        assert!(backend.read_dir(Path::new("some/path")).unwrap().is_empty());
+        assert!(backend.rename(Path::new("from"), Path::new("to")).is_ok());
+        assert!(backend.remove(Path::new("some/path")).is_ok());
+        assert!(backend.remove_dir_all(Path::new("some/path")).is_ok());
+        assert!(backend.create_dir_all(Path::new("some/path")).is_ok());
+        assert!(backend.upload(Path::new("some/path")).is_ok());
+        assert!(backend.download(Path::new("some/path")).is_ok());
+        assert!(backend
+            .update_local_cache(Path::new("some/path"), &AccessMode::Read)
+            .is_ok());
+        assert!(backend.invalidate_locally_cached_files().is_empty());
+    }
+
+    #[rstest]
+    #[should_panic(expected = "NoopBackend does not have a path")]
+    fn test_noop_backend_path() {
+        let backend = NoopBackend::new();
+        let _ = backend.path();
+    }
+}
diff --git a/reductstore/src/backend/remote.rs b/reductstore/src/backend/remote.rs
new file mode 100644
index 000000000..f946523d0
--- /dev/null
+++ b/reductstore/src/backend/remote.rs
@@ -0,0 +1,649 @@
+// Copyright 2025 ReductSoftware UG
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
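+
+//! Remote storage backend that pairs a `RemoteStorageConnector` (currently
+//! S3) with a `LocalCache`: objects are downloaded into the cache on demand,
+//! uploaded back on sync, and evicted from the cache once it grows past its
+//! configured size.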
+
+mod local_cache;
+
+#[cfg(feature = "s3-backend")]
+mod s3_connector;
+
+use crate::backend::file::AccessMode;
+use crate::backend::remote::local_cache::LocalCache;
+use crate::backend::{BackendType, StorageBackend};
+use log::debug;
+use std::io;
+use std::path::{Path, PathBuf};
+use std::sync::Mutex;
+use std::time::Instant;
+
+#[cfg(feature = "s3-backend")]
+use crate::backend::remote::s3_connector::S3Connector;
+
+#[allow(dead_code)]
+pub(super) trait RemoteStorageConnector {
+    fn download_object(&self, key: &str, dest: &PathBuf) -> Result<(), io::Error>;
+    fn upload_object(&self, key: &str, src: &PathBuf) -> Result<(), io::Error>;
+    fn create_dir_all(&self, key: &str) -> Result<(), io::Error>;
+    fn list_objects(&self, key: &str, recursive: bool) -> Result<Vec<String>, io::Error>;
+    fn remove_object(&self, key: &str) -> Result<(), io::Error>;
+    fn head_object(&self, key: &str) -> Result<bool, io::Error>;
+    fn rename_object(&self, from: &str, to: &str) -> Result<(), io::Error>;
+}
+
+#[allow(dead_code)]
+pub(crate) struct RemoteBackendSettings {
+    pub connector_type: BackendType,
+    pub cache_path: PathBuf,
+    pub cache_size: u64,
+    pub endpoint: Option<String>,
+    pub access_key: String,
+    pub secret_key: String,
+    pub region: Option<String>,
+    pub bucket: String,
+}
+
+struct LocalCacheEntry {
+    last_accessed: Instant,
+    size: u64,
+}
+
+pub(crate) struct RemoteBackend {
+    cache_path: PathBuf,
+    connector: Box<dyn RemoteStorageConnector>,
+    local_cache: Mutex<LocalCache>,
+}
+
+impl RemoteBackend {
+    #[allow(dead_code, unused_variables)]
+    pub fn new(settings: RemoteBackendSettings) -> Self {
+        let cache_path = settings.cache_path.clone();
+        let local_cache = Mutex::new(LocalCache::new(cache_path.clone(), settings.cache_size));
+
+        let connector = match settings.connector_type {
+            #[cfg(feature = "s3-backend")]
+            BackendType::S3 => Box::new(S3Connector::new(settings)),
+            #[cfg(feature = "fs-backend")]
+            BackendType::Filesystem =>
+            // panic because we shouldn't be here if filesystem is selected
+                panic!("Filesystem remote storage backend is not supported, falling back to S3 connector"),
+        };
+
+        #[allow(unreachable_code)]
+        RemoteBackend {
+            cache_path,
+            connector,
+            local_cache,
+        }
+    }
+
+    #[cfg(test)]
+    fn new_test(
+        connector: Box<dyn RemoteStorageConnector>,
+        cache_path: PathBuf,
+        cache_size: u64,
+    ) -> Self {
+        let local_cache = Mutex::new(LocalCache::new(cache_path.clone(), cache_size));
+
+        RemoteBackend {
+            cache_path,
+            connector,
+            local_cache,
+        }
+    }
+}
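+
+// The impl below keeps the local cache and the remote store in step:
+// mutations update the cache first and are then mirrored to the remote,
+// while reads fall back to the remote on a cache miss; object keys are
+// derived from paths relative to the cache root via `LocalCache::build_key`.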
+impl StorageBackend for RemoteBackend {
+    fn path(&self) -> &PathBuf {
+        &self.cache_path
+    }
+
+    fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
+        let (from_key, to_key) = {
+            let cache = &mut self.local_cache.lock().unwrap();
+            cache.rename(from, to)?;
+            let from_key = cache.build_key(from);
+            let to_key = cache.build_key(to);
+            (from_key, to_key)
+        };
+
+        debug!(
+            "Renaming S3 object from key: {} to key: {}",
+            from_key, to_key
+        );
+        // at least minio doesn't remove folders recursively, so we need to list and remove all objects
+        for key in self.connector.list_objects(&from_key, true)? {
+            self.connector.rename_object(
+                &format!("{}/{}", from_key, key),
+                &format!("{}/{}", to_key, key),
+            )?;
+        }
+
+        if to.is_dir() {
+            self.connector
+                .rename_object(&format!("{}/", from_key), &format!("{}/", to_key))?;
+        } else {
+            self.connector.rename_object(&from_key, &to_key)?;
+        }
+
+        Ok(())
+    }
+
+    fn remove(&self, path: &Path) -> io::Result<()> {
+        let s3_key = {
+            let cache = &mut self.local_cache.lock().unwrap();
+            cache.remove(&path)?;
+            cache.build_key(path)
+        };
+
+        debug!("Removing S3 object for key: {}", s3_key);
+        self.connector.remove_object(&s3_key)
+    }
+
+    fn remove_dir_all(&self, path: &Path) -> io::Result<()> {
+        let s3_key = {
+            let cache = &mut self.local_cache.lock().unwrap();
+            cache.remove_all(&path)?;
+            cache.build_key(path)
+        };
+
+        debug!("Removing S3 directory for key: {}", s3_key);
+        // at least minio doesn't remove folders recursively, so we need to list and remove all objects
+        for key in self.connector.list_objects(&s3_key, true)? {
+            self.connector
+                .remove_object(&format!("{}/{}", s3_key, key))?;
+        }
+
+        self.connector.remove_object(&format!("{}/", s3_key))
+    }
+
+    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
+        let s3_key = {
+            let cache = &self.local_cache.lock().unwrap();
+            cache.create_dir_all(&path)?;
+            cache.build_key(path)
+        };
+
+        if s3_key.is_empty() {
+            return Ok(());
+        }
+
+        self.connector.create_dir_all(&s3_key)
+    }
+
+    fn read_dir(&self, path: &Path) -> io::Result<Vec<PathBuf>> {
+        let cache = &self.local_cache.lock().unwrap();
+        let s3_key = cache.build_key(path);
+
+        let mut paths = vec![];
+        for key in self.connector.list_objects(&s3_key, false)? {
+            if key == s3_key {
+                continue;
+            }
+
+            let local_path = self.cache_path.join(path).join(&key);
+            if key.ends_with('/') {
+                debug!(
+                    "Creating local directory {:?} for S3 key: {}",
+                    local_path, key
+                );
+                cache.create_dir_all(&local_path)?;
+            }
+
+            paths.push(local_path);
+        }
+        Ok(paths)
+    }
+
+    fn try_exists(&self, path: &Path) -> io::Result<bool> {
+        // check cache first and then load from remote if not in cache
+        let s3_key = {
+            let cache = &self.local_cache.lock().unwrap();
+            if cache.try_exists(path)? {
+                return Ok(true);
+            }
+
+            cache.build_key(path)
+        };
+
+        debug!("Checking S3 key: {} for local path: {:?}", s3_key, path);
+        self.connector.head_object(&s3_key)
+    }
+
+    fn upload(&self, full_path: &Path) -> io::Result<()> {
+        // upload to remote
+        let s3_key = {
+            let cache = &mut self.local_cache.lock().unwrap();
+            cache.register_file(full_path)?;
+            cache.build_key(full_path)
+        };
+
+        let full_path = self.cache_path.join(full_path);
+        debug!(
+            "Syncing local file {} to S3 key: {}",
+            full_path.display(),
+            s3_key
+        );
+        self.connector
+            .upload_object(&s3_key, &full_path.to_path_buf())?;
+
+        Ok(())
+    }
+
+    fn download(&self, path: &Path) -> io::Result<()> {
+        let s3_key = {
+            let cache = &mut self.local_cache.lock().unwrap();
+            if cache.try_exists(path)? {
+                return Ok(());
+            }
+            cache.build_key(path)
+        };
+
+        let full_path = self.cache_path.join(path);
+        debug!(
+            "Downloading S3 key: {} to local path: {:?}",
+            s3_key, full_path
+        );
+        self.connector.download_object(&s3_key, &full_path)?;
+        self.local_cache.lock().unwrap().register_file(&full_path)?;
+        Ok(())
+    }
+
+    fn update_local_cache(&self, path: &Path, _mode: &AccessMode) -> std::io::Result<()> {
+        self.local_cache.lock().unwrap().access_file(path)
+    }
+
+    fn invalidate_locally_cached_files(&self) -> Vec<PathBuf> {
+        let mut cache = self.local_cache.lock().unwrap();
+        cache.invalidate_old_files()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use mockall::mock;
+    use rstest::*;
+    use tempfile::tempdir;
+
+    #[rstest]
+    fn test_remote_backend_creation(mock_connector: MockRemoteStorageConnector, path: PathBuf) {
+        let remote_backend = make_remote_backend(mock_connector, path.clone());
+        assert_eq!(remote_backend.path(), &path);
+    }
+
+    mod rename {
+        use super::*;
+        use mockall::predicate::eq;
+        use std::fs;
+        use std::path::PathBuf;
+
+        #[rstest]
+        fn test_rename_file(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) {
+            let from_key = "file1.txt";
+            let to_key = "file2.txt";
+
+            mock_connector
+                .expect_list_objects()
+                .with(eq(from_key), eq(true))
+                .times(1)
+                .returning(|_, _| Ok(vec![]));
+
+            mock_connector
+                .expect_rename_object()
+                .with(eq(from_key), eq(to_key))
+                .times(1)
+                .returning(|_, _| Ok(()));
+
+            let remote_backend = make_remote_backend(mock_connector, path.clone());
+
+            let from = path.join(from_key);
+            let to = path.join(to_key);
+            fs::create_dir_all(&path).unwrap();
+            fs::write(&from, b"test").unwrap();
+
+            remote_backend.rename(&from, &to).unwrap();
+        }
+
+        #[rstest]
+        fn test_rename_directory(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) {
+            let from_key = "dir1/";
+            let to_key = "dir2/";
+
+            mock_connector
+                .expect_list_objects()
+                .with(eq("dir1"), eq(true))
+                .times(1)
+                .returning(|_, _| Ok(vec!["file_in_dir.txt".to_string()]));
+
+            mock_connector
+                .expect_rename_object()
+                .with(eq("dir1/file_in_dir.txt"), eq("dir2/file_in_dir.txt"))
+                .times(1)
+                .returning(|_, _| Ok(()));
+
+            mock_connector
+                .expect_rename_object()
+                .with(eq(from_key), eq(to_key))
+                .times(1)
+                .returning(|_, _| Ok(()));
+
+            let remote_backend = make_remote_backend(mock_connector, path.clone());
+
+            let from = path.join(from_key);
+            let to = path.join(to_key);
+            fs::create_dir_all(&from).unwrap();
+            fs::write(from.join("file_in_dir.txt"), b"test").unwrap();
+
+            remote_backend.rename(&from, &to).unwrap();
+        }
+    }
+
+    mod remove {
+        use super::*;
+        use mockall::predicate::eq;
+        use std::fs;
+        use std::path::PathBuf;
+
+        #[rstest]
+        fn test_remove_file(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) {
+            let key = "file1.txt";
+
+            mock_connector
+                .expect_remove_object()
+                .with(eq(key))
+                .times(1)
+                .returning(|_| Ok(()));
+
+            let remote_backend = make_remote_backend(mock_connector, path.clone());
+
+            let file_path = path.join(key);
+            fs::create_dir_all(&path).unwrap();
+            fs::write(&file_path, b"test").unwrap();
+
+            remote_backend.remove(&file_path).unwrap();
+        }
+
+        #[rstest]
+        fn test_remove_directory(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) {
+            let dir_key = "dir1/";
+            let file_key = "dir1/file_in_dir.txt";
+
+            mock_connector
+                .expect_list_objects()
+                .with(eq("dir1"), eq(true))
+                .times(1)
+                .returning(|_, _| Ok(vec!["file_in_dir.txt".to_string()]));
+
+            mock_connector
+                .expect_remove_object()
+                .with(eq(file_key))
+ .times(1) + .returning(|_| Ok(())); + + mock_connector + .expect_remove_object() + .with(eq(dir_key)) + .times(1) + .returning(|_| Ok(())); + + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + let dir_path = path.join("dir1"); + fs::create_dir_all(&dir_path).unwrap(); + fs::write(dir_path.join("file_in_dir.txt"), b"test").unwrap(); + + remote_backend.remove_dir_all(&dir_path).unwrap(); + } + } + + mod create { + use super::*; + use mockall::predicate::eq; + use std::path::PathBuf; + + #[rstest] + fn test_create_dir_all(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) { + mock_connector + .expect_create_dir_all() + .with(eq("dir1")) + .times(1) + .returning(|_| Ok(())); + + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + let dir_path = path.join("dir1"); + remote_backend.create_dir_all(&dir_path).unwrap(); + assert!(dir_path.exists()); + assert!(dir_path.is_dir()); + } + } + + mod read_dir { + use super::*; + use mockall::predicate::eq; + use std::fs; + use std::path::PathBuf; + + #[rstest] + fn test_read_dir(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) { + mock_connector + .expect_list_objects() + .with(eq("dir1"), eq(false)) + .times(1) + .returning(|_, _| Ok(vec!["file1.txt".to_string(), "subdir/".to_string()])); + + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + let dir_path = path.join("dir1"); + fs::create_dir_all(&dir_path).unwrap(); + + let entries = remote_backend.read_dir(&dir_path).unwrap(); + assert_eq!(entries.len(), 2); + assert!(entries.contains(&dir_path.join("file1.txt"))); + assert!(entries.contains(&dir_path.join("subdir"))); + assert!(dir_path.join("subdir").is_dir()); + } + } + + mod try_exists { + use super::*; + use mockall::predicate::eq; + use std::fs; + use std::path::PathBuf; + + #[rstest] + fn test_try_exists_file_in_cache( + mock_connector: MockRemoteStorageConnector, + path: PathBuf, + ) { + let file_key = "file1.txt"; + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + let file_path = path.join(file_key); + fs::create_dir_all(&path).unwrap(); + fs::write(&file_path, b"test").unwrap(); + + assert!(remote_backend.try_exists(&file_path).unwrap()); + } + + #[rstest] + fn test_try_exists_file_in_remote( + mut mock_connector: MockRemoteStorageConnector, + path: PathBuf, + ) { + let file_key = "file1.txt"; + + mock_connector + .expect_head_object() + .with(eq(file_key)) + .times(1) + .returning(|_| Ok(true)); + + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + let file_path = path.join(file_key); + assert!(remote_backend.try_exists(&file_path).unwrap()); + } + + #[rstest] + fn test_try_exists_file_not_exist( + mut mock_connector: MockRemoteStorageConnector, + path: PathBuf, + ) { + let file_key = "file1.txt"; + + mock_connector + .expect_head_object() + .with(eq(file_key)) + .times(1) + .returning(|_| Ok(false)); + + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + let file_path = path.join(file_key); + assert!(!remote_backend.try_exists(&file_path).unwrap()); + } + } + + mod upload { + use super::*; + use mockall::predicate::eq; + use std::fs; + use std::path::PathBuf; + + #[rstest] + fn test_upload_file(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) { + let file_key = "file1.txt"; + + mock_connector + .expect_upload_object() + .with(eq(file_key), eq(path.join(file_key).to_path_buf())) + .times(1) + .returning(|_, _| Ok(())); + + let remote_backend = 
make_remote_backend(mock_connector, path.clone()); + + let file_path = path.join(file_key); + fs::create_dir_all(&path).unwrap(); + fs::write(&file_path, b"test").unwrap(); + + remote_backend.upload(&file_path).unwrap(); + } + } + + mod download { + use super::*; + use mockall::predicate::eq; + use std::fs; + use std::path::PathBuf; + + #[rstest] + fn test_download_file(mut mock_connector: MockRemoteStorageConnector, path: PathBuf) { + let file_key = "file1.txt"; + let file_path = path.join(file_key); + + mock_connector + .expect_download_object() + .with(eq(file_key), eq(path.join(file_key).to_path_buf())) + .times(1) + .returning(|_, path| { + fs::create_dir_all(path.parent().unwrap()).unwrap(); + fs::write(path.clone(), b"test").unwrap(); // we need to create the file to register it in cache + Ok(()) + }); + + let remote_backend = make_remote_backend(mock_connector, path.clone()); + + remote_backend.download(&file_path).unwrap(); + } + } + + mod update_local_cache { + use super::*; + use std::fs; + use std::path::PathBuf; + + #[rstest] + fn test_update_local_cache(path: PathBuf) { + let remote_backend = + make_remote_backend(MockRemoteStorageConnector::new(), path.clone()); + + let file_key = "file1.txt"; + let file_path = path.join(file_key); + fs::create_dir_all(&path).unwrap(); + fs::write(&file_path, b"test").unwrap(); + + assert!(remote_backend + .update_local_cache(&file_path, &AccessMode::Read) + .is_ok()); + } + } + + mod invalidate_locally_cached_files { + use super::*; + use std::fs; + use std::path::PathBuf; + + #[rstest] + fn test_invalidate_locally_cached_files(path: PathBuf) { + let remote_backend = make_remote_backend_with_size( + MockRemoteStorageConnector::new(), + path.clone(), + 1, // 1 byte cache size to force invalidation + ); + + let file_key = "file1.txt"; + let file_path = path.join(file_key); + fs::create_dir_all(&path).unwrap(); + fs::write(&file_path, b"test").unwrap(); + + remote_backend + .update_local_cache(&file_path, &AccessMode::Read) + .unwrap(); + let invalidated_files = remote_backend.invalidate_locally_cached_files(); + assert_eq!(invalidated_files, vec![file_path]) + } + } + + mock! 
{
+        pub RemoteStorageConnector {}
+
+        impl RemoteStorageConnector for RemoteStorageConnector {
+            fn download_object(&self, key: &str, dest: &PathBuf) -> Result<(), io::Error>;
+            fn upload_object(&self, key: &str, src: &PathBuf) -> Result<(), io::Error>;
+            fn create_dir_all(&self, key: &str) -> Result<(), io::Error>;
+            fn list_objects(&self, key: &str, recursive: bool) -> Result<Vec<String>, io::Error>;
+            fn remove_object(&self, key: &str) -> Result<(), io::Error>;
+            fn head_object(&self, key: &str) -> Result<bool, io::Error>;
+            fn rename_object(&self, from: &str, to: &str) -> Result<(), io::Error>;
+        }
+    }
+
+    #[fixture]
+    fn mock_connector() -> MockRemoteStorageConnector {
+        let mock = MockRemoteStorageConnector::new();
+        mock
+    }
+
+    #[fixture]
+    fn path() -> PathBuf {
+        tempdir().unwrap().keep()
+    }
+
+    fn make_remote_backend(
+        mock_connector: MockRemoteStorageConnector,
+        path: PathBuf,
+    ) -> RemoteBackend {
+        let cache_size = 10 * 1024 * 1024; // 10 MB
+        make_remote_backend_with_size(mock_connector, path, cache_size)
+    }
+
+    fn make_remote_backend_with_size(
+        mock_connector: MockRemoteStorageConnector,
+        path: PathBuf,
+        cache_size: u64,
+    ) -> RemoteBackend {
+        RemoteBackend::new_test(Box::new(mock_connector), path, cache_size)
+    }
+}
diff --git a/reductstore/src/backend/remote/local_cache.rs b/reductstore/src/backend/remote/local_cache.rs
new file mode 100644
index 000000000..3cf6c0673
--- /dev/null
+++ b/reductstore/src/backend/remote/local_cache.rs
@@ -0,0 +1,437 @@
+// Copyright 2025 ReductSoftware UG
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use log::{info, warn};
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+use std::{fs, io};
+
+pub(super) struct LocalCache {
+    path: PathBuf,
+    max_size: u64,
+    current_size: u64,
+    entries: HashMap<PathBuf, crate::backend::remote::LocalCacheEntry>,
+}
+
+#[allow(dead_code)]
+impl LocalCache {
+    pub fn new(path: PathBuf, max_size: u64) -> Self {
+        info!("Cleaning up local cache at {:?}", path);
+        if let Err(err) = fs::remove_dir_all(&path) {
+            if err.kind() != io::ErrorKind::NotFound {
+                warn!("Failed to clean up local cache at {:?}: {}", path, err);
+            }
+        }
+
+        LocalCache {
+            path,
+            max_size,
+            current_size: 0,
+            entries: HashMap::new(),
+        }
+    }
+
+    pub fn build_key(&self, path: &Path) -> String {
+        path.strip_prefix(&self.path)
+            .unwrap()
+            .to_str()
+            .unwrap_or("")
+            .to_string()
+            .replace('\\', "/") // normalize Windows paths
+    }
+
+    pub fn remove(&mut self, path: &Path) -> io::Result<()> {
+        if let Err(err) = fs::remove_file(path) {
+            if err.kind() != io::ErrorKind::NotFound {
+                return Err(err);
+            }
+        }
+
+        if let Some(entry) = self.entries.remove(path) {
+            self.current_size -= entry.size;
+        }
+        Ok(())
+    }
+
+    pub fn remove_all(&mut self, dir: &Path) -> io::Result<()> {
+        fs::remove_dir_all(dir)?;
+
+        let paths_to_remove: Vec<PathBuf> = self
+            .entries
+            .keys()
+            .filter(|p| p.starts_with(dir))
+            .cloned()
+            .collect();
+
+        for path in paths_to_remove {
+            if let Err(err) = self.remove(&path) {
+                warn!("Failed to remove cached file {:?}: {}", path, err);
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn rename(&mut self, from: &Path, to: &Path) -> io::Result<()> {
+        fs::rename(from, to)?;
+
+        if let Some(entry) = self.entries.remove(from) {
+            self.entries.insert(to.to_path_buf(), entry);
+        }
+        Ok(())
+    }
+
+    pub fn create_dir_all(&self, path: &Path) -> io::Result<()> {
+        fs::create_dir_all(path)
+    }
+
+    pub fn try_exists(&self, path: &Path) -> io::Result<bool> {
+        let full_path = self.path.join(path);
+        full_path.try_exists()
+    }
+
+    pub fn register_file(&mut self, path: &Path) -> io::Result<()> {
+        let metadata = fs::metadata(&path)?;
+        let file_size = metadata.len();
+
+        self.current_size += file_size;
+        if let Some(previous) = self.entries.insert(
+            path.to_path_buf(),
+            crate::backend::remote::LocalCacheEntry {
+                last_accessed: Instant::now(),
+                size: file_size,
+            },
+        ) {
+            self.current_size -= previous.size;
+        }
+
+        Ok(())
+    }
+
+    pub fn access_file(&mut self, path: &Path) -> io::Result<()> {
+        if let Some(entry) = self.entries.get_mut(path) {
+            entry.last_accessed = Instant::now();
+        } else {
+            self.register_file(path)?;
+        }
+        Ok(())
+    }
+
+    pub fn invalidate_old_files(&mut self) -> Vec<PathBuf> {
+        let mut removed_files = vec![];
+
+        if self.current_size <= self.max_size {
+            return removed_files;
+        }
+
+        let mut entries: Vec<(&PathBuf, &crate::backend::remote::LocalCacheEntry)> =
+            self.entries.iter().collect();
+        entries.sort_by_key(|&(_, entry)| entry.last_accessed);
+
+        for (path, entry) in entries {
+            if self.current_size <= self.max_size {
+                break;
+            }
+
+            self.current_size -= entry.size;
+            removed_files.push(path.clone());
+        }
+
+        for path in &removed_files {
+            self.entries.remove(path);
+        }
+
+        removed_files
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rstest::*;
+
+    mod new {
+        use super::*;
+        use std::fs;
+
+        #[rstest]
+        fn test_remove_folder_before_create(path: PathBuf) {
+            fs::create_dir_all(&path).unwrap();
+            fs::write(path.join("test_file"), b"test").unwrap();
+
+            let _local_cache = LocalCache::new(path.clone(), 1024 * 1024);
+
+            assert!(!path.exists());
+        }
+
+        #[rstest]
+        fn test_if_path_does_not_exist(path: PathBuf) {
+            let cache_path = path.join("non_existent_cache");
+            let _local_cache = LocalCache::new(cache_path.clone(), 1024 * 1024);
+
+            assert!(!cache_path.exists());
+        }
+    }
+
+    mod build_key {
+        use super::*;
+
+        #[rstest]
+        fn test_build_key(local_cache: LocalCache) {
+            let file_path = local_cache.path.join("dir").join("file.txt");
+            let key = local_cache.build_key(&file_path);
+
+            assert_eq!(key, "dir/file.txt");
+        }
+    }
+
+    mod remove {
+        use super::*;
+
+        #[rstest]
+        fn test_remove_registered_file(mut local_cache: LocalCache) {
+            let file_path = local_cache.path.join("file_to_remove.txt");
+            fs::create_dir_all(local_cache.path.clone()).unwrap();
+            fs::write(&file_path, b"test").unwrap();
+            local_cache.register_file(&file_path).unwrap();
+
+            assert!(file_path.exists());
+            assert!(local_cache.entries.contains_key(&file_path));
+            assert_eq!(local_cache.current_size, 4);
+
+            local_cache.remove(&file_path).unwrap();
+
+            assert!(!file_path.exists());
+            assert!(!local_cache.entries.contains_key(&file_path));
+            assert_eq!(local_cache.current_size, 0);
+        }
+
+        #[rstest]
+        fn test_remove_non_registered_file(mut local_cache: LocalCache) {
+            let file_path = local_cache.path.join("file_to_remove.txt");
+            fs::create_dir_all(local_cache.path.clone()).unwrap();
+            fs::write(&file_path, b"test").unwrap();
+
+            assert!(file_path.exists());
+            assert!(!local_cache.entries.contains_key(&file_path));
+            assert_eq!(local_cache.current_size, 0);
+
+            local_cache.remove(&file_path).unwrap();
+
+            assert!(!file_path.exists());
+            assert!(!local_cache.entries.contains_key(&file_path));
+            assert_eq!(local_cache.current_size, 0);
+        }
+    }
+
+    mod remove_all {
+        use super::*;
+
+        #[rstest]
+        fn test_remove_all_registered_files(mut local_cache: LocalCache) {
+            let dir_to_remove =
local_cache.path.join("dir_to_remove"); + let file_path1 = dir_to_remove.join("file1.txt"); + let file_path2 = dir_to_remove.join("file2.txt"); + fs::create_dir_all(&dir_to_remove).unwrap(); + fs::write(&file_path1, b"test1").unwrap(); + fs::write(&file_path2, b"test2").unwrap(); + local_cache.register_file(&file_path1).unwrap(); + local_cache.register_file(&file_path2).unwrap(); + + let dir_to_keep = local_cache.path.join("dir_to_keep"); + let file_path3 = dir_to_keep.join("file1.txt"); + fs::create_dir_all(&dir_to_keep).unwrap(); + fs::write(&file_path3, b"test1").unwrap(); + local_cache.register_file(&file_path3).unwrap(); + + assert!(dir_to_remove.exists()); + assert!(dir_to_keep.exists()); + assert!(local_cache.entries.contains_key(&file_path1)); + assert!(local_cache.entries.contains_key(&file_path2)); + assert!(local_cache.entries.contains_key(&file_path3)); + assert_eq!(local_cache.current_size, 15); + + local_cache.remove_all(&dir_to_remove).unwrap(); + + assert!(!dir_to_remove.exists()); + assert!(dir_to_keep.exists()); + assert!(!local_cache.entries.contains_key(&file_path1)); + assert!(!local_cache.entries.contains_key(&file_path2)); + assert!(local_cache.entries.contains_key(&file_path3)); + assert_eq!(local_cache.current_size, 5); + } + + #[rstest] + fn test_remove_all_non_registered_files(mut local_cache: LocalCache) { + let dir_to_remove = local_cache.path.join("dir_to_remove"); + let file_path1 = dir_to_remove.join("file1.txt"); + let file_path2 = dir_to_remove.join("file2.txt"); + fs::create_dir_all(&dir_to_remove).unwrap(); + fs::write(&file_path1, b"test1").unwrap(); + fs::write(&file_path2, b"test2").unwrap(); + + assert!(dir_to_remove.exists()); + assert!(local_cache.entries.is_empty()); + assert_eq!(local_cache.current_size, 0); + + local_cache.remove_all(&dir_to_remove).unwrap(); + + assert!(!dir_to_remove.exists()); + assert!(local_cache.entries.is_empty()); + assert_eq!(local_cache.current_size, 0); + } + } + + mod create_dir_all { + use super::*; + + #[rstest] + fn test_create_new_directory(local_cache: LocalCache) { + let new_dir = local_cache.path.join("new_dir"); + + assert!(!new_dir.exists()); + + local_cache.create_dir_all(&new_dir).unwrap(); + + assert!(new_dir.exists()); + assert!(new_dir.is_dir()); + } + + #[rstest] + fn test_create_existing_directory(local_cache: LocalCache) { + let existing_dir = local_cache.path.join("existing_dir"); + fs::create_dir_all(&existing_dir).unwrap(); + + assert!(existing_dir.exists()); + assert!(existing_dir.is_dir()); + + local_cache.create_dir_all(&existing_dir).unwrap(); + + assert!(existing_dir.exists()); + assert!(existing_dir.is_dir()); + } + } + + mod try_exists { + use super::*; + + #[rstest] + fn test_try_exists_existing_file(local_cache: LocalCache) { + let file_path = local_cache.path.join("existing_file.txt"); + fs::create_dir_all(local_cache.path.clone()).unwrap(); + fs::write(&file_path, b"test").unwrap(); + + assert!(local_cache.try_exists(&file_path).unwrap()); + } + + #[rstest] + fn test_try_exists_non_existing_file(local_cache: LocalCache) { + let file_path = local_cache.path.join("non_existing_file.txt"); + + assert!(!local_cache.try_exists(&file_path).unwrap()); + } + } + + mod register_file { + use super::*; + + #[rstest] + fn test_register_new_file(local_cache_with_file: (LocalCache, PathBuf)) { + let (local_cache, file_path) = local_cache_with_file; + assert!(local_cache.entries.contains_key(&file_path)); + assert_eq!(local_cache.current_size, 4); + } + + #[rstest] + fn 
test_register_existing_file(local_cache_with_file: (LocalCache, PathBuf)) {
+            let (mut local_cache, file_path) = local_cache_with_file;
+            fs::write(&file_path, b"test-more-data").unwrap();
+
+            local_cache.register_file(&file_path).unwrap();
+            assert!(local_cache.entries.contains_key(&file_path));
+            assert_eq!(local_cache.current_size, 14, "should recalculate size");
+        }
+    }
+
+    mod access_file {
+        use super::*;
+        use std::thread;
+        use std::time::Duration;
+
+        #[rstest]
+        fn test_access_registered_file(local_cache_with_file: (LocalCache, PathBuf)) {
+            let (mut local_cache, file_path) = local_cache_with_file;
+            let previous_access_time = local_cache.entries.get(&file_path).unwrap().last_accessed;
+
+            thread::sleep(Duration::from_millis(10));
+            local_cache.access_file(&file_path).unwrap();
+            let new_access_time = local_cache.entries.get(&file_path).unwrap().last_accessed;
+
+            assert!(new_access_time > previous_access_time);
+            assert_eq!(local_cache.current_size, 4);
+        }
+    }
+
+    mod invalidate_old_files {
+        use super::*;
+        use std::thread;
+        use std::time::Duration;
+
+        #[rstest]
+        fn test_invalidate_when_under_limit(mut local_cache: LocalCache) {
+            let removed_files = local_cache.invalidate_old_files();
+            assert!(removed_files.is_empty());
+            assert_eq!(local_cache.current_size, 0);
+        }
+
+        #[rstest]
+        fn test_invalidate_when_over_limit(mut local_cache: LocalCache) {
+            let file_path1 = local_cache.path.join("file1.txt");
+            let file_path2 = local_cache.path.join("file2.txt");
+            fs::create_dir_all(local_cache.path.clone()).unwrap();
+            fs::write(&file_path1, b"data1").unwrap();
+            fs::write(&file_path2, b"data2").unwrap();
+            local_cache.register_file(&file_path1).unwrap();
+            local_cache.register_file(&file_path2).unwrap();
+
+            local_cache.access_file(&file_path2).unwrap();
+            thread::sleep(Duration::from_millis(10)); // ensure different access times
+            local_cache.access_file(&file_path1).unwrap();
+
+            assert_eq!(local_cache.current_size, 10);
+
+            // Set max size to 6 to force invalidation
+            local_cache.max_size = 6;
+            let removed_files = local_cache.invalidate_old_files();
+
+            assert_eq!(removed_files.len(), 1);
+            assert!(
+                removed_files.contains(&file_path2),
+                "file2 is the least recently used and should be invalidated"
+            );
+            assert!(!removed_files.contains(&file_path1));
+            assert_eq!(local_cache.current_size, 5);
+        }
+    }
+
+    #[fixture]
+    fn path() -> PathBuf {
+        let temp_dir = tempfile::tempdir().unwrap();
+        temp_dir.keep().join("cache")
+    }
+
+    #[fixture]
+    fn local_cache(path: PathBuf) -> LocalCache {
+        LocalCache::new(path.clone(), 1024 * 1024)
+    }
+
+    #[fixture]
+    fn local_cache_with_file(path: PathBuf) -> (LocalCache, PathBuf) {
+        let mut local_cache = LocalCache::new(path.clone(), 1024 * 1024);
+        let file_path = path.join("file.txt");
+        fs::create_dir_all(path.clone()).unwrap();
+        fs::write(&file_path, b"test").unwrap();
+        local_cache.register_file(&file_path).unwrap();
+        (local_cache, file_path)
+    }
+}
diff --git a/reductstore/src/backend/remote/s3_connector.rs b/reductstore/src/backend/remote/s3_connector.rs
new file mode 100644
index 000000000..c2402171a
--- /dev/null
+++ b/reductstore/src/backend/remote/s3_connector.rs
@@ -0,0 +1,642 @@
+// Copyright 2025 ReductSoftware UG
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
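+
+//! S3 implementation of `RemoteStorageConnector`: the synchronous trait
+//! methods are bridged onto the async AWS SDK through a dedicated Tokio
+//! runtime (`block_in_place` + `block_on`), and every object key is
+//! namespaced under a fixed prefix.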
+
+use crate::backend::remote::RemoteBackendSettings;
+use crate::backend::remote::RemoteStorageConnector;
+use aws_config::{BehaviorVersion, Region};
+use aws_credential_types::Credentials;
+use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError};
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::Client;
+use log::{debug, error, info};
+use std::collections::HashSet;
+use std::io;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::io::AsyncWriteExt;
+use tokio::runtime::Runtime;
+use tokio::task::block_in_place;
+
+pub(super) struct S3Connector {
+    bucket: String,
+    client: Arc<Client>,
+    rt: Arc<Runtime>,
+    prefix: &'static str,
+}
+
+impl S3Connector {
+    pub fn new(settings: RemoteBackendSettings) -> Self {
+        let rt = Arc::new(
+            tokio::runtime::Builder::new_multi_thread()
+                .worker_threads(2)
+                .thread_name("remote-client-worker")
+                .enable_all()
+                .build()
+                .unwrap(),
+        );
+
+        let base_config = aws_config::defaults(BehaviorVersion::latest()).region(
+            settings
+                .region
+                .as_ref()
+                .map(|r| Region::new(r.clone()))
+                .unwrap_or_else(|| Region::new("notset".to_string())),
+        );
+
+        info!("Initializing S3 client for bucket: {}", settings.bucket);
+        let base = block_in_place(|| rt.block_on(base_config.load()));
+
+        let creds = Credentials::from_keys(
+            settings.access_key.clone(),
+            settings.secret_key.clone(),
+            None,
+        );
+        let conf = aws_sdk_s3::config::Builder::from(&base)
+            .set_endpoint_url(settings.endpoint)
+            .clone()
+            .credentials_provider(creds)
+            .force_path_style(true)
+            .request_checksum_calculation(
+                aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
+            )
+            .build();
+
+        let client = Client::from_conf(conf.clone());
+
+        S3Connector {
+            client: Arc::new(client),
+            bucket: settings.bucket,
+            rt,
+            prefix: "r/",
+        }
+    }
+}
+
+impl RemoteStorageConnector for S3Connector {
+    fn download_object(&self, key: &str, dest: &PathBuf) -> Result<(), io::Error> {
+        let client = Arc::clone(&self.client);
+        let rt = Arc::clone(&self.rt);
+        let key = format!("{}{}", self.prefix, key);
+
+        block_in_place(move || {
+            rt.block_on(async {
+                let mut resp = client
+                    .get_object()
+                    .bucket(&self.bucket)
+                    .key(&key)
+                    .send()
+                    .await
+                    .map_err(|e| {
+                        error!("S3 get_object error: {}", DisplayErrorContext(&e));
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!(
+                                "S3 get_object error bucket={}, key={}: {}",
+                                &self.bucket,
+                                &key,
+                                e.message().unwrap_or("connection error")
+                            ),
+                        )
+                    })?;
+                let mut file = tokio::fs::File::create(dest).await?;
+
+                while let Some(chunk) = resp.body.next().await {
+                    let data = chunk?;
+                    file.write_all(&data).await?;
+                }
+                file.flush().await?;
+                file.sync_all().await?;
+                Ok(())
+            })
+        })
+    }
+
+    fn upload_object(&self, key: &str, src: &PathBuf) -> Result<(), io::Error> {
+        let client = Arc::clone(&self.client);
+        let rt = Arc::clone(&self.rt);
+        let key = format!("{}{}", self.prefix, key);
+
+        block_in_place(move || {
+            rt.block_on(async {
+                let stream = ByteStream::from_path(src).await?;
+
+                client
+                    .put_object()
+                    .bucket(&self.bucket)
+                    .key(&key)
+                    .body(stream)
+                    .send()
+                    .await
+                    .map_err(|e| {
+                        error!("S3 put_object error: {}", DisplayErrorContext(&e));
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!(
+                                "S3 put_object error bucket={}, key={}: {}",
+                                &self.bucket,
+                                &key,
+                                e.message().unwrap_or("connection error")
+                            ),
+                        )
+                    })?;
+                Ok(())
+            })
+        })
+    }
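+
+    // Directories are modeled as zero-byte objects whose key ends with '/';
+    // `create_dir_all` below writes such markers and `list_objects` folds
+    // keys that contain a '/' back into directory entries.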
+    fn create_dir_all(&self, key: &str) -> Result<(), io::Error> {
+        let client = Arc::clone(&self.client);
+        let rt = Arc::clone(&self.rt);
+
+        let dir_key = if key.ends_with('/') {
+            format!("{}{}", self.prefix, key)
+        } else {
+            format!("{}{}/", self.prefix, key)
+        };
+
+        block_in_place(|| {
+            rt.block_on(async {
+                client
+                    .put_object()
+                    .bucket(&self.bucket)
+                    .key(&dir_key)
+                    .body(Vec::new().into())
+                    .send()
+                    .await
+                    .map_err(|e| {
+                        error!("S3 put_object: {}", DisplayErrorContext(&e));
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!(
+                                "S3 put_object error bucket={}, key={}: {}",
+                                &self.bucket,
+                                dir_key,
+                                e.message().unwrap_or("connection error")
+                            ),
+                        )
+                    })?;
+                Ok(())
+            })
+        })
+    }
+
+    fn list_objects(&self, key: &str, recursive: bool) -> Result<Vec<String>, io::Error> {
+        let client = Arc::clone(&self.client);
+        let rt = Arc::clone(&self.rt);
+        let prefix = if key.ends_with("/") || key.is_empty() {
+            format!("{}{}", self.prefix, key)
+        } else {
+            format!("{}{}/", self.prefix, key)
+        };
+
+        block_in_place(|| {
+            rt.block_on(async {
+                let mut keys = HashSet::new();
+                let mut continuation_token = None;
+
+                loop {
+                    let resp = client
+                        .list_objects_v2()
+                        .bucket(&self.bucket)
+                        .set_continuation_token(continuation_token.clone())
+                        .prefix(&prefix)
+                        .send()
+                        .await
+                        .map_err(|e| {
+                            error!("S3 list_objects_v2 error: {}", DisplayErrorContext(&e));
+                            io::Error::new(
+                                io::ErrorKind::Other,
+                                format!(
+                                    "S3 list_objects_v2 error bucket={}, key={}: {}",
+                                    &self.bucket,
+                                    &prefix,
+                                    e.message().unwrap_or("connection error")
+                                ),
+                            )
+                        })?;
+
+                    for object in resp.contents() {
+                        // Didn't find a better way to filter out "subdirectories"
+                        let Some(key) = object.key() else { continue };
+                        if key == &prefix {
+                            continue;
+                        }
+
+                        let key = key.strip_prefix(&prefix).unwrap_or(key);
+                        if recursive {
+                            keys.insert(key.to_string());
+                        } else {
+                            if let Some((first, _rest)) = key.split_once('/') {
+                                // treat first segment as a "dir"
+                                let dir = format!("{}/", first);
+                                keys.insert(dir);
+                            } else {
+                                // no slash => top-level "file"
+                                keys.insert(key.to_string());
+                            }
+                        }
+                    }
+
+                    if resp.is_truncated().unwrap_or(false) {
+                        continuation_token = resp.next_continuation_token().map(|s| s.to_string());
+                    } else {
+                        break;
+                    }
+                }
+
+                let keys = keys.into_iter().collect::<Vec<String>>();
+                Ok(keys)
+            })
+        })
+    }
+
+    fn remove_object(&self, key: &str) -> Result<(), io::Error> {
+        let client = Arc::clone(&self.client);
+        let rt = Arc::clone(&self.rt);
+        let key = format!("{}{}", self.prefix, key);
+
+        block_in_place(|| {
+            rt.block_on(async {
+                client
+                    .delete_object()
+                    .bucket(&self.bucket)
+                    .key(&key)
+                    .send()
+                    .await
+                    .map_err(|e| {
+                        error!("S3 delete_object error: {}", DisplayErrorContext(&e));
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!(
+                                "S3 delete_object error bucket={}, key={}: {}",
+                                &self.bucket,
+                                &key,
+                                e.message().unwrap_or("connection error")
+                            ),
+                        )
+                    })?;
+                Ok(())
+            })
+        })
+    }
+
+    fn head_object(&self, key: &str) -> Result<bool, io::Error> {
+        let client = Arc::clone(&self.client);
+        let rt = Arc::clone(&self.rt);
+        let key = format!("{}{}", self.prefix, key);
+
+        block_in_place(|| {
+            rt.block_on(async {
+                let resp = client
+                    .head_object()
+                    .bucket(&self.bucket)
+                    .key(&key)
+                    .send()
+                    .await;
+
+                match resp {
+                    Ok(_) => Ok(true),
+                    Err(e) => {
+                        // Inspect the error
+                        if let SdkError::ServiceError(err) = &e {
+                            if err.err().is_not_found() {
+                                return Ok(false); // Object does not exist
+                            }
+                        }
+                        error!("S3 head_object error: {}", DisplayErrorContext(&e));
+
+                        Err(io::Error::new(
+                            io::ErrorKind::Other,
+                            format!(
+                                "S3 head_object error bucket={}, key={}: {}",
+                                &self.bucket,
+                                &key,
+                                e.message().unwrap_or("connection error")
+                            ),
+                        ))
+                    }
+                }
+            })
+        })
+    }
+ fn rename_object(&self, from: &str, to: &str) -> Result<(), io::Error> { + let client = Arc::clone(&self.client); + let rt = Arc::clone(&self.rt); + let from_key = format!("{}{}", self.prefix, from); + let to_key = format!("{}{}", self.prefix, to); + + debug!( + "Renaming S3 object from key: {} to key: {}", + &from_key, &to_key + ); + block_in_place(|| { + rt.block_on(async { + client + .rename_object() + .bucket(&self.bucket) + .rename_source(&from_key) + .key(&to_key) + .send() + .await + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!( + "S3 rename_object error bucket={}, from_key={}, to_key={}: {}", + &self.bucket, + &from_key, + &to_key, + e.message().unwrap_or("connection error") + ), + ) + })?; + + // Optionally, delete the source object after copying + client + .delete_object() + .bucket(&self.bucket) + .key(&from_key) + .send() + .await + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!( + "S3 delete_object error bucket={}, key={}: {}", + &self.bucket, + &from_key, + e.message().unwrap_or("connection error") + ), + ) + })?; + Ok(()) + }) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::*; + use std::fs; + use tempfile::tempdir; + + // Dummy tests without S3 connection + // These tests are just to ensure that the code paths are correct + // and that error handling works as expected. + mod dummy { + use super::*; + #[rstest] + fn download_object(connector: S3Connector) { + let key = "test_download.txt"; + let dest = PathBuf::from("/tmp/test_download.txt"); + + assert_eq!( + connector + .download_object(key, &dest) + .err() + .unwrap() + .to_string(), + "S3 get_object error bucket=test-bucket, key=r/test_download.txt: connection error" + ); + } + + #[rstest] + fn upload_object(connector: S3Connector, path: PathBuf) { + let key = "test_upload.txt"; + let src = path.join("test_upload.txt"); + fs::write(&src, b"test upload content").unwrap(); + + assert_eq!( + connector + .upload_object(key, &src) + .err() + .unwrap() + .to_string(), + "S3 put_object error bucket=test-bucket, key=r/test_upload.txt: connection error" + ); + } + + #[rstest] + fn create_dir_all(connector: S3Connector) { + let key = "test_dir/"; + + assert_eq!( + connector.create_dir_all(key).err().unwrap().to_string(), + "S3 put_object error bucket=test-bucket, key=r/test_dir/: connection error" + ); + } + + #[rstest] + fn list_objects(connector: S3Connector) { + let key = "test_list/"; + let recursive = false; + + assert_eq!( + connector + .list_objects(key, recursive) + .err() + .unwrap() + .to_string(), + "S3 list_objects_v2 error bucket=test-bucket, key=r/test_list/: connection error" + ); + } + + #[rstest] + fn remove_object(connector: S3Connector) { + let key = "test_remove.txt"; + + assert_eq!( + connector.remove_object(key).err().unwrap().to_string(), + "S3 delete_object error bucket=test-bucket, key=r/test_remove.txt: connection error" + ); + } + + #[rstest] + fn head_object(connector: S3Connector) { + let key = "test_head.txt"; + + assert_eq!( + connector.head_object(key).err().unwrap().to_string(), + "S3 head_object error bucket=test-bucket, key=r/test_head.txt: connection error" + ); + } + + #[rstest] + fn rename_object(connector: S3Connector) { + let from = "test_rename_from.txt"; + let to = "test_rename_to.txt"; + + assert_eq!(connector.rename_object(from, to).err().unwrap().to_string(), + "S3 rename_object error bucket=test-bucket, from_key=r/test_rename_from.txt, to_key=r/test_rename_to.txt: connection error" + ); + } + + #[fixture] + fn path() -> 
PathBuf {
+            tempdir().unwrap().keep()
+        }
+
+        #[fixture]
+        fn connector(settings: RemoteBackendSettings) -> S3Connector {
+            S3Connector::new(settings)
+        }
+
+        #[fixture]
+        fn settings() -> RemoteBackendSettings {
+            RemoteBackendSettings {
+                connector_type: Default::default(),
+                cache_path: Default::default(),
+                bucket: "test-bucket".to_string(),
+                region: Some("us-east-1".to_string()),
+                endpoint: Some("http://xxxxx:9000".to_string()), // we do just a dry run
+                access_key: "minioadmin".to_string(),
+                secret_key: "minioadmin".to_string(),
+                cache_size: 0,
+            }
+        }
+    }
+
+    #[cfg(feature = "ci")]
+    mod ci {
+        use super::*;
+        use crate::backend::BackendType;
+        use crate::core::env;
+        use crate::core::env::StdEnvGetter;
+        use serial_test::serial;
+        use tempfile::tempdir;
+
+        #[rstest]
+        #[serial]
+        fn download_object(connector: S3Connector, path: PathBuf) {
+            let key = "test/test.txt";
+            let dest = path.join("downloaded_test.txt");
+            assert!(!dest.exists());
+
+            connector.download_object(key, &dest).unwrap();
+            assert!(dest.exists());
+            let content = std::fs::read_to_string(&dest).unwrap();
+            assert_eq!(content, "This is a test file for download.\n");
+        }
+
+        #[rstest]
+        #[serial]
+        fn upload_object(connector: S3Connector, path: PathBuf) {
+            let key = "test/uploaded_test.txt";
+            let src = path.join("uploaded_test.txt");
+            fs::write(&src, b"This is a test file for upload.\n").unwrap();
+
+            connector.upload_object(key, &src).unwrap();
+            assert!(connector.head_object(key).unwrap());
+        }
+
+        #[rstest]
+        #[serial]
+        fn create_dir_all(connector: S3Connector) {
+            let key = "test/new_dir/";
+
+            connector.create_dir_all(key).unwrap();
+            assert!(connector.head_object(key).unwrap());
+        }
+
+        #[rstest]
+        #[serial]
+        fn list_objects_recursive(connector: S3Connector) {
+            connector.create_dir_all("test/subdir1/").unwrap();
+            connector.create_dir_all("test/subdir1/subdir2").unwrap();
+
+            let objects = connector.list_objects("", true).unwrap();
+            assert_eq!(objects.len(), 3);
+            assert!(objects.contains(&"test/test.txt".to_string()));
+            assert!(objects.contains(&"test/subdir1/".to_string()));
+            assert!(objects.contains(&"test/subdir1/subdir2/".to_string()));
+        }
+
+        #[rstest]
+        #[serial]
+        fn list_objects_non_recursive(connector: S3Connector) {
+            connector.create_dir_all("test/subdir1/").unwrap();
+            connector.create_dir_all("test/subdir1/subdir2").unwrap();
+
+            let objects = connector.list_objects("", false).unwrap();
+            assert_eq!(objects.len(), 1);
+            assert!(objects.contains(&"test/".to_string()));
+        }
+
+        #[rstest]
+        #[serial]
+        fn rename_object(connector: S3Connector) {
+            let from = "test/uploaded_test.txt";
+            let to = "test/renamed_test.txt";
+
+            connector.rename_object(from, to).unwrap();
+            assert!(!connector.head_object(from).unwrap());
+            assert!(connector.head_object(to).unwrap());
+        }
+
+        #[rstest]
+        #[serial]
+        fn remove_object(connector: S3Connector) {
+            let key = "test/uploaded_test.txt";
+
+            connector.remove_object(key).unwrap();
+            assert!(!connector.head_object(key).unwrap());
+        }
+
+        #[rstest]
+        #[serial]
+        fn head_object(connector: S3Connector) {
+            let existing_key = "test/test.txt";
+            let non_existing_key = "test/non_existing.txt";
+
+            assert!(connector.head_object(existing_key).unwrap());
+            assert!(!connector.head_object(non_existing_key).unwrap());
+        }
+
+        #[fixture]
+        fn path() -> PathBuf {
+            tempdir().unwrap().keep()
+        }
+
+        #[fixture]
+        fn connector(settings: RemoteBackendSettings) -> S3Connector {
+            let mut connector = S3Connector::new(settings);
+            connector.prefix = "ci/";
+            for key in connector.list_objects("", true).unwrap() {
+                connector
+                    .remove_object(&key)
+                    .expect("Failed to clean up S3 bucket");
+            }
+
+            let key = "test/test.txt";
+            let src = tempdir().unwrap().keep().join("test.txt");
+            fs::write(&src, b"This is a test file for download.\n").unwrap();
+            connector
+                .upload_object(key, &src)
+                .expect("Failed to upload test file to S3");
+            connector
+        }
+
+        #[fixture]
+        fn settings() -> RemoteBackendSettings {
+            let mut env = env::Env::new(StdEnvGetter::default());
+            RemoteBackendSettings {
+                connector_type: BackendType::S3,
+                cache_path: tempdir().unwrap().keep(),
+                bucket: env
+                    .get_optional("MINIO_BUCKET")
+                    .expect("MINIO_BUCKET must be set"),
+                region: None,
+                endpoint: Some(
+                    env.get_optional("MINIO_ENDPOINT")
+                        .unwrap_or("http://127.0.0.1:9000".to_string()),
+                ),
+                access_key: env
+                    .get_optional("MINIO_ACCESS_KEY")
+                    .unwrap_or("minioadmin".to_string()),
+                secret_key: env
+                    .get_optional("MINIO_SECRET_KEY")
+                    .unwrap_or("minioadmin".to_string()),
+                cache_size: 1000,
+            }
+        }
+    }
+}
diff --git a/reductstore/src/cfg.rs b/reductstore/src/cfg.rs
index 38f96c1b1..6df3ef591 100644
--- a/reductstore/src/cfg.rs
+++ b/reductstore/src/cfg.rs
@@ -3,14 +3,18 @@
 pub mod io;
 mod provision;
+mod remote_storage;
 pub mod replication;
 
 use crate::api::Components;
 use crate::asset::asset_manager::create_asset_manager;
 use crate::auth::token_auth::TokenAuthorization;
+use crate::backend::Backend;
 use crate::cfg::io::IoConfig;
+use crate::cfg::remote_storage::RemoteStorageConfig;
 use crate::cfg::replication::ReplicationConfig;
 use crate::core::env::{Env, GetEnv};
+use crate::core::file_cache::FILE_CACHE;
 use crate::ext::ext_repository::create_ext_repository;
 use log::info;
 use reduct_base::error::ReductError;
@@ -46,7 +50,8 @@ pub struct Cfg {
     pub replications: HashMap,
     pub io_conf: IoConfig,
     pub replication_conf: ReplicationConfig,
-    env: Env,
+    pub cs_config: RemoteStorageConfig,
+    pub env: Env,
 }
 
 impl Cfg {
@@ -74,6 +79,7 @@
             replications: Self::parse_replications(&mut env),
             io_conf: Self::parse_io_config(&mut env),
             replication_conf: Self::parse_replication_config(&mut env, port),
+            cs_config: Self::parse_remote_storage_cfg(&mut env),
             env,
         };
@@ -91,6 +97,44 @@
     }
 
     pub fn build(&self) -> Result<Components, ReductError> {
+        // Initialize storage backend
+        let mut backend_builder = Backend::builder()
+            .backend_type(self.cs_config.backend_type.clone())
+            .local_data_path(&self.data_path)
+            .cache_size(self.cs_config.cache_size);
+
+        if let Some(bucket) = &self.cs_config.bucket {
+            backend_builder = backend_builder.remote_bucket(bucket);
+        }
+
+        if let Some(region) = &self.cs_config.region {
+            backend_builder = backend_builder.remote_region(region);
+        }
+
+        if let Some(endpoint) = &self.cs_config.endpoint {
+            backend_builder = backend_builder.remote_endpoint(endpoint);
+        }
+
+        if let Some(access_key) = &self.cs_config.access_key {
+            backend_builder = backend_builder.remote_access_key(access_key);
+        }
+
+        if let Some(secret_key) = &self.cs_config.secret_key {
+            backend_builder = backend_builder.remote_secret_key(secret_key);
+        }
+
+        if let Some(cache_path) = &self.cs_config.cache_path {
+            backend_builder = backend_builder.remote_cache_path(cache_path);
+        }
+
+        FILE_CACHE.set_storage_backend(backend_builder.try_build().map_err(|e| {
+            internal_server_error!(
+                "Failed to initialize storage backend at {}: {}",
+                self.data_path,
+                e
+            )
+        })?);
+
         let storage = Arc::new(self.provision_buckets());
         let token_repo = self.provision_tokens();
         let console = create_asset_manager(load_console());
@@ -367,10 +411,55 @@ mod tests {
         assert_eq!(cfg.ext_path, Some("/tmp/ext".to_string()));
     }
 
+    #[cfg(feature = "fs-backend")]
+    #[rstest]
+    fn test_remote_storage_s3() {
+        // we cover only s3 parts here, filesystem is used as backend
+        let mut env_getter = MockEnvGetter::new();
+        env_getter
+            .expect_get()
+            .with(eq("RS_DATA_PATH"))
+            .return_const(Ok("/tmp/data".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_BUCKET"))
+            .return_const(Ok("my-bucket".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_ENDPOINT"))
+            .return_const(Ok("https://s3.example.com".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_REGION"))
+            .return_const(Ok("us-east-1".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_ACCESS_KEY"))
+            .return_const(Ok("my-access-key".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_SECRET_KEY"))
+            .return_const(Ok("my-secret-key".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_CACHE_PATH"))
+            .return_const(Ok("/tmp/cache".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_CACHE_SIZE"))
+            .return_const(Ok("1073741824".to_string()));
+        env_getter
+            .expect_get()
+            .return_const(Err(VarError::NotPresent));
+        env_getter.expect_all().returning(|| BTreeMap::new());
+        let cfg = Cfg::from_env(env_getter);
+        cfg.build().unwrap();
+    }
+
     #[fixture]
     fn env_getter() -> MockEnvGetter {
         let mut mock_getter = MockEnvGetter::new();
         mock_getter.expect_all().returning(|| BTreeMap::new());
-        return mock_getter;
+        mock_getter
     }
 }
diff --git a/reductstore/src/cfg/provision/bucket.rs b/reductstore/src/cfg/provision/bucket.rs
index 050ff21ba..3bd06a6df 100644
--- a/reductstore/src/cfg/provision/bucket.rs
+++ b/reductstore/src/cfg/provision/bucket.rs
@@ -1,6 +1,7 @@
 // Copyright 2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1
 
+use crate::backend::BackendType;
 use crate::cfg::Cfg;
 use crate::core::env::{Env, GetEnv};
 use crate::license::parse_license;
@@ -15,7 +16,16 @@ use std::path::PathBuf;
 impl Cfg {
     pub(in crate::cfg) fn provision_buckets(&self) -> Storage {
         let license = parse_license(self.license_path.clone());
-        let storage = Storage::load(PathBuf::from(self.data_path.clone()), license);
+        let data_path = if self.cs_config.backend_type == BackendType::Filesystem {
+            self.data_path.clone()
+        } else {
+            self.cs_config
+                .cache_path
+                .clone()
+                .expect("Cache path must be set for remote storage")
+        };
+
+        let storage = Storage::load(PathBuf::from(data_path), license);
         for (name, settings) in &self.buckets {
             let settings = match storage.create_bucket(&name, settings.clone()) {
                 Ok(bucket) => {
diff --git a/reductstore/src/cfg/remote_storage.rs b/reductstore/src/cfg/remote_storage.rs
new file mode 100644
index 000000000..bebab7112
--- /dev/null
+++ b/reductstore/src/cfg/remote_storage.rs
@@ -0,0 +1,169 @@
+// Copyright 2025 ReductSoftware UG
+// Licensed under the Business Source License 1.1
+
+use crate::backend::BackendType;
+use crate::cfg::Cfg;
+use crate::core::env::{Env, GetEnv};
+use bytesize::ByteSize;
+
+/// Cloud storage settings
+#[derive(Clone, Debug, PartialEq)]
+pub struct RemoteStorageConfig {
+    pub backend_type: BackendType,
+    pub bucket: Option<String>,
+    pub endpoint: Option<String>,
+    pub region: Option<String>,
+    pub access_key: Option<String>,
+    pub secret_key: Option<String>,
+    pub cache_path: Option<String>,
+    pub cache_size: u64,
+}
+
+impl Cfg {
+    pub(super) fn parse_remote_storage_cfg(env: &mut Env) -> RemoteStorageConfig {
+        let secret_key = env.get_masked("RS_REMOTE_SECRET_KEY", "".to_string());
+        RemoteStorageConfig {
+            backend_type: env
+                .get_optional::<String>("RS_REMOTE_BACKEND_TYPE")
+                .map(|s| match s.to_lowercase().as_str() {
+                    #[cfg(feature = "s3-backend")]
+                    "s3" => BackendType::S3,
+                    #[cfg(feature = "fs-backend")]
+                    _ => BackendType::Filesystem,
+                })
+                .unwrap_or(BackendType::Filesystem),
+            bucket: env.get_optional::<String>("RS_REMOTE_BUCKET"),
+            endpoint: env.get_optional::<String>("RS_REMOTE_ENDPOINT"),
+            region: env.get_optional::<String>("RS_REMOTE_REGION"),
+            access_key: env.get_optional::<String>("RS_REMOTE_ACCESS_KEY"),
+            secret_key: if secret_key.is_empty() {
+                None
+            } else {
+                Some(secret_key)
+            },
+            cache_path: env.get_optional::<String>("RS_REMOTE_CACHE_PATH"),
+            cache_size: env
+                .get_optional::<ByteSize>("RS_REMOTE_CACHE_SIZE")
+                .unwrap_or(ByteSize::gb(1))
+                .as_u64(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::cfg::tests::MockEnvGetter;
+    use mockall::predicate::eq;
+    use rstest::*;
+    use std::env::VarError;
+
+    #[cfg(feature = "fs-backend")]
+    #[rstest]
+    fn test_default_remote_storage_cfg() {
+        let mut env_getter = MockEnvGetter::new();
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_BACKEND_TYPE"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_BUCKET"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_ENDPOINT"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_REGION"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_ACCESS_KEY"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_SECRET_KEY"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_CACHE_PATH"))
+            .return_const(Err(VarError::NotPresent));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_CACHE_SIZE"))
+            .return_const(Err(VarError::NotPresent));
+
+        let mut env = Env::new(env_getter);
+
+        let cfg = Cfg::parse_remote_storage_cfg(&mut env);
+        assert_eq!(
+            cfg,
+            RemoteStorageConfig {
+                backend_type: BackendType::Filesystem,
+                bucket: None,
+                endpoint: None,
+                region: None,
+                access_key: None,
+                secret_key: None,
+                cache_path: None,
+                cache_size: ByteSize::gb(1).as_u64(),
+            }
+        );
+    }
+
+    #[cfg(feature = "s3-backend")]
+    #[rstest]
+    fn test_custom_remote_storage_cfg() {
+        let mut env_getter = MockEnvGetter::new();
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_BACKEND_TYPE"))
+            .return_const(Ok("s3".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_BUCKET"))
+            .return_const(Ok("my-bucket".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_ENDPOINT"))
+            .return_const(Ok("https://s3.example.com".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_REGION"))
+            .return_const(Ok("us-west-1".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_ACCESS_KEY"))
+            .return_const(Ok("my-access-key".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_SECRET_KEY"))
+            .return_const(Ok("my-secret-key".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_CACHE_PATH"))
+            .return_const(Ok("/tmp/cache".to_string()));
+        env_getter
+            .expect_get()
+            .with(eq("RS_REMOTE_CACHE_SIZE"))
+            .return_const(Ok("2GB".to_string()));
+        let mut env = Env::new(env_getter);
+
+        let cfg = Cfg::parse_remote_storage_cfg(&mut env);
+        assert_eq!(
+            cfg,
+            RemoteStorageConfig {
+                backend_type: BackendType::S3,
Some("my-bucket".to_string()), + endpoint: Some("https://s3.example.com".to_string()), + region: Some("us-west-1".to_string()), + access_key: Some("my-access-key".to_string()), + secret_key: Some("my-secret-key".to_string()), + cache_path: Some("/tmp/cache".to_string()), + cache_size: ByteSize::gb(2).as_u64(), + } + ); + } +} diff --git a/reductstore/src/core/cache.rs b/reductstore/src/core/cache.rs index 8eea94bf5..641805f7d 100644 --- a/reductstore/src/core/cache.rs +++ b/reductstore/src/core/cache.rs @@ -138,6 +138,10 @@ impl Cache { self.store.keys().collect() } + pub fn iter_mut(&mut self) -> impl Iterator { + self.store.iter_mut().map(|(k, v)| (k, &mut v.value)) + } + fn discard_old_descriptors(&mut self) -> Vec<(K, V)> { // remove old descriptors let mut removed = Vec::new(); diff --git a/reductstore/src/core/file_cache.rs b/reductstore/src/core/file_cache.rs index f15ad29f6..9ee3d27b8 100644 --- a/reductstore/src/core/file_cache.rs +++ b/reductstore/src/core/file_cache.rs @@ -1,20 +1,32 @@ -// Copyright 2023-2024 ReductSoftware UG +// Copyright 2023-2025 ReductSoftware UG // Licensed under the Business Source License 1.1 +use crate::backend::file::{AccessMode, File}; +use crate::backend::Backend; use crate::core::cache::Cache; +use log::{debug, warn}; use reduct_base::error::ReductError; use reduct_base::internal_server_error; -use std::fs::{remove_dir_all, remove_file, rename, File}; +use std::fs; use std::io::{Seek, SeekFrom}; use std::path::PathBuf; -use std::sync::{Arc, LazyLock, RwLock, Weak}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, LazyLock, RwLock, RwLockWriteGuard, Weak}; +use std::thread::spawn; use std::time::Duration; const FILE_CACHE_MAX_SIZE: usize = 1024; const FILE_CACHE_TIME_TO_LIVE: Duration = Duration::from_secs(60); -pub(crate) static FILE_CACHE: LazyLock = - LazyLock::new(|| FileCache::new(FILE_CACHE_MAX_SIZE, FILE_CACHE_TIME_TO_LIVE)); +const FILE_CACHE_SYNC_INTERVAL: Duration = Duration::from_millis(60_000); + +pub(crate) static FILE_CACHE: LazyLock = LazyLock::new(|| { + FileCache::new( + FILE_CACHE_MAX_SIZE, + FILE_CACHE_TIME_TO_LIVE, + FILE_CACHE_SYNC_INTERVAL, + ) +}); pub(crate) struct FileWeak { file: Weak>, @@ -22,41 +34,37 @@ pub(crate) struct FileWeak { } impl FileWeak { - fn new(file: FileRc, path: PathBuf) -> Self { + fn new(file: FileRc) -> Self { FileWeak { file: Arc::downgrade(&file), - path, + path: file.read().unwrap().path().clone(), } } pub fn upgrade(&self) -> Result { - self.file.upgrade().ok_or(internal_server_error!( + let file = self.file.upgrade().ok_or(internal_server_error!( "File descriptor for {:?} is no longer available", self.path - )) + ))?; + + // Notify storage backend that the file was accessed + file.read()?.access()?; + Ok(file) } } pub(crate) type FileRc = Arc>; -#[derive(PartialEq)] -enum AccessMode { - Read, - ReadWrite, -} - -struct FileDescriptor { - file_ref: FileRc, - mode: AccessMode, -} - /// A cache to keep file descriptors open /// /// This optimization is needed for network file systems because opening /// and closing files for writing causes synchronization overhead. -#[derive(Clone)] +/// +/// Additionally, it periodically syncs files to disk to ensure data integrity. 
pub(crate) struct FileCache { - cache: Arc>>, + cache: Arc>>, + stop_sync_worker: Arc, + backpack: Arc>, } impl FileCache { @@ -66,12 +74,83 @@ impl FileCache { /// /// * `max_size` - The maximum number of file descriptors to keep open /// * `ttl` - The time to live for a file descriptor - pub fn new(max_size: usize, ttl: Duration) -> Self { + /// * `sync_interval` - The interval to sync files from cache to disk + fn new(max_size: usize, ttl: Duration, sync_interval: Duration) -> Self { + let cache = Arc::new(RwLock::new(Cache::::new(max_size, ttl))); + let cache_clone = Arc::clone(&cache); + let stop_sync_worker = Arc::new(AtomicBool::new(false)); + let stop_sync_worker_clone = Arc::clone(&stop_sync_worker); + let backpack = Arc::new(RwLock::new(Backend::default())); + let backpack_clone = Arc::clone(&backpack); + + spawn(move || { + // Periodically sync files from cache to disk + while !stop_sync_worker.load(Ordering::Relaxed) { + std::thread::sleep(Duration::from_millis(100)); + Self::sync_rw_and_unused_files(&backpack_clone, &cache, sync_interval, false); + } + }); + FileCache { - cache: Arc::new(RwLock::new(Cache::new(max_size, ttl))), + cache: cache_clone, + stop_sync_worker: stop_sync_worker_clone, + backpack, } } + fn sync_rw_and_unused_files( + backend: &Arc>, + cache: &Arc>>, + sync_interval: Duration, + force: bool, + ) { + let mut cache = cache.write().unwrap(); + let invalidated_files = backend.read().unwrap().invalidate_locally_cached_files(); + for path in invalidated_files { + if let Some(file) = cache.remove(&path) { + if let Err(err) = file.write().unwrap().sync_all() { + warn!("Failed to sync invalidated file {:?}: {}", path, err); + } + } + + fs::remove_file(&path).ok(); + debug!("Removed invalidated file {:?} from cache and storage", path); + } + + for (path, file) in cache.iter_mut() { + let mut file_lock = if force { + // force sync, we need to get a write lock and wait for it + file.write().unwrap() + } else { + // if the file is used by other threads, sync it next time + let Some(file) = file.try_write().ok() else { + continue; + }; + file + }; + + // Sync only writeable files that are not synced yet + // and are not used by other threads + if file_lock.mode() != &AccessMode::ReadWrite + || Arc::strong_count(&file) > 1 + || Arc::weak_count(&file) > 0 + || file_lock.is_synced() + || file_lock.last_synced().elapsed() < sync_interval + { + continue; + } + + if let Err(err) = file_lock.sync_all() { + warn!("Failed to sync file {}: {}", path.display(), err); + continue; + } + } + } + + pub fn set_storage_backend(&self, backpack: Backend) { + *self.backpack.write().unwrap() = backpack; + } + /// Get a file descriptor for reading /// /// If the file is not in the cache, it will be opened and added to the cache. 
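The worker spawned in `FileCache::new()` above is a plain OS thread: it wakes on a short 100 ms tick so it can notice the stop flag quickly, but only performs the actual sync pass once `sync_interval` (60 s in the diff) has elapsed for a given file. The following standalone sketch shows the same stoppable-worker pattern in isolation; it is not code from this PR, and `SyncWorker` and the `dirty` list are illustrative stand-ins for `FileCache` and its descriptor cache:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn, JoinHandle};
use std::time::{Duration, Instant};

struct SyncWorker {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl SyncWorker {
    fn new(dirty: Arc<Mutex<Vec<String>>>, sync_interval: Duration) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = Arc::clone(&stop);
        let handle = spawn(move || {
            let mut last_sync = Instant::now();
            // Short tick so the thread notices the stop flag quickly,
            // even when the sync interval itself is long.
            while !stop_clone.load(Ordering::Relaxed) {
                sleep(Duration::from_millis(100));
                if last_sync.elapsed() >= sync_interval {
                    for file in dirty.lock().unwrap().drain(..) {
                        println!("sync {file}"); // stand-in for File::sync_all()
                    }
                    last_sync = Instant::now();
                }
            }
        });
        SyncWorker {
            stop,
            handle: Some(handle),
        }
    }
}

impl Drop for SyncWorker {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        // This sketch joins the worker; FileCache's Drop (further down in
        // the diff) only raises the flag and lets the thread exit on its own.
        if let Some(handle) = self.handle.take() {
            handle.join().ok();
        }
    }
}

fn main() {
    let dirty = Arc::new(Mutex::new(vec!["a.blk".to_string(), "b.blk".to_string()]));
    let _worker = SyncWorker::new(Arc::clone(&dirty), Duration::from_millis(200));
    sleep(Duration::from_millis(500)); // give the worker time for one sync pass
}
```

Not joining in `Drop`, as the diff's `FileCache` does, keeps shutdown from blocking on a sleeping worker at the cost of letting the thread outlive the cache briefly.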
@@ -86,25 +165,23 @@ impl FileCache {
     /// A file reference
     pub fn read(&self, path: &PathBuf, pos: SeekFrom) -> Result<FileWeak, ReductError> {
         let mut cache = self.cache.write()?;
-        let file = if let Some(desc) = cache.get_mut(path) {
-            Arc::clone(&desc.file_ref)
-        } else {
-            let file = File::options().read(true).open(path)?;
+        let open_file = |cache| -> Result<FileRc, ReductError> {
+            let file = self.backpack.read()?.open_options().read(true).open(path)?;
             let file = Arc::new(RwLock::new(file));
-            cache.insert(
-                path.clone(),
-                FileDescriptor {
-                    file_ref: Arc::clone(&file),
-                    mode: AccessMode::Read,
-                },
-            );
-            file
+            Self::save_in_cache_and_sync_discarded(path.clone(), cache, &file);
+            Ok(file)
+        };
+
+        let file = if let Some(file) = cache.get_mut(path) {
+            Arc::clone(&file)
+        } else {
+            open_file(&mut cache)?
         };

         if pos != SeekFrom::Current(0) {
             file.write()?.seek(pos)?;
         }
-        Ok(FileWeak::new(file, path.clone()))
+        Ok(FileWeak::new(file))
     }

     /// Get a file descriptor for writing
@@ -122,38 +199,36 @@ impl FileCache {
     ///
     /// A file reference
     pub fn write_or_create(&self, path: &PathBuf, pos: SeekFrom) -> Result<FileWeak, ReductError> {
         let mut cache = self.cache.write()?;
-
-        let file = if let Some(desc) = cache.get_mut(path) {
-            if desc.mode == AccessMode::ReadWrite {
-                Arc::clone(&desc.file_ref)
-            } else {
-                let rw_file = File::options().write(true).read(true).open(path)?;
-                desc.file_ref = Arc::new(RwLock::new(rw_file));
-                desc.mode = AccessMode::ReadWrite;
-
-                Arc::clone(&desc.file_ref)
-            }
-        } else {
-            let file = File::options()
-                .create(true)
+        let open_file = |cache, create| -> Result<FileRc, ReductError> {
+            let file = self
+                .backpack
+                .read()?
+                .open_options()
+                .create(create)
                 .write(true)
                 .read(true)
                 .open(path)?;
             let file = Arc::new(RwLock::new(file));
-            cache.insert(
-                path.clone(),
-                FileDescriptor {
-                    file_ref: Arc::clone(&file),
-                    mode: AccessMode::ReadWrite,
-                },
-            );
-            file
+            Self::save_in_cache_and_sync_discarded(path.clone(), cache, &file);
+            Ok(file)
+        };
+
+        let file = if let Some(file) = cache.get_mut(path).cloned() {
+            let lock = file.read()?;
+            if lock.mode() == &AccessMode::ReadWrite {
+                Arc::clone(&file)
+            } else {
+                // file was opened in read-only mode, we need to reopen it in read-write mode
+                open_file(&mut cache, false)?
+            }
+        } else {
+            open_file(&mut cache, true)?
         };

         if pos != SeekFrom::Current(0) {
             file.write()?.seek(pos)?;
         }
-        Ok(FileWeak::new(file, path.clone()))
+        Ok(FileWeak::new(file))
     }

     /// Removes a file from the file system and the cache.
@@ -178,10 +253,7 @@ impl FileCache {
     pub fn remove(&self, path: &PathBuf) -> Result<(), ReductError> {
         let mut cache = self.cache.write()?;
         cache.remove(path);
-
-        if path.try_exists()? {
-            remove_file(path)?;
-        }
+        self.backpack.read()?.remove(path)?;

         Ok(())
     }
@@ -189,12 +261,27 @@ impl FileCache {
     pub fn remove_dir(&self, path: &PathBuf) -> Result<(), ReductError> {
         self.discard_recursive(path)?;
         if path.try_exists()? {
-            remove_dir_all(path)?;
+            self.backpack.read()?.remove_dir_all(path)?;
         }

         Ok(())
     }

+    pub fn create_dir_all(&self, path: &PathBuf) -> Result<(), ReductError> {
+        self.backpack.read()?.create_dir_all(path)?;
+        Ok(())
+    }
+
+    pub fn read_dir(&self, path: &PathBuf) -> Result<Vec<PathBuf>, ReductError> {
+        Ok(self.backpack.read()?.read_dir(path)?)
+    }
+
+    /// Discards all files in the cache that are under the specified path.
+    ///
+    /// This function iterates through the cache and removes all file descriptors
+    /// whose paths start with the specified `path`. If a file is in read-write mode
+    /// and has not been synced, it attempts to sync the file before removing it from the cache.
+    ///
     pub fn discard_recursive(&self, path: &PathBuf) -> Result<(), ReductError> {
         let mut cache = self.cache.write()?;
         let files_to_remove = cache
@@ -203,9 +290,19 @@ impl FileCache {
             .filter(|file_path| file_path.starts_with(path))
             .map(|file_path| (*file_path).clone())
             .collect::<Vec<PathBuf>>();
+
         for file_path in files_to_remove {
-            cache.remove(&file_path);
+            if let Some(file) = cache.remove(&file_path) {
+                // If the file is in read-write mode and not synced, we need to sync it before removing from cache
+                let mut lock = file.write()?;
+                if lock.mode() == &AccessMode::ReadWrite && !lock.is_synced() {
+                    if let Err(err) = lock.sync_all() {
+                        warn!("Failed to sync file {}: {}", file_path.display(), err);
+                    }
+                }
+            }
         }
+
         Ok(())
     }

@@ -225,9 +322,41 @@ impl FileCache {
     pub fn rename(&self, old_path: &PathBuf, new_path: &PathBuf) -> Result<(), ReductError> {
         let mut cache = self.cache.write()?;
         cache.remove(old_path);
-        rename(old_path, new_path)?;
+        self.backpack.read()?.rename(old_path, new_path)?;
         Ok(())
     }
+
+    pub fn try_exists(&self, path: &PathBuf) -> Result<bool, ReductError> {
+        let backpack = self.backpack.read()?;
+        Ok(backpack.try_exists(path)?)
+    }
+
+    pub fn force_sync_all(&self) {
+        Self::sync_rw_and_unused_files(&self.backpack, &self.cache, Duration::from_secs(0), true);
+    }
+
+    /// Saves a file descriptor in the cache and syncs any discarded files.
+    ///
+    /// We need to make sure that we sync all files that were discarded
+    fn save_in_cache_and_sync_discarded(
+        path: PathBuf,
+        cache: &mut RwLockWriteGuard<Cache<PathBuf, FileRc>>,
+        file: &Arc<RwLock<File>>,
+    ) -> () {
+        let discarded_files = cache.insert(path.clone(), Arc::clone(file));
+
+        for (_, file) in discarded_files {
+            if let Err(err) = file.write().unwrap().sync_all() {
+                warn!("Failed to sync file {:?}: {}", path, err);
+            }
+        }
+    }
+}
+
+impl Drop for FileCache {
+    fn drop(&mut self) {
+        self.stop_sync_worker.store(true, Ordering::Relaxed);
+    }
 }

 #[cfg(test)]
@@ -383,9 +512,89 @@ mod tests {
         assert_eq!(tmp_dir.exists(), false);
     }

+    mod sync_rw_and_unused_files {
+        use super::*;
+
+        #[rstest]
+        fn test_sync_unused_files(cache: FileCache, tmp_dir: PathBuf) {
+            let file_path = tmp_dir.join("test_sync_rw_and_unused_files.txt");
+            {
+                let file_ref = cache
+                    .write_or_create(&file_path, SeekFrom::Start(0))
+                    .unwrap()
+                    .upgrade()
+                    .unwrap();
+                file_ref.write().unwrap().write_all(b"test").unwrap();
+
+                assert!(
+                    !cache
+                        .cache
+                        .write()
+                        .unwrap()
+                        .get(&file_path)
+                        .unwrap()
+                        .read()
+                        .unwrap()
+                        .is_synced(),
+                    "File should not be synced initially"
+                );
+            }
+
+            // Wait for the sync worker to run
+            sleep(Duration::from_millis(250));
+
+            assert!(
+                cache
+                    .cache
+                    .write()
+                    .unwrap()
+                    .get(&file_path)
+                    .unwrap()
+                    .read()
+                    .unwrap()
+                    .is_synced(),
+                "File should be synced after sync worker runs"
+            );
+        }
+
+        #[rstest]
+        fn test_not_sync_used_files(cache: FileCache, tmp_dir: PathBuf) {
+            let file_path = tmp_dir.join("test_not_sync_unused_files.txt");
+            let file_ref = cache
+                .write_or_create(&file_path, SeekFrom::Start(0))
+                .unwrap()
+                .upgrade()
+                .unwrap();
+            file_ref.write().unwrap().write_all(b"test").unwrap();

+            // Wait for the sync worker to run
+            sleep(Duration::from_millis(250));
+
+            assert!(
+                !cache
+                    .cache
+                    .write()
+                    .unwrap()
+                    .get(&file_path)
+                    .unwrap()
+                    .read()
+                    .unwrap()
+                    .is_synced(),
+                "File should not be synced after sync worker runs"
+            );
+        }
+    }
+
     #[fixture]
     fn cache() -> FileCache {
-        FileCache::new(2, Duration::from_millis(100))
+        let cache = FileCache::new(2, Duration::from_millis(100), Duration::from_millis(100));
+        cache.set_storage_backend(
+            Backend::builder()
+                .local_data_path(tempfile::tempdir().unwrap().keep().to_str().unwrap())
+                .try_build()
+                .unwrap(),
+        );
+        cache
     }

     #[fixture]
diff --git a/reductstore/src/lib.rs b/reductstore/src/lib.rs
index 58d502f60..9802e414b 100644
--- a/reductstore/src/lib.rs
+++ b/reductstore/src/lib.rs
@@ -1,4 +1,4 @@
-// Copyright 2023-2024 ReductSoftware UG
+// Copyright 2023-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1
 pub mod api;
 pub mod asset;
@@ -9,3 +9,5 @@ pub mod ext;
 mod license;
 pub mod replication;
 pub mod storage;
+
+pub(crate) mod backend;
diff --git a/reductstore/src/main.rs b/reductstore/src/main.rs
index cd8ef9d4c..21fc46a1c 100644
--- a/reductstore/src/main.rs
+++ b/reductstore/src/main.rs
@@ -117,8 +117,7 @@ async fn launch_server() {
     };
 }

-// IMPORTANT: Use only current_thread runtime for the main function, we have deadlocks with multi-threaded runtime
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
 async fn main() {
     launch_server().await;
 }
@@ -231,8 +230,12 @@ mod tests {
     #[rstest]
     #[tokio::test]
     async fn test_shutdown() {
+        let data_path = tempdir().unwrap().keep();
+        env::set_var("RS_DATA_PATH", data_path.to_str().unwrap());
+
         let handle = Handle::new();
-        let storage = Arc::new(Storage::load(tempdir().unwrap().keep(), None));
+        let cfg = Cfg::from_env(StdEnvGetter::default()); // init file cache
+        let storage = cfg.build().unwrap().storage;

         shutdown_app(handle.clone(), storage.clone());
     }
diff --git a/reductstore/src/replication/replication_repository.rs b/reductstore/src/replication/replication_repository.rs
index aeb501676..7f5aa41e4 100644
--- a/reductstore/src/replication/replication_repository.rs
+++ b/reductstore/src/replication/replication_repository.rs
@@ -1,8 +1,9 @@
-// Copyright 2023-2024 ReductSoftware UG
+// Copyright 2023-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1

 use crate::cfg::replication::ReplicationConfig;
 use crate::cfg::DEFAULT_PORT;
+use crate::core::file_cache::FILE_CACHE;
 use crate::replication::proto::replication_repo::Item;
 use crate::replication::proto::{
     Label as ProtoLabel, ReplicationRepo as ProtoReplicationRepo,
@@ -21,6 +22,8 @@ use reduct_base::msg::replication_api::{
 };
 use reduct_base::{not_found, unprocessable_entity};
 use std::collections::HashMap;
+use std::io::SeekFrom::Start;
+use std::io::{Read, Write};
 use std::path::PathBuf;
 use std::sync::Arc;
 use url::Url;
@@ -215,13 +218,25 @@ impl ReplicationRepository {
             config,
         };

-        match std::fs::read(&repo.repo_path) {
-            Ok(data) => {
+        let read_conf_file = || {
+            let lock = FILE_CACHE
+                .write_or_create(&repo.repo_path, Start(0))
+                .expect("Failed to create or open replication repository file")
+                .upgrade()
+                .unwrap();
+
+            let mut buf = Vec::new();
+            lock.write().unwrap().read_to_end(&mut buf)?;
+            Ok::<Vec<u8>, ReductError>(buf)
+        };
+
+        match read_conf_file() {
+            Ok(buf) => {
                 debug!(
                     "Reading replication repository from {}",
                     repo.repo_path.as_os_str().to_str().unwrap_or("...")
                 );
-                let proto_repo = ProtoReplicationRepo::decode(&mut Bytes::from(data))
+                let proto_repo = ProtoReplicationRepo::decode(&mut Bytes::from(buf))
                     .expect("Error decoding replication repository");
                 for item in proto_repo.replications {
                     if let Err(err) =
@@ -231,14 +246,7 @@ impl ReplicationRepository {
                     }
                 }
             }
-            Err(_) => {
-                debug!(
-                    "Creating a new token repository {}",
-                    repo.repo_path.as_os_str().to_str().unwrap_or("...")
-                );
-                repo.save_repo()
-                    .expect("Failed to create a
new token repository"); - } + Err(_) => {} } repo } @@ -260,13 +268,16 @@ impl ReplicationRepository { .encode(&mut buf) .expect("Error encoding replication repository"); - std::fs::write(&self.repo_path, buf).map_err(|e| { - ReductError::internal_server_error(&format!( - "Failed to write replication repository to {}: {}", - self.repo_path.as_os_str().to_str().unwrap_or("..."), - e - )) - }) + let lock = FILE_CACHE + .write_or_create(&self.repo_path, Start(0))? + .upgrade()?; + + let mut file = lock.write()?; + file.set_len(0)?; + file.write_all(&buf)?; + file.sync_all()?; + + Ok(()) } fn create_or_update_replication_task( diff --git a/reductstore/src/replication/replication_sender.rs b/reductstore/src/replication/replication_sender.rs index 051d4d00a..efe8142f8 100644 --- a/reductstore/src/replication/replication_sender.rs +++ b/reductstore/src/replication/replication_sender.rs @@ -473,7 +473,7 @@ mod tests { .begin_write( "test", 20, - MAX_IO_BUFFER_SIZE * CHANNEL_BUFFER_SIZE + 1, + (MAX_IO_BUFFER_SIZE * CHANNEL_BUFFER_SIZE + 1) as u64, "".to_string(), Labels::new(), ) @@ -562,7 +562,7 @@ mod tests { let sender = build_sender(remote_bucket, settings); let transaction = Transaction::WriteRecord(10); - imitate_write_record(&sender, &transaction, (MAX_PAYLOAD_SIZE + 1) as usize); + imitate_write_record(&sender, &transaction, MAX_PAYLOAD_SIZE + 1); assert_eq!(sender.run(), SyncState::SyncedOrRemoved); assert!( @@ -583,7 +583,7 @@ mod tests { assert_eq!(diagnostics.errored, 0, "no errors happened"); } - fn imitate_write_record(sender: &ReplicationSender, transaction: &Transaction, size: usize) { + fn imitate_write_record(sender: &ReplicationSender, transaction: &Transaction, size: u64) { sender .log_map .read() @@ -632,7 +632,8 @@ mod tests { let log_map = Arc::new(RwLock::new(HashMap::new())); let log = RwLock::new( - TransactionLog::try_load_or_create(storage.data_path().join("test.log"), 1000).unwrap(), + TransactionLog::try_load_or_create(&storage.data_path().join("test.log"), 1000) + .unwrap(), ); log_map.write().unwrap().insert("test".to_string(), log); diff --git a/reductstore/src/replication/replication_task.rs b/reductstore/src/replication/replication_task.rs index c3e2696b6..087428da7 100644 --- a/reductstore/src/replication/replication_task.rs +++ b/reductstore/src/replication/replication_task.rs @@ -1,4 +1,4 @@ -// Copyright 2023-2024 ReductSoftware UG +// Copyright 2023-2025 ReductSoftware UG // Licensed under the Business Source License 1.1 use std::collections::HashMap; @@ -131,10 +131,25 @@ impl ReplicationTask { &entry.name, &replication_name, ); - let log = TransactionLog::try_load_or_create( - path, + let log = match TransactionLog::try_load_or_create( + &path, thr_system_options.transaction_log_size, - )?; + ) { + Ok(log) => log, + Err(err) => { + error!( + "Failed to load transaction log for entry '{}': {:?}", + entry.name, err + ); + info!("Creating a new transaction log for entry '{}'", entry.name); + FILE_CACHE.remove(&path)?; + TransactionLog::try_load_or_create( + &path, + thr_system_options.transaction_log_size, + )? 
+ } + }; + logs.insert(entry.name, RwLock::new(log)); } @@ -183,7 +198,7 @@ impl ReplicationTask { info!("Creating a new transaction log: {:?}", path); match TransactionLog::try_load_or_create( - path, + &path, thr_system_options.transaction_log_size, ) { Ok(log) => { @@ -240,7 +255,7 @@ impl ReplicationTask { map.insert( entry_name.clone(), RwLock::new(TransactionLog::try_load_or_create( - Self::build_path_to_transaction_log( + &Self::build_path_to_transaction_log( self.storage.data_path(), &self.settings.src_bucket, &entry_name, @@ -329,9 +344,9 @@ impl Drop for ReplicationTask { #[cfg(test)] mod tests { use super::*; - use std::fs; - use bytes::Bytes; + use std::fs; + use std::io::Write; use crate::core::file_cache::FILE_CACHE; use mockall::mock; @@ -361,8 +376,12 @@ mod tests { } #[rstest] - fn test_transaction_log_init(remote_bucket: MockRmBucket, settings: ReplicationSettings) { - let replication = build_replication(remote_bucket, settings); + fn test_transaction_log_init( + remote_bucket: MockRmBucket, + settings: ReplicationSettings, + path: PathBuf, + ) { + let replication = build_replication(path, remote_bucket, settings); assert_eq!(replication.log_map.read().unwrap().len(), 2); assert!( replication.log_map.read().unwrap().contains_key("test1"), @@ -374,17 +393,58 @@ mod tests { ); } + #[rstest] + fn test_transaction_log_init_err( + remote_bucket: MockRmBucket, + settings: ReplicationSettings, + path: PathBuf, + ) { + { + // create a broken transaction log + let log_path = ReplicationTask::build_path_to_transaction_log( + &path, + &settings.src_bucket, + "test1", + &"test".to_string(), + ); + + fs::create_dir_all(log_path.parent().unwrap()).unwrap(); + let mut log_file = fs::File::create(&log_path).unwrap(); + + log_file + .write_all(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + .unwrap(); + } + + let replication = build_replication(path, remote_bucket, settings); + assert_eq!(replication.log_map.read().unwrap().len(), 2); + let log_len = replication + .log_map + .read() + .unwrap() + .get("test1") + .unwrap() + .read() + .unwrap() + .len(); + assert_eq!( + log_len, 0, + "Task recreated a new transaction log for 'test1' after broken log" + ); + } + #[rstest] fn test_add_new_entry( mut remote_bucket: MockRmBucket, mut notification: TransactionNotification, settings: ReplicationSettings, + path: PathBuf, ) { remote_bucket .expect_write_batch() .returning(|_, _| Ok(ErrorRecordMap::new())); remote_bucket.expect_is_active().return_const(true); - let mut replication = build_replication(remote_bucket, settings.clone()); + let mut replication = build_replication(path, remote_bucket, settings.clone()); notification.entry = "new_entry".to_string(); fs::create_dir_all( @@ -415,12 +475,13 @@ mod tests { mut remote_bucket: MockRmBucket, notification: TransactionNotification, settings: ReplicationSettings, + path: PathBuf, ) { remote_bucket .expect_write_batch() .returning(|_, _| Ok(ErrorRecordMap::new())); remote_bucket.expect_is_active().return_const(true); - let mut replication = build_replication(remote_bucket, settings); + let mut replication = build_replication(path, remote_bucket, settings); replication.notify(notification).unwrap(); sleep(Duration::from_millis(100)); @@ -451,12 +512,13 @@ mod tests { mut remote_bucket: MockRmBucket, notification: TransactionNotification, settings: ReplicationSettings, + path: PathBuf, ) { remote_bucket .expect_write_batch() .returning(|_, _| Ok(ErrorRecordMap::new())); remote_bucket.expect_is_active().return_const(false); - let mut replication = 
build_replication(remote_bucket, settings); + let mut replication = build_replication(path, remote_bucket, settings); replication.notify(notification).unwrap(); sleep(Duration::from_millis(100)); @@ -487,6 +549,7 @@ mod tests { mut notification: TransactionNotification, mut remote_bucket: MockRmBucket, settings: ReplicationSettings, + path: PathBuf, ) { remote_bucket .expect_write_batch() @@ -497,7 +560,7 @@ mod tests { ..settings }; - let mut replication = build_replication(MockRmBucket::new(), settings.clone()); + let mut replication = build_replication(path, MockRmBucket::new(), settings.clone()); let mut time = 10; for entry in &["test1", "test2"] { @@ -525,12 +588,13 @@ mod tests { mut remote_bucket: MockRmBucket, notification: TransactionNotification, settings: ReplicationSettings, + path: PathBuf, ) { remote_bucket .expect_write_batch() .return_const(Ok(ErrorRecordMap::new())); remote_bucket.expect_is_active().return_const(true); - let mut replication = build_replication(remote_bucket, settings.clone()); + let mut replication = build_replication(path, remote_bucket, settings.clone()); replication.notify(notification.clone()).unwrap(); let path = ReplicationTask::build_path_to_transaction_log( @@ -554,12 +618,13 @@ mod tests { mut remote_bucket: MockRmBucket, notification: TransactionNotification, settings: ReplicationSettings, + path: PathBuf, ) { remote_bucket .expect_write_batch() .return_const(Ok(ErrorRecordMap::new())); remote_bucket.expect_is_active().return_const(true); - let mut replication = build_replication(remote_bucket, settings.clone()); + let mut replication = build_replication(path, remote_bucket, settings.clone()); replication.notify(notification.clone()).unwrap(); let path = ReplicationTask::build_path_to_transaction_log( @@ -590,12 +655,11 @@ mod tests { } fn build_replication( + path: PathBuf, remote_bucket: MockRmBucket, settings: ReplicationSettings, ) -> ReplicationTask { - let tmp_dir = tempfile::tempdir().unwrap().keep(); - - let storage = Arc::new(Storage::load(tmp_dir, None)); + let storage = Arc::new(Storage::load(path, None)); let bucket = storage .create_bucket("src", BucketSettings::default()) @@ -667,6 +731,11 @@ mod tests { } } + #[fixture] + fn path() -> PathBuf { + tempfile::tempdir().unwrap().keep() + } + fn transaction_log_is_empty(replication: &ReplicationTask) -> bool { sleep(Duration::from_millis(50)); sleep(Duration::from_millis(50)); diff --git a/reductstore/src/replication/transaction_log.rs b/reductstore/src/replication/transaction_log.rs index b73e20a3b..6b94be273 100644 --- a/reductstore/src/replication/transaction_log.rs +++ b/reductstore/src/replication/transaction_log.rs @@ -1,9 +1,9 @@ -// Copyright 2023-2024 ReductSoftware UG +// Copyright 2023-2025 ReductSoftware UG // Licensed under the Business Source License 1.1 use crate::core::file_cache::FILE_CACHE; use crate::replication::Transaction; -use log::{debug, error, warn}; +use log::{debug, warn}; use reduct_base::error::ReductError; use reduct_base::internal_server_error; use std::io::{Read, Seek, SeekFrom, Write}; @@ -28,28 +28,6 @@ pub(super) struct TransactionLog { const HEADER_SIZE: usize = 16; const ENTRY_SIZE: usize = 9; -impl Drop for TransactionLog { - fn drop(&mut self) { - // Flush and sync the file in a separate thread. 
-        let file = FILE_CACHE.write_or_create(&self.file_path, SeekFrom::Current(0));
-        if file.is_err() {
-            error!("Failed to open transaction log: {}", file.err().unwrap());
-            return;
-        }
-
-        let file = file.unwrap().upgrade().unwrap();
-        let sync = || {
-            let mut file = file.write()?;
-            file.flush()?;
-            file.sync_all()?;
-
-            Ok::<(), ReductError>(())
-        };
-
-        sync().map_or_else(|e| error!("Failed to sync transaction log: {}", e), |r| r);
-    }
-}
-
 impl TransactionLog {
     /// Create a new transaction log or load an existing one.
     ///
@@ -61,7 +39,7 @@ impl TransactionLog {
     /// # Returns
     ///
     /// A new transaction log instance or an error.
-    pub fn try_load_or_create(path: PathBuf, capacity: usize) -> Result<Self, ReductError> {
+    pub fn try_load_or_create(path: &PathBuf, capacity: usize) -> Result<Self, ReductError> {
         let init_capacity_in_bytes = capacity * ENTRY_SIZE + HEADER_SIZE;

         let instance = if !path.try_exists()? {
@@ -77,7 +55,7 @@ impl TransactionLog {
             file.sync_all()?;

             Self {
-                file_path: path,
+                file_path: path.clone(),
                 capacity_in_bytes: init_capacity_in_bytes,
                 write_pos: HEADER_SIZE,
                 read_pos: HEADER_SIZE,
@@ -94,6 +72,15 @@ impl TransactionLog {
             };
             let write_pos = u64::from_be_bytes(buf[0..8].try_into().unwrap()) as usize;
             let read_pos = u64::from_be_bytes(buf[8..16].try_into().unwrap()) as usize;
+
+            Self::integrity_check(
+                path,
+                init_capacity_in_bytes,
+                capacity_in_bytes,
+                write_pos,
+                read_pos,
+            )?;
+
             let capacity_in_bytes = if init_capacity_in_bytes != capacity_in_bytes {
                 // If the capacity is changed, we need to check if the log is empty
                 // then we can change the capacity.
@@ -105,7 +92,7 @@ impl TransactionLog {
                 let file = FILE_CACHE
                     .write_or_create(&path, SeekFrom::Start(0))?
                     .upgrade()?;
-                let file = file.write()?;
+                let mut file = file.write()?;
                 file.set_len(init_capacity_in_bytes as u64)?;

                 init_capacity_in_bytes
@@ -118,7 +105,7 @@ impl TransactionLog {
             };

             Self {
-                file_path: path,
+                file_path: path.to_path_buf(),
                 capacity_in_bytes,
                 write_pos,
                 read_pos,
@@ -128,6 +115,44 @@ impl TransactionLog {
         Ok(instance)
     }

+    fn integrity_check(
+        path: &PathBuf,
+        init_capacity_in_bytes: usize,
+        capacity_in_bytes: usize,
+        write_pos: usize,
+        read_pos: usize,
+    ) -> Result<(), ReductError> {
+        if init_capacity_in_bytes < HEADER_SIZE
+            || (capacity_in_bytes - HEADER_SIZE) % ENTRY_SIZE != 0
+        {
+            return Err(internal_server_error!(
+                "Invalid size {} of transaction log {}",
+                capacity_in_bytes,
+                path.to_str().unwrap_or("unknown path")
+            ));
+        }
+
+        let check_pos = |pos: usize, name: &str| {
+            if pos < HEADER_SIZE
+                || pos >= capacity_in_bytes
+                || (pos - HEADER_SIZE) % ENTRY_SIZE != 0
+            {
+                return Err(internal_server_error!(
+                    "Invalid {} position {} in transaction log {}",
+                    name,
+                    pos,
+                    path.to_str().unwrap_or("unknown path")
+                ));
+            }
+            Ok(())
+        };
+
+        check_pos(write_pos, "write")?;
+        check_pos(read_pos, "read")?;
+
+        Ok(())
+    }
+
     /// Push a new transaction to the log.
/// /// # Arguments @@ -271,13 +296,13 @@ mod tests { #[rstest] fn test_new_transaction_log(path: PathBuf) { - let transaction_log = TransactionLog::try_load_or_create(path, 100).unwrap(); + let transaction_log = TransactionLog::try_load_or_create(&path, 100).unwrap(); assert_eq!(transaction_log.is_empty(), true); } #[rstest] fn test_write_read_transaction_log(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path, 100).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 100).unwrap(); assert_eq!( transaction_log .push_back(Transaction::WriteRecord(1)) @@ -308,7 +333,7 @@ mod tests { #[rstest] fn test_write_broken_type(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path.clone(), 100).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 100).unwrap(); assert_eq!( transaction_log .push_back(Transaction::WriteRecord(1)) @@ -337,7 +362,7 @@ mod tests { #[rstest] fn test_out_of_range(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path, 100).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 100).unwrap(); assert_eq!( transaction_log .push_back(Transaction::WriteRecord(1)) @@ -369,7 +394,7 @@ mod tests { #[rstest] fn test_overflow(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path, 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); for i in 1..5 { transaction_log .push_back(Transaction::WriteRecord(i)) @@ -385,14 +410,14 @@ mod tests { #[rstest] fn test_recovery(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path.clone(), 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); for i in 1..5 { transaction_log .push_back(Transaction::WriteRecord(i)) .unwrap(); } - let mut transaction_log = TransactionLog::try_load_or_create(path, 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); assert_eq!(transaction_log.len(), 2); assert_eq!( transaction_log.front(2).unwrap(), @@ -405,27 +430,27 @@ mod tests { #[rstest] fn test_recovery_init(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path.clone(), 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); transaction_log .push_back(Transaction::WriteRecord(1)) .unwrap(); drop(transaction_log); - let transaction_log = TransactionLog::try_load_or_create(path, 3).unwrap(); + let transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); assert_eq!(transaction_log.write_pos, HEADER_SIZE + ENTRY_SIZE); assert_eq!(transaction_log.read_pos, HEADER_SIZE); } #[rstest] fn test_recovery_empty_cache(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path.clone(), 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); transaction_log .push_back(Transaction::WriteRecord(1)) .unwrap(); FILE_CACHE.discard_recursive(&path).unwrap(); // discard the cache to simulate restart - let mut transaction_log = TransactionLog::try_load_or_create(path, 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); // check if the transaction log is still working after cache discard assert_eq!( @@ -438,13 +463,13 @@ mod tests { #[rstest] fn test_resize_empty_log(path: PathBuf) { - TransactionLog::try_load_or_create(path.clone(), 
3).unwrap(); + TransactionLog::try_load_or_create(&path, 3).unwrap(); assert_eq!( fs::metadata(&path).unwrap().len() as usize, ENTRY_SIZE * 3 + HEADER_SIZE ); - TransactionLog::try_load_or_create(path.clone(), 5).unwrap(); + TransactionLog::try_load_or_create(&path, 5).unwrap(); assert_eq!( fs::metadata(&path).unwrap().len() as usize, ENTRY_SIZE * 5 + HEADER_SIZE @@ -453,7 +478,7 @@ mod tests { #[rstest] fn test_resize_non_empty_log(path: PathBuf) { - let mut transaction_log = TransactionLog::try_load_or_create(path.clone(), 3).unwrap(); + let mut transaction_log = TransactionLog::try_load_or_create(&path, 3).unwrap(); transaction_log .push_back(Transaction::WriteRecord(1)) .unwrap(); @@ -462,7 +487,7 @@ mod tests { ENTRY_SIZE * 3 + HEADER_SIZE ); - TransactionLog::try_load_or_create(path.clone(), 5).unwrap(); + TransactionLog::try_load_or_create(&path, 5).unwrap(); assert_eq!( fs::metadata(&path).unwrap().len() as usize, ENTRY_SIZE * 3 + HEADER_SIZE, @@ -470,6 +495,69 @@ mod tests { ); } + mod integrity_tests { + use super::*; + + #[rstest] + #[case([0, 0, 0, 0, 0, 0, 0, 17, 0, 0, 0, 0, 0, 0, 0, 16], "Invalid write position 17 in transaction log" + )] + #[case([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 16], "Invalid write position 1 in transaction log" + )] + #[case([0, 0, 0, 0, 0, 0, 0, 34, 0, 0, 0, 0, 0, 0, 0, 16], "Invalid write position 34 in transaction log" + )] + #[case([0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 17], "Invalid read position 17 in transaction log" + )] + #[case([0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 1], "Invalid read position 1 in transaction log" + )] + #[case([0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 34], "Invalid read position 34 in transaction log" + )] + fn test_invalid_position( + #[case] buf: [u8; 16], + #[case] expected_error: &str, + path: PathBuf, + ) { + { + let file = FILE_CACHE + .write_or_create(&path, SeekFrom::Start(0)) + .unwrap() + .upgrade() + .unwrap(); + let mut file = file.write().unwrap(); + file.write_all(&buf).unwrap(); + file.set_len((HEADER_SIZE + ENTRY_SIZE * 2) as u64).unwrap(); + } + + let result = TransactionLog::try_load_or_create(&path, 3); + assert_eq!( + result.err().unwrap(), + internal_server_error!("{} {}", expected_error, path.to_str().unwrap()) + ); + } + + #[rstest] + fn test_invalid_size(path: PathBuf) { + { + let file = FILE_CACHE + .write_or_create(&path, SeekFrom::Start(0)) + .unwrap() + .upgrade() + .unwrap(); + let mut file = file.write().unwrap(); + file.write_all(&[0u8; 16]).unwrap(); + file.set_len((HEADER_SIZE + 1) as u64).unwrap(); + } + + let result = TransactionLog::try_load_or_create(&path, 3); + assert_eq!( + result.err().unwrap(), + internal_server_error!( + "Invalid size 17 of transaction log {}", + path.to_str().unwrap() + ) + ); + } + } + #[fixture] fn path() -> PathBuf { let path = tempdir().unwrap().keep().join("transaction_log"); diff --git a/reductstore/src/storage/block_manager.rs b/reductstore/src/storage/block_manager.rs index d60fa75e1..ab7e91f60 100644 --- a/reductstore/src/storage/block_manager.rs +++ b/reductstore/src/storage/block_manager.rs @@ -204,7 +204,7 @@ impl BlockManager { let file = FILE_CACHE .write_or_create(&self.path_to_data(block_id), SeekFrom::Start(0))? .upgrade()?; - let file = file.write()?; + let mut file = file.write()?; file.set_len(max_block_size)?; } @@ -233,14 +233,14 @@ impl BlockManager { let file = FILE_CACHE .write_or_create(&path, SeekFrom::Current(0))? 
            .upgrade()?;
-        let data_block = file.write()?;
+        let mut data_block = file.write()?;
         data_block.set_len(block.size())?;
         data_block.sync_all()?;

         let file = FILE_CACHE
             .write_or_create(&self.path_to_desc(block.block_id()), SeekFrom::Current(0))?
             .upgrade()?;
-        let descr_block = file.write()?;
+        let mut descr_block = file.write()?;
         descr_block.sync_all()?;

         Ok(())
@@ -264,11 +264,14 @@ impl BlockManager {

         self.block_cache.remove(&block_id);

-        let path = self.path_to_data(block_id);
-        FILE_CACHE.remove(&path)?;
+        let data_block_path = self.path_to_data(block_id);
+        FILE_CACHE.remove(&data_block_path)?;

-        let path = self.path_to_desc(block_id);
-        FILE_CACHE.remove(&path)?;
+        let desc_block_path = self.path_to_desc(block_id);
+        if FILE_CACHE.try_exists(&desc_block_path)? {
+            // it may still exist only in the WAL
+            FILE_CACHE.remove(&desc_block_path)?;
+        }

         self.wal.remove(block_id)?;
         Ok(())
@@ -608,6 +611,7 @@ mod tests {
     use reduct_base::error::ErrorCode;
     use rstest::{fixture, rstest};

+    use crate::backend::Backend;
     use crate::storage::entry::RecordWriter;
     use crate::storage::storage::MAX_IO_BUFFER_SIZE;
     use rand::distr::Alphanumeric;
@@ -1074,6 +1078,12 @@ mod tests {
     #[fixture]
     fn block_manager(block_id: u64) -> BlockManager {
         let path = tempdir().unwrap().keep().join("bucket").join("entry");
+        FILE_CACHE.set_storage_backend(
+            Backend::builder()
+                .local_data_path(path.to_str().unwrap())
+                .try_build()
+                .unwrap(),
+        );
         let mut bm = BlockManager::new(path.clone(), BlockIndex::new(path.join(BLOCK_INDEX_FILE)));

         let block_ref = bm.start_new_block(block_id, 1024).unwrap().clone();
diff --git a/reductstore/src/storage/block_manager/block_index.rs b/reductstore/src/storage/block_manager/block_index.rs
index 588694fb3..9c3236a30 100644
--- a/reductstore/src/storage/block_manager/block_index.rs
+++ b/reductstore/src/storage/block_manager/block_index.rs
@@ -1,4 +1,4 @@
-// Copyright 2024 ReductSoftware UG
+// Copyright 2024-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1

 use bytes::Bytes;
@@ -7,7 +6,6 @@ use prost::Message;
 use reduct_base::error::ReductError;
 use reduct_base::internal_server_error;
 use std::collections::{BTreeSet, HashMap};
-use std::fs::read_dir;
 use std::io::{Read, SeekFrom, Write};
 use std::path::PathBuf;

@@ -119,7 +118,7 @@ impl BlockIndex {
     }

     pub fn try_load(path: PathBuf) -> Result<Self, ReductError> {
-        if !path.try_exists()? {
+        if !FILE_CACHE.try_exists(&path)? {
             return Err(internal_server_error!("Block index {:?} not found", path));
         }

@@ -135,15 +134,10 @@
             ));
         };

-        let has_block_descriptors = read_dir(&path.parent().unwrap())?.any(|file| {
-            file.map(|f| {
-                f.file_name()
-                    .to_str()
-                    .unwrap()
-                    .ends_with(DESCRIPTOR_FILE_EXT)
-            })
-            .unwrap_or(false)
-        });
+        let has_block_descriptors = FILE_CACHE
+            .read_dir(&path.parent().unwrap().into())?
+            .iter()
+            .any(|path| path.ends_with(DESCRIPTOR_FILE_EXT));

         if lock.metadata()?.len() == 0 && has_block_descriptors {
             return Err(internal_server_error!("Block index {:?} is empty", path));
@@ -247,7 +241,6 @@ impl BlockIndex {
             internal_server_error!("Failed to write block index {:?}: {}", self.path_buf, err)
         })?;

-        lock.flush()?;
         lock.sync_all()?;

         Ok(())
diff --git a/reductstore/src/storage/block_manager/wal.rs b/reductstore/src/storage/block_manager/wal.rs
index 1992669ba..0485d2789 100644
--- a/reductstore/src/storage/block_manager/wal.rs
+++ b/reductstore/src/storage/block_manager/wal.rs
@@ -1,4 +1,4 @@
-// Copyright 2024 ReductSoftware UG
+// Copyright 2024-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1

 use std::collections::hash_map::Entry::{Occupied, Vacant};
@@ -233,15 +233,15 @@ impl Wal for WalImpl {

     fn remove(&self, block_id: u64) -> Result<(), ReductError> {
         let path = self.block_wal_path(block_id);
-        FILE_CACHE.remove(&path)?;
+        if FILE_CACHE.try_exists(&path)? {
+            FILE_CACHE.remove(&path)?;
+        }
         Ok(())
     }

     fn list(&self) -> Result<Vec<u64>, ReductError> {
         let mut blocks = Vec::new();
-        for entry in std::fs::read_dir(&self.root_path)? {
-            let entry = entry?;
-            let path = entry.path();
+        for path in FILE_CACHE.read_dir(&self.root_path)? {
             if path.extension().unwrap_or_default() == "wal" {
                 let block_id = path
                     .file_stem()
@@ -276,7 +276,9 @@ impl Wal for WalImpl {
 pub(in crate::storage) fn create_wal(entry_path: PathBuf) -> Box<dyn Wal> {
     let wal_folder = entry_path.join("wal");
     if !wal_folder.try_exists().unwrap() {
-        std::fs::create_dir_all(&wal_folder).expect("Failed to create WAL folder");
+        FILE_CACHE
+            .create_dir_all(&wal_folder)
+            .expect("Failed to create WAL folder");
     }
     Box::new(WalImpl::new(entry_path.join("wal")))
 }
diff --git a/reductstore/src/storage/bucket.rs b/reductstore/src/storage/bucket.rs
index 0b80bff0b..828610da0 100644
--- a/reductstore/src/storage/bucket.rs
+++ b/reductstore/src/storage/bucket.rs
@@ -24,6 +24,7 @@ use reduct_base::msg::bucket_api::{BucketInfo, BucketSettings, FullBucketInfo};
 use reduct_base::msg::entry_api::EntryInfo;
 use reduct_base::{conflict, internal_server_error, not_found, Labels};
 use std::collections::BTreeMap;
+use std::io::{Read, SeekFrom};
 use std::path::PathBuf;
 use std::sync::atomic::AtomicBool;
 use std::sync::{Arc, RwLock};
@@ -56,7 +57,7 @@ impl Bucket {
         settings: BucketSettings,
     ) -> Result<Bucket, ReductError> {
         let path = path.join(name);
-        std::fs::create_dir_all(&path)?;
+        FILE_CACHE.create_dir_all(&path)?;

         let settings = Self::fill_settings(settings, Self::defaults());
         let bucket = Bucket {
@@ -82,7 +83,15 @@ impl Bucket {
     ///
     /// * `Bucket` - The bucket or an HTTPError
     pub fn restore(path: PathBuf) -> Result<Bucket, ReductError> {
-        let buf: Vec<u8> = std::fs::read(path.join(SETTINGS_NAME))?;
+        let buf = {
+            let lock = FILE_CACHE
+                .read(&path.join(SETTINGS_NAME), SeekFrom::Start(0))?
+                .upgrade()?;
+            let mut buf = Vec::new();
+            lock.write()?.read_to_end(&mut buf)?;
+            buf
+        };
+
         let settings = ProtoBucketSettings::decode(&mut Bytes::from(buf))
             .map_err(|e| internal_server_error!("Failed to decode settings: {}", e))?;

@@ -92,8 +101,7 @@ impl Bucket {
         let mut entries = BTreeMap::new();
         let mut task_set = Vec::new();

-        for entry in std::fs::read_dir(&path)? {
-            let path = entry?.path();
+        for path in FILE_CACHE.read_dir(&path)? {
             if path.is_dir() {
                 let handler = Entry::restore(
                     path,
@@ -134,8 +142,7 @@ impl Bucket {
     ///
     /// * `&mut Entry` - The entry or an HTTPError
     pub fn get_or_create_entry(&self, key: &str) -> Result<Arc<Entry>, ReductError> {
         check_name_convention(key)?;
-        let mut entries = self.entries.write()?;
-        if !entries.contains_key(key) {
+        if !self.entries.read()?.contains_key(key) {
             let settings = self.settings.read()?;
             let entry = Entry::try_new(
                 &key,
@@ -145,10 +152,12 @@ impl Bucket {
                     max_block_records: settings.max_block_records.unwrap(),
                 },
             )?;
-            entries.insert(key.to_string(), Arc::new(entry));
+            self.entries
+                .write()?
+                .insert(key.to_string(), Arc::new(entry));
         }

-        Ok(entries.get_mut(key).unwrap().clone().into())
+        Ok(self.entries.read()?.get(key).unwrap().clone().into())
     }

     /// Get an entry in the bucket
@@ -178,7 +187,7 @@ impl Bucket {
         let mut latest_record = 0u64;
         let mut entries: Vec<EntryInfo> = vec![];

-        let entry_map = self.entries.read().unwrap();
+        let entry_map = self.entries.read()?;
         for entry in entry_map.values() {
             let info = entry.info()?;
             entries.push(info.clone());
@@ -197,7 +206,7 @@ impl Bucket {
                     .is_provisioned
                     .load(std::sync::atomic::Ordering::Relaxed),
             },
-            settings: self.settings.read().unwrap().clone(),
+            settings: self.settings.read()?.clone(),
             entries,
         })
     }
@@ -220,7 +229,7 @@ impl Bucket {
         &self,
         name: &str,
         time: u64,
-        content_size: usize,
+        content_size: u64,
         content_type: String,
         labels: Labels,
     ) -> TaskHandle<Result<Box<dyn WriteRecord + Sync + Send>, ReductError>> {
@@ -289,7 +298,7 @@ impl Bucket {
         entries.write()?.remove(&old_name);

         FILE_CACHE.discard_recursive(&old_path)?; // we need to close all open files
-        std::fs::rename(&old_path, &new_path)?;
+        FILE_CACHE.rename(&old_path, &new_path)?;

         let entry = Entry::restore(
             new_path,
@@ -543,7 +552,7 @@ mod tests {
             .begin_write(
                 entry_name,
                 time,
-                content.len(),
+                content.len() as u64,
                 "".to_string(),
                 Labels::new(),
             )
diff --git a/reductstore/src/storage/bucket/quotas.rs b/reductstore/src/storage/bucket/quotas.rs
index 884072d90..49766c7b6 100644
--- a/reductstore/src/storage/bucket/quotas.rs
+++ b/reductstore/src/storage/bucket/quotas.rs
@@ -9,7 +9,7 @@ use reduct_base::msg::bucket_api::QuotaType;
 use reduct_base::{bad_request, internal_server_error};

 impl Bucket {
-    pub(super) fn keep_quota_for(&self, content_size: usize) -> Result<(), ReductError> {
+    pub(super) fn keep_quota_for(&self, content_size: u64) -> Result<(), ReductError> {
         let settings = self.settings.read()?;
         let quota_size = settings.quota_size.unwrap_or(0);
         match settings.quota_type.clone().unwrap_or(QuotaType::NONE) {
@@ -25,37 +25,53 @@ impl Bucket {
         }
     }

-    fn remove_oldest_block(&self, content_size: usize, quota_size: u64) -> Result<(), ReductError> {
-        let mut size = self.info()?.info.size + content_size as u64;
+    fn remove_oldest_block(&self, content_size: u64, quota_size: u64) -> Result<(), ReductError> {
+        let get_bucket_size = || {
+            self.entries
+                .read()
+                .map(|entries| {
+                    entries
+                        .values()
+                        .map(|e| e.size())
+                        .reduce(|acc, e| acc + e)
+                        .unwrap_or(0)
+                })
+                .unwrap_or(0)
+        };
+
+        let mut size = get_bucket_size() + content_size as u64;
         while size > quota_size {
-            debug!(
-                "Need more space. Remove an oldest block from bucket '{}'",
-                self.name()
-            );
-
-            let mut candidates: Vec<(u64, &Entry)> = vec![];
-            let entries = self.entries.read()?;
-            for (_, entry) in entries.iter() {
-                let info = entry.info()?;
-                candidates.push((info.oldest_record, entry));
-            }
-            candidates.sort_by_key(|entry| entry.0);
+            let mut success = false;

-            let candidates = candidates
-                .iter()
-                .map(|(_, entry)| entry.name().to_string())
-                .collect::<Vec<String>>();
+            {
+                debug!(
+                    "Need more space. Remove an oldest block from bucket '{}'",
+                    self.name()
+                );

-            let mut success = false;
-            for name in candidates {
-                debug!("Remove an oldest block from entry '{}'", name);
-                match entries.get(&name).unwrap().try_remove_oldest_block().wait() {
-                    Ok(_) => {
-                        success = true;
-                        break;
-                    }
-                    Err(e) => {
-                        debug!("Failed to remove oldest block from entry '{}': {}", name, e);
+                let mut candidates: Vec<(u64, &Entry)> = vec![];
+                let entries = self.entries.read()?;
+                for (_, entry) in entries.iter() {
+                    let info = entry.info()?;
+                    candidates.push((info.oldest_record, entry));
+                }
+                candidates.sort_by_key(|entry| entry.0);
+
+                let candidates = candidates
+                    .iter()
+                    .map(|(_, entry)| entry.name().to_string())
+                    .collect::<Vec<String>>();
+
+                for name in candidates {
+                    debug!("Remove an oldest block from entry '{}'", name);
+                    match entries.get(&name).unwrap().try_remove_oldest_block().wait() {
+                        Ok(_) => {
+                            success = true;
+                            break;
+                        }
+                        Err(e) => {
+                            debug!("Failed to remove oldest block from entry '{}': {}", name, e);
+                        }
                     }
                 }
             }
@@ -67,7 +83,7 @@ impl Bucket {
                 ));
             }

-            size = self.info()?.info.size + content_size as u64;
+            size = get_bucket_size() + content_size as u64;
         }

         // Remove empty entries
diff --git a/reductstore/src/storage/bucket/settings.rs b/reductstore/src/storage/bucket/settings.rs
index 6cb5a3d76..430fffccd 100644
--- a/reductstore/src/storage/bucket/settings.rs
+++ b/reductstore/src/storage/bucket/settings.rs
@@ -1,6 +1,7 @@
-// Copyright 2023-2024 ReductSoftware UG
+// Copyright 2023-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1

+use crate::core::file_cache::FILE_CACHE;
 use crate::core::thread_pool::{unique, TaskHandle};
 use crate::storage::bucket::Bucket;
 use crate::storage::entry::EntrySettings;
@@ -9,7 +10,7 @@ use prost::Message;
 use reduct_base::error::ReductError;
 use reduct_base::msg::bucket_api::{BucketSettings, QuotaType};
 use reduct_base::{conflict, internal_server_error};
-use std::io::Write;
+use std::io::{SeekFrom, Write};

 pub(super) const DEFAULT_MAX_RECORDS: u64 = 1024;
 pub(super) const DEFAULT_MAX_BLOCK_SIZE: u64 = 64000000;
@@ -117,8 +118,12 @@ impl Bucket {
                 .encode(&mut buf)
                 .map_err(|e| internal_server_error!("Failed to encode bucket settings: {}", e))?;

-            let mut file = std::fs::File::create(path)?;
-            file.write(&buf)?;
+            let lock = FILE_CACHE
+                .write_or_create(&path, SeekFrom::Start(0))?
+                .upgrade()?;
+            let mut file = lock.write()?;
+            file.write_all(&buf)?;
+            file.sync_all()?;

             Ok(())
         })
     }
diff --git a/reductstore/src/storage/entry.rs b/reductstore/src/storage/entry.rs
index 6b4ec0705..06d0c7fc8 100644
--- a/reductstore/src/storage/entry.rs
+++ b/reductstore/src/storage/entry.rs
@@ -18,7 +18,6 @@ use log::debug;
 use reduct_base::error::ReductError;
 use reduct_base::msg::entry_api::{EntryInfo, QueryEntry};
 use std::collections::HashMap;
-use std::fs;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, RwLock};
@@ -27,6 +26,7 @@ use tokio::sync::RwLock as AsyncRwLock;

 pub(crate) use io::record_writer::{RecordDrainer, RecordWriter};

+use crate::core::file_cache::FILE_CACHE;
 use crate::core::thread_pool::{
     group_from_path, shared, try_unique, unique_child, GroupDepth, TaskHandle,
 };
@@ -74,7 +74,7 @@ impl Entry {
         path: PathBuf,
         settings: EntrySettings,
     ) -> Result<Self, ReductError> {
-        fs::create_dir_all(path.join(name))?;
+        FILE_CACHE.create_dir_all(&path.join(name))?;
         let path = path.join(name);
         Ok(Self {
             name: name.to_string(),
@@ -215,6 +215,11 @@ impl Entry {
         })
     }

+    pub fn size(&self) -> u64 {
+        let bm = self.block_manager.read().unwrap();
+        bm.index().size()
+    }
+
     /// Try to remove the oldest block.
     ///
     /// # Returns
@@ -230,7 +235,6 @@ impl Entry {
         let oldest_block_id = *index_tree.first().unwrap();

         let block_manager = Arc::clone(&self.block_manager);
-
         match try_unique(
             &format!("{}/{}", self.task_group(), oldest_block_id),
             "try to remove oldest block",
@@ -238,6 +242,10 @@ impl Entry {
             move || {
                 let mut bm = block_manager.write().unwrap();
                 bm.remove_block(oldest_block_id)?;
+                debug!(
+                    "Removing the oldest block {}.blk",
+                    bm.path().join(oldest_block_id.to_string()).display()
+                );
                 Ok(())
             },
         ) {
@@ -550,7 +558,7 @@ mod tests {
         let mut sender = entry
             .begin_write(
                 1000000,
-                MAX_IO_BUFFER_SIZE + 1,
+                (MAX_IO_BUFFER_SIZE + 1) as u64,
                 "text/plain".to_string(),
                 Labels::new(),
             )
@@ -625,7 +633,12 @@ mod tests {

     pub fn write_record(entry: &mut Entry, time: u64, data: Vec<u8>) {
         let mut sender = entry
-            .begin_write(time, data.len(), "text/plain".to_string(), Labels::new())
+            .begin_write(
+                time,
+                data.len() as u64,
+                "text/plain".to_string(),
+                Labels::new(),
+            )
             .wait()
             .unwrap();
         sender.blocking_send(Ok(Some(Bytes::from(data)))).unwrap();
@@ -636,7 +649,7 @@ mod tests {

     pub fn write_record_with_labels(entry: &mut Entry, time: u64, data: Vec<u8>, labels: Labels) {
         let mut sender = entry
-            .begin_write(time, data.len(), "text/plain".to_string(), labels)
+            .begin_write(time, data.len() as u64, "text/plain".to_string(), labels)
             .wait()
             .unwrap();
         sender.blocking_send(Ok(Some(Bytes::from(data)))).unwrap();
diff --git a/reductstore/src/storage/entry/entry_loader.rs b/reductstore/src/storage/entry/entry_loader.rs
index 86e5b0785..7d068558d 100644
--- a/reductstore/src/storage/entry/entry_loader.rs
+++ b/reductstore/src/storage/entry/entry_loader.rs
@@ -1,9 +1,8 @@
-// Copyright 2024 ReductSoftware UG
+// Copyright 2024-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1

 use std::collections::HashMap;
-use std::fs;
-use std::io::Write;
+use std::io::{Read, SeekFrom, Write};
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
 use std::time::Instant;
@@ -14,6 +13,7 @@ use crc64fast::Digest;
 use log::{debug, error, info, trace, warn};
 use prost::Message;

+use crate::core::file_cache::FILE_CACHE;
 use crate::storage::block_manager::block_index::BlockIndex;
 use crate::storage::block_manager::wal::{create_wal, WalEntry};
 use crate::storage::block_manager::{
@@ -80,9 +80,7 @@ impl EntryLoader {
         options: EntrySettings,
     ) -> Result<Entry, ReductError> {
         let mut block_index = BlockIndex::new(path.join(BLOCK_INDEX_FILE));
-        for filein in fs::read_dir(path.clone())? {
-            let file = filein?;
-            let path = file.path();
+        for path in FILE_CACHE.read_dir(&path)? {
             if path.is_dir() {
                 continue;
             }
@@ -97,20 +95,23 @@ impl EntryLoader {
                     error!("Failed to decode block {:?}: {}", path, $err);
                     warn!("Removing meta block {:?}", path);
                     let mut data_path = path.clone();
-                    fs::remove_file(path)?;
+                    FILE_CACHE.remove(&path)?;

                     data_path.set_extension(DATA_FILE_EXT[1..].to_string());
                     warn!("Removing data block {:?}", data_path);
-                    fs::remove_file(data_path)?;
+                    FILE_CACHE.remove(&data_path)?;
                     continue;
                 }};
             }

-            let buf = fs::read(path.clone())?;
+            let file = FILE_CACHE.read(&path, SeekFrom::Start(0))?.upgrade()?;
+            let mut buf = vec![];
+            file.write()?.read_to_end(&mut buf)?;

             let mut crc = Digest::new();
             crc.write(&buf);
-            let mut block = match MinimalBlock::decode(Bytes::from(buf)) {
+            let descriptor_content = Bytes::from(buf);
+            let mut block = match MinimalBlock::decode(descriptor_content.clone()) {
                 Ok(block) => block,
                 Err(err) => {
                     remove_bad_block!(err);
@@ -120,7 +121,7 @@ impl EntryLoader {
             // Migration for old blocks without fields to speed up the restore process
             if block.record_count == 0 {
                 debug!("Record count is 0. Migrate the block");
-                let mut full_block = match Block::decode(Bytes::from(fs::read(path.clone())?)) {
+                let mut full_block = match Block::decode(descriptor_content) {
                     Ok(block) => block,
                     Err(err) => {
                         remove_bad_block!(err);
@@ -133,7 +134,11 @@ impl EntryLoader {
                 block.record_count = full_block.record_count;
                 block.metadata_size = full_block.metadata_size;

-                let mut file = fs::File::create(path.clone())?;
+                let lock = FILE_CACHE
+                    .write_or_create(&path, SeekFrom::Start(0))?
+                    .upgrade()?;
+                let mut file = lock.write()?;
+                file.set_len(0)?;

                 let buf = full_block.encode_to_vec();
                 crc = Digest::new();
@@ -199,13 +204,12 @@ impl EntryLoader {
     }

     fn check_descriptor_count(path: &PathBuf, block_index: &BlockIndex) -> Result<(), ReductError> {
-        let number_of_descriptors = std::fs::read_dir(&path)?
+        let number_of_descriptors = FILE_CACHE
+            .read_dir(&path)?
             .into_iter()
-            .filter(|entry| {
-                let entry = entry.as_ref().unwrap().path();
-                entry.is_file()
-                    && DESCRIPTOR_FILE_EXT.contains(entry.extension().unwrap().to_str().unwrap())
-            })
+            .filter(|entry|
+                // the path may be virtual when it comes from remote storage
+                entry.to_str().unwrap_or("").ends_with(DESCRIPTOR_FILE_EXT))
             .count();

         if number_of_descriptors != block_index.tree().len() {
@@ -229,15 +233,20 @@ impl EntryLoader {
         let mut inconsistent_data = false;
         for block_id in block_index.tree().iter() {
             let desc_path = path.join(format!("{}{}", block_id, DESCRIPTOR_FILE_EXT));
-            if !desc_path.exists() {
-                warn!("Block descriptor {:?} not found", path);
+            if FILE_CACHE.try_exists(&desc_path)? {
+                let data_path = path.join(format!("{}{}", block_id, DATA_FILE_EXT));
+                if !FILE_CACHE.try_exists(&data_path)? {
+                    warn!(
+                        "Data block {:?} not found. Removing its descriptor",
+                        data_path
+                    );
+                    FILE_CACHE.remove(&desc_path)?;
+                    inconsistent_data = true;
+                }
+            } else {
+                warn!("Block descriptor {:?} not found", desc_path);
                 inconsistent_data = true;
             }
-
-            let data_path = path.join(format!("{}{}", block_id, DATA_FILE_EXT));
-            if !data_path.exists() {
-                warn!("Block descriptor {:?} not found. Removing it", path);
-            }
         }

         if inconsistent_data {
@@ -352,9 +361,11 @@ mod tests {
     use crate::storage::block_manager::wal::WalEntry;
     use crate::storage::entry::tests::{entry, entry_settings, path, write_stub_record};
    use crate::storage::proto::{record, us_to_ts, BlockIndex as BlockIndexProto, Record};
+    use std::fs;
     use std::io::SeekFrom;

     use super::*;
+    use crate::backend::Backend;
     use crate::core::file_cache::FILE_CACHE;
     use reduct_base::io::ReadRecord;
     use rstest::{fixture, rstest};
@@ -448,8 +459,16 @@ mod tests {

     #[rstest]
     fn test_migration_v18_v19(entry_settings: EntrySettings, path: PathBuf) {
+        FILE_CACHE.set_storage_backend(
+            Backend::builder()
+                .local_data_path(path.to_str().unwrap())
+                .try_build()
+                .unwrap(),
+        );
+
         let path = path.join("entry");
-        fs::create_dir_all(path.clone()).unwrap();
+        FILE_CACHE.create_dir_all(&path).unwrap();
+
         let mut block_manager = BlockManager::new(
             path.clone(),
             BlockIndex::new(path.clone().join(BLOCK_INDEX_FILE)),
@@ -483,10 +502,19 @@ mod tests {
             .clone()
             .into();
         block_proto.record_count = 0;
-        fs::write(path.join("1.meta"), block_proto.encode_to_vec()).unwrap();
+
+        let lock = FILE_CACHE
+            .write_or_create(&path.join("1.meta"), SeekFrom::Start(0))
+            .unwrap()
+            .upgrade()
+            .unwrap();
+
+        lock.write()
+            .unwrap()
+            .write_all(&block_proto.encode_to_vec())
+            .unwrap();

         // repack the block
-        FILE_CACHE.remove(&path.join(BLOCK_INDEX_FILE)).unwrap();
         let entry = EntryLoader::restore_entry(path.clone(), entry_settings).unwrap();

         let info = entry.info().unwrap();
@@ -518,7 +546,7 @@ mod tests {
             .unwrap()
             .upgrade()
             .unwrap();
-        let file = rc.write().unwrap();
+        let mut file = rc.write().unwrap();
         file.set_len(0).unwrap();
         file.sync_all().unwrap();
     }
@@ -640,6 +668,7 @@ mod tests {
     mod wal_recovery {
         use crate::storage::proto::Record;
         use reduct_base::error::ErrorCode::InternalServerError;
+        use std::fs;
         use std::fs::File;

         use super::*;
diff --git a/reductstore/src/storage/entry/write_record.rs b/reductstore/src/storage/entry/write_record.rs
index a587f2c3e..8257b3e0b 100644
--- a/reductstore/src/storage/entry/write_record.rs
+++ b/reductstore/src/storage/entry/write_record.rs
@@ -28,7 +28,7 @@ impl Entry {
     pub fn begin_write(
         &self,
         time: u64,
-        content_size: usize,
+        content_size: u64,
         content_type: String,
         labels: Labels,
     ) -> TaskHandle<Result<Box<dyn WriteRecord + Sync + Send>, ReductError>> {
@@ -143,7 +143,7 @@ impl Entry {
     fn prepare_block_for_writing(
         block: &mut BlockRef,
         time: u64,
-        content_size: usize,
+        content_size: u64,
         content_type: String,
         labels: Labels,
     ) -> Result<(), ReductError> {
@@ -151,7 +151,7 @@ impl Entry {
         let record = Record {
             timestamp: Some(us_to_ts(&time)),
             begin: block.size(),
-            end: block.size() + content_size as u64,
+            end: block.size() + content_size,
             content_type,
             state: record::State::Started as i32,
             labels: labels
diff --git a/reductstore/src/storage/query.rs b/reductstore/src/storage/query.rs
index defdba715..8f836ed41 100644
--- a/reductstore/src/storage/query.rs
+++ b/reductstore/src/storage/query.rs
@@ -176,6 +176,8 @@ impl QueryWatcher {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::backend::Backend;
+    use crate::core::file_cache::FILE_CACHE;
     use crate::storage::block_manager::block_index::BlockIndex;
     use crate::storage::proto::Record;
     use prost_wkt_types::Timestamp;
@@ -224,7 +226,7 @@ mod tests {
             block_manager.clone(),
         );
         assert!(rx.is_empty());
-        tokio::time::sleep(Duration::from_millis(100)).await;
+        tokio::time::sleep(Duration::from_millis(200)).await;
         assert!(handle.is_finished());
     }

@@ -319,6 +321,13 @@ mod tests {
             .join("bucket")
             .join("entry");

+        FILE_CACHE.set_storage_backend(
+            Backend::builder()
+                .local_data_path(path.to_str().unwrap())
+                .try_build()
+                .unwrap(),
+        );
+
         let mut block_manager =
             BlockManager::new(path.clone(), BlockIndex::new(path.join("index")));
         let block_ref = block_manager.start_new_block(0, 10).unwrap();
diff --git a/reductstore/src/storage/storage.rs b/reductstore/src/storage/storage.rs
index 3c2d34a26..78dc07770 100644
--- a/reductstore/src/storage/storage.rs
+++ b/reductstore/src/storage/storage.rs
@@ -1,4 +1,4 @@
-// Copyright 2023-2024 ReductSoftware UG
+// Copyright 2023-2025 ReductSoftware UG
 // Licensed under the Business Source License 1.1

 use log::{debug, error, info};
@@ -48,15 +48,14 @@ impl Storage {
     ///
     /// If the data_path doesn't exist and can't be created, or if a bucket can't be restored.
     pub fn load(data_path: PathBuf, license: Option<License>) -> Storage {
-        if !data_path.try_exists().unwrap_or(false) {
+        if !FILE_CACHE.try_exists(&data_path).unwrap_or(false) {
             info!("Folder {:?} doesn't exist. Create it.", data_path);
-            std::fs::create_dir_all(&data_path).unwrap();
+            FILE_CACHE.create_dir_all(&data_path).unwrap();
         }

         // restore buckets
         let mut buckets = BTreeMap::new();
-        for entry in std::fs::read_dir(&data_path).unwrap() {
-            let path = entry.unwrap().path();
+        for path in FILE_CACHE.read_dir(&data_path).unwrap() {
             if path.is_dir() {
                 match Bucket::restore(path.clone()) {
                     Ok(bucket) => {
@@ -236,7 +235,7 @@ impl Storage {
             Some(_) => {
                 sync_task.wait()?;
                 FILE_CACHE.discard_recursive(&path)?;
-                std::fs::rename(&path, &new_path)?;
+                FILE_CACHE.rename(&path, &new_path)?;
                 let bucket = Bucket::restore(new_path)?;
                 buckets.insert(new_name.to_string(), Arc::new(bucket));
                 debug!("Bucket '{}' is renamed to '{}'", old_name, new_name);
@@ -273,6 +272,8 @@ impl Storage {
             }
         }

+        FILE_CACHE.force_sync_all();
+
         Ok(())
     }

@@ -294,6 +295,7 @@ pub(super) fn check_name_convention(name: &str) -> Result<(), ReductError> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::backend::Backend;
     use bytes::Bytes;
     use reduct_base::msg::bucket_api::QuotaType;
     use reduct_base::Labels;
@@ -411,12 +413,13 @@ mod tests {
     }

     #[rstest]
-    fn test_ignore_broken_buket(storage: Storage) {
+    fn test_ignore_broken_bucket(storage: Storage) {
         let bucket_settings = BucketSettings {
             quota_size: Some(100),
             quota_type: Some(QuotaType::FIFO),
             ..Bucket::defaults()
         };
+
         let bucket = storage
             .create_bucket("test", bucket_settings.clone())
             .unwrap()
@@ -424,7 +427,7 @@ mod tests {
         assert_eq!(bucket.name(), "test");

         let path = storage.data_path.join("test");
-        std::fs::remove_file(path.join(SETTINGS_NAME)).unwrap();
+        FILE_CACHE.remove(&path.join(SETTINGS_NAME)).unwrap();
         let storage = Storage::load(storage.data_path.clone(), None);
         assert_eq!(
             storage.info().unwrap(),
@@ -659,6 +662,12 @@ mod tests {

     #[fixture]
     fn storage(path: PathBuf) -> Storage {
+        FILE_CACHE.set_storage_backend(
+            Backend::builder()
+                .local_data_path(path.to_str().unwrap())
+                .try_build()
+                .unwrap(),
+        );
         Storage::load(path, None)
     }
 }