Project

General

Profile

1 1388 aaronmk
# CSV I/O
2
3
import csv
4 5431 aaronmk
import _csv
5 1388 aaronmk
import StringIO
6
7 14594 aaronmk
import dicts
8 8069 aaronmk
import exc
9 14592 aaronmk
import lists
10 5593 aaronmk
import streams
11 1623 aaronmk
import strings
12 1444 aaronmk
import util
13
# candidate field delimiters passed to the sniffer
delims = ',;\t|`'
# multi-char delimiters that pad the real delimiter with tabs
tab_padded_delims = ['\t|\t']
tsv_delim = '\t'
escape = '\\' # a trailing escape continues the line (see TsvReader)

ending_placeholder = r'\n' # encoded form of an escaped line ending
def is_tsv(dialect):
    '''Whether the dialect's delimiter begins with a tab (plain or padded TSV)'''
    return dialect.delimiter.startswith(tsv_delim)
def sniff(line):
    '''Automatically detects the dialect of a line of CSV/TSV text.
    @param line the header line, including its line ending (if any)
    @return a csv.Dialect, with lineterminator set to the line's actual ending;
        falls back to csv.excel when no delimiter can be determined
    '''
    line, ending = strings.extract_line_ending(line)
    try: dialect = csv.Sniffer().sniff(line, delims)
    except _csv.Error as e: # `as` form works on Python 2.6+ and 3.x
        if exc.e_msg(e) == 'Could not determine delimiter': dialect = csv.excel
        else: raise
    
    if is_tsv(dialect):
        dialect.quoting = csv.QUOTE_NONE
        # Check multi-char delims using \t
        delim = strings.find_any(line, tab_padded_delims)
        if delim:
            dialect.delimiter = delim
            # if the delimiter's non-tab part also ends the line, it was really
            # part of the line ending
            line_suffix = delim.rstrip('\t')
            if line.endswith(line_suffix): ending = line_suffix+ending
    else: dialect.doublequote = True # Sniffer doesn't turn this on by default
    dialect.lineterminator = ending
    
    return dialect
def has_unbalanced_quotes(str_):
    '''Whether str_ contains an odd number of double-quote characters'''
    return str_.count('"') % 2 != 0
def has_multiline_column(str_):
    '''Whether str_ appears to contain a column value that spans multiple lines
    (heuristic: an unbalanced quote means a quoted value was cut off)'''
    return has_unbalanced_quotes(str_)
def stream_info(stream, parse_header=False):
    '''Automatically detects the dialect based on the header line.
    Uses the Excel dialect if the CSV file is empty.
    @return NamedTuple {header_line, header, dialect}'''
    info = util.NamedTuple()
    info.header_line = stream.readline()
    if has_multiline_column(info.header_line): # 1st line not full header
        # assume it's a header-only csv with multiline columns
        info.header_line += ''.join(stream.readlines()) # use entire file
    info.header = None
    if info.header_line == '': # readline() returns '' only at EOF
        info.dialect = csv.excel # empty stream: fall back to the Excel dialect
    else:
        info.dialect = sniff(info.header_line)
    
    if parse_header:
        reader = reader_class(info.dialect)(
            StringIO.StringIO(info.header_line), info.dialect)
        try: info.header = reader.next()
        except StopIteration: info.header = [] # empty stream -> empty header
    
    return info
# TSV escape sequences: JSON's string escapes, plus an escaped tab
tsv_encode_map = strings.json_encode_map + [('\t', r'\t')]
tsv_decode_map = strings.flip_map(tsv_encode_map) # inverse, for decoding
72 5146 aaronmk
73 1623 aaronmk
class TsvReader:
74
    '''Unlike csv.reader, for TSVs, interprets \ as escaping a line ending but
75 5145 aaronmk
    ignores it before everything else (e.g. \N for NULL).
76 5170 aaronmk
    Also expands tsv_encode_map escapes.
77 5145 aaronmk
    '''
78 1623 aaronmk
    def __init__(self, stream, dialect):
79
        assert is_tsv(dialect)
80
        self.stream = stream
81
        self.dialect = dialect
82
83
    def __iter__(self): return self
84
85
    def next(self):
86
        record = ''
87
        ending = None
88
        while True:
89
            line = self.stream.readline()
90
            if line == '': raise StopIteration
91
92 5438 aaronmk
            line = strings.remove_suffix(self.dialect.lineterminator, line)
93 5433 aaronmk
            contents = strings.remove_suffix(escape, line)
94 1623 aaronmk
            record += contents
95 5433 aaronmk
            if len(contents) == len(line): break # no line continuation
96 1623 aaronmk
            record += ending_placeholder
97 3055 aaronmk
98
        # Prevent "new-line character seen in unquoted field" errors
99
        record = record.replace('\r', ending_placeholder)
100
101 7211 aaronmk
        # Split line
102 8071 aaronmk
        if record == '': row = [] # csv.reader would interpret as EOF
103
        elif len(self.dialect.delimiter) > 1: # multi-char delims
104 7211 aaronmk
            row = record.split(self.dialect.delimiter)
105
        else: row = csv.reader(StringIO.StringIO(record), self.dialect).next()
106
107 5170 aaronmk
        return [strings.replace_all(tsv_decode_map, v) for v in row]
108 1623 aaronmk
def reader_class(dialect):
    '''@return the reader class to use for dialect: TsvReader for TSVs,
    csv.reader for everything else'''
    if is_tsv(dialect): return TsvReader
    return csv.reader
def make_reader(stream, dialect):
    '''Instantiates the appropriate reader class for dialect on stream'''
    return reader_class(dialect)(stream, dialect)
def reader_and_header(stream):
    '''Automatically detects the dialect based on the header line
    @return tuple (reader, header)'''
    info = stream_info(stream, parse_header=True)
    reader = make_reader(stream, info.dialect)
    return (reader, info.header)
def header(stream):
    '''fetches just the header line of a stream'''
    _reader, header_ = reader_and_header(stream)
    return header_
##### csv modifications

# Note that these methods only work on *instances* of Dialect classes
csv.Dialect.__eq__ = lambda self, other: self.__dict__ == other.__dict__
csv.Dialect.__ne__ = lambda self, other: not self.__eq__(other)
# Allow multi-char delimiters by swallowing just that one validation error.
__Dialect__validate_orig = csv.Dialect._validate
def __Dialect__validate(self):
    try: __Dialect__validate_orig(self)
    except _csv.Error as e: # `as` form works on Python 2.6+ and 3.x
        if str(e) == '"delimiter" must be an 1-character string': pass # OK
        else: raise
csv.Dialect._validate = __Dialect__validate
139
140 2114 aaronmk
##### Row filters
141
142 14612 aaronmk
class Reader:
143
    def __iter__(self): return self
144
145
    def close(self): pass # support using as a stream
146
147 14613 aaronmk
class WrapReader(Reader):
148
    def __init__(self, reader):
149
        self.reader = reader
150
151
    def next(self): return self.reader.next()
class Filter(WrapReader):
    '''Wraps a reader, passing each row through a filter function'''
    def __init__(self, filter_, reader):
        WrapReader.__init__(self, reader)
        self.filter = filter_ # called on each row; returns the filtered row
    
    def next(self): return self.filter(WrapReader.next(self))
class MultiFilter(WrapReader):
    '''Applies multiple filters by nesting them.
    @param reader the already-nested filter chain:
        outermost_filter(innermost_filter(reader))
    '''
    pass
std_nulls = [r'\N'] # \N is the standard NULL escape (see TsvReader doc)
empty_nulls = [''] + std_nulls # also treats the empty string as NULL
class NullFilter(Filter):
    '''Translates special string values to None'''
    def __init__(self, reader, nulls=std_nulls):
        null_map = dict.fromkeys(nulls, None) # each null string -> None
        def filter_(row): return [null_map.get(v, v) for v in row]
        Filter.__init__(self, filter_, reader)
class StripFilter(Filter):
    '''Strips surrounding whitespace from every value'''
    def __init__(self, reader):
        Filter.__init__(self, lambda row: [v.strip() for v in row], reader)
class ColCtFilter(Filter):
    '''Makes every row have the same # of columns'''
    def __init__(self, reader, cols_ct):
        def filter_(row): return util.list_as_length(row, cols_ct)
        Filter.__init__(self, filter_, reader)
##### Translators

class StreamFilter(Filter):
    '''Wraps a reader in a way that's usable to a filter stream that does not
    require lines to be strings. Reports EOF as '' instead of StopIteration.'''
    def __init__(self, reader):
        Filter.__init__(self, None, reader) # no row filtering; stream adapter
    
    def readline(self):
        '''@return the next row, or '' at EOF'''
        try: return self.reader.next()
        except StopIteration: return '' # EOF sentinel
class ProgressInputFilter(streams.ProgressInputStream): # is also a reader
    # ProgressInputStream extends StreamIter, so this can be used as a reader
    '''wraps a reader, reporting the # rows read every n rows and after the last
    row is read
    @param log the output stream for progress messages
    '''
    def __init__(self, reader, log, msg='Read %d row(s)', n=100):
        stream = StreamFilter(reader) # adapt the reader to the stream interface
        streams.ProgressInputStream.__init__(self, stream, log, msg, n)
210
211 5735 aaronmk
class ColInsertFilter(Filter):
212 7290 aaronmk
    '''Adds column(s) to each row
213 9509 aaronmk
    @param mk_value(row, row_num) | literal_value
214 5735 aaronmk
    '''
215 14592 aaronmk
    def __init__(self, reader, mk_value, index=0, n=1, col_names=None):
216
        line_num_skip = 0
217
        if col_names != None:
218
            col_names = lists.mk_seq(col_names)
219
            n = len(col_names)
220
            line_num_skip = 1
221
222 9509 aaronmk
        if not callable(mk_value):
223
            value = mk_value
224
            def mk_value(row, row_num): return value
225
226 5735 aaronmk
        def filter_(row):
227
            row = list(row) # make sure it's mutable; don't modify input!
228 14592 aaronmk
229
            if self.is_header and col_names != None:
230
                values = col_names
231
                self.is_header = False
232
            else: values = n*[mk_value(row, self.reader.line_num-line_num_skip)]
233
234
            for i in xrange(len(values)): row.insert(index+i, values[i])
235 5735 aaronmk
            return row
236
        Filter.__init__(self, filter_,
237
            streams.LineCountInputStream(StreamFilter(reader)))
238 14592 aaronmk
        self.is_header = True
class RowNumFilter(ColInsertFilter):
    '''Adds a row # column at the beginning of each row'''
    def __init__(self, reader, col_name=None):
        ColInsertFilter.__init__(self, reader,
            lambda row, row_num: row_num, col_names=col_name)
class InputRewriter(StreamFilter): # is also a stream
    '''Wraps a reader, writing each row back to CSV'''
    def __init__(self, reader, dialect=csv.excel):
        StreamFilter.__init__(self, reader)
        
        self.dialect = dialect # the output CSV dialect
    
    def readline(self):
        '''@return the next row serialized as one CSV line, or '' at EOF'''
        try:
            row = StreamFilter.readline(self) # translate EOF
            if row == '': return row # EOF
            
            line_stream = StringIO.StringIO()
            csv.writer(line_stream, self.dialect).writerow(row)
            return line_stream.getvalue()
        except Exception as e: # `as` form works on Python 2.6+ and 3.x
            exc.print_ex(e)
            raise
    
    # note: n is ignored; every read() returns exactly one CSV line
    def read(self, n): return self.readline() # forward all reads to readline()
def row_dict_to_list(dict_, col_order=[]):
    '''translates a CSV dict-based row to a list-based one
    @param dict_ {'col': 'value', __}
    @return (header, row) = (['col', __], ['value', __])
    '''
    dict_ = dict_.copy() # don't modify input!
    pairs = [(col, dict_.pop(col)) for col in col_order] # ordered cols 1st
    pairs += sorted(dict_.items()) # then remaining cols in alphabetical order
    return (dicts.pair_keys(pairs), dicts.pair_values(pairs))
class row_dict_to_list_reader(WrapReader):
    '''reads dict-based rows as list-based rows
    @param reader [{'col': 'value', __}, __]
    '''
    def __init__(self, reader, col_order=[]):
        WrapReader.__init__(self, reader)
        self.col_order = col_order
        self.header = None # set from the 1st dict row's keys
        self.next_row = None # caches the 1st data row while the header is emitted
    
    def next(self):
        if self.next_row is not None:
            # the 1st data row was cached when its header was emitted
            row, self.next_row = self.next_row, None
            return row
        
        row_dict = WrapReader.next(self)
        header, row = row_dict_to_list(row_dict, self.col_order)
        if self.header is None: # 1st dict row: emit header, cache data row
            self.header = header
            self.next_row = row
            row = header
        else: # remaining dict rows
            assert header == self.header # all rows must have same cols
        return row
class JsonReader(MultiFilter):
    '''reads parsed JSON data as row tuples
    @param json_data [{'col': 'value', __}, __]
    @param array_sep separator used to join array values into one string;
        required if any value is an array
    '''
    def __init__(self, json_data, col_order=[], array_sep=None):
        def conv_value(value):
            if not lists.is_seq(value): return value
            assert array_sep != None # required if there is any array data
            return array_sep.join(value)
        def conv_values(tuple_row):
            return [conv_value(v) for v in tuple_row]
        MultiFilter.__init__(self, Filter(conv_values,
            row_dict_to_list_reader(iter(json_data), col_order)))