Project

General

Profile

« Previous | Next » 

Revision 2206

sql.py: put_table(): Do inserts and selects in a loop so that it will keep retrying the operation with additional constraints until it succeeds

View differences:

sql.py
790 790
        return mk_select(db, insert_joins, cols, distinct_on=distinct_on,
791 791
            order_by=None, limit=limit, start=start, table_is_esc=table_is_esc)
792 792
    
793
    # Do inserts and selects
793 794
    out_pkeys_ref = ['out_pkeys_'+temp_suffix]
794
    def insert_(distinct_on=None):
795
        '''Inserts and capture output pkeys.'''
796
        cur = insert_select(db, out_table, mapping.keys(),
797
            *mk_select_(mapping.values(), distinct_on), returning=out_pkey,
798
            into_ref=out_pkeys_ref, recover=True, table_is_esc=table_is_esc)
799
        if row_ct_ref != None and cur.rowcount >= 0:
800
            row_ct_ref[0] += cur.rowcount
801
            add_row_num(db, out_pkeys_ref[0]) # for joining it with input pkeys
802
        
803
        # Get input pkeys corresponding to rows in insert
804
        in_pkeys_ref = ['in_pkeys_'+temp_suffix]
805
        run_query_into(db, *mk_select_([in_pkey]), into_ref=in_pkeys_ref)
806
        add_row_num(db, in_pkeys_ref[0]) # for joining it with output pkeys
807
        
808
        # Join together output and input pkeys
809
        run_query_into_pkeys(*mk_select(db, [in_pkeys_ref[0], (out_pkeys_ref[0],
810
            {row_num_col: join_using})], pkeys_cols, start=0))
795
    distinct_on = None
796
    while True:
797
        try:
798
            cur = insert_select(db, out_table, mapping.keys(),
799
                *mk_select_(mapping.values(), distinct_on), returning=out_pkey,
800
                into_ref=out_pkeys_ref, recover=True, table_is_esc=table_is_esc)
801
            if row_ct_ref != None and cur.rowcount >= 0:
802
                row_ct_ref[0] += cur.rowcount
803
                add_row_num(db, out_pkeys_ref[0]) # for joining with input pkeys
804
            
805
            # Get input pkeys corresponding to rows in insert
806
            in_pkeys_ref = ['in_pkeys_'+temp_suffix]
807
            run_query_into(db, *mk_select_([in_pkey]), into_ref=in_pkeys_ref)
808
            add_row_num(db, in_pkeys_ref[0]) # for joining with output pkeys
809
            
810
            # Join together output and input pkeys
811
            run_query_into_pkeys(*mk_select(db, [in_pkeys_ref[0],
812
                (out_pkeys_ref[0], {row_num_col: join_using})], pkeys_cols,
813
                start=0))
814
            
815
            break # insert successful
816
        except DuplicateKeyException, e:
817
            join_cols = util.dict_subset_right_join(mapping, e.cols)
818
            select_joins = insert_joins + [(out_table, join_cols)]
819
            
820
            # Get pkeys of already existing rows
821
            run_query_into_pkeys(*mk_select(db, select_joins, pkeys_cols,
822
                order_by=None, start=0, table_is_esc=table_is_esc))
823
            
824
            # Save existing pkeys in temp table for joining on
825
            existing_pkeys_ref = ['existing_pkeys_'+temp_suffix]
826
            run_query_into(db, *mk_select(db, pkeys_ref[0], [in_pkey],
827
                order_by=None, start=0, table_is_esc=True),
828
                into_ref=existing_pkeys_ref)
829
                # need table_is_esc=True to make table name case-insensitive
830
            
831
            # rerun loop with additional constraints
832
            break # but until NullValueExceptions are handled, end loop here
811 833
    
812
    # Do inserts and selects
813
    try: insert_()
814
    except DuplicateKeyException, e:
815
        join_cols = util.dict_subset_right_join(mapping, e.cols)
816
        select_joins = insert_joins + [(out_table, join_cols)]
817
        
818
        # Get pkeys of already existing rows
819
        run_query_into_pkeys(*mk_select(db, select_joins, pkeys_cols,
820
            order_by=None, start=0, table_is_esc=table_is_esc))
821
        
822
        # Save existing pkeys in temp table for joining on
823
        existing_pkeys_ref = ['existing_pkeys_'+temp_suffix]
824
        run_query_into(db, *mk_select(db, pkeys_ref[0], [in_pkey],
825
            order_by=None, start=0, table_is_esc=True),
826
            into_ref=existing_pkeys_ref)
827
            # need table_is_esc=True to make table name case-insensitive
828
    
829 834
    return (pkeys_ref[0], out_pkey)
830 835

  
831 836
##### Data cleanup

Also available in: Unified diff