commit b0409c75d008e176ae40635633efd7e5239a94aa Author: kenneth Date: Mon Mar 18 16:42:26 2024 +0800 first commit diff --git a/cache_item.go b/cache_item.go new file mode 100644 index 0000000..9d68674 --- /dev/null +++ b/cache_item.go @@ -0,0 +1,115 @@ +package cache2go + +import ( + "sync" + "time" +) + +// CacheItem 单独的缓存项 +type CacheItem struct { + sync.RWMutex + + // key 缓存项 键 + key any + // data 缓存项 值 + data any + // lifeSpan 缓存时长 + lifeSpan time.Duration + + // createdOn 创建时间 + createdOn time.Time + // accessedOn 上次访问时间 + accessedOn time.Time + // accessCount 访问项目的频率 + accessCount int64 + + // aboutToExpire 在从缓存中删除项之前立即触发回调方法 + aboutToExpire []func(key any) +} + +// NewCacheItem 新建一个缓存项 +func NewCacheItem(key any, data any, lifeSpan time.Duration) *CacheItem { + t := time.Now() + return &CacheItem{ + key: key, + data: data, + lifeSpan: lifeSpan, + createdOn: t, + accessedOn: t, + accessCount: 0, + aboutToExpire: nil, + } +} + +// keepAlive 标记访问的项 +func (item *CacheItem) KeepAlive() { + item.Lock() + defer item.Unlock() + + item.accessedOn = time.Now() + item.accessCount++ +} + +// LifeSpan 返回缓存项的缓存时长 +func (item *CacheItem) LifeSpan() time.Duration { + return item.lifeSpan +} + +// AccessedOn 返回缓存项最后一次访问时间 +func (item *CacheItem) AccessedOn() time.Time { + item.RLock() + defer item.RUnlock() + + return item.accessedOn +} + +// AccessedOn 返回缓存项访问次数 +func (item *CacheItem) AccessedCount() int64 { + item.RLock() + defer item.RUnlock() + + return item.accessCount +} + +// CreatedOn 返回缓存项添加时间 +func (item *CacheItem) CreatedOn() time.Time { + return item.createdOn +} + +// Key 返回缓存项的键 +func (item *CacheItem) Key() any { + return item.key +} + +// Data 返回缓存项的值 +func (item *CacheItem) Data() any { + return item.data +} + +// SetAboutToExpireCallback 设置回调 在缓存项删除之前调用 +func (item *CacheItem) SetAboutToExpireCallback(f func(any)) { + if len(item.aboutToExpire) > 0 { + item.ClearAboutToExpireCallback() + } + + item.Lock() + defer item.Unlock() + + item.aboutToExpire = append(item.aboutToExpire, f) +} + +// AddAboutToExpireCallback 将一个新的回调追加到AboutToExpire队列 +func (item *CacheItem) AddAboutToExpireCallback(f func(any)) { + item.Lock() + defer item.Unlock() + + item.aboutToExpire = append(item.aboutToExpire, f) +} + +// ClearAboutToExpireCallback 清空即将过期的回调队列 +func (item *CacheItem) ClearAboutToExpireCallback() { + item.Lock() + defer item.Unlock() + + item.aboutToExpire = nil +} diff --git a/cache_table.go b/cache_table.go new file mode 100644 index 0000000..714745c --- /dev/null +++ b/cache_table.go @@ -0,0 +1,358 @@ +package cache2go + +import ( + "log" + "sort" + "sync" + "time" +) + +// CacheTable 缓存中的一个表 +type CacheTable struct { + sync.RWMutex + + // name 表名 + name string + // items 缓存项 + items map[any]*CacheItem + + // cleanupTimer 负责触发清理的计时器 + cleanupTimer *time.Timer + // cleanupInterval 当前计时器持续时间 + cleanupInterval time.Duration + + // logger 日志 + logger *log.Logger + + // loadData 加载缓存项 (回调) + loadData func(key any, args ...any) *CacheItem + // addedItem 添加缓存项 (回调) + addedItem []func(item *CacheItem) + // aboutToDeleteItem 删除缓存项 (回调) + aboutToDeleteItem []func(item *CacheItem) +} + +// Count 返回缓存中当前存储的项目数 +func (table *CacheTable) Count() int { + table.RLock() + defer table.RUnlock() + + return len(table.items) +} + +// Foreach 循环所有项 +func (table *CacheTable) Foreach(trans func(key any, item *CacheItem)) { + table.RLock() + defer table.RUnlock() + + for k, v := range table.items { + trans(k, v) + } +} + +// SetDataLoader 配置一个数据加载器回调 +func (table *CacheTable) SetDataLoader(f func(any, ...any) *CacheItem) { + table.Lock() + defer table.Unlock() + + table.loadData = f +} + +// SetAddedItemCallback 添加项的回调函数 每次向缓存中添加新项时都会调用该回调 +func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) { + if len(table.addedItem) > 0 { + table.ClearAddedItemCallbacks() + } + + table.AddAddedItemCallback(f) +} + +// AddAddedItemCallback 将新的回调追加到addedItem队列 +func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) { + table.Lock() + defer table.Unlock() + table.addedItem = append(table.addedItem, f) +} + +// ClearAddedItemCallbacks 清空添加的项回调队列 +func (table *CacheTable) ClearAddedItemCallbacks() { + table.Lock() + defer table.Unlock() + + table.addedItem = nil +} + +// SetAboutToDeleteItemCallback 配置回调 在缓存中删除项目的时候调用 +func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) { + if len(table.aboutToDeleteItem) > 0 { + table.ClearAboutToDeleteItemCallback() + } + + table.AddAboutToDeleteItemCallback(f) +} + +// AddAboutToDeleteItemCallback 将新的回调追加到aboutToDeleteItem队列 +func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) { + table.Lock() + defer table.Unlock() + table.aboutToDeleteItem = append(table.aboutToDeleteItem, f) +} + +// ClearAboutToDeleteItemCallback 清空AboutToDeleteItem +func (table *CacheTable) ClearAboutToDeleteItemCallback() { + table.Lock() + defer table.Unlock() + table.aboutToDeleteItem = nil +} + +// SetLogger 设置日志 +func (table *CacheTable) SetLogger(logger *log.Logger) { + table.Lock() + defer table.Unlock() + + table.logger = logger +} + +// expirationCheck 过期循环检查 由自调整计时器触发 +func (table *CacheTable) expirationCheck() { + table.Lock() + if table.cleanupTimer != nil { + table.cleanupTimer.Stop() + } + if table.cleanupInterval > 0 { + table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name) + } else { + table.log("Expiration check installed for table", table.name) + } + + // To be more accurate with timers, we would need to update 'now' on every + // loop iteration. Not sure it's really efficient though. + now := time.Now() + smallestDuration := 0 * time.Second + for key, item := range table.items { + // Cache values so we don't keep blocking the mutex. + item.RLock() + lifeSpan := item.lifeSpan + accessedOn := item.accessedOn + item.RUnlock() + + if lifeSpan == 0 { + continue + } + if now.Sub(accessedOn) >= lifeSpan { + // Item has excessed its lifespan. + table.deleteInternal(key) + } else { + // Find the item chronologically closest to its end-of-lifespan. + if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration { + smallestDuration = lifeSpan - now.Sub(accessedOn) + } + } + } + + // Setup the interval for the next cleanup run. + table.cleanupInterval = smallestDuration + if smallestDuration > 0 { + table.cleanupTimer = time.AfterFunc(smallestDuration, func() { + go table.expirationCheck() + }) + } + table.Unlock() +} + +func (table *CacheTable) addInternal(item *CacheItem) { + // Careful: do not run this method unless the table-mutex is locked! + // It will unlock it for the caller before running the callbacks and checks + table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name) + table.items[item.key] = item + + // Cache values so we don't keep blocking the mutex. + expDur := table.cleanupInterval + addedItem := table.addedItem + table.Unlock() + + // Trigger callback after adding an item to cache. + for _, callback := range addedItem { + callback(item) + } + + // If we haven't set up any expiration check timer or found a more imminent item. + if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) { + table.expirationCheck() + } +} + +// Add adds a key/value pair to the cache. +// Parameter key is the item's cache-key. +// Parameter lifeSpan determines after which time period without an access the item +// will get removed from the cache. +// Parameter data is the item's value. +func (table *CacheTable) Add(key any, data any, lifeSpan time.Duration) *CacheItem { + item := NewCacheItem(key, data, lifeSpan) + + // Add item to cache. + table.Lock() + table.addInternal(item) + + return item +} + +func (table *CacheTable) deleteInternal(key any) (*CacheItem, error) { + r, ok := table.items[key] + if !ok { + return nil, ErrKeyNotFound + } + + // Cache value so we don't keep blocking the mutex. + aboutToDeleteItem := table.aboutToDeleteItem + table.Unlock() + + // Trigger callbacks before deleting an item from cache. + for _, callback := range aboutToDeleteItem { + callback(r) + } + + r.RLock() + defer r.RUnlock() + if r.aboutToExpire != nil { + for _, callback := range r.aboutToExpire { + callback(key) + } + } + + table.Lock() + table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name) + delete(table.items, key) + + return r, nil +} + +// Delete an item from the cache. +func (table *CacheTable) Delete(key any) (*CacheItem, error) { + table.Lock() + defer table.Unlock() + + return table.deleteInternal(key) +} + +// Exists returns whether an item exists in the cache. Unlike the Value method +// Exists neither tries to fetch data via the loadData callback nor does it +// keep the item alive in the cache. +func (table *CacheTable) Exists(key any) bool { + table.RLock() + defer table.RUnlock() + _, ok := table.items[key] + + return ok +} + +// NotFoundAdd checks whether an item is not yet cached. Unlike the Exists +// method this also adds data if the key could not be found. +func (table *CacheTable) NotFoundAdd(key any, lifeSpan time.Duration, data any) bool { + table.Lock() + + if _, ok := table.items[key]; ok { + table.Unlock() + return false + } + + item := NewCacheItem(key, data, lifeSpan) + table.addInternal(item) + + return true +} + +// Value returns an item from the cache and marks it to be kept alive. You can +// pass additional arguments to your DataLoader callback function. +func (table *CacheTable) Value(key any, args ...any) (*CacheItem, error) { + table.RLock() + r, ok := table.items[key] + loadData := table.loadData + table.RUnlock() + + if ok { + // Update access counter and timestamp. + r.KeepAlive() + return r, nil + } + + // Item doesn't exist in cache. Try and fetch it with a data-loader. + if loadData != nil { + item := loadData(key, args...) + if item != nil { + table.Add(key, item.data, item.lifeSpan) + return item, nil + } + + return nil, ErrKeyNotFoundOrLoadable + } + + return nil, ErrKeyNotFound +} + +// Flush deletes all items from this cache table. +func (table *CacheTable) Flush() { + table.Lock() + defer table.Unlock() + + table.log("Flushing table", table.name) + + table.items = make(map[any]*CacheItem) + table.cleanupInterval = 0 + if table.cleanupTimer != nil { + table.cleanupTimer.Stop() + } +} + +// CacheItemPair maps key to access counter +type CacheItemPair struct { + Key any + AccessCount int64 +} + +// CacheItemPairList is a slice of CacheItemPairs that implements sort. +// Interface to sort by AccessCount. +type CacheItemPairList []CacheItemPair + +func (p CacheItemPairList) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p CacheItemPairList) Len() int { return len(p) } +func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount } + +// MostAccessed returns the most accessed items in this cache table +func (table *CacheTable) MostAccessed(count int64) []*CacheItem { + table.RLock() + defer table.RUnlock() + + p := make(CacheItemPairList, len(table.items)) + i := 0 + for k, v := range table.items { + p[i] = CacheItemPair{k, v.accessCount} + i++ + } + sort.Sort(p) + + var r []*CacheItem + c := int64(0) + for _, v := range p { + if c >= count { + break + } + + item, ok := table.items[v.Key] + if ok { + r = append(r, item) + } + c++ + } + + return r +} + +// log 方便的内部日志记录方法 +func (table *CacheTable) log(v ...any) { + if table.logger == nil { + return + } + + table.logger.Println(v...) +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..7003858 --- /dev/null +++ b/errors.go @@ -0,0 +1,8 @@ +package cache2go + +import "errors" + +var ( + ErrKeyNotFound = errors.New("键不在缓存里面") + ErrKeyNotFoundOrLoadable = errors.New("键不存在或不在缓存里面") +) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9754b57 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module githbub.com/zhang2092/cache2go + +go 1.22.0