Project

General

Profile

1
# Database access
2

    
3
import copy
4
import re
5
import time
6
import warnings
7

    
8
import exc
9
import dicts
10
import iters
11
import lists
12
import profiling
13
from Proxy import Proxy
14
import rand
15
import sql_gen
16
import strings
17
import util
18

    
19
##### Exceptions
20

    
21
def get_cur_query(cur, input_query=None):
22
    raw_query = None
23
    if hasattr(cur, 'query'): raw_query = cur.query
24
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
25
    
26
    if raw_query != None: return raw_query
27
    else: return '[input] '+strings.ustr(input_query)
28

    
29
def _add_cursor_info(e, *args, **kw_args):
30
    '''For params, see get_cur_query()'''
31
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))
32

    
33
class DbException(exc.ExceptionWithCause):
34
    def __init__(self, msg, cause=None, cur=None):
35
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
36
        if cur != None: _add_cursor_info(self, cur)
37

    
38
class ExceptionWithName(DbException):
39
    def __init__(self, name, cause=None):
40
        DbException.__init__(self, 'for name: '
41
            +strings.as_tt(strings.ustr(name)), cause)
42
        self.name = name
43

    
44
class ExceptionWithValue(DbException):
45
    def __init__(self, value, cause=None):
46
        DbException.__init__(self, 'for value: '+strings.as_tt(repr(value)),
47
            cause)
48
        self.value = value
49

    
50
class ExceptionWithNameType(DbException):
51
    def __init__(self, type_, name, cause=None):
52
        DbException.__init__(self, 'for type: '+strings.as_tt(strings.ustr(
53
            type_))+'; name: '+strings.as_tt(name), cause)
54
        self.type = type_
55
        self.name = name
56

    
57
class ConstraintException(DbException):
58
    def __init__(self, name, cond, cols, cause=None):
59
        msg = 'Violated '+strings.as_tt(name)+' constraint'
60
        if cond != None: msg += ' with condition '+cond
61
        if cols != []: msg += ' on columns: '+strings.as_tt(', '.join(cols))
62
        DbException.__init__(self, msg, cause)
63
        self.name = name
64
        self.cond = cond
65
        self.cols = cols
66

    
67
class MissingCastException(DbException):
68
    def __init__(self, type_, col=None, cause=None):
69
        msg = 'Missing cast to type '+strings.as_tt(type_)
70
        if col != None: msg += ' on column: '+strings.as_tt(col)
71
        DbException.__init__(self, msg, cause)
72
        self.type = type_
73
        self.col = col
74

    
75
class NameException(DbException): pass
76

    
77
class DuplicateKeyException(ConstraintException): pass
78

    
79
class NullValueException(ConstraintException): pass
80

    
81
class CheckException(ConstraintException): pass
82

    
83
class InvalidValueException(ExceptionWithValue): pass
84

    
85
class DuplicateException(ExceptionWithNameType): pass
86

    
87
class DoesNotExistException(ExceptionWithNameType): pass
88

    
89
class EmptyRowException(DbException): pass
90

    
91
##### Warnings
92

    
93
class DbWarning(UserWarning): pass
94

    
95
##### Result retrieval
96

    
97
def col_names(cur): return (col[0] for col in cur.description)
98

    
99
def rows(cur): return iter(lambda: cur.fetchone(), None)
100

    
101
def consume_rows(cur):
102
    '''Used to fetch all rows so result will be cached'''
103
    iters.consume_iter(rows(cur))
104

    
105
def next_row(cur): return rows(cur).next()
106

    
107
def row(cur):
108
    row_ = next_row(cur)
109
    consume_rows(cur)
110
    return row_
111

    
112
def next_value(cur): return next_row(cur)[0]
113

    
114
def value(cur): return row(cur)[0]
115

    
116
def values(cur): return iters.func_iter(lambda: next_value(cur))
117

    
118
def value_or_none(cur):
119
    try: return value(cur)
120
    except StopIteration: return None
121

    
122
##### Escaping
123

    
124
def esc_name_by_module(module, name):
125
    if module == 'psycopg2' or module == None: quote = '"'
126
    elif module == 'MySQLdb': quote = '`'
127
    else: raise NotImplementedError("Can't escape name for "+module+' database')
128
    return sql_gen.esc_name(name, quote)
129

    
130
def esc_name_by_engine(engine, name, **kw_args):
131
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)
132

    
133
def esc_name(db, name, **kw_args):
134
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)
135

    
136
def qual_name(db, schema, table):
137
    def esc_name_(name): return esc_name(db, name)
138
    table = esc_name_(table)
139
    if schema != None: return esc_name_(schema)+'.'+table
140
    else: return table
141

    
142
##### Database connections
143

    
144
db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']
145

    
146
db_engines = {
147
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
148
    'PostgreSQL': ('psycopg2', {}),
149
}
150

    
151
DatabaseErrors_set = set([DbException])
152
DatabaseErrors = tuple(DatabaseErrors_set)
153

    
154
def _add_module(module):
155
    DatabaseErrors_set.add(module.DatabaseError)
156
    global DatabaseErrors
157
    DatabaseErrors = tuple(DatabaseErrors_set)
158

    
159
def db_config_str(db_config):
160
    return db_config['engine']+' database '+db_config['database']
161

    
162
log_debug_none = lambda msg, level=2: None
163

    
164
class DbConn:
165
    def __init__(self, db_config, autocommit=True, caching=True,
166
        log_debug=log_debug_none, debug_temp=False, src=None):
167
        '''
168
        @param debug_temp Whether temporary objects should instead be permanent.
169
            This assists in debugging the internal objects used by the program.
170
        @param src In autocommit mode, will be included in a comment in every
171
            query, to help identify the data source in pg_stat_activity.
172
        '''
173
        self.db_config = db_config
174
        self.autocommit = autocommit
175
        self.caching = caching
176
        self.log_debug = log_debug
177
        self.debug = log_debug != log_debug_none
178
        self.debug_temp = debug_temp
179
        self.src = src
180
        self.autoanalyze = False
181
        self.autoexplain = False
182
        self.profile_row_ct = None
183
        
184
        self._savepoint = 0
185
        self._reset()
186
    
187
    def __getattr__(self, name):
188
        if name == '__dict__': raise Exception('getting __dict__')
189
        if name == 'db': return self._db()
190
        else: raise AttributeError()
191
    
192
    def __getstate__(self):
193
        state = copy.copy(self.__dict__) # shallow copy
194
        state['log_debug'] = None # don't pickle the debug callback
195
        state['_DbConn__db'] = None # don't pickle the connection
196
        return state
197
    
198
    def clear_cache(self): self.query_results = {}
199
    
200
    def _reset(self):
201
        self.clear_cache()
202
        assert self._savepoint == 0
203
        self._notices_seen = set()
204
        self.__db = None
205
    
206
    def connected(self): return self.__db != None
207
    
208
    def close(self):
209
        if not self.connected(): return
210
        
211
        # Record that the automatic transaction is now closed
212
        self._savepoint -= 1
213
        
214
        self.db.close()
215
        self._reset()
216
    
217
    def reconnect(self):
218
        # Do not do this in test mode as it would roll back everything
219
        if self.autocommit: self.close()
220
        # Connection will be reopened automatically on first query
221
    
222
    def _db(self):
223
        if self.__db == None:
224
            # Process db_config
225
            db_config = self.db_config.copy() # don't modify input!
226
            schemas = db_config.pop('schemas', None)
227
            module_name, mappings = db_engines[db_config.pop('engine')]
228
            module = __import__(module_name)
229
            _add_module(module)
230
            for orig, new in mappings.iteritems():
231
                try: util.rename_key(db_config, orig, new)
232
                except KeyError: pass
233
            
234
            # Connect
235
            self.__db = module.connect(**db_config)
236
            
237
            # Record that a transaction is already open
238
            self._savepoint += 1
239
            
240
            # Configure connection
241
            if hasattr(self.db, 'set_isolation_level'):
242
                import psycopg2.extensions
243
                self.db.set_isolation_level(
244
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
245
            if schemas != None:
246
                search_path = [self.esc_name(s) for s in schemas.split(',')]
247
                search_path.append(value(run_query(self, 'SHOW search_path',
248
                    log_level=4)))
249
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
250
                    log_level=3)
251
        
252
        return self.__db
253
    
254
    class DbCursor(Proxy):
255
        def __init__(self, outer):
256
            Proxy.__init__(self, outer.db.cursor())
257
            self.outer = outer
258
            self.query_results = outer.query_results
259
            self.query_lookup = None
260
            self.result = []
261
        
262
        def execute(self, query):
263
            self._is_insert = query.startswith('INSERT')
264
            self.query_lookup = query
265
            try:
266
                try: cur = self.inner.execute(query)
267
                finally: self.query = get_cur_query(self.inner, query)
268
            except Exception, e:
269
                self.result = e # cache the exception as the result
270
                self._cache_result()
271
                raise
272
            
273
            # Always cache certain queries
274
            query = sql_gen.lstrip(query)
275
            if query.startswith('CREATE') or query.startswith('ALTER'):
276
                # structural changes
277
                # Rest of query must be unique in the face of name collisions,
278
                # so don't cache ADD COLUMN unless it has distinguishing comment
279
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
280
                    self._cache_result()
281
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
282
                consume_rows(self) # fetch all rows so result will be cached
283
            
284
            return cur
285
        
286
        def fetchone(self):
287
            row = self.inner.fetchone()
288
            if row != None: self.result.append(row)
289
            # otherwise, fetched all rows
290
            else: self._cache_result()
291
            return row
292
        
293
        def _cache_result(self):
294
            # For inserts that return a result set, don't cache result set since
295
            # inserts are not idempotent. Other non-SELECT queries don't have
296
            # their result set read, so only exceptions will be cached (an
297
            # invalid query will always be invalid).
298
            if self.query_results != None and (not self._is_insert
299
                or isinstance(self.result, Exception)):
300
                
301
                assert self.query_lookup != None
302
                self.query_results[self.query_lookup] = self.CacheCursor(
303
                    util.dict_subset(dicts.AttrsDictView(self),
304
                    ['query', 'result', 'rowcount', 'description']))
305
        
306
        class CacheCursor:
307
            def __init__(self, cached_result): self.__dict__ = cached_result
308
            
309
            def execute(self, *args, **kw_args):
310
                if isinstance(self.result, Exception): raise self.result
311
                # otherwise, result is a rows list
312
                self.iter = iter(self.result)
313
            
314
            def fetchone(self):
315
                try: return self.iter.next()
316
                except StopIteration: return None
317
    
318
    def esc_value(self, value):
319
        try: str_ = self.mogrify('%s', [value])
320
        except NotImplementedError, e:
321
            module = util.root_module(self.db)
322
            if module == 'MySQLdb':
323
                import _mysql
324
                str_ = _mysql.escape_string(value)
325
            else: raise e
326
        return strings.to_unicode(str_)
327
    
328
    def esc_name(self, name): return esc_name(self, name) # calls global func
329
    
330
    def std_code(self, str_):
331
        '''Standardizes SQL code.
332
        * Ensures that string literals are prefixed by `E`
333
        '''
334
        if str_.startswith("'"): str_ = 'E'+str_
335
        return str_
336
    
337
    def can_mogrify(self):
338
        module = util.root_module(self.db)
339
        return module == 'psycopg2'
340
    
341
    def mogrify(self, query, params=None):
342
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
343
        else: raise NotImplementedError("Can't mogrify query")
344
    
345
    def print_notices(self):
346
        if hasattr(self.db, 'notices'):
347
            for msg in self.db.notices:
348
                if msg not in self._notices_seen:
349
                    self._notices_seen.add(msg)
350
                    self.log_debug(msg, level=2)
351
    
352
    def run_query(self, query, cacheable=False, log_level=2,
353
        debug_msg_ref=None):
354
        '''
355
        @param log_ignore_excs The log_level will be increased by 2 if the query
356
            throws one of these exceptions.
357
        @param debug_msg_ref If specified, the log message will be returned in
358
            this instead of being output. This allows you to filter log messages
359
            depending on the result of the query.
360
        '''
361
        assert query != None
362
        
363
        if self.autocommit and self.src != None:
364
            query = sql_gen.esc_comment(self.src)+'\t'+query
365
        
366
        if not self.caching: cacheable = False
367
        used_cache = False
368
        
369
        if self.debug:
370
            profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
371
        try:
372
            # Get cursor
373
            if cacheable:
374
                try: cur = self.query_results[query]
375
                except KeyError: cur = self.DbCursor(self)
376
                else: used_cache = True
377
            else: cur = self.db.cursor()
378
            
379
            # Run query
380
            try: cur.execute(query)
381
            except Exception, e:
382
                _add_cursor_info(e, self, query)
383
                raise
384
            else: self.do_autocommit()
385
        finally:
386
            if self.debug:
387
                profiler.stop(self.profile_row_ct)
388
                
389
                ## Log or return query
390
                
391
                query = strings.ustr(get_cur_query(cur, query))
392
                # Put the src comment on a separate line in the log file
393
                query = query.replace('\t', '\n', 1)
394
                
395
                msg = 'DB query: '
396
                
397
                if used_cache: msg += 'cache hit'
398
                elif cacheable: msg += 'cache miss'
399
                else: msg += 'non-cacheable'
400
                
401
                msg += ':\n'+profiler.msg()+'\n'+strings.as_code(query, 'SQL')
402
                
403
                if debug_msg_ref != None: debug_msg_ref[0] = msg
404
                else: self.log_debug(msg, log_level)
405
                
406
                self.print_notices()
407
        
408
        return cur
409
    
410
    def is_cached(self, query): return query in self.query_results
411
    
412
    def with_autocommit(self, func):
413
        import psycopg2.extensions
414
        
415
        prev_isolation_level = self.db.isolation_level
416
        self.db.set_isolation_level(
417
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
418
        try: return func()
419
        finally: self.db.set_isolation_level(prev_isolation_level)
420
    
421
    def with_savepoint(self, func):
422
        top = self._savepoint == 0
423
        savepoint = 'level_'+str(self._savepoint)
424
        
425
        if self.debug:
426
            self.log_debug('Begin transaction', level=4)
427
            profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
428
        
429
        # Must happen before running queries so they don't get autocommitted
430
        self._savepoint += 1
431
        
432
        if top: query = 'START TRANSACTION ISOLATION LEVEL READ COMMITTED'
433
        else: query = 'SAVEPOINT '+savepoint
434
        self.run_query(query, log_level=4)
435
        try:
436
            return func()
437
            if top: self.run_query('COMMIT', log_level=4)
438
        except:
439
            if top: query = 'ROLLBACK'
440
            else: query = 'ROLLBACK TO SAVEPOINT '+savepoint
441
            self.run_query(query, log_level=4)
442
            
443
            raise
444
        finally:
445
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
446
            # "The savepoint remains valid and can be rolled back to again"
447
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
448
            if not top:
449
                self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
450
            
451
            self._savepoint -= 1
452
            assert self._savepoint >= 0
453
            
454
            if self.debug:
455
                profiler.stop(self.profile_row_ct)
456
                self.log_debug('End transaction\n'+profiler.msg(), level=4)
457
            
458
            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT
459
    
460
    def do_autocommit(self):
461
        '''Autocommits if outside savepoint'''
462
        assert self._savepoint >= 1
463
        if self.autocommit and self._savepoint == 1:
464
            self.log_debug('Autocommitting', level=4)
465
            self.db.commit()
466
    
467
    def col_info(self, col, cacheable=True):
468
        table = sql_gen.Table('columns', 'information_schema')
469
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
470
            'USER-DEFINED'), sql_gen.Col('udt_name'))
471
        cols = [type_, 'column_default',
472
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
473
        
474
        conds = [('table_name', col.table.name),
475
            ('column_name', strings.ustr(col.name))]
476
        schema = col.table.schema
477
        if schema != None: conds.append(('table_schema', schema))
478
        
479
        cur = select(self, table, cols, conds, order_by='table_schema', limit=1,
480
            cacheable=cacheable, log_level=4) # TODO: order by search_path order
481
        try: type_, default, nullable = row(cur)
482
        except StopIteration: raise sql_gen.NoUnderlyingTableException(col)
483
        default = sql_gen.as_Code(default, self)
484
        
485
        return sql_gen.TypedCol(col.name, type_, default, nullable)
486
    
487
    def TempFunction(self, name):
488
        if self.debug_temp: schema = None
489
        else: schema = 'pg_temp'
490
        return sql_gen.Function(name, schema)
491

    
492
connect = DbConn
493

    
494
##### Recoverable querying
495

    
496
def with_savepoint(db, func): return db.with_savepoint(func)
497

    
498
def run_query(db, query, recover=None, cacheable=False, log_level=2,
499
    log_ignore_excs=None, **kw_args):
500
    '''For params, see DbConn.run_query()'''
501
    if recover == None: recover = False
502
    if log_ignore_excs == None: log_ignore_excs = ()
503
    log_ignore_excs = tuple(log_ignore_excs)
504
    debug_msg_ref = [None]
505
    
506
    query = with_explain_comment(db, query)
507
    
508
    try:
509
        try:
510
            def run(): return db.run_query(query, cacheable, log_level,
511
                debug_msg_ref, **kw_args)
512
            if recover and not db.is_cached(query):
513
                return with_savepoint(db, run)
514
            else: return run() # don't need savepoint if cached
515
        except Exception, e:
516
            msg = strings.ustr(e.args[0])
517
            msg = re.sub(r'^(?:PL/Python: )?ValueError: ', r'', msg)
518
            
519
            match = re.match(r'^duplicate key value violates unique constraint '
520
                r'"(.+?)"', msg)
521
            if match:
522
                constraint, = match.groups()
523
                cols = []
524
                if recover: # need auto-rollback to run index_cols()
525
                    try: cols = index_cols(db, constraint)
526
                    except NotImplementedError: pass
527
                raise DuplicateKeyException(constraint, None, cols, e)
528
            
529
            match = re.match(r'^null value in column "(.+?)" violates not-null'
530
                r' constraint', msg)
531
            if match:
532
                col, = match.groups()
533
                raise NullValueException('NOT NULL', None, [col], e)
534
            
535
            match = re.match(r'^new row for relation "(.+?)" violates check '
536
                r'constraint "(.+?)"', msg)
537
            if match:
538
                table, constraint = match.groups()
539
                constraint = sql_gen.Col(constraint, table)
540
                cond = None
541
                if recover: # need auto-rollback to run constraint_cond()
542
                    try: cond = constraint_cond(db, constraint)
543
                    except NotImplementedError: pass
544
                raise CheckException(constraint.to_str(db), cond, [], e)
545
            
546
            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
547
                r'|.+? out of range): "(.+?)"', msg)
548
            if match:
549
                value, = match.groups()
550
                raise InvalidValueException(strings.to_unicode(value), e)
551
            
552
            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
553
                r'is of type', msg)
554
            if match:
555
                col, type_ = match.groups()
556
                raise MissingCastException(type_, col, e)
557
            
558
            match = re.match(r'^could not determine polymorphic type because '
559
                r'input has type "unknown"', msg)
560
            if match: raise MissingCastException('text', None, e)
561
            
562
            match = re.match(r'^.+? types .+? and .+? cannot be matched', msg)
563
            if match: raise MissingCastException('text', None, e)
564
            
565
            typed_name_re = r'^(\S+) "(.+?)"'
566
            
567
            match = re.match(typed_name_re+r'.*? already exists', msg)
568
            if match:
569
                type_, name = match.groups()
570
                raise DuplicateException(type_, name, e)
571
            
572
            match = re.match(r'more than one (\S+) named ""(.+?)""', msg)
573
            if match:
574
                type_, name = match.groups()
575
                raise DuplicateException(type_, name, e)
576
            
577
            match = re.match(typed_name_re+r' does not exist', msg)
578
            if match:
579
                type_, name = match.groups()
580
                raise DoesNotExistException(type_, name, e)
581
            
582
            raise # no specific exception raised
583
    except log_ignore_excs:
584
        log_level += 2
585
        raise
586
    finally:
587
        if debug_msg_ref[0] != None: db.log_debug(debug_msg_ref[0], log_level)
588

    
589
##### Basic queries
590

    
591
def is_explainable(query):
592
    # See <http://www.postgresql.org/docs/8.3/static/sql-explain.html#AEN57749>
593
    return re.match(r'^(?:SELECT|INSERT|UPDATE|DELETE|VALUES|EXECUTE|DECLARE)\b'
594
        , query)
595

    
596
def explain(db, query, **kw_args):
597
    '''
598
    For params, see run_query().
599
    '''
600
    kw_args.setdefault('log_level', 4)
601
    
602
    return strings.ustr(strings.join_lines(values(run_query(db,
603
        'EXPLAIN '+query, recover=True, cacheable=True, **kw_args))))
604
        # not a higher log_level because it's useful to see what query is being
605
        # run before it's executed, which EXPLAIN effectively provides
606

    
607
def has_comment(query): return query.endswith('*/')
608

    
609
def with_explain_comment(db, query, **kw_args):
610
    if db.autoexplain and not has_comment(query) and is_explainable(query):
611
        query += '\n'+sql_gen.esc_comment(' EXPLAIN:\n'
612
            +explain(db, query, **kw_args))
613
    return query
614

    
615
def next_version(name):
616
    version = 1 # first existing name was version 0
617
    match = re.match(r'^(.*)#(\d+)$', name)
618
    if match:
619
        name, version = match.groups()
620
        version = int(version)+1
621
    return sql_gen.concat(name, '#'+str(version))
622

    
623
def lock_table(db, table, mode):
624
    table = sql_gen.as_Table(table)
625
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')
626

    
627
def run_query_into(db, query, into=None, add_pkey_=False, **kw_args):
628
    '''Outputs a query to a temp table.
629
    For params, see run_query().
630
    '''
631
    if into == None: return run_query(db, query, **kw_args)
632
    
633
    assert isinstance(into, sql_gen.Table)
634
    
635
    into.is_temp = True
636
    # "temporary tables cannot specify a schema name", so remove schema
637
    into.schema = None
638
    
639
    kw_args['recover'] = True
640
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))
641
    
642
    temp = not db.debug_temp # tables are permanent in debug_temp mode
643
    
644
    # Create table
645
    while True:
646
        create_query = 'CREATE'
647
        if temp: create_query += ' TEMP'
648
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
649
        
650
        try:
651
            cur = run_query(db, create_query, **kw_args)
652
                # CREATE TABLE AS sets rowcount to # rows in query
653
            break
654
        except DuplicateException, e:
655
            into.name = next_version(into.name)
656
            # try again with next version of name
657
    
658
    if add_pkey_: add_pkey(db, into)
659
    
660
    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
661
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
662
    # table is going to be used in complex queries, it is wise to run ANALYZE on
663
    # the temporary table after it is populated."
664
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
665
    # If into is not a temp table, ANALYZE is useful but not required.
666
    analyze(db, into)
667
    
668
    return cur
669

    
670
order_by_pkey = object() # tells mk_select() to order by the pkey
671

    
672
distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns
673

    
674
def mk_select(db, tables=None, fields=None, conds=None, distinct_on=[],
675
    limit=None, start=None, order_by=order_by_pkey, default_table=None,
676
    explain=True):
677
    '''
678
    @param tables The single table to select from, or a list of tables to join
679
        together, with tables after the first being sql_gen.Join objects
680
    @param fields Use None to select all fields in the table
681
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
682
        * container can be any iterable type
683
        * compare_left_side: sql_gen.Code|str (for col name)
684
        * compare_right_side: sql_gen.ValueCond|literal value
685
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
686
        use all columns
687
    @return query
688
    '''
689
    # Parse tables param
690
    tables = lists.mk_seq(tables)
691
    tables = list(tables) # don't modify input! (list() copies input)
692
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
693
    
694
    # Parse other params
695
    if conds == None: conds = []
696
    elif dicts.is_dict(conds): conds = conds.items()
697
    conds = list(conds) # don't modify input! (list() copies input)
698
    assert limit == None or isinstance(limit, (int, long))
699
    assert start == None or isinstance(start, (int, long))
700
    if order_by is order_by_pkey:
701
        if table0 == None or distinct_on != []: order_by = None
702
        else: order_by = pkey(db, table0, recover=True)
703
    
704
    query = 'SELECT'
705
    
706
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
707
    
708
    # DISTINCT ON columns
709
    if distinct_on != []:
710
        query += '\nDISTINCT'
711
        if distinct_on is not distinct_on_all:
712
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
713
    
714
    # Columns
715
    if query.find('\n') >= 0: whitespace = '\n'
716
    else: whitespace = ' '
717
    if fields == None: query += whitespace+'*'
718
    else:
719
        assert fields != []
720
        if len(fields) > 1: whitespace = '\n'
721
        query += whitespace+('\n, '.join(map(parse_col, fields)))
722
    
723
    # Main table
724
    if query.find('\n') >= 0 or len(tables) > 0: whitespace = '\n'
725
    else: whitespace = ' '
726
    if table0 != None: query += whitespace+'FROM '+table0.to_str(db)
727
    
728
    # Add joins
729
    left_table = table0
730
    for join_ in tables:
731
        table = join_.table
732
        
733
        # Parse special values
734
        if join_.type_ is sql_gen.filter_out: # filter no match
735
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
736
                sql_gen.CompareCond(None, '~=')))
737
        
738
        query += '\n'+join_.to_str(db, left_table)
739
        
740
        left_table = table
741
    
742
    missing = True
743
    if conds != []:
744
        if len(conds) == 1: whitespace = ' '
745
        else: whitespace = '\n'
746
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
747
            .to_str(db) for l, r in conds], 'WHERE')
748
    if order_by != None:
749
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
750
    if limit != None: query += '\nLIMIT '+str(limit)
751
    if start != None:
752
        if start != 0: query += '\nOFFSET '+str(start)
753
    
754
    if explain: query = with_explain_comment(db, query)
755
    
756
    return query
757

    
758
def select(db, *args, **kw_args):
759
    '''For params, see mk_select() and run_query()'''
760
    recover = kw_args.pop('recover', None)
761
    cacheable = kw_args.pop('cacheable', True)
762
    log_level = kw_args.pop('log_level', 2)
763
    
764
    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
765
        log_level=log_level)
766

    
767
def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
768
    embeddable=False, ignore=False, src=None):
769
    '''
770
    @param returning str|None An inserted column (such as pkey) to return
771
    @param embeddable Whether the query should be embeddable as a nested SELECT.
772
        Warning: If you set this and cacheable=True when the query is run, the
773
        query will be fully cached, not just if it raises an exception.
774
    @param ignore Whether to ignore duplicate keys.
775
    @param src Will be included in the name of any created function, to help
776
        identify the data source in pg_stat_activity.
777
    '''
778
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
779
    if cols == []: cols = None # no cols (all defaults) = unknown col names
780
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
781
    if select_query == None: select_query = 'DEFAULT VALUES'
782
    if returning != None: returning = sql_gen.as_Col(returning, table)
783
    
784
    first_line = 'INSERT INTO '+table.to_str(db)
785
    
786
    def mk_insert(select_query):
787
        query = first_line
788
        if cols != None:
789
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
790
        query += '\n'+select_query
791
        
792
        if returning != None:
793
            returning_name_col = sql_gen.to_name_only_col(returning)
794
            query += '\nRETURNING '+returning_name_col.to_str(db)
795
        
796
        return query
797
    
798
    return_type = sql_gen.CustomCode('unknown')
799
    if returning != None: return_type = sql_gen.ColType(returning)
800
    
801
    if ignore:
802
        # Always return something to set the correct rowcount
803
        if returning == None: returning = sql_gen.NamedCol('NULL', None)
804
        
805
        embeddable = True # must use function
806
        
807
        if cols == None: row = [sql_gen.Col(sql_gen.all_cols, 'row')]
808
        else: row = [sql_gen.Col(c.name, 'row') for c in cols]
809
        
810
        query = sql_gen.RowExcIgnore(sql_gen.RowType(table), select_query,
811
            sql_gen.ReturnQuery(mk_insert(sql_gen.Values(row).to_str(db))),
812
            cols)
813
    else: query = mk_insert(select_query)
814
    
815
    if embeddable:
816
        # Create function
817
        function_name = sql_gen.clean_name(first_line)
818
        if src != None: function_name = src+': '+function_name
819
        while True:
820
            try:
821
                func = db.TempFunction(function_name)
822
                def_ = sql_gen.FunctionDef(func, sql_gen.SetOf(return_type),
823
                    query)
824
                
825
                run_query(db, def_.to_str(db), recover=True, cacheable=True,
826
                    log_ignore_excs=(DuplicateException,))
827
                break # this version was successful
828
            except DuplicateException, e:
829
                function_name = next_version(function_name)
830
                # try again with next version of name
831
        
832
        # Return query that uses function
833
        cols = None
834
        if returning != None: cols = [returning]
835
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(func), cols)
836
            # AS clause requires function alias
837
        return mk_select(db, func_table, order_by=None)
838
    
839
    return query
840

    
841
def insert_select(db, table, *args, **kw_args):
842
    '''For params, see mk_insert_select() and run_query_into()
843
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
844
        values in
845
    '''
846
    returning = kw_args.get('returning', None)
847
    ignore = kw_args.get('ignore', False)
848
    
849
    into = kw_args.pop('into', None)
850
    if into != None: kw_args['embeddable'] = True
851
    recover = kw_args.pop('recover', None)
852
    if ignore: recover = True
853
    cacheable = kw_args.pop('cacheable', True)
854
    log_level = kw_args.pop('log_level', 2)
855
    
856
    rowcount_only = ignore and returning == None # keep NULL rows on server
857
    if rowcount_only: into = sql_gen.Table('rowcount')
858
    
859
    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
860
        into, recover=recover, cacheable=cacheable, log_level=log_level)
861
    if rowcount_only: empty_temp(db, into)
862
    autoanalyze(db, table)
863
    return cur
864

    
865
default = sql_gen.default # tells insert() to use the default value for a column
866

    
867
def insert(db, table, row, *args, **kw_args):
868
    '''For params, see insert_select()'''
869
    if lists.is_seq(row): cols = None
870
    else:
871
        cols = row.keys()
872
        row = row.values()
873
    row = list(row) # ensure that "== []" works
874
    
875
    if row == []: query = None
876
    else: query = sql_gen.Values(row).to_str(db)
877
    
878
    return insert_select(db, table, cols, query, *args, **kw_args)
879

    
880
def mk_update(db, table, changes=None, cond=None, in_place=False,
881
    cacheable_=True):
882
    '''
883
    @param changes [(col, new_value),...]
884
        * container can be any iterable type
885
        * col: sql_gen.Code|str (for col name)
886
        * new_value: sql_gen.Code|literal value
887
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
888
    @param in_place If set, locks the table and updates rows in place.
889
        This avoids creating dead rows in PostgreSQL.
890
        * cond must be None
891
    @param cacheable_ Whether column structure information used to generate the
892
        query can be cached
893
    @return str query
894
    '''
895
    table = sql_gen.as_Table(table)
896
    changes = [(sql_gen.to_name_only_col(c, table), sql_gen.as_Value(v))
897
        for c, v in changes]
898
    
899
    if in_place:
900
        assert cond == None
901
        
902
        query = 'ALTER TABLE '+table.to_str(db)+'\n'
903
        query += ',\n'.join(('ALTER COLUMN '+c.to_str(db)+' TYPE '
904
            +db.col_info(sql_gen.with_default_table(c, table), cacheable_).type
905
            +'\nUSING '+v.to_str(db) for c, v in changes))
906
    else:
907
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'
908
        query += ',\n'.join((c.to_str(db)+' = '+v.to_str(db)
909
            for c, v in changes))
910
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)
911
    
912
    query = with_explain_comment(db, query)
913
    
914
    return query
915

    
916
def update(db, table, *args, **kw_args):
917
    '''For params, see mk_update() and run_query()'''
918
    recover = kw_args.pop('recover', None)
919
    cacheable = kw_args.pop('cacheable', False)
920
    log_level = kw_args.pop('log_level', 2)
921
    
922
    cur = run_query(db, mk_update(db, table, *args, **kw_args), recover,
923
        cacheable, log_level=log_level)
924
    autoanalyze(db, table)
925
    return cur
926

    
927
def mk_delete(db, table, cond=None):
928
    '''
929
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
930
    @return str query
931
    '''
932
    query = 'DELETE FROM '+table.to_str(db)
933
    if cond != None: query += '\nWHERE '+cond.to_str(db)
934
    
935
    query = with_explain_comment(db, query)
936
    
937
    return query
938

    
939
def delete(db, table, *args, **kw_args):
940
    '''For params, see mk_delete() and run_query()'''
941
    recover = kw_args.pop('recover', None)
942
    cacheable = kw_args.pop('cacheable', True)
943
    log_level = kw_args.pop('log_level', 2)
944
    
945
    cur = run_query(db, mk_delete(db, table, *args, **kw_args), recover,
946
        cacheable, log_level=log_level)
947
    autoanalyze(db, table)
948
    return cur
949

    
950
def last_insert_id(db):
951
    module = util.root_module(db.db)
952
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
953
    elif module == 'MySQLdb': return db.insert_id()
954
    else: return None
955

    
956
def define_func(db, def_):
957
    func = def_.function
958
    while True:
959
        try:
960
            run_query(db, def_.to_str(db), recover=True, cacheable=True,
961
                log_ignore_excs=(DuplicateException,))
962
            break # successful
963
        except DuplicateException:
964
            func.name = next_version(func.name)
965
            # try again with next version of name
966

    
967
def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
968
    '''Creates a mapping from original column names (which may have collisions)
969
    to names that will be distinct among the columns' tables.
970
    This is meant to be used for several tables that are being joined together.
971
    @param cols The columns to combine. Duplicates will be removed.
972
    @param into The table for the new columns.
973
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
974
        columns will be included in the mapping even if they are not in cols.
975
        The tables of the provided Col objects will be changed to into, so make
976
        copies of them if you want to keep the original tables.
977
    @param as_items Whether to return a list of dict items instead of a dict
978
    @return dict(orig_col=new_col, ...)
979
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
980
        * new_col: sql_gen.Col(orig_col_name, into)
981
        * All mappings use the into table so its name can easily be
982
          changed for all columns at once
983
    '''
984
    cols = lists.uniqify(cols)
985
    
986
    items = []
987
    for col in preserve:
988
        orig_col = copy.copy(col)
989
        col.table = into
990
        items.append((orig_col, col))
991
    preserve = set(preserve)
992
    for col in cols:
993
        if col not in preserve:
994
            items.append((col, sql_gen.Col(strings.ustr(col), into, col.srcs)))
995
    
996
    if not as_items: items = dict(items)
997
    return items
998

    
999
def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
1000
    '''For params, see mk_flatten_mapping()
1001
    @return See return value of mk_flatten_mapping()
1002
    '''
1003
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
1004
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
1005
    run_query_into(db, mk_select(db, joins, cols, order_by=None, limit=limit,
1006
        start=start), into=into, add_pkey_=True)
1007
        # don't cache because the temp table will usually be truncated after use
1008
    return dict(items)
1009

    
1010
##### Database structure introspection
1011

    
1012
#### Expressions
1013

    
1014
bool_re = r'(?:true|false)'
1015

    
1016
def simplify_expr(expr):
1017
    expr = expr.replace('(NULL IS NULL)', 'true')
1018
    expr = expr.replace('(NULL IS NOT NULL)', 'false')
1019
    expr = re.sub(r' OR '+bool_re, r'', expr)
1020
    expr = re.sub(bool_re+r' OR ', r'', expr)
1021
    while True:
1022
        expr, n = re.subn(r'\((\([^()]*\))\)', r'\1', expr)
1023
        if n == 0: break
1024
    return expr
1025

    
1026
name_re = r'(?:\w+|(?:"[^"]*")+)'
1027

    
1028
def parse_expr_col(str_):
1029
    match = re.match(r'^\('+name_re+r'\(('+name_re+r').*\)\)$', str_)
1030
    if match: str_ = match.group(1)
1031
    return sql_gen.unesc_name(str_)
1032

    
1033
def map_expr(db, expr, mapping, in_cols_found=None):
1034
    '''Replaces output columns with input columns in an expression.
1035
    @param in_cols_found If set, will be filled in with the expr's (input) cols
1036
    '''
1037
    for out, in_ in mapping.iteritems():
1038
        orig_expr = expr
1039
        out = sql_gen.to_name_only_col(out)
1040
        in_str = sql_gen.to_name_only_col(sql_gen.remove_col_rename(in_)
1041
            ).to_str(db)
1042
        
1043
        # Replace out both with and without quotes
1044
        expr = expr.replace(out.to_str(db), in_str)
1045
        expr = re.sub(r'(?<!\.)\b'+out.name+r'\b(?!\.)', in_str, expr)
1046
        
1047
        if in_cols_found != None and expr != orig_expr: # replaced something
1048
            in_cols_found.append(in_)
1049
    
1050
    return simplify_expr(expr)
1051

    
1052
#### Tables
1053

    
1054
def tables(db, schema_like='public', table_like='%', exact=False):
1055
    if exact: compare = '='
1056
    else: compare = 'LIKE'
1057
    
1058
    module = util.root_module(db.db)
1059
    if module == 'psycopg2':
1060
        conds = [('schemaname', sql_gen.CompareCond(schema_like, compare)),
1061
            ('tablename', sql_gen.CompareCond(table_like, compare))]
1062
        return values(select(db, 'pg_tables', ['tablename'], conds,
1063
            order_by='tablename', log_level=4))
1064
    elif module == 'MySQLdb':
1065
        return values(run_query(db, 'SHOW TABLES LIKE '+db.esc_value(table_like)
1066
            , cacheable=True, log_level=4))
1067
    else: raise NotImplementedError("Can't list tables for "+module+' database')
1068

    
1069
def table_exists(db, table):
1070
    table = sql_gen.as_Table(table)
1071
    return list(tables(db, table.schema, table.name, exact=True)) != []
1072

    
1073
def table_row_count(db, table, recover=None):
1074
    return value(run_query(db, mk_select(db, table, [sql_gen.row_count],
1075
        order_by=None), recover=recover, log_level=3))
1076

    
1077
def table_cols(db, table, recover=None):
1078
    return list(col_names(select(db, table, limit=0, order_by=None,
1079
        recover=recover, log_level=4)))
1080

    
1081
pkey_col = 'row_num'
1082

    
1083
def pkey(db, table, recover=None):
1084
    '''Uses pkey_col, or if not found, the first column in the table.'''
1085
    cols = table_cols(db, table, recover)
1086
    if pkey_col in cols: return pkey_col
1087
    else: return cols[0]
1088

    
1089
not_null_col = 'not_null_col'
1090

    
1091
def table_not_null_col(db, table, recover=None):
1092
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
1093
    if not_null_col in table_cols(db, table, recover): return not_null_col
1094
    else: return pkey(db, table, recover)
1095

    
1096
def constraint_cond(db, constraint):
1097
    module = util.root_module(db.db)
1098
    if module == 'psycopg2':
1099
        table_str = sql_gen.Literal(constraint.table.to_str(db))
1100
        name_str = sql_gen.Literal(constraint.name)
1101
        return value(run_query(db, '''\
1102
SELECT consrc
1103
FROM pg_constraint
1104
WHERE
1105
conrelid = '''+table_str.to_str(db)+'''::regclass
1106
AND conname = '''+name_str.to_str(db)+'''
1107
'''
1108
            , cacheable=True, log_level=4))
1109
    else: raise NotImplementedError("Can't list index columns for "+module+
1110
        ' database')
1111

    
1112
def index_cols(db, index):
1113
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
1114
    automatically created. When you don't know whether something is a UNIQUE
1115
    constraint or a UNIQUE index, use this function.'''
1116
    index = sql_gen.as_Table(index)
1117
    module = util.root_module(db.db)
1118
    if module == 'psycopg2':
1119
        qual_index = sql_gen.Literal(index.to_str(db))
1120
        return map(parse_expr_col, values(run_query(db, '''\
1121
SELECT pg_get_indexdef(indexrelid, generate_series(1, indnatts), true)
1122
FROM pg_index
1123
WHERE indexrelid = '''+qual_index.to_str(db)+'''::regclass
1124
'''
1125
            , cacheable=True, log_level=4)))
1126
    else: raise NotImplementedError("Can't list index columns for "+module+
1127
        ' database')
1128

    
1129
#### Functions
1130

    
1131
def function_exists(db, function):
1132
    qual_function = sql_gen.Literal(function.to_str(db))
1133
    try:
1134
        select(db, fields=[sql_gen.Cast('regproc', qual_function)],
1135
            recover=True, cacheable=True, log_level=4)
1136
    except DoesNotExistException: return False
1137
    except DuplicateException: return True # overloaded function
1138
    else: return True
1139

    
1140
##### Structural changes
1141

    
1142
#### Columns
1143

    
1144
def add_col(db, table, col, comment=None, **kw_args):
1145
    '''
1146
    @param col TypedCol Name may be versioned, so be sure to propagate any
1147
        renaming back to any source column for the TypedCol.
1148
    @param comment None|str SQL comment used to distinguish columns of the same
1149
        name from each other when they contain different data, to allow the
1150
        ADD COLUMN query to be cached. If not set, query will not be cached.
1151
    '''
1152
    assert isinstance(col, sql_gen.TypedCol)
1153
    
1154
    while True:
1155
        str_ = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
1156
        if comment != None: str_ += ' '+sql_gen.esc_comment(comment)
1157
        
1158
        try:
1159
            run_query(db, str_, recover=True, cacheable=True, **kw_args)
1160
            break
1161
        except DuplicateException:
1162
            col.name = next_version(col.name)
1163
            # try again with next version of name
1164

    
1165
def add_not_null(db, col):
1166
    table = col.table
1167
    col = sql_gen.to_name_only_col(col)
1168
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
1169
        +col.to_str(db)+' SET NOT NULL', cacheable=True, log_level=3)
1170

    
1171
def drop_not_null(db, col):
1172
    table = col.table
1173
    col = sql_gen.to_name_only_col(col)
1174
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
1175
        +col.to_str(db)+' DROP NOT NULL', cacheable=True, log_level=3)
1176

    
1177
row_num_col = '_row_num'
1178

    
1179
row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
1180
    constraints='PRIMARY KEY')
1181

    
1182
def add_row_num(db, table):
1183
    '''Adds a row number column to a table. Its name is in row_num_col. It will
1184
    be the primary key.'''
1185
    add_col(db, table, row_num_typed_col, log_level=3)
1186

    
1187
#### Indexes
1188

    
1189
def add_pkey(db, table, cols=None, recover=None):
1190
    '''Adds a primary key.
1191
    @param cols [sql_gen.Col,...] The columns in the primary key.
1192
        Defaults to the first column in the table.
1193
    @pre The table must not already have a primary key.
1194
    '''
1195
    table = sql_gen.as_Table(table)
1196
    if cols == None: cols = [pkey(db, table, recover)]
1197
    col_strs = [sql_gen.to_name_only_col(v).to_str(db) for v in cols]
1198
    
1199
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
1200
        +(', '.join(col_strs))+')', recover=True, cacheable=True, log_level=3,
1201
        log_ignore_excs=(DuplicateException,))
1202

    
1203
def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
1204
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
1205
    Currently, only function calls and literal values are supported expressions.
1206
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
1207
        This allows indexes to be used for comparisons where NULLs are equal.
1208
    '''
1209
    exprs = lists.mk_seq(exprs)
1210
    
1211
    # Parse exprs
1212
    old_exprs = exprs[:]
1213
    exprs = []
1214
    cols = []
1215
    for i, expr in enumerate(old_exprs):
1216
        expr = sql_gen.as_Col(expr, table)
1217
        
1218
        # Handle nullable columns
1219
        if ensure_not_null_:
1220
            try: expr = sql_gen.ensure_not_null(db, expr)
1221
            except KeyError: pass # unknown type, so just create plain index
1222
        
1223
        # Extract col
1224
        expr = copy.deepcopy(expr) # don't modify input!
1225
        col = expr
1226
        if isinstance(expr, sql_gen.FunctionCall): col = expr.args[0]
1227
        expr = sql_gen.cast_literal(expr)
1228
        if not isinstance(expr, (sql_gen.Expr, sql_gen.Col)):
1229
            expr = sql_gen.Expr(expr)
1230
            
1231
        
1232
        # Extract table
1233
        if table == None:
1234
            assert sql_gen.is_table_col(col)
1235
            table = col.table
1236
        
1237
        if isinstance(col, sql_gen.Col): col.table = None
1238
        
1239
        exprs.append(expr)
1240
        cols.append(col)
1241
    
1242
    table = sql_gen.as_Table(table)
1243
    
1244
    # Add index
1245
    str_ = 'CREATE'
1246
    if unique: str_ += ' UNIQUE'
1247
    str_ += ' INDEX ON '+table.to_str(db)+' ('+(
1248
        ', '.join((v.to_str(db) for v in exprs)))+')'
1249
    run_query(db, str_, recover=True, cacheable=True, log_level=3)
1250

    
1251
already_indexed = object() # tells add_indexes() the pkey has already been added
1252

    
1253
def add_indexes(db, table, has_pkey=True):
1254
    '''Adds an index on all columns in a table.
1255
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
1256
        index should be added on the first column.
1257
        * If already_indexed, the pkey is assumed to have already been added
1258
    '''
1259
    cols = table_cols(db, table)
1260
    if has_pkey:
1261
        if has_pkey is not already_indexed: add_pkey(db, table)
1262
        cols = cols[1:]
1263
    for col in cols: add_index(db, col, table)
1264

    
1265
#### Tables
1266

    
1267
### Maintenance
1268

    
1269
def analyze(db, table):
1270
    table = sql_gen.as_Table(table)
1271
    run_query(db, 'ANALYZE '+table.to_str(db), log_level=3)
1272

    
1273
def autoanalyze(db, table):
1274
    if db.autoanalyze: analyze(db, table)
1275

    
1276
def vacuum(db, table):
1277
    table = sql_gen.as_Table(table)
1278
    db.with_autocommit(lambda: run_query(db, 'VACUUM ANALYZE '+table.to_str(db),
1279
        log_level=3))
1280

    
1281
### Lifecycle
1282

    
1283
def drop(db, type_, name):
1284
    name = sql_gen.as_Name(name)
1285
    run_query(db, 'DROP '+type_+' IF EXISTS '+name.to_str(db)+' CASCADE')
1286

    
1287
def drop_table(db, table): drop(db, 'TABLE', table)
1288

    
1289
def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
1290
    like=None):
1291
    '''Creates a table.
1292
    @param cols [sql_gen.TypedCol,...] The column names and types
1293
    @param has_pkey If set, the first column becomes the primary key.
1294
    @param col_indexes bool|[ref]
1295
        * If True, indexes will be added on all non-pkey columns.
1296
        * If a list reference, [0] will be set to a function to do this.
1297
          This can be used to delay index creation until the table is populated.
1298
    '''
1299
    table = sql_gen.as_Table(table)
1300
    
1301
    if like != None:
1302
        cols = [sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL')
1303
            ]+cols
1304
    if has_pkey:
1305
        cols[0] = pkey = copy.copy(cols[0]) # don't modify input!
1306
        pkey.constraints = 'PRIMARY KEY'
1307
    
1308
    temp = table.is_temp and not db.debug_temp
1309
        # temp tables permanent in debug_temp mode
1310
    
1311
    # Create table
1312
    def create():
1313
        str_ = 'CREATE'
1314
        if temp: str_ += ' TEMP'
1315
        str_ += ' TABLE '+table.to_str(db)+' (\n'
1316
        str_ += '\n, '.join(c.to_str(db) for c in cols)
1317
        str_ += '\n);'
1318
        
1319
        run_query(db, str_, recover=True, cacheable=True, log_level=2,
1320
            log_ignore_excs=(DuplicateException,))
1321
    if table.is_temp:
1322
        while True:
1323
            try:
1324
                create()
1325
                break
1326
            except DuplicateException:
1327
                table.name = next_version(table.name)
1328
                # try again with next version of name
1329
    else: create()
1330
    
1331
    # Add indexes
1332
    if has_pkey: has_pkey = already_indexed
1333
    def add_indexes_(): add_indexes(db, table, has_pkey)
1334
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
1335
    elif col_indexes: add_indexes_() # add now
1336

    
1337
def copy_table_struct(db, src, dest):
1338
    '''Creates a structure-only copy of a table. (Does not copy data.)'''
1339
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)
1340

    
1341
### Data
1342

    
1343
def truncate(db, table, schema='public', **kw_args):
1344
    '''For params, see run_query()'''
1345
    table = sql_gen.as_Table(table, schema)
1346
    return run_query(db, 'TRUNCATE '+table.to_str(db)+' CASCADE', **kw_args)
1347

    
1348
def empty_temp(db, tables):
1349
    tables = lists.mk_seq(tables)
1350
    for table in tables: truncate(db, table, log_level=3)
1351

    
1352
def empty_db(db, schema='public', **kw_args):
1353
    '''For kw_args, see tables()'''
1354
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)
1355

    
1356
def distinct_table(db, table, distinct_on):
1357
    '''Creates a copy of a temp table which is distinct on the given columns.
1358
    The old and new tables will both get an index on these columns, to
1359
    facilitate merge joins.
1360
    @param distinct_on If empty, creates a table with one row. This is useful if
1361
        your distinct_on columns are all literal values.
1362
    @return The new table.
1363
    '''
1364
    new_table = sql_gen.suffixed_table(table, '_distinct')
1365
    distinct_on = filter(sql_gen.is_table_col, distinct_on)
1366
    
1367
    copy_table_struct(db, table, new_table)
1368
    
1369
    limit = None
1370
    if distinct_on == []: limit = 1 # one sample row
1371
    else:
1372
        add_index(db, distinct_on, new_table, unique=True)
1373
        add_index(db, distinct_on, table) # for join optimization
1374
    
1375
    insert_select(db, new_table, None, mk_select(db, table, order_by=None,
1376
        limit=limit), ignore=True)
1377
    analyze(db, new_table)
1378
    
1379
    return new_table
(27-27/40)