Project

General

Profile

1
# Database access
2

    
3
import copy
4
import re
5
import time
6
import warnings
7

    
8
import exc
9
import dicts
10
import iters
11
import lists
12
import profiling
13
from Proxy import Proxy
14
import rand
15
import sql_gen
16
import strings
17
import util
18

    
19
##### Exceptions
20

    
21
def get_cur_query(cur, input_query=None):
22
    raw_query = None
23
    if hasattr(cur, 'query'): raw_query = cur.query
24
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
25
    
26
    if raw_query != None: return raw_query
27
    else: return '[input] '+strings.ustr(input_query)
28

    
29
def _add_cursor_info(e, *args, **kw_args):
30
    '''For params, see get_cur_query()'''
31
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))
32

    
33
class DbException(exc.ExceptionWithCause):
34
    def __init__(self, msg, cause=None, cur=None):
35
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
36
        if cur != None: _add_cursor_info(self, cur)
37

    
38
class ExceptionWithName(DbException):
39
    def __init__(self, name, cause=None):
40
        DbException.__init__(self, 'for name: '
41
            +strings.as_tt(strings.ustr(name)), cause)
42
        self.name = name
43

    
44
class ExceptionWithValue(DbException):
45
    def __init__(self, value, cause=None):
46
        DbException.__init__(self, 'for value: '
47
            +strings.as_tt(strings.urepr(value)), cause)
48
        self.value = value
49

    
50
class ExceptionWithNameType(DbException):
51
    def __init__(self, type_, name, cause=None):
52
        DbException.__init__(self, 'for type: '+strings.as_tt(strings.ustr(
53
            type_))+'; name: '+strings.as_tt(name), cause)
54
        self.type = type_
55
        self.name = name
56

    
57
class ConstraintException(DbException):
58
    def __init__(self, name, cond, cols, cause=None):
59
        msg = 'Violated '+strings.as_tt(name)+' constraint'
60
        if cond != None: msg += ' with condition '+cond
61
        if cols != []: msg += ' on columns: '+strings.as_tt(', '.join(cols))
62
        DbException.__init__(self, msg, cause)
63
        self.name = name
64
        self.cond = cond
65
        self.cols = cols
66

    
67
class MissingCastException(DbException):
68
    def __init__(self, type_, col=None, cause=None):
69
        msg = 'Missing cast to type '+strings.as_tt(type_)
70
        if col != None: msg += ' on column: '+strings.as_tt(col)
71
        DbException.__init__(self, msg, cause)
72
        self.type = type_
73
        self.col = col
74

    
75
class NameException(DbException): pass
76

    
77
class DuplicateKeyException(ConstraintException): pass
78

    
79
class NullValueException(ConstraintException): pass
80

    
81
class CheckException(ConstraintException): pass
82

    
83
class InvalidValueException(ExceptionWithValue): pass
84

    
85
class DuplicateException(ExceptionWithNameType): pass
86

    
87
class DoesNotExistException(ExceptionWithNameType): pass
88

    
89
class EmptyRowException(DbException): pass
90

    
91
##### Warnings
92

    
93
class DbWarning(UserWarning): pass
94

    
95
##### Result retrieval
96

    
97
def col_names(cur): return (col[0] for col in cur.description)
98

    
99
def rows(cur): return iter(lambda: cur.fetchone(), None)
100

    
101
def consume_rows(cur):
102
    '''Used to fetch all rows so result will be cached'''
103
    iters.consume_iter(rows(cur))
104

    
105
def next_row(cur): return rows(cur).next()
106

    
107
def row(cur):
108
    row_ = next_row(cur)
109
    consume_rows(cur)
110
    return row_
111

    
112
def next_value(cur): return next_row(cur)[0]
113

    
114
def value(cur): return row(cur)[0]
115

    
116
def values(cur): return iters.func_iter(lambda: next_value(cur))
117

    
118
def value_or_none(cur):
119
    try: return value(cur)
120
    except StopIteration: return None
121

    
122
##### Escaping
123

    
124
def esc_name_by_module(module, name):
125
    if module == 'psycopg2' or module == None: quote = '"'
126
    elif module == 'MySQLdb': quote = '`'
127
    else: raise NotImplementedError("Can't escape name for "+module+' database')
128
    return sql_gen.esc_name(name, quote)
129

    
130
def esc_name_by_engine(engine, name, **kw_args):
131
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)
132

    
133
def esc_name(db, name, **kw_args):
134
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)
135

    
136
def qual_name(db, schema, table):
137
    def esc_name_(name): return esc_name(db, name)
138
    table = esc_name_(table)
139
    if schema != None: return esc_name_(schema)+'.'+table
140
    else: return table
141

    
142
##### Database connections
143

    
144
db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']
145

    
146
db_engines = {
147
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
148
    'PostgreSQL': ('psycopg2', {}),
149
}
150

    
151
DatabaseErrors_set = set([DbException])
152
DatabaseErrors = tuple(DatabaseErrors_set)
153

    
154
def _add_module(module):
155
    DatabaseErrors_set.add(module.DatabaseError)
156
    global DatabaseErrors
157
    DatabaseErrors = tuple(DatabaseErrors_set)
158

    
159
def db_config_str(db_config):
160
    return db_config['engine']+' database '+db_config['database']
161

    
162
log_debug_none = lambda msg, level=2: None
163

    
164
class DbConn:
165
    def __init__(self, db_config, autocommit=True, caching=True,
166
        log_debug=log_debug_none, debug_temp=False, src=None):
167
        '''
168
        @param debug_temp Whether temporary objects should instead be permanent.
169
            This assists in debugging the internal objects used by the program.
170
        @param src In autocommit mode, will be included in a comment in every
171
            query, to help identify the data source in pg_stat_activity.
172
        '''
173
        self.db_config = db_config
174
        self.autocommit = autocommit
175
        self.caching = caching
176
        self.log_debug = log_debug
177
        self.debug = log_debug != log_debug_none
178
        self.debug_temp = debug_temp
179
        self.src = src
180
        self.autoanalyze = False
181
        self.autoexplain = False
182
        self.profile_row_ct = None
183
        
184
        self._savepoint = 0
185
        self._reset()
186
    
187
    def __getattr__(self, name):
188
        if name == '__dict__': raise Exception('getting __dict__')
189
        if name == 'db': return self._db()
190
        else: raise AttributeError()
191
    
192
    def __getstate__(self):
193
        state = copy.copy(self.__dict__) # shallow copy
194
        state['log_debug'] = None # don't pickle the debug callback
195
        state['_DbConn__db'] = None # don't pickle the connection
196
        return state
197
    
198
    def clear_cache(self): self.query_results = {}
199
    
200
    def _reset(self):
201
        self.clear_cache()
202
        assert self._savepoint == 0
203
        self._notices_seen = set()
204
        self.__db = None
205
    
206
    def connected(self): return self.__db != None
207
    
208
    def close(self):
209
        if not self.connected(): return
210
        
211
        # Record that the automatic transaction is now closed
212
        self._savepoint -= 1
213
        
214
        self.db.close()
215
        self._reset()
216
    
217
    def reconnect(self):
218
        # Do not do this in test mode as it would roll back everything
219
        if self.autocommit: self.close()
220
        # Connection will be reopened automatically on first query
221
    
222
    def _db(self):
223
        if self.__db == None:
224
            # Process db_config
225
            db_config = self.db_config.copy() # don't modify input!
226
            schemas = db_config.pop('schemas', None)
227
            module_name, mappings = db_engines[db_config.pop('engine')]
228
            module = __import__(module_name)
229
            _add_module(module)
230
            for orig, new in mappings.iteritems():
231
                try: util.rename_key(db_config, orig, new)
232
                except KeyError: pass
233
            
234
            # Connect
235
            self.__db = module.connect(**db_config)
236
            
237
            # Record that a transaction is already open
238
            self._savepoint += 1
239
            
240
            # Configure connection
241
            if hasattr(self.db, 'set_isolation_level'):
242
                import psycopg2.extensions
243
                self.db.set_isolation_level(
244
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
245
            if schemas != None:
246
                search_path = [self.esc_name(s) for s in schemas.split(',')]
247
                search_path.append(value(run_query(self, 'SHOW search_path',
248
                    log_level=4)))
249
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
250
                    log_level=3)
251
        
252
        return self.__db
253
    
254
    class DbCursor(Proxy):
255
        def __init__(self, outer):
256
            Proxy.__init__(self, outer.db.cursor())
257
            self.outer = outer
258
            self.query_results = outer.query_results
259
            self.query_lookup = None
260
            self.result = []
261
        
262
        def execute(self, query):
263
            self._is_insert = query.startswith('INSERT')
264
            self.query_lookup = query
265
            try:
266
                try: cur = self.inner.execute(query)
267
                finally: self.query = get_cur_query(self.inner, query)
268
            except Exception, e:
269
                self.result = e # cache the exception as the result
270
                self._cache_result()
271
                raise
272
            
273
            # Always cache certain queries
274
            query = sql_gen.lstrip(query)
275
            if query.startswith('CREATE') or query.startswith('ALTER'):
276
                # structural changes
277
                # Rest of query must be unique in the face of name collisions,
278
                # so don't cache ADD COLUMN unless it has distinguishing comment
279
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
280
                    self._cache_result()
281
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
282
                consume_rows(self) # fetch all rows so result will be cached
283
            
284
            return cur
285
        
286
        def fetchone(self):
287
            row = self.inner.fetchone()
288
            if row != None: self.result.append(row)
289
            # otherwise, fetched all rows
290
            else: self._cache_result()
291
            return row
292
        
293
        def _cache_result(self):
294
            # For inserts that return a result set, don't cache result set since
295
            # inserts are not idempotent. Other non-SELECT queries don't have
296
            # their result set read, so only exceptions will be cached (an
297
            # invalid query will always be invalid).
298
            if self.query_results != None and (not self._is_insert
299
                or isinstance(self.result, Exception)):
300
                
301
                assert self.query_lookup != None
302
                self.query_results[self.query_lookup] = self.CacheCursor(
303
                    util.dict_subset(dicts.AttrsDictView(self),
304
                    ['query', 'result', 'rowcount', 'description']))
305
        
306
        class CacheCursor:
307
            def __init__(self, cached_result): self.__dict__ = cached_result
308
            
309
            def execute(self, *args, **kw_args):
310
                if isinstance(self.result, Exception): raise self.result
311
                # otherwise, result is a rows list
312
                self.iter = iter(self.result)
313
            
314
            def fetchone(self):
315
                try: return self.iter.next()
316
                except StopIteration: return None
317
    
318
    def esc_value(self, value):
319
        try: str_ = self.mogrify('%s', [value])
320
        except NotImplementedError, e:
321
            module = util.root_module(self.db)
322
            if module == 'MySQLdb':
323
                import _mysql
324
                str_ = _mysql.escape_string(value)
325
            else: raise e
326
        return strings.to_unicode(str_)
327
    
328
    def esc_name(self, name): return esc_name(self, name) # calls global func
329
    
330
    def std_code(self, str_):
331
        '''Standardizes SQL code.
332
        * Ensures that string literals are prefixed by `E`
333
        '''
334
        if str_.startswith("'"): str_ = 'E'+str_
335
        return str_
336
    
337
    def can_mogrify(self):
338
        module = util.root_module(self.db)
339
        return module == 'psycopg2'
340
    
341
    def mogrify(self, query, params=None):
342
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
343
        else: raise NotImplementedError("Can't mogrify query")
344
    
345
    def print_notices(self):
346
        if hasattr(self.db, 'notices'):
347
            for msg in self.db.notices:
348
                if msg not in self._notices_seen:
349
                    self._notices_seen.add(msg)
350
                    self.log_debug(msg, level=2)
351
    
352
    def run_query(self, query, cacheable=False, log_level=2,
353
        debug_msg_ref=None):
354
        '''
355
        @param log_ignore_excs The log_level will be increased by 2 if the query
356
            throws one of these exceptions.
357
        @param debug_msg_ref If specified, the log message will be returned in
358
            this instead of being output. This allows you to filter log messages
359
            depending on the result of the query.
360
        '''
361
        assert query != None
362
        
363
        if self.autocommit and self.src != None:
364
            query = sql_gen.esc_comment(self.src)+'\t'+query
365
        
366
        if not self.caching: cacheable = False
367
        used_cache = False
368
        
369
        if self.debug:
370
            profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
371
        try:
372
            # Get cursor
373
            if cacheable:
374
                try: cur = self.query_results[query]
375
                except KeyError: cur = self.DbCursor(self)
376
                else: used_cache = True
377
            else: cur = self.db.cursor()
378
            
379
            # Run query
380
            try: cur.execute(query)
381
            except Exception, e:
382
                _add_cursor_info(e, self, query)
383
                raise
384
            else: self.do_autocommit()
385
        finally:
386
            if self.debug:
387
                profiler.stop(self.profile_row_ct)
388
                
389
                ## Log or return query
390
                
391
                query = strings.ustr(get_cur_query(cur, query))
392
                # Put the src comment on a separate line in the log file
393
                query = query.replace('\t', '\n', 1)
394
                
395
                msg = 'DB query: '
396
                
397
                if used_cache: msg += 'cache hit'
398
                elif cacheable: msg += 'cache miss'
399
                else: msg += 'non-cacheable'
400
                
401
                msg += ':\n'+profiler.msg()+'\n'+strings.as_code(query, 'SQL')
402
                
403
                if debug_msg_ref != None: debug_msg_ref[0] = msg
404
                else: self.log_debug(msg, log_level)
405
                
406
                self.print_notices()
407
        
408
        return cur
409
    
410
    def is_cached(self, query): return query in self.query_results
411
    
412
    def with_autocommit(self, func):
413
        import psycopg2.extensions
414
        
415
        prev_isolation_level = self.db.isolation_level
416
        self.db.set_isolation_level(
417
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
418
        try: return func()
419
        finally: self.db.set_isolation_level(prev_isolation_level)
420
    
421
    def with_savepoint(self, func):
422
        top = self._savepoint == 0
423
        savepoint = 'level_'+str(self._savepoint)
424
        
425
        if self.debug:
426
            self.log_debug('Begin transaction', level=4)
427
            profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
428
        
429
        # Must happen before running queries so they don't get autocommitted
430
        self._savepoint += 1
431
        
432
        if top: query = 'START TRANSACTION ISOLATION LEVEL READ COMMITTED'
433
        else: query = 'SAVEPOINT '+savepoint
434
        self.run_query(query, log_level=4)
435
        try:
436
            return func()
437
            if top: self.run_query('COMMIT', log_level=4)
438
        except:
439
            if top: query = 'ROLLBACK'
440
            else: query = 'ROLLBACK TO SAVEPOINT '+savepoint
441
            self.run_query(query, log_level=4)
442
            
443
            raise
444
        finally:
445
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
446
            # "The savepoint remains valid and can be rolled back to again"
447
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
448
            if not top:
449
                self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
450
            
451
            self._savepoint -= 1
452
            assert self._savepoint >= 0
453
            
454
            if self.debug:
455
                profiler.stop(self.profile_row_ct)
456
                self.log_debug('End transaction\n'+profiler.msg(), level=4)
457
            
458
            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT
459
    
460
    def do_autocommit(self):
461
        '''Autocommits if outside savepoint'''
462
        assert self._savepoint >= 1
463
        if self.autocommit and self._savepoint == 1:
464
            self.log_debug('Autocommitting', level=4)
465
            self.db.commit()
466
    
467
    def col_info(self, col, cacheable=True):
468
        table = sql_gen.Table('columns', 'information_schema')
469
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
470
            'USER-DEFINED'), sql_gen.Col('udt_name'))
471
        cols = [type_, 'column_default',
472
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
473
        
474
        conds = [('table_name', col.table.name),
475
            ('column_name', strings.ustr(col.name))]
476
        schema = col.table.schema
477
        if schema != None: conds.append(('table_schema', schema))
478
        
479
        cur = select(self, table, cols, conds, order_by='table_schema', limit=1,
480
            cacheable=cacheable, log_level=4) # TODO: order by search_path order
481
        try: type_, default, nullable = row(cur)
482
        except StopIteration: raise sql_gen.NoUnderlyingTableException(col)
483
        default = sql_gen.as_Code(default, self)
484
        
485
        return sql_gen.TypedCol(col.name, type_, default, nullable)
486
    
487
    def TempFunction(self, name):
488
        if self.debug_temp: schema = None
489
        else: schema = 'pg_temp'
490
        return sql_gen.Function(name, schema)
491

    
492
connect = DbConn
493

    
494
##### Recoverable querying
495

    
496
def with_savepoint(db, func): return db.with_savepoint(func)
497

    
498
def run_query(db, query, recover=None, cacheable=False, log_level=2,
499
    log_ignore_excs=None, **kw_args):
500
    '''For params, see DbConn.run_query()'''
501
    if recover == None: recover = False
502
    if log_ignore_excs == None: log_ignore_excs = ()
503
    log_ignore_excs = tuple(log_ignore_excs)
504
    debug_msg_ref = [None]
505
    
506
    query = with_explain_comment(db, query)
507
    
508
    try:
509
        try:
510
            def run(): return db.run_query(query, cacheable, log_level,
511
                debug_msg_ref, **kw_args)
512
            if recover and not db.is_cached(query):
513
                return with_savepoint(db, run)
514
            else: return run() # don't need savepoint if cached
515
        except Exception, e:
516
            msg = strings.ustr(e.args[0])
517
            msg = re.sub(r'^(?:PL/Python: )?ValueError: ', r'', msg)
518
            
519
            match = re.match(r'^duplicate key value violates unique constraint '
520
                r'"(.+?)"', msg)
521
            if match:
522
                constraint, = match.groups()
523
                cols = []
524
                if recover: # need auto-rollback to run index_cols()
525
                    try: cols = index_cols(db, constraint)
526
                    except NotImplementedError: pass
527
                raise DuplicateKeyException(constraint, None, cols, e)
528
            
529
            match = re.match(r'^null value in column "(.+?)" violates not-null'
530
                r' constraint', msg)
531
            if match:
532
                col, = match.groups()
533
                raise NullValueException('NOT NULL', None, [col], e)
534
            
535
            match = re.match(r'^new row for relation "(.+?)" violates check '
536
                r'constraint "(.+?)"', msg)
537
            if match:
538
                table, constraint = match.groups()
539
                constraint = sql_gen.Col(constraint, table)
540
                cond = None
541
                if recover: # need auto-rollback to run constraint_cond()
542
                    try: cond = constraint_cond(db, constraint)
543
                    except NotImplementedError: pass
544
                raise CheckException(constraint.to_str(db), cond, [], e)
545
            
546
            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
547
                r'|.+? out of range): "(.+?)"', msg)
548
            if match:
549
                value, = match.groups()
550
                raise InvalidValueException(strings.to_unicode(value), e)
551
            
552
            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
553
                r'is of type', msg)
554
            if match:
555
                col, type_ = match.groups()
556
                raise MissingCastException(type_, col, e)
557
            
558
            match = re.match(r'^could not determine polymorphic type because '
559
                r'input has type "unknown"', msg)
560
            if match: raise MissingCastException('text', None, e)
561
            
562
            match = re.match(r'^.+? types .+? and .+? cannot be matched', msg)
563
            if match: raise MissingCastException('text', None, e)
564
            
565
            typed_name_re = r'^(\S+) "(.+?)"(?: of relation ".+?")?'
566
            
567
            match = re.match(typed_name_re+r'.*? already exists', msg)
568
            if match:
569
                type_, name = match.groups()
570
                raise DuplicateException(type_, name, e)
571
            
572
            match = re.match(r'more than one (\S+) named ""(.+?)""', msg)
573
            if match:
574
                type_, name = match.groups()
575
                raise DuplicateException(type_, name, e)
576
            
577
            match = re.match(typed_name_re+r' does not exist', msg)
578
            if match:
579
                type_, name = match.groups()
580
                raise DoesNotExistException(type_, name, e)
581
            
582
            raise # no specific exception raised
583
    except log_ignore_excs:
584
        log_level += 2
585
        raise
586
    finally:
587
        if debug_msg_ref[0] != None: db.log_debug(debug_msg_ref[0], log_level)
588

    
589
##### Basic queries
590

    
591
def is_explainable(query):
592
    # See <http://www.postgresql.org/docs/8.3/static/sql-explain.html#AEN57749>
593
    return re.match(r'^(?:SELECT|INSERT|UPDATE|DELETE|VALUES|EXECUTE|DECLARE)\b'
594
        , query)
595

    
596
def explain(db, query, **kw_args):
597
    '''
598
    For params, see run_query().
599
    '''
600
    kw_args.setdefault('log_level', 4)
601
    
602
    return strings.ustr(strings.join_lines(values(run_query(db,
603
        'EXPLAIN '+query, recover=True, cacheable=True, **kw_args))))
604
        # not a higher log_level because it's useful to see what query is being
605
        # run before it's executed, which EXPLAIN effectively provides
606

    
607
def has_comment(query): return query.endswith('*/')
608

    
609
def with_explain_comment(db, query, **kw_args):
610
    if db.autoexplain and not has_comment(query) and is_explainable(query):
611
        query += '\n'+sql_gen.esc_comment(' EXPLAIN:\n'
612
            +explain(db, query, **kw_args))
613
    return query
614

    
615
def next_version(name):
616
    version = 1 # first existing name was version 0
617
    match = re.match(r'^(.*)#(\d+)$', name)
618
    if match:
619
        name, version = match.groups()
620
        version = int(version)+1
621
    return sql_gen.concat(name, '#'+str(version))
622

    
623
def lock_table(db, table, mode):
624
    table = sql_gen.as_Table(table)
625
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')
626

    
627
def run_query_into(db, query, into=None, add_pkey_=False, **kw_args):
628
    '''Outputs a query to a temp table.
629
    For params, see run_query().
630
    '''
631
    if into == None: return run_query(db, query, **kw_args)
632
    
633
    assert isinstance(into, sql_gen.Table)
634
    
635
    into.is_temp = True
636
    # "temporary tables cannot specify a schema name", so remove schema
637
    into.schema = None
638
    
639
    kw_args['recover'] = True
640
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))
641
    
642
    temp = not db.debug_temp # tables are permanent in debug_temp mode
643
    
644
    # Create table
645
    while True:
646
        create_query = 'CREATE'
647
        if temp: create_query += ' TEMP'
648
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
649
        
650
        try:
651
            cur = run_query(db, create_query, **kw_args)
652
                # CREATE TABLE AS sets rowcount to # rows in query
653
            break
654
        except DuplicateException, e:
655
            into.name = next_version(into.name)
656
            # try again with next version of name
657
    
658
    if add_pkey_: add_pkey(db, into)
659
    
660
    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
661
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
662
    # table is going to be used in complex queries, it is wise to run ANALYZE on
663
    # the temporary table after it is populated."
664
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
665
    # If into is not a temp table, ANALYZE is useful but not required.
666
    analyze(db, into)
667
    
668
    return cur
669

    
670
order_by_pkey = object() # tells mk_select() to order by the pkey
671

    
672
distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns
673

    
674
def mk_select(db, tables=None, fields=None, conds=None, distinct_on=[],
675
    limit=None, start=None, order_by=order_by_pkey, default_table=None,
676
    explain=True):
677
    '''
678
    @param tables The single table to select from, or a list of tables to join
679
        together, with tables after the first being sql_gen.Join objects
680
    @param fields Use None to select all fields in the table
681
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
682
        * container can be any iterable type
683
        * compare_left_side: sql_gen.Code|str (for col name)
684
        * compare_right_side: sql_gen.ValueCond|literal value
685
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
686
        use all columns
687
    @return query
688
    '''
689
    # Parse tables param
690
    tables = lists.mk_seq(tables)
691
    tables = list(tables) # don't modify input! (list() copies input)
692
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
693
    
694
    # Parse other params
695
    if conds == None: conds = []
696
    elif dicts.is_dict(conds): conds = conds.items()
697
    conds = list(conds) # don't modify input! (list() copies input)
698
    assert limit == None or isinstance(limit, (int, long))
699
    assert start == None or isinstance(start, (int, long))
700
    if order_by is order_by_pkey:
701
        if table0 == None or distinct_on != []: order_by = None
702
        else: order_by = pkey(db, table0, recover=True)
703
    
704
    query = 'SELECT'
705
    
706
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
707
    
708
    # DISTINCT ON columns
709
    if distinct_on != []:
710
        query += '\nDISTINCT'
711
        if distinct_on is not distinct_on_all:
712
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
713
    
714
    # Columns
715
    if query.find('\n') >= 0: whitespace = '\n'
716
    else: whitespace = ' '
717
    if fields == None: query += whitespace+'*'
718
    else:
719
        assert fields != []
720
        if len(fields) > 1: whitespace = '\n'
721
        query += whitespace+('\n, '.join(map(parse_col, fields)))
722
    
723
    # Main table
724
    if query.find('\n') >= 0 or len(tables) > 0: whitespace = '\n'
725
    else: whitespace = ' '
726
    if table0 != None: query += whitespace+'FROM '+table0.to_str(db)
727
    
728
    # Add joins
729
    left_table = table0
730
    for join_ in tables:
731
        table = join_.table
732
        
733
        # Parse special values
734
        if join_.type_ is sql_gen.filter_out: # filter no match
735
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
736
                sql_gen.CompareCond(None, '~=')))
737
        
738
        query += '\n'+join_.to_str(db, left_table)
739
        
740
        left_table = table
741
    
742
    missing = True
743
    if conds != []:
744
        if len(conds) == 1: whitespace = ' '
745
        else: whitespace = '\n'
746
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
747
            .to_str(db) for l, r in conds], 'WHERE')
748
    if order_by != None:
749
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
750
    if limit != None: query += '\nLIMIT '+str(limit)
751
    if start != None:
752
        if start != 0: query += '\nOFFSET '+str(start)
753
    
754
    if explain: query = with_explain_comment(db, query)
755
    
756
    return query
757

    
758
def select(db, *args, **kw_args):
759
    '''For params, see mk_select() and run_query()'''
760
    recover = kw_args.pop('recover', None)
761
    cacheable = kw_args.pop('cacheable', True)
762
    log_level = kw_args.pop('log_level', 2)
763
    
764
    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
765
        log_level=log_level)
766

    
767
def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
768
    embeddable=False, ignore=False, src=None):
769
    '''
770
    @param returning str|None An inserted column (such as pkey) to return
771
    @param embeddable Whether the query should be embeddable as a nested SELECT.
772
        Warning: If you set this and cacheable=True when the query is run, the
773
        query will be fully cached, not just if it raises an exception.
774
    @param ignore Whether to ignore duplicate keys.
775
    @param src Will be included in the name of any created function, to help
776
        identify the data source in pg_stat_activity.
777
    '''
778
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
779
    if cols == []: cols = None # no cols (all defaults) = unknown col names
780
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
781
    if select_query == None: select_query = 'DEFAULT VALUES'
782
    if returning != None: returning = sql_gen.as_Col(returning, table)
783
    
784
    first_line = 'INSERT INTO '+table.to_str(db)
785
    
786
    def mk_insert(select_query):
787
        query = first_line
788
        if cols != None:
789
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
790
        query += '\n'+select_query
791
        
792
        if returning != None:
793
            returning_name_col = sql_gen.to_name_only_col(returning)
794
            query += '\nRETURNING '+returning_name_col.to_str(db)
795
        
796
        return query
797
    
798
    return_type = sql_gen.CustomCode('unknown')
799
    if returning != None: return_type = sql_gen.ColType(returning)
800
    
801
    if ignore:
802
        # Always return something to set the correct rowcount
803
        if returning == None: returning = sql_gen.NamedCol('NULL', None)
804
        
805
        embeddable = True # must use function
806
        
807
        if cols == None: row = [sql_gen.Col(sql_gen.all_cols, 'row')]
808
        else: row = [sql_gen.Col(c.name, 'row') for c in cols]
809
        
810
        query = sql_gen.RowExcIgnore(sql_gen.RowType(table), select_query,
811
            sql_gen.ReturnQuery(mk_insert(sql_gen.Values(row).to_str(db))),
812
            cols)
813
    else: query = mk_insert(select_query)
814
    
815
    if embeddable:
816
        # Create function
817
        function_name = sql_gen.clean_name(first_line)
818
        if src != None: function_name = src+': '+function_name
819
        while True:
820
            try:
821
                func = db.TempFunction(function_name)
822
                def_ = sql_gen.FunctionDef(func, sql_gen.SetOf(return_type),
823
                    query)
824
                
825
                run_query(db, def_.to_str(db), recover=True, cacheable=True,
826
                    log_ignore_excs=(DuplicateException,))
827
                break # this version was successful
828
            except DuplicateException, e:
829
                function_name = next_version(function_name)
830
                # try again with next version of name
831
        
832
        # Return query that uses function
833
        cols = None
834
        if returning != None: cols = [returning]
835
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(func), cols)
836
            # AS clause requires function alias
837
        return mk_select(db, func_table, order_by=None)
838
    
839
    return query
840

    
841
def insert_select(db, table, *args, **kw_args):
842
    '''For params, see mk_insert_select() and run_query_into()
843
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
844
        values in
845
    '''
846
    returning = kw_args.get('returning', None)
847
    ignore = kw_args.get('ignore', False)
848
    
849
    into = kw_args.pop('into', None)
850
    if into != None: kw_args['embeddable'] = True
851
    recover = kw_args.pop('recover', None)
852
    if ignore: recover = True
853
    cacheable = kw_args.pop('cacheable', True)
854
    log_level = kw_args.pop('log_level', 2)
855
    
856
    rowcount_only = ignore and returning == None # keep NULL rows on server
857
    if rowcount_only: into = sql_gen.Table('rowcount')
858
    
859
    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
860
        into, recover=recover, cacheable=cacheable, log_level=log_level)
861
    if rowcount_only: empty_temp(db, into)
862
    autoanalyze(db, table)
863
    return cur
864

    
865
default = sql_gen.default # tells insert() to use the default value for a column
866

    
867
def insert(db, table, row, *args, **kw_args):
868
    '''For params, see insert_select()'''
869
    if lists.is_seq(row): cols = None
870
    else:
871
        cols = row.keys()
872
        row = row.values()
873
    row = list(row) # ensure that "== []" works
874
    
875
    if row == []: query = None
876
    else: query = sql_gen.Values(row).to_str(db)
877
    
878
    return insert_select(db, table, cols, query, *args, **kw_args)
879

    
880
def mk_update(db, table, changes=None, cond=None, in_place=False,
881
    cacheable_=True):
882
    '''
883
    @param changes [(col, new_value),...]
884
        * container can be any iterable type
885
        * col: sql_gen.Code|str (for col name)
886
        * new_value: sql_gen.Code|literal value
887
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
888
    @param in_place If set, locks the table and updates rows in place.
889
        This avoids creating dead rows in PostgreSQL.
890
        * cond must be None
891
    @param cacheable_ Whether column structure information used to generate the
892
        query can be cached
893
    @return str query
894
    '''
895
    table = sql_gen.as_Table(table)
896
    changes = [(sql_gen.to_name_only_col(c, table), sql_gen.as_Value(v))
897
        for c, v in changes]
898
    
899
    if in_place:
900
        assert cond == None
901
        
902
        query = 'ALTER TABLE '+table.to_str(db)+'\n'
903
        query += ',\n'.join(('ALTER COLUMN '+c.to_str(db)+' TYPE '
904
            +db.col_info(sql_gen.with_default_table(c, table), cacheable_).type
905
            +'\nUSING '+v.to_str(db) for c, v in changes))
906
    else:
907
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'
908
        query += ',\n'.join((c.to_str(db)+' = '+v.to_str(db)
909
            for c, v in changes))
910
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)
911
    
912
    query = with_explain_comment(db, query)
913
    
914
    return query
915

    
916
def update(db, table, *args, **kw_args):
917
    '''For params, see mk_update() and run_query()'''
918
    recover = kw_args.pop('recover', None)
919
    cacheable = kw_args.pop('cacheable', False)
920
    log_level = kw_args.pop('log_level', 2)
921
    
922
    cur = run_query(db, mk_update(db, table, *args, **kw_args), recover,
923
        cacheable, log_level=log_level)
924
    autoanalyze(db, table)
925
    return cur
926

    
927
def mk_delete(db, table, cond=None):
928
    '''
929
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
930
    @return str query
931
    '''
932
    query = 'DELETE FROM '+table.to_str(db)
933
    if cond != None: query += '\nWHERE '+cond.to_str(db)
934
    
935
    query = with_explain_comment(db, query)
936
    
937
    return query
938

    
939
def delete(db, table, *args, **kw_args):
940
    '''For params, see mk_delete() and run_query()'''
941
    recover = kw_args.pop('recover', None)
942
    cacheable = kw_args.pop('cacheable', True)
943
    log_level = kw_args.pop('log_level', 2)
944
    
945
    cur = run_query(db, mk_delete(db, table, *args, **kw_args), recover,
946
        cacheable, log_level=log_level)
947
    autoanalyze(db, table)
948
    return cur
949

    
950
def last_insert_id(db):
951
    module = util.root_module(db.db)
952
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
953
    elif module == 'MySQLdb': return db.insert_id()
954
    else: return None
955

    
956
def define_func(db, def_):
957
    func = def_.function
958
    while True:
959
        try:
960
            run_query(db, def_.to_str(db), recover=True, cacheable=True,
961
                log_ignore_excs=(DuplicateException,))
962
            break # successful
963
        except DuplicateException:
964
            func.name = next_version(func.name)
965
            # try again with next version of name
966

    
967
def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
968
    '''Creates a mapping from original column names (which may have collisions)
969
    to names that will be distinct among the columns' tables.
970
    This is meant to be used for several tables that are being joined together.
971
    @param cols The columns to combine. Duplicates will be removed.
972
    @param into The table for the new columns.
973
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
974
        columns will be included in the mapping even if they are not in cols.
975
        The tables of the provided Col objects will be changed to into, so make
976
        copies of them if you want to keep the original tables.
977
    @param as_items Whether to return a list of dict items instead of a dict
978
    @return dict(orig_col=new_col, ...)
979
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
980
        * new_col: sql_gen.Col(orig_col_name, into)
981
        * All mappings use the into table so its name can easily be
982
          changed for all columns at once
983
    '''
984
    cols = lists.uniqify(cols)
985
    
986
    items = []
987
    for col in preserve:
988
        orig_col = copy.copy(col)
989
        col.table = into
990
        items.append((orig_col, col))
991
    preserve = set(preserve)
992
    for col in cols:
993
        if col not in preserve:
994
            items.append((col, sql_gen.Col(strings.ustr(col), into, col.srcs)))
995
    
996
    if not as_items: items = dict(items)
997
    return items
998

    
999
def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
1000
    '''For params, see mk_flatten_mapping()
1001
    @return See return value of mk_flatten_mapping()
1002
    '''
1003
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
1004
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
1005
    run_query_into(db, mk_select(db, joins, cols, order_by=None, limit=limit,
1006
        start=start), into=into, add_pkey_=True)
1007
        # don't cache because the temp table will usually be truncated after use
1008
    return dict(items)
1009

    
1010
##### Database structure introspection
1011

    
1012
#### Expressions
1013

    
1014
bool_re = r'(?:true|false)'
1015

    
1016
def simplify_expr(expr):
1017
    expr = expr.replace('(NULL IS NULL)', 'true')
1018
    expr = expr.replace('(NULL IS NOT NULL)', 'false')
1019
    expr = re.sub(r' OR '+bool_re, r'', expr)
1020
    expr = re.sub(bool_re+r' OR ', r'', expr)
1021
    while True:
1022
        expr, n = re.subn(r'\((\([^()]*\))\)', r'\1', expr)
1023
        if n == 0: break
1024
    return expr
1025

    
1026
name_re = r'(?:\w+|(?:"[^"]*")+)'
1027

    
1028
def parse_expr_col(str_):
1029
    match = re.match(r'^\('+name_re+r'\(('+name_re+r').*\)\)$', str_)
1030
    if match: str_ = match.group(1)
1031
    return sql_gen.unesc_name(str_)
1032

    
1033
def map_expr(db, expr, mapping, in_cols_found=None):
1034
    '''Replaces output columns with input columns in an expression.
1035
    @param in_cols_found If set, will be filled in with the expr's (input) cols
1036
    '''
1037
    for out, in_ in mapping.iteritems():
1038
        orig_expr = expr
1039
        out = sql_gen.to_name_only_col(out)
1040
        in_str = sql_gen.to_name_only_col(sql_gen.remove_col_rename(in_)
1041
            ).to_str(db)
1042
        
1043
        # Replace out both with and without quotes
1044
        expr = expr.replace(out.to_str(db), in_str)
1045
        expr = re.sub(r'(?<!\.)\b'+out.name+r'\b(?!\.)', in_str, expr)
1046
        
1047
        if in_cols_found != None and expr != orig_expr: # replaced something
1048
            in_cols_found.append(in_)
1049
    
1050
    return simplify_expr(expr)
1051

    
1052
#### Tables
1053

    
1054
def tables(db, schema_like='public', table_like='%', exact=False,
1055
    cacheable=True):
1056
    if exact: compare = '='
1057
    else: compare = 'LIKE'
1058
    
1059
    module = util.root_module(db.db)
1060
    if module == 'psycopg2':
1061
        conds = [('schemaname', sql_gen.CompareCond(schema_like, compare)),
1062
            ('tablename', sql_gen.CompareCond(table_like, compare))]
1063
        return values(select(db, 'pg_tables', ['tablename'], conds,
1064
            order_by='tablename', cacheable=cacheable, log_level=4))
1065
    elif module == 'MySQLdb':
1066
        return values(run_query(db, 'SHOW TABLES LIKE '+db.esc_value(table_like)
1067
            , cacheable=True, log_level=4))
1068
    else: raise NotImplementedError("Can't list tables for "+module+' database')
1069

    
1070
def table_exists(db, table):
1071
    table = sql_gen.as_Table(table)
1072
    return list(tables(db, table.schema, table.name, exact=True)) != []
1073

    
1074
def table_row_count(db, table, recover=None):
1075
    return value(run_query(db, mk_select(db, table, [sql_gen.row_count],
1076
        order_by=None), recover=recover, log_level=3))
1077

    
1078
def table_cols(db, table, recover=None):
1079
    return list(col_names(select(db, table, limit=0, order_by=None,
1080
        recover=recover, log_level=4)))
1081

    
1082
pkey_col = 'row_num'
1083

    
1084
def pkey(db, table, recover=None):
1085
    '''Uses pkey_col, or if not found, the first column in the table.'''
1086
    cols = table_cols(db, table, recover)
1087
    if pkey_col in cols: return pkey_col
1088
    else: return cols[0]
1089

    
1090
not_null_col = 'not_null_col'
1091

    
1092
def table_not_null_col(db, table, recover=None):
1093
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
1094
    if not_null_col in table_cols(db, table, recover): return not_null_col
1095
    else: return pkey(db, table, recover)
1096

    
1097
def constraint_cond(db, constraint):
1098
    module = util.root_module(db.db)
1099
    if module == 'psycopg2':
1100
        table_str = sql_gen.Literal(constraint.table.to_str(db))
1101
        name_str = sql_gen.Literal(constraint.name)
1102
        return value(run_query(db, '''\
1103
SELECT consrc
1104
FROM pg_constraint
1105
WHERE
1106
conrelid = '''+table_str.to_str(db)+'''::regclass
1107
AND conname = '''+name_str.to_str(db)+'''
1108
'''
1109
            , cacheable=True, log_level=4))
1110
    else: raise NotImplementedError("Can't list index columns for "+module+
1111
        ' database')
1112

    
1113
def index_cols(db, index):
1114
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
1115
    automatically created. When you don't know whether something is a UNIQUE
1116
    constraint or a UNIQUE index, use this function.'''
1117
    index = sql_gen.as_Table(index)
1118
    module = util.root_module(db.db)
1119
    if module == 'psycopg2':
1120
        qual_index = sql_gen.Literal(index.to_str(db))
1121
        return map(parse_expr_col, values(run_query(db, '''\
1122
SELECT pg_get_indexdef(indexrelid, generate_series(1, indnatts), true)
1123
FROM pg_index
1124
WHERE indexrelid = '''+qual_index.to_str(db)+'''::regclass
1125
'''
1126
            , cacheable=True, log_level=4)))
1127
    else: raise NotImplementedError("Can't list index columns for "+module+
1128
        ' database')
1129

    
1130
#### Functions
1131

    
1132
def function_exists(db, function):
1133
    qual_function = sql_gen.Literal(function.to_str(db))
1134
    try:
1135
        select(db, fields=[sql_gen.Cast('regproc', qual_function)],
1136
            recover=True, cacheable=True, log_level=4)
1137
    except DoesNotExistException: return False
1138
    except DuplicateException: return True # overloaded function
1139
    else: return True
1140

    
1141
##### Structural changes
1142

    
1143
#### Columns
1144

    
1145
def add_col(db, table, col, comment=None, **kw_args):
1146
    '''
1147
    @param col TypedCol Name may be versioned, so be sure to propagate any
1148
        renaming back to any source column for the TypedCol.
1149
    @param comment None|str SQL comment used to distinguish columns of the same
1150
        name from each other when they contain different data, to allow the
1151
        ADD COLUMN query to be cached. If not set, query will not be cached.
1152
    '''
1153
    assert isinstance(col, sql_gen.TypedCol)
1154
    
1155
    while True:
1156
        str_ = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
1157
        if comment != None: str_ += ' '+sql_gen.esc_comment(comment)
1158
        
1159
        try:
1160
            run_query(db, str_, recover=True, cacheable=True, **kw_args)
1161
            break
1162
        except DuplicateException:
1163
            col.name = next_version(col.name)
1164
            # try again with next version of name
1165

    
1166
def add_not_null(db, col):
1167
    table = col.table
1168
    col = sql_gen.to_name_only_col(col)
1169
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
1170
        +col.to_str(db)+' SET NOT NULL', cacheable=True, log_level=3)
1171

    
1172
def drop_not_null(db, col):
1173
    table = col.table
1174
    col = sql_gen.to_name_only_col(col)
1175
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
1176
        +col.to_str(db)+' DROP NOT NULL', cacheable=True, log_level=3)
1177

    
1178
row_num_col = '_row_num'
1179

    
1180
row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
1181
    constraints='PRIMARY KEY')
1182

    
1183
def add_row_num(db, table):
1184
    '''Adds a row number column to a table. Its name is in row_num_col. It will
1185
    be the primary key.'''
1186
    add_col(db, table, row_num_typed_col, log_level=3)
1187

    
1188
#### Indexes
1189

    
1190
def add_pkey(db, table, cols=None, recover=None):
1191
    '''Adds a primary key.
1192
    @param cols [sql_gen.Col,...] The columns in the primary key.
1193
        Defaults to the first column in the table.
1194
    @pre The table must not already have a primary key.
1195
    '''
1196
    table = sql_gen.as_Table(table)
1197
    if cols == None: cols = [pkey(db, table, recover)]
1198
    col_strs = [sql_gen.to_name_only_col(v).to_str(db) for v in cols]
1199
    
1200
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
1201
        +(', '.join(col_strs))+')', recover=True, cacheable=True, log_level=3,
1202
        log_ignore_excs=(DuplicateException,))
1203

    
1204
def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
1205
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
1206
    Currently, only function calls and literal values are supported expressions.
1207
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
1208
        This allows indexes to be used for comparisons where NULLs are equal.
1209
    '''
1210
    exprs = lists.mk_seq(exprs)
1211
    
1212
    # Parse exprs
1213
    old_exprs = exprs[:]
1214
    exprs = []
1215
    cols = []
1216
    for i, expr in enumerate(old_exprs):
1217
        expr = sql_gen.as_Col(expr, table)
1218
        
1219
        # Handle nullable columns
1220
        if ensure_not_null_:
1221
            try: expr = sql_gen.ensure_not_null(db, expr)
1222
            except KeyError: pass # unknown type, so just create plain index
1223
        
1224
        # Extract col
1225
        expr = copy.deepcopy(expr) # don't modify input!
1226
        col = expr
1227
        if isinstance(expr, sql_gen.FunctionCall): col = expr.args[0]
1228
        expr = sql_gen.cast_literal(expr)
1229
        if not isinstance(expr, (sql_gen.Expr, sql_gen.Col)):
1230
            expr = sql_gen.Expr(expr)
1231
            
1232
        
1233
        # Extract table
1234
        if table == None:
1235
            assert sql_gen.is_table_col(col)
1236
            table = col.table
1237
        
1238
        if isinstance(col, sql_gen.Col): col.table = None
1239
        
1240
        exprs.append(expr)
1241
        cols.append(col)
1242
    
1243
    table = sql_gen.as_Table(table)
1244
    
1245
    # Add index
1246
    str_ = 'CREATE'
1247
    if unique: str_ += ' UNIQUE'
1248
    str_ += ' INDEX ON '+table.to_str(db)+' ('+(
1249
        ', '.join((v.to_str(db) for v in exprs)))+')'
1250
    run_query(db, str_, recover=True, cacheable=True, log_level=3)
1251

    
1252
already_indexed = object() # tells add_indexes() the pkey has already been added
1253

    
1254
def add_indexes(db, table, has_pkey=True):
1255
    '''Adds an index on all columns in a table.
1256
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
1257
        index should be added on the first column.
1258
        * If already_indexed, the pkey is assumed to have already been added
1259
    '''
1260
    cols = table_cols(db, table)
1261
    if has_pkey:
1262
        if has_pkey is not already_indexed: add_pkey(db, table)
1263
        cols = cols[1:]
1264
    for col in cols: add_index(db, col, table)
1265

    
1266
#### Tables
1267

    
1268
### Maintenance
1269

    
1270
def analyze(db, table):
1271
    table = sql_gen.as_Table(table)
1272
    run_query(db, 'ANALYZE '+table.to_str(db), log_level=3)
1273

    
1274
def autoanalyze(db, table):
1275
    if db.autoanalyze: analyze(db, table)
1276

    
1277
def vacuum(db, table):
1278
    table = sql_gen.as_Table(table)
1279
    db.with_autocommit(lambda: run_query(db, 'VACUUM ANALYZE '+table.to_str(db),
1280
        log_level=3))
1281

    
1282
### Lifecycle
1283

    
1284
def drop(db, type_, name):
1285
    name = sql_gen.as_Name(name)
1286
    run_query(db, 'DROP '+type_+' IF EXISTS '+name.to_str(db)+' CASCADE')
1287

    
1288
def drop_table(db, table): drop(db, 'TABLE', table)
1289

    
1290
def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
1291
    like=None):
1292
    '''Creates a table.
1293
    @param cols [sql_gen.TypedCol,...] The column names and types
1294
    @param has_pkey If set, the first column becomes the primary key.
1295
    @param col_indexes bool|[ref]
1296
        * If True, indexes will be added on all non-pkey columns.
1297
        * If a list reference, [0] will be set to a function to do this.
1298
          This can be used to delay index creation until the table is populated.
1299
    '''
1300
    table = sql_gen.as_Table(table)
1301
    
1302
    if like != None:
1303
        cols = [sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL')
1304
            ]+cols
1305
    if has_pkey:
1306
        cols[0] = pkey = copy.copy(cols[0]) # don't modify input!
1307
        pkey.constraints = 'PRIMARY KEY'
1308
    
1309
    temp = table.is_temp and not db.debug_temp
1310
        # temp tables permanent in debug_temp mode
1311
    
1312
    # Create table
1313
    def create():
1314
        str_ = 'CREATE'
1315
        if temp: str_ += ' TEMP'
1316
        str_ += ' TABLE '+table.to_str(db)+' (\n'
1317
        str_ += '\n, '.join(c.to_str(db) for c in cols)
1318
        str_ += '\n);'
1319
        
1320
        run_query(db, str_, recover=True, cacheable=True, log_level=2,
1321
            log_ignore_excs=(DuplicateException,))
1322
    if table.is_temp:
1323
        while True:
1324
            try:
1325
                create()
1326
                break
1327
            except DuplicateException:
1328
                table.name = next_version(table.name)
1329
                # try again with next version of name
1330
    else: create()
1331
    
1332
    # Add indexes
1333
    if has_pkey: has_pkey = already_indexed
1334
    def add_indexes_(): add_indexes(db, table, has_pkey)
1335
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
1336
    elif col_indexes: add_indexes_() # add now
1337

    
1338
def copy_table_struct(db, src, dest):
1339
    '''Creates a structure-only copy of a table. (Does not copy data.)'''
1340
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)
1341

    
1342
### Data
1343

    
1344
def truncate(db, table, schema='public', **kw_args):
1345
    '''For params, see run_query()'''
1346
    table = sql_gen.as_Table(table, schema)
1347
    return run_query(db, 'TRUNCATE '+table.to_str(db)+' CASCADE', **kw_args)
1348

    
1349
def empty_temp(db, tables):
1350
    tables = lists.mk_seq(tables)
1351
    for table in tables: truncate(db, table, log_level=3)
1352

    
1353
def empty_db(db, schema='public', **kw_args):
1354
    '''For kw_args, see tables()'''
1355
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)
1356

    
1357
def distinct_table(db, table, distinct_on):
1358
    '''Creates a copy of a temp table which is distinct on the given columns.
1359
    The old and new tables will both get an index on these columns, to
1360
    facilitate merge joins.
1361
    @param distinct_on If empty, creates a table with one row. This is useful if
1362
        your distinct_on columns are all literal values.
1363
    @return The new table.
1364
    '''
1365
    new_table = sql_gen.suffixed_table(table, '_distinct')
1366
    distinct_on = filter(sql_gen.is_table_col, distinct_on)
1367
    
1368
    copy_table_struct(db, table, new_table)
1369
    
1370
    limit = None
1371
    if distinct_on == []: limit = 1 # one sample row
1372
    else:
1373
        add_index(db, distinct_on, new_table, unique=True)
1374
        add_index(db, distinct_on, table) # for join optimization
1375
    
1376
    insert_select(db, new_table, None, mk_select(db, table, order_by=None,
1377
        limit=limit), ignore=True)
1378
    analyze(db, new_table)
1379
    
1380
    return new_table
(27-27/40)