Project

General

Profile

1 11 aaronmk
# Database access
2
3 1869 aaronmk
import copy
4 11 aaronmk
import re
5 865 aaronmk
import warnings
6 11 aaronmk
7 300 aaronmk
import exc
8 1909 aaronmk
import dicts
9 1893 aaronmk
import iters
10 1960 aaronmk
import lists
11 1889 aaronmk
from Proxy import Proxy
12 1872 aaronmk
import rand
13 862 aaronmk
import strings
14 131 aaronmk
import util
15 11 aaronmk
16 832 aaronmk
##### Exceptions
17
18 135 aaronmk
def get_cur_query(cur):
19
    if hasattr(cur, 'query'): return cur.query
20
    elif hasattr(cur, '_last_executed'): return cur._last_executed
21
    else: return None
22 14 aaronmk
23 300 aaronmk
def _add_cursor_info(e, cur): exc.add_msg(e, 'query: '+get_cur_query(cur))
24 135 aaronmk
25 300 aaronmk
class DbException(exc.ExceptionWithCause):
26 14 aaronmk
    def __init__(self, msg, cause=None, cur=None):
27 300 aaronmk
        exc.ExceptionWithCause.__init__(self, msg, cause)
28 14 aaronmk
        if cur != None: _add_cursor_info(self, cur)
29
30 360 aaronmk
class NameException(DbException): pass
31
32 468 aaronmk
class ExceptionWithColumns(DbException):
33
    def __init__(self, cols, cause=None):
34
        DbException.__init__(self, 'columns: ' + ', '.join(cols), cause)
35
        self.cols = cols
36 11 aaronmk
37 468 aaronmk
class DuplicateKeyException(ExceptionWithColumns): pass
38 13 aaronmk
39 468 aaronmk
class NullValueException(ExceptionWithColumns): pass
40 13 aaronmk
41 89 aaronmk
class EmptyRowException(DbException): pass
42
43 865 aaronmk
##### Warnings
44
45
class DbWarning(UserWarning): pass
46
47 1930 aaronmk
##### Result retrieval
48
49
def col_names(cur): return (col[0] for col in cur.description)
50
51
def rows(cur): return iter(lambda: cur.fetchone(), None)
52
53
def consume_rows(cur):
54
    '''Used to fetch all rows so result will be cached'''
55
    iters.consume_iter(rows(cur))
56
57
def next_row(cur): return rows(cur).next()
58
59
def row(cur):
60
    row_ = next_row(cur)
61
    consume_rows(cur)
62
    return row_
63
64
def next_value(cur): return next_row(cur)[0]
65
66
def value(cur): return row(cur)[0]
67
68
def values(cur): return iters.func_iter(lambda: next_value(cur))
69
70
def value_or_none(cur):
71
    try: return value(cur)
72
    except StopIteration: return None
73
74 1869 aaronmk
##### Database connections
75 1849 aaronmk
76 1926 aaronmk
db_config_names = ['engine', 'host', 'user', 'password', 'database']
77
78 1869 aaronmk
db_engines = {
79
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
80
    'PostgreSQL': ('psycopg2', {}),
81
}
82
83
DatabaseErrors_set = set([DbException])
84
DatabaseErrors = tuple(DatabaseErrors_set)
85
86
def _add_module(module):
87
    DatabaseErrors_set.add(module.DatabaseError)
88
    global DatabaseErrors
89
    DatabaseErrors = tuple(DatabaseErrors_set)
90
91
def db_config_str(db_config):
92
    return db_config['engine']+' database '+db_config['database']
93
94 1909 aaronmk
def _query_lookup(query, params): return (query, dicts.make_hashable(params))
95 1894 aaronmk
96 1901 aaronmk
log_debug_none = lambda msg: None
97
98 1849 aaronmk
class DbConn:
99 2047 aaronmk
    def __init__(self, db_config, serializable=True, log_debug=log_debug_none,
100 2050 aaronmk
        caching=True):
101 1869 aaronmk
        self.db_config = db_config
102
        self.serializable = serializable
103 1901 aaronmk
        self.log_debug = log_debug
104 2047 aaronmk
        self.caching = caching
105 1869 aaronmk
106
        self.__db = None
107 1889 aaronmk
        self.query_results = {}
108 1869 aaronmk
109
    def __getattr__(self, name):
110
        if name == '__dict__': raise Exception('getting __dict__')
111
        if name == 'db': return self._db()
112
        else: raise AttributeError()
113
114
    def __getstate__(self):
115
        state = copy.copy(self.__dict__) # shallow copy
116 1915 aaronmk
        state['log_debug'] = None # don't pickle the debug callback
117 1869 aaronmk
        state['_DbConn__db'] = None # don't pickle the connection
118
        return state
119
120
    def _db(self):
121
        if self.__db == None:
122
            # Process db_config
123
            db_config = self.db_config.copy() # don't modify input!
124
            module_name, mappings = db_engines[db_config.pop('engine')]
125
            module = __import__(module_name)
126
            _add_module(module)
127
            for orig, new in mappings.iteritems():
128
                try: util.rename_key(db_config, orig, new)
129
                except KeyError: pass
130
131
            # Connect
132
            self.__db = module.connect(**db_config)
133
134
            # Configure connection
135
            if self.serializable: run_raw_query(self,
136
                'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE')
137
138
        return self.__db
139 1889 aaronmk
140 1891 aaronmk
    class DbCursor(Proxy):
141 1927 aaronmk
        def __init__(self, outer):
142 1891 aaronmk
            Proxy.__init__(self, outer.db.cursor())
143 1927 aaronmk
            self.query_results = outer.query_results
144 1894 aaronmk
            self.query_lookup = None
145 1891 aaronmk
            self.result = []
146 1889 aaronmk
147 1894 aaronmk
        def execute(self, query, params=None):
148 1930 aaronmk
            self._is_insert = query.upper().find('INSERT') >= 0
149 1894 aaronmk
            self.query_lookup = _query_lookup(query, params)
150 1904 aaronmk
            try: return_value = self.inner.execute(query, params)
151
            except Exception, e:
152
                self.result = e # cache the exception as the result
153
                self._cache_result()
154
                raise
155
            finally: self.query = get_cur_query(self.inner)
156 1930 aaronmk
            # Fetch all rows so result will be cached
157
            if self.rowcount == 0 and not self._is_insert: consume_rows(self)
158 1894 aaronmk
            return return_value
159
160 1891 aaronmk
        def fetchone(self):
161
            row = self.inner.fetchone()
162 1899 aaronmk
            if row != None: self.result.append(row)
163
            # otherwise, fetched all rows
164 1904 aaronmk
            else: self._cache_result()
165
            return row
166
167
        def _cache_result(self):
168 1906 aaronmk
            # For inserts, only cache exceptions since inserts are not
169
            # idempotent, but an invalid insert will always be invalid
170 1930 aaronmk
            if self.query_results != None and (not self._is_insert
171 1906 aaronmk
                or isinstance(self.result, Exception)):
172
173 1894 aaronmk
                assert self.query_lookup != None
174 1916 aaronmk
                self.query_results[self.query_lookup] = self.CacheCursor(
175
                    util.dict_subset(dicts.AttrsDictView(self),
176
                    ['query', 'result', 'rowcount', 'description']))
177 1906 aaronmk
178 1916 aaronmk
        class CacheCursor:
179
            def __init__(self, cached_result): self.__dict__ = cached_result
180
181 1927 aaronmk
            def execute(self, *args, **kw_args):
182 1916 aaronmk
                if isinstance(self.result, Exception): raise self.result
183
                # otherwise, result is a rows list
184
                self.iter = iter(self.result)
185
186
            def fetchone(self):
187
                try: return self.iter.next()
188
                except StopIteration: return None
189 1891 aaronmk
190 1894 aaronmk
    def run_query(self, query, params=None, cacheable=False):
191 2047 aaronmk
        if not self.caching: cacheable = False
192 1903 aaronmk
        used_cache = False
193
        try:
194 1927 aaronmk
            # Get cursor
195
            if cacheable:
196
                query_lookup = _query_lookup(query, params)
197
                try:
198
                    cur = self.query_results[query_lookup]
199
                    used_cache = True
200
                except KeyError: cur = self.DbCursor(self)
201
            else: cur = self.db.cursor()
202
203
            # Run query
204
            try: cur.execute(query, params)
205
            except Exception, e:
206
                _add_cursor_info(e, cur)
207
                raise
208 1903 aaronmk
        finally:
209
            if self.log_debug != log_debug_none: # only compute msg if needed
210
                if used_cache: cache_status = 'Cache hit'
211
                elif cacheable: cache_status = 'Cache miss'
212
                else: cache_status = 'Non-cacheable'
213 1927 aaronmk
                self.log_debug(cache_status+': '
214
                    +strings.one_line(get_cur_query(cur)))
215 1903 aaronmk
216
        return cur
217 1914 aaronmk
218
    def is_cached(self, query, params=None):
219
        return _query_lookup(query, params) in self.query_results
220 1849 aaronmk
221 1869 aaronmk
connect = DbConn
222
223 1919 aaronmk
##### Input validation
224
225
def check_name(name):
226
    if re.search(r'\W', name) != None: raise NameException('Name "'+name
227
        +'" may contain only alphanumeric characters and _')
228
229 2061 aaronmk
def esc_name_by_module(module, name, ignore_case=False):
230 1919 aaronmk
    if module == 'psycopg2':
231 2061 aaronmk
        if ignore_case:
232
            # Don't enclose in quotes because this disables case-insensitivity
233 2057 aaronmk
            check_name(name)
234
            return name
235 2061 aaronmk
        else: quote = '"'
236 1919 aaronmk
    elif module == 'MySQLdb': quote = '`'
237
    else: raise NotImplementedError("Can't escape name for "+module+' database')
238
    return quote + name.replace(quote, '') + quote
239
240
def esc_name_by_engine(engine, name, **kw_args):
241
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)
242
243
def esc_name(db, name, **kw_args):
244
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)
245
246 1968 aaronmk
def qual_name(db, schema, table):
247 2060 aaronmk
    def esc_name_(name): return esc_name(db, name)
248 2051 aaronmk
    table = esc_name_(table)
249
    if schema != None: return esc_name_(schema)+'.'+table
250
    else: return table
251 1968 aaronmk
252 832 aaronmk
##### Querying
253
254 1894 aaronmk
def run_raw_query(db, *args, **kw_args):
255
    '''For args, see DbConn.run_query()'''
256
    return db.run_query(*args, **kw_args)
257 11 aaronmk
258 2068 aaronmk
def mogrify(db, query, params):
259
    module = util.root_module(db.db)
260
    if module == 'psycopg2': return db.db.cursor().mogrify(query, params)
261
    else: raise NotImplementedError("Can't mogrify query for "+module+
262
        ' database')
263
264 832 aaronmk
##### Recoverable querying
265 15 aaronmk
266 11 aaronmk
def with_savepoint(db, func):
267 1872 aaronmk
    savepoint = 'savepoint_'+str(rand.rand_int()) # must be unique
268 830 aaronmk
    run_raw_query(db, 'SAVEPOINT '+savepoint)
269 11 aaronmk
    try: return_val = func()
270
    except:
271 830 aaronmk
        run_raw_query(db, 'ROLLBACK TO SAVEPOINT '+savepoint)
272 11 aaronmk
        raise
273
    else:
274 830 aaronmk
        run_raw_query(db, 'RELEASE SAVEPOINT '+savepoint)
275 11 aaronmk
        return return_val
276
277 1894 aaronmk
def run_query(db, query, params=None, recover=None, cacheable=False):
278 830 aaronmk
    if recover == None: recover = False
279
280 1894 aaronmk
    def run(): return run_raw_query(db, query, params, cacheable)
281 1914 aaronmk
    if recover and not db.is_cached(query, params):
282
        return with_savepoint(db, run)
283
    else: return run() # don't need savepoint if cached
284 830 aaronmk
285 832 aaronmk
##### Basic queries
286
287 2054 aaronmk
def mk_select(db, table, fields=None, conds=None, limit=None, start=None,
288
    table_is_esc=False):
289 1981 aaronmk
    '''
290
    @param fields Use None to select all fields in the table
291
    @param table_is_esc Whether the table name has already been escaped
292 2054 aaronmk
    @return tuple(query, params)
293 1981 aaronmk
    '''
294 2060 aaronmk
    def esc_name_(name): return esc_name(db, name)
295 2058 aaronmk
296 1135 aaronmk
    if conds == None: conds = {}
297 135 aaronmk
    assert limit == None or type(limit) == int
298 865 aaronmk
    assert start == None or type(start) == int
299 2058 aaronmk
    if not table_is_esc: table = esc_name_(table)
300 865 aaronmk
301 2056 aaronmk
    params = []
302
303
    def parse_col(field):
304
        '''Parses fields'''
305
        if isinstance(field, tuple): # field is literal values
306
            value, col = field
307
            sql_ = '%s'
308
            params.append(value)
309 2058 aaronmk
            if col != None: sql_ += ' AS '+esc_name_(col)
310
        else: sql_ = esc_name_(field) # field is col name
311 2056 aaronmk
        return sql_
312 11 aaronmk
    def cond(entry):
313 2056 aaronmk
        '''Parses conditions'''
314 13 aaronmk
        col, value = entry
315 2058 aaronmk
        cond_ = esc_name_(col)+' '
316 11 aaronmk
        if value == None: cond_ += 'IS'
317
        else: cond_ += '='
318
        cond_ += ' %s'
319
        return cond_
320 2056 aaronmk
321 1135 aaronmk
    query = 'SELECT '
322
    if fields == None: query += '*'
323 2056 aaronmk
    else: query += ', '.join(map(parse_col, fields))
324 2055 aaronmk
    query += ' FROM '+table
325 865 aaronmk
326
    missing = True
327 89 aaronmk
    if conds != {}:
328
        query += ' WHERE '+' AND '.join(map(cond, conds.iteritems()))
329 2056 aaronmk
        params += conds.values()
330 865 aaronmk
        missing = False
331
    if limit != None: query += ' LIMIT '+str(limit); missing = False
332
    if start != None:
333
        if start != 0: query += ' OFFSET '+str(start)
334
        missing = False
335
    if missing: warnings.warn(DbWarning(
336
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))
337
338 2056 aaronmk
    return (query, params)
339 11 aaronmk
340 2054 aaronmk
def select(db, *args, **kw_args):
341
    '''For params, see mk_select() and run_query()'''
342
    recover = kw_args.pop('recover', None)
343
    cacheable = kw_args.pop('cacheable', True)
344
345
    query, params = mk_select(db, *args, **kw_args)
346
    return run_query(db, query, params, recover, cacheable)
347
348 2066 aaronmk
def mk_insert_select(db, table, cols=None, select_query=None, params=None,
349 2070 aaronmk
    returning=None, embeddable=False, table_is_esc=False):
350 1960 aaronmk
    '''
351
    @param returning str|None An inserted column (such as pkey) to return
352 2070 aaronmk
    @param embeddable Whether the query should be embeddable as a nested SELECT.
353 2073 aaronmk
        Warning: If you set this and cacheable=True when the query is run, the
354
        query will be fully cached, not just if it raises an exception.
355 1960 aaronmk
    @param table_is_esc Whether the table name has already been escaped
356
    '''
357 2063 aaronmk
    if select_query == None: select_query = 'DEFAULT VALUES'
358
    if cols == []: cols = None # no cols (all defaults) = unknown col names
359 1960 aaronmk
    if not table_is_esc: check_name(table)
360 2063 aaronmk
361
    # Build query
362
    query = 'INSERT INTO '+table
363
    if cols != None:
364
        map(check_name, cols)
365
        query += ' ('+', '.join(cols)+')'
366
    query += ' '+select_query
367
368
    if returning != None:
369
        check_name(returning)
370
        query += ' RETURNING '+returning
371
372 2070 aaronmk
    if embeddable:
373
        # Create function
374
        function = 'pg_temp.'+('_'.join(['insert_returning', table] + cols))
375
        return_type = 'SETOF '+table+'.'+returning+'%TYPE'
376
        function_query = '''\
377
CREATE OR REPLACE FUNCTION '''+function+'''() RETURNS '''+return_type+'''
378
    LANGUAGE sql
379
    AS $$'''+mogrify(db, query, params)+''';$$;
380
'''
381
        run_query(db, function_query, cacheable=True)
382
383
        # Return query that uses function
384
        return mk_select(db, function+'()', table_is_esc=True)
385
386 2066 aaronmk
    return (query, params)
387
388
def insert_select(db, *args, **kw_args):
389 2072 aaronmk
    '''For params, see mk_insert_select() and run_query()
390
    @param into Name of temp table to place RETURNING values in
391
    '''
392
    into = kw_args.pop('into', None)
393
    if into != None: kw_args['embeddable'] = True
394 2066 aaronmk
    recover = kw_args.pop('recover', None)
395
    cacheable = kw_args.pop('cacheable', True)
396
397
    query, params = mk_insert_select(db, *args, **kw_args)
398 2072 aaronmk
    if into == None: # return RETURNING values
399
        return run_query(db, query, params, recover, cacheable)
400
    else: # place RETURNING values in temp table
401 2075 aaronmk
        run_query(db, 'DROP TABLE IF EXISTS '+into+' CASCADE', recover,
402
            cacheable)
403 2073 aaronmk
        return run_query(db, 'CREATE TEMP TABLE '+into+' AS '+query, params,
404
            recover, cacheable) # CREATE TABLE sets rowcount to # inserts
405 2063 aaronmk
406 2066 aaronmk
default = object() # tells insert() to use the default value for a column
407
408 2063 aaronmk
def insert(db, table, row, *args, **kw_args):
409
    '''For args, see insert_select()'''
410 1960 aaronmk
    if lists.is_seq(row): cols = None
411
    else:
412
        cols = row.keys()
413
        row = row.values()
414
    row = list(row) # ensure that "!= []" works
415
416 1961 aaronmk
    # Check for special values
417
    labels = []
418
    values = []
419
    for value in row:
420
        if value == default: labels.append('DEFAULT')
421
        else:
422
            labels.append('%s')
423
            values.append(value)
424
425
    # Build query
426 2063 aaronmk
    if values != []: query = ' VALUES ('+(', '.join(labels))+')'
427
    else: query = None
428 1554 aaronmk
429 2064 aaronmk
    return insert_select(db, table, cols, query, values, *args, **kw_args)
430 11 aaronmk
431 135 aaronmk
def last_insert_id(db):
432 1849 aaronmk
    module = util.root_module(db.db)
433 135 aaronmk
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
434
    elif module == 'MySQLdb': return db.insert_id()
435
    else: return None
436 13 aaronmk
437 1968 aaronmk
def truncate(db, table, schema='public'):
438
    return run_query(db, 'TRUNCATE '+qual_name(db, schema, table)+' CASCADE')
439 832 aaronmk
440
##### Database structure queries
441
442 1850 aaronmk
def pkey(db, table, recover=None):
443 832 aaronmk
    '''Assumed to be first column in table'''
444
    check_name(table)
445 1929 aaronmk
    return col_names(select(db, table, limit=0, recover=recover)).next()
446 832 aaronmk
447 853 aaronmk
def index_cols(db, table, index):
448
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
449
    automatically created. When you don't know whether something is a UNIQUE
450
    constraint or a UNIQUE index, use this function.'''
451
    check_name(table)
452
    check_name(index)
453 1909 aaronmk
    module = util.root_module(db.db)
454
    if module == 'psycopg2':
455
        return list(values(run_query(db, '''\
456 853 aaronmk
SELECT attname
457 866 aaronmk
FROM
458
(
459
        SELECT attnum, attname
460
        FROM pg_index
461
        JOIN pg_class index ON index.oid = indexrelid
462
        JOIN pg_class table_ ON table_.oid = indrelid
463
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
464
        WHERE
465
            table_.relname = %(table)s
466
            AND index.relname = %(index)s
467
    UNION
468
        SELECT attnum, attname
469
        FROM
470
        (
471
            SELECT
472
                indrelid
473
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
474
                    AS indkey
475
            FROM pg_index
476
            JOIN pg_class index ON index.oid = indexrelid
477
            JOIN pg_class table_ ON table_.oid = indrelid
478
            WHERE
479
                table_.relname = %(table)s
480
                AND index.relname = %(index)s
481
        ) s
482
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
483
) s
484 853 aaronmk
ORDER BY attnum
485
''',
486 1909 aaronmk
            {'table': table, 'index': index}, cacheable=True)))
487
    else: raise NotImplementedError("Can't list index columns for "+module+
488
        ' database')
489 853 aaronmk
490 464 aaronmk
def constraint_cols(db, table, constraint):
491
    check_name(table)
492
    check_name(constraint)
493 1849 aaronmk
    module = util.root_module(db.db)
494 464 aaronmk
    if module == 'psycopg2':
495
        return list(values(run_query(db, '''\
496
SELECT attname
497
FROM pg_constraint
498
JOIN pg_class ON pg_class.oid = conrelid
499
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
500
WHERE
501
    relname = %(table)s
502
    AND conname = %(constraint)s
503
ORDER BY attnum
504
''',
505
            {'table': table, 'constraint': constraint})))
506
    else: raise NotImplementedError("Can't list constraint columns for "+module+
507
        ' database')
508
509 1968 aaronmk
def tables(db, schema='public', table_like='%'):
510 1849 aaronmk
    module = util.root_module(db.db)
511 1968 aaronmk
    params = {'schema': schema, 'table_like': table_like}
512 832 aaronmk
    if module == 'psycopg2':
513 1968 aaronmk
        return values(run_query(db, '''\
514
SELECT tablename
515
FROM pg_tables
516
WHERE
517
    schemaname = %(schema)s
518
    AND tablename LIKE %(table_like)s
519
ORDER BY tablename
520
''',
521
            params, cacheable=True))
522
    elif module == 'MySQLdb':
523
        return values(run_query(db, 'SHOW TABLES LIKE %(table_like)s', params,
524
            cacheable=True))
525 832 aaronmk
    else: raise NotImplementedError("Can't list tables for "+module+' database')
526 830 aaronmk
527 833 aaronmk
##### Database management
528
529 1968 aaronmk
def empty_db(db, schema='public', **kw_args):
530
    '''For kw_args, see tables()'''
531
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)
532 833 aaronmk
533 832 aaronmk
##### Heuristic queries
534
535 1554 aaronmk
def try_insert(db, table, row, returning=None):
536 830 aaronmk
    '''Recovers from errors'''
537 1554 aaronmk
    try: return insert(db, table, row, returning, recover=True)
538 46 aaronmk
    except Exception, e:
539
        msg = str(e)
540 465 aaronmk
        match = re.search(r'duplicate key value violates unique constraint '
541
            r'"(([^\W_]+)_[^"]+)"', msg)
542
        if match:
543
            constraint, table = match.groups()
544 854 aaronmk
            try: cols = index_cols(db, table, constraint)
545 465 aaronmk
            except NotImplementedError: raise e
546 851 aaronmk
            else: raise DuplicateKeyException(cols, e)
547 13 aaronmk
        match = re.search(r'null value in column "(\w+)" violates not-null '
548
            'constraint', msg)
549 470 aaronmk
        if match: raise NullValueException([match.group(1)], e)
550 13 aaronmk
        raise # no specific exception raised
551 11 aaronmk
552 471 aaronmk
def put(db, table, row, pkey, row_ct_ref=None):
553 1554 aaronmk
    '''Recovers from errors.
554
    Only works under PostgreSQL (uses `INSERT ... RETURNING`)'''
555 471 aaronmk
    try:
556 1554 aaronmk
        cur = try_insert(db, table, row, pkey)
557
        if row_ct_ref != None and cur.rowcount >= 0:
558
            row_ct_ref[0] += cur.rowcount
559
        return value(cur)
560 471 aaronmk
    except DuplicateKeyException, e:
561 1069 aaronmk
        return value(select(db, table, [pkey],
562
            util.dict_subset_right_join(row, e.cols), recover=True))
563 471 aaronmk
564 473 aaronmk
def get(db, table, row, pkey, row_ct_ref=None, create=False):
565 830 aaronmk
    '''Recovers from errors'''
566
    try: return value(select(db, table, [pkey], row, 1, recover=True))
567 14 aaronmk
    except StopIteration:
568 40 aaronmk
        if not create: raise
569 471 aaronmk
        return put(db, table, row, pkey, row_ct_ref) # insert new row