# Database access

import copy
import re
import warnings

import exc
import dicts
import iters
import lists
from Proxy import Proxy
import rand
import sql_gen
import strings
import util

##### Exceptions

def get_cur_query(cur, input_query=None):
    raw_query = None
    if hasattr(cur, 'query'): raw_query = cur.query
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
    
    if raw_query != None: return raw_query
    else: return '[input] '+strings.ustr(input_query)

def _add_cursor_info(e, *args, **kw_args):
    '''For params, see get_cur_query()'''
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))

class DbException(exc.ExceptionWithCause):
    def __init__(self, msg, cause=None, cur=None):
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
        if cur != None: _add_cursor_info(self, cur)

class ExceptionWithName(DbException):
    def __init__(self, name, cause=None):
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name)), cause)
        self.name = name

class ExceptionWithValue(DbException):
    def __init__(self, value, cause=None):
        DbException.__init__(self, 'for value: '+strings.as_tt(repr(value)),
            cause)
        self.value = value

class ExceptionWithNameType(DbException):
    def __init__(self, type_, name, cause=None):
        DbException.__init__(self, 'for type: '+strings.as_tt(str(type_))
            +'; name: '+strings.as_tt(name), cause)
        self.type = type_
        self.name = name

class ConstraintException(DbException):
    def __init__(self, name, cols, cause=None):
        DbException.__init__(self, 'Violated '+strings.as_tt(name)
            +' constraint on columns: '+strings.as_tt(', '.join(cols)), cause)
        self.name = name
        self.cols = cols

class MissingCastException(DbException):
    def __init__(self, type_, col, cause=None):
        DbException.__init__(self, 'Missing cast to type '+strings.as_tt(type_)
            +' on column: '+strings.as_tt(col), cause)
        self.type = type_
        self.col = col

class NameException(DbException): pass

class DuplicateKeyException(ConstraintException): pass

class NullValueException(ConstraintException): pass

class InvalidValueException(ExceptionWithValue): pass

class DuplicateException(ExceptionWithNameType): pass

class EmptyRowException(DbException): pass

##### Warnings

class DbWarning(UserWarning): pass

##### Result retrieval

def col_names(cur): return (col[0] for col in cur.description)

def rows(cur): return iter(lambda: cur.fetchone(), None)

def consume_rows(cur):
    '''Used to fetch all rows so result will be cached'''
    iters.consume_iter(rows(cur))

def next_row(cur): return rows(cur).next()

def row(cur):
    row_ = next_row(cur)
    consume_rows(cur)
    return row_

def next_value(cur): return next_row(cur)[0]

def value(cur): return row(cur)[0]

def values(cur): return iters.func_iter(lambda: next_value(cur))

def value_or_none(cur):
    try: return value(cur)
    except StopIteration: return None

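# Illustrative sketch only: given a DB-API cursor `cur` with a pending result
# set, the retrieval helpers above compose as follows.
#     list(col_names(cur)) # the column names, from cur.description
#     value(cur) # first column of first row; then drains the remaining rows
#     value_or_none(cur) # like value(), but None on an empty result set
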
    
111
##### Escaping
112

    
113
def esc_name_by_module(module, name):
114
    if module == 'psycopg2' or module == None: quote = '"'
115
    elif module == 'MySQLdb': quote = '`'
116
    else: raise NotImplementedError("Can't escape name for "+module+' database')
117
    return sql_gen.esc_name(name, quote)
118

    
119
def esc_name_by_engine(engine, name, **kw_args):
120
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)
121

    
122
def esc_name(db, name, **kw_args):
123
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)
124

    
125
def qual_name(db, schema, table):
126
    def esc_name_(name): return esc_name(db, name)
127
    table = esc_name_(table)
128
    if schema != None: return esc_name_(schema)+'.'+table
129
    else: return table
130

    
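# Quoting sketch: esc_name_by_module() only picks the quote character and
# defers the actual escaping to sql_gen.esc_name(). Roughly:
#     esc_name_by_module('psycopg2', 'my col') # -> "my col" (double quotes)
#     esc_name_by_module('MySQLdb', 'my col') # -> `my col` (backquotes)
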
##### Database connections

db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']

db_engines = {
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
    'PostgreSQL': ('psycopg2', {}),
}

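# A minimal db_config sketch (all values hypothetical), using the keys in
# db_config_names and an engine from db_engines:
#     db_config = {'engine': 'PostgreSQL', 'host': 'localhost', 'user': 'me',
#         'password': 'secret', 'database': 'mydb', 'schemas': 'public'}
# For MySQL, the mappings above rename the 'password'/'database' keys to the
# 'passwd'/'db' keyword args that MySQLdb.connect() expects.
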
DatabaseErrors_set = set([DbException])
DatabaseErrors = tuple(DatabaseErrors_set)

def _add_module(module):
    DatabaseErrors_set.add(module.DatabaseError)
    global DatabaseErrors
    DatabaseErrors = tuple(DatabaseErrors_set)

def db_config_str(db_config):
    return db_config['engine']+' database '+db_config['database']

log_debug_none = lambda msg, level=2: None

class DbConn:
    def __init__(self, db_config, autocommit=True, caching=True,
        log_debug=log_debug_none, debug_temp=False):
        '''
        @param debug_temp Whether temporary objects should instead be permanent.
            This assists in debugging the internal objects used by the program.
        '''
        self.db_config = db_config
        self.autocommit = autocommit
        self.caching = caching
        self.log_debug = log_debug
        self.debug = log_debug != log_debug_none
        self.debug_temp = debug_temp
        self.autoanalyze = False
        
        self.reset()
    
    def __getattr__(self, name):
        if name == '__dict__': raise Exception('getting __dict__')
        if name == 'db': return self._db()
        else: raise AttributeError()
    
    def __getstate__(self):
        state = copy.copy(self.__dict__) # shallow copy
        state['log_debug'] = None # don't pickle the debug callback
        state['_DbConn__db'] = None # don't pickle the connection
        return state
    
    def clear_cache(self): self.query_results = {}
    
    def reset(self):
        self.clear_cache()
        self._savepoint = 0
        self._notices_seen = set()
        self.__db = None
    
    def connected(self): return self.__db != None
    
    def close(self):
        if self.connected(): self.db.close()
        self.__db = None
    
    def _db(self):
        if self.__db == None:
            # Process db_config
            db_config = self.db_config.copy() # don't modify input!
            schemas = db_config.pop('schemas', None)
            module_name, mappings = db_engines[db_config.pop('engine')]
            module = __import__(module_name)
            _add_module(module)
            for orig, new in mappings.iteritems():
                try: util.rename_key(db_config, orig, new)
                except KeyError: pass
            
            # Connect
            self.__db = module.connect(**db_config)
            
            # Configure connection
            if hasattr(self.db, 'set_isolation_level'):
                import psycopg2.extensions
                self.db.set_isolation_level(
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
            if schemas != None:
                search_path = [self.esc_name(s) for s in schemas.split(',')]
                search_path.append(value(run_query(self, 'SHOW search_path',
                    log_level=4)))
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
                    log_level=3)
        
        return self.__db
    
    class DbCursor(Proxy):
        def __init__(self, outer):
            Proxy.__init__(self, outer.db.cursor())
            self.outer = outer
            self.query_results = outer.query_results
            self.query_lookup = None
            self.result = []
        
        def execute(self, query):
            self._is_insert = query.startswith('INSERT')
            self.query_lookup = query
            try:
                try:
                    cur = self.inner.execute(query)
                    self.outer.do_autocommit()
                finally: self.query = get_cur_query(self.inner, query)
            except Exception, e:
                _add_cursor_info(e, self, query)
                self.result = e # cache the exception as the result
                self._cache_result()
                raise
            
            # Always cache certain queries
            if query.startswith('CREATE') or query.startswith('ALTER'):
                # structural changes
                # Rest of query must be unique in the face of name collisions,
                # so don't cache ADD COLUMN unless it has distinguishing comment
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
                    self._cache_result()
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
                consume_rows(self) # fetch all rows so result will be cached
            
            return cur
        
        def fetchone(self):
            row = self.inner.fetchone()
            if row != None: self.result.append(row)
            # otherwise, fetched all rows
            else: self._cache_result()
            return row
        
        def _cache_result(self):
            # For inserts that return a result set, don't cache result set since
            # inserts are not idempotent. Other non-SELECT queries don't have
            # their result set read, so only exceptions will be cached (an
            # invalid query will always be invalid).
            if self.query_results != None and (not self._is_insert
                or isinstance(self.result, Exception)):
                
                assert self.query_lookup != None
                self.query_results[self.query_lookup] = self.CacheCursor(
                    util.dict_subset(dicts.AttrsDictView(self),
                    ['query', 'result', 'rowcount', 'description']))
        
        class CacheCursor:
            def __init__(self, cached_result): self.__dict__ = cached_result
            
            def execute(self, *args, **kw_args):
                if isinstance(self.result, Exception): raise self.result
                # otherwise, result is a rows list
                self.iter = iter(self.result)
            
            def fetchone(self):
                try: return self.iter.next()
                except StopIteration: return None
    
    def esc_value(self, value):
        try: str_ = self.mogrify('%s', [value])
        except NotImplementedError, e:
            module = util.root_module(self.db)
            if module == 'MySQLdb':
                import _mysql
                str_ = _mysql.escape_string(value)
            else: raise e
        return strings.to_unicode(str_)
    
    def esc_name(self, name): return esc_name(self, name) # calls global func
    
    def std_code(self, str_):
        '''Standardizes SQL code.
        * Ensures that string literals are prefixed by `E`
        '''
        if str_.startswith("'"): str_ = 'E'+str_
        return str_
    
    def can_mogrify(self):
        module = util.root_module(self.db)
        return module == 'psycopg2'
    
    def mogrify(self, query, params=None):
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
        else: raise NotImplementedError("Can't mogrify query")
    
    def print_notices(self):
        if hasattr(self.db, 'notices'):
            for msg in self.db.notices:
                if msg not in self._notices_seen:
                    self._notices_seen.add(msg)
                    self.log_debug(msg, level=2)
    
    def run_query(self, query, cacheable=False, log_level=2,
        debug_msg_ref=None):
        '''
        @param debug_msg_ref If specified, the log message will be returned in
            this instead of being output. This allows you to filter log messages
            depending on the result of the query.
        '''
        assert query != None
        
        if not self.caching: cacheable = False
        used_cache = False
        
        def log_msg(query):
            if used_cache: cache_status = 'cache hit'
            elif cacheable: cache_status = 'cache miss'
            else: cache_status = 'non-cacheable'
            return 'DB query: '+cache_status+':\n'+strings.as_code(query, 'SQL')
        
        try:
            # Get cursor
            if cacheable:
                try:
                    cur = self.query_results[query]
                    used_cache = True
                except KeyError: cur = self.DbCursor(self)
            else: cur = self.db.cursor()
            
            # Log query
            if self.debug and debug_msg_ref == None: # log before running
                self.log_debug(log_msg(query), log_level)
            
            # Run query
            cur.execute(query)
        finally:
            self.print_notices()
            if self.debug and debug_msg_ref != None: # return after running
                debug_msg_ref[0] = log_msg(str(get_cur_query(cur, query)))
        
        return cur
    
    def is_cached(self, query): return query in self.query_results
    
    def with_autocommit(self, func):
        import psycopg2.extensions
        
        prev_isolation_level = self.db.isolation_level
        self.db.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        try: return func()
        finally: self.db.set_isolation_level(prev_isolation_level)
    
    def with_savepoint(self, func):
        savepoint = 'level_'+str(self._savepoint)
        self.run_query('SAVEPOINT '+savepoint, log_level=4)
        self._savepoint += 1
        try: return func()
        except:
            self.run_query('ROLLBACK TO SAVEPOINT '+savepoint, log_level=4)
            raise
        finally:
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
            # "The savepoint remains valid and can be rolled back to again"
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
            self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
            
            self._savepoint -= 1
            assert self._savepoint >= 0
            
            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT
    
    def do_autocommit(self):
        '''Autocommits if outside savepoint'''
        assert self._savepoint >= 0
        if self.autocommit and self._savepoint == 0:
            self.log_debug('Autocommitting', level=4)
            self.db.commit()
    
    def col_info(self, col):
        table = sql_gen.Table('columns', 'information_schema')
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
            'USER-DEFINED'), sql_gen.Col('udt_name'))
        cols = [type_, 'column_default',
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
        
        conds = [('table_name', col.table.name), ('column_name', col.name)]
        schema = col.table.schema
        if schema != None: conds.append(('table_schema', schema))
        
        type_, default, nullable = row(select(self, table, cols, conds,
            order_by='table_schema', limit=1, cacheable=False, log_level=4))
            # TODO: order_by search_path schema order
        default = sql_gen.as_Code(default, self)
        
        return sql_gen.TypedCol(col.name, type_, default, nullable)
    
    def TempFunction(self, name):
        if self.debug_temp: schema = None
        else: schema = 'pg_temp'
        return sql_gen.Function(name, schema)

connect = DbConn

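# Typical usage sketch (db_config as in the example above; values
# hypothetical):
#     db = connect(db_config, autocommit=True, caching=True)
#     cur = db.run_query('SELECT 1', cacheable=True)
#     print value(cur) # -> 1
#     db.close()
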
##### Recoverable querying

def with_savepoint(db, func): return db.with_savepoint(func)

def run_query(db, query, recover=None, cacheable=False, log_level=2,
    log_ignore_excs=None, **kw_args):
    '''For other params, see DbConn.run_query()
    @param log_ignore_excs The log_level will be increased by 2 if the query
        throws one of these exceptions.
    '''
    if recover == None: recover = False
    if log_ignore_excs == None: log_ignore_excs = ()
    log_ignore_excs = tuple(log_ignore_excs)
    
    debug_msg_ref = None # usually, db.run_query() logs query before running it
    # But if filtering with log_ignore_excs, wait until after exception parsing
    if log_ignore_excs != () or not db.can_mogrify(): debug_msg_ref = [None]
    
    try:
        try:
            def run(): return db.run_query(query, cacheable, log_level,
                debug_msg_ref, **kw_args)
            if recover and not db.is_cached(query):
                return with_savepoint(db, run)
            else: return run() # don't need savepoint if cached
        except Exception, e:
            msg = strings.ustr(e.args[0])
            
            match = re.match(r'^duplicate key value violates unique constraint '
                r'"((_?[^\W_]+(?=[._]))?.+?)"', msg)
            if match:
                constraint, table = match.groups()
                cols = []
                if recover: # need auto-rollback to run index_cols()
                    try: cols = index_cols(db, table, constraint)
                    except NotImplementedError: pass
                raise DuplicateKeyException(constraint, cols, e)
            
            match = re.match(r'^null value in column "(.+?)" violates not-null'
                r' constraint', msg)
            if match: raise NullValueException('NOT NULL', [match.group(1)], e)
            
            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
                r'|.+? field value out of range): "(.+?)"', msg)
            if match:
                value, = match.groups()
                raise InvalidValueException(strings.to_unicode(value), e)
            
            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
                r'is of type', msg)
            if match:
                col, type_ = match.groups()
                raise MissingCastException(type_, col, e)
            
            match = re.match(r'^(\S+) "(.+?)".*? already exists', msg)
            if match:
                type_, name = match.groups()
                raise DuplicateException(type_, name, e)
            
            raise # no specific exception raised
    except log_ignore_excs:
        log_level += 2
        raise
    finally:
        if debug_msg_ref != None and debug_msg_ref[0] != None:
            db.log_debug(debug_msg_ref[0], log_level)

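# Example of the exception parsing above (names hypothetical): a PostgreSQL
# error such as
#     duplicate key value violates unique constraint "plots.plots_pkey"
# is re-raised as DuplicateKeyException with name='plots.plots_pkey'; with
# recover set, cols is filled in by index_cols(), which needs the
# auto-rollback that the savepoint provides.
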
##### Basic queries

def next_version(name):
    version = 1 # first existing name was version 0
    match = re.match(r'^(.*)#(\d+)$', name)
    if match:
        name, version = match.groups()
        version = int(version)+1
    return sql_gen.concat(name, '#'+str(version))

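# next_version() sketch: 'foo' -> 'foo#1', 'foo#1' -> 'foo#2'. The actual
# concatenation goes through sql_gen.concat(), which may also shorten the base
# name to fit identifier length limits.
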
def lock_table(db, table, mode):
    table = sql_gen.as_Table(table)
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')

def run_query_into(db, query, into=None, add_indexes_=False, **kw_args):
    '''Outputs a query to a temp table.
    For params, see run_query().
    '''
    if into == None: return run_query(db, query, **kw_args)
    
    assert isinstance(into, sql_gen.Table)
    
    into.is_temp = True
    # "temporary tables cannot specify a schema name", so remove schema
    into.schema = None
    
    kw_args['recover'] = True
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))
    
    temp = not db.debug_temp # tables are permanent in debug_temp mode
    
    # Create table
    while True:
        create_query = 'CREATE'
        if temp: create_query += ' TEMP'
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
        
        try:
            cur = run_query(db, create_query, **kw_args)
                # CREATE TABLE AS sets rowcount to # rows in query
            break
        except DuplicateException, e:
            into.name = next_version(into.name)
            # try again with next version of name
    
    if add_indexes_: add_indexes(db, into)
    
    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
    # table is going to be used in complex queries, it is wise to run ANALYZE on
    # the temporary table after it is populated."
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
    # If into is not a temp table, ANALYZE is useful but not required.
    analyze(db, into)
    
    return cur

order_by_pkey = object() # tells mk_select() to order by the pkey

distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns

def mk_select(db, tables, fields=None, conds=None, distinct_on=[], limit=None,
    start=None, order_by=order_by_pkey, default_table=None):
    '''
    @param tables The single table to select from, or a list of tables to join
        together, with tables after the first being sql_gen.Join objects
    @param fields Use None to select all fields in the table
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
        * container can be any iterable type
        * compare_left_side: sql_gen.Code|str (for col name)
        * compare_right_side: sql_gen.ValueCond|literal value
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
        use all columns
    @return query
    '''
    # Parse tables param
    tables = lists.mk_seq(tables)
    tables = list(tables) # don't modify input! (list() copies input)
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
    
    # Parse other params
    if conds == None: conds = []
    elif dicts.is_dict(conds): conds = conds.items()
    conds = list(conds) # don't modify input! (list() copies input)
    assert limit == None or type(limit) == int
    assert start == None or type(start) == int
    if order_by is order_by_pkey:
        if distinct_on != []: order_by = None
        else: order_by = pkey(db, table0, recover=True)
    
    query = 'SELECT'
    
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
    
    # DISTINCT ON columns
    if distinct_on != []:
        query += '\nDISTINCT'
        if distinct_on is not distinct_on_all:
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
    
    # Columns
    if fields == None:
        if query.find('\n') >= 0: whitespace = '\n'
        else: whitespace = ' '
        query += whitespace+'*'
    else:
        assert fields != []
        query += '\n'+('\n, '.join(map(parse_col, fields)))
    
    # Main table
    query += '\nFROM '+table0.to_str(db)
    
    # Add joins
    left_table = table0
    for join_ in tables:
        table = join_.table
        
        # Parse special values
        if join_.type_ is sql_gen.filter_out: # filter no match
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
                sql_gen.CompareCond(None, '~=')))
        
        query += '\n'+join_.to_str(db, left_table)
        
        left_table = table
    
    missing = True
    if conds != []:
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
            .to_str(db) for l, r in conds], 'WHERE')
        missing = False
    if order_by != None:
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
    if limit != None: query += '\nLIMIT '+str(limit); missing = False
    if start != None:
        if start != 0: query += '\nOFFSET '+str(start)
        missing = False
    if missing: warnings.warn(DbWarning(
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))
    
    return query

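# mk_select() sketch (table and column names hypothetical; exact quoting and
# whitespace come from sql_gen):
#     mk_select(db, 'plots', ['plot_id'], {'site_id': 5}, limit=10,
#         order_by=None)
# returns roughly:
#     SELECT
#     "plot_id"
#     FROM "plots"
#     WHERE "site_id" = 5
#     LIMIT 10
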
def select(db, *args, **kw_args):
    '''For params, see mk_select() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)
    
    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
        log_level=log_level)

def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
    embeddable=False, ignore=False):
    '''
    @param returning str|None An inserted column (such as pkey) to return
    @param embeddable Whether the query should be embeddable as a nested SELECT.
        Warning: If you set this and cacheable=True when the query is run, the
        query will be fully cached, not just if it raises an exception.
    @param ignore Whether to ignore duplicate keys.
    '''
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
    if cols == []: cols = None # no cols (all defaults) = unknown col names
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
    if select_query == None: select_query = 'DEFAULT VALUES'
    if returning != None: returning = sql_gen.as_Col(returning, table)
    
    first_line = 'INSERT INTO '+table.to_str(db)
    
    def mk_insert(select_query):
        query = first_line
        if cols != None:
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
        query += '\n'+select_query
        
        if returning != None:
            returning_name_col = sql_gen.to_name_only_col(returning)
            query += '\nRETURNING '+returning_name_col.to_str(db)
        
        return query
    
    return_type = 'unknown'
    if returning != None: return_type = returning.to_str(db)+'%TYPE'
    
    lang = 'sql'
    if ignore:
        # Always return something to set the correct rowcount
        if returning == None: returning = sql_gen.NamedCol('NULL', None)
        
        embeddable = True # must use function
        lang = 'plpgsql'
        
        if cols == None:
            row = [sql_gen.Col(sql_gen.all_cols, 'row')]
            row_vars = [sql_gen.Table('row')]
        else:
            row_vars = row = [sql_gen.Col(c.name, 'row') for c in cols]
        
        query = '''\
DECLARE
    row '''+table.to_str(db)+'''%ROWTYPE;
BEGIN
    /* Need an EXCEPTION block for each individual row because "When an error is
    caught by an EXCEPTION clause, [...] all changes to persistent database
    state within the block are rolled back."
    This is unfortunate because "A block containing an EXCEPTION clause is
    significantly more expensive to enter and exit than a block without one."
    (http://www.postgresql.org/docs/8.3/static/plpgsql-control-structures.html\
#PLPGSQL-ERROR-TRAPPING)
    */
    FOR '''+(', '.join((v.to_str(db) for v in row_vars)))+''' IN
'''+select_query+'''
    LOOP
        BEGIN
            RETURN QUERY
'''+mk_insert(sql_gen.Values(row).to_str(db))+'''
;
        EXCEPTION
            WHEN unique_violation THEN NULL; -- continue to next row
        END;
    END LOOP;
END;\
'''
    else: query = mk_insert(select_query)
    
    if embeddable:
        # Create function
        function_name = sql_gen.clean_name(first_line)
        while True:
            try:
                function = db.TempFunction(function_name)
                
                function_query = '''\
CREATE FUNCTION '''+function.to_str(db)+'''()
RETURNS SETOF '''+return_type+'''
LANGUAGE '''+lang+'''
AS $$
'''+query+'''
$$;
'''
                run_query(db, function_query, recover=True, cacheable=True,
                    log_ignore_excs=(DuplicateException,))
                break # this version was successful
            except DuplicateException, e:
                function_name = next_version(function_name)
                # try again with next version of name
        
        # Return query that uses function
        cols = None
        if returning != None: cols = [returning]
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(function),
            cols) # AS clause requires function alias
        return mk_select(db, func_table, start=0, order_by=None)
    
    return query

def insert_select(db, table, *args, **kw_args):
    '''For params, see mk_insert_select() and run_query_into()
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
        values in
    '''
    into = kw_args.pop('into', None)
    if into != None: kw_args['embeddable'] = True
    recover = kw_args.pop('recover', None)
    if kw_args.get('ignore', False): recover = True
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)
    
    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
        into, recover=recover, cacheable=cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

default = sql_gen.default # tells insert() to use the default value for a column

def insert(db, table, row, *args, **kw_args):
    '''For params, see insert_select()'''
    if lists.is_seq(row): cols = None
    else:
        cols = row.keys()
        row = row.values()
    row = list(row) # ensure that "== []" works
    
    if row == []: query = None
    else: query = sql_gen.Values(row).to_str(db)
    
    return insert_select(db, table, cols, query, *args, **kw_args)

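# insert() sketch (names hypothetical): a dict row supplies both column names
# and values, a sequence row supplies values only:
#     insert(db, 'plots', {'plot_id': 1, 'site_id': 5})
# builds INSERT INTO "plots" ("plot_id", "site_id") VALUES (...) via
# mk_insert_select(); an empty row degenerates to DEFAULT VALUES.
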
def mk_update(db, table, changes=None, cond=None, in_place=False):
    '''
    @param changes [(col, new_value),...]
        * container can be any iterable type
        * col: sql_gen.Code|str (for col name)
        * new_value: sql_gen.Code|literal value
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
    @param in_place If set, locks the table and updates rows in place.
        This avoids creating dead rows in PostgreSQL.
        * cond must be None
    @return str query
    '''
    table = sql_gen.as_Table(table)
    changes = [(sql_gen.to_name_only_col(c, table), sql_gen.as_Value(v))
        for c, v in changes]
    
    if in_place:
        assert cond == None
        
        query = 'ALTER TABLE '+table.to_str(db)+'\n'
        query += ',\n'.join(('ALTER COLUMN '+c.to_str(db)+' TYPE '
            +db.col_info(sql_gen.with_default_table(c, table)).type
            +'\nUSING '+v.to_str(db) for c, v in changes))
    else:
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'
        query += ',\n'.join((c.to_str(db)+' = '+v.to_str(db)
            for c, v in changes))
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)
    
    return query

def update(db, table, *args, **kw_args):
    '''For params, see mk_update() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', False)
    log_level = kw_args.pop('log_level', 2)
    
    cur = run_query(db, mk_update(db, table, *args, **kw_args), recover,
        cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

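# mk_update() sketch (names hypothetical):
#     mk_update(db, 'plots', [('site_id', 6)])
# returns roughly:
#     UPDATE "plots"
#     SET
#     "site_id" = 6
# With in_place=True it instead emits ALTER TABLE ... ALTER COLUMN ... TYPE
# ... USING ..., rewriting values without creating dead rows.
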
def last_insert_id(db):
    module = util.root_module(db.db)
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
    elif module == 'MySQLdb': return db.db.insert_id()
        # insert_id() is on the underlying connection, not the DbConn wrapper
    else: return None

def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
    '''Creates a mapping from original column names (which may have collisions)
    to names that will be distinct among the columns' tables.
    This is meant to be used for several tables that are being joined together.
    @param cols The columns to combine. Duplicates will be removed.
    @param into The table for the new columns.
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
        columns will be included in the mapping even if they are not in cols.
        The tables of the provided Col objects will be changed to into, so make
        copies of them if you want to keep the original tables.
    @param as_items Whether to return a list of dict items instead of a dict
    @return dict(orig_col=new_col, ...)
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
        * new_col: sql_gen.Col(orig_col_name, into)
        * All mappings use the into table so its name can easily be
          changed for all columns at once
    '''
    cols = lists.uniqify(cols)
    
    items = []
    for col in preserve:
        orig_col = copy.copy(col)
        col.table = into
        items.append((orig_col, col))
    preserve = set(preserve)
    for col in cols:
        if col not in preserve:
            items.append((col, sql_gen.Col(str(col), into, col.srcs)))
    
    if not as_items: items = dict(items)
    return items

def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
    '''For params, see mk_flatten_mapping()
    @return See return value of mk_flatten_mapping()
    '''
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
    run_query_into(db, mk_select(db, joins, cols, limit=limit, start=start),
        into=into, add_indexes_=True)
    return dict(items)

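# mk_flatten_mapping() sketch (names hypothetical): for cols
# [sql_gen.Col('id', 'plots'), sql_gen.Col('id', 'sites')] and an into table
# "joined", each column maps to a col of "joined" whose name comes from
# str(col), which keeps same-named columns of different tables distinct.
# flatten() then materializes the SELECT of all mapped columns into that
# table.
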
##### Database structure introspection

#### Tables

def tables(db, schema_like='public', table_like='%', exact=False):
    if exact: compare = '='
    else: compare = 'LIKE'
    
    module = util.root_module(db.db)
    if module == 'psycopg2':
        conds = [('schemaname', sql_gen.CompareCond(schema_like, compare)),
            ('tablename', sql_gen.CompareCond(table_like, compare))]
        return values(select(db, 'pg_tables', ['tablename'], conds,
            order_by='tablename', log_level=4))
    elif module == 'MySQLdb':
        return values(run_query(db, 'SHOW TABLES LIKE '+db.esc_value(table_like)
            , cacheable=True, log_level=4))
    else: raise NotImplementedError("Can't list tables for "+module+' database')

def table_exists(db, table):
    table = sql_gen.as_Table(table)
    return list(tables(db, table.schema, table.name, exact=True)) != []

def table_row_count(db, table, recover=None):
    return value(run_query(db, mk_select(db, table, [sql_gen.row_count],
        order_by=None, start=0), recover=recover, log_level=3))

def table_cols(db, table, recover=None):
    return list(col_names(select(db, table, limit=0, order_by=None,
        recover=recover, log_level=4)))

def pkey(db, table, recover=None):
    '''Assumed to be first column in table'''
    return table_cols(db, table, recover)[0]

not_null_col = 'not_null_col'

def table_not_null_col(db, table, recover=None):
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
    if not_null_col in table_cols(db, table, recover): return not_null_col
    else: return pkey(db, table, recover)

def index_cols(db, table, index):
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
    automatically created. When you don't know whether something is a UNIQUE
    constraint or a UNIQUE index, use this function.'''
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM
(
        SELECT attnum, attname
        FROM pg_index
        JOIN pg_class index ON index.oid = indexrelid
        JOIN pg_class table_ ON table_.oid = indrelid
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
        WHERE
            table_.relname = '''+db.esc_value(table)+'''
            AND index.relname = '''+db.esc_value(index)+'''
    UNION
        SELECT attnum, attname
        FROM
        (
            SELECT
                indrelid
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
                    AS indkey
            FROM pg_index
            JOIN pg_class index ON index.oid = indexrelid
            JOIN pg_class table_ ON table_.oid = indrelid
            WHERE
                table_.relname = '''+db.esc_value(table)+'''
                AND index.relname = '''+db.esc_value(index)+'''
        ) s
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
) s
ORDER BY attnum
'''
            , cacheable=True, log_level=4)))
    else: raise NotImplementedError("Can't list index columns for "+module+
        ' database')

def constraint_cols(db, table, constraint):
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM pg_constraint
JOIN pg_class ON pg_class.oid = conrelid
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
WHERE
    relname = '''+db.esc_value(table)+'''
    AND conname = '''+db.esc_value(constraint)+'''
ORDER BY attnum
'''
            )))
    else: raise NotImplementedError("Can't list constraint columns for "+module+
        ' database')

#### Functions

def function_exists(db, function):
    function = sql_gen.as_Function(function)
    
    info_table = sql_gen.Table('routines', 'information_schema')
    conds = [('routine_name', function.name)]
    schema = function.schema
    if schema != None: conds.append(('routine_schema', schema))
    # Exclude trigger functions, since they cannot be called directly
    conds.append(('data_type', sql_gen.CompareCond('trigger', '!=')))
    
    return list(values(select(db, info_table, ['routine_name'], conds,
        order_by='routine_schema', limit=1, log_level=4))) != []
        # TODO: order_by search_path schema order

##### Structural changes

#### Columns

def add_col(db, table, col, comment=None, **kw_args):
    '''
    @param col TypedCol Name may be versioned, so be sure to propagate any
        renaming back to any source column for the TypedCol.
    @param comment None|str SQL comment used to distinguish columns of the same
        name from each other when they contain different data, to allow the
        ADD COLUMN query to be cached. If not set, query will not be cached.
    '''
    assert isinstance(col, sql_gen.TypedCol)
    
    while True:
        str_ = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
        if comment != None: str_ += ' '+sql_gen.esc_comment(comment)
        
        try:
            run_query(db, str_, recover=True, cacheable=True, **kw_args)
            break
        except DuplicateException:
            col.name = next_version(col.name)
            # try again with next version of name

def add_not_null(db, col):
    table = col.table
    col = sql_gen.to_name_only_col(col)
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
        +col.to_str(db)+' SET NOT NULL', cacheable=True, log_level=3)

row_num_col = '_row_num'

row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
    constraints='PRIMARY KEY')

def add_row_num(db, table):
    '''Adds a row number column to a table. Its name is in row_num_col. It will
    be the primary key.'''
    add_col(db, table, row_num_typed_col, log_level=3)

#### Indexes

def add_pkey(db, table, cols=None, recover=None):
    '''Adds a primary key.
    @param cols [sql_gen.Col,...] The columns in the primary key.
        Defaults to the first column in the table.
    @pre The table must not already have a primary key.
    '''
    table = sql_gen.as_Table(table)
    if cols == None: cols = [pkey(db, table, recover)]
    col_strs = [sql_gen.to_name_only_col(v).to_str(db) for v in cols]
    
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
        +(', '.join(col_strs))+')', recover=True, cacheable=True, log_level=3,
        log_ignore_excs=(DuplicateException,))

def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
    Currently, only function calls are supported as expressions.
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
        This allows indexes to be used for comparisons where NULLs are equal.
    '''
    exprs = lists.mk_seq(exprs)
    
    # Parse exprs
    old_exprs = exprs[:]
    exprs = []
    cols = []
    for i, expr in enumerate(old_exprs):
        expr = sql_gen.as_Col(expr, table)
        
        # Handle nullable columns
        if ensure_not_null_:
            try: expr = ensure_not_null(db, expr)
            except KeyError: pass # unknown type, so just create plain index
        
        # Extract col
        expr = copy.deepcopy(expr) # don't modify input!
        if isinstance(expr, sql_gen.FunctionCall):
            col = expr.args[0]
            expr = sql_gen.Expr(expr)
        else: col = expr
        assert isinstance(col, sql_gen.Col)
        
        # Extract table
        if table == None:
            assert sql_gen.is_table_col(col)
            table = col.table
        
        col.table = None
        
        exprs.append(expr)
        cols.append(col)
    
    table = sql_gen.as_Table(table)
    index = sql_gen.Table(str(sql_gen.Col(','.join(map(str, cols)), table)))
    
    # Add index
    while True:
        str_ = 'CREATE'
        if unique: str_ += ' UNIQUE'
        str_ += ' INDEX '+index.to_str(db)+' ON '+table.to_str(db)+' ('+(
            ', '.join((v.to_str(db) for v in exprs)))+')'
        
        try:
            run_query(db, str_, recover=True, cacheable=True, log_level=3,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            index.name = next_version(index.name)
            # try again with next version of name

def add_index_col(db, col, suffix, expr, nullable=True):
    if sql_gen.index_col(col) != None: return # already has index col
    
    new_col = sql_gen.suffixed_col(col, suffix)
    
    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, db.col_info(col).type)
    add_col(db, col.table, new_typed_col, comment='src: '+repr(col),
        log_level=3)
    new_col.name = new_typed_col.name # propagate any renaming
    
    update(db, col.table, [(new_col, expr)], in_place=True, cacheable=True,
        log_level=3)
    if not nullable: add_not_null(db, new_col)
    add_index(db, new_col)
    
    col.table.index_cols[col.name] = new_col.name

# Controls when ensure_not_null() will use index columns
not_null_index_cols_min_rows = 0 # rows; initially always use index columns

def ensure_not_null(db, col):
    '''For params, see sql_gen.ensure_not_null()'''
    expr = sql_gen.ensure_not_null(db, col)
    
    # If a nullable column in a temp table, add separate index column instead.
    # Note that for small datasources, this adds 6-25% to the total import time.
    if (sql_gen.is_temp_col(col) and isinstance(expr, sql_gen.EnsureNotNull)
        and table_row_count(db, col.table) >= not_null_index_cols_min_rows):
        add_index_col(db, col, '::NOT NULL', expr, nullable=False)
        expr = sql_gen.index_col(col)
    
    return expr

already_indexed = object() # tells add_indexes() the pkey has already been added

def add_indexes(db, table, has_pkey=True):
    '''Adds an index on all columns in a table.
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
        index should be added on the first column.
        * If already_indexed, the pkey is assumed to have already been added
    '''
    cols = table_cols(db, table)
    if has_pkey:
        if has_pkey is not already_indexed: add_pkey(db, table)
        cols = cols[1:]
    for col in cols: add_index(db, col, table)

#### Tables

### Maintenance

def analyze(db, table):
    table = sql_gen.as_Table(table)
    run_query(db, 'ANALYZE '+table.to_str(db), log_level=3)

def autoanalyze(db, table):
    if db.autoanalyze: analyze(db, table)

def vacuum(db, table):
    table = sql_gen.as_Table(table)
    db.with_autocommit(lambda: run_query(db, 'VACUUM ANALYZE '+table.to_str(db),
        log_level=3))

### Lifecycle

def drop_table(db, table):
    table = sql_gen.as_Table(table)
    return run_query(db, 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE')

def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
    like=None):
    '''Creates a table.
    @param cols [sql_gen.TypedCol,...] The column names and types
    @param has_pkey If set, the first column becomes the primary key.
    @param col_indexes bool|[ref]
        * If True, indexes will be added on all non-pkey columns.
        * If a list reference, [0] will be set to a function to do this.
          This can be used to delay index creation until the table is populated.
    '''
    table = sql_gen.as_Table(table)
    
    if like != None:
        cols = [sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL')
            ]+cols
    if has_pkey:
        cols[0] = pkey = copy.copy(cols[0]) # don't modify input!
        pkey.constraints = 'PRIMARY KEY'
    
    temp = table.is_temp and not db.debug_temp
        # temp tables permanent in debug_temp mode
    
    # Create table
    while True:
        str_ = 'CREATE'
        if temp: str_ += ' TEMP'
        str_ += ' TABLE '+table.to_str(db)+' (\n'
        str_ += '\n, '.join(c.to_str(db) for c in cols)
        str_ += '\n);\n'
        
        try:
            run_query(db, str_, cacheable=True, log_level=2,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            table.name = next_version(table.name)
            # try again with next version of name
    
    # Add indexes
    if has_pkey: has_pkey = already_indexed
    def add_indexes_(): add_indexes(db, table, has_pkey)
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
    elif col_indexes: add_indexes_() # add now

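# create_table() sketch (hypothetical column defs):
#     create_table(db, 'plots', [sql_gen.TypedCol('plot_id', 'serial'),
#         sql_gen.TypedCol('name', 'text')])
# makes plot_id the primary key (has_pkey=True) and, with col_indexes=True,
# indexes the remaining columns via add_indexes().
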
def copy_table_struct(db, src, dest):
    '''Creates a structure-only copy of a table. (Does not copy data.)'''
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)

### Data

def truncate(db, table, schema='public', **kw_args):
    '''For params, see run_query()'''
    table = sql_gen.as_Table(table, schema)
    return run_query(db, 'TRUNCATE '+table.to_str(db)+' CASCADE', **kw_args)

def empty_temp(db, tables):
    if db.debug_temp: return # leave temp tables there for debugging
    tables = lists.mk_seq(tables)
    for table in tables: truncate(db, table, log_level=3)

def empty_db(db, schema='public', **kw_args):
    '''For kw_args, see tables()'''
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)

def distinct_table(db, table, distinct_on):
    '''Creates a copy of a temp table which is distinct on the given columns.
    The old and new tables will both get an index on these columns, to
    facilitate merge joins.
    @param distinct_on If empty, creates a table with one row. This is useful if
        your distinct_on columns are all literal values.
    @return The new table.
    '''
    new_table = sql_gen.suffixed_table(table, '_distinct')
    
    copy_table_struct(db, table, new_table)
    
    limit = None
    if distinct_on == []: limit = 1 # one sample row
    else:
        add_index(db, distinct_on, new_table, unique=True)
        add_index(db, distinct_on, table) # for join optimization
    
    insert_select(db, new_table, None, mk_select(db, table, start=0,
        limit=limit), ignore=True)
    analyze(db, new_table)
    
    return new_table
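
# distinct_table() sketch (names hypothetical): for a temp table "plots" and
# distinct_on=[sql_gen.Col('name')], this creates roughly "plots_distinct"
# with unique "name" values, adding a unique index on the new table and a
# plain index on the original to enable merge joins; the deduplication itself
# happens row by row through insert_select(..., ignore=True).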