# Database access

import copy
import re
import warnings

import exc
import dicts
import iters
import lists
from Proxy import Proxy
import rand
import sql_gen
import strings
import util

##### Exceptions

def get_cur_query(cur, input_query=None):
    raw_query = None
    if hasattr(cur, 'query'): raw_query = cur.query
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
    
    if raw_query != None: return raw_query
    else: return '[input] '+strings.ustr(input_query)

def _add_cursor_info(e, *args, **kw_args):
    '''For params, see get_cur_query()'''
    exc.add_msg(e, 'query: '+strings.ustr(get_cur_query(*args, **kw_args)))

class DbException(exc.ExceptionWithCause):
    def __init__(self, msg, cause=None, cur=None):
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
        if cur != None: _add_cursor_info(self, cur)

class ExceptionWithName(DbException):
    def __init__(self, name, cause=None):
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name)), cause)
        self.name = name

class ExceptionWithValue(DbException):
    def __init__(self, value, cause=None):
        DbException.__init__(self, 'for value: '+strings.as_tt(repr(value)),
            cause)
        self.value = value

class ExceptionWithNameType(DbException):
    def __init__(self, type_, name, cause=None):
        DbException.__init__(self, 'for type: '+strings.as_tt(str(type_))
            +'; name: '+strings.as_tt(name), cause)
        self.type = type_
        self.name = name

class ConstraintException(DbException):
    def __init__(self, name, cols, cause=None):
        DbException.__init__(self, 'Violated '+strings.as_tt(name)
            +' constraint on columns: '+strings.as_tt(', '.join(cols)), cause)
        self.name = name
        self.cols = cols

class MissingCastException(DbException):
    def __init__(self, type_, col, cause=None):
        DbException.__init__(self, 'Missing cast to type '+strings.as_tt(type_)
            +' on column: '+strings.as_tt(col), cause)
        self.type = type_
        self.col = col

class NameException(DbException): pass

class DuplicateKeyException(ConstraintException): pass

class NullValueException(ConstraintException): pass

class InvalidValueException(ExceptionWithValue): pass

class DuplicateException(ExceptionWithNameType): pass

class EmptyRowException(DbException): pass

##### Warnings

class DbWarning(UserWarning): pass

##### Result retrieval

def col_names(cur): return (col[0] for col in cur.description)

def rows(cur): return iter(lambda: cur.fetchone(), None)

def consume_rows(cur):
    '''Used to fetch all rows so result will be cached'''
    iters.consume_iter(rows(cur))

def next_row(cur): return rows(cur).next()

def row(cur):
    row_ = next_row(cur)
    consume_rows(cur)
    return row_

def next_value(cur): return next_row(cur)[0]

def value(cur): return row(cur)[0]

def values(cur): return iters.func_iter(lambda: next_value(cur))

def value_or_none(cur):
    try: return value(cur)
    except StopIteration: return None

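# Illustrative usage of the retrieval helpers above (a sketch; assumes a
# connected DbConn `db` and the module-level run_query() defined below):
#     count = value(run_query(db, 'SELECT count(*) FROM t')) # single value
#     for name in values(run_query(db, 'SELECT name FROM t')): print name
#     first = value_or_none(run_query(db, 'SELECT id FROM t')) # None if no rows
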
##### Escaping

def esc_name_by_module(module, name):
    if module == 'psycopg2' or module == None: quote = '"'
    elif module == 'MySQLdb': quote = '`'
    else: raise NotImplementedError("Can't escape name for "+module+' database')
    return sql_gen.esc_name(name, quote)

def esc_name_by_engine(engine, name, **kw_args):
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)

def esc_name(db, name, **kw_args):
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)

def qual_name(db, schema, table):
    def esc_name_(name): return esc_name(db, name)
    table = esc_name_(table)
    if schema != None: return esc_name_(schema)+'.'+table
    else: return table

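# Example of the quoting rules above (illustrative; escaping of embedded quote
# chars is delegated to sql_gen.esc_name()):
#     esc_name_by_module('psycopg2', 'my col') # -> '"my col"' (PostgreSQL)
#     esc_name_by_module('MySQLdb', 'my col') # -> '`my col`' (MySQL)
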
##### Database connections

db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']

db_engines = {
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
    'PostgreSQL': ('psycopg2', {}),
}

DatabaseErrors_set = set([DbException])
DatabaseErrors = tuple(DatabaseErrors_set)

def _add_module(module):
    DatabaseErrors_set.add(module.DatabaseError)
    global DatabaseErrors
    DatabaseErrors = tuple(DatabaseErrors_set)

def db_config_str(db_config):
    return db_config['engine']+' database '+db_config['database']

log_debug_none = lambda msg, level=2: None

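# A minimal db_config sketch (hypothetical values). Each db_engines entry maps
# an engine to its DB-API module and the db_config keys that module renames,
# e.g. MySQLdb takes passwd=/db= instead of password=/database=:
#     db_config = dict(engine='PostgreSQL', host='localhost', user='me',
#         password='secret', database='mydb', schemas='public')
#     print db_config_str(db_config) # -> 'PostgreSQL database mydb'
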
class DbConn:
    def __init__(self, db_config, autocommit=True, caching=True,
        log_debug=log_debug_none, debug_temp=False):
        '''
        @param debug_temp Whether temporary objects should instead be permanent.
            This assists in debugging the internal objects used by the program.
        '''
        self.db_config = db_config
        self.autocommit = autocommit
        self.caching = caching
        self.log_debug = log_debug
        self.debug = log_debug != log_debug_none
        self.debug_temp = debug_temp
        self.autoanalyze = False
        
        self._savepoint = 0
        self._reset()
    
    def __getattr__(self, name):
        if name == '__dict__': raise Exception('getting __dict__')
        if name == 'db': return self._db()
        else: raise AttributeError(name) # report which attr was missing
    
    def __getstate__(self):
        state = copy.copy(self.__dict__) # shallow copy
        state['log_debug'] = None # don't pickle the debug callback
        state['_DbConn__db'] = None # don't pickle the connection
        return state
    
    def clear_cache(self): self.query_results = {}
    
    def _reset(self):
        self.clear_cache()
        assert self._savepoint == 0
        self._notices_seen = set()
        self.__db = None
    
    def connected(self): return self.__db != None
    
    def close(self):
        if not self.connected(): return
        
        self.db.close()
        self._reset()
    
    def _db(self):
        if self.__db == None:
            # Process db_config
            db_config = self.db_config.copy() # don't modify input!
            schemas = db_config.pop('schemas', None)
            module_name, mappings = db_engines[db_config.pop('engine')]
            module = __import__(module_name)
            _add_module(module)
            for orig, new in mappings.iteritems():
                try: util.rename_key(db_config, orig, new)
                except KeyError: pass
            
            # Connect
            self.__db = module.connect(**db_config)
            
            # Configure connection
            if hasattr(self.db, 'set_isolation_level'):
                import psycopg2.extensions
                self.db.set_isolation_level(
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
            if schemas != None:
                search_path = [self.esc_name(s) for s in schemas.split(',')]
                search_path.append(value(run_query(self, 'SHOW search_path',
                    log_level=4)))
                run_query(self, 'SET search_path TO '+(','.join(search_path)),
                    log_level=3)
        
        return self.__db
    
    class DbCursor(Proxy):
        def __init__(self, outer):
            Proxy.__init__(self, outer.db.cursor())
            self.outer = outer
            self.query_results = outer.query_results
            self.query_lookup = None
            self.result = []
        
        def execute(self, query):
            self._is_insert = query.startswith('INSERT')
            self.query_lookup = query
            try:
                try:
                    cur = self.inner.execute(query)
                    self.outer.do_autocommit()
                finally: self.query = get_cur_query(self.inner, query)
            except Exception, e:
                _add_cursor_info(e, self, query)
                self.result = e # cache the exception as the result
                self._cache_result()
                raise
            
            # Always cache certain queries
            if query.startswith('CREATE') or query.startswith('ALTER'):
                # structural changes
                # Rest of query must be unique in the face of name collisions,
                # so don't cache ADD COLUMN unless it has distinguishing comment
                if query.find('ADD COLUMN') < 0 or query.endswith('*/'):
                    self._cache_result()
            elif self.rowcount == 0 and query.startswith('SELECT'): # empty
                consume_rows(self) # fetch all rows so result will be cached
            
            return cur
        
        def fetchone(self):
            row = self.inner.fetchone()
            if row != None: self.result.append(row)
            # otherwise, fetched all rows
            else: self._cache_result()
            return row
        
        def _cache_result(self):
            # For inserts that return a result set, don't cache result set since
            # inserts are not idempotent. Other non-SELECT queries don't have
            # their result set read, so only exceptions will be cached (an
            # invalid query will always be invalid).
            if self.query_results != None and (not self._is_insert
                or isinstance(self.result, Exception)):
                
                assert self.query_lookup != None
                self.query_results[self.query_lookup] = self.CacheCursor(
                    util.dict_subset(dicts.AttrsDictView(self),
                    ['query', 'result', 'rowcount', 'description']))
        
        class CacheCursor:
            def __init__(self, cached_result): self.__dict__ = cached_result
            
            def execute(self, *args, **kw_args):
                if isinstance(self.result, Exception): raise self.result
                # otherwise, result is a rows list
                self.iter = iter(self.result)
            
            def fetchone(self):
                try: return self.iter.next()
                except StopIteration: return None
    
    def esc_value(self, value):
        try: str_ = self.mogrify('%s', [value])
        except NotImplementedError, e:
            module = util.root_module(self.db)
            if module == 'MySQLdb':
                import _mysql
                str_ = _mysql.escape_string(value)
            else: raise e
        return strings.to_unicode(str_)
    
    def esc_name(self, name): return esc_name(self, name) # calls global func
    
    def std_code(self, str_):
        '''Standardizes SQL code.
        * Ensures that string literals are prefixed by `E`
        '''
        if str_.startswith("'"): str_ = 'E'+str_
        return str_
    
    def can_mogrify(self):
        module = util.root_module(self.db)
        return module == 'psycopg2'
    
    def mogrify(self, query, params=None):
        if self.can_mogrify(): return self.db.cursor().mogrify(query, params)
        else: raise NotImplementedError("Can't mogrify query")
    
    def print_notices(self):
        if hasattr(self.db, 'notices'):
            for msg in self.db.notices:
                if msg not in self._notices_seen:
                    self._notices_seen.add(msg)
                    self.log_debug(msg, level=2)
    
    def run_query(self, query, cacheable=False, log_level=2,
        debug_msg_ref=None):
        '''
        @param log_ignore_excs (accepted by the module-level run_query()
            wrapper, not this method) The log_level will be increased by 2 if
            the query throws one of these exceptions.
        @param debug_msg_ref If specified, the log message will be returned in
            this instead of being output. This allows you to filter log messages
            depending on the result of the query.
        '''
        assert query != None
        
        if not self.caching: cacheable = False
        used_cache = False
        
        def log_msg(query):
            if used_cache: cache_status = 'cache hit'
            elif cacheable: cache_status = 'cache miss'
            else: cache_status = 'non-cacheable'
            return 'DB query: '+cache_status+':\n'+strings.as_code(query, 'SQL')
        
        try:
            # Get cursor
            if cacheable:
                try:
                    cur = self.query_results[query]
                    used_cache = True
                except KeyError: cur = self.DbCursor(self)
            else: cur = self.db.cursor()
            
            # Log query
            if self.debug and debug_msg_ref == None: # log before running
                self.log_debug(log_msg(query), log_level)
            
            # Run query
            cur.execute(query)
        finally:
            self.print_notices()
            if self.debug and debug_msg_ref != None: # return after running
                debug_msg_ref[0] = log_msg(str(get_cur_query(cur, query)))
        
        return cur
    
    def is_cached(self, query): return query in self.query_results
    
    def with_autocommit(self, func):
        import psycopg2.extensions
        
        prev_isolation_level = self.db.isolation_level
        self.db.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        try: return func()
        finally: self.db.set_isolation_level(prev_isolation_level)
    
    def with_savepoint(self, func):
        savepoint = 'level_'+str(self._savepoint)
        self.run_query('SAVEPOINT '+savepoint, log_level=4)
        self._savepoint += 1
        try: return func()
        except:
            self.run_query('ROLLBACK TO SAVEPOINT '+savepoint, log_level=4)
            raise
        finally:
            # Always release savepoint, because after ROLLBACK TO SAVEPOINT,
            # "The savepoint remains valid and can be rolled back to again"
            # (http://www.postgresql.org/docs/8.3/static/sql-rollback-to.html).
            self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
            
            self._savepoint -= 1
            assert self._savepoint >= 0
            
            self.do_autocommit() # OK to do this after ROLLBACK TO SAVEPOINT
    
    def do_autocommit(self):
        '''Autocommits if outside savepoint'''
        assert self._savepoint >= 0
        if self.autocommit and self._savepoint == 0:
            self.log_debug('Autocommitting', level=4)
            self.db.commit()
    
    def col_info(self, col):
        table = sql_gen.Table('columns', 'information_schema')
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
            'USER-DEFINED'), sql_gen.Col('udt_name'))
        cols = [type_, 'column_default',
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
        
        conds = [('table_name', col.table.name), ('column_name', col.name)]
        schema = col.table.schema
        if schema != None: conds.append(('table_schema', schema))
        
        type_, default, nullable = row(select(self, table, cols, conds,
            order_by='table_schema', limit=1, cacheable=False, log_level=4))
            # TODO: order_by search_path schema order
        default = sql_gen.as_Code(default, self)
        
        return sql_gen.TypedCol(col.name, type_, default, nullable)
    
    def TempFunction(self, name):
        if self.debug_temp: schema = None
        else: schema = 'pg_temp'
        return sql_gen.Function(name, schema)

connect = DbConn

##### Recoverable querying

def with_savepoint(db, func): return db.with_savepoint(func)

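# Example: run several statements in one recoverable unit (a sketch; assumes a
# connected DbConn `db` and an existing table `t`):
#     def body():
#         run_query(db, 'UPDATE t SET x = 1')
#         run_query(db, 'UPDATE t SET y = 2')
#     with_savepoint(db, body) # both undone if body() raises
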
def run_query(db, query, recover=None, cacheable=False, log_level=2,
    log_ignore_excs=None, **kw_args):
    '''For params, see DbConn.run_query()'''
    if recover == None: recover = False
    if log_ignore_excs == None: log_ignore_excs = ()
    log_ignore_excs = tuple(log_ignore_excs)
    
    debug_msg_ref = None # usually, db.run_query() logs query before running it
    # But if filtering with log_ignore_excs, wait until after exception parsing
    if log_ignore_excs != () or not db.can_mogrify(): debug_msg_ref = [None]
    
    try:
        try:
            def run(): return db.run_query(query, cacheable, log_level,
                debug_msg_ref, **kw_args)
            if recover and not db.is_cached(query):
                return with_savepoint(db, run)
            else: return run() # don't need savepoint if cached
        except Exception, e:
            msg = strings.ustr(e.args[0])
            
            match = re.match(r'^duplicate key value violates unique constraint '
                r'"((_?[^\W_]+(?=[._]))?.+?)"', msg)
            if match:
                constraint, table = match.groups()
                cols = []
                if recover: # need auto-rollback to run index_cols()
                    try: cols = index_cols(db, table, constraint)
                    except NotImplementedError: pass
                raise DuplicateKeyException(constraint, cols, e)
            
            match = re.match(r'^null value in column "(.+?)" violates not-null'
                r' constraint', msg)
            if match: raise NullValueException('NOT NULL', [match.group(1)], e)
            
            match = re.match(r'^(?:invalid input (?:syntax|value)\b.*?'
                r'|.+? field value out of range): "(.+?)"', msg)
            if match:
                value, = match.groups()
                raise InvalidValueException(strings.to_unicode(value), e)
            
            match = re.match(r'^column "(.+?)" is of type (.+?) but expression '
                r'is of type', msg)
            if match:
                col, type_ = match.groups()
                raise MissingCastException(type_, col, e)
            
            match = re.match(r'^(\S+) "(.+?)".*? already exists', msg)
            if match:
                type_, name = match.groups()
                raise DuplicateException(type_, name, e)
            
            raise # no specific exception raised
    except log_ignore_excs:
        log_level += 2
        raise
    finally:
        if debug_msg_ref != None and debug_msg_ref[0] != None:
            db.log_debug(debug_msg_ref[0], log_level)

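# Example of relying on run_query()'s error parsing (illustrative; the table
# and data are hypothetical):
#     try: run_query(db, 'INSERT INTO t (id) VALUES (1)', recover=True)
#     except DuplicateKeyException, e: print 'dup key on cols:', e.cols
#     except NullValueException, e: print 'NULL in cols:', e.cols
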
##### Basic queries

def next_version(name):
    version = 1 # first existing name was version 0
    match = re.match(r'^(.*)#(\d+)$', name)
    if match:
        name, version = match.groups()
        version = int(version)+1
    return sql_gen.concat(name, '#'+str(version))

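# e.g. (assuming sql_gen.concat() plainly joins the parts when there is room):
#     next_version('table') # -> 'table#1'
#     next_version('table#1') # -> 'table#2'
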
def lock_table(db, table, mode):
    table = sql_gen.as_Table(table)
    run_query(db, 'LOCK TABLE '+table.to_str(db)+' IN '+mode+' MODE')

def run_query_into(db, query, into=None, add_indexes_=False, **kw_args):
    '''Outputs a query to a temp table.
    For params, see run_query().
    '''
    if into == None: return run_query(db, query, **kw_args)
    
    assert isinstance(into, sql_gen.Table)
    
    into.is_temp = True
    # "temporary tables cannot specify a schema name", so remove schema
    into.schema = None
    
    kw_args['recover'] = True
    kw_args.setdefault('log_ignore_excs', (DuplicateException,))
    
    temp = not db.debug_temp # tables are permanent in debug_temp mode
    
    # Create table
    while True:
        create_query = 'CREATE'
        if temp: create_query += ' TEMP'
        create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
        
        try:
            cur = run_query(db, create_query, **kw_args)
                # CREATE TABLE AS sets rowcount to # rows in query
            break
        except DuplicateException:
            into.name = next_version(into.name)
            # try again with next version of name
    
    if add_indexes_: add_indexes(db, into)
    
    # According to the PostgreSQL doc, "The autovacuum daemon cannot access and
    # therefore cannot vacuum or analyze temporary tables. [...] if a temporary
    # table is going to be used in complex queries, it is wise to run ANALYZE on
    # the temporary table after it is populated."
    # (http://www.postgresql.org/docs/9.1/static/sql-createtable.html)
    # If into is not a temp table, ANALYZE is useful but not required.
    analyze(db, into)
    
    return cur

order_by_pkey = object() # tells mk_select() to order by the pkey

distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns

def mk_select(db, tables, fields=None, conds=None, distinct_on=[], limit=None,
    start=None, order_by=order_by_pkey, default_table=None):
    '''
    @param tables The single table to select from, or a list of tables to join
        together, with tables after the first being sql_gen.Join objects
    @param fields Use None to select all fields in the table
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
        * container can be any iterable type
        * compare_left_side: sql_gen.Code|str (for col name)
        * compare_right_side: sql_gen.ValueCond|literal value
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
        use all columns
    @return query
    '''
    # Parse tables param
    tables = lists.mk_seq(tables)
    tables = list(tables) # don't modify input! (list() copies input)
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
    
    # Parse other params
    if conds == None: conds = []
    elif dicts.is_dict(conds): conds = conds.items()
    conds = list(conds) # don't modify input! (list() copies input)
    assert limit == None or type(limit) == int
    assert start == None or type(start) == int
    if order_by is order_by_pkey:
        if distinct_on != []: order_by = None
        else: order_by = pkey(db, table0, recover=True)
    
    query = 'SELECT'
    
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
    
    # DISTINCT ON columns
    if distinct_on != []:
        query += '\nDISTINCT'
        if distinct_on is not distinct_on_all:
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
    
    # Columns
    if fields == None:
        if query.find('\n') >= 0: whitespace = '\n'
        else: whitespace = ' '
        query += whitespace+'*'
    else:
        assert fields != []
        query += '\n'+('\n, '.join(map(parse_col, fields)))
    
    # Main table
    query += '\nFROM '+table0.to_str(db)
    
    # Add joins
    left_table = table0
    for join_ in tables:
        table = join_.table
        
        # Parse special values
        if join_.type_ is sql_gen.filter_out: # filter no match
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
                sql_gen.CompareCond(None, '~=')))
        
        query += '\n'+join_.to_str(db, left_table)
        
        left_table = table
    
    missing = True
    if conds != []:
        query += '\n'+sql_gen.combine_conds([sql_gen.ColValueCond(l, r)
            .to_str(db) for l, r in conds], 'WHERE')
        missing = False
    if order_by != None:
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
    if limit != None: query += '\nLIMIT '+str(limit); missing = False
    if start != None:
        if start != 0: query += '\nOFFSET '+str(start)
        missing = False
    if missing: warnings.warn(DbWarning(
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))
    
    return query

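# Example (illustrative; assumes a table `plants` whose pkey is `id`):
#     mk_select(db, 'plants', ['id', 'name'], {'genus': 'Quercus'}, limit=10)
# returns SQL along the lines of
#     SELECT id, name FROM plants WHERE genus = ... ORDER BY id LIMIT 10
# with quoting and value escaping handled by sql_gen.
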
def select(db, *args, **kw_args):
    '''For params, see mk_select() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)
    
    return run_query(db, mk_select(db, *args, **kw_args), recover, cacheable,
        log_level=log_level)

def mk_insert_select(db, table, cols=None, select_query=None, returning=None,
    embeddable=False, ignore=False):
    '''
    @param returning str|None An inserted column (such as pkey) to return
    @param embeddable Whether the query should be embeddable as a nested SELECT.
        Warning: If you set this and cacheable=True when the query is run, the
        query will be fully cached, not just if it raises an exception.
    @param ignore Whether to ignore duplicate keys.
    '''
    table = sql_gen.remove_table_rename(sql_gen.as_Table(table))
    if cols == []: cols = None # no cols (all defaults) = unknown col names
    if cols != None: cols = [sql_gen.to_name_only_col(c, table) for c in cols]
    if select_query == None: select_query = 'DEFAULT VALUES'
    if returning != None: returning = sql_gen.as_Col(returning, table)
    
    first_line = 'INSERT INTO '+table.to_str(db)
    
    def mk_insert(select_query):
        query = first_line
        if cols != None:
            query += '\n('+(', '.join((c.to_str(db) for c in cols)))+')'
        query += '\n'+select_query
        
        if returning != None:
            returning_name_col = sql_gen.to_name_only_col(returning)
            query += '\nRETURNING '+returning_name_col.to_str(db)
        
        return query
    
    return_type = 'unknown'
    if returning != None: return_type = returning.to_str(db)+'%TYPE'
    
    lang = 'sql'
    if ignore:
        # Always return something to set the correct rowcount
        if returning == None: returning = sql_gen.NamedCol('NULL', None)
        
        embeddable = True # must use function
        lang = 'plpgsql'
        
        if cols == None:
            row = [sql_gen.Col(sql_gen.all_cols, 'row')]
            row_vars = [sql_gen.Table('row')]
        else:
            row_vars = row = [sql_gen.Col(c.name, 'row') for c in cols]
        
        query = '''\
DECLARE
    row '''+table.to_str(db)+'''%ROWTYPE;
BEGIN
    /* Need an EXCEPTION block for each individual row because "When an error is
    caught by an EXCEPTION clause, [...] all changes to persistent database
    state within the block are rolled back."
    This is unfortunate because "A block containing an EXCEPTION clause is
    significantly more expensive to enter and exit than a block without one."
    (http://www.postgresql.org/docs/8.3/static/plpgsql-control-structures.html\
#PLPGSQL-ERROR-TRAPPING)
    */
    FOR '''+(', '.join((v.to_str(db) for v in row_vars)))+''' IN
'''+select_query+'''
    LOOP
        BEGIN
            RETURN QUERY
'''+mk_insert(sql_gen.Values(row).to_str(db))+'''
;
        EXCEPTION
            WHEN unique_violation THEN NULL; -- continue to next row
        END;
    END LOOP;
END;\
'''
    else: query = mk_insert(select_query)
    
    if embeddable:
        # Create function
        function_name = sql_gen.clean_name(first_line)
        while True:
            try:
                function = db.TempFunction(function_name)
                
                function_query = '''\
CREATE FUNCTION '''+function.to_str(db)+'''()
RETURNS SETOF '''+return_type+'''
LANGUAGE '''+lang+'''
AS $$
'''+query+'''
$$;
'''
                run_query(db, function_query, recover=True, cacheable=True,
                    log_ignore_excs=(DuplicateException,))
                break # this version was successful
            except DuplicateException:
                function_name = next_version(function_name)
                # try again with next version of name
        
        # Return query that uses function
        cols = None
        if returning != None: cols = [returning]
        func_table = sql_gen.NamedTable('f', sql_gen.FunctionCall(function),
            cols) # AS clause requires function alias
        return mk_select(db, func_table, start=0, order_by=None)
    
    return query

def insert_select(db, table, *args, **kw_args):
    '''For params, see mk_insert_select() and run_query_into()
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
        values in
    '''
    into = kw_args.pop('into', None)
    if into != None: kw_args['embeddable'] = True
    recover = kw_args.pop('recover', None)
    if kw_args.get('ignore', False): recover = True
    cacheable = kw_args.pop('cacheable', True)
    log_level = kw_args.pop('log_level', 2)
    
    cur = run_query_into(db, mk_insert_select(db, table, *args, **kw_args),
        into, recover=recover, cacheable=cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

default = sql_gen.default # tells insert() to use the default value for a column

def insert(db, table, row, *args, **kw_args):
    '''For params, see insert_select()'''
    if lists.is_seq(row): cols = None
    else:
        cols = row.keys()
        row = row.values()
    row = list(row) # ensure that "== []" works
    
    if row == []: query = None
    else: query = sql_gen.Values(row).to_str(db)
    
    return insert_select(db, table, cols, query, *args, **kw_args)

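# Example (illustrative; assumes a table `plants` with these columns):
#     insert(db, 'plants', {'genus': 'Quercus', 'species': 'alba'})
#     insert(db, 'plants', []) # all defaults: INSERT ... DEFAULT VALUES
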
def mk_update(db, table, changes=None, cond=None, in_place=False):
    '''
    @param changes [(col, new_value),...]
        * container can be any iterable type
        * col: sql_gen.Code|str (for col name)
        * new_value: sql_gen.Code|literal value
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
    @param in_place If set, locks the table and updates rows in place.
        This avoids creating dead rows in PostgreSQL.
        * cond must be None
    @return str query
    '''
    table = sql_gen.as_Table(table)
    changes = [(sql_gen.to_name_only_col(c, table), sql_gen.as_Value(v))
        for c, v in changes]
    
    if in_place:
        assert cond == None
        
        query = 'ALTER TABLE '+table.to_str(db)+'\n'
        query += ',\n'.join(('ALTER COLUMN '+c.to_str(db)+' TYPE '
            +db.col_info(sql_gen.with_default_table(c, table)).type
            +'\nUSING '+v.to_str(db) for c, v in changes))
    else:
        query = 'UPDATE '+table.to_str(db)+'\nSET\n'
        query += ',\n'.join((c.to_str(db)+' = '+v.to_str(db)
            for c, v in changes))
        if cond != None: query += '\nWHERE\n'+cond.to_str(db)
    
    return query

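# Example (illustrative):
#     mk_update(db, 'plants', [('genus', 'Quercus')],
#         sql_gen.ColValueCond('id', 1))
# generates UPDATE plants SET genus = ... WHERE id = ...; with in_place=True it
# instead rewrites the column via ALTER TABLE ... TYPE ... USING, which avoids
# creating dead rows but cannot take a cond.
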
def update(db, table, *args, **kw_args):
    '''For params, see mk_update() and run_query()'''
    recover = kw_args.pop('recover', None)
    cacheable = kw_args.pop('cacheable', False)
    log_level = kw_args.pop('log_level', 2)
    
    cur = run_query(db, mk_update(db, table, *args, **kw_args), recover,
        cacheable, log_level=log_level)
    autoanalyze(db, table)
    return cur

def last_insert_id(db):
    module = util.root_module(db.db)
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
    elif module == 'MySQLdb': return db.insert_id()
    else: return None

def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
    '''Creates a mapping from original column names (which may have collisions)
    to names that will be distinct among the columns' tables.
    This is meant to be used for several tables that are being joined together.
    @param cols The columns to combine. Duplicates will be removed.
    @param into The table for the new columns.
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
        columns will be included in the mapping even if they are not in cols.
        The tables of the provided Col objects will be changed to into, so make
        copies of them if you want to keep the original tables.
    @param as_items Whether to return a list of dict items instead of a dict
    @return dict(orig_col=new_col, ...)
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
        * new_col: sql_gen.Col(orig_col_name, into)
        * All mappings use the into table so its name can easily be
          changed for all columns at once
    '''
    cols = lists.uniqify(cols)
    
    items = []
    for col in preserve:
        orig_col = copy.copy(col)
        col.table = into
        items.append((orig_col, col))
    preserve = set(preserve)
    for col in cols:
        if col not in preserve:
            items.append((col, sql_gen.Col(str(col), into, col.srcs)))
    
    if not as_items: items = dict(items)
    return items

def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
    '''For params, see mk_flatten_mapping()
    @return See return value of mk_flatten_mapping()
    '''
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
    run_query_into(db, mk_select(db, joins, cols, limit=limit, start=start),
        into=into, add_indexes_=True)
    return dict(items)

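# Example (a sketch; assumes joined tables t1, t2 that both have a `name` col):
#     mapping = mk_flatten_mapping(db, sql_gen.Table('flat'),
#         [sql_gen.Col('name', 't1'), sql_gen.Col('name', 't2')])
# Each input col maps to a distinctly-named col in `flat` (named after
# str(col), e.g. 't1.name'), so the join can be materialized without
# collisions.
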
##### Database structure introspection

#### Tables

def tables(db, schema_like='public', table_like='%', exact=False):
    if exact: compare = '='
    else: compare = 'LIKE'
    
    module = util.root_module(db.db)
    if module == 'psycopg2':
        conds = [('schemaname', sql_gen.CompareCond(schema_like, compare)),
            ('tablename', sql_gen.CompareCond(table_like, compare))]
        return values(select(db, 'pg_tables', ['tablename'], conds,
            order_by='tablename', log_level=4))
    elif module == 'MySQLdb':
        return values(run_query(db, 'SHOW TABLES LIKE '+db.esc_value(table_like)
            , cacheable=True, log_level=4))
    else: raise NotImplementedError("Can't list tables for "+module+' database')

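# Example (illustrative):
#     list(tables(db, 'public', 'plant%')) # names matching a LIKE pattern
#     list(tables(db, 'public', 'plants', exact=True)) # exact-match lookup
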
def table_exists(db, table):
    table = sql_gen.as_Table(table)
    return list(tables(db, table.schema, table.name, exact=True)) != []

def table_row_count(db, table, recover=None):
    return value(run_query(db, mk_select(db, table, [sql_gen.row_count],
        order_by=None, start=0), recover=recover, log_level=3))

def table_cols(db, table, recover=None):
    return list(col_names(select(db, table, limit=0, order_by=None,
        recover=recover, log_level=4)))

def pkey(db, table, recover=None):
    '''Assumed to be first column in table'''
    return table_cols(db, table, recover)[0]

not_null_col = 'not_null_col'

def table_not_null_col(db, table, recover=None):
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
    if not_null_col in table_cols(db, table, recover): return not_null_col
    else: return pkey(db, table, recover)

def index_cols(db, table, index):
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
    automatically created. When you don't know whether something is a UNIQUE
    constraint or a UNIQUE index, use this function.'''
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM
(
        SELECT attnum, attname
        FROM pg_index
        JOIN pg_class index ON index.oid = indexrelid
        JOIN pg_class table_ ON table_.oid = indrelid
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
        WHERE
            table_.relname = '''+db.esc_value(table)+'''
            AND index.relname = '''+db.esc_value(index)+'''
    UNION
        SELECT attnum, attname
        FROM
        (
            SELECT
                indrelid
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
                    AS indkey
            FROM pg_index
            JOIN pg_class index ON index.oid = indexrelid
            JOIN pg_class table_ ON table_.oid = indrelid
            WHERE
                table_.relname = '''+db.esc_value(table)+'''
                AND index.relname = '''+db.esc_value(index)+'''
        ) s
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
) s
ORDER BY attnum
'''
            , cacheable=True, log_level=4)))
    else: raise NotImplementedError("Can't list index columns for "+module+
        ' database')

def constraint_cols(db, table, constraint):
    module = util.root_module(db.db)
    if module == 'psycopg2':
        return list(values(run_query(db, '''\
SELECT attname
FROM pg_constraint
JOIN pg_class ON pg_class.oid = conrelid
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
WHERE
    relname = '''+db.esc_value(table)+'''
    AND conname = '''+db.esc_value(constraint)+'''
ORDER BY attnum
'''
            )))
    else: raise NotImplementedError("Can't list constraint columns for "+module+
        ' database')

#### Functions

def function_exists(db, function):
    function = sql_gen.as_Function(function)
    
    info_table = sql_gen.Table('routines', 'information_schema')
    conds = [('routine_name', function.name)]
    schema = function.schema
    if schema != None: conds.append(('routine_schema', schema))
    # Exclude trigger functions, since they cannot be called directly
    conds.append(('data_type', sql_gen.CompareCond('trigger', '!=')))
    
    return list(values(select(db, info_table, ['routine_name'], conds,
        order_by='routine_schema', limit=1, log_level=4))) != []
        # TODO: order_by search_path schema order

##### Structural changes

#### Columns

def add_col(db, table, col, comment=None, **kw_args):
    '''
    @param col TypedCol Name may be versioned, so be sure to propagate any
        renaming back to any source column for the TypedCol.
    @param comment None|str SQL comment used to distinguish columns of the same
        name from each other when they contain different data, to allow the
        ADD COLUMN query to be cached. If not set, query will not be cached.
    '''
    assert isinstance(col, sql_gen.TypedCol)
    
    while True:
        str_ = 'ALTER TABLE '+table.to_str(db)+' ADD COLUMN '+col.to_str(db)
        if comment != None: str_ += ' '+sql_gen.esc_comment(comment)
        
        try:
            run_query(db, str_, recover=True, cacheable=True, **kw_args)
            break
        except DuplicateException:
            col.name = next_version(col.name)
            # try again with next version of name

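# Example (illustrative). The comment makes otherwise-identical ADD COLUMN
# queries distinguishable, so the query can be cached; the col may be renamed
# on collision:
#     col = sql_gen.TypedCol('source', 'text')
#     add_col(db, sql_gen.Table('plants'), col, comment='src: import #3')
#     print col.name # e.g. 'source#1' if 'source' already existed
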
def add_not_null(db, col):
    table = col.table
    col = sql_gen.to_name_only_col(col)
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ALTER COLUMN '
        +col.to_str(db)+' SET NOT NULL', cacheable=True, log_level=3)

row_num_col = '_row_num'

row_num_typed_col = sql_gen.TypedCol(row_num_col, 'serial', nullable=False,
    constraints='PRIMARY KEY')

def add_row_num(db, table):
    '''Adds a row number column to a table. Its name is in row_num_col. It will
    be the primary key.'''
    add_col(db, table, row_num_typed_col, log_level=3)

#### Indexes

def add_pkey(db, table, cols=None, recover=None):
    '''Adds a primary key.
    @param cols [sql_gen.Col,...] The columns in the primary key.
        Defaults to the first column in the table.
    @pre The table must not already have a primary key.
    '''
    table = sql_gen.as_Table(table)
    if cols == None: cols = [pkey(db, table, recover)]
    col_strs = [sql_gen.to_name_only_col(v).to_str(db) for v in cols]
    
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD PRIMARY KEY ('
        +(', '.join(col_strs))+')', recover=True, cacheable=True, log_level=3,
        log_ignore_excs=(DuplicateException,))

def add_index(db, exprs, table=None, unique=False, ensure_not_null_=True):
    '''Adds an index on column(s) or expression(s) if it doesn't already exist.
    Currently, only function calls are supported as expressions.
    @param ensure_not_null_ If set, translates NULL values to sentinel values.
        This allows indexes to be used for comparisons where NULLs are equal.
    '''
    exprs = lists.mk_seq(exprs)
    
    # Parse exprs
    old_exprs = exprs[:]
    exprs = []
    cols = []
    for i, expr in enumerate(old_exprs):
        expr = sql_gen.as_Col(expr, table)
        
        # Handle nullable columns
        if ensure_not_null_:
            try: expr = ensure_not_null(db, expr)
            except KeyError: pass # unknown type, so just create plain index
        
        # Extract col
        expr = copy.deepcopy(expr) # don't modify input!
        if isinstance(expr, sql_gen.FunctionCall):
            col = expr.args[0]
            expr = sql_gen.Expr(expr)
        else: col = expr
        assert isinstance(col, sql_gen.Col)
        
        # Extract table
        if table == None:
            assert sql_gen.is_table_col(col)
            table = col.table
        
        col.table = None
        
        exprs.append(expr)
        cols.append(col)
    
    table = sql_gen.as_Table(table)
    index = sql_gen.Table(str(sql_gen.Col(','.join(map(str, cols)), table)))
    
    # Add index
    while True:
        str_ = 'CREATE'
        if unique: str_ += ' UNIQUE'
        str_ += ' INDEX '+index.to_str(db)+' ON '+table.to_str(db)+' ('+(
            ', '.join((v.to_str(db) for v in exprs)))+')'
        
        try:
            run_query(db, str_, recover=True, cacheable=True, log_level=3,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            index.name = next_version(index.name)
            # try again with next version of name

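# Example (illustrative):
#     add_index(db, 'name', sql_gen.Table('plants')) # single-column index
#     add_index(db, ['genus', 'species'], sql_gen.Table('plants'), unique=True)
# Where the column type is known, NULLs are first mapped through
# ensure_not_null() so the index also serves NULL-safe comparisons.
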
def add_index_col(db, col, suffix, expr, nullable=True):
    if sql_gen.index_col(col) != None: return # already has index col
    
    new_col = sql_gen.suffixed_col(col, suffix)
    
    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, db.col_info(col).type)
    add_col(db, col.table, new_typed_col, comment='src: '+repr(col),
        log_level=3)
    new_col.name = new_typed_col.name # propagate any renaming
    
    update(db, col.table, [(new_col, expr)], in_place=True, cacheable=True,
        log_level=3)
    if not nullable: add_not_null(db, new_col)
    add_index(db, new_col)
    
    col.table.index_cols[col.name] = new_col.name

# Controls when ensure_not_null() will use index columns
not_null_index_cols_min_rows = 0 # rows; initially always use index columns

def ensure_not_null(db, col):
    '''For params, see sql_gen.ensure_not_null()'''
    expr = sql_gen.ensure_not_null(db, col)
    
    # If a nullable column in a temp table, add separate index column instead.
    # Note that for small datasources, this adds 6-25% to the total import time.
    if (sql_gen.is_temp_col(col) and isinstance(expr, sql_gen.EnsureNotNull)
        and table_row_count(db, col.table) >= not_null_index_cols_min_rows):
        add_index_col(db, col, '::NOT NULL', expr, nullable=False)
        expr = sql_gen.index_col(col)
    
    return expr

already_indexed = object() # tells add_indexes() the pkey has already been added

def add_indexes(db, table, has_pkey=True):
    '''Adds an index on all columns in a table.
    @param has_pkey bool|already_indexed Whether a pkey instead of a regular
        index should be added on the first column.
        * If already_indexed, the pkey is assumed to have already been added
    '''
    cols = table_cols(db, table)
    if has_pkey:
        if has_pkey is not already_indexed: add_pkey(db, table)
        cols = cols[1:]
    for col in cols: add_index(db, col, table)

#### Tables

### Maintenance

def analyze(db, table):
    table = sql_gen.as_Table(table)
    run_query(db, 'ANALYZE '+table.to_str(db), log_level=3)

def autoanalyze(db, table):
    if db.autoanalyze: analyze(db, table)

def vacuum(db, table):
    table = sql_gen.as_Table(table)
    db.with_autocommit(lambda: run_query(db, 'VACUUM ANALYZE '+table.to_str(db),
        log_level=3))

### Lifecycle

def drop_table(db, table):
    table = sql_gen.as_Table(table)
    return run_query(db, 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE')

def create_table(db, table, cols=[], has_pkey=True, col_indexes=True,
    like=None):
    '''Creates a table.
    @param cols [sql_gen.TypedCol,...] The column names and types
    @param has_pkey If set, the first column becomes the primary key.
    @param col_indexes bool|[ref]
        * If True, indexes will be added on all non-pkey columns.
        * If a list reference, [0] will be set to a function to do this.
          This can be used to delay index creation until the table is populated.
    '''
    table = sql_gen.as_Table(table)
    
    if like != None:
        cols = [sql_gen.CustomCode('LIKE '+like.to_str(db)+' INCLUDING ALL')
            ]+cols
    if has_pkey:
        cols[0] = pkey = copy.copy(cols[0]) # don't modify input!
        pkey.constraints = 'PRIMARY KEY'
    
    temp = table.is_temp and not db.debug_temp
        # temp tables permanent in debug_temp mode
    
    # Create table
    while True:
        str_ = 'CREATE'
        if temp: str_ += ' TEMP'
        str_ += ' TABLE '+table.to_str(db)+' (\n'
        str_ += '\n, '.join(c.to_str(db) for c in cols)
        str_ += '\n);\n'
        
        try:
            run_query(db, str_, cacheable=True, log_level=2,
                log_ignore_excs=(DuplicateException,))
            break
        except DuplicateException:
            table.name = next_version(table.name)
            # try again with next version of name
    
    # Add indexes
    if has_pkey: has_pkey = already_indexed
    def add_indexes_(): add_indexes(db, table, has_pkey)
    if isinstance(col_indexes, list): col_indexes[0] = add_indexes_ # defer
    elif col_indexes: add_indexes_() # add now

def copy_table_struct(db, src, dest):
    '''Creates a structure-only copy of a table. (Does not copy data.)'''
    create_table(db, dest, has_pkey=False, col_indexes=False, like=src)

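# Example (a sketch; the first TypedCol becomes the pkey by default, and the
# table is renamed via next_version() and retried on a name collision):
#     create_table(db, sql_gen.Table('plants'), [
#         sql_gen.TypedCol('id', 'serial'),
#         sql_gen.TypedCol('name', 'text'),
#     ])
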
### Data

def truncate(db, table, schema='public', **kw_args):
    '''For params, see run_query()'''
    table = sql_gen.as_Table(table, schema)
    return run_query(db, 'TRUNCATE '+table.to_str(db)+' CASCADE', **kw_args)

def empty_temp(db, tables):
    if db.debug_temp: return # leave temp tables there for debugging
    tables = lists.mk_seq(tables)
    for table in tables: truncate(db, table, log_level=3)

def empty_db(db, schema='public', **kw_args):
    '''For kw_args, see tables()'''
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)

def distinct_table(db, table, distinct_on):
    '''Creates a copy of a temp table which is distinct on the given columns.
    The old and new tables will both get an index on these columns, to
    facilitate merge joins.
    @param distinct_on If empty, creates a table with one row. This is useful if
        your distinct_on columns are all literal values.
    @return The new table.
    '''
    new_table = sql_gen.suffixed_table(table, '_distinct')
    
    copy_table_struct(db, table, new_table)
    
    limit = None
    if distinct_on == []: limit = 1 # one sample row
    else:
        add_index(db, distinct_on, new_table, unique=True)
        add_index(db, distinct_on, table) # for join optimization
    
    insert_select(db, new_table, None, mk_select(db, table, start=0,
        limit=limit), ignore=True)
    analyze(db, new_table)
    
    return new_table