如何实现线程安全的内存缓存

乙醇 创建于 9 months 之前

最后更新: 9 months 之前

阅读数: 669

一个用go实现的线程安全的内存缓存,实现代码非常简洁高效,不卖弄不烧脑

这两天正好看到一个用go实现的线程安全的内存缓存,实现代码非常简洁高效,不卖弄不烧脑,非常值得初学者拿来学习。

项目地址

项目地址在https://github.com/muesli/cache2go,目前已经有1.8k的star。

如何使用

package main

import (
    "github.com/muesli/cache2go"
    "fmt"
    "time"
)

// Keys & values in cache2go can be of arbitrary types, e.g. a struct.
type myStruct struct {
    text     string
    moreData []byte
}

func main() {
    // Accessing a new cache table for the first time will create it.
    cache := 

    // We will put a new item in the cache. It will expire after
    // not being accessed via Value(key) for more than 5 seconds.
    val := myStruct{"This is a test!", []byte{}}
    cache.Add("someKey", 5*time.Second, &val)

    // Let's retrieve the item from the cache.
    res, err := cache.Value("someKey")
    if err == nil {
        fmt.Println("Found value in cache:", res.Data().(*myStruct).text)
    } else {
        fmt.Println("Error retrieving value from cache:", err)
    }

    // Wait for the item to expire in cache.
    time.Sleep(6 * time.Second)
    res, err = cache.Value("someKey")
    if err != nil {
        fmt.Println("Item is not cached (anymore).")
    }

    // Add another item that never expires.
    cache.Add("someKey", 0, &val)

    // cache2go supports a few handy callbacks and loading mechanisms.
    cache.SetAboutToDeleteItemCallback(func(e *cache2go.CacheItem) {
        fmt.Println("Deleting:", e.Key(), e.Data().(*myStruct).text, e.CreatedOn())
    })

    // Remove the item from the cache.
    cache.Delete("someKey")

    // And wipe the entire cache table.
    cache.Flush()
}

简单看一下核心api

  • 创建缓存对象: cache2go.Cache("myCache")
  • 设置一个key:value对:cache.Add("someKey", 5*time.Second, &val) 设置的时候需要指定缓存的过期时间
  • 获取key对应的value值: res, err = cache.Value("someKey")

这里就简单分析一下对应接口的实现原理。

key value存储

这里使用的是CacheItem这个结构体来实现的key value存储。相应的数据结构是

// CacheItem is an individual cache item
// Parameter data contains the user-set value in the cache.
type CacheItem struct {
    sync.RWMutex

    // The item's key.
    key interface{}
    // The item's data.
    data interface{}
    // How long will the item live in the cache when not being accessed/kept alive.
    lifeSpan time.Duration

    // Creation timestamp.
    createdOn time.Time
    // Last access timestamp.
    accessedOn time.Time
    // How often the item was accessed.
    accessCount int64

    // Callback method triggered right before removing the item from the cache
    aboutToExpire []func(key interface{})
}

值得注意的点有

  • 读写锁: sync.RWMutex 多线程访问的时候用来进行资源的排他锁定
  • 键的实现: key interface{} 所以key可以是任意类型
  • 值的实现: data interface{} 跟key类似,value可以是任意类型
  • 存活时间: lifeSpan time.Duration 大于这个时间间隔没有被访问的话,的话key就会过期被清理
  • 上次被访问的时间: accessedOn time.Time
  • key被访问的次数: accessCount

再看一下CacheItem初始化的代码

// NewCacheItem returns a newly created CacheItem.
// Parameter key is the item's cache-key.
// Parameter lifeSpan determines after which time period without an access the item
// will get removed from the cache.
// Parameter data is the item's value.
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
    t := time.Now()
    return &CacheItem{
        key:           key,
        lifeSpan:      lifeSpan,
        createdOn:     t,
        accessedOn:    t,
        accessCount:   0,
        aboutToExpire: nil,
        data:          data,
    }
}

可以看出来创建时间和上次访问时间都被设置成了当前时间,访问次数是0.

缓存对象的实现CacheTable

CacheTable对象包含了n个CacheItem,看一下具体的数据结构

type CacheTable struct {
    sync.RWMutex

    // The table's name.
    name string
    // All cached items.
    items map[interface{}]*CacheItem

    // Timer responsible for triggering cleanup.
    cleanupTimer *time.Timer
    // Current timer duration.
    cleanupInterval time.Duration

    // The logger used for this table.
    logger *log.Logger

    // Callback method triggered when trying to load a non-existing key.
    loadData func(key interface{}, args ...interface{}) *CacheItem
    // Callback method triggered when adding a new item to the cache.
    addedItem []func(item *CacheItem)
    // Callback method triggered before deleting an item from the cache.
    aboutToDeleteItem []func(item *CacheItem)
}

这里需要关注的地方是

  • items map[interface{}]*CacheItem : 每个item其实都是这个map里的一项,其实map的key就是item的key
  • cleanupTimer *time.Timer : 用来做缓存过期的定时器
  • cleanupInterval time.Duration : 扫描所有的items进行缓存过期清理的时间间隔

添加一个key value

// Add adds a key/value pair to the cache.
// Parameter key is the item's cache-key.
// Parameter lifeSpan determines after which time period without an access the item
// will get removed from the cache.
// Parameter data is the item's value.
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
    item := NewCacheItem(key, lifeSpan, data)

    // Add item to cache.
    table.Lock()
    table.addInternal(item)

    return item
}

func (table *CacheTable) addInternal(item *CacheItem) {
    // Careful: do not run this method unless the table-mutex is locked!
    // It will unlock it for the caller before running the callbacks and checks
    table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
    table.items[item.key] = item

    // Cache values so we don't keep blocking the mutex.
    expDur := table.cleanupInterval
    addedItem := table.addedItem
    table.Unlock()

    // Trigger callback after adding an item to cache.
    if addedItem != nil {
        for _, callback := range addedItem {
            callback(item)
        }
    }

    // If we haven't set up any expiration check timer or found a more imminent item.
    if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
        table.expirationCheck()
    }
}

梳理一下流程

  • 创建cache item
  • 加读写锁
  • 调用addInternal 方法
  • 在items map里添加一项,key就是item的key,value就是item
  • 获取整个CacheTable的清理缓存间隔时间
  • 解锁,到这一步基本上就完成了数据的持久化
  • 运行添加item时的回调函数,如果有的话
  • 如果item设置了过期时间,并且table的过期扫描间隔是0(首次添加)或者item的过期间隔小于table的过期间隔时间的话,调用expirationCheck 函数,进行过期扫描

扫描并清理过期的key

代码如下

// Expiration check loop, triggered by a self-adjusting timer.
func (table *CacheTable) expirationCheck() {
    table.Lock()
    if table.cleanupTimer != nil {
        table.cleanupTimer.Stop()
    }
    if table.cleanupInterval > 0 {
        table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
    } else {
        table.log("Expiration check installed for table", table.name)
    }

    // To be more accurate with timers, we would need to update 'now' on every
    // loop iteration. Not sure it's really efficient though.
    now := time.Now()
    smallestDuration := 0 * time.Second
    for key, item := range table.items {
        // Cache values so we don't keep blocking the mutex.
        item.RLock()
        lifeSpan := item.lifeSpan
        accessedOn := item.accessedOn
        item.RUnlock()

        if lifeSpan == 0 {
            continue
        }
        if now.Sub(accessedOn) >= lifeSpan {
            // Item has excessed its lifespan.
            table.deleteInternal(key)
        } else {
            // Find the item chronologically closest to its end-of-lifespan.
            if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
                smallestDuration = lifeSpan - now.Sub(accessedOn)
            }
        }
    }

    // Setup the interval for the next cleanup run.
    table.cleanupInterval = smallestDuration
    if smallestDuration > 0 {
        table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
            go table.expirationCheck()
        })
    }
    table.Unlock()
}

简单过一起逻辑

  • 加读写锁
  • 如果启动了扫描定时器,关闭定时器先
  • 扫描所有的key,对每一个key
    • 加读锁
    • 获取key的存活周期
    • 获取key的上次访问时间
    • 如果存活周期是0,则不处理,这是持续保活的逻辑,可以先不管
    • 如果现在的时间距离上次访问时间已经大于了key的存活时间,则删除这个key
    • 否则的话算出所有key里面最快要到期的那个key的时间间隔
  • 如果有拿到了最快要到期那个key的时间间隔,则运行定时器,在下这个时间间隔之后运行清理函数
  • 解锁

删除key的实现

// Delete an item from the cache.
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
    table.Lock()
    defer table.Unlock()

    return table.deleteInternal(key)
}

func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
    r, ok := table.items[key]
    if !ok {
        return nil, ErrKeyNotFound
    }

    // Cache value so we don't keep blocking the mutex.
    aboutToDeleteItem := table.aboutToDeleteItem
    table.Unlock()

    // Trigger callbacks before deleting an item from cache.
    if aboutToDeleteItem != nil {
        for _, callback := range aboutToDeleteItem {
            callback(r)
        }
    }

    r.RLock()
    defer r.RUnlock()
    if r.aboutToExpire != nil {
        for _, callback := range r.aboutToExpire {
            callback(key)
        }
    }

    table.Lock()
    table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
    delete(table.items, key)

    return r, nil
}

删除的逻辑相对简单一些,主要是需要注意加锁解锁以及运行之前注册的回调函数。

总结

这应该是我见过代码最简单但是star相对比较多的开源项目了。这个项目非常适合我们进行学习,因为

  • 可以帮助我们理解锁的使用以及如何使用锁来保证线程安全;
  • 帮助我们理解key value内存缓存的实现
  • 提供了一种使用定时器来实现定时任务的思路
  • 帮助我们理解如何注册和运行回调函数
0