# Database access
import copy
import re
import warnings

import dicts
import exc
import iters
import lists
import rand
import sql_gen
import strings
import util
from Proxy import Proxy
##### Exceptions

def get_cur_query(cur, input_query=None):
    '''Returns the text of the query most recently run on a cursor.
    
    Tries the driver-specific attributes that record the executed query
    (psycopg2 cursors expose `query`; MySQLdb exposes `_last_executed`).
    Falls back to the caller-supplied input_query, marked as '[input]'.
    '''
    raw_query = None
    if hasattr(cur, 'query'): raw_query = cur.query
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
    
    if raw_query is not None: return raw_query
    else: return '[input] '+strings.ustr(input_query)
def _add_cursor_info(e, *args, **kw_args):
    '''Annotates an exception with the query that caused it.
    For params, see get_cur_query().
    '''
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))
class DbException(exc.ExceptionWithCause):
    '''Base class for database errors; optionally records the failed query.'''
    def __init__(self, msg, cause=None, cur=None):
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
        # attach the query text when a cursor is available
        if cur is not None: _add_cursor_info(self, cur)
class ExceptionWithName(DbException):
    '''DbException annotated with the name of the offending object.'''
    def __init__(self, name, cause=None):
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name)), cause)
        self.name = name
class ExceptionWithValue(DbException):
    '''DbException annotated with the value that caused the error.'''
    def __init__(self, value, cause=None):
        DbException.__init__(self, 'for value: '+strings.as_tt(repr(value)),
            cause)
        self.value = value
class ExceptionWithNameType(DbException):
    '''DbException annotated with an object's type and name.'''
    def __init__(self, type_, name, cause=None):
        DbException.__init__(self, 'for type: '+strings.as_tt(str(type_))
            +'; name: '+strings.as_tt(name), cause)
        self.type = type_
        self.name = name
class ConstraintException(DbException):
    '''Raised when a database constraint is violated.'''
    def __init__(self, name, cols, cause=None):
        DbException.__init__(self, 'Violated '+strings.as_tt(name)
            +' constraint on columns: '+strings.as_tt(', '.join(cols)), cause)
        self.name = name
        self.cols = cols
class MissingCastException(DbException):
    '''Raised when a value must be cast to a column's type.'''
    def __init__(self, type_, col, cause=None):
        DbException.__init__(self, 'Missing cast to type '+strings.as_tt(type_)
            +' on column: '+strings.as_tt(col), cause)
        self.type = type_
        self.col = col
# Specific error categories, distinguished only by type

class NameException(DbException): pass

class DuplicateKeyException(ConstraintException): pass

class NullValueException(ConstraintException): pass

class InvalidValueException(ExceptionWithValue): pass

class DuplicateException(ExceptionWithNameType): pass

class EmptyRowException(DbException): pass
##### Warnings

class DbWarning(UserWarning):
    '''Category for warnings issued by this module (see warnings.warn()).'''
    pass
##### Result retrieval

def col_names(cur):
    '''Returns a generator of the column names in a cursor's result set.
    Uses the DB-API `description` attribute (first element of each entry).
    '''
    return (col[0] for col in cur.description)
def rows(cur): return iter(lambda: cur.fetchone(), None)
89

    
90
def consume_rows(cur):
91
    '''Used to fetch all rows so result will be cached'''
92
    iters.consume_iter(rows(cur))
93

    
94
def next_row(cur): return rows(cur).next()
95

    
96
def row(cur):
97
    row_ = next_row(cur)
98
    consume_rows(cur)
99
    return row_
100

    
101
def next_value(cur): return next_row(cur)[0]
102

    
103
def value(cur): return row(cur)[0]
104

    
105
def values(cur): return iters.func_iter(lambda: next_value(cur))
106

    
107
def value_or_none(cur):
108
    try: return value(cur)
109
    except StopIteration: return None
110

    
111
##### Escaping
112

    
113
def esc_name_by_module(module, name):
114
    if module == 'psycopg2' or module == None: quote = '"'
115
    elif module == 'MySQLdb': quote = '`'
116
    else: raise NotImplementedError("Can't escape name for "+module+' database')
117
    return sql_gen.esc_name(name, quote)
118

    
119
def esc_name_by_engine(engine, name, **kw_args):
120
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)
121

    
122
def esc_name(db, name, **kw_args):
123
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)
124

    
125
def qual_name(db, schema, table):
126
    def esc_name_(name): return esc_name(db, name)
127
    table = esc_name_(table)
128
    if schema != None: return esc_name_(schema)+'.'+table
129
    else: return table
130

    
131
##### Database connections

# Keys that may appear in a db_config dict
db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']

# engine name -> (DB-API module name, db_config key renamings for that module)
db_engines = {
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
    'PostgreSQL': ('psycopg2', {}),
}

# All exception types that indicate a database error.
# DatabaseErrors is the tuple form for use in except clauses; it is rebuilt
# by _add_module() whenever a driver registers its DatabaseError.
DatabaseErrors_set = set([DbException])
DatabaseErrors = tuple(DatabaseErrors_set)
def _add_module(module):
144
    DatabaseErrors_set.add(module.DatabaseError)
145
    global DatabaseErrors
146
    DatabaseErrors = tuple(DatabaseErrors_set)
147

    
148
def db_config_str(db_config):
149
    return db_config['engine']+' database '+db_config['database']
150

    
151
log_debug_none = lambda msg, level=2: None
152

    
153
class DbConn:
154
    def __init__(self, db_config, autocommit=True, caching=True,
155
        log_debug=log_debug_none, debug_temp=False):
156
        '''
157
        @param debug_temp Whether temporary objects should instead be permanent.
158
            This assists in debugging the internal objects used by the program.
159
        '''
160
        self.db_config = db_config
161
        self.autocommit = autocommit
162
        self.caching = caching
163
        self.log_debug = log_debug
164
        self.debug = log_debug != log_debug_none
165
        self.debug_temp = debug_temp
166
        self.autoanalyze = False
167
        
168
        self._reset()
169
    
170
    def __getattr__(self, name):
171
        if name == '__dict__': raise Exception('getting __dict__')
172
        if name == 'db': return self._db()
173
        else: raise AttributeError()
174
    
175
    def __getstate__(self):
176
        state = copy.copy(self.__dict__) # shallow copy
177
        state['log_debug'] = None # don't pickle the debug callback
178
        state['_DbConn__db'] = None # don't pickle the connection
179
        return state
180
    
181
    def clear_cache(self): self.query_results = {}
182
    
183
    def _reset(self):
184
        self.clear_cache()
185
        self._savepoint = 0
186
        self._notices_seen = set()
187
        self.__db = None
188
    
189
    def connected(self): return self.__db != None
190
    
191
    def close(self):
192
        if not self.connected(): return
193
        
194
        self.db.close()
195
        self._reset()
196
    
197
    def _db(self):
198
        if self.__db == None:
199
            # Process db_config
200
            db_config = self.db_config.copy() # don't modify input!
201
            schemas = db_config.pop('schemas', None)
202
            module_name, mappings = db_engines[db_config.pop('engine')]
203
            module = __import__(module_name)
204
            _add_module(module)
205
            for orig, new in mappings.iteritems():
206
                try: util.rename_key(db_config, orig, new)
207
                except KeyError: pass
208
            
209
            # Connect
210
            self.__db = module.connect(**db_config)
211
            
212
            # Configure connection
213
            if hasattr(self.db, 'set_isolation_level'):
214
                import psycopg2.extensions
215
                self.db.set_isolation_level(
216
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
217
            if schemas != None:
218
                search_path = [self.esc_name(s) for s in schemas.split(',')]
219
                search_path.append(value(run_query(self, 'SHOW search_path',
220
                    log_level=4)))
221
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
222
                    log_level=3)
223
        
224
        return self.__db
225
    
226
    class DbCursor(Proxy):
227
        def __init__(self, outer):
228
            Proxy.__init__(self, outer.db.cursor())
229
            self.outer = outer
230
            self.query_results = outer.query_results
231
            self.query_lookup = None
232
            self.result = []
233
        
234
        def execute(self, query):
235
            self._is_insert = query.startswith('INSERT')
236
            self.query_lookup = query
237
            try:
238
                try:
239
                    cur = self.inner.execute(query)
240
                    self.outer.do_autocommit()
241
                finally: self.query = get_cur_query(self.inner, query)
242
            except Exception, e:
243
                _add_cursor_info(e, self, query)
244
                self.result = e # cache the exception as the result
245
                self._cache_result()
246
                raise
247
            
248
            # Always cache certain queries
249
            if query.startswith('CREATE') or query.startswith('ALTER'):
250
                # structural changes
251
                # Rest of query must be unique in the face of name collisions,
252
                # so don't cache ADD COLUMN unless it has distinguishing comment
253
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
254
                    self._cache_result()
255
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
256
                consume_rows(self) # fetch all rows so result will be cached
257
            
258
            return cur
259
        
260
        def fetchone(self):
261
            row = self.inner.fetchone()
262
            if row != None: self.result.append(row)
263
            # otherwise, fetched all rows
264
            else: self._cache_result()
265
            return row
266
        
267
        def _cache_result(self):
268
            # For inserts that return a result set, don't cache result set since
269
            # inserts are not idempotent. Other non-SELECT queries don't have
270
            # their result set read, so only exceptions will be cached (an
271
            # invalid query will always be invalid).
272
            if self.query_results != None and (not self._is_insert
273
                or isinstance(self.result, Exception)):
274
                
275
                assert self.query_lookup != None
276
                self.query_results[self.query_lookup] = self.CacheCursor(
277
                    util.dict_subset(dicts.AttrsDictView(self),
278
                    ['query', 'result', 'rowcount', 'description']))
279
        
280
        class CacheCursor:
281
            def __init__(self, cached_result): self.__dict__ = cached_result
282
            
283
            def execute(self, *args, **kw_args):
284
                if isinstance(self.result, Exception): raise self.result
285
                # otherwise, result is a rows list
286
                self.iter = iter(self.result)
287
            
288
            def fetchone(self):
289
                try: return self.iter.next()
290
                except StopIteration: return None
291
    
292
    def esc_value(self, value):
293
        try: str_ = self.mogrify('%s', [value])
294
        except NotImplementedError, e:
295
            module = util.root_module(self.db)
296
            if module == 'MySQLdb':
297
                import _mysql
298
                str_ = _mysql.escape_string(value)
299
            else: raise e
300
        return strings.to_unicode(str_)
301
    
302
    def esc_name(self, name): return esc_name(self, name) # calls global func
303
    
304
    def std_code(self, str_):
305
        '''Standardizes SQL code.
306
        * Ensures that string literals are prefixed by `E`
307
        '''
308
        if str_.startswith("'"): str_ = 'E'+str_
309
        return str_
310
    
311
    def can_mogrify(self):
312
        module = util.root_module(self.db)
313
        return module == 'psycopg2'
314
    
315
    def mogrify(self, query, params=None):
316
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
317
        else: raise NotImplementedError("Can't mogrify query")
318
    
319
    def print_notices(self):
320
        if hasattr(self.db, 'notices'):
321
            for msg in self.db.notices:
322
                if msg not in self._notices_seen:
323
                    self._notices_seen.add(msg)
324
                    self.log_debug(msg, level=2)
325
    
326
    def run_query(self, query, cacheable=False, log_level=2,
327
        debug_msg_ref=None):
328
        '''
329
        @param log_ignore_excs The log_level will be increased by 2 if the query
330
            throws one of these exceptions.
331
        @param debug_msg_ref If specified, the log message will be returned in
332
            this instead of being output. This allows you to filter log messages
333
            depending on the result of the query.
334
        '''
335
        assert query != None
336
        
337
        if not self.caching: cacheable = False
338
        used_cache = False
339
        
340
        def log_msg(query):
341
            if used_cache: cache_status = 'cache hit'
342
            elif cacheable: cache_status = 'cache miss'
343
            else: cache_status = 'non-cacheable'
344
            return 'DB query: '+cache_status+':\n'+strings.as_code(query, 'SQL')
345
        
346
        try:
347
            # Get cursor
348
            if cacheable:
349
                try:
350
                    cur = self.query_results[query]
351
                    used_cache = True
352
                except KeyError: cur = self.DbCursor(self)
353
            else: cur = self.db.cursor()
354
            
355
            # Log query
356
            if self.debug and debug_msg_ref == None: # log before running
357
                self.log_debug(log_msg(query), log_level)
358
            
359
            # Run query
360
            cur.execute(query)
361
        finally:
362
            self.print_notices()
363
            if self.debug and debug_msg_ref != None: # return after running
364
                debug_msg_ref[0] = log_msg(str(get_cur_query(cur, query)))
365
        
366
        return cur
367
    
368
    def is_cached(self, query): return query in self.query_results
369
    
370
    def with_autocommit(self, func):
371
        import psycopg2.extensions
372
        
373
        prev_isolation_level = self.db.isolation_level
374
        self.db.set_isolation_level(
375
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
376
        try: return func()
377
        finally: self.db.set_isolation_level(prev_isolation_level)
378
    
379
    def with_savepoint(self, func):
380
        savepoint = 'level_'+str(self._savepoint)
381
        self.run_query('SAVEPOINT '+savepoint, log_level=4)
382
        self._savepoint += 1
383
        try: return func()
384
        except:
385
            self.run_query('ROLLBACK TO SAVEPOINT '+savepoint, log_level=4)
386
            raise
387
        finally:
388
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
389
            # "The savepoint remains valid and can be rolled back to again"
390
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
391
            self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
392
            
393
            self._savepoint -= 1
394
            assert self._savepoint >= 0
395
            
396
            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT
397
    
398
    def do_autocommit(self):
399
        '''Autocommits if outside savepoint'''
400
        assert self._savepoint >= 0
401
        if self.autocommit and self._savepoint == 0:
402
            self.log_debug('Autocommitting', level=4)
403
            self.db.commit()
404
    
405
    def col_info(self, col):
406
        table = sql_gen.Table('columns', 'information_schema')
407
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
408
            'USER-DEFINED'), sql_gen.Col('udt_name'))
409
        cols = [type_, 'column_default',
410
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
411
        
412
        conds = [('table_name', col.table.name), ('column_name', col.name)]
413
        schema = col.table.schema
414
        if schema != None: conds.append(('table_schema', schema))
415
        
416
        type_, default, nullable = row(select(self, table, cols, conds,
417
            order_by='table_schema', limit=1, cacheable=False, log_level=4))
418
            # TODO: order_by search_path schema order
419
        default = sql_gen.as_Code(default, self)
420
        
421
        return sql_gen.TypedCol(col.name, type_, default, nullable)
422
    
423
    def TempFunction(self, name):
424
        if self.debug_temp: schema = None
425
        else: schema = 'pg_temp'
426
        return sql_gen.Function(name, schema)
427

    
connect = DbConn # alias: the module's connection factory
##### Recoverable querying

def with_savepoint(db, func):
    '''Runs func() inside a savepoint on db (see DbConn.with_savepoint())'''
    return db.with_savepoint(func)
def run_query(db, query, recover=None, cacheable=False, log_level=2,
435
    log_ignore_excs=None, **kw_args):
436
    '''For params, see DbConn.run_query()'''
437
    if recover == None: recover = False
438
    if log_ignore_excs == None: log_ignore_excs = ()
439
    log_ignore_excs = tuple(log_ignore_excs)
440
    
441
    debug_msg_ref = None # usually, db.run_query() logs query before running it
442
    # But if filtering with log_ignore_excs, wait until after exception parsing
443
    if log_ignore_excs != () or not db.can_mogrify(): debug_msg_ref = [None]
444
    
445
    try:
446
        try:
447
            def run(): return db.run_query(query, cacheable, log_level,
448
                debug_msg_ref, **kw_args)
449
            if recover and not db.is_cached(query):
450
                return with_savepoint(db, run)
451
            else: return run() # don't need savepoint if cached
452
        except Exception, e:
453
            msg = strings.ustr(e.args[0])
454
            
455
            match = re.match(r'^duplicate key value violates unique constraint '
456
                r'"((_?[^\W_]+(?=[._]))?.+?)"', msg)
457
            if match:
458
                constraint, table = match.groups()
459
                cols = []
460
                if recover: # need auto-rollback to run index_cols()
461
                    try: cols = index_cols(db, table, constraint)
462
                    except NotImplementedError: pass
463
                raise DuplicateKeyException(constraint, cols, e)
464
            
465
            match = re.match(r'^null value in column "(.+?)" violates not-null'
466
                r' constraint', msg)
467
            if match: raise NullValueException('NOT NULL', [match.group(1)], e)
468
            
469
            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
470
                r'|.+? field value out of range): "(.+?)"', msg)
471
            if match:
472
                value, = match.groups()
473
                raise InvalidValueException(strings.to_unicode(value), e)
474
            
475
            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
476
                r'is of type', msg)
477
            if match:
478
                col, type_ = match.groups()
479
                raise MissingCastException(type_, col, e)
480
            
481
            match = re.match(r'^(\S+) "(.+?)".*? already exists', msg)
482
            if match:
483
                type_, name = match.groups()
484
                raise DuplicateException(type_, name, e)
485
            
486
            raise # no specific exception raised
487
    except log_ignore_excs:
488
        log_level += 2
489
        raise
490
    finally:
491
        if debug_msg_ref != None and debug_msg_ref[0] != None:
492
            db.log_debug(debug_msg_ref[0], log_level)
493

    
494
##### Basic queries
495

    
496
def next_version(name):
497
    version = 1 # first existing name was version 0
498
    match = re.match(r'^(.*)#(\d+)$', name)
499
    if match:
500
        name, version = match.groups()
501
        version = int(version)+1
502
    return sql_gen.concat(name, '#'+str(version))
503

    
504
def lock_table(db, table, mode):
505
    table = sql_gen.as_Table(table)
506
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')
507

    
508
def run_query_into(db, query, into=None, add_indexes_=False, **kw_args):
509
    '''Outputs a query to a temp table.
510
    For params, see run_query().
511
    '''
512
    if into == None: return run_query(db, query, **kw_args)
513
    
514
    assert isinstance(into, sql_gen.Table)
515
    
516
    into.is_temp = True
517
    # "temporary tables cannot specify a schema name", so remove schema
518
    into.schema = None
519
    
520
    kw_args['recover'] = True
521
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))
522
    
523
    temp = not db.debug_temp # tables are permanent in debug_temp mode
524
    
525
    # Create table
526
    while True:
527
        create_query = 'CREATE'
528
        if temp: create_query += ' TEMP'
529
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
530
        
531
        try:
532
            cur = run_query(db, create_query, **kw_args)
533
                # CREATE TABLE AS sets rowcount to # rows in query
534
            break
535
        except DuplicateException, e:
536
            into.name = next_version(into.name)
537
            # try again with next version of name
538
    
539
    if add_indexes_: add_indexes(db, into)
540
    
541
    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
542
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
543
    # table is going to be used in complex queries, it is wise to run ANALYZE on
544
    # the temporary table after it is populated."
545
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
546
    # If into is not a temp table, ANALYZE is useful but not required.
547
    analyze(db, into)
548
    
549
    return cur
550

    
551
order_by_pkey = object() # tells mk_select() to order by the pkey
552

    
553
distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns
554

    
555
def mk_select(db, tables, fields=None, conds=None, distinct_on=[], limit=None,
556
    start=None, order_by=order_by_pkey, default_table=None):
557
    '''
558
    @param tables The single table to select from, or a list of tables to join
559
        together, with tables after the first being sql_gen.Join objects
560
    @param fields Use None to select all fields in the table
561
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
562
        * container can be any iterable type
563
        * compare_left_side: sql_gen.Code|str (for col name)
564
        * compare_right_side: sql_gen.ValueCond|literal value
565
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
566
        use all columns
567
    @return query
568
    '''
569
    # Parse tables param
570
    tables = lists.mk_seq(tables)
571
    tables = list(tables) # don't modify input! (list() copies input)
572
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
573
    
574
    # Parse other params
575
    if conds == None: conds = []
576
    elif dicts.is_dict(conds): conds = conds.items()
577
    conds = list(conds) # don't modify input! (list() copies input)
578
    assert limit == None or type(limit) == int
579
    assert start == None or type(start) == int
580
    if order_by is order_by_pkey:
581
        if distinct_on != []: order_by = None
582
        else: order_by = pkey(db, table0, recover=True)
583
    
584
    query = 'SELECT'
585
    
586
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
587
    
588
    # DISTINCT ON columns
589
    if distinct_on != []:
590
        query += '\nDISTINCT'
591
        if distinct_on is not distinct_on_all:
592
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
593
    
594
    # Columns
595
    if fields == None:
596
        if query.find('\n') >= 0: whitespace = '\n'
597
        else: whitespace = ' '
598
        query += whitespace+'*'
599
    else:
600
        assert fields != []
601
        query += '\n'+('\n, '.join(map(parse_col, fields)))
602
    
603
    # Main table
604
    query += '\nFROM '+table0.to_str(db)
605
    
606
    # Add joins
607
    left_table = table0
608
    for join_ in tables:
609
        table = join_.table
610
        
611
        # Parse special values
612
        if join_.type_ is sql_gen.filter_out: # filter no match
613
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
614
                sql_gen.CompareCond(None, '~=')))
615
        
616
        query += '\n'+join_.to_str(db, left_table)
617
        
618
        left_table = table
619
    
620
    missing = True
621
    if conds != []:
622
        if len(conds) == 1: whitespace = ' '
623
        else: whitespace = '\n'
624
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
625
            .to_str(db) for l, r in conds], 'WHERE')
626
        missing = False
627
    if order_by != None:
628
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
629
    if limit != None: query += '\nLIMIT '+str(limit); missing = False
630
    if start != None:
631
        if start != 0: query += '\nOFFSET '+str(start)
632
        missing = False
633
    if missing: warnings.warn(DbWarning(
634
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))
635
    
636
    return query
637

    
638
def select(db, *args, **kw_args):
639
    '''For params, see mk_select() and run_query()'''
640
    recover = kw_args.pop('recover', None)
641
    cacheable = kw_args.pop('cacheable', True)
642
    log_level = kw_args.pop('log_level', 2)
643
    
644
    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
645
        log_level=log_level)
646

    
647
def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
648
    embeddable=False, ignore=False):
649
    '''
650
    @param returning str|None An inserted column (such as pkey) to return
651
    @param embeddable Whether the query should be embeddable as a nested SELECT.
652
        Warning: If you set this and cacheable=True when the query is run, the
653
        query will be fully cached, not just if it raises an exception.
654
    @param ignore Whether to ignore duplicate keys.
655
    '''
656
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
657
    if cols == []: cols = None # no cols (all defaults) = unknown col names
658
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
659
    if select_query == None: select_query = 'DEFAULT VALUES'
660
    if returning != None: returning = sql_gen.as_Col(returning, table)
661
    
662
    first_line = 'INSERT INTO '+table.to_str(db)
663
    
664
    def mk_insert(select_query):
665
        query = first_line
666
        if cols != None:
667
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
668
        query += '\n'+select_query
669
        
670
        if returning != None:
671
            returning_name_col = sql_gen.to_name_only_col(returning)
672
            query += '\nRETURNING '+returning_name_col.to_str(db)
673
        
674
        return query
675
    
676
    return_type = 'unknown'
677
    if returning != None: return_type = returning.to_str(db)+'%TYPE'
678
    
679
    lang = 'sql'
680
    if ignore:
681
        # Always return something to set the correct rowcount
682
        if returning == None: returning = sql_gen.NamedCol('NULL', None)
683
        
684
        embeddable = True # must use function
685
        lang = 'plpgsql'
686
        
687
        if cols == None:
688
            row = [sql_gen.Col(sql_gen.all_cols, 'row')]
689
            row_vars = [sql_gen.Table('row')]
690
        else:
691
            row_vars = row = [sql_gen.Col(c.name, 'row') for c in cols]
692
        
693
        query = '''\
694
DECLARE
695
    row '''+table.to_str(db)+'''%ROWTYPE;
696
BEGIN
697
    /* Need an EXCEPTION block for each individual row because "When an error is
698
    caught by an EXCEPTION clause, [...] all changes to persistent database
699
    state within the block are rolled back."
700
    This is unfortunate because "A block containing an EXCEPTION clause is
701
    significantly more expensive to enter and exit than a block without one."
702
    (http://www.postgresql.org/docs/8.3/static/plpgsql-control-structures.html\
703
#PLPGSQL-ERROR-TRAPPING)
704
    */
705
    FOR '''+(', '.join((v.to_str(db) for v in row_vars)))+''' IN
706
'''+select_query+'''
707
    LOOP
708
        BEGIN
709
            RETURN QUERY
710
'''+mk_insert(sql_gen.Values(row).to_str(db))+'''
711
;
712
        EXCEPTION
713
            WHEN unique_violation THEN NULL; -- continue to next row
714
        END;
715
    END LOOP;
716
END;\
717
'''
718
    else: query = mk_insert(select_query)
719
    
720
    if embeddable:
721
        # Create function
722
        function_name = sql_gen.clean_name(first_line)
723
        while True:
724
            try:
725
                function = db.TempFunction(function_name)
726
                
727
                function_query = '''\
728
CREATE FUNCTION '''+function.to_str(db)+'''()
729
RETURNS SETOF '''+return_type+'''
730
LANGUAGE '''+lang+'''
731
AS $$
732
'''+query+'''
733
$$;
734
'''
735
                run_query(db, function_query, recover=True, cacheable=True,
736
                    log_ignore_excs=(DuplicateException,))
737
                break # this version was successful
738
            except DuplicateException, e:
739
                function_name = next_version(function_name)
740
                # try again with next version of name
741
        
742
        # Return query that uses function
743
        cols = None
744
        if returning != None: cols = [returning]
745
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(function),
746
            cols) # AS clause requires function alias
747
        return mk_select(db, func_table, start=0, order_by=None)
748
    
749
    return query
750

    
751
def insert_select(db, table, *args, **kw_args):
752
    '''For params, see mk_insert_select() and run_query_into()
753
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
754
        values in
755
    '''
756
    into = kw_args.pop('into', None)
757
    if into != None: kw_args['embeddable'] = True
758
    recover = kw_args.pop('recover', None)
759
    if kw_args.get('ignore', False): recover = True
760
    cacheable = kw_args.pop('cacheable', True)
761
    log_level = kw_args.pop('log_level', 2)
762
    
763
    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
764
        into, recover=recover, cacheable=cacheable, log_level=log_level)
765
    autoanalyze(db, table)
766
    return cur
767

    
768
default = sql_gen.default # tells insert() to use the default value for a column
769

    
770
def insert(db, table, row, *args, **kw_args):
771
    '''For params, see insert_select()'''
772
    if lists.is_seq(row): cols = None
773
    else:
774
        cols = row.keys()
775
        row = row.values()
776
    row = list(row) # ensure that "== []" works
777
    
778
    if row == []: query = None
779
    else: query = sql_gen.Values(row).to_str(db)
780
    
781
    return insert_select(db, table, cols, query, *args, **kw_args)
782

    
783
def mk_update(db, table, changes=None, cond=None, in_place=False):
    '''Generates an UPDATE query (or an equivalent in-place ALTER TABLE).
    @param changes [(col, new_value),...]
        * container can be any iterable type
        * col: sql_gen.Code|str (for col name)
        * new_value: sql_gen.Code|literal value
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
    @param in_place If set, locks the table and updates rows in place.
        This avoids creating dead rows in PostgreSQL.
        * cond must be None
    @return str query
    '''
    table = sql_gen.as_Table(table)
    changes = [(sql_gen.to_name_only_col(col, table), sql_gen.as_Value(value))
        for col, value in changes]
    
    if in_place:
        assert cond == None
        
        # Rewriting each column via ALTER COLUMN ... TYPE ... USING updates
        # rows in place instead of leaving dead row versions behind
        alters = ('ALTER COLUMN '+col.to_str(db)+' TYPE '
            +db.col_info(sql_gen.with_default_table(col, table)).type
            +'\nUSING '+value.to_str(db) for col, value in changes)
        query = 'ALTER TABLE '+table.to_str(db)+'\n'+',\n'.join(alters)
    else:
        assignments = (col.to_str(db)+' = '+value.to_str(db)
            for col, value in changes)
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'+',\n'.join(assignments)
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)
    
    return query
813

    
814
def update(db, table, *args, **kw_args):
    '''Runs mk_update(). For params, see mk_update() and run_query().'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', False)
    log_level = kw_args.pop('log_level', 2)
    
    query = mk_update(db, table, *args, **kw_args)
    cur = run_query(db, query, recover, cacheable, log_level=log_level)
    autoanalyze(db, table) # row contents changed, so refresh planner stats
    return cur
824

    
825
def last_insert_id(db):
    '''Returns the last sequence-generated id, or None if the driver has no
    known way to retrieve it.'''
    module = util.root_module(db.db)
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
    if module == 'MySQLdb': return db.insert_id()
    return None
830

    
831
def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
    '''Creates a mapping from original column names (which may have collisions)
    to names that will be distinct among the columns' tables.
    This is meant to be used for several tables that are being joined together.
    @param cols The columns to combine. Duplicates will be removed.
    @param into The table for the new columns.
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
        columns will be included in the mapping even if they are not in cols.
        The tables of the provided Col objects will be changed to into, so make
        copies of them if you want to keep the original tables.
    @param as_items Whether to return a list of dict items instead of a dict
    @return dict(orig_col=new_col, ...)
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
        * new_col: sql_gen.Col(orig_col_name, into)
        * All mappings use the into table so its name can easily be
          changed for all columns at once
    '''
    cols = lists.uniqify(cols)
    
    items = []
    for col in preserve: # preserved cols keep their names but move to into
        orig_col = copy.copy(col)
        col.table = into # note: deliberately modifies the caller's Col
        items.append((orig_col, col))
    preserve = set(preserve)
    items += [(col, sql_gen.Col(str(col), into, col.srcs))
        for col in cols if col not in preserve]
    
    if not as_items: items = dict(items)
    return items
862

    
863
def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
    '''Flattens the given joins into one table, renaming columns to avoid
    collisions. For params, see mk_flatten_mapping().
    @return See return value of mk_flatten_mapping()
    '''
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
    select_cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
    query = mk_select(db, joins, select_cols, limit=limit, start=start)
    run_query_into(db, query, into=into, add_indexes_=True)
    return dict(items)
872

    
873
##### Database structure introspection
874

    
875
#### Tables
876

    
877
def tables(db, schema_like='public', table_like='%', exact=False):
    '''Lists the names of matching tables.
    @param exact If set, matches names exactly instead of with LIKE
    '''
    compare = '=' if exact else 'LIKE'
    
    module = util.root_module(db.db)
    if module == 'psycopg2':
        conds = [
            ('schemaname', sql_gen.CompareCond(schema_like, compare)),
            ('tablename', sql_gen.CompareCond(table_like, compare)),
        ]
        return values(select(db, 'pg_tables', ['tablename'], conds,
            order_by='tablename', log_level=4))
    if module == 'MySQLdb':
        # note: MySQL branch cannot filter by schema
        query = 'SHOW TABLES LIKE '+db.esc_value(table_like)
        return values(run_query(db, query, cacheable=True, log_level=4))
    raise NotImplementedError("Can't list tables for "+module+' database')
891

    
892
def table_exists(db, table):
    '''Whether the given table exists in the database.'''
    table = sql_gen.as_Table(table)
    matches = tables(db, table.schema, table.name, exact=True)
    return list(matches) != []
895

    
896
def table_row_count(db, table, recover=None):
    '''Returns the number of rows in the table.'''
    query = mk_select(db, table, [sql_gen.row_count], order_by=None, start=0)
    return value(run_query(db, query, recover=recover, log_level=3))
899

    
900
def table_cols(db, table, recover=None):
    '''Returns the table's column names, in table order.'''
    # limit=0 fetches no rows; only the cursor's column metadata is used
    cur = select(db, table, limit=0, order_by=None, recover=recover,
        log_level=4)
    return list(col_names(cur))
903

    
904
def pkey(db, table, recover=None):
    '''Returns the table's primary key, assumed to be its first column.'''
    return table_cols(db, table, recover)[0]
907

    
908
# column name that table_not_null_col() looks for in a table
not_null_col = 'not_null_col'
909

    
910
def table_not_null_col(db, table, recover=None):
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
    cols = table_cols(db, table, recover)
    if not_null_col in cols: return not_null_col
    return pkey(db, table, recover)
914

    
915
def index_cols(db, table, index):
    '''Returns the column names covered by an index, in index order.
    Can also use this for UNIQUE constraints, because a UNIQUE index is
    automatically created. When you don't know whether something is a UNIQUE
    constraint or a UNIQUE index, use this function.'''
    module = util.root_module(db.db)
    if module == 'psycopg2':
        # The UNION's first branch handles plain column indexes (attnum listed
        # directly in pg_index.indkey); the second branch extracts column
        # numbers from pg_index.indexprs (via the :varattno markers) for
        # expression indexes, then joins them back to pg_attribute.
        return list(values(run_query(db, '''\
SELECT attname
FROM
(
        SELECT attnum, attname
        FROM pg_index
        JOIN pg_class index ON index.oid = indexrelid
        JOIN pg_class table_ ON table_.oid = indrelid
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
        WHERE
            table_.relname = '''+db.esc_value(table)+'''
            AND index.relname = '''+db.esc_value(index)+'''
    UNION
        SELECT attnum, attname
        FROM
        (
            SELECT
                indrelid
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
                    AS indkey
            FROM pg_index
            JOIN pg_class index ON index.oid = indexrelid
            JOIN pg_class table_ ON table_.oid = indrelid
            WHERE
                table_.relname = '''+db.esc_value(table)+'''
                AND index.relname = '''+db.esc_value(index)+'''
        ) s
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
) s
ORDER BY attnum
'''
            , cacheable=True, log_level=4)))
    else: raise NotImplementedError("Can't list index columns for "+module+
        ' database')
955

    
956
def constraint_cols(db, table, constraint):
    '''Returns the column names covered by a table constraint, in order.'''
    module = util.root_module(db.db)
    if module != 'psycopg2':
        raise NotImplementedError("Can't list constraint columns for "+module+
            ' database')
    
    query = '''\
SELECT attname
FROM pg_constraint
JOIN pg_class ON pg_class.oid = conrelid
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
WHERE
    relname = '''+db.esc_value(table)+'''
    AND conname = '''+db.esc_value(constraint)+'''
ORDER BY attnum
'''
    return list(values(run_query(db, query)))
972

    
973
#### Functions
974

    
975
def function_exists(db, function):
    '''Whether a directly-callable (non-trigger) function with this name
    exists.'''
    function = sql_gen.as_Function(function)
    
    info_table = sql_gen.Table('routines', 'information_schema')
    conds = [('routine_name', function.name)]
    if function.schema != None:
        conds.append(('routine_schema', function.schema))
    # Exclude trigger functions, since they cannot be called directly
    conds.append(('data_type', sql_gen.CompareCond('trigger', '!=')))
    
    matches = values(select(db, info_table, ['routine_name'], conds,
        order_by='routine_schema', limit=1, log_level=4))
        # TODO: order_by search_path schema order
    return list(matches) != []
988

    
989
##### Structural changes
990

    
991
#### Columns
992

    
993
def add_col(db, table, col, comment=None, **kw_args):
    '''Adds a column, renaming it on collisions.
    @param col TypedCol Name may be versioned, so be sure to propagate any
        renaming back to any source column for the TypedCol.
    @param comment None|str SQL comment used to distinguish columns of the same
        name from each other when they contain different data, to allow the
        ADD COLUMN query to be cached. If not set, query will not be cached.
    '''
    assert isinstance(col, sql_gen.TypedCol)
    
    while True:
        query = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
        if comment != None: query += ' '+sql_gen.esc_comment(comment)
        
        try: run_query(db, query, recover=True, cacheable=True, **kw_args)
        except DuplicateException:
            col.name = next_version(col.name)
            continue # try again with next version of name
        return
1013

    
1014
def add_not_null(db, col):
    '''Adds a NOT NULL constraint to an existing column.'''
    table = col.table
    col = sql_gen.to_name_only_col(col)
    query = ('ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '+col.to_str(db)
        +' SET NOT NULL')
    run_query(db, query, cacheable=True, log_level=3)
1019

    
1020
# name of the row-number column created by add_row_num()
row_num_col = '_row_num'
1021

    
1022
# column definition used by add_row_num(): an autoincrementing serial pkey
row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
    constraints='PRIMARY KEY')
1024

    
1025
def add_row_num(db, table):
    '''Adds a row number column to a table. Its name is in row_num_col. It will
    be the primary key.
    '''
    # Pass a copy: add_col() mutates col.name when the name collides
    # (DuplicateException), which would otherwise permanently rename the
    # shared module-level row_num_typed_col for all later calls.
    add_col(db, table, copy.copy(row_num_typed_col), log_level=3)
1029

    
1030
#### Indexes
1031

    
1032
def add_pkey(db, table, cols=None, recover=None):
    '''Adds a primary key.
    @param cols [sql_gen.Col,...] The columns in the primary key.
        Defaults to the first column in the table.
    @pre The table must not already have a primary key.
    '''
    table = sql_gen.as_Table(table)
    if cols == None: cols = [pkey(db, table, recover)]
    col_strs = [sql_gen.to_name_only_col(col).to_str(db) for col in cols]
    
    query = ('ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
        +', '.join(col_strs)+')')
    run_query(db, query, recover=True, cacheable=True, log_level=3,
        log_ignore_excs=(DuplicateException,))
1045

    
1046
def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
    Currently, only function calls are supported as expressions.
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
        This allows indexes to be used for comparisons where NULLs are equal.
    '''
    exprs = lists.mk_seq(exprs)
    
    # Parse exprs
    old_exprs = exprs[:]
    exprs = []
    cols = []
    for i, expr in enumerate(old_exprs):
        expr = sql_gen.as_Col(expr, table)
        
        # Handle nullable columns
        if ensure_not_null_:
            try: expr = ensure_not_null(db, expr)
            except KeyError: pass # unknown type, so just create plain index
        
        # Extract col
        expr = copy.deepcopy(expr) # don't modify input!
        if isinstance(expr, sql_gen.FunctionCall):
            # index the function's first argument's column, but keep the full
            # call as the indexed expression
            col = expr.args[0]
            expr = sql_gen.Expr(expr)
        else: col = expr
        assert isinstance(col, sql_gen.Col)
        
        # Extract table
        # note: once set here, table is reused for the remaining exprs
        if table == None:
            assert sql_gen.is_table_col(col)
            table = col.table
        
        # unqualify the col: the CREATE INDEX column list must not be
        # table-qualified
        col.table = None
        
        exprs.append(expr)
        cols.append(col)
    
    table = sql_gen.as_Table(table)
    # index name combines the col names; DuplicateException versioning below
    # handles any name collisions
    index = sql_gen.Table(str(sql_gen.Col(','.join(map(str, cols)), table)))
    
    # Add index
    while True:
        str_ = 'CREATE'
        if unique: str_ += ' UNIQUE'
        str_ += ' INDEX '+index.to_str(db)+' ON '+table.to_str(db)+' ('+(
            ', '.join((v.to_str(db) for v in exprs)))+')'
        
        try:
            run_query(db, str_, recover=True, cacheable=True, log_level=3,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            index.name = next_version(index.name)
            # try again with next version of name
1101

    
1102
def add_index_col(db, col, suffix, expr, nullable=True):
    '''Adds, populates, and indexes a derived column holding expr's value for
    col, then registers it as col's index column.
    @param suffix str appended to col's name to form the new column's name
    @param nullable If False, also adds a NOT NULL constraint.
    '''
    if sql_gen.index_col(col) != None: return # already has index col
    
    new_col = sql_gen.suffixed_col(col, suffix)
    
    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, db.col_info(col).type)
    add_col(db, col.table, new_typed_col, comment='src: '+repr(col),
        log_level=3)
    new_col.name = new_typed_col.name # propagate any renaming
    
    # Populate from expr; in_place avoids creating dead rows
    update(db, col.table, [(new_col, expr)], in_place=True, cacheable=True,
        log_level=3)
    if not nullable: add_not_null(db, new_col)
    add_index(db, new_col)
    
    # register the mapping so sql_gen.index_col() finds it next time
    col.table.index_cols[col.name] = new_col.name
1119

    
1120
# Controls when ensure_not_null() will use index columns
1121
not_null_index_cols_min_rows = 0 # rows; initially always use index columns
1122

    
1123
def ensure_not_null(db, col):
    '''For params, see sql_gen.ensure_not_null()'''
    expr = sql_gen.ensure_not_null(db, col)
    
    # If a nullable column in a temp table, add a separate index column
    # instead. Note that for small datasources, this adds 6-25% to the total
    # import time.
    use_index_col = (sql_gen.is_temp_col(col)
        and isinstance(expr, sql_gen.EnsureNotNull)
        and table_row_count(db, col.table) >= not_null_index_cols_min_rows)
    if use_index_col:
        add_index_col(db, col, '::NOT NULL', expr, nullable=False)
        expr = sql_gen.index_col(col)
    
    return expr
1135

    
1136
# sentinel for add_indexes()'s has_pkey param
already_indexed = object() # tells add_indexes() the pkey has already been added
1137

    
1138
def add_indexes(db, table, has_pkey=True):
    '''Adds an index on all columns in a table.
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
        index should be added on the first column.
        * If already_indexed, the pkey is assumed to have already been added
    '''
    index_cols_ = table_cols(db, table)
    if has_pkey:
        if has_pkey is not already_indexed: add_pkey(db, table)
        index_cols_ = index_cols_[1:] # pkey already covers the first column
    for col in index_cols_:
        add_index(db, col, table)
1149

    
1150
#### Tables
1151

    
1152
### Maintenance
1153

    
1154
def analyze(db, table):
    '''Refreshes the query planner's statistics for the table.'''
    query = 'ANALYZE '+sql_gen.as_Table(table).to_str(db)
    run_query(db, query, log_level=3)
1157

    
1158
def autoanalyze(db, table):
    '''Runs analyze(), but only if enabled on the connection.'''
    if not db.autoanalyze: return
    analyze(db, table)
1160

    
1161
def vacuum(db, table):
    '''VACUUM ANALYZEs the table. Runs in autocommit mode because VACUUM
    cannot run inside a transaction block.'''
    table = sql_gen.as_Table(table)
    def run(): run_query(db, 'VACUUM ANALYZE '+table.to_str(db), log_level=3)
    db.with_autocommit(run)
1165

    
1166
### Lifecycle
1167

    
1168
def drop_table(db, table):
    '''Drops the table if it exists, cascading to dependent objects.'''
    table = sql_gen.as_Table(table)
    query = 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE'
    return run_query(db, query)
1171

    
1172
def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
    like=None):
    '''Creates a table, renaming it on name collisions.
    @param cols [sql_gen.TypedCol,...] The column names and types
    @param has_pkey If set, the first column becomes the primary key.
    @param col_indexes bool|[ref]
        * If True, indexes will be added on all non-pkey columns.
        * If a list reference, [0] will be set to a function to do this.
          This can be used to delay index creation until the table is populated.
    @param like sql_gen.Table|None Existing table whose structure to copy.
    '''
    table = sql_gen.as_Table(table)
    
    # Copy cols before modifying it: the original code assigned cols[0] in
    # place, which mutated the caller's list (and the shared [] default).
    cols = list(cols)
    if like != None:
        cols.insert(0,
            sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL'))
    if has_pkey:
        # copy the TypedCol too, so the caller's object is not modified
        # (pkey_col, not pkey, to avoid shadowing the module-level pkey())
        cols[0] = pkey_col = copy.copy(cols[0])
        pkey_col.constraints = 'PRIMARY KEY'
    
    temp = table.is_temp and not db.debug_temp
        # temp tables permanent in debug_temp mode
    
    # Create table
    while True:
        str_ = 'CREATE'
        if temp: str_ += ' TEMP'
        str_ += ' TABLE '+table.to_str(db)+' (\n'
        str_ += '\n, '.join(c.to_str(db) for c in cols)
        str_ += '\n);\n'
        
        try:
            run_query(db, str_, cacheable=True, log_level=2,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            table.name = next_version(table.name)
            # try again with next version of name
    
    # Add indexes
    if has_pkey: has_pkey = already_indexed
    def add_indexes_(): add_indexes(db, table, has_pkey)
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
    elif col_indexes: add_indexes_() # add now
1215

    
1216
def copy_table_struct(db, src, dest):
    '''Creates a structure-only copy of a table. (Does not copy data.)
    @param src sql_gen.Table the table whose structure to copy
    @param dest sql_gen.Table the new table to create
    '''
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)
1219

    
1220
### Data
1221

    
1222
def truncate(db, table, schema='public', **kw_args):
    '''Empties a table, cascading to tables that reference it.
    For other params, see run_query().'''
    table = sql_gen.as_Table(table, schema)
    query = 'TRUNCATE '+table.to_str(db)+' CASCADE'
    return run_query(db, query, **kw_args)
1226

    
1227
def empty_temp(db, tables):
    '''Truncates the given temp table(s), unless temp debugging is on.'''
    if db.debug_temp: return # leave temp tables there for debugging
    for table in lists.mk_seq(tables):
        truncate(db, table, log_level=3)
1231

    
1232
def empty_db(db, schema='public', **kw_args):
    '''Truncates every table in the schema. For kw_args, see tables().'''
    for table in tables(db, schema, **kw_args):
        truncate(db, table, schema)
1235

    
1236
def distinct_table(db, table, distinct_on):
    '''Creates a copy of a temp table which is distinct on the given columns.
    The old and new tables will both get an index on these columns, to
    facilitate merge joins.
    @param distinct_on If empty, creates a table with one row. This is useful if
        your distinct_on columns are all literal values.
    @return The new table.
    '''
    new_table = sql_gen.suffixed_table(table, '_distinct')
    
    copy_table_struct(db, table, new_table)
    
    limit = None
    if distinct_on == []:
        limit = 1 # one sample row
    else:
        # the unique index enforces distinctness (duplicates are ignored on
        # insert); the plain index on the source is for join optimization
        add_index(db, distinct_on, new_table, unique=True)
        add_index(db, distinct_on, table)
    
    select_query = mk_select(db, table, start=0, limit=limit)
    insert_select(db, new_table, None, select_query, ignore=True)
    analyze(db, new_table)
    
    return new_table
(24-24/37)