Project

General

Profile

1 11 aaronmk
# Database access
2
3 1869 aaronmk
import copy
4 2127 aaronmk
import operator
5 11 aaronmk
import re
6 865 aaronmk
import warnings
7 11 aaronmk
8 300 aaronmk
import exc
9 1909 aaronmk
import dicts
10 1893 aaronmk
import iters
11 1960 aaronmk
import lists
12 1889 aaronmk
from Proxy import Proxy
13 1872 aaronmk
import rand
14 2217 aaronmk
import sql_gen
15 862 aaronmk
import strings
16 131 aaronmk
import util
17 11 aaronmk
18 832 aaronmk
##### Exceptions
19
20 2170 aaronmk
def get_cur_query(cur, input_query=None, input_params=None):
21 2168 aaronmk
    raw_query = None
22
    if hasattr(cur, 'query'): raw_query = cur.query
23
    elif hasattr(cur, '_last_executed'): raw_query = cur._last_executed
24 2170 aaronmk
25
    if raw_query != None: return raw_query
26 2371 aaronmk
    else: return '[input] '+strings.ustr(input_query)+' % '+repr(input_params)
27 14 aaronmk
28 2170 aaronmk
def _add_cursor_info(e, *args, **kw_args):
29
    '''For params, see get_cur_query()'''
30
    exc.add_msg(e, 'query: '+str(get_cur_query(*args, **kw_args)))
31 135 aaronmk
32 300 aaronmk
class DbException(exc.ExceptionWithCause):
33 14 aaronmk
    def __init__(self, msg, cause=None, cur=None):
34 2145 aaronmk
        exc.ExceptionWithCause.__init__(self, msg, cause, cause_newline=True)
35 14 aaronmk
        if cur != None: _add_cursor_info(self, cur)
36
37 2143 aaronmk
class ExceptionWithName(DbException):
38
    def __init__(self, name, cause=None):
39 2484 aaronmk
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name)), cause)
40 2143 aaronmk
        self.name = name
41 360 aaronmk
42 2240 aaronmk
class ExceptionWithNameValue(DbException):
43
    def __init__(self, name, value, cause=None):
44 2484 aaronmk
        DbException.__init__(self, 'for name: '+strings.as_tt(str(name))
45
            +'; value: '+strings.as_tt(repr(value)), cause)
46 2240 aaronmk
        self.name = name
47
        self.value = value
48
49 2306 aaronmk
class ConstraintException(DbException):
50
    def __init__(self, name, cols, cause=None):
51 2484 aaronmk
        DbException.__init__(self, 'Violated '+strings.as_tt(name)
52
            +' constraint on columns: '+strings.as_tt(', '.join(cols)), cause)
53 2306 aaronmk
        self.name = name
54 468 aaronmk
        self.cols = cols
55 11 aaronmk
56 2143 aaronmk
class NameException(DbException): pass
57
58 2306 aaronmk
class DuplicateKeyException(ConstraintException): pass
59 13 aaronmk
60 2306 aaronmk
class NullValueException(ConstraintException): pass
61 13 aaronmk
62 2240 aaronmk
class FunctionValueException(ExceptionWithNameValue): pass
63 2239 aaronmk
64 2143 aaronmk
class DuplicateTableException(ExceptionWithName): pass
65
66 2188 aaronmk
class DuplicateFunctionException(ExceptionWithName): pass
67
68 89 aaronmk
class EmptyRowException(DbException): pass
69
70 865 aaronmk
##### Warnings
71
72
class DbWarning(UserWarning): pass
73
74 1930 aaronmk
##### Result retrieval
75
76
def col_names(cur): return (col[0] for col in cur.description)
77
78
def rows(cur): return iter(lambda: cur.fetchone(), None)
79
80
def consume_rows(cur):
81
    '''Used to fetch all rows so result will be cached'''
82
    iters.consume_iter(rows(cur))
83
84
def next_row(cur): return rows(cur).next()
85
86
def row(cur):
87
    row_ = next_row(cur)
88
    consume_rows(cur)
89
    return row_
90
91
def next_value(cur): return next_row(cur)[0]
92
93
def value(cur): return row(cur)[0]
94
95
def values(cur): return iters.func_iter(lambda: next_value(cur))
96
97
def value_or_none(cur):
98
    try: return value(cur)
99
    except StopIteration: return None
100
101 2101 aaronmk
##### Input validation
102
103
def check_name(name):
104
    if re.search(r'\W', name) != None: raise NameException('Name "'+name
105
        +'" may contain only alphanumeric characters and _')
106
107
def esc_name_by_module(module, name, ignore_case=False):
108 2388 aaronmk
    if module == 'psycopg2' or module == None:
109 2101 aaronmk
        if ignore_case:
110
            # Don't enclose in quotes because this disables case-insensitivity
111
            check_name(name)
112
            return name
113
        else: quote = '"'
114
    elif module == 'MySQLdb': quote = '`'
115
    else: raise NotImplementedError("Can't escape name for "+module+' database')
116 2500 aaronmk
    return sql_gen.esc_name(name, quote)
117 2101 aaronmk
118
def esc_name_by_engine(engine, name, **kw_args):
119
    return esc_name_by_module(db_engines[engine][0], name, **kw_args)
120
121
def esc_name(db, name, **kw_args):
122
    return esc_name_by_module(util.root_module(db.db), name, **kw_args)
123
124
def qual_name(db, schema, table):
125
    def esc_name_(name): return esc_name(db, name)
126
    table = esc_name_(table)
127
    if schema != None: return esc_name_(schema)+'.'+table
128
    else: return table
129
130 1869 aaronmk
##### Database connections
131 1849 aaronmk
132 2097 aaronmk
db_config_names = ['engine', 'host', 'user', 'password', 'database', 'schemas']
133 1926 aaronmk
134 1869 aaronmk
db_engines = {
135
    'MySQL': ('MySQLdb', {'password': 'passwd', 'database': 'db'}),
136
    'PostgreSQL': ('psycopg2', {}),
137
}
138
139
DatabaseErrors_set = set([DbException])
140
DatabaseErrors = tuple(DatabaseErrors_set)
141
142
def _add_module(module):
143
    DatabaseErrors_set.add(module.DatabaseError)
144
    global DatabaseErrors
145
    DatabaseErrors = tuple(DatabaseErrors_set)
146
147
def db_config_str(db_config):
148
    return db_config['engine']+' database '+db_config['database']
149
150 1909 aaronmk
def _query_lookup(query, params): return (query, dicts.make_hashable(params))
151 1894 aaronmk
152 2448 aaronmk
log_debug_none = lambda msg, level=2: None
153 1901 aaronmk
154 1849 aaronmk
class DbConn:
155 2190 aaronmk
    def __init__(self, db_config, serializable=True, autocommit=False,
156
        caching=True, log_debug=log_debug_none):
157 1869 aaronmk
        self.db_config = db_config
158
        self.serializable = serializable
159 2190 aaronmk
        self.autocommit = autocommit
160
        self.caching = caching
161 1901 aaronmk
        self.log_debug = log_debug
162 2193 aaronmk
        self.debug = log_debug != log_debug_none
163 1869 aaronmk
164
        self.__db = None
165 1889 aaronmk
        self.query_results = {}
166 2139 aaronmk
        self._savepoint = 0
167 1869 aaronmk
168
    def __getattr__(self, name):
169
        if name == '__dict__': raise Exception('getting __dict__')
170
        if name == 'db': return self._db()
171
        else: raise AttributeError()
172
173
    def __getstate__(self):
174
        state = copy.copy(self.__dict__) # shallow copy
175 1915 aaronmk
        state['log_debug'] = None # don't pickle the debug callback
176 1869 aaronmk
        state['_DbConn__db'] = None # don't pickle the connection
177
        return state
178
179 2165 aaronmk
    def connected(self): return self.__db != None
180
181 1869 aaronmk
    def _db(self):
182
        if self.__db == None:
183
            # Process db_config
184
            db_config = self.db_config.copy() # don't modify input!
185 2097 aaronmk
            schemas = db_config.pop('schemas', None)
186 1869 aaronmk
            module_name, mappings = db_engines[db_config.pop('engine')]
187
            module = __import__(module_name)
188
            _add_module(module)
189
            for orig, new in mappings.iteritems():
190
                try: util.rename_key(db_config, orig, new)
191
                except KeyError: pass
192
193
            # Connect
194
            self.__db = module.connect(**db_config)
195
196
            # Configure connection
197 2234 aaronmk
            if self.serializable and not self.autocommit: run_raw_query(self,
198
                'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE')
199 2101 aaronmk
            if schemas != None:
200
                schemas_ = ''.join((esc_name(self, s)+', '
201
                    for s in schemas.split(',')))
202
                run_raw_query(self, "SELECT set_config('search_path', \
203
%s || current_setting('search_path'), false)", [schemas_])
204 1869 aaronmk
205
        return self.__db
206 1889 aaronmk
207 1891 aaronmk
    class DbCursor(Proxy):
208 1927 aaronmk
        def __init__(self, outer):
209 1891 aaronmk
            Proxy.__init__(self, outer.db.cursor())
210 2191 aaronmk
            self.outer = outer
211 1927 aaronmk
            self.query_results = outer.query_results
212 1894 aaronmk
            self.query_lookup = None
213 1891 aaronmk
            self.result = []
214 1889 aaronmk
215 1894 aaronmk
        def execute(self, query, params=None):
216 1930 aaronmk
            self._is_insert = query.upper().find('INSERT') >= 0
217 1894 aaronmk
            self.query_lookup = _query_lookup(query, params)
218 2148 aaronmk
            try:
219 2191 aaronmk
                try:
220
                    return_value = self.inner.execute(query, params)
221
                    self.outer.do_autocommit()
222 2148 aaronmk
                finally: self.query = get_cur_query(self.inner)
223 1904 aaronmk
            except Exception, e:
224 2170 aaronmk
                _add_cursor_info(e, self, query, params)
225 1904 aaronmk
                self.result = e # cache the exception as the result
226
                self._cache_result()
227
                raise
228 1930 aaronmk
            # Fetch all rows so result will be cached
229
            if self.rowcount == 0 and not self._is_insert: consume_rows(self)
230 1894 aaronmk
            return return_value
231
232 1891 aaronmk
        def fetchone(self):
233
            row = self.inner.fetchone()
234 1899 aaronmk
            if row != None: self.result.append(row)
235
            # otherwise, fetched all rows
236 1904 aaronmk
            else: self._cache_result()
237
            return row
238
239
        def _cache_result(self):
240 1906 aaronmk
            # For inserts, only cache exceptions since inserts are not
241
            # idempotent, but an invalid insert will always be invalid
242 1930 aaronmk
            if self.query_results != None and (not self._is_insert
243 1906 aaronmk
                or isinstance(self.result, Exception)):
244
245 1894 aaronmk
                assert self.query_lookup != None
246 1916 aaronmk
                self.query_results[self.query_lookup] = self.CacheCursor(
247
                    util.dict_subset(dicts.AttrsDictView(self),
248
                    ['query', 'result', 'rowcount', 'description']))
249 1906 aaronmk
250 1916 aaronmk
        class CacheCursor:
251
            def __init__(self, cached_result): self.__dict__ = cached_result
252
253 1927 aaronmk
            def execute(self, *args, **kw_args):
254 1916 aaronmk
                if isinstance(self.result, Exception): raise self.result
255
                # otherwise, result is a rows list
256
                self.iter = iter(self.result)
257
258
            def fetchone(self):
259
                try: return self.iter.next()
260
                except StopIteration: return None
261 1891 aaronmk
262 2212 aaronmk
    def esc_value(self, value):
263 2215 aaronmk
        module = util.root_module(self.db)
264 2374 aaronmk
        if module == 'psycopg2': str_ = self.db.cursor().mogrify('%s', [value])
265 2212 aaronmk
        elif module == 'MySQLdb':
266
            import _mysql
267 2374 aaronmk
            str_ = _mysql.escape_string(value)
268 2212 aaronmk
        else: raise NotImplementedError("Can't escape value for "+module
269
            +' database')
270 2374 aaronmk
        return strings.to_unicode(str_)
271 2212 aaronmk
272 2347 aaronmk
    def esc_name(self, name): return esc_name(self, name) # calls global func
273
274 2445 aaronmk
    def run_query(self, query, params=None, cacheable=False, log_level=2,
275 2464 aaronmk
        debug_msg_ref=None):
276 2445 aaronmk
        '''
277 2464 aaronmk
        @param log_ignore_excs The log_level will be increased by 2 if the query
278
            throws one of these exceptions.
279 2445 aaronmk
        '''
280 2167 aaronmk
        assert query != None
281
282 2047 aaronmk
        if not self.caching: cacheable = False
283 1903 aaronmk
        used_cache = False
284
        try:
285 1927 aaronmk
            # Get cursor
286
            if cacheable:
287
                query_lookup = _query_lookup(query, params)
288
                try:
289
                    cur = self.query_results[query_lookup]
290
                    used_cache = True
291
                except KeyError: cur = self.DbCursor(self)
292
            else: cur = self.db.cursor()
293
294
            # Run query
295 2148 aaronmk
            cur.execute(query, params)
296 1903 aaronmk
        finally:
297 2464 aaronmk
            if self.debug and debug_msg_ref != None:# only compute msg if needed
298 2470 aaronmk
                if used_cache: cache_status = 'cache hit'
299
                elif cacheable: cache_status = 'cache miss'
300
                else: cache_status = 'non-cacheable'
301 2472 aaronmk
                query_code = strings.as_code(str(get_cur_query(cur, query,
302
                    params)), 'SQL')
303
                debug_msg_ref[0] = 'DB query: '+cache_status+':\n'+query_code
304 1903 aaronmk
305
        return cur
306 1914 aaronmk
307
    def is_cached(self, query, params=None):
308
        return _query_lookup(query, params) in self.query_results
309 2139 aaronmk
310
    def with_savepoint(self, func):
311 2171 aaronmk
        savepoint = 'level_'+str(self._savepoint)
312 2443 aaronmk
        self.run_query('SAVEPOINT '+savepoint, log_level=4)
313 2139 aaronmk
        self._savepoint += 1
314
        try:
315
            try: return_val = func()
316
            finally:
317
                self._savepoint -= 1
318
                assert self._savepoint >= 0
319
        except:
320 2443 aaronmk
            self.run_query('ROLLBACK TO SAVEPOINT '+savepoint, log_level=4)
321 2139 aaronmk
            raise
322
        else:
323 2443 aaronmk
            self.run_query('RELEASE SAVEPOINT '+savepoint, log_level=4)
324 2191 aaronmk
            self.do_autocommit()
325 2139 aaronmk
            return return_val
326 2191 aaronmk
327
    def do_autocommit(self):
328
        '''Autocommits if outside savepoint'''
329
        assert self._savepoint >= 0
330
        if self.autocommit and self._savepoint == 0:
331
            self.log_debug('Autocommiting')
332
            self.db.commit()
333 1849 aaronmk
334 1869 aaronmk
connect = DbConn
335
336 832 aaronmk
##### Querying
337
338 1894 aaronmk
def run_raw_query(db, *args, **kw_args):
339 2085 aaronmk
    '''For params, see DbConn.run_query()'''
340 1894 aaronmk
    return db.run_query(*args, **kw_args)
341 11 aaronmk
342 2068 aaronmk
def mogrify(db, query, params):
343
    module = util.root_module(db.db)
344
    if module == 'psycopg2': return db.db.cursor().mogrify(query, params)
345
    else: raise NotImplementedError("Can't mogrify query for "+module+
346
        ' database')
347
348 832 aaronmk
##### Recoverable querying
349 15 aaronmk
350 2139 aaronmk
def with_savepoint(db, func): return db.with_savepoint(func)
351 11 aaronmk
352 2464 aaronmk
def run_query(db, query, params=None, recover=None, cacheable=False,
353
    log_level=2, log_ignore_excs=None, **kw_args):
354 2441 aaronmk
    '''For params, see run_raw_query()'''
355 830 aaronmk
    if recover == None: recover = False
356 2464 aaronmk
    if log_ignore_excs == None: log_ignore_excs = ()
357
    log_ignore_excs = tuple(log_ignore_excs)
358 830 aaronmk
359 2464 aaronmk
    debug_msg_ref = [None]
360 2148 aaronmk
    try:
361 2464 aaronmk
        try:
362
            def run(): return run_raw_query(db, query, params, cacheable,
363
                log_level, debug_msg_ref, **kw_args)
364
            if recover and not db.is_cached(query, params):
365
                return with_savepoint(db, run)
366
            else: return run() # don't need savepoint if cached
367
        except Exception, e:
368
            if not recover: raise # need savepoint to run index_cols()
369
            msg = exc.str_(e)
370
371
            match = re.search(r'duplicate key value violates unique constraint '
372 2493 aaronmk
                r'"((_?[^\W_]+)_.+?)"', msg)
373 2464 aaronmk
            if match:
374
                constraint, table = match.groups()
375
                try: cols = index_cols(db, table, constraint)
376
                except NotImplementedError: raise e
377
                else: raise DuplicateKeyException(constraint, cols, e)
378
379 2493 aaronmk
            match = re.search(r'null value in column "(.+?)" violates not-null'
380 2464 aaronmk
                r' constraint', msg)
381
            if match: raise NullValueException('NOT NULL', [match.group(1)], e)
382
383
            match = re.search(r'\b(?:invalid input (?:syntax|value)\b.*?'
384
                r'|date/time field value out of range): "(.+?)"\n'
385 2493 aaronmk
                r'(?:(?s).*?)\bfunction "(.+?)".*?\bat assignment', msg)
386 2464 aaronmk
            if match:
387
                value, name = match.groups()
388
                raise FunctionValueException(name, strings.to_unicode(value), e)
389
390 2493 aaronmk
            match = re.search(r'relation "(.+?)" already exists', msg)
391 2464 aaronmk
            if match: raise DuplicateTableException(match.group(1), e)
392
393 2493 aaronmk
            match = re.search(r'function "(.+?)" already exists', msg)
394 2464 aaronmk
            if match: raise DuplicateFunctionException(match.group(1), e)
395
396
            raise # no specific exception raised
397
    except log_ignore_excs:
398
        log_level += 2
399
        raise
400
    finally:
401
        if debug_msg_ref[0] != None: db.log_debug(debug_msg_ref[0], log_level)
402 830 aaronmk
403 832 aaronmk
##### Basic queries
404
405 2153 aaronmk
def next_version(name):
406
    '''Prepends the version # so it won't be removed if the name is truncated'''
407 2163 aaronmk
    version = 1 # first existing name was version 0
408 2498 aaronmk
    match = re.match(r'^#(\d+)-(.*)$', name)
409 2153 aaronmk
    if match:
410
        version = int(match.group(1))+1
411
        name = match.group(2)
412 2498 aaronmk
    return '#'+str(version)+'-'+name
413 2153 aaronmk
414 2386 aaronmk
def run_query_into(db, query, params, into=None, *args, **kw_args):
415 2085 aaronmk
    '''Outputs a query to a temp table.
416
    For params, see run_query().
417
    '''
418 2386 aaronmk
    if into == None: return run_query(db, query, params, *args, **kw_args)
419 2085 aaronmk
    else: # place rows in temp table
420 2386 aaronmk
        assert isinstance(into, sql_gen.Table)
421 2385 aaronmk
422 2153 aaronmk
        kw_args['recover'] = True
423 2464 aaronmk
        kw_args.setdefault('log_ignore_excs', (DuplicateTableException,))
424 2440 aaronmk
425 2468 aaronmk
        temp = not db.autocommit # tables are permanent in autocommit mode
426 2440 aaronmk
        # "temporary tables cannot specify a schema name", so remove schema
427
        if temp: into.schema = None
428
429 2153 aaronmk
        while True:
430
            try:
431 2194 aaronmk
                create_query = 'CREATE'
432 2440 aaronmk
                if temp: create_query += ' TEMP'
433 2467 aaronmk
                create_query += ' TABLE '+into.to_str(db)+' AS\n'+query
434 2194 aaronmk
435
                return run_query(db, create_query, params, *args, **kw_args)
436 2153 aaronmk
                    # CREATE TABLE AS sets rowcount to # rows in query
437
            except DuplicateTableException, e:
438 2386 aaronmk
                into.name = next_version(into.name)
439 2153 aaronmk
                # try again with next version of name
440 2085 aaronmk
441 2120 aaronmk
order_by_pkey = object() # tells mk_select() to order by the pkey
442
443 2199 aaronmk
distinct_on_all = object() # tells mk_select() to SELECT DISTINCT ON all columns
444
445 2233 aaronmk
def mk_select(db, tables, fields=None, conds=None, distinct_on=[], limit=None,
446 2293 aaronmk
    start=None, order_by=order_by_pkey, default_table=None):
447 1981 aaronmk
    '''
448 2121 aaronmk
    @param tables The single table to select from, or a list of tables to join
449 2280 aaronmk
        together, with tables after the first being sql_gen.Join objects
450 1981 aaronmk
    @param fields Use None to select all fields in the table
451 2377 aaronmk
    @param conds WHERE conditions: [(compare_left_side, compare_right_side),...]
452 2379 aaronmk
        * container can be any iterable type
453 2399 aaronmk
        * compare_left_side: sql_gen.Code|str (for col name)
454
        * compare_right_side: sql_gen.ValueCond|literal value
455 2199 aaronmk
    @param distinct_on The columns to SELECT DISTINCT ON, or distinct_on_all to
456
        use all columns
457 2054 aaronmk
    @return tuple(query, params)
458 1981 aaronmk
    '''
459 2315 aaronmk
    # Parse tables param
460 2121 aaronmk
    if not lists.is_seq(tables): tables = [tables]
461 2141 aaronmk
    tables = list(tables) # don't modify input! (list() copies input)
462 2315 aaronmk
    table0 = sql_gen.as_Table(tables.pop(0)) # first table is separate
463 2121 aaronmk
464 2315 aaronmk
    # Parse other params
465 2376 aaronmk
    if conds == None: conds = []
466
    elif isinstance(conds, dict): conds = conds.items()
467 2379 aaronmk
    conds = list(conds) # don't modify input! (list() copies input)
468 135 aaronmk
    assert limit == None or type(limit) == int
469 865 aaronmk
    assert start == None or type(start) == int
470 2315 aaronmk
    if order_by is order_by_pkey:
471
        if distinct_on != []: order_by = None
472
        else: order_by = pkey(db, table0, recover=True)
473 865 aaronmk
474 2315 aaronmk
    query = 'SELECT'
475 2056 aaronmk
476 2315 aaronmk
    def parse_col(col): return sql_gen.as_Col(col, default_table).to_str(db)
477 2056 aaronmk
478 2200 aaronmk
    # DISTINCT ON columns
479 2233 aaronmk
    if distinct_on != []:
480 2467 aaronmk
        query += '\nDISTINCT'
481 2254 aaronmk
        if distinct_on is not distinct_on_all:
482 2200 aaronmk
            query += ' ON ('+(', '.join(map(parse_col, distinct_on)))+')'
483
484
    # Columns
485 2467 aaronmk
    query += '\n'
486 1135 aaronmk
    if fields == None: query += '*'
487 2479 aaronmk
    else: query += '\n, '.join(map(parse_col, fields))
488 2200 aaronmk
489
    # Main table
490 2467 aaronmk
    query += '\nFROM '+table0.to_str(db)
491 865 aaronmk
492 2122 aaronmk
    # Add joins
493 2271 aaronmk
    left_table = table0
494 2263 aaronmk
    for join_ in tables:
495
        table = join_.table
496 2238 aaronmk
497 2343 aaronmk
        # Parse special values
498
        if join_.type_ is sql_gen.filter_out: # filter no match
499 2376 aaronmk
            conds.append((sql_gen.Col(table_not_null_col(db, table), table),
500
                None))
501 2343 aaronmk
502 2467 aaronmk
        query += '\n'+join_.to_str(db, left_table)
503 2122 aaronmk
504
        left_table = table
505
506 865 aaronmk
    missing = True
507 2376 aaronmk
    if conds != []:
508 2467 aaronmk
        query += '\nWHERE\n'+('\nAND\n'.join(('('+sql_gen.ColValueCond(l, r)
509 2410 aaronmk
            .to_str(db)+')' for l, r in conds)))
510 865 aaronmk
        missing = False
511 2227 aaronmk
    if order_by != None:
512 2467 aaronmk
        query += '\nORDER BY '+sql_gen.as_Col(order_by, table0).to_str(db)
513
    if limit != None: query += '\nLIMIT '+str(limit); missing = False
514 865 aaronmk
    if start != None:
515 2467 aaronmk
        if start != 0: query += '\nOFFSET '+str(start)
516 865 aaronmk
        missing = False
517
    if missing: warnings.warn(DbWarning(
518
        'SELECT statement missing a WHERE, LIMIT, or OFFSET clause: '+query))
519
520 2315 aaronmk
    return (query, [])
521 11 aaronmk
522 2054 aaronmk
def select(db, *args, **kw_args):
523
    '''For params, see mk_select() and run_query()'''
524
    recover = kw_args.pop('recover', None)
525
    cacheable = kw_args.pop('cacheable', True)
526 2442 aaronmk
    log_level = kw_args.pop('log_level', 2)
527 2054 aaronmk
528
    query, params = mk_select(db, *args, **kw_args)
529 2442 aaronmk
    return run_query(db, query, params, recover, cacheable, log_level=log_level)
530 2054 aaronmk
531 2066 aaronmk
def mk_insert_select(db, table, cols=None, select_query=None, params=None,
532 2292 aaronmk
    returning=None, embeddable=False):
533 1960 aaronmk
    '''
534
    @param returning str|None An inserted column (such as pkey) to return
535 2070 aaronmk
    @param embeddable Whether the query should be embeddable as a nested SELECT.
536 2073 aaronmk
        Warning: If you set this and cacheable=True when the query is run, the
537
        query will be fully cached, not just if it raises an exception.
538 1960 aaronmk
    '''
539 2328 aaronmk
    table = sql_gen.as_Table(table)
540 2318 aaronmk
    if cols == []: cols = None # no cols (all defaults) = unknown col names
541
    if cols != None: cols = [sql_gen.as_Col(v).to_str(db) for v in cols]
542 2063 aaronmk
    if select_query == None: select_query = 'DEFAULT VALUES'
543 2327 aaronmk
    if returning != None: returning = sql_gen.as_Col(returning, table)
544 2063 aaronmk
545
    # Build query
546 2497 aaronmk
    first_line = 'INSERT INTO '+table.to_str(db)
547
    query = first_line
548 2467 aaronmk
    if cols != None: query += '\n('+', '.join(cols)+')'
549
    query += '\n'+select_query
550 2063 aaronmk
551
    if returning != None:
552 2327 aaronmk
        returning_name = copy.copy(returning)
553
        returning_name.table = None
554
        returning_name = returning_name.to_str(db)
555 2467 aaronmk
        query += '\nRETURNING '+returning_name
556 2063 aaronmk
557 2070 aaronmk
    if embeddable:
558 2327 aaronmk
        assert returning != None
559
560 2070 aaronmk
        # Create function
561 2513 aaronmk
        function_name = sql_gen.clean_name(first_line)
562 2327 aaronmk
        return_type = 'SETOF '+returning.to_str(db)+'%TYPE'
563 2189 aaronmk
        while True:
564
            try:
565 2327 aaronmk
                func_schema = None
566 2468 aaronmk
                if not db.autocommit: func_schema = 'pg_temp'
567 2327 aaronmk
                function = sql_gen.Table(function_name, func_schema).to_str(db)
568 2194 aaronmk
569 2189 aaronmk
                function_query = '''\
570 2467 aaronmk
CREATE FUNCTION '''+function+'''()
571
RETURNS '''+return_type+'''
572
LANGUAGE sql
573
AS $$
574
'''+mogrify(db, query, params)+''';
575
$$;
576 2070 aaronmk
'''
577 2446 aaronmk
                run_query(db, function_query, recover=True, cacheable=True,
578 2464 aaronmk
                    log_ignore_excs=(DuplicateFunctionException,))
579 2189 aaronmk
                break # this version was successful
580
            except DuplicateFunctionException, e:
581
                function_name = next_version(function_name)
582
                # try again with next version of name
583 2070 aaronmk
584 2337 aaronmk
        # Return query that uses function
585
        func_table = sql_gen.NamedTable('f', sql_gen.CustomCode(function+'()'),
586
            [returning_name]) # AS clause requires function alias
587
        return mk_select(db, func_table, start=0, order_by=None)
588 2070 aaronmk
589 2066 aaronmk
    return (query, params)
590
591
def insert_select(db, *args, **kw_args):
592 2085 aaronmk
    '''For params, see mk_insert_select() and run_query_into()
593 2386 aaronmk
    @param into sql_gen.Table with suggested name of temp table to put RETURNING
594
        values in
595 2072 aaronmk
    '''
596 2386 aaronmk
    into = kw_args.pop('into', None)
597
    if into != None: kw_args['embeddable'] = True
598 2066 aaronmk
    recover = kw_args.pop('recover', None)
599
    cacheable = kw_args.pop('cacheable', True)
600
601
    query, params = mk_insert_select(db, *args, **kw_args)
602 2386 aaronmk
    return run_query_into(db, query, params, into, recover=recover,
603 2153 aaronmk
        cacheable=cacheable)
604 2063 aaronmk
605 2066 aaronmk
default = object() # tells insert() to use the default value for a column
606
607 2063 aaronmk
def insert(db, table, row, *args, **kw_args):
608 2085 aaronmk
    '''For params, see insert_select()'''
609 1960 aaronmk
    if lists.is_seq(row): cols = None
610
    else:
611
        cols = row.keys()
612
        row = row.values()
613
    row = list(row) # ensure that "!= []" works
614
615 1961 aaronmk
    # Check for special values
616
    labels = []
617
    values = []
618
    for value in row:
619 2254 aaronmk
        if value is default: labels.append('DEFAULT')
620 1961 aaronmk
        else:
621
            labels.append('%s')
622
            values.append(value)
623
624
    # Build query
625 2467 aaronmk
    if values != []: query = 'VALUES ('+(', '.join(labels))+')'
626 2063 aaronmk
    else: query = None
627 1554 aaronmk
628 2064 aaronmk
    return insert_select(db, table, cols, query, values, *args, **kw_args)
629 11 aaronmk
630 2402 aaronmk
def mk_update(db, table, changes=None, cond=None):
631
    '''
632
    @param changes [(col, new_value),...]
633
        * container can be any iterable type
634
        * col: sql_gen.Code|str (for col name)
635
        * new_value: sql_gen.Code|literal value
636
    @param cond sql_gen.Code WHERE condition. e.g. use sql_gen.*Cond objects.
637
    @return str query
638
    '''
639
    query = 'UPDATE '+sql_gen.as_Table(table).to_str(db)+'\nSET\n'
640 2405 aaronmk
    query += ',\n'.join((sql_gen.to_name_only_col(col, table).to_str(db)+' = '
641 2402 aaronmk
        +sql_gen.as_Value(new_value).to_str(db) for col, new_value in changes))
642 2467 aaronmk
    if cond != None: query += '\nWHERE\n'+cond.to_str(db)
643 2402 aaronmk
644
    return query
645
646
def update(db, *args, **kw_args):
647
    '''For params, see mk_update() and run_query()'''
648
    recover = kw_args.pop('recover', None)
649
650
    return run_query(db, mk_update(db, *args, **kw_args), [], recover)
651
652 135 aaronmk
def last_insert_id(db):
653 1849 aaronmk
    module = util.root_module(db.db)
654 135 aaronmk
    if module == 'psycopg2': return value(run_query(db, 'SELECT lastval()'))
655
    elif module == 'MySQLdb': return db.insert_id()
656
    else: return None
657 13 aaronmk
658 1968 aaronmk
def truncate(db, table, schema='public'):
659
    return run_query(db, 'TRUNCATE '+qual_name(db, schema, table)+' CASCADE')
660 832 aaronmk
661 2394 aaronmk
def mk_flatten_mapping(db, into, cols, preserve=[], as_items=False):
662 2383 aaronmk
    '''Creates a mapping from original column names (which may have collisions)
663 2415 aaronmk
    to names that will be distinct among the columns' tables.
664 2383 aaronmk
    This is meant to be used for several tables that are being joined together.
665 2415 aaronmk
    @param cols The columns to combine. Duplicates will be removed.
666
    @param into The table for the new columns.
667 2394 aaronmk
    @param preserve [sql_gen.Col...] Columns not to rename. Note that these
668
        columns will be included in the mapping even if they are not in cols.
669
        The tables of the provided Col objects will be changed to into, so make
670
        copies of them if you want to keep the original tables.
671
    @param as_items Whether to return a list of dict items instead of a dict
672 2383 aaronmk
    @return dict(orig_col=new_col, ...)
673
        * orig_col: sql_gen.Col(orig_col_name, orig_table)
674 2392 aaronmk
        * new_col: sql_gen.Col(orig_col_name, into)
675
        * All mappings use the into table so its name can easily be
676 2383 aaronmk
          changed for all columns at once
677
    '''
678 2415 aaronmk
    cols = lists.uniqify(cols)
679
680 2394 aaronmk
    items = []
681 2389 aaronmk
    for col in preserve:
682 2390 aaronmk
        orig_col = copy.copy(col)
683 2392 aaronmk
        col.table = into
684 2394 aaronmk
        items.append((orig_col, col))
685
    preserve = set(preserve)
686
    for col in cols:
687 2515 aaronmk
        if col not in preserve: items.append((col, sql_gen.Col(str(col), into)))
688 2394 aaronmk
689
    if not as_items: items = dict(items)
690
    return items
691 2383 aaronmk
692 2393 aaronmk
def flatten(db, into, joins, cols, limit=None, start=None, **kw_args):
693 2391 aaronmk
    '''For params, see mk_flatten_mapping()
694
    @return See return value of mk_flatten_mapping()
695
    '''
696 2394 aaronmk
    items = mk_flatten_mapping(db, into, cols, as_items=True, **kw_args)
697
    cols = [sql_gen.NamedCol(new.name, old) for old, new in items]
698 2391 aaronmk
    run_query_into(db, *mk_select(db, joins, cols, limit=limit, start=start),
699 2392 aaronmk
        into=into)
700 2394 aaronmk
    return dict(items)
701 2391 aaronmk
702 2414 aaronmk
##### Database structure queries
703
704 2426 aaronmk
def table_row_count(db, table, recover=None):
705
    return value(run_query(db, *mk_select(db, table, [sql_gen.row_count],
706 2443 aaronmk
        order_by=None, start=0), recover=recover, log_level=3))
707 2426 aaronmk
708 2414 aaronmk
def table_cols(db, table, recover=None):
709
    return list(col_names(select(db, table, limit=0, order_by=None,
710 2443 aaronmk
        recover=recover, log_level=4)))
711 2414 aaronmk
712 2291 aaronmk
def pkey(db, table, recover=None):
713 832 aaronmk
    '''Assumed to be first column in table'''
714 2339 aaronmk
    return table_cols(db, table, recover)[0]
715 832 aaronmk
716 2340 aaronmk
not_null_col = 'not_null'
717
718
def table_not_null_col(db, table, recover=None):
719
    '''Name assumed to be the value of not_null_col. If not found, uses pkey.'''
720
    if not_null_col in table_cols(db, table, recover): return not_null_col
721
    else: return pkey(db, table, recover)
722
723 853 aaronmk
def index_cols(db, table, index):
724
    '''Can also use this for UNIQUE constraints, because a UNIQUE index is
725
    automatically created. When you don't know whether something is a UNIQUE
726
    constraint or a UNIQUE index, use this function.'''
727 1909 aaronmk
    module = util.root_module(db.db)
728
    if module == 'psycopg2':
729
        return list(values(run_query(db, '''\
730 853 aaronmk
SELECT attname
731 866 aaronmk
FROM
732
(
733
        SELECT attnum, attname
734
        FROM pg_index
735
        JOIN pg_class index ON index.oid = indexrelid
736
        JOIN pg_class table_ ON table_.oid = indrelid
737
        JOIN pg_attribute ON attrelid = indrelid AND attnum = ANY (indkey)
738
        WHERE
739
            table_.relname = %(table)s
740
            AND index.relname = %(index)s
741
    UNION
742
        SELECT attnum, attname
743
        FROM
744
        (
745
            SELECT
746
                indrelid
747
                , (regexp_matches(indexprs, E':varattno (\\\\d+)', 'g'))[1]::int
748
                    AS indkey
749
            FROM pg_index
750
            JOIN pg_class index ON index.oid = indexrelid
751
            JOIN pg_class table_ ON table_.oid = indrelid
752
            WHERE
753
                table_.relname = %(table)s
754
                AND index.relname = %(index)s
755
        ) s
756
        JOIN pg_attribute ON attrelid = indrelid AND attnum = indkey
757
) s
758 853 aaronmk
ORDER BY attnum
759
''',
760 2443 aaronmk
            {'table': table, 'index': index}, cacheable=True, log_level=4)))
761 1909 aaronmk
    else: raise NotImplementedError("Can't list index columns for "+module+
762
        ' database')
763 853 aaronmk
764 464 aaronmk
def constraint_cols(db, table, constraint):
765 1849 aaronmk
    module = util.root_module(db.db)
766 464 aaronmk
    if module == 'psycopg2':
767
        return list(values(run_query(db, '''\
768
SELECT attname
769
FROM pg_constraint
770
JOIN pg_class ON pg_class.oid = conrelid
771
JOIN pg_attribute ON attrelid = conrelid AND attnum = ANY (conkey)
772
WHERE
773
    relname = %(table)s
774
    AND conname = %(constraint)s
775
ORDER BY attnum
776
''',
777
            {'table': table, 'constraint': constraint})))
778
    else: raise NotImplementedError("Can't list constraint columns for "+module+
779
        ' database')
780
781 2096 aaronmk
row_num_col = '_row_num'
782
783 2408 aaronmk
def index_col(db, col):
784
    '''Adds an index on a column if it doesn't already exist.'''
785
    assert sql_gen.is_table_col(col)
786
787
    table = col.table
788 2515 aaronmk
    index = sql_gen.as_Table(str(col))
789 2408 aaronmk
    col = sql_gen.to_name_only_col(col)
790
    try: run_query(db, 'CREATE INDEX '+index.to_str(db)+' ON '+table.to_str(db)
791 2443 aaronmk
        +' ('+col.to_str(db)+')', recover=True, cacheable=True, log_level=3)
792 2408 aaronmk
    except DuplicateTableException: pass # index already existed
793
794 2406 aaronmk
def index_pkey(db, table, recover=None):
795
    '''Makes the first column in a table the primary key.
796
    @pre The table must not already have a primary key.
797
    '''
798
    table = sql_gen.as_Table(table)
799
800
    index = sql_gen.as_Table(table.name+'_pkey')
801
    col = sql_gen.to_name_only_col(pkey(db, table, recover))
802
    run_query(db, 'ALTER TABLE '+table.to_str(db)+' ADD CONSTRAINT '
803 2443 aaronmk
        +index.to_str(db)+' PRIMARY KEY('+col.to_str(db)+')', recover=recover,
804
        log_level=3)
805 2406 aaronmk
806 2086 aaronmk
def add_row_num(db, table):
807 2117 aaronmk
    '''Adds a row number column to a table. Its name is in row_num_col. It will
808
    be the primary key.'''
809 2320 aaronmk
    table = sql_gen.as_Table(table).to_str(db)
810 2096 aaronmk
    run_query(db, 'ALTER TABLE '+table+' ADD COLUMN '+row_num_col
811 2443 aaronmk
        +' serial NOT NULL PRIMARY KEY', log_level=3)
812 2086 aaronmk
813 1968 aaronmk
def tables(db, schema='public', table_like='%'):
814 1849 aaronmk
    module = util.root_module(db.db)
815 1968 aaronmk
    params = {'schema': schema, 'table_like': table_like}
816 832 aaronmk
    if module == 'psycopg2':
817 1968 aaronmk
        return values(run_query(db, '''\
818
SELECT tablename
819
FROM pg_tables
820
WHERE
821
    schemaname = %(schema)s
822
    AND tablename LIKE %(table_like)s
823
ORDER BY tablename
824
''',
825
            params, cacheable=True))
826
    elif module == 'MySQLdb':
827
        return values(run_query(db, 'SHOW TABLES LIKE %(table_like)s', params,
828
            cacheable=True))
829 832 aaronmk
    else: raise NotImplementedError("Can't list tables for "+module+' database')
830 830 aaronmk
831 833 aaronmk
##### Database management
832
833 1968 aaronmk
def empty_db(db, schema='public', **kw_args):
834
    '''For kw_args, see tables()'''
835
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)
836 833 aaronmk
837 832 aaronmk
##### Heuristic queries
838
839 2104 aaronmk
def put(db, table, row, pkey_=None, row_ct_ref=None):
840 1554 aaronmk
    '''Recovers from errors.
841 2077 aaronmk
    Only works under PostgreSQL (uses INSERT RETURNING).
842
    '''
843 2104 aaronmk
    if pkey_ == None: pkey_ = pkey(db, table, recover=True)
844
845 471 aaronmk
    try:
846 2149 aaronmk
        cur = insert(db, table, row, pkey_, recover=True)
847 1554 aaronmk
        if row_ct_ref != None and cur.rowcount >= 0:
848
            row_ct_ref[0] += cur.rowcount
849
        return value(cur)
850 471 aaronmk
    except DuplicateKeyException, e:
851 2104 aaronmk
        return value(select(db, table, [pkey_],
852 1069 aaronmk
            util.dict_subset_right_join(row, e.cols), recover=True))
853 471 aaronmk
854 473 aaronmk
def get(db, table, row, pkey, row_ct_ref=None, create=False):
855 830 aaronmk
    '''Recovers from errors'''
856 2209 aaronmk
    try: return value(select(db, table, [pkey], row, limit=1, recover=True))
857 14 aaronmk
    except StopIteration:
858 40 aaronmk
        if not create: raise
859 471 aaronmk
        return put(db, table, row, pkey, row_ct_ref) # insert new row
860 2078 aaronmk
861 2508 aaronmk
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None,
862
    default=None):
863 2078 aaronmk
    '''Recovers from errors.
864
    Only works under PostgreSQL (uses INSERT RETURNING).
865 2131 aaronmk
    @param in_tables The main input table to select from, followed by a list of
866
        tables to join with it using the main input table's pkey
867 2312 aaronmk
    @param mapping dict(out_table_col=in_table_col, ...)
868
        * out_table_col: sql_gen.Col|str
869 2323 aaronmk
        * in_table_col: sql_gen.Col Wrap literal values in a sql_gen.NamedCol
870 2489 aaronmk
    @param into The table to contain the output and input pkeys.
871 2495 aaronmk
        Defaults to `out_table.name+'-pkeys'`.
872 2509 aaronmk
    @param default The *output* column to use as the pkey for missing rows.
873
        If this output column does not exist in the mapping, uses None.
874 2312 aaronmk
    @return sql_gen.Col Where the output pkeys are made available
875 2078 aaronmk
    '''
876 2329 aaronmk
    out_table = sql_gen.as_Table(out_table)
877 2312 aaronmk
    for in_table_col in mapping.itervalues():
878
        assert isinstance(in_table_col, sql_gen.Col)
879 2495 aaronmk
    if into == None: into = out_table.name+'-pkeys'
880 2489 aaronmk
    into = sql_gen.as_Table(into)
881 2312 aaronmk
882 2450 aaronmk
    def log_debug(msg): db.log_debug(msg, level=1.5)
883 2505 aaronmk
    def col_ustr(str_):
884 2514 aaronmk
        return strings.repr_no_u(sql_gen.remove_col_rename(
885
            sql_gen.as_Col(str_)))
886 2450 aaronmk
887 2486 aaronmk
    log_debug('********** New iteration **********')
888 2505 aaronmk
    log_debug('Inserting these input columns into '+strings.as_tt(
889
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
890 2463 aaronmk
891 2382 aaronmk
    # Create input joins from list of input tables
892
    in_tables_ = in_tables[:] # don't modify input!
893
    in_tables0 = in_tables_.pop(0) # first table is separate
894 2279 aaronmk
    in_pkey = pkey(db, in_tables0, recover=True)
895 2285 aaronmk
    in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
896 2460 aaronmk
    input_joins = [in_tables0]+[sql_gen.Join(v,
897
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
898 2131 aaronmk
899 2486 aaronmk
    log_debug('Joining together input tables into temp table')
900 2395 aaronmk
    # Place in new table for speed and so don't modify input if values edited
901 2495 aaronmk
    in_table = sql_gen.Table(into.name+'-input')
902 2395 aaronmk
    flatten_cols = filter(sql_gen.is_table_col, mapping.values())
903
    mapping = dicts.join(mapping, flatten(db, in_table, input_joins,
904
        flatten_cols, preserve=[in_pkey_col], start=0))
905
    input_joins = [in_table]
906 2486 aaronmk
    db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
907 2395 aaronmk
908 2509 aaronmk
    # Resolve default value column
909
    try: default = mapping[default]
910
    except KeyError:
911
        if default != None:
912
            db.log_debug('Default value column '
913
                +strings.as_tt(strings.repr_no_u(default))
914 2511 aaronmk
                +' does not exist in mapping, falling back to None', level=2.1)
915 2509 aaronmk
            default = None
916
917 2279 aaronmk
    out_pkey = pkey(db, out_table, recover=True)
918 2285 aaronmk
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
919 2142 aaronmk
920 2387 aaronmk
    pkeys_names = [in_pkey, out_pkey]
921 2236 aaronmk
    pkeys_cols = [in_pkey_col, out_pkey_col]
922
923 2201 aaronmk
    pkeys_table_exists_ref = [False]
924 2420 aaronmk
    def insert_into_pkeys(joins, cols):
925
        query, params = mk_select(db, joins, cols, order_by=None, start=0)
926 2201 aaronmk
        if pkeys_table_exists_ref[0]:
927 2489 aaronmk
            insert_select(db, into, pkeys_names, query, params)
928 2201 aaronmk
        else:
929 2489 aaronmk
            run_query_into(db, query, params, into=into)
930 2201 aaronmk
            pkeys_table_exists_ref[0] = True
931
932 2429 aaronmk
    limit_ref = [None]
933 2380 aaronmk
    conds = set()
934 2233 aaronmk
    distinct_on = []
935 2325 aaronmk
    def mk_main_select(joins, cols):
936 2429 aaronmk
        return mk_select(db, joins, cols, conds, distinct_on,
937
            limit=limit_ref[0], start=0)
938 2132 aaronmk
939 2517 aaronmk
    exceptions = set()
940 2309 aaronmk
    def log_exc(e):
941 2450 aaronmk
        log_debug('Caught exception: '+exc.str_(e, first_line_only=True))
942 2517 aaronmk
        assert e not in exceptions # avoid infinite loops
943
        exceptions.add(e)
944 2451 aaronmk
    def remove_all_rows():
945 2450 aaronmk
        log_debug('Returning NULL for all rows')
946 2429 aaronmk
        limit_ref[0] = 0 # just create an empty pkeys table
947 2409 aaronmk
    def ignore(in_col, value):
948 2514 aaronmk
        in_col_str = repr(in_col)
949 2450 aaronmk
        log_debug('Adding index on '+in_col_str+' to enable fast filtering')
950 2409 aaronmk
        index_col(db, in_col)
951 2450 aaronmk
        log_debug('Ignoring rows with '+in_col_str+' = '+repr(value))
952 2403 aaronmk
    def remove_rows(in_col, value):
953 2409 aaronmk
        ignore(in_col, value)
954 2378 aaronmk
        cond = (in_col, sql_gen.CompareCond(value, '!='))
955
        assert cond not in conds # avoid infinite loops
956 2380 aaronmk
        conds.add(cond)
957 2403 aaronmk
    def invalid2null(in_col, value):
958 2409 aaronmk
        ignore(in_col, value)
959 2403 aaronmk
        update(db, in_table, [(in_col, None)],
960
            sql_gen.ColValueCond(in_col, value))
961 2245 aaronmk
962 2206 aaronmk
    # Do inserts and selects
963 2257 aaronmk
    join_cols = {}
964 2495 aaronmk
    insert_out_pkeys = sql_gen.Table(into.name+'-insert_out_pkeys')
965
    insert_in_pkeys = sql_gen.Table(into.name+'-insert_in_pkeys')
966 2206 aaronmk
    while True:
967 2303 aaronmk
        has_joins = join_cols != {}
968
969 2305 aaronmk
        # Prepare to insert new rows
970 2325 aaronmk
        insert_joins = input_joins[:] # don't modify original!
971 2403 aaronmk
        insert_args = dict(recover=True, cacheable=False)
972 2303 aaronmk
        if has_joins:
973 2317 aaronmk
            distinct_on = [v.to_Col() for v in join_cols.values()]
974 2325 aaronmk
            insert_joins.append(sql_gen.Join(out_table, join_cols,
975
                sql_gen.filter_out))
976
        else:
977 2404 aaronmk
            insert_args.update(dict(returning=out_pkey, into=insert_out_pkeys))
978 2303 aaronmk
979 2486 aaronmk
        log_debug('Trying to insert new rows')
980 2206 aaronmk
        try:
981
            cur = insert_select(db, out_table, mapping.keys(),
982 2325 aaronmk
                *mk_main_select(insert_joins, mapping.values()), **insert_args)
983 2357 aaronmk
            break # insert successful
984 2206 aaronmk
        except DuplicateKeyException, e:
985 2309 aaronmk
            log_exc(e)
986
987 2258 aaronmk
            old_join_cols = join_cols.copy()
988 2334 aaronmk
            join_cols.update(util.dict_subset(mapping, e.cols))
989 2486 aaronmk
            log_debug('Ignoring existing rows, comparing on these columns:\n'
990 2505 aaronmk
                +strings.as_inline_table(join_cols, ustr=col_ustr))
991 2258 aaronmk
            assert join_cols != old_join_cols # avoid infinite loops
992 2230 aaronmk
        except NullValueException, e:
993 2309 aaronmk
            log_exc(e)
994
995 2230 aaronmk
            out_col, = e.cols
996
            try: in_col = mapping[out_col]
997 2356 aaronmk
            except KeyError:
998 2486 aaronmk
                log_debug('Missing mapping for NOT NULL column '+out_col)
999 2451 aaronmk
                remove_all_rows()
1000 2403 aaronmk
            else: remove_rows(in_col, None)
1001 2243 aaronmk
        except FunctionValueException, e:
1002 2309 aaronmk
            log_exc(e)
1003
1004 2344 aaronmk
            assert e.name == out_table.name
1005 2243 aaronmk
            out_col = 'value' # assume function param was named "value"
1006 2403 aaronmk
            invalid2null(mapping[out_col], e.value)
1007 2429 aaronmk
        except DatabaseErrors, e:
1008
            log_exc(e)
1009
1010 2451 aaronmk
            msg = 'No handler for exception: '+exc.str_(e, first_line_only=True)
1011
            warnings.warn(DbWarning(msg))
1012
            log_debug(msg)
1013
            remove_all_rows()
1014 2358 aaronmk
        # after exception handled, rerun loop with additional constraints
1015 2132 aaronmk
1016 2357 aaronmk
    if row_ct_ref != None and cur.rowcount >= 0:
1017
        row_ct_ref[0] += cur.rowcount
1018
1019
    if has_joins:
1020
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
1021 2486 aaronmk
        log_debug('Getting output table pkeys of existing/inserted rows')
1022 2420 aaronmk
        insert_into_pkeys(select_joins, pkeys_cols)
1023 2357 aaronmk
    else:
1024 2404 aaronmk
        add_row_num(db, insert_out_pkeys) # for joining with input pkeys
1025 2357 aaronmk
1026 2486 aaronmk
        log_debug('Getting input table pkeys of inserted rows')
1027 2357 aaronmk
        run_query_into(db, *mk_main_select(input_joins, [in_pkey]),
1028 2404 aaronmk
            into=insert_in_pkeys)
1029
        add_row_num(db, insert_in_pkeys) # for joining with output pkeys
1030 2357 aaronmk
1031 2428 aaronmk
        assert table_row_count(db, insert_out_pkeys) == table_row_count(db,
1032
            insert_in_pkeys)
1033
1034 2486 aaronmk
        log_debug('Combining output and input pkeys in inserted order')
1035 2404 aaronmk
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
1036 2357 aaronmk
            {row_num_col: sql_gen.join_same_not_null})]
1037 2420 aaronmk
        insert_into_pkeys(pkey_joins, pkeys_names)
1038 2357 aaronmk
1039 2486 aaronmk
    db.log_debug('Adding pkey on pkeys table to enable fast joins', level=2.5)
1040 2489 aaronmk
    index_pkey(db, into)
1041 2407 aaronmk
1042 2508 aaronmk
    log_debug('Setting pkeys of missing rows to '+strings.as_tt(repr(default)))
1043 2489 aaronmk
    missing_rows_joins = input_joins+[sql_gen.Join(into,
1044 2357 aaronmk
        {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
1045
        # must use join_same_not_null or query will take forever
1046 2420 aaronmk
    insert_into_pkeys(missing_rows_joins,
1047 2508 aaronmk
        [in_pkey_col, sql_gen.NamedCol(out_pkey, default)])
1048 2357 aaronmk
1049 2489 aaronmk
    assert table_row_count(db, into) == table_row_count(db, in_table)
1050 2428 aaronmk
1051 2489 aaronmk
    return sql_gen.Col(out_pkey, into)
1052 2115 aaronmk
1053
##### Data cleanup
1054
1055 2290 aaronmk
def cleanup_table(db, table, cols):
1056 2115 aaronmk
    def esc_name_(name): return esc_name(db, name)
1057
1058 2290 aaronmk
    table = sql_gen.as_Table(table).to_str(db)
1059 2115 aaronmk
    cols = map(esc_name_, cols)
1060
1061
    run_query(db, 'UPDATE '+table+' SET\n'+(',\n'.join(('\n'+col
1062
        +' = nullif(nullif(trim(both from '+col+"), %(null0)s), %(null1)s)"
1063
            for col in cols))),
1064
        dict(null0='', null1=r'\N'))