Support overwriting caches (#2265)
* feat: support overwrite caches * test: fix case * test: fix get_with_multiple_keys * chore: use atomic.Bool * test: improve get_with_multiple_keys * chore: use ping to improve path * fix: wrong CompareAndSwap * test: TestHandler_gcCache * chore: lint code * chore: lint code
This commit is contained in:
parent
f56dd65ff6
commit
b9382a2c4e
|
@ -35,7 +35,7 @@ type Handler struct {
|
|||
server *http.Server
|
||||
logger logrus.FieldLogger
|
||||
|
||||
gcing int32 // TODO: use atomic.Bool when we can use Go 1.19
|
||||
gcing atomic.Bool
|
||||
gcAt time.Time
|
||||
|
||||
outboundIP string
|
||||
|
@ -170,7 +170,7 @@ func (h *Handler) find(w http.ResponseWriter, r *http.Request, _ httprouter.Para
|
|||
}
|
||||
defer db.Close()
|
||||
|
||||
cache, err := h.findCache(db, keys, version)
|
||||
cache, err := findCache(db, keys, version)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 500, err)
|
||||
return
|
||||
|
@ -206,32 +206,17 @@ func (h *Handler) reserve(w http.ResponseWriter, r *http.Request, _ httprouter.P
|
|||
api.Key = strings.ToLower(api.Key)
|
||||
|
||||
cache := api.ToCache()
|
||||
cache.FillKeyVersionHash()
|
||||
db, err := h.openDB()
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 500, err)
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
||||
if !errors.Is(err, bolthold.ErrNotFound) {
|
||||
h.responseJSON(w, r, 500, err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
h.responseJSON(w, r, 400, fmt.Errorf("already exist"))
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
cache.CreatedAt = now
|
||||
cache.UsedAt = now
|
||||
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
|
||||
h.responseJSON(w, r, 500, err)
|
||||
return
|
||||
}
|
||||
// write back id to db
|
||||
if err := db.Update(cache.ID, cache); err != nil {
|
||||
if err := insertCache(db, cache); err != nil {
|
||||
h.responseJSON(w, r, 500, err)
|
||||
return
|
||||
}
|
||||
|
@ -364,56 +349,40 @@ func (h *Handler) middleware(handler httprouter.Handle) httprouter.Handle {
|
|||
}
|
||||
|
||||
// if not found, return (nil, nil) instead of an error.
|
||||
func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) {
|
||||
if len(keys) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
key := keys[0] // the first key is for exact match.
|
||||
|
||||
cache := &Cache{
|
||||
Key: key,
|
||||
Version: version,
|
||||
}
|
||||
cache.FillKeyVersionHash()
|
||||
|
||||
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
|
||||
if !errors.Is(err, bolthold.ErrNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
} else if cache.Complete {
|
||||
return cache, nil
|
||||
}
|
||||
stop := fmt.Errorf("stop")
|
||||
|
||||
func findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) {
|
||||
cache := &Cache{}
|
||||
for _, prefix := range keys {
|
||||
found := false
|
||||
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
|
||||
re, err := regexp.Compile(prefixPattern)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error {
|
||||
if !strings.HasPrefix(v.Key, prefix) {
|
||||
return stop
|
||||
}
|
||||
if v.Complete {
|
||||
cache = v
|
||||
found = true
|
||||
return stop
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
if !errors.Is(err, stop) {
|
||||
return nil, err
|
||||
if err := db.FindOne(cache,
|
||||
bolthold.Where("Key").RegExp(re).
|
||||
And("Version").Eq(version).
|
||||
And("Complete").Eq(true).
|
||||
SortBy("CreatedAt").Reverse()); err != nil {
|
||||
if errors.Is(err, bolthold.ErrNotFound) {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("find cache: %w", err)
|
||||
}
|
||||
if found {
|
||||
return cache, nil
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func insertCache(db *bolthold.Store, cache *Cache) error {
|
||||
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
|
||||
return fmt.Errorf("insert cache: %w", err)
|
||||
}
|
||||
// write back id to db
|
||||
if err := db.Update(cache.ID, cache); err != nil {
|
||||
return fmt.Errorf("write back id to db: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) useCache(id int64) {
|
||||
db, err := h.openDB()
|
||||
if err != nil {
|
||||
|
@ -428,14 +397,21 @@ func (h *Handler) useCache(id int64) {
|
|||
_ = db.Update(cache.ID, cache)
|
||||
}
|
||||
|
||||
const (
|
||||
keepUsed = 30 * 24 * time.Hour
|
||||
keepUnused = 7 * 24 * time.Hour
|
||||
keepTemp = 5 * time.Minute
|
||||
keepOld = 5 * time.Minute
|
||||
)
|
||||
|
||||
func (h *Handler) gcCache() {
|
||||
if atomic.LoadInt32(&h.gcing) != 0 {
|
||||
if h.gcing.Load() {
|
||||
return
|
||||
}
|
||||
if !atomic.CompareAndSwapInt32(&h.gcing, 0, 1) {
|
||||
if !h.gcing.CompareAndSwap(false, true) {
|
||||
return
|
||||
}
|
||||
defer atomic.StoreInt32(&h.gcing, 0)
|
||||
defer h.gcing.Store(false)
|
||||
|
||||
if time.Since(h.gcAt) < time.Hour {
|
||||
h.logger.Debugf("skip gc: %v", h.gcAt.String())
|
||||
|
@ -444,37 +420,18 @@ func (h *Handler) gcCache() {
|
|||
h.gcAt = time.Now()
|
||||
h.logger.Debugf("gc: %v", h.gcAt.String())
|
||||
|
||||
const (
|
||||
keepUsed = 30 * 24 * time.Hour
|
||||
keepUnused = 7 * 24 * time.Hour
|
||||
keepTemp = 5 * time.Minute
|
||||
)
|
||||
|
||||
db, err := h.openDB()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Remove the caches which are not completed for a while, they are most likely to be broken.
|
||||
var caches []*Cache
|
||||
if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil {
|
||||
h.logger.Warnf("find caches: %v", err)
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
if cache.Complete {
|
||||
continue
|
||||
}
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil {
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
|
||||
And("Complete").Eq(false),
|
||||
); err != nil {
|
||||
h.logger.Warnf("find caches: %v", err)
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
|
@ -487,8 +444,11 @@ func (h *Handler) gcCache() {
|
|||
}
|
||||
}
|
||||
|
||||
// Remove the old caches which have not been used recently.
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil {
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
|
||||
); err != nil {
|
||||
h.logger.Warnf("find caches: %v", err)
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
|
@ -500,6 +460,55 @@ func (h *Handler) gcCache() {
|
|||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches which are too old.
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
|
||||
); err != nil {
|
||||
h.logger.Warnf("find caches: %v", err)
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches with the same key and version, keep the latest one.
|
||||
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
|
||||
if results, err := db.FindAggregate(
|
||||
&Cache{},
|
||||
bolthold.Where("Complete").Eq(true),
|
||||
"Key", "Version",
|
||||
); err != nil {
|
||||
h.logger.Warnf("find aggregate caches: %v", err)
|
||||
} else {
|
||||
for _, result := range results {
|
||||
if result.Count() <= 1 {
|
||||
continue
|
||||
}
|
||||
result.Sort("CreatedAt")
|
||||
caches = caches[:0]
|
||||
result.Reduction(&caches)
|
||||
for _, cache := range caches[:len(caches)-1] {
|
||||
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
|
||||
// Keep it since it has been used recently, even if it's old.
|
||||
// Or it could break downloading in process.
|
||||
continue
|
||||
}
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
|
||||
|
|
|
@ -10,9 +10,11 @@ import (
|
|||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/timshannon/bolthold"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
|
@ -78,6 +80,9 @@ func TestHandler(t *testing.T) {
|
|||
t.Run("duplicate reserve", func(t *testing.T) {
|
||||
key := strings.ToLower(t.Name())
|
||||
version := "c19da02a2bd7e77277f1ac29ab45c09b7d46a4ee758284e26bb3045ad11d9d20"
|
||||
var first, second struct {
|
||||
CacheID uint64 `json:"cacheId"`
|
||||
}
|
||||
{
|
||||
body, err := json.Marshal(&Request{
|
||||
Key: key,
|
||||
|
@ -89,10 +94,8 @@ func TestHandler(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
|
||||
got := struct {
|
||||
CacheID uint64 `json:"cacheId"`
|
||||
}{}
|
||||
require.NoError(t, json.NewDecoder(resp.Body).Decode(&got))
|
||||
require.NoError(t, json.NewDecoder(resp.Body).Decode(&first))
|
||||
assert.NotZero(t, first.CacheID)
|
||||
}
|
||||
{
|
||||
body, err := json.Marshal(&Request{
|
||||
|
@ -103,8 +106,13 @@ func TestHandler(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
resp, err := http.Post(fmt.Sprintf("%s/caches", base), "application/json", bytes.NewReader(body))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 400, resp.StatusCode)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
|
||||
require.NoError(t, json.NewDecoder(resp.Body).Decode(&second))
|
||||
assert.NotZero(t, second.CacheID)
|
||||
}
|
||||
|
||||
assert.NotEqual(t, first.CacheID, second.CacheID)
|
||||
})
|
||||
|
||||
t.Run("upload with bad id", func(t *testing.T) {
|
||||
|
@ -341,9 +349,9 @@ func TestHandler(t *testing.T) {
|
|||
version := "c19da02a2bd7e77277f1ac29ab45c09b7d46a4ee758284e26bb3045ad11d9d20"
|
||||
key := strings.ToLower(t.Name())
|
||||
keys := [3]string{
|
||||
key + "_a",
|
||||
key + "_a_b",
|
||||
key + "_a_b_c",
|
||||
key + "_a_b",
|
||||
key + "_a",
|
||||
}
|
||||
contents := [3][]byte{
|
||||
make([]byte, 100),
|
||||
|
@ -354,6 +362,7 @@ func TestHandler(t *testing.T) {
|
|||
_, err := rand.Read(contents[i])
|
||||
require.NoError(t, err)
|
||||
uploadCacheNormally(t, base, keys[i], version, contents[i])
|
||||
time.Sleep(time.Second) // ensure CreatedAt of caches are different
|
||||
}
|
||||
|
||||
reqKeys := strings.Join([]string{
|
||||
|
@ -361,29 +370,33 @@ func TestHandler(t *testing.T) {
|
|||
key + "_a_b",
|
||||
key + "_a",
|
||||
}, ",")
|
||||
var archiveLocation string
|
||||
{
|
||||
resp, err := http.Get(fmt.Sprintf("%s/cache?keys=%s&version=%s", base, reqKeys, version))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 200, resp.StatusCode)
|
||||
got := struct {
|
||||
Result string `json:"result"`
|
||||
ArchiveLocation string `json:"archiveLocation"`
|
||||
CacheKey string `json:"cacheKey"`
|
||||
}{}
|
||||
require.NoError(t, json.NewDecoder(resp.Body).Decode(&got))
|
||||
assert.Equal(t, "hit", got.Result)
|
||||
assert.Equal(t, keys[1], got.CacheKey)
|
||||
archiveLocation = got.ArchiveLocation
|
||||
}
|
||||
{
|
||||
resp, err := http.Get(archiveLocation) //nolint:gosec
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 200, resp.StatusCode)
|
||||
got, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, contents[1], got)
|
||||
}
|
||||
|
||||
resp, err := http.Get(fmt.Sprintf("%s/cache?keys=%s&version=%s", base, reqKeys, version))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 200, resp.StatusCode)
|
||||
|
||||
/*
|
||||
Expect `key_a_b` because:
|
||||
- `key_a_b_x" doesn't match any caches.
|
||||
- `key_a_b" matches `key_a_b` and `key_a_b_c`, but `key_a_b` is newer.
|
||||
*/
|
||||
except := 1
|
||||
|
||||
got := struct {
|
||||
Result string `json:"result"`
|
||||
ArchiveLocation string `json:"archiveLocation"`
|
||||
CacheKey string `json:"cacheKey"`
|
||||
}{}
|
||||
require.NoError(t, json.NewDecoder(resp.Body).Decode(&got))
|
||||
assert.Equal(t, "hit", got.Result)
|
||||
assert.Equal(t, keys[except], got.CacheKey)
|
||||
|
||||
contentResp, err := http.Get(got.ArchiveLocation)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 200, contentResp.StatusCode)
|
||||
content, err := io.ReadAll(contentResp.Body)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, contents[except], content)
|
||||
})
|
||||
|
||||
t.Run("case insensitive", func(t *testing.T) {
|
||||
|
@ -469,3 +482,112 @@ func uploadCacheNormally(t *testing.T, base, key, version string, content []byte
|
|||
assert.Equal(t, content, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_gcCache(t *testing.T) {
|
||||
dir := filepath.Join(t.TempDir(), "artifactcache")
|
||||
handler, err := StartHandler(dir, "", 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
require.NoError(t, handler.Close())
|
||||
}()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
cases := []struct {
|
||||
Cache *Cache
|
||||
Kept bool
|
||||
}{
|
||||
{
|
||||
// should be kept, since it's used recently and not too old.
|
||||
Cache: &Cache{
|
||||
Key: "test_key_1",
|
||||
Version: "test_version",
|
||||
Complete: true,
|
||||
UsedAt: now.Unix(),
|
||||
CreatedAt: now.Add(-time.Hour).Unix(),
|
||||
},
|
||||
Kept: true,
|
||||
},
|
||||
{
|
||||
// should be removed, since it's not complete and not used for a while.
|
||||
Cache: &Cache{
|
||||
Key: "test_key_2",
|
||||
Version: "test_version",
|
||||
Complete: false,
|
||||
UsedAt: now.Add(-(keepTemp + time.Second)).Unix(),
|
||||
CreatedAt: now.Add(-(keepTemp + time.Hour)).Unix(),
|
||||
},
|
||||
Kept: false,
|
||||
},
|
||||
{
|
||||
// should be removed, since it's not used for a while.
|
||||
Cache: &Cache{
|
||||
Key: "test_key_3",
|
||||
Version: "test_version",
|
||||
Complete: true,
|
||||
UsedAt: now.Add(-(keepUnused + time.Second)).Unix(),
|
||||
CreatedAt: now.Add(-(keepUnused + time.Hour)).Unix(),
|
||||
},
|
||||
Kept: false,
|
||||
},
|
||||
{
|
||||
// should be removed, since it's used but too old.
|
||||
Cache: &Cache{
|
||||
Key: "test_key_3",
|
||||
Version: "test_version",
|
||||
Complete: true,
|
||||
UsedAt: now.Unix(),
|
||||
CreatedAt: now.Add(-(keepUsed + time.Second)).Unix(),
|
||||
},
|
||||
Kept: false,
|
||||
},
|
||||
{
|
||||
// should be kept, since it has a newer edition but be used recently.
|
||||
Cache: &Cache{
|
||||
Key: "test_key_1",
|
||||
Version: "test_version",
|
||||
Complete: true,
|
||||
UsedAt: now.Add(-(keepOld - time.Minute)).Unix(),
|
||||
CreatedAt: now.Add(-(time.Hour + time.Second)).Unix(),
|
||||
},
|
||||
Kept: true,
|
||||
},
|
||||
{
|
||||
// should be removed, since it has a newer edition and not be used recently.
|
||||
Cache: &Cache{
|
||||
Key: "test_key_1",
|
||||
Version: "test_version",
|
||||
Complete: true,
|
||||
UsedAt: now.Add(-(keepOld + time.Second)).Unix(),
|
||||
CreatedAt: now.Add(-(time.Hour + time.Second)).Unix(),
|
||||
},
|
||||
Kept: false,
|
||||
},
|
||||
}
|
||||
|
||||
db, err := handler.openDB()
|
||||
require.NoError(t, err)
|
||||
for _, c := range cases {
|
||||
require.NoError(t, insertCache(db, c.Cache))
|
||||
}
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
handler.gcAt = time.Time{} // ensure gcCache will not skip
|
||||
handler.gcCache()
|
||||
|
||||
db, err = handler.openDB()
|
||||
require.NoError(t, err)
|
||||
for i, v := range cases {
|
||||
t.Run(fmt.Sprintf("%d_%s", i, v.Cache.Key), func(t *testing.T) {
|
||||
cache := &Cache{}
|
||||
err = db.Get(v.Cache.ID, cache)
|
||||
if v.Kept {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.ErrorIs(t, err, bolthold.ErrNotFound)
|
||||
}
|
||||
})
|
||||
}
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
|
|
@ -1,10 +1,5 @@
|
|||
package artifactcache
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Request struct {
|
||||
Key string `json:"key" `
|
||||
Version string `json:"version"`
|
||||
|
@ -29,16 +24,11 @@ func (c *Request) ToCache() *Cache {
|
|||
}
|
||||
|
||||
type Cache struct {
|
||||
ID uint64 `json:"id" boltholdKey:"ID"`
|
||||
Key string `json:"key" boltholdIndex:"Key"`
|
||||
Version string `json:"version" boltholdIndex:"Version"`
|
||||
KeyVersionHash string `json:"keyVersionHash" boltholdUnique:"KeyVersionHash"`
|
||||
Size int64 `json:"cacheSize"`
|
||||
Complete bool `json:"complete"`
|
||||
UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"`
|
||||
CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"`
|
||||
}
|
||||
|
||||
func (c *Cache) FillKeyVersionHash() {
|
||||
c.KeyVersionHash = fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%s:%s", c.Key, c.Version))))
|
||||
ID uint64 `json:"id" boltholdKey:"ID"`
|
||||
Key string `json:"key" boltholdIndex:"Key"`
|
||||
Version string `json:"version" boltholdIndex:"Version"`
|
||||
Size int64 `json:"cacheSize"`
|
||||
Complete bool `json:"complete" boltholdIndex:"Complete"`
|
||||
UsedAt int64 `json:"usedAt" boltholdIndex:"UsedAt"`
|
||||
CreatedAt int64 `json:"createdAt" boltholdIndex:"CreatedAt"`
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue