-import os.path
+"""
+Provides caching of old results from mass commands, so that
+we don't have to do them again. Successes are cached as
+the string "OK"; errors are stored as the string exception name.
+Entries are indexed by location.
-class ICache(object):
- def put(self, name):
- raise NotImplementedError
+.. testsetup:: *
-class Cache(ICache):
- def __init__(self, file):
- self.set = set()
- if os.path.isfile(file):
- for line in open(file, "r"):
- self.set.add(line.rstrip())
- self.file = open(file, "a")
- def __contains__(self, name):
- return name in self.set
- def add(self, name):
- self.set.add(name)
- self.file.write(name + "\n")
- self.file.flush()
+ from wizard.cache import *
+"""
+
+import sqlite3
+
+def make(file=None):
+ """
+ Generates a :class:`SerializedDict` if ``file`` is not ``None``,
+ and a normal dictionary otherwise.
+ """
+ if file:
+ return SerializedDict(file)
+ else:
+ return {}
-class DummyCache(ICache):
- """Dummy object that doesn't actually cache anything and
- claims that everything needs to be done"""
- def __contains__(self, name):
- return False
- def add(self, name):
- pass
+class SerializedDict(object):
+ """
+ Implements a dictionary that serializes itself to disk. We
+ use sqlite in order to implement relatively efficient updates.
+
+ >>> import os
+ >>> f = "/tmp/wizard-cache-test.db"
+ >>> if os.path.exists(f): os.unlink(f)
+ >>> d = SerializedDict(f)
+ >>> try:
+ ... d["foo"]
+ ... raise Exception("didn't raise")
+ ... except KeyError:
+ ... pass
+ >>> d["foo"] = "baz"
+ >>> d["foo"]
+ 'baz'
+ >>> d["foo"] = "bar"
+ >>> d["foo"]
+ 'bar'
+ >>> e = SerializedDict(f)
+ >>> e["foo"]
+ 'bar'
+ >>> del e["foo"]
+ >>> "foo" in e
+ False
+ >>> os.unlink(f)
+ """
+ #: Connection to sqlite database
+ conn = None
+ def __init__(self, file):
+ self.conn = sqlite3.connect(file)
+ try:
+ self.conn.execute('create table dict(key string primary key, value string)')
+ except sqlite3.OperationalError as e:
+ if e.args[0] != "table dict already exists":
+ raise
+ self.conn.isolation_level = None
+ def __getitem__(self, key):
+ row = self.conn.execute('select value from dict where key = ?', (key,)).fetchone()
+ if row:
+ return str(row[0])
+ else:
+ raise KeyError
+ def __setitem__(self, key, value):
+ self.conn.execute('insert or replace into dict (key, value) values (?, ?)', (key, value))
+ def __delitem__(self, key):
+ self.conn.execute('delete from dict where key = ?', (key,))
+ def __contains__(self, key):
+ return bool(self.conn.execute('select count(*) from dict where key = ?', (key,)).fetchone()[0])