# Database access

import copy
import re
import warnings

import exc
import dicts
import iters
import lists
from Proxy import Proxy
import rand
import sql_gen
import strings
import util

##### Exceptions

def get_cur_query(cur, input_query=None):
    raw_query = None
    if hasattr(cur, 'query'): raw_query = cur.query
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed

    if raw_query != None: return raw_query
    else: return '[input] '+strings.ustr(input_query)

def _add_cursor_info(e, *args, **kw_args):
    '''For params, see get_cur_query()'''
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))

class DbException(exc.ExceptionWithCause):
    def __init__(self, msg, cause=None, cur=None):
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
        if cur != None: _add_cursor_info(self, cur)

class ExceptionWithName(DbException):
    def __init__(self, name, cause=None):
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name)), cause)
        self.name = name

class ExceptionWithValue(DbException):
    def __init__(self, value, cause=None):
        DbException.__init__(self, 'for value: '+strings.as_tt(repr(value)),
            cause)
        self.value = value

class ExceptionWithNameType(DbException):
    def __init__(self, type_, name, cause=None):
        DbException.__init__(self, 'for type: '+strings.as_tt(str(type_))
            +'; name: '+strings.as_tt(name), cause)
        self.type = type_
        self.name = name

class ConstraintException(DbException):
    def __init__(self, name, cols, cause=None):
        DbException.__init__(self, 'Violated '+strings.as_tt(name)
            +' constraint on columns: '+strings.as_tt(', '.join(cols)), cause)
        self.name = name
        self.cols = cols

class MissingCastException(DbException):
    def __init__(self, type_, col, cause=None):
        DbException.__init__(self, 'Missing cast to type '+strings.as_tt(type_)
            +' on column: '+strings.as_tt(col), cause)
        self.type = type_
        self.col = col

class NameException(DbException): pass

class DuplicateKeyException(ConstraintException): pass

class NullValueException(ConstraintException): pass

class InvalidValueException(ExceptionWithValue): pass

class DuplicateException(ExceptionWithNameType): pass

class EmptyRowException(DbException): pass

##### Warnings

class DbWarning(UserWarning): pass

##### Result retrieval

def col_names(cur): return (col[0] for col in cur.description)

def rows(cur): return iter(lambda: cur.fetchone(), None)

def consume_rows(cur):
    '''Used to fetch all rows so the result will be cached'''
    iters.consume_iter(rows(cur))

def next_row(cur): return rows(cur).next()

def row(cur):
    row_ = next_row(cur)
    consume_rows(cur)
    return row_

def next_value(cur): return next_row(cur)[0]

def value(cur): return row(cur)[0]

def values(cur): return iters.func_iter(lambda: next_value(cur))

def value_or_none(cur):
    try: return value(cur)
    except StopIteration: return None
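
# Usage sketch (illustrative; assumes `cur` is a cursor whose query has already
# been executed):
#     print list(col_names(cur)) # the column names
#     print value_or_none(cur) # first column of the first row, or None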

##### Escaping

def esc_name_by_module(module, name):
    if module == 'psycopg2' or module == None: quote = '"'
    elif module == 'MySQLdb': quote = '`'
    else: raise NotImplementedError("Can't escape name for "+module+' database')
    return sql_gen.esc_name(name, quote)

def esc_name_by_engine(engine, name, **kw_args):
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)

def esc_name(db, name, **kw_args):
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)

def qual_name(db, schema, table):
    def esc_name_(name): return esc_name(db, name)
    table = esc_name_(table)
    if schema != None: return esc_name_(schema)+'.'+table
    else: return table
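
# Escaping sketch (illustrative; assumes sql_gen.esc_name() wraps the name in
# the given quote character):
#     esc_name_by_module('psycopg2', 'my table') #=> '"my table"'
#     esc_name_by_module('MySQLdb', 'my table') #=> '`my table`'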

##### Database connections

db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']

db_engines = {
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
    'PostgreSQL': ('psycopg2', {}),
}

DatabaseErrors_set = set([DbException])
DatabaseErrors = tuple(DatabaseErrors_set)

def _add_module(module):
    DatabaseErrors_set.add(module.DatabaseError)
    global DatabaseErrors
    DatabaseErrors = tuple(DatabaseErrors_set)

def db_config_str(db_config):
    return db_config['engine']+' database '+db_config['database']
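
# Example db_config (hypothetical values). Keys come from db_config_names;
# db_engines provides per-driver key renamings, e.g. for MySQL, 'password' is
# passed to MySQLdb as 'passwd' and 'database' as 'db'.
#     db_config = {'engine': 'PostgreSQL', 'host': 'localhost', 'user': 'bien',
#         'password': 'xxxxx', 'database': 'vegbien', 'schemas': 'public'}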

log_debug_none = lambda msg, level=2: None

class DbConn:
    def __init__(self, db_config, autocommit=True, caching=True,
        log_debug=log_debug_none, debug_temp=False):
        '''
        @param debug_temp Whether temporary objects should instead be permanent.
            This assists in debugging the internal objects used by the program.
        '''
        self.db_config = db_config
        self.autocommit = autocommit
        self.caching = caching
        self.log_debug = log_debug
        self.debug = log_debug != log_debug_none
        self.debug_temp = debug_temp
        self.autoanalyze = False

        self.__db = None
        self.query_results = {}
        self._savepoint = 0
        self._notices_seen = set()

    def __getattr__(self, name):
        if name == '__dict__': raise Exception('getting __dict__')
        if name == 'db': return self._db()
        else: raise AttributeError()

    def __getstate__(self):
        state = copy.copy(self.__dict__) # shallow copy
        state['log_debug'] = None # don't pickle the debug callback
        state['_DbConn__db'] = None # don't pickle the connection
        return state

    def connected(self): return self.__db != None

    def _db(self):
        if self.__db == None:
            # Process db_config
            db_config = self.db_config.copy() # don't modify input!
            schemas = db_config.pop('schemas', None)
            module_name, mappings = db_engines[db_config.pop('engine')]
            module = __import__(module_name)
            _add_module(module)
            for orig, new in mappings.iteritems():
                try: util.rename_key(db_config, orig, new)
                except KeyError: pass

            # Connect
            self.__db = module.connect(**db_config)

            # Configure connection
            if hasattr(self.db, 'set_isolation_level'):
                import psycopg2.extensions
                self.db.set_isolation_level(
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
            if schemas != None:
                search_path = [self.esc_name(s) for s in schemas.split(',')]
                search_path.append(value(run_query(self, 'SHOW search_path',
                    log_level=4)))
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
                    log_level=3)

        return self.__db

    class DbCursor(Proxy):
        def __init__(self, outer):
            Proxy.__init__(self, outer.db.cursor())
            self.outer = outer
            self.query_results = outer.query_results
            self.query_lookup = None
            self.result = []

        def execute(self, query):
            self._is_insert = query.startswith('INSERT')
            self.query_lookup = query
            try:
                try:
                    cur = self.inner.execute(query)
                    self.outer.do_autocommit()
                finally: self.query = get_cur_query(self.inner, query)
            except Exception, e:
                _add_cursor_info(e, self, query)
                self.result = e # cache the exception as the result
                self._cache_result()
                raise

            # Always cache certain queries
            if query.startswith('CREATE') or query.startswith('ALTER'):
                # structural changes
                # Rest of query must be unique in the face of name collisions,
                # so don't cache ADD COLUMN unless it has distinguishing comment
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
                    self._cache_result()
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
                consume_rows(self) # fetch all rows so result will be cached

            return cur

        def fetchone(self):
            row = self.inner.fetchone()
            if row != None: self.result.append(row)
            # otherwise, fetched all rows
            else: self._cache_result()
            return row

        def _cache_result(self):
            # For inserts that return a result set, don't cache result set since
            # inserts are not idempotent. Other non-SELECT queries don't have
            # their result set read, so only exceptions will be cached (an
            # invalid query will always be invalid).
            if self.query_results != None and (not self._is_insert
                or isinstance(self.result, Exception)):

                assert self.query_lookup != None
                self.query_results[self.query_lookup] = self.CacheCursor(
                    util.dict_subset(dicts.AttrsDictView(self),
                    ['query', 'result', 'rowcount', 'description']))

        class CacheCursor:
            def __init__(self, cached_result): self.__dict__ = cached_result

            def execute(self, *args, **kw_args):
                if isinstance(self.result, Exception): raise self.result
                # otherwise, result is a rows list
                self.iter = iter(self.result)

            def fetchone(self):
                try: return self.iter.next()
                except StopIteration: return None

    def esc_value(self, value):
        try: str_ = self.mogrify('%s', [value])
        except NotImplementedError, e:
            module = util.root_module(self.db)
            if module == 'MySQLdb':
                import _mysql
                str_ = _mysql.escape_string(value)
            else: raise e
        return strings.to_unicode(str_)

    def esc_name(self, name): return esc_name(self, name) # calls global func

    def std_code(self, str_):
        '''Standardizes SQL code.
        * Ensures that string literals are prefixed by `E`
        '''
        if str_.startswith("'"): str_ = 'E'+str_
        return str_

    def can_mogrify(self):
        module = util.root_module(self.db)
        return module == 'psycopg2'

    def mogrify(self, query, params=None):
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
        else: raise NotImplementedError("Can't mogrify query")

    def print_notices(self):
        if hasattr(self.db, 'notices'):
            for msg in self.db.notices:
                if msg not in self._notices_seen:
                    self._notices_seen.add(msg)
                    self.log_debug(msg, level=2)

    def run_query(self, query, cacheable=False, log_level=2,
        debug_msg_ref=None):
        '''
        @param log_ignore_excs The log_level will be increased by 2 if the query
            throws one of these exceptions.
        @param debug_msg_ref If specified, the log message will be returned in
            this instead of being output. This allows you to filter log messages
            depending on the result of the query.
        '''
        assert query != None

        if not self.caching: cacheable = False
        used_cache = False

        def log_msg(query):
            if used_cache: cache_status = 'cache hit'
            elif cacheable: cache_status = 'cache miss'
            else: cache_status = 'non-cacheable'
            return 'DB query: '+cache_status+':\n'+strings.as_code(query, 'SQL')

        try:
            # Get cursor
            if cacheable:
                try:
                    cur = self.query_results[query]
                    used_cache = True
                except KeyError: cur = self.DbCursor(self)
            else: cur = self.db.cursor()

            # Log query
            if self.debug and debug_msg_ref == None: # log before running
                self.log_debug(log_msg(query), log_level)

            # Run query
            cur.execute(query)
        finally:
            self.print_notices()
            if self.debug and debug_msg_ref != None: # return after running
                debug_msg_ref[0] = log_msg(str(get_cur_query(cur, query)))

        return cur

    def is_cached(self, query): return query in self.query_results

    def with_autocommit(self, func):
        import psycopg2.extensions

        prev_isolation_level = self.db.isolation_level
        self.db.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        try: return func()
        finally: self.db.set_isolation_level(prev_isolation_level)

    def with_savepoint(self, func):
        savepoint = 'level_'+str(self._savepoint)
        self.run_query('SAVEPOINT '+savepoint, log_level=4)
        self._savepoint += 1
        try: return func()
        except:
            self.run_query('ROLLBACK TO SAVEPOINT '+savepoint, log_level=4)
            raise
        finally:
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
            # "The savepoint remains valid and can be rolled back to again"
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
            self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)

            self._savepoint -= 1
            assert self._savepoint >= 0

            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT

    def do_autocommit(self):
        '''Autocommits if outside savepoint'''
        assert self._savepoint >= 0
        if self.autocommit and self._savepoint == 0:
            self.log_debug('Autocommitting', level=4)
            self.db.commit()

    def col_info(self, col):
        table = sql_gen.Table('columns', 'information_schema')
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
            'USER-DEFINED'), sql_gen.Col('udt_name'))
        cols = [type_, 'column_default',
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]

        conds = [('table_name', col.table.name), ('column_name', col.name)]
        schema = col.table.schema
        if schema != None: conds.append(('table_schema', schema))

        type_, default, nullable = row(select(self, table, cols, conds,
            order_by='table_schema', limit=1, cacheable=False, log_level=4))
            # TODO: order_by search_path schema order
        default = sql_gen.as_Code(default, self)

        return sql_gen.TypedCol(col.name, type_, default, nullable)

    def TempFunction(self, name):
        if self.debug_temp: schema = None
        else: schema = 'pg_temp'
        return sql_gen.Function(name, schema)

connect = DbConn
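
# Connection sketch (hypothetical config values):
#     db = connect({'engine': 'PostgreSQL', 'host': 'localhost', 'user': 'bien',
#         'password': 'xxxxx', 'database': 'vegbien'})
#     print value(run_query(db, 'SELECT 1')) #=> 1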

##### Recoverable querying

def with_savepoint(db, func): return db.with_savepoint(func)
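
# Savepoint sketch (illustrative; assumes a connected DbConn `db`): run func()
# inside a savepoint so a failure doesn't abort the enclosing transaction.
#     def body(): return run_query(db, 'SELECT 1')
#     with_savepoint(db, body) # rolled back automatically if body() raises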

def run_query(db, query, recover=None, cacheable=False, log_level=2,
    log_ignore_excs=None, **kw_args):
    '''For params, see DbConn.run_query()'''
    if recover == None: recover = False
    if log_ignore_excs == None: log_ignore_excs = ()
    log_ignore_excs = tuple(log_ignore_excs)

    debug_msg_ref = None # usually, db.run_query() logs query before running it
    # But if filtering with log_ignore_excs, wait until after exception parsing
    if log_ignore_excs != () or not db.can_mogrify(): debug_msg_ref = [None]

    try:
        try:
            def run(): return db.run_query(query, cacheable, log_level,
                debug_msg_ref, **kw_args)
            if recover and not db.is_cached(query):
                return with_savepoint(db, run)
            else: return run() # don't need savepoint if cached
        except Exception, e:
            msg = strings.ustr(e.args[0])

            match = re.match(r'^duplicate key value violates unique constraint '
                r'"((_?[^\W_]+(?=[._]))?.+?)"', msg)
            if match:
                constraint, table = match.groups()
                cols = []
                if recover: # need auto-rollback to run index_cols()
                    try: cols = index_cols(db, table, constraint)
                    except NotImplementedError: pass
                raise DuplicateKeyException(constraint, cols, e)

            match = re.match(r'^null value in column "(.+?)" violates not-null'
                r' constraint', msg)
            if match: raise NullValueException('NOT NULL', [match.group(1)], e)

            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
                r'|.+? field value out of range): "(.+?)"', msg)
            if match:
                value, = match.groups()
                raise InvalidValueException(strings.to_unicode(value), e)

            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
                r'is of type', msg)
            if match:
                col, type_ = match.groups()
                raise MissingCastException(type_, col, e)

            match = re.match(r'^(\S+) "(.+?)".*? already exists', msg)
            if match:
                type_, name = match.groups()
                raise DuplicateException(type_, name, e)

            raise # no specific exception type matched; re-raise the original
    except log_ignore_excs:
        log_level += 2
        raise
    finally:
        if debug_msg_ref != None and debug_msg_ref[0] != None:
            db.log_debug(debug_msg_ref[0], log_level)
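
# Error-translation sketch (illustrative; assumes a `plot` table with a unique
# constraint on id):
#     try: run_query(db, "INSERT INTO plot (id) VALUES (1)", recover=True)
#     except DuplicateKeyException, e: pass # parsed from the DB error message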

##### Basic queries

def next_version(name):
    version = 1 # first existing name was version 0
    match = re.match(r'^(.*)#(\d+)$', name)
    if match:
        name, version = match.groups()
        version = int(version)+1
    return sql_gen.concat(name, '#'+str(version))
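
# Versioning sketch (assuming sql_gen.concat() plainly appends when the name
# fits within the identifier length limit):
#     next_version('table') #=> 'table#1'
#     next_version('table#1') #=> 'table#2'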

def lock_table(db, table, mode):
    table = sql_gen.as_Table(table)
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')

def run_query_into(db, query, into=None, add_indexes_=False, **kw_args):
    '''Outputs a query to a temp table.
    For params, see run_query().
    '''
    if into == None: return run_query(db, query, **kw_args)

    assert isinstance(into, sql_gen.Table)

    into.is_temp = True
    # "temporary tables cannot specify a schema name", so remove schema
    into.schema = None

    kw_args['recover'] = True
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))

    temp = not db.debug_temp # tables are permanent in debug_temp mode

    # Create table
    while True:
        create_query = 'CREATE'
        if temp: create_query += ' TEMP'
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query

        try:
            cur = run_query(db, create_query, **kw_args)
                # CREATE TABLE AS sets rowcount to # rows in query
            break
        except DuplicateException, e:
            into.name = next_version(into.name)
            # try again with next version of name

    if add_indexes_: add_indexes(db, into)

    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
    # table is going to be used in complex queries, it is wise to run ANALYZE on
    # the temporary table after it is populated."
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
    # If into is not a temp table, ANALYZE is useful but not required.
    analyze(db, into)

    return cur
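
# Temp-table sketch (illustrative; `nums` is a hypothetical table):
#     into = sql_gen.Table('odds')
#     run_query_into(db, 'SELECT n FROM nums WHERE n % 2 = 1', into=into)
#     # into.name may have been versioned (e.g. 'odds#1') on name collision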

order_by_pkey = object() # tells mk_select() to order by the pkey

distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns

def mk_select(db, tables, fields=None, conds=None, distinct_on=[], limit=None,
    start=None, order_by=order_by_pkey, default_table=None):
    '''
    @param tables The single table to select from, or a list of tables to join
        together, with tables after the first being sql_gen.Join objects
    @param fields Use None to select all fields in the table
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
        * container can be any iterable type
        * compare_left_side: sql_gen.Code|str (for col name)
        * compare_right_side: sql_gen.ValueCond|literal value
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
        use all columns
    @return query
    '''
    # Parse tables param
    tables = lists.mk_seq(tables)
    tables = list(tables) # don't modify input! (list() copies input)
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate

    # Parse other params
    if conds == None: conds = []
    elif dicts.is_dict(conds): conds = conds.items()
    conds = list(conds) # don't modify input! (list() copies input)
    assert limit == None or type(limit) == int
    assert start == None or type(start) == int
    if order_by is order_by_pkey:
        if distinct_on != []: order_by = None
        else: order_by = pkey(db, table0, recover=True)

    query = 'SELECT'

    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)

    # DISTINCT ON columns
    if distinct_on != []:
        query += '\nDISTINCT'
        if distinct_on is not distinct_on_all:
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'

    # Columns
    if fields == None:
        if query.find('\n') >= 0: whitespace = '\n'
        else: whitespace = ' '
        query += whitespace+'*'
    else:
        assert fields != []
        query += '\n'+('\n, '.join(map(parse_col, fields)))

    # Main table
    query += '\nFROM '+table0.to_str(db)

    # Add joins
    left_table = table0
    for join_ in tables:
        table = join_.table

        # Parse special values
        if join_.type_ is sql_gen.filter_out: # filter no match
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
                sql_gen.CompareCond(None, '~=')))

        query += '\n'+join_.to_str(db, left_table)

        left_table = table

    missing = True
    if conds != []:
        if len(conds) == 1: whitespace = ' '
        else: whitespace = '\n'
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
            .to_str(db) for l, r in conds], 'WHERE')
        missing = False
    if order_by != None:
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
    if limit != None: query += '\nLIMIT '+str(limit); missing = False
    if start != None:
        if start != 0: query += '\nOFFSET '+str(start)
        missing = False
    if missing: warnings.warn(DbWarning(
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))

    return query
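
# Query-building sketch (illustrative; exact escaping is done by sql_gen):
#     mk_select(db, 'plot', ['id', 'name'], [('id', 1)], order_by=None)
#     # produces roughly: SELECT id, name FROM "plot" WHERE "id" = 1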

def select(db, *args, **kw_args):
    '''For params, see mk_select() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)

    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
        log_level=log_level)

def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
    embeddable=False, ignore=False):
    '''
    @param returning str|None An inserted column (such as pkey) to return
    @param embeddable Whether the query should be embeddable as a nested SELECT.
        Warning: If you set this and cacheable=True when the query is run, the
        query will be fully cached, not just if it raises an exception.
    @param ignore Whether to ignore duplicate keys.
    '''
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
    if cols == []: cols = None # no cols (all defaults) = unknown col names
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
    if select_query == None: select_query = 'DEFAULT VALUES'
    if returning != None: returning = sql_gen.as_Col(returning, table)

    first_line = 'INSERT INTO '+table.to_str(db)

    def mk_insert(select_query):
        query = first_line
        if cols != None:
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
        query += '\n'+select_query

        if returning != None:
            returning_name_col = sql_gen.to_name_only_col(returning)
            query += '\nRETURNING '+returning_name_col.to_str(db)

        return query

    return_type = 'unknown'
    if returning != None: return_type = returning.to_str(db)+'%TYPE'

    lang = 'sql'
    if ignore:
        # Always return something to set the correct rowcount
        if returning == None: returning = sql_gen.NamedCol('NULL', None)

        embeddable = True # must use function
        lang = 'plpgsql'

        if cols == None:
            row = [sql_gen.Col(sql_gen.all_cols, 'row')]
            row_vars = [sql_gen.Table('row')]
        else:
            row_vars = row = [sql_gen.Col(c.name, 'row') for c in cols]

        query = '''\
DECLARE
    row '''+table.to_str(db)+'''%ROWTYPE;
BEGIN
    /* Need an EXCEPTION block for each individual row because "When an error is
    caught by an EXCEPTION clause, [...] all changes to persistent database
    state within the block are rolled back."
    This is unfortunate because "A block containing an EXCEPTION clause is
    significantly more expensive to enter and exit than a block without one."
    (http://www.postgresql.org/docs/8.3/static/plpgsql-control-structures.html\
#PLPGSQL-ERROR-TRAPPING)
    */
    FOR '''+(', '.join((v.to_str(db) for v in row_vars)))+''' IN
'''+select_query+'''
    LOOP
        BEGIN
            RETURN QUERY
'''+mk_insert(sql_gen.Values(row).to_str(db))+'''
;
        EXCEPTION
            WHEN unique_violation THEN NULL; -- continue to next row
        END;
    END LOOP;
END;\
'''
    else: query = mk_insert(select_query)

    if embeddable:
        # Create function
        function_name = sql_gen.clean_name(first_line)
        while True:
            try:
                function = db.TempFunction(function_name)

                function_query = '''\
CREATE FUNCTION '''+function.to_str(db)+'''()
RETURNS SETOF '''+return_type+'''
LANGUAGE '''+lang+'''
AS $$
'''+query+'''
$$;
'''
                run_query(db, function_query, recover=True, cacheable=True,
                    log_ignore_excs=(DuplicateException,))
                break # this version was successful
            except DuplicateException, e:
                function_name = next_version(function_name)
                # try again with next version of name

        # Return query that uses function
        cols = None
        if returning != None: cols = [returning]
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(function),
            cols) # AS clause requires function alias
        return mk_select(db, func_table, start=0, order_by=None)

    return query
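
# Insert-query sketch (illustrative):
#     mk_insert_select(db, 'plot', ['id'], 'SELECT id FROM plot_src')
#     # produces roughly: INSERT INTO "plot" ("id") SELECT id FROM plot_src
#     # with ignore=True, the INSERT is instead wrapped in a plpgsql function
#     # that skips rows raising unique_violation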

def insert_select(db, table, *args, **kw_args):
    '''For params, see mk_insert_select() and run_query_into()
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
        values in
    '''
    into = kw_args.pop('into', None)
    if into != None: kw_args['embeddable'] = True
    recover = kw_args.pop('recover', None)
    if kw_args.get('ignore', False): recover = True
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)

    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
        into, recover=recover, cacheable=cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

default = sql_gen.default # tells insert() to use the default value for a column

def insert(db, table, row, *args, **kw_args):
    '''For params, see insert_select()'''
    if lists.is_seq(row): cols = None
    else:
        cols = row.keys()
        row = row.values()
    row = list(row) # ensure that "== []" works

    if row == []: query = None
    else: query = sql_gen.Values(row).to_str(db)

    return insert_select(db, table, cols, query, *args, **kw_args)
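
# Insert sketch (illustrative; assumes a connected DbConn `db`):
#     insert(db, 'plot', {'id': 1, 'name': 'plot 1'}) # dict: named columns
#     insert(db, 'plot', [2, 'plot 2']) # sequence: columns in table order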

def mk_update(db, table, changes=None, cond=None, in_place=False):
    '''
    @param changes [(col, new_value),...]
        * container can be any iterable type
        * col: sql_gen.Code|str (for col name)
        * new_value: sql_gen.Code|literal value
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
    @param in_place If set, locks the table and updates rows in place.
        This avoids creating dead rows in PostgreSQL.
        * cond must be None
    @return str query
    '''
    table = sql_gen.as_Table(table)
    changes = [(sql_gen.to_name_only_col(c, table), sql_gen.as_Value(v))
        for c, v in changes]

    if in_place:
        assert cond == None

        query = 'ALTER TABLE '+table.to_str(db)+'\n'
        query += ',\n'.join(('ALTER COLUMN '+c.to_str(db)+' TYPE '
            +db.col_info(sql_gen.with_default_table(c, table)).type
            +'\nUSING '+v.to_str(db) for c, v in changes))
    else:
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'
        query += ',\n'.join((c.to_str(db)+' = '+v.to_str(db)
            for c, v in changes))
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)

    return query
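
# Update sketch (illustrative; sql_gen renders the condition and values):
#     mk_update(db, 'plot', [('name', 'unknown')],
#         cond=sql_gen.ColValueCond('id', 1))
#     # produces roughly: UPDATE "plot" SET "name" = E'unknown' WHERE "id" = 1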

def update(db, table, *args, **kw_args):
    '''For params, see mk_update() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', False)
    log_level = kw_args.pop('log_level', 2)

    cur = run_query(db, mk_update(db, table, *args, **kw_args), recover,
        cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

def last_insert_id(db):
    module = util.root_module(db.db)
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
    elif module == 'MySQLdb': return db.db.insert_id()
        # insert_id() is on the raw connection, not the DbConn wrapper
    else: return None

def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
    '''Creates a mapping from original column names (which may have collisions)
    to names that will be distinct among the columns' tables.
    This is meant to be used for several tables that are being joined together.
    @param cols The columns to combine. Duplicates will be removed.
    @param into The table for the new columns.
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
        columns will be included in the mapping even if they are not in cols.
        The tables of the provided Col objects will be changed to into, so make
        copies of them if you want to keep the original tables.
    @param as_items Whether to return a list of dict items instead of a dict
    @return dict(orig_col=new_col, ...)
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
        * new_col: sql_gen.Col(orig_col_name, into)
        * All mappings use the into table so its name can easily be
          changed for all columns at once
    '''
    cols = lists.uniqify(cols)

    items = []
    for col in preserve:
        orig_col = copy.copy(col)
        col.table = into
        items.append((orig_col, col))
    preserve = set(preserve)
    for col in cols:
        if col not in preserve:
            items.append((col, sql_gen.Col(str(col), into, col.srcs)))

    if not as_items: items = dict(items)
    return items

def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
    '''For params, see mk_flatten_mapping()
    @return See return value of mk_flatten_mapping()
    '''
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
    run_query_into(db, mk_select(db, joins, cols, limit=limit, start=start),
        into=into, add_indexes_=True)
    return dict(items)

##### Database structure introspection

#### Tables

def tables(db, schema_like='public', table_like='%', exact=False):
    if exact: compare = '='
    else: compare = 'LIKE'

    module = util.root_module(db.db)
    if module == 'psycopg2':
        conds = [('schemaname', sql_gen.CompareCond(schema_like, compare)),
            ('tablename', sql_gen.CompareCond(table_like, compare))]
        return values(select(db, 'pg_tables', ['tablename'], conds,
            order_by='tablename', log_level=4))
    elif module == 'MySQLdb':
        return values(run_query(db, 'SHOW TABLES LIKE '+db.esc_value(table_like)
            , cacheable=True, log_level=4))
    else: raise NotImplementedError("Can't list tables for "+module+' database')

def table_exists(db, table):
    table = sql_gen.as_Table(table)
    return list(tables(db, table.schema, table.name, exact=True)) != []

def table_row_count(db, table, recover=None):
    return value(run_query(db, mk_select(db, table, [sql_gen.row_count],
        order_by=None, start=0), recover=recover, log_level=3))

def table_cols(db, table, recover=None):
    return list(col_names(select(db, table, limit=0, order_by=None,
        recover=recover, log_level=4)))

def pkey(db, table, recover=None):
    '''Assumed to be the first column in the table'''
    return table_cols(db, table, recover)[0]

not_null_col = 'not_null_col'

def table_not_null_col(db, table, recover=None):
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
    if not_null_col in table_cols(db, table, recover): return not_null_col
    else: return pkey(db, table, recover)

def index_cols(db, table, index):
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
    automatically created. When you don't know whether something is a UNIQUE
    constraint or a UNIQUE index, use this function.'''
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM
(
        SELECT attnum, attname
        FROM pg_index
        JOIN pg_class index ON index.oid = indexrelid
        JOIN pg_class table_ ON table_.oid = indrelid
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
        WHERE
            table_.relname = '''+db.esc_value(table)+'''
            AND index.relname = '''+db.esc_value(index)+'''
    UNION
        SELECT attnum, attname
        FROM
        (
            SELECT
                indrelid
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
                    AS indkey
            FROM pg_index
            JOIN pg_class index ON index.oid = indexrelid
            JOIN pg_class table_ ON table_.oid = indrelid
            WHERE
                table_.relname = '''+db.esc_value(table)+'''
                AND index.relname = '''+db.esc_value(index)+'''
        ) s
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
) s
ORDER BY attnum
'''
            , cacheable=True, log_level=4)))
    else: raise NotImplementedError("Can't list index columns for "+module+
        ' database')

def constraint_cols(db, table, constraint):
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM pg_constraint
JOIN pg_class ON pg_class.oid = conrelid
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
WHERE
    relname = '''+db.esc_value(table)+'''
    AND conname = '''+db.esc_value(constraint)+'''
ORDER BY attnum
'''
            )))
    else: raise NotImplementedError("Can't list constraint columns for "+module+
        ' database')

#### Functions

def function_exists(db, function):
    function = sql_gen.as_Function(function)

    info_table = sql_gen.Table('routines', 'information_schema')
    conds = [('routine_name', function.name)]
    schema = function.schema
    if schema != None: conds.append(('routine_schema', schema))
    # Exclude trigger functions, since they cannot be called directly
    conds.append(('data_type', sql_gen.CompareCond('trigger', '!=')))

    return list(values(select(db, info_table, ['routine_name'], conds,
        order_by='routine_schema', limit=1, log_level=4))) != []
        # TODO: order_by search_path schema order

##### Structural changes

#### Columns

def add_col(db, table, col, comment=None, **kw_args):
    '''
    @param col TypedCol Name may be versioned, so be sure to propagate any
        renaming back to any source column for the TypedCol.
    @param comment None|str SQL comment used to distinguish columns of the same
        name from each other when they contain different data, to allow the
        ADD COLUMN query to be cached. If not set, query will not be cached.
    '''
    assert isinstance(col, sql_gen.TypedCol)

    while True:
        str_ = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
        if comment != None: str_ += ' '+sql_gen.esc_comment(comment)

        try:
            run_query(db, str_, recover=True, cacheable=True, **kw_args)
            break
        except DuplicateException:
            col.name = next_version(col.name)
            # try again with next version of name

def add_not_null(db, col):
    table = col.table
    col = sql_gen.to_name_only_col(col)
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
        +col.to_str(db)+' SET NOT NULL', cacheable=True, log_level=3)

row_num_col = '_row_num'

row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
    constraints='PRIMARY KEY')

def add_row_num(db, table):
    '''Adds a row number column to a table. Its name is in row_num_col. It will
    be the primary key.'''
    add_col(db, table, copy.copy(row_num_typed_col), log_level=3)
        # copy() so any versioning of the name doesn't modify the shared
        # row_num_typed_col template

#### Indexes

def add_pkey(db, table, cols=None, recover=None):
    '''Adds a primary key.
    @param cols [sql_gen.Col,...] The columns in the primary key.
        Defaults to the first column in the table.
    @pre The table must not already have a primary key.
    '''
    table = sql_gen.as_Table(table)
    if cols == None: cols = [pkey(db, table, recover)]
    col_strs = [sql_gen.to_name_only_col(v).to_str(db) for v in cols]

    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
        +(', '.join(col_strs))+')', recover=True, cacheable=True, log_level=3,
        log_ignore_excs=(DuplicateException,))

def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
    Currently, only function calls are supported as expressions.
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
        This allows indexes to be used for comparisons where NULLs are equal.
    '''
    exprs = lists.mk_seq(exprs)

    # Parse exprs
    old_exprs = exprs[:]
    exprs = []
    cols = []
    for i, expr in enumerate(old_exprs):
        expr = sql_gen.as_Col(expr, table)

        # Handle nullable columns
        if ensure_not_null_:
            try: expr = ensure_not_null(db, expr)
            except KeyError: pass # unknown type, so just create plain index

        # Extract col
        expr = copy.deepcopy(expr) # don't modify input!
        if isinstance(expr, sql_gen.FunctionCall):
            col = expr.args[0]
            expr = sql_gen.Expr(expr)
        else: col = expr
        assert isinstance(col, sql_gen.Col)

        # Extract table
        if table == None:
            assert sql_gen.is_table_col(col)
            table = col.table

        col.table = None

        exprs.append(expr)
        cols.append(col)

    table = sql_gen.as_Table(table)
    index = sql_gen.Table(str(sql_gen.Col(','.join(map(str, cols)), table)))

    # Add index
    while True:
        str_ = 'CREATE'
        if unique: str_ += ' UNIQUE'
        str_ += ' INDEX '+index.to_str(db)+' ON '+table.to_str(db)+' ('+(
            ', '.join((v.to_str(db) for v in exprs)))+')'

        try:
            run_query(db, str_, recover=True, cacheable=True, log_level=3,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            index.name = next_version(index.name)
            # try again with next version of name

def add_index_col(db, col, suffix, expr, nullable=True):
    if sql_gen.index_col(col) != None: return # already has index col

    new_col = sql_gen.suffixed_col(col, suffix)

    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, db.col_info(col).type)
    add_col(db, col.table, new_typed_col, comment='src: '+repr(col),
        log_level=3)
    new_col.name = new_typed_col.name # propagate any renaming

    update(db, col.table, [(new_col, expr)], in_place=True, cacheable=True,
        log_level=3)
    if not nullable: add_not_null(db, new_col)
    add_index(db, new_col)

    col.table.index_cols[col.name] = new_col.name

# Controls when ensure_not_null() will use index columns
not_null_index_cols_min_rows = 0 # rows; initially always use index columns

def ensure_not_null(db, col):
    '''For params, see sql_gen.ensure_not_null()'''
    expr = sql_gen.ensure_not_null(db, col)

    # If a nullable column in a temp table, add separate index column instead.
    # Note that for small datasources, this adds 6-25% to the total import time.
    if (sql_gen.is_temp_col(col) and isinstance(expr, sql_gen.EnsureNotNull)
        and table_row_count(db, col.table) >= not_null_index_cols_min_rows):
        add_index_col(db, col, '::NOT NULL', expr, nullable=False)
        expr = sql_gen.index_col(col)

    return expr

already_indexed = object() # tells add_indexes() the pkey has already been added
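
# Index sketch (illustrative; assumes `plots` is an existing table):
#     add_index(db, 'name', 'plots') # NULL-safe index on plots.name
#     add_index(db, 'name', 'plots', unique=True) # versioned on name collision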

def add_indexes(db, table, has_pkey=True):
    '''Adds an index on all columns in a table.
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
        index should be added on the first column.
        * If already_indexed, the pkey is assumed to have already been added
    '''
    cols = table_cols(db, table)
    if has_pkey:
        if has_pkey is not already_indexed: add_pkey(db, table)
        cols = cols[1:]
    for col in cols: add_index(db, col, table)

#### Tables

### Maintenance

def analyze(db, table):
    table = sql_gen.as_Table(table)
    run_query(db, 'ANALYZE '+table.to_str(db), log_level=3)

def autoanalyze(db, table):
    if db.autoanalyze: analyze(db, table)

def vacuum(db, table):
    table = sql_gen.as_Table(table)
    db.with_autocommit(lambda: run_query(db, 'VACUUM ANALYZE '+table.to_str(db),
        log_level=3))

### Lifecycle

def drop_table(db, table):
    table = sql_gen.as_Table(table)
    return run_query(db, 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE')

def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
    like=None):
    '''Creates a table.
    @param cols [sql_gen.TypedCol,...] The column names and types
    @param has_pkey If set, the first column becomes the primary key.
    @param col_indexes bool|[ref]
        * If True, indexes will be added on all non-pkey columns.
        * If a list reference, [0] will be set to a function to do this.
          This can be used to delay index creation until the table is populated.
    '''
    table = sql_gen.as_Table(table)

    if like != None:
        cols = [sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL')
            ]+cols
    if has_pkey:
        cols[0] = pkey = copy.copy(cols[0]) # don't modify input!
        pkey.constraints = 'PRIMARY KEY'

    temp = table.is_temp and not db.debug_temp
        # temp tables permanent in debug_temp mode

    # Create table
    while True:
        str_ = 'CREATE'
        if temp: str_ += ' TEMP'
        str_ += ' TABLE '+table.to_str(db)+' (\n'
        str_ += '\n, '.join(c.to_str(db) for c in cols)
        str_ += '\n);\n'

        try:
            run_query(db, str_, cacheable=True, log_level=2,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            table.name = next_version(table.name)
            # try again with next version of name

    # Add indexes
    if has_pkey: has_pkey = already_indexed
    def add_indexes_(): add_indexes(db, table, has_pkey)
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
    elif col_indexes: add_indexes_() # add now

def copy_table_struct(db, src, dest):
    '''Creates a structure-only copy of a table. (Does not copy data.)'''
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)

### Data

def truncate(db, table, schema='public', **kw_args):
    '''For params, see run_query()'''
    table = sql_gen.as_Table(table, schema)
    return run_query(db, 'TRUNCATE '+table.to_str(db)+' CASCADE', **kw_args)

def empty_temp(db, tables):
    if db.debug_temp: return # leave temp tables there for debugging
    tables = lists.mk_seq(tables)
    for table in tables: truncate(db, table, log_level=3)

def empty_db(db, schema='public', **kw_args):
    '''For kw_args, see tables()'''
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)

def distinct_table(db, table, distinct_on):
    '''Creates a copy of a temp table which is distinct on the given columns.
    The old and new tables will both get an index on these columns, to
    facilitate merge joins.
    @param distinct_on If empty, creates a table with one row. This is useful if
        your distinct_on columns are all literal values.
    @return The new table.
    '''
    new_table = sql_gen.suffixed_table(table, '_distinct')

    copy_table_struct(db, table, new_table)

    limit = None
    if distinct_on == []: limit = 1 # one sample row
    else:
        add_index(db, distinct_on, new_table, unique=True)
        add_index(db, distinct_on, table) # for join optimization

    insert_select(db, new_table, None, mk_select(db, table, start=0,
        limit=limit), ignore=True)
    analyze(db, new_table)

    return new_table
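
# Deduplication sketch (illustrative; assumes `xs` is a temp table with a
# `name` column):
#     uniq = distinct_table(db, sql_gen.Table('xs'), [sql_gen.Col('name')])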