cache2go/cache_table.go
2024-03-18 16:42:26 +08:00

359 lines
8.8 KiB
Go

package cache2go
import (
"log"
"sort"
"sync"
"time"
)
// CacheTable is a single named table inside the cache. The embedded RWMutex
// guards every field below.
type CacheTable struct {
	sync.RWMutex

	// name is the table's name; it is only used in log messages here.
	name string
	// items holds the cached items, keyed by each item's key.
	items map[any]*CacheItem

	// cleanupTimer is the timer that triggers the next expiration check.
	cleanupTimer *time.Timer
	// cleanupInterval is the duration the current cleanup timer was armed
	// with; it is zero when no cleanup is scheduled.
	cleanupInterval time.Duration

	// logger receives the table's log output; nil disables logging.
	logger *log.Logger

	// loadData is the callback used by Value to load an item for a key that
	// is not cached.
	loadData func(key any, args ...any) *CacheItem
	// addedItem lists the callbacks run after an item was added to the table.
	addedItem []func(item *CacheItem)
	// aboutToDeleteItem lists the callbacks run before an item is removed
	// from the table.
	aboutToDeleteItem []func(item *CacheItem)
}
// Count returns the number of items currently stored in the table.
func (table *CacheTable) Count() int {
	table.RLock()
	n := len(table.items)
	table.RUnlock()
	return n
}
// Foreach calls trans once for every key/item pair in the table.
//
// The table's read lock is held for the whole iteration, including while
// trans is running.
func (table *CacheTable) Foreach(trans func(key any, item *CacheItem)) {
	table.RLock()
	defer table.RUnlock()

	for key, item := range table.items {
		trans(key, item)
	}
}
// SetDataLoader installs f as the table's data-loader callback. Value calls
// the loader with the requested key and any extra arguments when the key is
// not present in the table.
func (table *CacheTable) SetDataLoader(f func(any, ...any) *CacheItem) {
	table.Lock()
	table.loadData = f
	table.Unlock()
}
// SetAddedItemCallback makes f the only added-item callback, discarding any
// callbacks registered before. The callbacks are invoked every time a new
// item is added to the table.
//
// The replacement happens in a single critical section, so the callback list
// is never read without the lock and a concurrent AddAddedItemCallback cannot
// slip in between the clear and the add.
func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()

	// A fresh slice (rather than truncating the old one) keeps snapshots of
	// the previous list, taken by in-flight adds, untouched.
	table.addedItem = []func(*CacheItem){f}
}
// AddAddedItemCallback appends f to the list of callbacks that are invoked
// after an item has been added to the table.
func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) {
	table.Lock()
	table.addedItem = append(table.addedItem, f)
	table.Unlock()
}
// ClearAddedItemCallbacks removes every registered added-item callback.
func (table *CacheTable) ClearAddedItemCallbacks() {
	table.Lock()
	table.addedItem = nil
	table.Unlock()
}
// SetAboutToDeleteItemCallback makes f the only about-to-delete callback,
// discarding any callbacks registered before. The callbacks are invoked
// right before an item is removed from the table.
//
// The replacement happens in a single critical section, so the callback list
// is never read without the lock and a concurrent
// AddAboutToDeleteItemCallback cannot slip in between the clear and the add.
func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()

	// A fresh slice (rather than truncating the old one) keeps snapshots of
	// the previous list, taken by in-flight deletes, untouched.
	table.aboutToDeleteItem = []func(*CacheItem){f}
}
// AddAboutToDeleteItemCallback appends f to the list of callbacks that are
// invoked right before an item is removed from the table.
func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) {
	table.Lock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
	table.Unlock()
}
// ClearAboutToDeleteItemCallback removes every registered about-to-delete
// callback.
func (table *CacheTable) ClearAboutToDeleteItemCallback() {
	table.Lock()
	table.aboutToDeleteItem = nil
	table.Unlock()
}
// SetLogger sets the logger the table writes its log messages to. Passing
// nil turns logging off.
func (table *CacheTable) SetLogger(logger *log.Logger) {
	table.Lock()
	table.logger = logger
	table.Unlock()
}
// expirationCheck removes every item whose lifespan has elapsed since it was
// last accessed and re-arms the cleanup timer so that it fires when the next
// item is due. It is called by addInternal and by the timer it schedules
// itself.
//
// The table lock is released explicitly instead of with defer because
// deleteInternal temporarily gives the lock up while it runs callbacks.
func (table *CacheTable) expirationCheck() {
	table.Lock()
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
	if table.cleanupInterval > 0 {
		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
	} else {
		table.log("Expiration check installed for table", table.name)
	}

	// To be more accurate with timers, we would need to update 'now' on every
	// loop iteration. Not sure it's really efficient though.
	now := time.Now()
	var smallestDuration time.Duration
	for key, item := range table.items {
		// Copy the values so the item's lock is held only briefly.
		item.RLock()
		lifeSpan := item.lifeSpan
		accessedOn := item.accessedOn
		item.RUnlock()

		// A zero lifespan means the item never expires.
		if lifeSpan == 0 {
			continue
		}

		idle := now.Sub(accessedOn)
		if idle >= lifeSpan {
			// Item has exceeded its lifespan. The only possible error is
			// ErrKeyNotFound, which means the item is already gone.
			_, _ = table.deleteInternal(key)
			continue
		}

		// Track the item chronologically closest to its end-of-lifespan.
		if remaining := lifeSpan - idle; smallestDuration == 0 || remaining < smallestDuration {
			smallestDuration = remaining
		}
	}

	// Set up the interval for the next cleanup run. time.AfterFunc already
	// invokes its function in a goroutine of its own, so no extra goroutine
	// is spawned here.
	table.cleanupInterval = smallestDuration
	if smallestDuration > 0 {
		table.cleanupTimer = time.AfterFunc(smallestDuration, table.expirationCheck)
	}
	table.Unlock()
}
// addInternal stores item in the table, runs the added-item callbacks and,
// when the item expires sooner than the currently scheduled cleanup, triggers
// an expiration check.
//
// The caller must hold the table's write lock. addInternal releases that lock
// itself before running callbacks, so the caller must not unlock it again.
func (table *CacheTable) addInternal(item *CacheItem) {
	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
	table.items[item.key] = item

	// Snapshot what is needed after the unlock so the mutex is not held
	// while callbacks run.
	scheduled := table.cleanupInterval
	callbacks := table.addedItem
	table.Unlock()

	// Notify listeners that the item is now in the cache.
	for _, notify := range callbacks {
		notify(item)
	}

	// Items without a positive lifespan never need a cleanup run.
	if item.lifeSpan <= 0 {
		return
	}
	// Re-run the check when no timer is set up yet or when this item is due
	// before the currently scheduled cleanup.
	if scheduled == 0 || item.lifeSpan < scheduled {
		table.expirationCheck()
	}
}
// Add stores a key/value pair in the cache and returns the newly created item.
//
// key is the item's cache key, data is the item's value, and lifeSpan is the
// period without an access after which the item is removed from the cache.
func (table *CacheTable) Add(key any, data any, lifeSpan time.Duration) *CacheItem {
	entry := NewCacheItem(key, data, lifeSpan)

	// addInternal releases the lock acquired here.
	table.Lock()
	table.addInternal(entry)

	return entry
}
// deleteInternal removes the item stored under key and returns it. It returns
// ErrKeyNotFound when the table holds no such key.
//
// The caller must hold the table's write lock. When the key exists, the lock
// is released while the callbacks run and taken again before the item is
// removed, so it is held again when the function returns.
func (table *CacheTable) deleteInternal(key any) (*CacheItem, error) {
	victim, found := table.items[key]
	if !found {
		return nil, ErrKeyNotFound
	}

	// Snapshot the table-level callbacks so they can run without the mutex.
	tableCallbacks := table.aboutToDeleteItem
	table.Unlock()

	// Table-level callbacks fire before the item leaves the cache.
	for _, notify := range tableCallbacks {
		notify(victim)
	}

	// The item's own callbacks are read under the item's read lock, which
	// stays held until this function returns.
	victim.RLock()
	defer victim.RUnlock()
	for _, notify := range victim.aboutToExpire {
		notify(key)
	}

	table.Lock()
	table.log("Deleting item with key", key, "created on", victim.createdOn, "and hit", victim.accessCount, "times from table", table.name)
	delete(table.items, key)

	return victim, nil
}
// Delete removes the item stored under key from the cache and returns it.
// The error is whatever deleteInternal reports, i.e. ErrKeyNotFound when the
// key is not in the table.
func (table *CacheTable) Delete(key any) (*CacheItem, error) {
	table.Lock()
	defer table.Unlock()

	removed, err := table.deleteInternal(key)
	return removed, err
}
// Exists reports whether an item is stored under key. Unlike Value, it
// neither tries to fetch data through the loadData callback nor keeps the
// item alive in the cache.
func (table *CacheTable) Exists(key any) bool {
	table.RLock()
	_, found := table.items[key]
	table.RUnlock()
	return found
}
// NotFoundAdd adds data under key only when the key is not cached yet. It
// returns true when the item was added and false when the key already
// existed, in which case the table is left unchanged.
func (table *CacheTable) NotFoundAdd(key any, lifeSpan time.Duration, data any) bool {
	table.Lock()

	_, exists := table.items[key]
	if exists {
		table.Unlock()
		return false
	}

	// addInternal releases the lock acquired above.
	table.addInternal(NewCacheItem(key, data, lifeSpan))
	return true
}
// Value returns the item cached under key and marks it to be kept alive.
//
// When the key is not cached and a data loader is configured, the loader is
// called with key and args. A non-nil result is added to the table and the
// item that was actually stored is returned, so the caller always receives
// the same *CacheItem that later lookups will see.
//
// It returns ErrKeyNotFound when the key is absent and no loader is set, and
// ErrKeyNotFoundOrLoadable when the loader returns nil.
func (table *CacheTable) Value(key any, args ...any) (*CacheItem, error) {
	table.RLock()
	r, ok := table.items[key]
	loadData := table.loadData
	table.RUnlock()

	if ok {
		// Update access counter and timestamp.
		r.KeepAlive()
		return r, nil
	}

	// Item doesn't exist in cache. Try and fetch it with a data-loader.
	if loadData == nil {
		return nil, ErrKeyNotFound
	}
	loaded := loadData(key, args...)
	if loaded == nil {
		return nil, ErrKeyNotFoundOrLoadable
	}

	// Add builds a new CacheItem from the loaded data. Return that item
	// rather than the loader's detached one, which never enters the table.
	return table.Add(key, loaded.data, loaded.lifeSpan), nil
}
// Flush drops every item from the table by replacing the item map with an
// empty one, resets the cleanup interval and stops a pending cleanup timer.
func (table *CacheTable) Flush() {
	table.Lock()
	defer table.Unlock()

	table.log("Flushing table", table.name)

	table.items = make(map[any]*CacheItem)
	table.cleanupInterval = 0
	if timer := table.cleanupTimer; timer != nil {
		timer.Stop()
	}
}
// CacheItemPair pairs a cache key with that item's access counter.
type CacheItemPair struct {
	Key         any
	AccessCount int64
}

// CacheItemPairList is a slice of CacheItemPair that implements
// sort.Interface, ordering pairs by AccessCount from highest to lowest.
type CacheItemPairList []CacheItemPair

// Len returns the number of pairs in the list.
func (list CacheItemPairList) Len() int {
	return len(list)
}

// Less reports whether the pair at index i has a higher access count than
// the pair at index j, which yields a descending sort order.
func (list CacheItemPairList) Less(i, j int) bool {
	return list[j].AccessCount < list[i].AccessCount
}

// Swap exchanges the pairs at indexes i and j.
func (list CacheItemPairList) Swap(i, j int) {
	list[j], list[i] = list[i], list[j]
}
// MostAccessed returns up to count items from the table, ordered from the
// highest access count to the lowest. The result is nil when the table is
// empty or count is not positive.
func (table *CacheTable) MostAccessed(count int64) []*CacheItem {
	table.RLock()
	defer table.RUnlock()

	// Collect every key together with its access counter.
	// NOTE(review): accessCount is read here without taking the item's own
	// lock, unlike the reads in expirationCheck and deleteInternal — confirm
	// whether that is intended.
	ranked := make(CacheItemPairList, 0, len(table.items))
	for key, item := range table.items {
		ranked = append(ranked, CacheItemPair{Key: key, AccessCount: item.accessCount})
	}
	sort.Sort(ranked)

	// Walk the ranking from the top and stop after count entries.
	var top []*CacheItem
	for idx, pair := range ranked {
		if int64(idx) >= count {
			break
		}
		if item, found := table.items[pair.Key]; found {
			top = append(top, item)
		}
	}
	return top
}
// log is an internal convenience helper that prints v through the table's
// logger. It does nothing when no logger has been set.
func (table *CacheTable) log(v ...any) {
	if table.logger != nil {
		table.logger.Println(v...)
	}
}