Column-based import

Note that a much simpler replacement for column-based import, trigger-based import, is under development.

Column-based import consists of the put_table() Python function and the XPath mappings (which control how column-based import normalizes the data). The algorithm is a Python reimplementation of VegBank's XML normalization algorithm, which uses *R.P. Bourret's object-relational mapping technique* for XML.

Benchmarks

Column-based import is more than an order of magnitude faster per row than row-based import. See the *detailed comparison*.

Simultaneously   Row-based (2012-3-22)             Column-based (2012-11-16)
Datasource       # Rows      Time       ms/row     # Rows       Time    ms/row    Change
Total [1]        3,428,810   2.5 days   62.7       31,864,948   16:37   1.9       33x

Note that row-based import is a significant bottleneck due to the need to pipe data through XML (this is likely a bottleneck for VegBank, too).
Even column-based import is a significant bottleneck compared to cached import methods (such as refactor-in-place), because each output table runs a DISTINCT ON over the full input table rather than over incremental result tables with far fewer rows.

[1] Non-bolded totals are not directly comparable because row-based import was run with fewer rows.
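The per-row figures in the table can be checked with a few lines of arithmetic. This is only a sanity check, and it assumes the column-based time of 16:37 means hours:minutes (the table does not state the unit); the variable and function names are illustrative.

```python
# Figures copied from the benchmark table above.
ROW_BASED_MS_PER_ROW = 62.7       # reported for the 2012-3-22 row-based run
COLUMN_BASED_ROWS = 31_864_948    # rows in the 2012-11-16 column-based run
COLUMN_BASED_TIME = "16:37"       # ASSUMPTION: hours:minutes


def hhmm_to_ms(hhmm: str) -> int:
    """Convert an 'H:MM' duration to milliseconds."""
    hours, minutes = (int(part) for part in hhmm.split(":"))
    return (hours * 60 + minutes) * 60 * 1000


# Time per row for the column-based run, rounded like the table.
column_based_ms_per_row = round(hhmm_to_ms(COLUMN_BASED_TIME) / COLUMN_BASED_ROWS, 1)

# Speedup computed from the two per-row figures.
speedup = round(ROW_BASED_MS_PER_ROW / column_based_ms_per_row)

print(column_based_ms_per_row, speedup)
```

Comparing ms/row rather than total time is what makes the two runs comparable at all, since they were run on very different row counts.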

Main steps

The algorithm autogenerates SQL code for a given source->VegBIEN mapping, which both maps and normalizes the data.

This code performs the following steps on each group of columns in the normalization hierarchy:

  1. Try inserting the rows without DISTINCT ON, in order to determine which unique constraint is violated and thus which columns to use in the DISTINCT ON.
  2. Insert rows that don't already exist in the output table, comparing on the DISTINCT ON columns.
  3. Get the pkeys of the already-existing and just-inserted rows, and place them in a temp table.
  4. For any rows that were missing (e.g. because they violated a NOT NULL constraint), fill in NULL for their pkeys (or, in a hierarchy, the pkey of the parent level).
  5. Pass the pkeys to the next outer group of columns, which uses them as foreign keys.

These steps occur in sql_io.put_table(), at the bottom of */lib/sql_io.py*.
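The five steps can be sketched in a self-contained form. This is not the code from sql_io.py: it is a minimal illustration that uses SQLite instead of PostgreSQL, a hypothetical "taxon" table, and hardcoded comparison columns (rank, name), whereas the real algorithm derives the columns from whichever unique constraint was violated.

```python
import sqlite3


def put_table(db, rows):
    """Insert (row_num, rank, name) rows; return {row_num: taxon_id or None}."""
    insert_sql = "INSERT INTO taxon (rank, name) VALUES (?, ?)"
    values = [(rank, name) for _, rank, name in rows]
    try:
        # Step 1: optimistic bulk insert without deduplication.
        with db:  # commits on success, rolls back the whole batch on error
            db.executemany(insert_sql, values)
    except sqlite3.IntegrityError:
        # Step 2: insert each distinct input row once, skipping rows that
        # already exist or that violate some other constraint.
        for value in dict.fromkeys(values):  # order-preserving dedupe
            try:
                with db:
                    db.execute(insert_sql, value)
            except sqlite3.IntegrityError:
                pass
    pkeys = {}
    for row_num, rank, name in rows:
        # Step 3: look up the pkey of the existing or just-inserted row.
        # ASSUMPTION: (rank, name) are the unique-constraint columns.
        found = db.execute(
            "SELECT taxon_id FROM taxon WHERE rank = ? AND name = ?",
            (rank, name),
        ).fetchone()
        # Step 4: rows that could not be inserted get None (SQL NULL).
        pkeys[row_num] = found[0] if found else None
    # Step 5: the caller passes these pkeys on as foreign keys.
    return pkeys


db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE taxon (taxon_id INTEGER PRIMARY KEY, rank TEXT NOT NULL,"
    " name TEXT NOT NULL, UNIQUE (rank, name))"
)
rows = [
    (1, "family", "Rosaceae"),
    (2, "family", "Poaceae"),
    (3, "family", "Rosaceae"),  # duplicate of row 1
    (4, "family", None),        # violates NOT NULL
]
result = put_table(db, rows)
print(result)  # {1: 1, 2: 2, 3: 1, 4: None}
```

Every input row keeps an entry in the result, so the next table in the hierarchy can still join on row_num even when a row could not be inserted.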

The following examples use the ACAD dataset, which is on vegbiendev in *vegbien's ACAD.Specimen table* (access instructions on the wiki under PhpPgAdmin).

Notes:
  • Table and column names often contain special characters like -.() for clarity. Everything inside "" is one identifier.
  • The input temp table is just called "in" plus a version # to avoid clutter (because it's used often). Each step uses its own input temp table, so that "in#" always refers to the same thing within a step.
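As a small illustration of the quoting convention in these notes, the sketch below builds delimited identifiers the way standard SQL requires (wrap in double quotes, double any embedded double quote). Both function names are hypothetical and are not taken from the project's code.

```python
def quote_ident(name: str) -> str:
    """Quote a SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def in_table_name(version: int) -> str:
    """Hypothetical helper: quoted name of the input temp table for a step."""
    return quote_ident(f"in#{version}")


# Special characters such as [ ] = ' . # are safe inside the quotes.
print(quote_ident("taxonlabel[rank='order']"))
print(in_table_name(38))
```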

Main case: Columns with unique constraint

Inserting these input columns into "taxonlabel":

Output           Input
'parent_id'      "taxonlabel[rank='order']"."out.taxonlabel_id"
'rank'           'family'
'taxonepithet'   "ACAD.Specimen"."family"
  1. Joining together input tables into temp table
    CREATE TEMP TABLE "in#38" AS
    SELECT
    "ACAD.Specimen"."row_num" AS "row_num" 
    , "taxonlabel[rank='order']"."out.taxonlabel_id" AS "taxonlabel[rank='order'].out.taxonlabel_id" 
    , "ACAD.Specimen"."family" AS "ACAD.Specimen.family" 
    FROM "ACAD.Specimen" 
    JOIN "taxonlabel[rank='order']" USING ("row_num")
    ORDER BY row_num
    
    Temp table: "in#38"
    CREATE TEMP TABLE "in#38_full" (
    LIKE "in#38" INCLUDING ALL
    );
    
    INSERT INTO "in#38_full" 
    SELECT * FROM "in#38" 
    
  2. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO taxonlabel#14"()
    RETURNS SETOF "taxonlabel"."taxonlabel_id"%TYPE
    LANGUAGE sql
    AS $$
    INSERT INTO "taxonlabel" 
    ("parent_id", "rank", "taxonepithet")
    SELECT
    "in#38"."taxonlabel[rank='order'].out.taxonlabel_id" 
    , 'family' AS "rank" 
    , "in#38"."ACAD.Specimen.family" 
    FROM "in#38" 
    RETURNING "taxonlabel_id" 
    $$;
    
    CREATE TEMP TABLE "in#38_insert_out_pkeys" AS
    SELECT * FROM "pg_temp"."INSERT INTO taxonlabel#14"() AS "f" ("taxonlabel_id")
    
  3. Caught exception: NullValueException: Violated NOT NULL constraint on columns: source_id
  4. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO taxonlabel#15"()
    RETURNS SETOF "taxonlabel"."taxonlabel_id"%TYPE
    LANGUAGE sql
    AS $$
    INSERT INTO "taxonlabel" 
    ("parent_id", "rank", "taxonepithet", "source_id")
    SELECT
    "in#38"."taxonlabel[rank='order'].out.taxonlabel_id" 
    , 'family' AS "rank" 
    , "in#38"."ACAD.Specimen.family" 
    , 1 AS "source_id" 
    FROM "in#38" 
    RETURNING "taxonlabel_id" 
    $$;
    
    CREATE TEMP TABLE "in#38_insert_out_pkeys" AS
    SELECT * FROM "pg_temp"."INSERT INTO taxonlabel#15"() AS "f" ("taxonlabel_id")
    
  5. Caught exception: DuplicateKeyException: Violated taxonlabel_unique constraint on columns: parent_id, taxonepithet, rank, source_id, sourceaccessioncode, taxonomicname
  6. Ignoring existing rows, comparing on these columns:
    Output: Input
    "taxonlabel"."parent_id": "in#38"."taxonlabel[rank='order'].out.taxonlabel_id" 
    "taxonlabel"."taxonepithet": "in#38"."ACAD.Specimen.family" 
    "taxonlabel"."rank": 'family'
    "taxonlabel"."source_id": 1
    "taxonlabel"."sourceaccessioncode": None
    "taxonlabel"."taxonomicname": None
    
    CREATE TEMP TABLE "in#38_distinct" (
    LIKE "in#38" INCLUDING ALL
    );
    
    INSERT INTO "in#38_distinct" 
    SELECT
    DISTINCT ON ("in#38"."taxonlabel[rank='order'].out.taxonlabel_id", "in#38"."ACAD.Specimen.family")
    *
    FROM "in#38" 
    
  7. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO taxonlabel#16"()
    RETURNS SETOF unknown
    LANGUAGE plpgsql
    AS $$
    DECLARE
        "row" "taxonlabel"%ROWTYPE;
    BEGIN
        FOR "row"."parent_id", "row"."rank", "row"."taxonepithet", "row"."source_id" IN
            SELECT
            "in#38_distinct"."taxonlabel[rank='order'].out.taxonlabel_id" 
            , 'family' AS "rank" 
            , "in#38_distinct"."ACAD.Specimen.family" 
            , 1 AS "source_id" 
            FROM "in#38_distinct" 
            ORDER BY row_num
        LOOP
            BEGIN
                RETURN QUERY
                    INSERT INTO "taxonlabel" 
                    ("parent_id", "rank", "taxonepithet", "source_id")
                    VALUES ("row"."parent_id", "row"."rank", "row"."taxonepithet", "row"."source_id")
                    RETURNING NULL AS "NULL" 
                ;
            EXCEPTION
                WHEN unique_violation THEN NULL;
            END;
        END LOOP;
    END;
    
    $$;
    
    CREATE TEMP TABLE "rowcount#8" AS
    SELECT * FROM "pg_temp"."INSERT INTO taxonlabel#16"() AS "f" ("NULL")
    
  8. Getting output table pkeys of existing/inserted rows
    CREATE TEMP TABLE "taxonlabel[rank='family']" AS
    SELECT
    "in#38"."row_num" 
    , "taxonlabel"."taxonlabel_id" AS "out.taxonlabel_id" 
    FROM "in#38" 
    JOIN "taxonlabel" ON
    COALESCE("taxonlabel"."parent_id", CAST(2147483647 AS integer)) = COALESCE("in#38"."taxonlabel[rank='order'].out.taxonlabel_id", CAST(2147483647 AS integer))
    AND COALESCE("taxonlabel"."taxonepithet", CAST('\N' AS text)) = COALESCE("in#38"."ACAD.Specimen.family", CAST('\N' AS text))
    AND COALESCE("taxonlabel"."rank", CAST('unknown' AS taxonrank)) = 'family'
    AND "taxonlabel"."source_id" = 1
    AND COALESCE("taxonlabel"."sourceaccessioncode", CAST('\N' AS text)) = COALESCE(NULL, CAST('\N' AS text))
    AND COALESCE("taxonlabel"."taxonomicname", CAST('\N' AS text)) = COALESCE(NULL, CAST('\N' AS text))
    
  9. Setting pkeys of missing rows to "in#38"."taxonlabel[rank='order'].out.taxonlabel_id"
    INSERT INTO "taxonlabel[rank='family']" 
    ("row_num", "out.taxonlabel_id")
    SELECT
    "in#38_full"."row_num" 
    , "in#38_full"."taxonlabel[rank='order'].out.taxonlabel_id" AS "out.taxonlabel_id" 
    FROM "in#38_full" 
    LEFT JOIN "taxonlabel[rank='family']" ON "taxonlabel[rank='family']"."row_num" = "in#38_full"."row_num" 
    WHERE "taxonlabel[rank='family']"."row_num" IS NULL
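Step 8 above wraps both sides of each comparison in COALESCE with a sentinel value. The reason is that in SQL, NULL = NULL is not true, so a plain equality join would silently drop rows whose key columns are NULL. The sketch below demonstrates the difference using SQLite and simplified, hypothetical table contents; the sentinels are the ones that appear in step 8.

```python
import sqlite3

NULL_INT = 2147483647   # sentinel used for integer columns in step 8
NULL_TEXT = r"\N"       # sentinel used for text columns in step 8

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE taxonlabel (
        taxonlabel_id INTEGER PRIMARY KEY, parent_id INTEGER, taxonepithet TEXT);
    INSERT INTO taxonlabel (parent_id, taxonepithet)
        VALUES (10, 'Rosaceae'), (NULL, 'Poaceae');
    CREATE TABLE in_rows (row_num INTEGER, parent_id INTEGER, family TEXT);
    INSERT INTO in_rows VALUES (1, 10, 'Rosaceae'), (2, NULL, 'Poaceae');
""")

plain_join = """
    SELECT i.row_num, t.taxonlabel_id FROM in_rows AS i
    JOIN taxonlabel AS t
      ON t.parent_id = i.parent_id AND t.taxonepithet = i.family
    ORDER BY i.row_num
"""
null_safe_join = """
    SELECT i.row_num, t.taxonlabel_id FROM in_rows AS i
    JOIN taxonlabel AS t
      ON COALESCE(t.parent_id, :null_int) = COALESCE(i.parent_id, :null_int)
     AND COALESCE(t.taxonepithet, :null_text) = COALESCE(i.family, :null_text)
    ORDER BY i.row_num
"""
params = {"null_int": NULL_INT, "null_text": NULL_TEXT}

plain = db.execute(plain_join).fetchall()
null_safe = db.execute(null_safe_join, params).fetchall()

print(plain)      # [(1, 1)]  row 2 is lost because NULL = NULL is not true
print(null_safe)  # [(1, 1), (2, 2)]
```

The technique only works if the sentinel never occurs as a real value in the column.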
    

Special case: No unique constraint

Inserting these input columns into "locationevent":

Output           Input
'obsenddate'     "_dateRangeEnd(eventDate)"."result"
'location_id'    "location_pkeys"."out.location_id"
'obsstartdate'   "_dateRangeStart(eventDate)"."result"
  1. Joining together input tables into temp table
    CREATE TEMP TABLE "in#8" AS
    SELECT
    "ACAD.Specimen"."row_num" AS "row_num" 
    , "_dateRangeEnd(eventDate)"."result" AS "_dateRangeEnd(eventDate).result" 
    , "location_pkeys"."out.location_id" AS "location_pkeys.out.location_id" 
    , "_dateRangeStart(eventDate)"."result" AS "_dateRangeStart(eventDate).result" 
    FROM "ACAD.Specimen" 
    JOIN "_dateRangeEnd(eventDate)" USING ("row_num")
    JOIN "location_pkeys" USING ("row_num")
    JOIN "_dateRangeStart(eventDate)" USING ("row_num")
    ORDER BY row_num
    
    Temp table: "in#8"
    CREATE TEMP TABLE "in#8_full" (
    LIKE "in#8" INCLUDING ALL
    );
    
    INSERT INTO "in#8_full" 
    SELECT * FROM "in#8" 
    
  2. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO locationevent"()
    RETURNS SETOF "locationevent"."locationevent_id"%TYPE
    LANGUAGE sql
    AS $$
    INSERT INTO "locationevent" 
    ("obsenddate", "location_id", "obsstartdate")
    SELECT
    "in#8"."_dateRangeEnd(eventDate).result" 
    , "in#8"."location_pkeys.out.location_id" 
    , "in#8"."_dateRangeStart(eventDate).result" 
    FROM "in#8" 
    RETURNING "locationevent_id" 
    $$;
    
  3. Caught exception: MissingCastException: Missing cast to type date on column: obsenddate
  4. Casting 'obsenddate' input to date
    CREATE FUNCTION "pg_temp"."date(eventDate)"("value" anyelement)
    RETURNS date
    LANGUAGE plpgsql
    STRICT
    AS $$
    BEGIN
        /* The explicit cast to the return type is needed to make the cast happen
        inside the try block. (Implicit casts to the return type happen at the end
        of the function, outside any block.) */
        RETURN value::date;
    EXCEPTION
        WHEN data_exception THEN
            -- Save error in errors table.
            DECLARE
                error_code text := SQLSTATE;
                error text := SQLERRM;
                value text := value;
                "column" text;
            BEGIN
                -- Insert the value and error for *each* source column.
                FOR "column" IN
                    SELECT * FROM (VALUES ('eventDate')) AS "c" 
                LOOP
                    BEGIN
                        INSERT INTO "ACAD"."Specimen.errors" 
                        ("column", "value", "error_code", "error")
                        VALUES ("column", "value", "error_code", "error");
                    EXCEPTION
                        WHEN unique_violation THEN NULL;
                    END;
                END LOOP;
    
            END;
    
            RETURN NULL;
    END;
    $$;
    
    ALTER TABLE "in#8" ADD COLUMN "_dateRangeEnd(eventDate).result::date" date /*"in#8"."_dateRangeEnd(eventDate).result"::date*/
    
    ALTER TABLE "in#8" 
    ALTER COLUMN "_dateRangeEnd(eventDate).result::date" TYPE date
    USING "pg_temp"."date(eventDate)"("in#8"."_dateRangeEnd(eventDate).result")
    
  5. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO locationevent"()
    RETURNS SETOF "locationevent"."locationevent_id"%TYPE
    LANGUAGE sql
    AS $$
    INSERT INTO "locationevent" 
    ("obsenddate", "location_id", "obsstartdate")
    SELECT
    "in#8"."_dateRangeEnd(eventDate).result::date" 
    , "in#8"."location_pkeys.out.location_id" 
    , "in#8"."_dateRangeStart(eventDate).result" 
    FROM "in#8" 
    RETURNING "locationevent_id" 
    $$;
    
  6. Caught exception: MissingCastException: Missing cast to type date on column: obsstartdate
  7. Casting 'obsstartdate' input to date
    CREATE FUNCTION "pg_temp"."date(eventDate)"("value" anyelement)
    RETURNS date
    LANGUAGE plpgsql
    STRICT
    AS $$
    BEGIN
        /* The explicit cast to the return type is needed to make the cast happen
        inside the try block. (Implicit casts to the return type happen at the end
        of the function, outside any block.) */
        RETURN value::date;
    EXCEPTION
        WHEN data_exception THEN
            -- Save error in errors table.
            DECLARE
                error_code text := SQLSTATE;
                error text := SQLERRM;
                value text := value;
                "column" text;
            BEGIN
                -- Insert the value and error for *each* source column.
                FOR "column" IN
                    SELECT * FROM (VALUES ('eventDate')) AS "c" 
                LOOP
                    BEGIN
                        INSERT INTO "ACAD"."Specimen.errors" 
                        ("column", "value", "error_code", "error")
                        VALUES ("column", "value", "error_code", "error");
                    EXCEPTION
                        WHEN unique_violation THEN NULL;
                    END;
                END LOOP;
    
            END;
    
            RETURN NULL;
    END;
    $$;
    
    ALTER TABLE "in#8" ADD COLUMN "_dateRangeStart(eventDate).result::date" date /*"in#8"."_dateRangeStart(eventDate).result"::date*/
    
    ALTER TABLE "in#8" 
    ALTER COLUMN "_dateRangeStart(eventDate).result::date" TYPE date
    USING "pg_temp"."date(eventDate)"("in#8"."_dateRangeStart(eventDate).result")
    
  8. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO locationevent"()
    RETURNS SETOF "locationevent"."locationevent_id"%TYPE
    LANGUAGE sql
    AS $$
    INSERT INTO "locationevent" 
    ("obsenddate", "location_id", "obsstartdate")
    SELECT
    "in#8"."_dateRangeEnd(eventDate).result::date" 
    , "in#8"."location_pkeys.out.location_id" 
    , "in#8"."_dateRangeStart(eventDate).result::date" 
    FROM "in#8" 
    RETURNING "locationevent_id" 
    $$;
    
    CREATE TEMP TABLE "in#8_insert_out_pkeys" AS
    SELECT * FROM "pg_temp"."INSERT INTO locationevent"() AS "f" ("locationevent_id")
    
  9. Caught exception: NullValueException: Violated NOT NULL constraint on columns: source_id
  10. Trying to insert new rows
    CREATE FUNCTION "pg_temp"."INSERT INTO locationevent#1"()
    RETURNS SETOF "locationevent"."locationevent_id"%TYPE
    LANGUAGE sql
    AS $$
    INSERT INTO "locationevent" 
    ("obsenddate", "location_id", "obsstartdate", "source_id")
    SELECT
    "in#8"."_dateRangeEnd(eventDate).result::date" 
    , "in#8"."location_pkeys.out.location_id" 
    , "in#8"."_dateRangeStart(eventDate).result::date" 
    , 1 AS "source_id" 
    FROM "in#8" 
    RETURNING "locationevent_id" 
    $$;
    
    CREATE TEMP TABLE "in#8_insert_out_pkeys" AS
    SELECT * FROM "pg_temp"."INSERT INTO locationevent#1"() AS "f" ("locationevent_id")
    
  11. Getting input table pkeys of inserted rows
    CREATE TEMP TABLE "in#8_insert_in_pkeys" AS
    SELECT "row_num" FROM "in#8" 
    
  12. Combining output and input pkeys in inserted order
    CREATE TEMP TABLE "locationevent_pkeys" AS
    SELECT
    "in#8_insert_in_pkeys"."row_num" 
    , "in#8_insert_out_pkeys"."locationevent_id" AS "out.locationevent_id" 
    FROM "in#8_insert_in_pkeys" 
    JOIN "in#8_insert_out_pkeys" USING ("_row_num")
    
  13. Setting pkeys of missing rows to None
    INSERT INTO "locationevent_pkeys" 
    ("row_num", "out.locationevent_id")
    SELECT
    "in#8_full"."row_num" 
    , NULL AS "out.locationevent_id" 
    FROM "in#8_full" 
    LEFT JOIN "locationevent_pkeys" ON "locationevent_pkeys"."row_num" = "in#8_full"."row_num" 
    WHERE "locationevent_pkeys"."row_num" IS NULL
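Without a unique constraint, the new pkeys cannot be looked up by value, because identical input rows produce separate output rows. Steps 11 to 13 above instead pair input rows with output pkeys by insertion order. The sketch below shows that idea with SQLite and made-up data; the list position stands in for the shared join column, and row 4 is assumed to have been dropped before the insert.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE locationevent (locationevent_id INTEGER PRIMARY KEY,"
    " location_id INTEGER, obsstartdate TEXT)"
)

# (row_num, location_id, obsstartdate); rows 1 and 2 are identical on purpose.
in_rows = [(1, 7, "2001-05-01"), (2, 7, "2001-05-01"), (3, 9, "1999-08-15")]
in_full_row_nums = [1, 2, 3, 4]  # ASSUMPTION: row 4 was filtered out earlier

in_pkeys = []   # input row_num, in insertion order
out_pkeys = []  # generated locationevent_id, in the same order
for row_num, location_id, start in sorted(in_rows):
    cur = db.execute(
        "INSERT INTO locationevent (location_id, obsstartdate) VALUES (?, ?)",
        (location_id, start),
    )
    in_pkeys.append(row_num)
    out_pkeys.append(cur.lastrowid)

# Pair the nth input row with the nth generated pkey.
locationevent_pkeys = dict(zip(in_pkeys, out_pkeys))

# Rows that never reached the insert get None (SQL NULL) as their pkey.
for row_num in in_full_row_nums:
    locationevent_pkeys.setdefault(row_num, None)

print(locationevent_pkeys)  # {1: 1, 2: 2, 3: 3, 4: None}
```

Because nothing is compared by value, the duplicate rows 1 and 2 each receive their own output row.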
    

Special case: SQL function

Inserting these input columns into "_alt":

Output   Input
'1'      "ACAD.Specimen"."id"
'2'      "_join(1=institutionCode, 3=catalogNumber, 2=_join(1=collectionC"."result"
  1. Joining together input tables into temp table
    CREATE TEMP TABLE "in#4" AS
    SELECT
    "ACAD.Specimen"."row_num" AS "row_num" 
    , "ACAD.Specimen"."id" AS "ACAD.Specimen.id" 
    , "_join(1=institutionCode, 3=catalogNumber, 2=_join(1=collectionC"."result" AS "_join(1=institutionCode, 3=catalogNumber, 2=_join(1=coll.result" 
    FROM "ACAD.Specimen" 
    JOIN "_join(1=institutionCode, 3=catalogNumber, 2=_join(1=collectionC" USING ("row_num")
    ORDER BY row_num
    
    Temp table: "in#4"
  2. Defining wrapper function
    CREATE TEMP TABLE "_alt(1=id, 2=_join(1=institutionCode, 3=catalogNumber, 2=_join(" AS
    SELECT
    "in#4"."row_num" 
    , "_alt"("1" := "in#4"."ACAD.Specimen.id", "2" := "in#4"."_join(1=institutionCode, 3=catalogNumber, 2=_join(1=coll.result") AS "result" 
    FROM "in#4" 
    LIMIT 0
    
    CREATE FUNCTION "pg_temp"."_alt(1=id, 2=_join(1=institutionCode, 3=catalogNumber, 2=__wrap"()
    RETURNS SETOF "_alt(1=id, 2=_join(1=institutionCode, 3=catalogNumber, 2=_join(" 
    LANGUAGE plpgsql
    AS $$
    DECLARE
        "row" "in#4"%ROWTYPE;
    BEGIN
        FOR "row" IN
            SELECT * FROM "in#4" 
        LOOP
            BEGIN
                BEGIN
                    RETURN QUERY
                        SELECT
                        "row"."row_num" 
                        , "_alt"("1" := "row"."ACAD.Specimen.id", "2" := "row"."_join(1=institutionCode, 3=catalogNumber, 2=_join(1=coll.result")
                    ;
                EXCEPTION
                    WHEN internal_error THEN
                        -- Handle PL/Python exceptions
                        DECLARE
                            matches text[] := regexp_matches(SQLERRM,
                                E'^(?:PL/Python: )?(\\w+): (.*)$'); -- .* also matches \n
                            exc_name text := matches[1];
                            msg text := matches[2];
                        BEGIN
                            /* Re-raise PL/Python exceptions with the PL/Python prefix removed.
                            This allows the exception to be parsed like a native exception.
                            Always raise as data_exception so it goes in the errors table. */
                            IF exc_name IS NOT NULL THEN
                                RAISE data_exception USING MESSAGE = msg;
                            -- Re-raise non-PL/Python exceptions
                            ELSE
                                RAISE USING ERRCODE = SQLSTATE, MESSAGE = SQLERRM;
                            END IF;
                        END;
                END;
            EXCEPTION
                WHEN data_exception THEN
                    -- Save error in errors table.
                    DECLARE
                        error_code text := SQLSTATE;
                        error text := SQLERRM;
                        value text := array_to_string(ARRAY[COALESCE(CAST("row"."ACAD.Specimen.id" AS text), CAST(NULL AS text)), COALESCE(CAST("row"."_join(1=institutionCode, 3=catalogNumber, 2=_join(1=coll.result" AS text), CAST(NULL AS text))], ',');
                        "column" text;
                    BEGIN
                        -- Insert the value and error for *each* source column.
                        FOR "column" IN
                            SELECT * FROM (VALUES ('id,institutionCode'), ('id,catalogNumber'), ('id,collectionCode'), ('id,collectionID')) AS "c" 
                        LOOP
                            BEGIN
                                INSERT INTO "ACAD"."Specimen.errors" 
                                ("column", "value", "error_code", "error")
                                VALUES ("column", "value", "error_code", "error");
                            EXCEPTION
                                WHEN unique_violation THEN NULL;
                            END;
                        END LOOP;
    
                    END;
    
                    RETURN QUERY
                        SELECT
                        "row"."row_num" 
                        , CAST(NULL AS text)
                    ;
            END;
        END LOOP;
    END;
    
    $$;
    
  3. Calling function
    INSERT INTO "_alt(1=id, 2=_join(1=institutionCode, 3=catalogNumber, 2=_join(" 
    ("row_num", "result")
    SELECT * FROM "pg_temp"."_alt(1=id, 2=_join(1=institutionCode, 3=catalogNumber, 2=__wrap"()
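The wrapper function above (like the date cast function in the previous case) follows one pattern: call the function row by row, and on a data error record the error once per source column, then return NULL for that row so the import can continue. A minimal Python sketch of the pattern follows. The `errors` list stands in for the errors table, `safe_call` is a hypothetical name, and Python's ValueError plays the role of data_exception.

```python
from datetime import date

errors = []  # stands in for the errors table: (column, value, error_code, error)


def safe_call(func, row_num, value, source_columns):
    """Call func(value); on a data error, log it and return None for the row."""
    try:
        return row_num, func(value)
    except ValueError as exc:
        # Record the value and error for *each* source column.
        for column in source_columns:
            entry = (column, value, type(exc).__name__, str(exc))
            if entry not in errors:  # like ignoring unique_violation
                errors.append(entry)
        return row_num, None


rows = [(1, "2012-11-16"), (2, "not a date"), (3, "not a date")]
results = [
    safe_call(date.fromisoformat, row_num, value, ["eventDate"])
    for row_num, value in rows
]

print(results)
print(errors)
```

Rows 2 and 3 fail with the same value and message, so only one error entry is stored, which matches how the generated SQL ignores unique_violation on the errors table.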