# Database access

import copy
import re
import warnings

import exc
import dicts
import iters
import lists
from Proxy import Proxy
import rand
import sql_gen
import strings
import util

##### Exceptions

def get_cur_query(cur, input_query=None):
    raw_query = None
    if hasattr(cur, 'query'): raw_query = cur.query
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
    
    if raw_query != None: return raw_query
    else: return '[input] '+strings.ustr(input_query)

def _add_cursor_info(e, *args, **kw_args):
    '''For params, see get_cur_query()'''
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))

class DbException(exc.ExceptionWithCause):
    def __init__(self, msg, cause=None, cur=None):
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
        if cur != None: _add_cursor_info(self, cur)

class ExceptionWithName(DbException):
    def __init__(self, name, cause=None):
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name)), cause)
        self.name = name

class ExceptionWithValue(DbException):
    def __init__(self, value, cause=None):
        DbException.__init__(self, 'for value: '+strings.as_tt(repr(value)),
            cause)
        self.value = value

class ExceptionWithNameType(DbException):
    def __init__(self, type_, name, cause=None):
        DbException.__init__(self, 'for type: '+strings.as_tt(str(type_))
            +'; name: '+strings.as_tt(name), cause)
        self.type = type_
        self.name = name

class ConstraintException(DbException):
    def __init__(self, name, cols, cause=None):
        DbException.__init__(self, 'Violated '+strings.as_tt(name)
            +' constraint on columns: '+strings.as_tt(', '.join(cols)), cause)
        self.name = name
        self.cols = cols

class MissingCastException(DbException):
    def __init__(self, type_, col, cause=None):
        DbException.__init__(self, 'Missing cast to type '+strings.as_tt(type_)
            +' on column: '+strings.as_tt(col), cause)
        self.type = type_
        self.col = col

class NameException(DbException): pass

class DuplicateKeyException(ConstraintException): pass

class NullValueException(ConstraintException): pass

class InvalidValueException(ExceptionWithValue): pass

class DuplicateException(ExceptionWithNameType): pass

class EmptyRowException(DbException): pass

##### Warnings

class DbWarning(UserWarning): pass

##### Result retrieval

def col_names(cur): return (col[0] for col in cur.description)

def rows(cur): return iter(lambda: cur.fetchone(), None)

def consume_rows(cur):
    '''Used to fetch all rows so result will be cached'''
    iters.consume_iter(rows(cur))

def next_row(cur): return rows(cur).next()

def row(cur):
    row_ = next_row(cur)
    consume_rows(cur)
    return row_

def next_value(cur): return next_row(cur)[0]

def value(cur): return row(cur)[0]

def values(cur): return iters.func_iter(lambda: next_value(cur))

def value_or_none(cur):
    try: return value(cur)
    except StopIteration: return None
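
# Example (a minimal usage sketch): the helpers above wrap a DB-API cursor.
# Assuming `cur` has just run `SELECT count(*) FROM some_table`:
#     count = value(cur) # first column of the first row; also caches the result
# and for a multi-row, single-column query:
#     for v in values(cur): ... # iterate rows without loading them all at once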

##### Escaping

def esc_name_by_module(module, name):
    if module == 'psycopg2' or module == None: quote = '"'
    elif module == 'MySQLdb': quote = '`'
    else: raise NotImplementedError("Can't escape name for "+module+' database')
    return sql_gen.esc_name(name, quote)

def esc_name_by_engine(engine, name, **kw_args):
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)

def esc_name(db, name, **kw_args):
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)

def qual_name(db, schema, table):
    def esc_name_(name): return esc_name(db, name)
    table = esc_name_(table)
    if schema != None: return esc_name_(schema)+'.'+table
    else: return table
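
# Example (sketch): expected escaping, assuming sql_gen.esc_name() wraps the
# name in the engine's quote character:
#     qual_name(db, 'public', 'plots') #=> '"public"."plots"' (PostgreSQL)
#     esc_name_by_engine('MySQL', 'plots') #=> '`plots`'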

##### Database connections

db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']

db_engines = {
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
    'PostgreSQL': ('psycopg2', {}),
}
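
# Example (sketch): a db_config dict uses the keys in db_config_names; the
# per-module mappings above rename them to the DB-API module's keywords
# (e.g. for MySQL, password -> passwd and database -> db). Values here are
# placeholders:
#     db_config = {'engine': 'PostgreSQL', 'host': 'localhost', 'user': 'bien',
#         'password': 'secret', 'database': 'vegbien', 'schemas': 'public'}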

DatabaseErrors_set = set([DbException])
DatabaseErrors = tuple(DatabaseErrors_set)

def _add_module(module):
    DatabaseErrors_set.add(module.DatabaseError)
    global DatabaseErrors
    DatabaseErrors = tuple(DatabaseErrors_set)

def db_config_str(db_config):
    return db_config['engine']+' database '+db_config['database']

log_debug_none = lambda msg, level=2: None

class DbConn:
    def __init__(self, db_config, autocommit=True, caching=True,
        log_debug=log_debug_none, debug_temp=False):
        '''
        @param debug_temp Whether temporary objects should instead be permanent.
            This assists in debugging the internal objects used by the program.
        '''
        self.db_config = db_config
        self.autocommit = autocommit
        self.caching = caching
        self.log_debug = log_debug
        self.debug = log_debug != log_debug_none
        self.debug_temp = debug_temp
        self.autoanalyze = False
        
        self.__db = None
        self.query_results = {}
        self._savepoint = 0
        self._notices_seen = set()
    
    def __getattr__(self, name):
        if name == '__dict__': raise Exception('getting __dict__')
        if name == 'db': return self._db()
        else: raise AttributeError()
    
    def __getstate__(self):
        state = copy.copy(self.__dict__) # shallow copy
        state['log_debug'] = None # don't pickle the debug callback
        state['_DbConn__db'] = None # don't pickle the connection
        return state
    
    def connected(self): return self.__db != None
    
    def close(self):
        if self.connected(): self.db.close()
        self.__db = None
    
    def _db(self):
        if self.__db == None:
            # Process db_config
            db_config = self.db_config.copy() # don't modify input!
            schemas = db_config.pop('schemas', None)
            module_name, mappings = db_engines[db_config.pop('engine')]
            module = __import__(module_name)
            _add_module(module)
            for orig, new in mappings.iteritems():
                try: util.rename_key(db_config, orig, new)
                except KeyError: pass
            
            # Connect
            self.__db = module.connect(**db_config)
            
            # Configure connection
            if hasattr(self.db, 'set_isolation_level'):
                import psycopg2.extensions
                self.db.set_isolation_level(
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
            if schemas != None:
                search_path = [self.esc_name(s) for s in schemas.split(',')]
                search_path.append(value(run_query(self, 'SHOW search_path',
                    log_level=4)))
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
                    log_level=3)
        
        return self.__db
    
    class DbCursor(Proxy):
        def __init__(self, outer):
            Proxy.__init__(self, outer.db.cursor())
            self.outer = outer
            self.query_results = outer.query_results
            self.query_lookup = None
            self.result = []
        
        def execute(self, query):
            self._is_insert = query.startswith('INSERT')
            self.query_lookup = query
            try:
                try:
                    cur = self.inner.execute(query)
                    self.outer.do_autocommit()
                finally: self.query = get_cur_query(self.inner, query)
            except Exception, e:
                _add_cursor_info(e, self, query)
                self.result = e # cache the exception as the result
                self._cache_result()
                raise
            
            # Always cache certain queries
            if query.startswith('CREATE') or query.startswith('ALTER'):
                # structural changes
                # Rest of query must be unique in the face of name collisions,
                # so don't cache ADD COLUMN unless it has distinguishing comment
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
                    self._cache_result()
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
                consume_rows(self) # fetch all rows so result will be cached
            
            return cur
        
        def fetchone(self):
            row = self.inner.fetchone()
            if row != None: self.result.append(row)
            # otherwise, fetched all rows
            else: self._cache_result()
            return row
        
        def _cache_result(self):
            # For inserts that return a result set, don't cache result set since
            # inserts are not idempotent. Other non-SELECT queries don't have
            # their result set read, so only exceptions will be cached (an
            # invalid query will always be invalid).
            if self.query_results != None and (not self._is_insert
                or isinstance(self.result, Exception)):
                
                assert self.query_lookup != None
                self.query_results[self.query_lookup] = self.CacheCursor(
                    util.dict_subset(dicts.AttrsDictView(self),
                    ['query', 'result', 'rowcount', 'description']))
        
        class CacheCursor:
            def __init__(self, cached_result): self.__dict__ = cached_result
            
            def execute(self, *args, **kw_args):
                if isinstance(self.result, Exception): raise self.result
                # otherwise, result is a rows list
                self.iter = iter(self.result)
            
            def fetchone(self):
                try: return self.iter.next()
                except StopIteration: return None
    
    def esc_value(self, value):
        try: str_ = self.mogrify('%s', [value])
        except NotImplementedError, e:
            module = util.root_module(self.db)
            if module == 'MySQLdb':
                import _mysql
                str_ = _mysql.escape_string(value)
            else: raise e
        return strings.to_unicode(str_)
    
    def esc_name(self, name): return esc_name(self, name) # calls global func
    
    def std_code(self, str_):
        '''Standardizes SQL code.
        * Ensures that string literals are prefixed by `E`
        '''
        if str_.startswith("'"): str_ = 'E'+str_
        return str_
    
    def can_mogrify(self):
        module = util.root_module(self.db)
        return module == 'psycopg2'
    
    def mogrify(self, query, params=None):
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
        else: raise NotImplementedError("Can't mogrify query")
    
    def print_notices(self):
        if hasattr(self.db, 'notices'):
            for msg in self.db.notices:
                if msg not in self._notices_seen:
                    self._notices_seen.add(msg)
                    self.log_debug(msg, level=2)
    
    def run_query(self, query, cacheable=False, log_level=2,
        debug_msg_ref=None):
        '''
        @param debug_msg_ref If specified, the log message will be returned in
            this instead of being output. This allows you to filter log messages
            depending on the result of the query.
        '''
        assert query != None
        
        if not self.caching: cacheable = False
        used_cache = False
        
        def log_msg(query):
            if used_cache: cache_status = 'cache hit'
            elif cacheable: cache_status = 'cache miss'
            else: cache_status = 'non-cacheable'
            return 'DB query: '+cache_status+':\n'+strings.as_code(query, 'SQL')
        
        try:
            # Get cursor
            if cacheable:
                try:
                    cur = self.query_results[query]
                    used_cache = True
                except KeyError: cur = self.DbCursor(self)
            else: cur = self.db.cursor()
            
            # Log query
            if self.debug and debug_msg_ref == None: # log before running
                self.log_debug(log_msg(query), log_level)
            
            # Run query
            cur.execute(query)
        finally:
            self.print_notices()
            if self.debug and debug_msg_ref != None: # return after running
                debug_msg_ref[0] = log_msg(str(get_cur_query(cur, query)))
        
        return cur
    
    def is_cached(self, query): return query in self.query_results
    
    def with_autocommit(self, func):
        import psycopg2.extensions
        
        prev_isolation_level = self.db.isolation_level
        self.db.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        try: return func()
        finally: self.db.set_isolation_level(prev_isolation_level)
    
    def with_savepoint(self, func):
        savepoint = 'level_'+str(self._savepoint)
        self.run_query('SAVEPOINT '+savepoint, log_level=4)
        self._savepoint += 1
        try: return func()
        except:
            self.run_query('ROLLBACK TO SAVEPOINT '+savepoint, log_level=4)
            raise
        finally:
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
            # "The savepoint remains valid and can be rolled back to again"
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
            self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
            
            self._savepoint -= 1
            assert self._savepoint >= 0
            
            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT
    
    def do_autocommit(self):
        '''Autocommits if outside a savepoint'''
        assert self._savepoint >= 0
        if self.autocommit and self._savepoint == 0:
            self.log_debug('Autocommitting', level=4)
            self.db.commit()
    
    def col_info(self, col):
        table = sql_gen.Table('columns', 'information_schema')
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
            'USER-DEFINED'), sql_gen.Col('udt_name'))
        cols = [type_, 'column_default',
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
        
        conds = [('table_name', col.table.name), ('column_name', col.name)]
        schema = col.table.schema
        if schema != None: conds.append(('table_schema', schema))
        
        type_, default, nullable = row(select(self, table, cols, conds,
            order_by='table_schema', limit=1, cacheable=False, log_level=4))
            # TODO: order by search_path schema order
        default = sql_gen.as_Code(default, self)
        
        return sql_gen.TypedCol(col.name, type_, default, nullable)
    
    def TempFunction(self, name):
        if self.debug_temp: schema = None
        else: schema = 'pg_temp'
        return sql_gen.Function(name, schema)

connect = DbConn
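
# Example (a minimal usage sketch, assuming a reachable PostgreSQL database and
# placeholder credentials):
#     db = connect({'engine': 'PostgreSQL', 'host': 'localhost', 'user': 'bien',
#         'password': 'secret', 'database': 'vegbien'})
#     assert value(db.run_query('SELECT 1')) == 1 # lazily opens the connection
#     db.close()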

##### Recoverable querying

def with_savepoint(db, func): return db.with_savepoint(func)
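
# Example (sketch): running a statement inside a savepoint, so that a failure
# rolls back only that statement rather than the whole transaction:
#     with_savepoint(db, lambda: db.run_query('SELECT 1'))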

def run_query(db, query, recover=None, cacheable=False, log_level=2,
    log_ignore_excs=None, **kw_args):
    '''For other params, see DbConn.run_query()
    @param log_ignore_excs The log_level will be increased by 2 if the query
        throws one of these exceptions.
    '''
    if recover == None: recover = False
    if log_ignore_excs == None: log_ignore_excs = ()
    log_ignore_excs = tuple(log_ignore_excs)
    
    debug_msg_ref = None # usually, db.run_query() logs query before running it
    # But if filtering with log_ignore_excs, wait until after exception parsing
    if log_ignore_excs != () or not db.can_mogrify(): debug_msg_ref = [None]
    
    try:
        try:
            def run(): return db.run_query(query, cacheable, log_level,
                debug_msg_ref, **kw_args)
            if recover and not db.is_cached(query):
                return with_savepoint(db, run)
            else: return run() # don't need savepoint if cached
        except Exception, e:
            msg = strings.ustr(e.args[0])
            
            match = re.match(r'^duplicate key value violates unique constraint '
                r'"((_?[^\W_]+(?=[._]))?.+?)"', msg)
            if match:
                constraint, table = match.groups()
                cols = []
                if recover: # need auto-rollback to run index_cols()
                    try: cols = index_cols(db, table, constraint)
                    except NotImplementedError: pass
                raise DuplicateKeyException(constraint, cols, e)
            
            match = re.match(r'^null value in column "(.+?)" violates not-null'
                r' constraint', msg)
            if match: raise NullValueException('NOT NULL', [match.group(1)], e)
            
            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
                r'|.+? field value out of range): "(.+?)"', msg)
            if match:
                value, = match.groups()
                raise InvalidValueException(strings.to_unicode(value), e)
            
            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
                r'is of type', msg)
            if match:
                col, type_ = match.groups()
                raise MissingCastException(type_, col, e)
            
            match = re.match(r'^(\S+) "(.+?)".*? already exists', msg)
            if match:
                type_, name = match.groups()
                raise DuplicateException(type_, name, e)
            
            raise # no specific exception raised
    except log_ignore_excs:
        log_level += 2
        raise
    finally:
        if debug_msg_ref != None and debug_msg_ref[0] != None:
            db.log_debug(debug_msg_ref[0], log_level)
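
# Example (sketch): recover=True turns driver errors into this module's typed
# exceptions; assumes a table `t` whose pkey value 1 is already present:
#     try: run_query(db, "INSERT INTO t VALUES (1)", recover=True)
#     except DuplicateKeyException, e: pass # e.cols names the indexed columns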

##### Basic queries

def next_version(name):
    version = 1 # first existing name was version 0
    match = re.match(r'^(.*)#(\d+)$', name)
    if match:
        name, version = match.groups()
        version = int(version)+1
    return sql_gen.concat(name, '#'+str(version))
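
# Example (sketch): versioned names used to retry CREATEs after collisions,
# assuming sql_gen.concat() simply appends here:
#     next_version('t')   #=> 't#1'
#     next_version('t#1') #=> 't#2'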

def lock_table(db, table, mode):
    table = sql_gen.as_Table(table)
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')

def run_query_into(db, query, into=None, add_indexes_=False, **kw_args):
    '''Outputs a query to a temp table.
    For params, see run_query().
    '''
    if into == None: return run_query(db, query, **kw_args)
    
    assert isinstance(into, sql_gen.Table)
    
    into.is_temp = True
    # "temporary tables cannot specify a schema name", so remove schema
    into.schema = None
    
    kw_args['recover'] = True
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))
    
    temp = not db.debug_temp # tables are permanent in debug_temp mode
    
    # Create table
    while True:
        create_query = 'CREATE'
        if temp: create_query += ' TEMP'
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
        
        try:
            cur = run_query(db, create_query, **kw_args)
                # CREATE TABLE AS sets rowcount to # rows in query
            break
        except DuplicateException, e:
            into.name = next_version(into.name)
            # try again with next version of name
    
    if add_indexes_: add_indexes(db, into)
    
    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
    # table is going to be used in complex queries, it is wise to run ANALYZE on
    # the temporary table after it is populated."
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
    # If into is not a temp table, ANALYZE is useful but not required.
    analyze(db, into)
    
    return cur
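
# Example (sketch): materializing a query into a temp table, assuming a `log`
# table exists:
#     hits = sql_gen.Table('hits')
#     run_query_into(db, "SELECT * FROM log WHERE status = 404", into=hits)
#     # on name collision, hits.name is bumped (hits#1, hits#2, ...)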

order_by_pkey = object() # tells mk_select() to order by the pkey

distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns

def mk_select(db, tables, fields=None, conds=None, distinct_on=[], limit=None,
    start=None, order_by=order_by_pkey, default_table=None):
    '''
    @param tables The single table to select from, or a list of tables to join
        together, with tables after the first being sql_gen.Join objects
    @param fields Use None to select all fields in the table
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
        * container can be any iterable type
        * compare_left_side: sql_gen.Code|str (for col name)
        * compare_right_side: sql_gen.ValueCond|literal value
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
        use all columns
    @return query
    '''
    # Parse tables param
    tables = lists.mk_seq(tables)
    tables = list(tables) # don't modify input! (list() copies input)
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
    
    # Parse other params
    if conds == None: conds = []
    elif dicts.is_dict(conds): conds = conds.items()
    conds = list(conds) # don't modify input! (list() copies input)
    assert limit == None or type(limit) == int
    assert start == None or type(start) == int
    if order_by is order_by_pkey:
        if distinct_on != []: order_by = None
        else: order_by = pkey(db, table0, recover=True)
    
    query = 'SELECT'
    
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
    
    # DISTINCT ON columns
    if distinct_on != []:
        query += '\nDISTINCT'
        if distinct_on is not distinct_on_all:
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
    
    # Columns
    if fields == None:
        if query.find('\n') >= 0: whitespace = '\n'
        else: whitespace = ' '
        query += whitespace+'*'
    else:
        assert fields != []
        query += '\n'+('\n, '.join(map(parse_col, fields)))
    
    # Main table
    query += '\nFROM '+table0.to_str(db)
    
    # Add joins
    left_table = table0
    for join_ in tables:
        table = join_.table
        
        # Parse special values
        if join_.type_ is sql_gen.filter_out: # filter no match
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
                sql_gen.CompareCond(None, '~=')))
        
        query += '\n'+join_.to_str(db, left_table)
        
        left_table = table
    
    missing = True
    if conds != []:
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
            .to_str(db) for l, r in conds], 'WHERE')
        missing = False
    if order_by != None:
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
    if limit != None: query += '\nLIMIT '+str(limit); missing = False
    if start != None:
        if start != 0: query += '\nOFFSET '+str(start)
        missing = False
    if missing: warnings.warn(DbWarning(
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))
    
    return query
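
# Example (sketch): the SQL generated for a simple call, assuming a `plots`
# table whose pkey column is `plot_id`:
#     mk_select(db, 'plots', ['plot_id'], {'siteid': 3}, limit=10)
# produces roughly:
#     SELECT
#     "plot_id"
#     FROM "plots"
#     WHERE "siteid" = 3
#     ORDER BY "plot_id"
#     LIMIT 10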

def select(db, *args, **kw_args):
    '''For params, see mk_select() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)
    
    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
        log_level=log_level)

def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
    embeddable=False, ignore=False):
    '''
    @param returning str|None An inserted column (such as pkey) to return
    @param embeddable Whether the query should be embeddable as a nested SELECT.
        Warning: If you set this and cacheable=True when the query is run, the
        query will be fully cached, not just if it raises an exception.
    @param ignore Whether to ignore duplicate keys.
    '''
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
    if cols == []: cols = None # no cols (all defaults) = unknown col names
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
    if select_query == None: select_query = 'DEFAULT VALUES'
    if returning != None: returning = sql_gen.as_Col(returning, table)
    
    first_line = 'INSERT INTO '+table.to_str(db)
    
    def mk_insert(select_query):
        query = first_line
        if cols != None:
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
        query += '\n'+select_query
        
        if returning != None:
            returning_name_col = sql_gen.to_name_only_col(returning)
            query += '\nRETURNING '+returning_name_col.to_str(db)
        
        return query
    
    return_type = 'unknown'
    if returning != None: return_type = returning.to_str(db)+'%TYPE'
    
    lang = 'sql'
    if ignore:
        # Always return something to set the correct rowcount
        if returning == None: returning = sql_gen.NamedCol('NULL', None)
        
        embeddable = True # must use function
        lang = 'plpgsql'
        
        if cols == None:
            row = [sql_gen.Col(sql_gen.all_cols, 'row')]
            row_vars = [sql_gen.Table('row')]
        else:
            row_vars = row = [sql_gen.Col(c.name, 'row') for c in cols]
        
        query = '''\
DECLARE
    row '''+table.to_str(db)+'''%ROWTYPE;
BEGIN
    /* Need an EXCEPTION block for each individual row because "When an error is
    caught by an EXCEPTION clause, [...] all changes to persistent database
    state within the block are rolled back."
    This is unfortunate because "A block containing an EXCEPTION clause is
    significantly more expensive to enter and exit than a block without one."
    (http://www.postgresql.org/docs/8.3/static/plpgsql-control-structures.html\
#PLPGSQL-ERROR-TRAPPING)
    */
    FOR '''+(', '.join((v.to_str(db) for v in row_vars)))+''' IN
'''+select_query+'''
    LOOP
        BEGIN
            RETURN QUERY
'''+mk_insert(sql_gen.Values(row).to_str(db))+'''
;
        EXCEPTION
            WHEN unique_violation THEN NULL; -- continue to next row
        END;
    END LOOP;
END;\
'''
    else: query = mk_insert(select_query)
    
    if embeddable:
        # Create function
        function_name = sql_gen.clean_name(first_line)
        while True:
            try:
                function = db.TempFunction(function_name)
                
                function_query = '''\
CREATE FUNCTION '''+function.to_str(db)+'''()
RETURNS SETOF '''+return_type+'''
LANGUAGE '''+lang+'''
AS $$
'''+query+'''
$$;
'''
                run_query(db, function_query, recover=True, cacheable=True,
                    log_ignore_excs=(DuplicateException,))
                break # this version was successful
            except DuplicateException, e:
                function_name = next_version(function_name)
                # try again with next version of name
        
        # Return query that uses function
        cols = None
        if returning != None: cols = [returning]
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(function),
            cols) # AS clause requires function alias
        return mk_select(db, func_table, start=0, order_by=None)
    
    return query

def insert_select(db, table, *args, **kw_args):
    '''For params, see mk_insert_select() and run_query_into()
    @param into sql_gen.Table with the suggested name of a temp table in which
        to put the RETURNING values
    '''
    into = kw_args.pop('into', None)
    if into != None: kw_args['embeddable'] = True
    recover = kw_args.pop('recover', None)
    if kw_args.get('ignore', False): recover = True
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)
    
    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
        into, recover=recover, cacheable=cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

default = sql_gen.default # tells insert() to use the default value for a column

def insert(db, table, row, *args, **kw_args):
    '''For params, see insert_select()'''
    if lists.is_seq(row): cols = None
    else:
        cols = row.keys()
        row = row.values()
    row = list(row) # ensure that "== []" works
    
    if row == []: query = None
    else: query = sql_gen.Values(row).to_str(db)
    
    return insert_select(db, table, cols, query, *args, **kw_args)
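
# Example (sketch): inserting one row given as a dict, into a hypothetical
# `samples` table:
#     insert(db, 'samples', {'name': 'A', 'mass': 1.5})
# With ignore=True, insert_select() instead skips duplicate-key rows (via the
# generated plpgsql function above) rather than raising DuplicateKeyException.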

def mk_update(db, table, changes=None, cond=None, in_place=False):
    '''
    @param changes [(col, new_value),...]
        * container can be any iterable type
        * col: sql_gen.Code|str (for col name)
        * new_value: sql_gen.Code|literal value
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
    @param in_place If set, locks the table and updates rows in place.
        This avoids creating dead rows in PostgreSQL.
        * cond must be None
    @return str query
    '''
    table = sql_gen.as_Table(table)
    changes = [(sql_gen.to_name_only_col(c, table), sql_gen.as_Value(v))
        for c, v in changes]
    
    if in_place:
        assert cond == None
        
        query = 'ALTER TABLE '+table.to_str(db)+'\n'
        query += ',\n'.join(('ALTER COLUMN '+c.to_str(db)+' TYPE '
            +db.col_info(sql_gen.with_default_table(c, table)).type
            +'\nUSING '+v.to_str(db) for c, v in changes))
    else:
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'
        query += ',\n'.join((c.to_str(db)+' = '+v.to_str(db)
            for c, v in changes))
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)
    
    return query
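
# Example (sketch): the SQL generated for an in-place update of a hypothetical
# `samples.mass` column (the TYPE comes from db.col_info()):
#     mk_update(db, 'samples', [('mass', 0)], in_place=True)
# produces roughly:
#     ALTER TABLE "samples"
#     ALTER COLUMN "mass" TYPE numeric
#     USING 0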

def update(db, table, *args, **kw_args):
    '''For params, see mk_update() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', False)
    log_level = kw_args.pop('log_level', 2)
    
    cur = run_query(db, mk_update(db, table, *args, **kw_args), recover,
        cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

def last_insert_id(db):
    module = util.root_module(db.db)
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
    elif module == 'MySQLdb': return db.insert_id()
    else: return None

def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
    '''Creates a mapping from original column names (which may have collisions)
    to names that will be distinct among the columns' tables.
    This is meant to be used for several tables that are being joined together.
    @param cols The columns to combine. Duplicates will be removed.
    @param into The table for the new columns.
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
        columns will be included in the mapping even if they are not in cols.
        The tables of the provided Col objects will be changed to into, so make
        copies of them if you want to keep the original tables.
    @param as_items Whether to return a list of dict items instead of a dict
    @return dict(orig_col=new_col, ...)
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
        * new_col: sql_gen.Col(orig_col_name, into)
        * All mappings use the into table so its name can easily be
          changed for all columns at once
    '''
    cols = lists.uniqify(cols)
    
    items = []
    for col in preserve:
        orig_col = copy.copy(col)
        col.table = into
        items.append((orig_col, col))
    preserve = set(preserve)
    for col in cols:
        if col not in preserve:
            items.append((col, sql_gen.Col(str(col), into, col.srcs)))
    
    if not as_items: items = dict(items)
    return items
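
# Example (sketch): flattening two same-named columns into a hypothetical
# `joined` table, assuming str(col) renders the qualified name:
#     cols = [sql_gen.Col('id', 'plots'), sql_gen.Col('id', 'sites')]
#     mk_flatten_mapping(db, sql_gen.Table('joined'), cols)
# maps each input col to a distinctly-named column on `joined`, e.g.
# plots.id -> joined."plots.id" and sites.id -> joined."sites.id".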

def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
    '''For params, see mk_flatten_mapping()
    @return See return value of mk_flatten_mapping()
    '''
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
    run_query_into(db, mk_select(db, joins, cols, limit=limit, start=start),
        into=into, add_indexes_=True)
    return dict(items)

##### Database structure introspection

#### Tables

def tables(db, schema_like='public', table_like='%', exact=False):
    if exact: compare = '='
    else: compare = 'LIKE'
    
    module = util.root_module(db.db)
    if module == 'psycopg2':
        conds = [('schemaname', sql_gen.CompareCond(schema_like, compare)),
            ('tablename', sql_gen.CompareCond(table_like, compare))]
        return values(select(db, 'pg_tables', ['tablename'], conds,
            order_by='tablename', log_level=4))
    elif module == 'MySQLdb':
        return values(run_query(db, 'SHOW TABLES LIKE '+db.esc_value(table_like)
            , cacheable=True, log_level=4))
    else: raise NotImplementedError("Can't list tables for "+module+' database')

def table_exists(db, table):
    table = sql_gen.as_Table(table)
    return list(tables(db, table.schema, table.name, exact=True)) != []
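
# Example (sketch): listing and checking tables on PostgreSQL:
#     list(tables(db, table_like='plot%')) #=> e.g. ['plots', 'plots#1']
#     table_exists(db, sql_gen.Table('plots', 'public')) #=> True or False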

def table_row_count(db, table, recover=None):
    return value(run_query(db, mk_select(db, table, [sql_gen.row_count],
        order_by=None, start=0), recover=recover, log_level=3))

def table_cols(db, table, recover=None):
    return list(col_names(select(db, table, limit=0, order_by=None,
        recover=recover, log_level=4)))

def pkey(db, table, recover=None):
    '''Assumed to be first column in table'''
    return table_cols(db, table, recover)[0]

not_null_col = 'not_null_col'

def table_not_null_col(db, table, recover=None):
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
    if not_null_col in table_cols(db, table, recover): return not_null_col
    else: return pkey(db, table, recover)

def index_cols(db, table, index):
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
    automatically created. When you don't know whether something is a UNIQUE
    constraint or a UNIQUE index, use this function.'''
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM
(
        SELECT attnum, attname
        FROM pg_index
        JOIN pg_class index ON index.oid = indexrelid
        JOIN pg_class table_ ON table_.oid = indrelid
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
        WHERE
            table_.relname = '''+db.esc_value(table)+'''
            AND index.relname = '''+db.esc_value(index)+'''
    UNION
        SELECT attnum, attname
        FROM
        (
            SELECT
                indrelid
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
                    AS indkey
            FROM pg_index
            JOIN pg_class index ON index.oid = indexrelid
            JOIN pg_class table_ ON table_.oid = indrelid
            WHERE
                table_.relname = '''+db.esc_value(table)+'''
                AND index.relname = '''+db.esc_value(index)+'''
        ) s
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
) s
ORDER BY attnum
'''
            , cacheable=True, log_level=4)))
    else: raise NotImplementedError("Can't list index columns for "+module+
        ' database')

def constraint_cols(db, table, constraint):
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM pg_constraint
JOIN pg_class ON pg_class.oid = conrelid
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
WHERE
    relname = '''+db.esc_value(table)+'''
    AND conname = '''+db.esc_value(constraint)+'''
ORDER BY attnum
'''
            )))
    else: raise NotImplementedError("Can't list constraint columns for "+module+
        ' database')

#### Functions

def function_exists(db, function):
    function = sql_gen.as_Function(function)
    
    info_table = sql_gen.Table('routines', 'information_schema')
    conds = [('routine_name', function.name)]
    schema = function.schema
    if schema != None: conds.append(('routine_schema', schema))
    # Exclude trigger functions, since they cannot be called directly
    conds.append(('data_type', sql_gen.CompareCond('trigger', '!=')))
    
    return list(values(select(db, info_table, ['routine_name'], conds,
        order_by='routine_schema', limit=1, log_level=4))) != []
        # TODO: order by search_path schema order

##### Structural changes

#### Columns

def add_col(db, table, col, comment=None, **kw_args):
    '''
    @param col TypedCol Name may be versioned, so be sure to propagate any
        renaming back to any source column for the TypedCol.
    @param comment None|str SQL comment used to distinguish columns of the same
        name from each other when they contain different data, to allow the
        ADD COLUMN query to be cached. If not set, query will not be cached.
    '''
    assert isinstance(col, sql_gen.TypedCol)
    
    while True:
        str_ = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
        if comment != None: str_ += ' '+sql_gen.esc_comment(comment)
        
        try:
            run_query(db, str_, recover=True, cacheable=True, **kw_args)
            break
        except DuplicateException:
            col.name = next_version(col.name)
            # try again with next version of name

def add_not_null(db, col):
    table = col.table
    col = sql_gen.to_name_only_col(col)
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
        +col.to_str(db)+' SET NOT NULL', cacheable=True, log_level=3)

row_num_col = '_row_num'

row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
    constraints='PRIMARY KEY')

def add_row_num(db, table):
    '''Adds a row number column to a table. Its name is in row_num_col. It will
    be the primary key.'''
    add_col(db, table, row_num_typed_col, log_level=3)

#### Indexes

def add_pkey(db, table, cols=None, recover=None):
    '''Adds a primary key.
    @param cols [sql_gen.Col,...] The columns in the primary key.
        Defaults to the first column in the table.
    @pre The table must not already have a primary key.
    '''
    table = sql_gen.as_Table(table)
    if cols == None: cols = [pkey(db, table, recover)]
    col_strs = [sql_gen.to_name_only_col(v).to_str(db) for v in cols]
    
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
        +(', '.join(col_strs))+')', recover=True, cacheable=True, log_level=3,
        log_ignore_excs=(DuplicateException,))

def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
    Currently, only function calls are supported as expressions.
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
        This allows indexes to be used for comparisons where NULLs are equal.
    '''
    exprs = lists.mk_seq(exprs)
    
    # Parse exprs
    old_exprs = exprs[:]
    exprs = []
    cols = []
    for i, expr in enumerate(old_exprs):
        expr = sql_gen.as_Col(expr, table)
        
        # Handle nullable columns
        if ensure_not_null_:
            try: expr = ensure_not_null(db, expr)
            except KeyError: pass # unknown type, so just create plain index
        
        # Extract col
        expr = copy.deepcopy(expr) # don't modify input!
        if isinstance(expr, sql_gen.FunctionCall):
            col = expr.args[0]
            expr = sql_gen.Expr(expr)
        else: col = expr
        assert isinstance(col, sql_gen.Col)
        
        # Extract table
        if table == None:
            assert sql_gen.is_table_col(col)
            table = col.table
        
        col.table = None
        
        exprs.append(expr)
        cols.append(col)
    
    table = sql_gen.as_Table(table)
    index = sql_gen.Table(str(sql_gen.Col(','.join(map(str, cols)), table)))
    
    # Add index
    while True:
        str_ = 'CREATE'
        if unique: str_ += ' UNIQUE'
        str_ += ' INDEX '+index.to_str(db)+' ON '+table.to_str(db)+' ('+(
            ', '.join((v.to_str(db) for v in exprs)))+')'
        
        try:
            run_query(db, str_, recover=True, cacheable=True, log_level=3,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            index.name = next_version(index.name)
            # try again with next version of name

def add_index_col(db, col, suffix, expr, nullable=True):
    if sql_gen.index_col(col) != None: return # already has index col
    
    new_col = sql_gen.suffixed_col(col, suffix)
    
    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, db.col_info(col).type)
    add_col(db, col.table, new_typed_col, comment='src: '+repr(col),
        log_level=3)
    new_col.name = new_typed_col.name # propagate any renaming
    
    update(db, col.table, [(new_col, expr)], in_place=True, cacheable=True,
        log_level=3)
    if not nullable: add_not_null(db, new_col)
    add_index(db, new_col)
    
    col.table.index_cols[col.name] = new_col.name

# Controls when ensure_not_null() will use index columns
not_null_index_cols_min_rows = 0 # rows; initially always use index columns

def ensure_not_null(db, col):
    '''For params, see sql_gen.ensure_not_null()'''
    expr = sql_gen.ensure_not_null(db, col)
    
    # If a nullable column in a temp table, add separate index column instead.
    # Note that for small datasources, this adds 6-25% to the total import time.
    if (sql_gen.is_temp_col(col) and isinstance(expr, sql_gen.EnsureNotNull)
        and table_row_count(db, col.table) >= not_null_index_cols_min_rows):
        add_index_col(db, col, '::NOT NULL', expr, nullable=False)
        expr = sql_gen.index_col(col)
    
    return expr

already_indexed = object() # tells add_indexes() the pkey has already been added

def add_indexes(db, table, has_pkey=True):
    '''Adds an index on all columns in a table.
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
        index should be added on the first column.
        * If already_indexed, the pkey is assumed to have already been added
    '''
    cols = table_cols(db, table)
    if has_pkey:
        if has_pkey is not already_indexed: add_pkey(db, table)
        cols = cols[1:]
    for col in cols: add_index(db, col, table)

#### Tables

### Maintenance

def analyze(db, table):
    table = sql_gen.as_Table(table)
    run_query(db, 'ANALYZE '+table.to_str(db), log_level=3)

def autoanalyze(db, table):
    if db.autoanalyze: analyze(db, table)

def vacuum(db, table):
    table = sql_gen.as_Table(table)
    db.with_autocommit(lambda: run_query(db, 'VACUUM ANALYZE '+table.to_str(db),
        log_level=3))

### Lifecycle

def drop_table(db, table):
    table = sql_gen.as_Table(table)
    return run_query(db, 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE')

def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
    like=None):
    '''Creates a table.
    @param cols [sql_gen.TypedCol,...] The column names and types
    @param has_pkey If set, the first column becomes the primary key.
    @param col_indexes bool|[ref]
        * If True, indexes will be added on all non-pkey columns.
        * If a list reference, [0] will be set to a function to do this.
          This can be used to delay index creation until the table is populated.
    '''
    table = sql_gen.as_Table(table)
    
    if like != None:
        cols = [sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL')
            ]+cols
    if has_pkey:
        cols[0] = pkey_col = copy.copy(cols[0]) # don't modify input!
        pkey_col.constraints = 'PRIMARY KEY'
    
    temp = table.is_temp and not db.debug_temp
        # temp tables permanent in debug_temp mode
    
    # Create table
    while True:
        str_ = 'CREATE'
        if temp: str_ += ' TEMP'
        str_ += ' TABLE '+table.to_str(db)+' (\n'
        str_ += '\n, '.join(c.to_str(db) for c in cols)
        str_ += '\n);\n'
        
        try:
            run_query(db, str_, cacheable=True, log_level=2,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            table.name = next_version(table.name)
            # try again with next version of name
    
    # Add indexes
    if has_pkey: has_pkey = already_indexed
    def add_indexes_(): add_indexes(db, table, has_pkey)
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
    elif col_indexes: add_indexes_() # add now

def copy_table_struct(db, src, dest):
    '''Creates a structure-only copy of a table. (Does not copy data.)'''
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)

### Data

def truncate(db, table, schema='public', **kw_args):
    '''For params, see run_query()'''
    table = sql_gen.as_Table(table, schema)
    return run_query(db, 'TRUNCATE '+table.to_str(db)+' CASCADE', **kw_args)

def empty_temp(db, tables):
    if db.debug_temp: return # leave temp tables there for debugging
    tables = lists.mk_seq(tables)
    for table in tables: truncate(db, table, log_level=3)

def empty_db(db, schema='public', **kw_args):
    '''For kw_args, see tables()'''
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)

def distinct_table(db, table, distinct_on):
    '''Creates a copy of a temp table which is distinct on the given columns.
    The old and new tables will both get an index on these columns, to
    facilitate merge joins.
    @param distinct_on If empty, creates a table with one row. This is useful if
        your distinct_on columns are all literal values.
    @return The new table.
    '''
    new_table = sql_gen.suffixed_table(table, '_distinct')
    
    copy_table_struct(db, table, new_table)
    
    limit = None
    if distinct_on == []: limit = 1 # one sample row
    else:
        add_index(db, distinct_on, new_table, unique=True)
        add_index(db, distinct_on, table) # for join optimization
    
    insert_select(db, new_table, None, mk_select(db, table, start=0,
        limit=limit), ignore=True)
    analyze(db, new_table)
    
    return new_table
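
# Example (sketch): deduplicating a temp table on one column, assuming a temp
# table `staging` with a `name` column:
#     staging = sql_gen.Table('staging')
#     staging.is_temp = True
#     deduped = distinct_table(db, staging, [sql_gen.Col('name', staging)])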