# Source code for alex.utils.cache

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# pylint: disable-msg=E1103


import collections
import functools
import os
import os.path
import cPickle as pickle
import fcntl
import hashlib

from itertools import ifilterfalse
from heapq import nsmallest
from operator import itemgetter

persistent_cache_directory = '~/.alex_persistent_cache'


class Counter(dict):
    '''Dictionary in which a key that is absent reads as zero.

    Reading an absent key returns 0 but does not store the key; it is only
    stored once something is assigned to it (e.g. by ``counter[key] += 1``).
    '''

    def __missing__(self, key):
        # Hook called by dict.__getitem__ when ``key`` is not present.
        return 0
def lru_cache(maxsize=100):
    '''Least-recently-used cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics are stored in f.hits and f.misses.
    Clear the cache with f.clear().
    http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used

    :param maxsize: maximum number of results kept; once it is exceeded the
        least recently used entry is dropped.
    :returns: a decorator producing the caching wrapper.

    Exceptions raised by the wrapped function propagate to the caller and
    leave the cache and its bookkeeping untouched.
    '''
    # The usage queue may hold duplicates of a key; it is compacted once it
    # grows beyond this length.
    maxqueue = maxsize * 10

    def decorator(user_function):
        cache = {}                   # mapping of args to results
        queue = collections.deque()  # order that keys have been used
        refcount = Counter()         # times each key is in the queue
        sentinel = object()          # marker for looping around the queue
        kwd_mark = object()          # separate positional and keyword args

        # lookup optimizations (ugly but fast)
        queue_append, queue_popleft = queue.append, queue.popleft
        queue_appendleft, queue_pop = queue.appendleft, queue.pop

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):
            # cache key records both positional and keyword args
            key = args
            if kwds:
                key += (kwd_mark,) + tuple(sorted(kwds.items()))

            # get cache entry or compute if not found
            try:
                result = cache[key]
            except KeyError:
                # Compute BEFORE recording the use of the key.  If the key
                # were queued first, a raising or recursive user_function
                # would leave keys in the queue that are not in the cache,
                # and a later purge would fail on ``del cache[key]``.
                result = user_function(*args, **kwds)
                cache[key] = result
                wrapper.misses += 1
                is_miss = True
            else:
                wrapper.hits += 1
                is_miss = False

            # record recent use of this key
            queue_append(key)
            refcount[key] += 1

            # purge least recently used cache entry
            if is_miss and len(cache) > maxsize:
                old = queue_popleft()
                refcount[old] -= 1
                # skip queue entries whose key was used again later on
                while refcount[old]:
                    old = queue_popleft()
                    refcount[old] -= 1
                del cache[old], refcount[old]

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access
            if len(queue) > maxqueue:
                refcount.clear()
                queue_appendleft(sentinel)
                # pop from the right (most recent first) until the sentinel
                # comes around; keep only the first occurrence of each key
                for seen in iter(queue_pop, sentinel):
                    if seen not in refcount:
                        queue_appendleft(seen)
                        refcount[seen] = 1

            return result

        def clear():
            '''Drop all cached results and reset the statistics.'''
            cache.clear()
            queue.clear()
            refcount.clear()
            wrapper.hits = wrapper.misses = 0

        wrapper.hits = wrapper.misses = 0
        wrapper.clear = clear
        return wrapper

    return decorator
def lfu_cache(maxsize=100):
    '''Least-frequently-used cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics are stored in f.hits and f.misses.
    Clear the cache with f.clear(); the underlying dict is exposed as
    f.cache.
    http://en.wikipedia.org/wiki/Least_Frequently_Used

    :param maxsize: maximum number of results kept.  When a new result has
        to be stored in a full cache, the ``maxsize // 10`` (at least one)
        least frequently used entries are dropped first.
    :returns: a decorator producing the caching wrapper.
    '''
    def decorator(user_function):
        cache = {}               # mapping of args to results
        use_count = Counter()    # times each key has been accessed
        kwarg_mark = object()    # separate positional and keyword args

        @functools.wraps(user_function)
        def wrapper(*args, **kwargs):
            key = args
            if kwargs:
                key += (kwarg_mark,) + tuple(sorted(kwargs.items()))

            # get cache entry or compute if not found
            try:
                result = cache[key]
            except KeyError:
                result = user_function(*args, **kwargs)

                # Make room right before inserting.  The check uses ``>=``
                # and also removes any excess, because a recursive
                # user_function may have filled the cache while this call
                # was running; an ``== maxsize`` test done before the call
                # would then never match again and the cache would grow
                # without bound.
                if len(cache) >= maxsize:
                    n_evict = max(len(cache) - maxsize + 1,
                                  maxsize // 10 or 1)
                    for k, _ in nsmallest(n_evict, use_count.items(),
                                          key=itemgetter(1)):
                        del cache[k], use_count[k]

                cache[key] = result
                use_count[key] += 1
                wrapper.misses += 1
            else:
                use_count[key] += 1
                wrapper.hits += 1
            return result

        def clear():
            '''Drop all cached results and reset the statistics.'''
            cache.clear()
            use_count.clear()
            wrapper.hits = wrapper.misses = 0

        wrapper.hits = wrapper.misses = 0
        wrapper.clear = clear
        wrapper.cache = cache
        return wrapper

    return decorator
def get_persitent_cache_content(key):
    '''Load the value stored for ``key`` in the persistent cache directory.

    The file name is made of the ``str()`` of every item of ``key`` joined
    by underscores, with spaces replaced by underscores.

    :param key: iterable of items identifying the cache entry.
    :returns: the unpickled content of the cache file.
    :raises KeyError: if the file cannot be opened or locked, or if it is
        empty (e.g. a writer has truncated it but not written yet).
    '''
    key_name = os.path.join(persistent_cache_directory,
                            '_'.join([str(i) for i in key]).replace(' ', '_'))

    try:
        # you cannot have an exclusive lock if you don't ask for writing
        # permissions, therefore use "r+" mode
        f = open(key_name, 'r+b')
    except IOError:
        raise KeyError(key)

    # try/finally guarantees that the lock and the descriptor are released
    # even when unpickling fails.
    try:
        try:
            fcntl.lockf(f, fcntl.LOCK_EX)
        except IOError:
            raise KeyError(key)
        try:
            # NOTE(review): pickle executes code on load; the cache
            # directory must only be writable by trusted users.
            return pickle.load(f)
        finally:
            fcntl.lockf(f, fcntl.LOCK_UN)
    except EOFError:
        # An empty file holds no value yet: report it as a cache miss.
        raise KeyError(key)
    finally:
        f.close()
def set_persitent_cache_content(key, value):
    '''Store ``value`` under ``key`` in the persistent cache directory.

    The file name is made of the ``str()`` of every item of ``key`` joined
    by underscores, with spaces replaced by underscores.

    :param key: iterable of items identifying the cache entry.
    :param value: any picklable object.
    :raises IOError: if the cache file cannot be opened, locked or written.
    '''
    key_name = os.path.join(persistent_cache_directory,
                            '_'.join([str(i) for i in key]).replace(' ', '_'))

    # Serialise first: if the value cannot be pickled, an existing cache
    # entry is left untouched.
    payload = pickle.dumps(value)

    # 'ab' creates the file when needed but, unlike 'wb', does not truncate
    # it before the exclusive lock is held.
    f = open(key_name, 'ab')
    try:
        fcntl.lockf(f, fcntl.LOCK_EX)
        try:
            f.truncate(0)
            f.write(payload)
            # Push the data to the file while the lock is still held;
            # otherwise it would only be written out by close(), after the
            # lock has been released.
            f.flush()
        finally:
            fcntl.lockf(f, fcntl.LOCK_UN)
    finally:
        f.close()
def persistent_cache(method=False, file_prefix='', file_suffix=''):
    '''Persistent cache decorator.

    It grows indefinitely.
    The cache key is the SHA-224 digest of ``str()`` of the tuple
    ``(file_prefix,) + arguments + sorted keyword items + (file_suffix,)``,
    so the arguments need a ``str()`` that identifies their value.
    Cache performance statistics are stored in f.hits and f.misses.

    :param method: if true, the first positional argument (``self``) is
        left out of the cache key.
    :param file_prefix: first item of the key tuple.
    :param file_suffix: last item of the key tuple.
    :returns: a decorator producing the caching wrapper.
    '''
    def decorator(user_function):
        @functools.wraps(user_function)
        def wrapper(*args, **kwds):
            key = (file_prefix,)
            key += args[1:] if method else args
            # NOTE(review): keyword items are appended without a separator,
            # so a positional tuple ('a', 1) and a keyword a=1 produce the
            # same key.  Left as is to keep existing cache files valid.
            if kwds:
                key += tuple(sorted(kwds.items()))
            key += (file_suffix,)

            # hashlib needs bytes; on Python 2 str already is bytes and the
            # digest is unchanged.
            text = str(key)
            if not isinstance(text, bytes):
                text = text.encode('utf-8')
            key = (hashlib.sha224(text).hexdigest(),)

            try:
                result = get_persitent_cache_content(key)
            except KeyError:
                result = user_function(*args, **kwds)
                wrapper.misses += 1

                # record this key
                set_persitent_cache_content(key, result)
            else:
                wrapper.hits += 1

            return result

        wrapper.hits = wrapper.misses = 0
        return wrapper

    return decorator
# Resolve "~" and make sure the cache directory exists.  Creation is
# attempted unconditionally: several processes may import this module at the
# same time, so an exists()-then-makedirs() sequence could fail in the loser
# of the race.
persistent_cache_directory = os.path.expanduser(persistent_cache_directory)
try:
    os.makedirs(persistent_cache_directory)
except OSError:
    if not os.path.isdir(persistent_cache_directory):
        raise

if __name__ == '__main__':
    # pylint: disable-msg=E1101
    # Small smoke test: call each cached function 1000 times with random
    # arguments from a 5 x 5 domain and print its (hits, misses) pair.
    from random import choice

    domain = range(5)

    print("Testing the LRU and LFU cache decorators.")
    print("=" * 120)

    print("LRU cache")

    @lru_cache(maxsize=40)
    def f1(x, y):
        return 3 * x + y

    for i in range(1000):
        r = f1(choice(domain), choice(domain))

    # the tuple is printed as such on both Python 2 and Python 3
    print((f1.hits, f1.misses))

    print("LFU cache")

    @lfu_cache(maxsize=40)
    def f2(x, y):
        return 3 * x + y

    for i in range(1000):
        r = f2(choice(domain), choice(domain))

    print((f2.hits, f2.misses))

    print("persistent LRU cache")

    @persistent_cache(False, 'f3')
    def f3(x, y):
        return 3 * x + y

    for i in range(1000):
        r = f3(choice(domain), choice(domain))

    print((f3.hits, f3.misses))