# CSV I/O

import csv
import _csv
import StringIO

import exc
import streams
import strings
import util

delims = ',;\t|`'
tab_padded_delims = ['\t|\t']
tsv_delim = '\t'
escape = '\\'

ending_placeholder = r'\n'

def is_tsv(dialect): return dialect.delimiter.startswith(tsv_delim)

def sniff(line):
    '''Automatically detects the dialect'''
    line, ending = strings.extract_line_ending(line)
    try: dialect = csv.Sniffer().sniff(line, delims)
    except _csv.Error, e:
        if exc.e_msg(e) == 'Could not determine delimiter': dialect = csv.excel
        else: raise
    
    if is_tsv(dialect):
        dialect.quoting = csv.QUOTE_NONE
        # Check multi-char delims using \t
        delim = strings.find_any(line, tab_padded_delims)
        if delim:
            dialect.delimiter = delim
            line_suffix = delim.rstrip('\t')
            if line.endswith(line_suffix): ending = line_suffix+ending
    else: dialect.doublequote = True # Sniffer doesn't turn this on by default
    dialect.lineterminator = ending
    
    return dialect
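
# Usage sketch (illustrative, not part of the original module): sniff() takes a
# single header line and returns a csv.Dialect. Exact attributes depend on
# csv.Sniffer's heuristics and strings.extract_line_ending().
#
#   dialect = sniff('name,age\r\n')   # comma-delimited -> Excel-style dialect
#   dialect = sniff('name\tage\r\n')  # tab-delimited -> quoting disabled
#   is_tsv(dialect)                   # True for the tab-delimited case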

def has_unbalanced_quotes(str_): return str_.count('"') % 2 == 1 # odd # of "

def has_multiline_column(str_): return has_unbalanced_quotes(str_)

def stream_info(stream, parse_header=False):
    '''Automatically detects the dialect based on the header line.
    Uses the Excel dialect if the CSV file is empty.
    @return NamedTuple {header_line, header, dialect}'''
    info = util.NamedTuple()
    info.header_line = stream.readline()
    if has_multiline_column(info.header_line): # 1st line not full header
        # assume it's a header-only csv with multiline columns
        info.header_line += ''.join(stream.readlines()) # use entire file
    info.header = None
    if info.header_line != '':
        info.dialect = sniff(info.header_line)
    else: info.dialect = csv.excel # line of '' indicates EOF = empty stream
    
    if parse_header:
        try: info.header = reader_class(info.dialect)(
            StringIO.StringIO(info.header_line), info.dialect).next()
        except StopIteration: info.header = []
    
    return info
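
# Usage sketch (illustrative): detecting dialect and header from a file-like
# object. The stream only needs readline()/readlines(), as used above.
#
#   info = stream_info(open('data.csv'), parse_header=True)
#   info.dialect      # detected dialect (csv.excel if the stream is empty)
#   info.header       # parsed header row, e.g. ['col1', 'col2']
#   info.header_line  # raw header line, including its line ending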

tsv_encode_map = strings.json_encode_map[:]
tsv_encode_map.append(('\t', r'\t'))
tsv_decode_map = strings.flip_map(tsv_encode_map)

class TsvReader:
    '''Unlike csv.reader, for TSVs, interprets \ at the end of a line as
    escaping the line ending, but ignores \ before anything else
    (e.g. \N for NULL).
    Also expands tsv_encode_map escapes.
    '''
    def __init__(self, stream, dialect):
        assert is_tsv(dialect)
        self.stream = stream
        self.dialect = dialect
    
    def __iter__(self): return self
    
    def next(self):
        record = ''
        ending = None
        while True:
            line = self.stream.readline()
            if line == '': raise StopIteration
            
            line = strings.remove_suffix(self.dialect.lineterminator, line)
            contents = strings.remove_suffix(escape, line)
            record += contents
            if len(contents) == len(line): break # no line continuation
            record += ending_placeholder
        
        # Prevent "new-line character seen in unquoted field" errors
        record = record.replace('\r', ending_placeholder)
        
        # Split line
        if record == '': row = [] # csv.reader would interpret as EOF
        elif len(self.dialect.delimiter) > 1: # multi-char delims
            row = record.split(self.dialect.delimiter)
        else: row = csv.reader(StringIO.StringIO(record), self.dialect).next()
        
        return [strings.replace_all(tsv_decode_map, v) for v in row]
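
# Usage sketch (illustrative): TsvReader is normally obtained through
# make_reader() (below) with a sniffed TSV dialect rather than constructed
# directly. Rows come back with tsv_encode_map escapes (\t, \n, ...) expanded.
#
#   dialect = sniff('col1\tcol2\r\n')
#   reader = make_reader(stream, dialect)  # a TsvReader, since is_tsv(dialect)
#   for row in reader: pass                # row is a list of decoded strings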

def reader_class(dialect):
    if is_tsv(dialect): return TsvReader
    else: return csv.reader

def make_reader(stream, dialect): return reader_class(dialect)(stream, dialect)

def reader_and_header(stream):
    '''Automatically detects the dialect based on the header line.
    @return tuple (reader, header)'''
    info = stream_info(stream, parse_header=True)
    return (make_reader(stream, info.dialect), info.header)

def header(stream):
    '''Fetches just the header line of a stream'''
    reader, header = reader_and_header(stream)
    return header
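
# Usage sketch (illustrative): reader_and_header() is the usual entry point.
# process() here is a hypothetical placeholder for caller code.
#
#   reader, header = reader_and_header(open('data.csv'))
#   for row in reader: process(header, row)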

##### csv modifications

# Note that these methods only work on *instances* of Dialect classes
csv.Dialect.__eq__ = lambda self, other: self.__dict__ == other.__dict__
csv.Dialect.__ne__ = lambda self, other: not (self == other)

__Dialect__validate_orig = csv.Dialect._validate
def __Dialect__validate(self):
    try: __Dialect__validate_orig(self)
    except _csv.Error, e:
        if str(e) == '"delimiter" must be an 1-character string': pass # OK
        else: raise
csv.Dialect._validate = __Dialect__validate
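
# Note (illustrative): with __eq__ patched in above, sniffed dialect *instances*
# can be compared directly, e.g. sniff(line1) == sniff(line2). The _validate
# wrapper lets multi-character delimiters such as '\t|\t' pass validation.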

##### Row filters

class Filter:
    '''Wraps a reader, filtering each row'''
    def __init__(self, filter_, reader):
        self.reader = reader
        self.filter = filter_
    
    def __iter__(self): return self
    
    def next(self): return self.filter(self.reader.next())
    
    def close(self): pass # support using as a stream

std_nulls = [r'\N']
empty_nulls = [''] + std_nulls

class NullFilter(Filter):
    '''Translates special string values to None'''
    def __init__(self, reader, nulls=std_nulls):
        map_ = dict.fromkeys(nulls, None)
        def filter_(row): return [map_.get(v, v) for v in row]
        Filter.__init__(self, filter_, reader)
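
# Usage sketch (illustrative): translating NULL placeholders while reading.
#
#   reader = NullFilter(make_reader(stream, dialect), nulls=empty_nulls)
#   for row in reader: pass  # '' and r'\N' values now come back as None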

class StripFilter(Filter):
    '''Strips whitespace from each value'''
    def __init__(self, reader):
        def filter_(row): return [v.strip() for v in row]
        Filter.__init__(self, filter_, reader)

class ColCtFilter(Filter):
    '''Gives all rows the same # of columns'''
    def __init__(self, reader, cols_ct):
        def filter_(row): return util.list_as_length(row, cols_ct)
        Filter.__init__(self, filter_, reader)
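
# Usage sketch (illustrative): filters wrap readers, so they can be chained.
#
#   reader = ColCtFilter(StripFilter(reader), len(header))
#   # values are whitespace-stripped, then each row is resized to len(header)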

##### Translators

class StreamFilter(Filter):
    '''Wraps a reader so it can be used as an input stream by filters that do
    not require lines to be strings. Reports EOF as '' instead of StopIteration.'''
    def __init__(self, reader):
        Filter.__init__(self, None, reader)
    
    def readline(self):
        try: return self.reader.next()
        except StopIteration: return '' # EOF

class ColInsertFilter(Filter):
    '''Adds column(s) to each row
    @param mk_value(row, row_num) | literal_value
    '''
    def __init__(self, reader, mk_value, index=0, n=1):
        if not callable(mk_value):
            value = mk_value
            def mk_value(row, row_num): return value
        
        def filter_(row):
            row = list(row) # make sure it's mutable; don't modify input!
            for i in xrange(n):
                row.insert(index+i, mk_value(row, self.reader.line_num))
            return row
        Filter.__init__(self, filter_,
            streams.LineCountInputStream(StreamFilter(reader)))

class RowNumFilter(ColInsertFilter):
    '''Adds a row # column at the beginning of each row'''
    def __init__(self, reader):
        def mk_value(row, row_num): return row_num
        ColInsertFilter.__init__(self, reader, mk_value, 0)
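
# Usage sketch (illustrative): RowNumFilter prepends the wrapped stream's
# line_num (from streams.LineCountInputStream) to each row.
#
#   reader = RowNumFilter(make_reader(stream, dialect))
#   # e.g. ['a', 'b'] -> [1, 'a', 'b'], assuming line counts start at 1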

class InputRewriter(StreamFilter):
    '''Wraps a reader, writing each row back to CSV'''
    def __init__(self, reader, dialect=csv.excel):
        StreamFilter.__init__(self, reader)
        
        self.dialect = dialect
    
    def readline(self):
        try:
            row = self.reader.readline()
            if row == '': return row # EOF
            
            line_stream = StringIO.StringIO()
            csv.writer(line_stream, self.dialect).writerow(row)
            return line_stream.getvalue()
        except Exception, e:
            exc.print_ex(e)
            raise
    
    def read(self, n): return self.readline() # forward all reads to readline()
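
# Usage sketch (illustrative): InputRewriter re-serializes rows as CSV text so
# the result can be passed to code expecting a file-like object. As written,
# readline() delegates to self.reader.readline(), so the wrapped object must
# itself expose readline() returning a row (e.g. a StreamFilter over a reader).
#
#   rows = StreamFilter(make_reader(stream, dialect))
#   csv_stream = InputRewriter(rows, dialect)
#   line = csv_stream.readline()  # one CSV-encoded line per input row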