From 4b2b8b236dc0c0e67a0636aad3b0de2dda871bc7 Mon Sep 17 00:00:00 2001 From: Ulrik Falklof Date: Thu, 28 May 2020 14:17:35 +0200 Subject: [PATCH 1/2] Add AC hit rate metrics with prometheus labels This is a draft, not ready to be merged. I wait with test cases until getting feedback about the main functionality. Same functionality as in https://github.com/buchgr/bazel-remote/pull/350 - Prometheus counter for cache hit ratio of only AC requests. - Support for prometheus labels based on custom HTTP and gRPC headers. but implemented in an alternative way: - disk.io implements two interfaces: cache.CasAcCache, cache.Stats - cache.metricsdecorator is decorator for cache.CasAcCache and provides prometheus metrics. Pros with this alternative implementation: - Should allow non-prometheus metrics as requested in https://github.com/buchgr/bazel-remote/issues/355 - Avoid the question about if the counters should be placed in disk.go or http.go/grpc*.go. If placing in disk.go, there are issues about how to avoid incrementing counter twice for the same request (both in Get and in GetValidatedActionResult) and at the same time count found AC but missing CAS, as cache miss. - Makes headers available also for logic in disk.go, which could be useful for other functionality in the future. - Metrics can be separated from logging, and still not require injecting counter increment invocations in tons of places. Incrementing from a few well defined places minimize the risk for bugs in the metrics. --- BUILD.bazel | 1 + README.md | 17 ++ cache/BUILD.bazel | 3 + cache/cache.go | 43 +++++ cache/disk/disk.go | 23 +-- cache/disk/disk_test.go | 84 ++++----- cache/httpproxy/httpproxy_test.go | 20 +-- cache/metricsdecorator/BUILD.bazel | 18 ++ cache/metricsdecorator/metricsdecorator.go | 200 +++++++++++++++++++++ config/config.go | 8 + main.go | 9 +- server/grpc.go | 30 +++- server/grpc_ac.go | 30 ++-- server/grpc_asset.go | 12 +- server/grpc_bytestream.go | 8 +- server/grpc_cas.go | 26 +-- server/grpc_test.go | 2 +- server/http.go | 58 ++++-- server/http_test.go | 17 +- 19 files changed, 478 insertions(+), 131 deletions(-) create mode 100644 cache/metricsdecorator/BUILD.bazel create mode 100644 cache/metricsdecorator/metricsdecorator.go diff --git a/BUILD.bazel b/BUILD.bazel index 298e8b322..791392e4e 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//cache/gcsproxy:go_default_library", "//cache/httpproxy:go_default_library", "//cache/s3proxy:go_default_library", + "//cache/metricsdecorator:go_default_library", "//config:go_default_library", "//server:go_default_library", "//utils/idle:go_default_library", diff --git a/README.md b/README.md index 70cd71e52..b89855136 100644 --- a/README.md +++ b/README.md @@ -223,6 +223,23 @@ host: localhost # If true, enable experimental remote asset API support: #experimental_remote_asset_api: true + +# Allows mapping HTTP and gRPC headers to prometheus +# labels. Headers can be set by bazel client as: +# --remote_header=os=ubuntu18-04. Not all counters are +# affected. +#metrics: +# categories: +# os: +# - rhel7 +# - rhel8 +# - ubuntu16-04 +# - ubuntu18-04 +# branch: +# - master +# user: +# - ci + ``` ## Docker diff --git a/cache/BUILD.bazel b/cache/BUILD.bazel index 14417d924..afb38deb3 100644 --- a/cache/BUILD.bazel +++ b/cache/BUILD.bazel @@ -5,4 +5,7 @@ go_library( srcs = ["cache.go"], importpath = "github.com/buchgr/bazel-remote/cache", visibility = ["//visibility:public"], + deps = [ + "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library", + ], ) diff --git a/cache/cache.go b/cache/cache.go index 49c7cc93c..acf146ae8 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -5,6 +5,8 @@ import ( "encoding/hex" "io" "path/filepath" + + pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" ) // EntryKind describes the kind of cache entry @@ -50,6 +52,47 @@ func (e *Error) Error() string { return e.Text } +// Represent the context of an incoming request. For now it acts as an +// adapter providing a common interface to headers from HTTP and gRPC +// requests. In the future it could be extend if additional +// information needs to be associated with a request, or be propagated +// from HTTP/gRPC servers towards disk cache, or perhaps further +// to proxies. +type RequestContext interface { + // Return values for HTTP/gRPC header in the associated + // request. Returns a slice since there could be several + // headers with same name. Returns empty slice if no + // headers exist with the requested name. + // The headerName is expected in lowercase. + GetHeader(headerName string) (headerValues []string) +} + +// TODO Document interface +type CasAcCache interface { + // TODO change to io.ReadCloser? + Put(kind EntryKind, hash string, size int64, rdr io.Reader, reqCtx RequestContext) error + + Get(kind EntryKind, hash string, size int64, reqCtx RequestContext) (io.ReadCloser, int64, error) + + Contains(kind EntryKind, hash string, size int64, reqCtx RequestContext) (bool, int64) + + GetValidatedActionResult(hash string, reqCtx RequestContext) (*pb.ActionResult, []byte, error) +} + +// TODO Document interface +type Stats interface { + Stats() (totalSize int64, reservedSize int64, numItems int) + MaxSize() int64 +} + +// TODO Should the proxy interface also be extended with RequestContext parameter? To allow +// for example forwarding of custom headers from client to proxy, or support for HTTP +// headers like Max-Forwards. + +// TODO Could the disk and proxies implement same interface? But proxies are not supporting +// GetValidatedActionResult and that method is important to have in the interface +// for cache.metricdecorator. + // Proxy is the interface that (optional) proxy backends must implement. // Implementations are expected to be safe for concurrent use. type Proxy interface { diff --git a/cache/disk/disk.go b/cache/disk/disk.go index 40c4a6a7c..bc2bb6976 100644 --- a/cache/disk/disk.go +++ b/cache/disk/disk.go @@ -27,6 +27,7 @@ import ( "github.com/golang/protobuf/proto" ) +// TODO remove these counters? var ( cacheHits = promauto.NewCounter(prometheus.CounterOpts{ Name: "bazel_remote_disk_cache_hits", @@ -230,7 +231,7 @@ func (c *Cache) loadExistingFiles() error { // Put stores a stream of `size` bytes from `r` into the cache. // If `hash` is not the empty string, and the contents don't match it, // a non-nil error is returned. -func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) { +func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader, reqCtx cache.RequestContext) (rErr error) { if size < 0 { return fmt.Errorf("Invalid (negative) size: %d", size) } @@ -471,7 +472,7 @@ func (c *Cache) availableOrTryProxy(key string, size int64, blobPath string) (rc // item is not found, the io.ReadCloser will be nil. If some error occurred // when processing the request, then it is returned. Callers should provide // the `size` of the item to be retrieved, or -1 if unknown. -func (c *Cache) Get(kind cache.EntryKind, hash string, size int64) (rc io.ReadCloser, s int64, rErr error) { +func (c *Cache) Get(kind cache.EntryKind, hash string, size int64, reqCtx cache.RequestContext) (rc io.ReadCloser, s int64, rErr error) { // The hash format is checked properly in the http/grpc code. // Just perform a simple/fast check here, to catch bad tests. @@ -575,7 +576,7 @@ func (c *Cache) Get(kind cache.EntryKind, hash string, size int64) (rc io.ReadCl // one) will be checked. // // Callers should provide the `size` of the item, or -1 if unknown. -func (c *Cache) Contains(kind cache.EntryKind, hash string, size int64) (bool, int64) { +func (c *Cache) Contains(kind cache.EntryKind, hash string, size int64, reqCtx cache.RequestContext) (bool, int64) { // The hash format is checked properly in the http/grpc code. // Just perform a simple/fast check here, to catch bad tests. @@ -648,9 +649,9 @@ func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string { // value from the CAS if it and all its dependencies are also available. If // not, nil values are returned. If something unexpected went wrong, return // an error. -func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, error) { +func (c *Cache) GetValidatedActionResult(hash string, reqCtx cache.RequestContext) (*pb.ActionResult, []byte, error) { - rc, sizeBytes, err := c.Get(cache.AC, hash, -1) + rc, sizeBytes, err := c.Get(cache.AC, hash, -1, reqCtx) if rc != nil { defer rc.Close() } @@ -675,7 +676,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, for _, f := range result.OutputFiles { if len(f.Contents) == 0 { - found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes) + found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } @@ -683,7 +684,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, } for _, d := range result.OutputDirectories { - r, size, err := c.Get(cache.CAS, d.TreeDigest.Hash, d.TreeDigest.SizeBytes) + r, size, err := c.Get(cache.CAS, d.TreeDigest.Hash, d.TreeDigest.SizeBytes, reqCtx) if r == nil { return nil, nil, err // aka "not found", or an err if non-nil } @@ -714,7 +715,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, if f.Digest == nil { continue } - found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes) + found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } @@ -725,7 +726,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, if f.Digest == nil { continue } - found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes) + found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } @@ -734,14 +735,14 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, } if result.StdoutDigest != nil { - found, _ := c.Contains(cache.CAS, result.StdoutDigest.Hash, result.StdoutDigest.SizeBytes) + found, _ := c.Contains(cache.CAS, result.StdoutDigest.Hash, result.StdoutDigest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } } if result.StderrDigest != nil { - found, _ := c.Contains(cache.CAS, result.StderrDigest.Hash, result.StderrDigest.SizeBytes) + found, _ := c.Contains(cache.CAS, result.StderrDigest.Hash, result.StderrDigest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } diff --git a/cache/disk/disk_test.go b/cache/disk/disk_test.go index d4b341529..3d527925d 100644 --- a/cache/disk/disk_test.go +++ b/cache/disk/disk_test.go @@ -78,7 +78,7 @@ func TestCacheBasics(t *testing.T) { } // Non-existing item - rdr, _, err := testCache.Get(cache.CAS, contentsHash, contentsLength) + rdr, _, err := testCache.Get(cache.CAS, contentsHash, contentsLength, nil) if err != nil { t.Fatal(err) } @@ -88,7 +88,7 @@ func TestCacheBasics(t *testing.T) { // Add an item err = testCache.Put(cache.CAS, contentsHash, int64(len(contents)), - ioutil.NopCloser(strings.NewReader(contents))) + ioutil.NopCloser(strings.NewReader(contents)), nil) if err != nil { t.Fatal(err) } @@ -101,7 +101,7 @@ func TestCacheBasics(t *testing.T) { } // Get the item back - rdr, sizeBytes, err := testCache.Get(cache.CAS, contentsHash, contentsLength) + rdr, sizeBytes, err := testCache.Get(cache.CAS, contentsHash, contentsLength, nil) if err != nil { t.Fatal(err) } @@ -142,7 +142,7 @@ func TestCacheEviction(t *testing.T) { } err := testCache.Put(cache.AC, key, int64(i), - ioutil.NopCloser(strReader)) + ioutil.NopCloser(strReader), nil) if err != nil { t.Fatal(err) } @@ -164,16 +164,16 @@ func TestCachePutWrongSize(t *testing.T) { var err error - err = testCache.Put(cache.AC, hash, int64(len(content)), strings.NewReader(content)) + err = testCache.Put(cache.AC, hash, int64(len(content)), strings.NewReader(content), nil) if err != nil { t.Fatal("Expected success", err) } - err = testCache.Put(cache.AC, hash, int64(len(content))+1, strings.NewReader(content)) + err = testCache.Put(cache.AC, hash, int64(len(content))+1, strings.NewReader(content), nil) if err == nil { t.Error("Expected error due to size being different") } - err = testCache.Put(cache.AC, hash, int64(len(content))-1, strings.NewReader(content)) + err = testCache.Put(cache.AC, hash, int64(len(content))-1, strings.NewReader(content), nil) if err == nil { t.Error("Expected error due to size being different") } @@ -188,27 +188,27 @@ func TestCacheGetContainsWrongSize(t *testing.T) { var found bool var rdr io.ReadCloser - err := testCache.Put(cache.CAS, contentsHash, contentsLength, strings.NewReader(contents)) + err := testCache.Put(cache.CAS, contentsHash, contentsLength, strings.NewReader(contents), nil) if err != nil { t.Fatal("Expected success", err) } - found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1) + found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1, nil) if found { t.Error("Expected not found, due to size being different") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1, nil) if rdr != nil { t.Error("Expected not found, due to size being different") } - found, _ = testCache.Contains(cache.CAS, contentsHash, -1) + found, _ = testCache.Contains(cache.CAS, contentsHash, -1, nil) if !found { t.Error("Expected found, when unknown size") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1, nil) if rdr == nil { t.Error("Expected found, when unknown size") } @@ -225,12 +225,12 @@ func TestCacheGetContainsWrongSizeWithProxy(t *testing.T) { // The proxyStub contains the digest {contentsHash, contentsLength}. - found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1) + found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1, nil) if found { t.Error("Expected not found, due to size being different") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1, nil) if rdr != nil { t.Error("Expected not found, due to size being different") } @@ -238,12 +238,12 @@ func TestCacheGetContainsWrongSizeWithProxy(t *testing.T) { t.Fatal(err) } - found, _ = testCache.Contains(cache.CAS, contentsHash, -1) + found, _ = testCache.Contains(cache.CAS, contentsHash, -1, nil) if !found { t.Error("Expected found, when unknown size") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1, nil) if rdr == nil { t.Error("Expected found, when unknown size") } @@ -302,12 +302,12 @@ func putGetCompareBytes(kind cache.EntryKind, hash string, data []byte, testCach r := bytes.NewReader(data) - err := testCache.Put(kind, hash, int64(len(data)), r) + err := testCache.Put(kind, hash, int64(len(data)), r, nil) if err != nil { return err } - rdr, sizeBytes, err := testCache.Get(kind, hash, int64(len(data))) + rdr, sizeBytes, err := testCache.Get(kind, hash, int64(len(data)), nil) if err != nil { return err } @@ -389,7 +389,7 @@ func TestCacheExistingFiles(t *testing.T) { } // Adding a new file should evict items[0] (the oldest) - err = testCache.Put(cache.CAS, contentsHash, int64(len(contents)), strings.NewReader(contents)) + err = testCache.Put(cache.CAS, contentsHash, int64(len(contents)), strings.NewReader(contents), nil) if err != nil { t.Fatal(err) } @@ -398,7 +398,7 @@ func TestCacheExistingFiles(t *testing.T) { if err != nil { t.Fatal(err) } - found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd", contentsLength) + found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd", contentsLength, nil) if found { t.Fatalf("%s should have been evicted", items[0]) } @@ -413,7 +413,7 @@ func TestCacheBlobTooLarge(t *testing.T) { for k := range []cache.EntryKind{cache.AC, cache.RAW} { kind := cache.EntryKind(k) - err := testCache.Put(kind, hashStr("foo"), 10000, strings.NewReader(contents)) + err := testCache.Put(kind, hashStr("foo"), 10000, strings.NewReader(contents), nil) if err == nil { t.Fatal("Expected an error") } @@ -435,14 +435,14 @@ func TestCacheCorruptedCASBlob(t *testing.T) { testCache := New(cacheDir, 1000, nil) err := testCache.Put(cache.CAS, hashStr("foo"), int64(len(contents)), - strings.NewReader(contents)) + strings.NewReader(contents), nil) if err == nil { t.Fatal("expected hash mismatch error") } // We expect the upload to succeed without validation: err = testCache.Put(cache.RAW, hashStr("foo"), int64(len(contents)), - strings.NewReader(contents)) + strings.NewReader(contents), nil) if err != nil { t.Fatal(err) } @@ -481,17 +481,17 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) { } var found bool - found, _ = testCache.Contains(cache.AC, acHash, 512) + found, _ = testCache.Contains(cache.AC, acHash, 512, nil) if !found { t.Fatalf("Expected cache to contain AC entry '%s'", acHash) } - found, _ = testCache.Contains(cache.CAS, casHash1, 1024) + found, _ = testCache.Contains(cache.CAS, casHash1, 1024, nil) if !found { t.Fatalf("Expected cache to contain CAS entry '%s'", casHash1) } - found, _ = testCache.Contains(cache.CAS, casHash2, 1024) + found, _ = testCache.Contains(cache.CAS, casHash2, 1024, nil) if !found { t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2) } @@ -527,17 +527,17 @@ func TestLoadExistingEntries(t *testing.T) { var found bool - found, _ = testCache.Contains(cache.AC, acHash, blobSize) + found, _ = testCache.Contains(cache.AC, acHash, blobSize, nil) if !found { t.Fatalf("Expected cache to contain AC entry '%s'", acHash) } - found, _ = testCache.Contains(cache.CAS, casHash, blobSize) + found, _ = testCache.Contains(cache.CAS, casHash, blobSize, nil) if !found { t.Fatalf("Expected cache to contain CAS entry '%s'", casHash) } - found, _ = testCache.Contains(cache.RAW, rawHash, blobSize) + found, _ = testCache.Contains(cache.RAW, rawHash, blobSize, nil) if !found { t.Fatalf("Expected cache to contain RAW entry '%s'", rawHash) } @@ -671,7 +671,7 @@ func TestHttpProxyBackend(t *testing.T) { blob, casHash := testutils.RandomDataAndHash(blobSize) // Non-existing item - r, _, err := testCache.Get(cache.CAS, casHash, blobSize) + r, _, err := testCache.Get(cache.CAS, casHash, blobSize, nil) if err != nil { t.Fatal(err) } @@ -684,7 +684,7 @@ func TestHttpProxyBackend(t *testing.T) { } err = testCache.Put(cache.CAS, casHash, int64(len(blob)), - bytes.NewReader(blob)) + bytes.NewReader(blob), nil) if err != nil { t.Fatal(err) } @@ -705,12 +705,12 @@ func TestHttpProxyBackend(t *testing.T) { // Confirm that it does not contain the item we added to the // first testCache and the proxy backend. - found, _ := testCache.Contains(cache.CAS, casHash, blobSize) + found, _ := testCache.Contains(cache.CAS, casHash, blobSize, nil) if found { t.Fatalf("Expected the cache not to contain %s", casHash) } - r, _, err = testCache.Get(cache.CAS, casHash, blobSize) + r, _, err = testCache.Get(cache.CAS, casHash, blobSize, nil) if err != nil { t.Fatal(err) } @@ -721,13 +721,13 @@ func TestHttpProxyBackend(t *testing.T) { // Add the proxy backend and check that we can Get the item. testCache.proxy = proxy - found, _ = testCache.Contains(cache.CAS, casHash, blobSize) + found, _ = testCache.Contains(cache.CAS, casHash, blobSize, nil) if !found { t.Fatalf("Expected the cache to contain %s (via the proxy)", casHash) } - r, fetchedSize, err := testCache.Get(cache.CAS, casHash, blobSize) + r, fetchedSize, err := testCache.Get(cache.CAS, casHash, blobSize, nil) if err != nil { t.Fatal(err) } @@ -772,7 +772,7 @@ func TestGetValidatedActionResult(t *testing.T) { grokHashStr := hex.EncodeToString(grokHash[:]) err = testCache.Put(cache.CAS, grokHashStr, int64(len(grokData)), - bytes.NewReader(grokData)) + bytes.NewReader(grokData), nil) if err != nil { t.Fatal(err) } @@ -782,7 +782,7 @@ func TestGetValidatedActionResult(t *testing.T) { fooHashStr := hex.EncodeToString(fooHash[:]) err = testCache.Put(cache.CAS, fooHashStr, int64(len(fooData)), - bytes.NewReader(fooData)) + bytes.NewReader(fooData), nil) if err != nil { t.Fatal(err) } @@ -814,7 +814,7 @@ func TestGetValidatedActionResult(t *testing.T) { barDataHashStr := hex.EncodeToString(barDataHash[:]) err = testCache.Put(cache.CAS, barDataHashStr, int64(len(barData)), - bytes.NewReader(barData)) + bytes.NewReader(barData), nil) if err != nil { t.Fatal(err) } @@ -839,7 +839,7 @@ func TestGetValidatedActionResult(t *testing.T) { rootDataHashStr := hex.EncodeToString(rootDataHash[:]) err = testCache.Put(cache.CAS, rootDataHashStr, int64(len(rootData)), - bytes.NewReader(rootData)) + bytes.NewReader(rootData), nil) if err != nil { t.Fatal(err) } @@ -856,7 +856,7 @@ func TestGetValidatedActionResult(t *testing.T) { treeDataHashStr := hex.EncodeToString(treeDataHash[:]) err = testCache.Put(cache.CAS, treeDataHashStr, int64(len(treeData)), - bytes.NewReader(treeData)) + bytes.NewReader(treeData), nil) if err != nil { t.Fatal(err) } @@ -898,7 +898,7 @@ func TestGetValidatedActionResult(t *testing.T) { arDataHashStr := hex.EncodeToString(arDataHash[:]) err = testCache.Put(cache.AC, arDataHashStr, int64(len(arData)), - bytes.NewReader(arData)) + bytes.NewReader(arData), nil) if err != nil { t.Fatal(err) } @@ -910,7 +910,7 @@ func TestGetValidatedActionResult(t *testing.T) { // to assume that the value should be returned unchanged by the cache // layer. - rAR, rData, err := testCache.GetValidatedActionResult(arDataHashStr) + rAR, rData, err := testCache.GetValidatedActionResult(arDataHashStr, nil) if err != nil { t.Fatal(err) } diff --git a/cache/httpproxy/httpproxy_test.go b/cache/httpproxy/httpproxy_test.go index 23da27936..302c9c3dc 100644 --- a/cache/httpproxy/httpproxy_test.go +++ b/cache/httpproxy/httpproxy_test.go @@ -107,7 +107,7 @@ func TestEverything(t *testing.T) { // PUT two different values with the same key in ac and cas. - err = diskCache.Put(cache.AC, hash, int64(len(acData)), bytes.NewReader(acData)) + err = diskCache.Put(cache.AC, hash, int64(len(acData)), bytes.NewReader(acData), nil) if err != nil { t.Error(err) } @@ -124,7 +124,7 @@ func TestEverything(t *testing.T) { } s.mu.Unlock() - err = diskCache.Put(cache.CAS, hash, int64(len(casData)), bytes.NewReader(casData)) + err = diskCache.Put(cache.CAS, hash, int64(len(casData)), bytes.NewReader(casData), nil) if err != nil { t.Error(err) } @@ -157,7 +157,7 @@ func TestEverything(t *testing.T) { var found bool var size int64 - found, size = diskCache.Contains(cache.AC, hash, int64(len(acData))) + found, size = diskCache.Contains(cache.AC, hash, int64(len(acData)), nil) if !found { t.Fatalf("Expected to find AC item %s", hash) } @@ -166,7 +166,7 @@ func TestEverything(t *testing.T) { len(acData), size) } - found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData))) + found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData)), nil) if !found { t.Fatalf("Expected to find CAS item %s", hash) } @@ -180,7 +180,7 @@ func TestEverything(t *testing.T) { var data []byte var rc io.ReadCloser - rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData))) + rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData)), nil) if err != nil { t.Error(err) } @@ -200,7 +200,7 @@ func TestEverything(t *testing.T) { } rc.Close() - rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData))) + rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData)), nil) if err != nil { t.Error(err) } @@ -235,7 +235,7 @@ func TestEverything(t *testing.T) { // Confirm that we can HEAD both values successfully. - found, size = diskCache.Contains(cache.AC, hash, int64(len(acData))) + found, size = diskCache.Contains(cache.AC, hash, int64(len(acData)), nil) if !found { t.Fatalf("Expected to find AC item %s", hash) } @@ -244,7 +244,7 @@ func TestEverything(t *testing.T) { len(acData), size) } - found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData))) + found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData)), nil) if !found { t.Fatalf("Expected to find CAS item %s", hash) } @@ -255,7 +255,7 @@ func TestEverything(t *testing.T) { // Confirm that we can GET both values successfully. - rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData))) + rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData)), nil) if err != nil { t.Error(err) } @@ -275,7 +275,7 @@ func TestEverything(t *testing.T) { } rc.Close() - rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData))) + rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData)), nil) if err != nil { t.Error(err) } diff --git a/cache/metricsdecorator/BUILD.bazel b/cache/metricsdecorator/BUILD.bazel new file mode 100644 index 000000000..9f88c253e --- /dev/null +++ b/cache/metricsdecorator/BUILD.bazel @@ -0,0 +1,18 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "metricsdecorator.go", + ], + importpath = "github.com/buchgr/bazel-remote/cache/metricsdecorator", + visibility = ["//visibility:public"], + deps = [ + "//config:go_default_library", + "//cache:go_default_library", + "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", + ], +) + diff --git a/cache/metricsdecorator/metricsdecorator.go b/cache/metricsdecorator/metricsdecorator.go new file mode 100644 index 000000000..51130d003 --- /dev/null +++ b/cache/metricsdecorator/metricsdecorator.go @@ -0,0 +1,200 @@ +package metricsdecorator + +// This is a decorator for any implementation of the cache.CasAcCache interface. +// It adds prometheus metrics for the cache requests. +// +// The decorator can report cache miss if AC is found but referenced CAS entries are missing. +// That is possible since GetValidatedActionResult is part of the cache.CasAcCache +// interface, not only the low level Get method. + +import ( + "github.com/buchgr/bazel-remote/cache" + "github.com/buchgr/bazel-remote/config" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "io" + "strings" + + pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" +) + +type metrics struct { + categoryValues map[string]map[string]struct{} + counterIncomingReqs *prometheus.CounterVec + parent cache.CasAcCache +} + +const statusOK = "ok" +const statusNotFound = "notFound" +const statusError = "error" + +const methodGet = "get" +const methodPut = "put" +const methodContains = "contains" + +// TODO add test cases for this file + +func NewMetricsDecorator(config *config.Metrics, parent cache.CasAcCache) cache.CasAcCache { + + labels := []string{"method", "status", "kind"} + categoryValues := make(map[string]map[string]struct{}) + + if config != nil && config.Categories != nil { + for categoryName, allowedValues := range config.Categories { + // Normalize to lower case since canonical for gRPC headers + // and convention for prometheus. + categoryName := strings.ToLower(categoryName) + + // Store allowed category values as set for efficient access + allowedValuesSet := make(map[string]struct{}) + for _, categoryValue := range allowedValues { + allowedValuesSet[categoryValue] = struct{}{} + } + categoryValues[categoryName] = allowedValuesSet + + // Construct a prometheus label for each category. + // Prometheus does not allow changing set of + // labels until next time bazel-remote is + // restarted. + labels = append(labels, categoryName) + } + } + + // For now we only count AC requests, and only the most common status codes, + // becuse: + // + // - No identified use case for others. + // - Limit number of prometheus time series (if many configured categories). + // - Reduce performance overhead of counters (if many configured categories). + // + // But the naming, and the labels, of the counter, are generic to allow + // counting additional requests types or status codes in the future. Without + // having to rename the counter and get issues with non continous history of + // metrics. + + counterIncomingReqs := promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "bazel_remote_incoming_requests_total", + Help: "The number of incoming HTTP and gRPC request. Currently only AC requests", + }, + labels) + + m := &metrics{ + categoryValues: categoryValues, + counterIncomingReqs: counterIncomingReqs, + parent: parent, + } + return m +} + +func (m *metrics) Put(kind cache.EntryKind, hash string, size int64, r io.Reader, context cache.RequestContext) error { + err := m.parent.Put(kind, hash, size, r, context) + + if kind == cache.AC { + var status string + if err != nil { + status = statusError + } else { + status = statusOK + } + m.incrementRequests(kind, methodPut, status, context) + } + + return err +} + +func (m *metrics) Get(kind cache.EntryKind, hash string, size int64, context cache.RequestContext) (io.ReadCloser, int64, error) { + rc, sizeBytes, err := m.parent.Get(kind, hash, size, context) + + if kind == cache.AC { + var status string + if err != nil { + status = statusError + } else if rc == nil { + status = statusNotFound + } else { + status = statusOK + } + m.incrementRequests(kind, methodGet, status, context) + } + + return rc, sizeBytes, err +} + +func (m *metrics) Contains(kind cache.EntryKind, hash string, size int64, context cache.RequestContext) (bool, int64) { + ok, sizeBytes := m.parent.Contains(kind, hash, size, context) + + if kind == cache.AC { + var status string + if ok { + status = statusOK + } else { + status = statusNotFound + } + m.incrementRequests(kind, methodContains, status, context) + } + + return ok, sizeBytes +} + +func (m *metrics) GetValidatedActionResult(hash string, context cache.RequestContext) (*pb.ActionResult, []byte, error) { + ac, data, err := m.parent.GetValidatedActionResult(hash, context) + + var status string + if err != nil { + status = statusError + } else if ac == nil { + status = statusNotFound + } else { + status = statusOK + } + m.incrementRequests(cache.AC, methodGet, status, context) + + return ac, data, err +} + +func getLabelValueFromHeaderValues(headerValues []string, allowedValues map[string]struct{}) string { + if len(headerValues) == 0 { + return "" // No header for this label + } + for _, headerValue := range headerValues { + // Prometheus only allows one value per label. + // Pick the first allowed header value we find. + if _, ok := allowedValues[headerValue]; ok { + return headerValue + } + } + + // The values found in the header has not been listed in + // the configuration file. Represent them as "other". + // + // Listening allowed values is an attempt to avoid polluting + // prometheus with too many different time series. + // + // https://prometheus.io/docs/practices/naming/ warns about: + // + // "CAUTION: Remember that every unique combination of key-value + // label pairs represents a new time series, which can dramatically + // increase the amount of data stored. Do not use labels to store + // dimensions with high cardinality (many different label values), + // such as user IDs, email addresses, or other unbounded sets of + // values." + // + // It would have been nice if bazel-remote could reload the set + // of allowed values from updated configuration file, by + // SIGHUP signal instead of having to restart bazel-remote. + return "other" +} + +func (m *metrics) incrementRequests(kind cache.EntryKind, method string, status string, reqCtx cache.RequestContext) { + labels := make(prometheus.Labels) + labels["method"] = method + labels["status"] = status + labels["kind"] = kind.String() + + for labelName := range m.categoryValues { + labels[labelName] = getLabelValueFromHeaderValues(reqCtx.GetHeader(labelName), m.categoryValues[labelName]) + } + + m.counterIncomingReqs.With(labels).Inc() +} diff --git a/config/config.go b/config/config.go index c9efc2da3..7df906921 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,11 @@ type HTTPBackendConfig struct { BaseURL string `yaml:"url"` } +// Metrics stores configuration for prometheus metrics. +type Metrics struct { + Categories map[string][]string `yaml:"categories"` +} + // Config holds the top-level configuration for bazel-remote. type Config struct { Host string `yaml:"host"` @@ -55,6 +60,7 @@ type Config struct { DisableGRPCACDepsCheck bool `yaml:"disable_grpc_ac_deps_check"` EnableACKeyInstanceMangling bool `yaml:"enable_ac_key_instance_mangling"` EnableEndpointMetrics bool `yaml:"enable_endpoint_metrics"` + Metrics *Metrics `yaml:"metrics"` ExperimentalRemoteAssetAPI bool `yaml:"experimental_remote_asset_api"` HTTPReadTimeout time.Duration `yaml:"http_read_timeout"` HTTPWriteTimeout time.Duration `yaml:"http_write_timeout"` @@ -73,6 +79,7 @@ func New(dir string, maxSize int, host string, port int, grpcPort int, disableGRPCACDepsCheck bool, enableACKeyInstanceMangling bool, enableEndpointMetrics bool, + metrics *Metrics, experimentalRemoteAssetAPI bool, httpReadTimeout time.Duration, httpWriteTimeout time.Duration) (*Config, error) { @@ -95,6 +102,7 @@ func New(dir string, maxSize int, host string, port int, grpcPort int, DisableGRPCACDepsCheck: disableGRPCACDepsCheck, EnableACKeyInstanceMangling: enableACKeyInstanceMangling, EnableEndpointMetrics: enableEndpointMetrics, + Metrics: metrics, ExperimentalRemoteAssetAPI: experimentalRemoteAssetAPI, HTTPReadTimeout: httpReadTimeout, HTTPWriteTimeout: httpWriteTimeout, diff --git a/main.go b/main.go index 98876863d..96cc292a4 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/buchgr/bazel-remote/cache/s3proxy" "github.com/buchgr/bazel-remote/cache/httpproxy" + "github.com/buchgr/bazel-remote/cache/metricsdecorator" "github.com/buchgr/bazel-remote/config" "github.com/buchgr/bazel-remote/server" @@ -283,6 +284,7 @@ func main() { ctx.Bool("disable_grpc_ac_deps_check"), ctx.Bool("enable_ac_key_instance_mangling"), ctx.Bool("enable_endpoint_metrics"), + nil, ctx.Bool("experimental_remote_asset_api"), ctx.Duration("http_read_timeout"), ctx.Duration("http_write_timeout"), @@ -335,6 +337,8 @@ func main() { diskCache := disk.New(c.Dir, int64(c.MaxSize)*1024*1024*1024, proxyCache) + casAcCache := metricsdecorator.NewMetricsDecorator(c.Metrics, diskCache) + mux := http.NewServeMux() httpServer := &http.Server{ Addr: c.Host + ":" + strconv.Itoa(c.Port), @@ -344,8 +348,7 @@ func main() { } validateAC := !c.DisableHTTPACValidation - h := server.NewHTTPCache(diskCache, accessLogger, errorLogger, validateAC, c.EnableACKeyInstanceMangling, gitCommit) - + h := server.NewHTTPCache(casAcCache, diskCache, accessLogger, errorLogger, validateAC, c.EnableACKeyInstanceMangling, gitCommit) var htpasswdSecrets auth.SecretProvider cacheHandler := h.CacheHandler if c.HtpasswdFile != "" { @@ -444,7 +447,7 @@ func main() { validateAC, c.EnableACKeyInstanceMangling, enableRemoteAssetAPI, - diskCache, accessLogger, errorLogger) + casAcCache, accessLogger, errorLogger) if err3 != nil { log.Fatal(err3) } diff --git a/server/grpc.go b/server/grpc.go index a89ec78d6..29fa56e7b 100644 --- a/server/grpc.go +++ b/server/grpc.go @@ -8,8 +8,10 @@ import ( "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc" + "google.golang.org/grpc/codes" _ "google.golang.org/grpc/encoding/gzip" // Register gzip support. + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" asset "github.com/bazelbuild/remote-apis/build/bazel/remote/asset/v1" @@ -17,8 +19,6 @@ import ( "github.com/bazelbuild/remote-apis/build/bazel/semver" "github.com/buchgr/bazel-remote/cache" - "github.com/buchgr/bazel-remote/cache/disk" - _ "github.com/mostynb/go-grpc-compression/snappy" // Register snappy _ "github.com/mostynb/go-grpc-compression/zstd" // and zstd support. ) @@ -34,7 +34,7 @@ var ( ) type grpcServer struct { - cache *disk.Cache + cache cache.CasAcCache accessLogger cache.Logger errorLogger cache.Logger depsCheck bool @@ -48,7 +48,7 @@ func ListenAndServeGRPC(addr string, opts []grpc.ServerOption, validateACDeps bool, mangleACKeys bool, enableRemoteAssetAPI bool, - c *disk.Cache, a cache.Logger, e cache.Logger) error { + c cache.CasAcCache, a cache.Logger, e cache.Logger) error { listener, err := net.Listen("tcp", addr) if err != nil { @@ -62,7 +62,7 @@ func serveGRPC(l net.Listener, opts []grpc.ServerOption, validateACDepsCheck bool, mangleACKeys bool, enableRemoteAssetAPI bool, - c *disk.Cache, a cache.Logger, e cache.Logger) error { + c cache.CasAcCache, a cache.Logger, e cache.Logger) error { srv := grpc.NewServer(opts...) s := &grpcServer{ @@ -139,3 +139,23 @@ func (s *grpcServer) validateHash(hash string, size int64, logPrefix string) err return nil } + +type reqCtxGrpc struct { + ctx context.Context +} + +func newReqCtxGrpc(ctx context.Context) *reqCtxGrpc { + rc := &reqCtxGrpc{ + ctx: ctx, + } + return rc +} + +func (h *reqCtxGrpc) GetHeader(headerName string) (headerValues []string) { + headers, _ := metadata.FromIncomingContext(h.ctx) // TODO avoid doing for each header? + if headerValues, ok := headers[headerName]; ok { + return headerValues + } else { + return []string{} + } +} diff --git a/server/grpc_ac.go b/server/grpc_ac.go index a92d91bb0..906a92db0 100644 --- a/server/grpc_ac.go +++ b/server/grpc_ac.go @@ -40,6 +40,8 @@ func (s *grpcServer) GetActionResult(ctx context.Context, logPrefix := "GRPC AC GET" + reqCtx := newReqCtxGrpc(ctx) + if s.mangleACKeys { req.ActionDigest.Hash = cache.TransformActionCacheKey(req.ActionDigest.Hash, req.InstanceName, s.accessLogger) } @@ -56,7 +58,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, if !s.depsCheck { logPrefix = "GRPC AC GET NODEPSCHECK" - rdr, sizeBytes, err := s.cache.Get(cache.AC, req.ActionDigest.Hash, unknownActionResultSize) + rdr, sizeBytes, err := s.cache.Get(cache.AC, req.ActionDigest.Hash, unknownActionResultSize, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -85,7 +87,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, return result, nil } - result, _, err := s.cache.GetValidatedActionResult(req.ActionDigest.Hash) + result, _, err := s.cache.GetValidatedActionResult(req.ActionDigest.Hash, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -102,14 +104,14 @@ func (s *grpcServer) GetActionResult(ctx context.Context, var inlinedSoFar int64 err = s.maybeInline(req.InlineStdout, - &result.StdoutRaw, &result.StdoutDigest, &inlinedSoFar) + &result.StdoutRaw, &result.StdoutDigest, &inlinedSoFar, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) } err = s.maybeInline(req.InlineStderr, - &result.StderrRaw, &result.StderrDigest, &inlinedSoFar) + &result.StderrRaw, &result.StderrDigest, &inlinedSoFar, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -121,7 +123,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, } for _, of := range result.GetOutputFiles() { _, ok := inlinableFiles[of.Path] - err = s.maybeInline(ok, &of.Contents, &of.Digest, &inlinedSoFar) + err = s.maybeInline(ok, &of.Contents, &of.Digest, &inlinedSoFar, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -133,7 +135,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, return result, nil } -func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, inlinedSoFar *int64) error { +func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, inlinedSoFar *int64, reqCtx *reqCtxGrpc) error { if (*inlinedSoFar + int64(len(*slice))) > maxInlineSize { inline = false @@ -155,10 +157,10 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, } } - found, _ := s.cache.Contains(cache.CAS, (*digest).Hash, (*digest).SizeBytes) + found, _ := s.cache.Contains(cache.CAS, (*digest).Hash, (*digest).SizeBytes, reqCtx) if !found { err := s.cache.Put(cache.CAS, (*digest).Hash, (*digest).SizeBytes, - bytes.NewReader(*slice)) + bytes.NewReader(*slice), reqCtx) if err != nil { return err } @@ -179,7 +181,7 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, // Otherwise, attempt to inline. if (*digest).SizeBytes > 0 { - data, err := s.getBlobData((*digest).Hash, (*digest).SizeBytes) + data, err := s.getBlobData((*digest).Hash, (*digest).SizeBytes, reqCtx) if err != nil { return err } @@ -193,6 +195,8 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, func (s *grpcServer) UpdateActionResult(ctx context.Context, req *pb.UpdateActionResultRequest) (*pb.ActionResult, error) { + var reqCtx cache.RequestContext + logPrefix := "GRPC AC PUT" err := s.validateHash(req.ActionDigest.Hash, req.ActionDigest.SizeBytes, logPrefix) if err != nil { @@ -215,7 +219,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context, } err = s.cache.Put(cache.AC, req.ActionDigest.Hash, - int64(len(data)), bytes.NewReader(data)) + int64(len(data)), bytes.NewReader(data), reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Internal, err.Error()) @@ -238,7 +242,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context, } err = s.cache.Put(cache.CAS, f.Digest.Hash, - f.Digest.SizeBytes, bytes.NewReader(f.Contents)) + f.Digest.SizeBytes, bytes.NewReader(f.Contents), reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) @@ -260,7 +264,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context, } err = s.cache.Put(cache.CAS, hash, sizeBytes, - bytes.NewReader(req.ActionResult.StdoutRaw)) + bytes.NewReader(req.ActionResult.StdoutRaw), reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) @@ -281,7 +285,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context, } err = s.cache.Put(cache.CAS, hash, sizeBytes, - bytes.NewReader(req.ActionResult.StderrRaw)) + bytes.NewReader(req.ActionResult.StderrRaw), reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) diff --git a/server/grpc_asset.go b/server/grpc_asset.go index 09351f764..d1153ea18 100644 --- a/server/grpc_asset.go +++ b/server/grpc_asset.go @@ -48,6 +48,8 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) // key -> CAS sha256 + timestamp // Should we place a limit on the size of the index? + reqCtx := newReqCtxGrpc(ctx) + for _, q := range req.GetQualifiers() { if q.Name == "checksum.sri" && strings.HasPrefix(q.Value, "sha256-") { // Ref: https://developer.mozilla.org/en-US/docs/Web/Security/Subresource_Integrity @@ -63,14 +65,14 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) sha256Str = hex.EncodeToString(decoded) - found, size := s.cache.Contains(cache.CAS, sha256Str, -1) + found, size := s.cache.Contains(cache.CAS, sha256Str, -1, reqCtx) if !found { continue } if size < 0 { // We don't know the size yet (bad http backend?). - r, size, err := s.cache.Get(cache.CAS, sha256Str, -1) + r, size, err := s.cache.Get(cache.CAS, sha256Str, -1, reqCtx) if r != nil { defer r.Close() } @@ -94,7 +96,7 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) // Cache miss. See if we can download one of the URIs. for _, uri := range req.GetUris() { - ok, actualHash, size := s.fetchItem(uri, sha256Str) + ok, actualHash, size := s.fetchItem(uri, sha256Str, reqCtx) if ok { return &asset.FetchBlobResponse{ Status: &status.Status{Code: int32(codes.OK)}, @@ -114,7 +116,7 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) }, nil } -func (s *grpcServer) fetchItem(uri string, expectedHash string) (bool, string, int64) { +func (s *grpcServer) fetchItem(uri string, expectedHash string, reqCtx *reqCtxGrpc) (bool, string, int64) { u, err := url.Parse(uri) if err != nil { s.errorLogger.Printf("unable to parse URI: %s err: %v", uri, err) @@ -163,7 +165,7 @@ func (s *grpcServer) fetchItem(uri string, expectedHash string) (bool, string, i rc = ioutil.NopCloser(bytes.NewReader(data)) } - err = s.cache.Put(cache.CAS, expectedHash, expectedSize, rc) + err = s.cache.Put(cache.CAS, expectedHash, expectedSize, rc, reqCtx) if err != nil { s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err) return false, "", int64(-1) diff --git a/server/grpc_bytestream.go b/server/grpc_bytestream.go index b28fe9b13..a2683f8f4 100644 --- a/server/grpc_bytestream.go +++ b/server/grpc_bytestream.go @@ -94,7 +94,7 @@ func (s *grpcServer) Read(req *bytestream.ReadRequest, return status.Error(codes.OutOfRange, msg) } - rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size) + rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size, newReqCtxGrpc(resp.Context())) if err != nil { msg := fmt.Sprintf("GRPC BYTESTREAM READ FAILED: %v", err) s.accessLogger.Printf(msg) @@ -237,6 +237,8 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { recvResult := make(chan error) resourceNameChan := make(chan string, 1) + reqCtx := newReqCtxGrpc(srv.Context()) + go func() { firstIteration := true var resourceName string @@ -279,7 +281,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { return } - exists, _ := s.cache.Contains(cache.CAS, hash, size) + exists, _ := s.cache.Contains(cache.CAS, hash, size, reqCtx) if exists { // Blob already exists, return without writing anything. resp.CommittedSize = size @@ -295,7 +297,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { } go func() { - putResult <- s.cache.Put(cache.CAS, hash, size, pr) + putResult <- s.cache.Put(cache.CAS, hash, size, pr, reqCtx) }() firstIteration = false diff --git a/server/grpc_cas.go b/server/grpc_cas.go index 7e9dcc0c6..b8e5f7a2d 100644 --- a/server/grpc_cas.go +++ b/server/grpc_cas.go @@ -37,7 +37,7 @@ func (s *grpcServer) FindMissingBlobs(ctx context.Context, return nil, err } - found, _ := s.cache.Contains(cache.CAS, hash, digest.GetSizeBytes()) + found, _ := s.cache.Contains(cache.CAS, hash, digest.GetSizeBytes(), newReqCtxGrpc(ctx)) if !found { s.accessLogger.Printf("GRPC CAS HEAD %s NOT FOUND", hash) resp.MissingBlobDigests = append(resp.MissingBlobDigests, digest) @@ -75,7 +75,7 @@ func (s *grpcServer) BatchUpdateBlobs(ctx context.Context, resp.Responses = append(resp.Responses, &rr) err = s.cache.Put(cache.CAS, req.Digest.Hash, - int64(len(req.Data)), bytes.NewReader(req.Data)) + int64(len(req.Data)), bytes.NewReader(req.Data), newReqCtxGrpc(ctx)) if err != nil { s.errorLogger.Printf("%s %s %s", errorPrefix, req.Digest.Hash, err) rr.Status.Code = int32(code.Code_UNKNOWN) @@ -91,7 +91,7 @@ func (s *grpcServer) BatchUpdateBlobs(ctx context.Context, // Return the data for a blob, or an error. If the blob was not // found, the returned error is errBlobNotFound. Only use this // function when it's OK to buffer the entire blob in memory. -func (s *grpcServer) getBlobData(hash string, size int64) ([]byte, error) { +func (s *grpcServer) getBlobData(hash string, size int64, reqCtx *reqCtxGrpc) ([]byte, error) { if size < 0 { return []byte{}, errBadSize } @@ -100,7 +100,7 @@ func (s *grpcServer) getBlobData(hash string, size int64) ([]byte, error) { return []byte{}, nil } - rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size) + rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size, reqCtx) if err != nil { rdr.Close() return []byte{}, err @@ -124,10 +124,10 @@ func (s *grpcServer) getBlobData(hash string, size int64) ([]byte, error) { return data, rdr.Close() } -func (s *grpcServer) getBlobResponse(digest *pb.Digest) *pb.BatchReadBlobsResponse_Response { +func (s *grpcServer) getBlobResponse(digest *pb.Digest, reqCtx *reqCtxGrpc) *pb.BatchReadBlobsResponse_Response { r := pb.BatchReadBlobsResponse_Response{Digest: digest} - data, err := s.getBlobData(digest.Hash, digest.SizeBytes) + data, err := s.getBlobData(digest.Hash, digest.SizeBytes, reqCtx) if err == errBlobNotFound { s.accessLogger.Printf("GRPC CAS GET %s NOT FOUND", digest.Hash) r.Status = &status.Status{Code: int32(code.Code_NOT_FOUND)} @@ -163,7 +163,7 @@ func (s *grpcServer) BatchReadBlobs(ctx context.Context, if err != nil { return nil, err } - resp.Responses = append(resp.Responses, s.getBlobResponse(digest)) + resp.Responses = append(resp.Responses, s.getBlobResponse(digest, newReqCtxGrpc(ctx))) } return &resp, nil @@ -172,6 +172,8 @@ func (s *grpcServer) BatchReadBlobs(ctx context.Context, func (s *grpcServer) GetTree(in *pb.GetTreeRequest, stream pb.ContentAddressableStorage_GetTreeServer) error { + reqCtx := newReqCtxGrpc(stream.Context()) + resp := pb.GetTreeResponse{ Directories: make([]*pb.Directory, 0), } @@ -181,7 +183,7 @@ func (s *grpcServer) GetTree(in *pb.GetTreeRequest, return err } - data, err := s.getBlobData(in.RootDigest.Hash, in.RootDigest.SizeBytes) + data, err := s.getBlobData(in.RootDigest.Hash, in.RootDigest.SizeBytes, reqCtx) if err == errBlobNotFound { s.accessLogger.Printf("GRPC CAS GETTREEREQUEST %s NOT FOUND", in.RootDigest.Hash) @@ -199,7 +201,7 @@ func (s *grpcServer) GetTree(in *pb.GetTreeRequest, return grpc_status.Error(codes.DataLoss, err.Error()) } - err = s.fillDirectories(&resp, &dir, errorPrefix) + err = s.fillDirectories(&resp, &dir, errorPrefix, reqCtx) if err != nil { return err } @@ -214,7 +216,7 @@ func (s *grpcServer) GetTree(in *pb.GetTreeRequest, // Attempt to populate `resp`. Return errors for invalid requests, but // otherwise attempt to return as many blobs as possible. -func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory, errorPrefix string) error { +func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory, errorPrefix string, reqCtx *reqCtxGrpc) error { // Add this dir. resp.Directories = append(resp.Directories, dir) @@ -227,7 +229,7 @@ func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory return err } - data, err := s.getBlobData(dirNode.Digest.Hash, dirNode.Digest.SizeBytes) + data, err := s.getBlobData(dirNode.Digest.Hash, dirNode.Digest.SizeBytes, reqCtx) if err == errBlobNotFound { s.accessLogger.Printf("GRPC GETTREEREQUEST BLOB %s NOT FOUND", dirNode.Digest.Hash) @@ -248,7 +250,7 @@ func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory s.accessLogger.Printf("GRPC GETTREEREQUEST BLOB %s ADDED OK", dirNode.Digest.Hash) - err = s.fillDirectories(resp, &dirMsg, errorPrefix) + err = s.fillDirectories(resp, &dirMsg, errorPrefix, reqCtx) if err != nil { return err } diff --git a/server/grpc_test.go b/server/grpc_test.go index 89eff7b9c..da52fa304 100644 --- a/server/grpc_test.go +++ b/server/grpc_test.go @@ -575,7 +575,7 @@ func TestGrpcByteStreamDeadline(t *testing.T) { t.Fatal(err) } - _, sz, err := diskCache.Get(cache.CAS, testBlobHash, testBlobSize) + _, sz, err := diskCache.Get(cache.CAS, testBlobHash, testBlobSize, nil) if err != nil { t.Fatalf("get error: %v\n", err) } diff --git a/server/http.go b/server/http.go index 8e336a742..056c60a4d 100644 --- a/server/http.go +++ b/server/http.go @@ -16,7 +16,6 @@ import ( pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" "github.com/buchgr/bazel-remote/cache" - "github.com/buchgr/bazel-remote/cache/disk" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" ) @@ -30,7 +29,8 @@ type HTTPCache interface { } type httpCache struct { - cache *disk.Cache + cache cache.CasAcCache + stats cache.Stats accessLogger cache.Logger errorLogger cache.Logger validateAC bool @@ -51,14 +51,14 @@ type statusPageData struct { // accessLogger will print one line for each HTTP request to stdout. // errorLogger will print unexpected server errors. Inexistent files and malformed URLs will not // be reported. -func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, commit string) HTTPCache { - - _, _, numItems := cache.Stats() +func NewHTTPCache(cache cache.CasAcCache, stats cache.Stats, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, commit string) HTTPCache { + _, _, numItems := stats.Stats() errorLogger.Printf("Loaded %d existing disk cache items.", numItems) hc := &httpCache{ cache: cache, + stats: stats, accessLogger: accessLogger, errorLogger: errorLogger, validateAC: validateAC, @@ -102,8 +102,8 @@ func parseRequestURL(url string, validateAC bool) (kind cache.EntryKind, hash st return cache.RAW, hash, instance, nil } -func (h *httpCache) handleContainsValidAC(w http.ResponseWriter, r *http.Request, hash string) { - _, data, err := h.cache.GetValidatedActionResult(hash) +func (h *httpCache) handleContainsValidAC(w http.ResponseWriter, r *http.Request, hash string, reqContext cache.RequestContext) { + _, data, err := h.cache.GetValidatedActionResult(hash, reqContext) if err != nil { http.Error(w, "Not found", http.StatusNotFound) h.logResponse(http.StatusNotFound, r) @@ -121,8 +121,8 @@ func (h *httpCache) handleContainsValidAC(w http.ResponseWriter, r *http.Request h.logResponse(http.StatusOK, r) } -func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, hash string) { - _, data, err := h.cache.GetValidatedActionResult(hash) +func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, hash string, reqContext cache.RequestContext) { + _, data, err := h.cache.GetValidatedActionResult(hash, reqContext) if err != nil { http.Error(w, "Not found", http.StatusNotFound) h.logResponse(http.StatusNotFound, r) @@ -151,6 +151,7 @@ func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, has return } + h.logResponse(http.StatusOK, r) return } @@ -167,6 +168,8 @@ func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, has h.logResponse(http.StatusInternalServerError, r) return } + + h.logResponse(http.StatusOK, r) } // Helper function for logging responses @@ -184,6 +187,8 @@ func (h *httpCache) logResponse(code int, r *http.Request) { func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() + reqContext := newReqContextHttp(r) + kind, hash, instance, err := parseRequestURL(r.URL.Path, h.validateAC) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -199,11 +204,11 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { case http.MethodGet: if h.validateAC && kind == cache.AC { - h.handleGetValidAC(w, r, hash) + h.handleGetValidAC(w, r, hash, reqContext) return } - rdr, sizeBytes, err := h.cache.Get(kind, hash, -1) + rdr, sizeBytes, err := h.cache.Get(kind, hash, -1, reqContext) if err != nil { if e, ok := err.(*cache.Error); ok { http.Error(w, e.Error(), e.Code) @@ -280,7 +285,7 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { rc = ioutil.NopCloser(bytes.NewReader(data)) } - err := h.cache.Put(kind, hash, contentLength, rc) + err := h.cache.Put(kind, hash, contentLength, rc, reqContext) if err != nil { if cerr, ok := err.(*cache.Error); ok { http.Error(w, err.Error(), cerr.Code) @@ -295,19 +300,18 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { case http.MethodHead: if h.validateAC && kind == cache.AC { - h.handleContainsValidAC(w, r, hash) + h.handleContainsValidAC(w, r, hash, reqContext) return } // Unvalidated path: - ok, size := h.cache.Contains(kind, hash, -1) + ok, size := h.cache.Contains(kind, hash, -1, reqContext) if !ok { http.Error(w, "Not found", http.StatusNotFound) h.logResponse(http.StatusNotFound, r) return } - w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) w.WriteHeader(http.StatusOK) h.logResponse(http.StatusOK, r) @@ -358,13 +362,13 @@ func addWorkerMetadataHTTP(addr string, ct string, orig []byte) (data []byte, co func (h *httpCache) StatusPageHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() - totalSize, reservedSize, numItems := h.cache.Stats() + totalSize, reservedSize, numItems := h.stats.Stats() w.Header().Set("Content-Type", "application/json") enc := json.NewEncoder(w) enc.SetIndent("", " ") enc.Encode(statusPageData{ - MaxSize: h.cache.MaxSize(), + MaxSize: h.stats.MaxSize(), CurrSize: totalSize, ReservedSize: reservedSize, NumFiles: numItems, @@ -376,3 +380,23 @@ func (h *httpCache) StatusPageHandler(w http.ResponseWriter, r *http.Request) { func path(kind cache.EntryKind, hash string) string { return fmt.Sprintf("/%s/%s", kind, hash) } + +type reqCtxHttp struct { + request *http.Request +} + +func newReqContextHttp(request *http.Request) *reqCtxHttp { + rc := &reqCtxHttp{ + request: request, + } + return rc +} + +func (h *reqCtxHttp) GetHeader(headerName string) (headerValues []string) { + headerName = strings.Title(headerName) + if headerValues, ok := h.request.Header[headerName]; ok { + return headerValues + } else { + return []string{} + } +} diff --git a/server/http_test.go b/server/http_test.go index 104b79093..524585af7 100644 --- a/server/http_test.go +++ b/server/http_test.go @@ -36,8 +36,7 @@ func TestDownloadFile(t *testing.T) { } c := disk.New(cacheDir, blobSize, nil) - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") - + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") req, err := http.NewRequest("GET", "/cas/"+hash, bytes.NewReader([]byte{})) if err != nil { t.Fatal(err) @@ -99,7 +98,7 @@ func TestUploadFilesConcurrently(t *testing.T) { } c := disk.New(cacheDir, 1000*1024, nil) - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") handler := http.HandlerFunc(h.CacheHandler) var wg sync.WaitGroup @@ -157,7 +156,7 @@ func TestUploadSameFileConcurrently(t *testing.T) { numWorkers := 100 c := disk.New(cacheDir, int64(len(data)*numWorkers), nil) - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") handler := http.HandlerFunc(h.CacheHandler) var wg sync.WaitGroup @@ -203,7 +202,7 @@ func TestUploadCorruptedFile(t *testing.T) { } c := disk.New(cacheDir, 2048, nil) - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.CacheHandler) handler.ServeHTTP(rr, r) @@ -245,7 +244,7 @@ func TestUploadEmptyActionResult(t *testing.T) { c := disk.New(cacheDir, 2048, nil) validate := true mangle := false - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.CacheHandler) handler.ServeHTTP(rr, r) @@ -302,7 +301,7 @@ func testEmptyBlobAvailable(t *testing.T, method string) { c := disk.New(cacheDir, 2048, nil) validate := true mangle := false - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.CacheHandler) handler.ServeHTTP(rr, r) @@ -325,7 +324,7 @@ func TestStatusPage(t *testing.T) { } c := disk.New(cacheDir, 2048, nil) - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.StatusPageHandler) handler.ServeHTTP(rr, r) @@ -466,7 +465,7 @@ func TestRemoteReturnsNotFound(t *testing.T) { defer os.RemoveAll(cacheDir) emptyCache := disk.New(cacheDir, 1024, nil) - h := NewHTTPCache(emptyCache, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(emptyCache, emptyCache, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") // create a fake http.Request _, hash := testutils.RandomDataAndHash(1024) url, _ := url.Parse(fmt.Sprintf("http://localhost:8080/ac/%s", hash)) From 7f54961e28bb44a2d9594d99af80b022a05f0011 Mon Sep 17 00:00:00 2001 From: Ulrik Falklof Date: Sun, 27 Sep 2020 16:04:45 +0200 Subject: [PATCH 2/2] refactor cache interfaces --- cache/cache.go | 21 +++++++++++++-------- cache/disk/disk.go | 3 +++ cache/metricsdecorator/metricsdecorator.go | 16 ++++++++++------ server/grpc.go | 6 +++--- server/http.go | 4 ++-- 5 files changed, 31 insertions(+), 19 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index acf146ae8..e99c8037d 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -68,30 +68,35 @@ type RequestContext interface { } // TODO Document interface -type CasAcCache interface { +type BlobStore interface { // TODO change to io.ReadCloser? Put(kind EntryKind, hash string, size int64, rdr io.Reader, reqCtx RequestContext) error Get(kind EntryKind, hash string, size int64, reqCtx RequestContext) (io.ReadCloser, int64, error) Contains(kind EntryKind, hash string, size int64, reqCtx RequestContext) (bool, int64) +} +// TODO Document interface +type AcStore interface { GetValidatedActionResult(hash string, reqCtx RequestContext) (*pb.ActionResult, []byte, error) } +// TODO Document interface +type BlobAcStore interface { + BlobStore + AcStore +} + // TODO Document interface type Stats interface { Stats() (totalSize int64, reservedSize int64, numItems int) MaxSize() int64 } -// TODO Should the proxy interface also be extended with RequestContext parameter? To allow -// for example forwarding of custom headers from client to proxy, or support for HTTP -// headers like Max-Forwards. - -// TODO Could the disk and proxies implement same interface? But proxies are not supporting -// GetValidatedActionResult and that method is important to have in the interface -// for cache.metricdecorator. +// TODO Could the proxies implement the BlobStore interface instead? And remove Proxy interface? +// Having access to the original headers would allow new use cases such as forwarding of +// custom headers from client via proxy, or support for HTTP headers like Max-Forwards. // Proxy is the interface that (optional) proxy backends must implement. // Implementations are expected to be safe for concurrent use. diff --git a/cache/disk/disk.go b/cache/disk/disk.go index bc2bb6976..53b1b8978 100644 --- a/cache/disk/disk.go +++ b/cache/disk/disk.go @@ -649,6 +649,9 @@ func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string { // value from the CAS if it and all its dependencies are also available. If // not, nil values are returned. If something unexpected went wrong, return // an error. +// TODO Consider separating implementation of cache.AcStore interface, to open up +// possibilities combining that functionality with proxies, and also allow +// bazel-remote configurations with proxy but no local disk storage? func (c *Cache) GetValidatedActionResult(hash string, reqCtx cache.RequestContext) (*pb.ActionResult, []byte, error) { rc, sizeBytes, err := c.Get(cache.AC, hash, -1, reqCtx) diff --git a/cache/metricsdecorator/metricsdecorator.go b/cache/metricsdecorator/metricsdecorator.go index 51130d003..74f1bfc1e 100644 --- a/cache/metricsdecorator/metricsdecorator.go +++ b/cache/metricsdecorator/metricsdecorator.go @@ -1,12 +1,16 @@ package metricsdecorator -// This is a decorator for any implementation of the cache.CasAcCache interface. +// This is a decorator for any implementation of the cache.BlobAcStore interface. // It adds prometheus metrics for the cache requests. // // The decorator can report cache miss if AC is found but referenced CAS entries are missing. -// That is possible since GetValidatedActionResult is part of the cache.CasAcCache -// interface, not only the low level Get method. - +// That is possible since metricsdecorator supports GetValidatedActionResult in the +// cache.BlobAcStore interface. +// +// TODO Consider allow using a metricsdecorator also for pure cache.BlobStore interfaces, +// in order to replace the current prometheus counters in the proxies? That would +// probably require better support for non AC requests in metricsdecorator and configurable +// counter name. import ( "github.com/buchgr/bazel-remote/cache" "github.com/buchgr/bazel-remote/config" @@ -21,7 +25,7 @@ import ( type metrics struct { categoryValues map[string]map[string]struct{} counterIncomingReqs *prometheus.CounterVec - parent cache.CasAcCache + parent cache.BlobAcStore } const statusOK = "ok" @@ -34,7 +38,7 @@ const methodContains = "contains" // TODO add test cases for this file -func NewMetricsDecorator(config *config.Metrics, parent cache.CasAcCache) cache.CasAcCache { +func NewMetricsDecorator(config *config.Metrics, parent cache.BlobAcStore) cache.BlobAcStore { labels := []string{"method", "status", "kind"} categoryValues := make(map[string]map[string]struct{}) diff --git a/server/grpc.go b/server/grpc.go index 29fa56e7b..6cdd22285 100644 --- a/server/grpc.go +++ b/server/grpc.go @@ -34,7 +34,7 @@ var ( ) type grpcServer struct { - cache cache.CasAcCache + cache cache.BlobAcStore accessLogger cache.Logger errorLogger cache.Logger depsCheck bool @@ -48,7 +48,7 @@ func ListenAndServeGRPC(addr string, opts []grpc.ServerOption, validateACDeps bool, mangleACKeys bool, enableRemoteAssetAPI bool, - c cache.CasAcCache, a cache.Logger, e cache.Logger) error { + c cache.BlobAcStore, a cache.Logger, e cache.Logger) error { listener, err := net.Listen("tcp", addr) if err != nil { @@ -62,7 +62,7 @@ func serveGRPC(l net.Listener, opts []grpc.ServerOption, validateACDepsCheck bool, mangleACKeys bool, enableRemoteAssetAPI bool, - c cache.CasAcCache, a cache.Logger, e cache.Logger) error { + c cache.BlobAcStore, a cache.Logger, e cache.Logger) error { srv := grpc.NewServer(opts...) s := &grpcServer{ diff --git a/server/http.go b/server/http.go index 056c60a4d..8f8575d84 100644 --- a/server/http.go +++ b/server/http.go @@ -29,7 +29,7 @@ type HTTPCache interface { } type httpCache struct { - cache cache.CasAcCache + cache cache.BlobAcStore stats cache.Stats accessLogger cache.Logger errorLogger cache.Logger @@ -51,7 +51,7 @@ type statusPageData struct { // accessLogger will print one line for each HTTP request to stdout. // errorLogger will print unexpected server errors. Inexistent files and malformed URLs will not // be reported. -func NewHTTPCache(cache cache.CasAcCache, stats cache.Stats, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, commit string) HTTPCache { +func NewHTTPCache(cache cache.BlobAcStore, stats cache.Stats, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, commit string) HTTPCache { _, _, numItems := stats.Stats() errorLogger.Printf("Loaded %d existing disk cache items.", numItems)