Project

General

Profile

1 1630 aaronmk
# I/O
2
3 1673 aaronmk
import timeouts
4 1635 aaronmk
5 1928 aaronmk
##### Exceptions
6
7
class InputStreamsOnlyException(Exception):
8
    def __init__(self): Exception.__init__(self, 'Only input streams allowed')
9
10 1759 aaronmk
##### Wrapped streams
11
12 1635 aaronmk
class WrapStream:
13
    '''Forwards close() to the underlying stream'''
14
    def __init__(self, stream): self.stream = stream
15
16
    def close(self): self.stream.close()
17
18 1759 aaronmk
##### Line iteration
19
20 1635 aaronmk
class StreamIter(WrapStream):
21 1630 aaronmk
    '''Iterates over the lines in a stream.
22
    Unlike stream.__iter__(), doesn't wait for EOF to start returning lines.
23
    Offers curr() method.
24
    '''
25
    def __init__(self, stream):
26 1635 aaronmk
        WrapStream.__init__(self, stream)
27 1630 aaronmk
        self.line = None
28
29
    def __iter__(self): return self
30
31
    def curr(self):
32
        if self.line == None: self.line = self.stream.readline()
33
        if self.line == '': raise StopIteration
34
        return self.line
35
36
    def next(self):
37
        line = self.curr()
38
        self.line = None
39
        return line
40 1635 aaronmk
41 1686 aaronmk
def copy(from_, to):
42
    for line in StreamIter(from_): to.write(line)
43
44 1759 aaronmk
def consume(in_):
45
    for line in StreamIter(in_): pass
46
47
##### Timeouts
48
49 1635 aaronmk
class TimeoutInputStream(WrapStream):
50 1673 aaronmk
    '''@param timeout sec'''
51
    def __init__(self, stream, timeout):
52 1635 aaronmk
        WrapStream.__init__(self, stream)
53 1673 aaronmk
        self.timeout = timeout
54 1630 aaronmk
55 1635 aaronmk
    def readline(self):
56 1673 aaronmk
        return timeouts.run(lambda: self.stream.readline(), self.timeout)
57 1928 aaronmk
58
    def write(self, str_): raise InputStreamsOnlyException()
59 1630 aaronmk
60 1710 aaronmk
##### Filtered/traced streams
61 1686 aaronmk
62 1710 aaronmk
class FilterStream(WrapStream):
63
    '''Wraps a stream, filtering each string read or written'''
64
    def __init__(self, filter_, stream):
65 1682 aaronmk
        WrapStream.__init__(self, stream)
66 1710 aaronmk
        self.filter = filter_
67 1630 aaronmk
68 1710 aaronmk
    def readline(self): return self.filter(self.stream.readline())
69 1682 aaronmk
70 1762 aaronmk
    def read(self, n): return self.readline() # forward all reads to readline()
71
72 1710 aaronmk
    def write(self, str_): return self.stream.write(self.filter(str_))
73 1630 aaronmk
74 1710 aaronmk
class TracedStream(FilterStream):
75
    '''Wraps a stream, running a trace function on each string read or written
76
    '''
77
    def __init__(self, trace, stream):
78
        def filter_(str_):
79
            trace(str_)
80
            return str_
81
        FilterStream.__init__(self, filter_, stream)
82
83 1684 aaronmk
class LineCountStream(TracedStream):
84
    '''Wraps a unidirectional stream, making the current line number available.
85 1630 aaronmk
    Lines start counting from 1.'''
86
    def __init__(self, stream):
87
        self.line_num = 1
88
        def trace(str_): self.line_num += str_.count('\n')
89 1682 aaronmk
        TracedStream.__init__(self, trace, stream)
90
91 1928 aaronmk
class LineCountInputStream(TracedStream):
92
    '''Wraps an input stream, making the current line number available.
93
    Lines start counting from 1.
94
    This is faster than LineCountStream for input streams, because all reading
95
    is done through readline() so newlines don't need to be explicitly counted.
96
    '''
97
    def __init__(self, stream):
98
        self.line_num = 1
99
        def trace(str_): self.line_num += 1
100
        TracedStream.__init__(self, trace, stream)
101
102
    def write(self, str_): raise InputStreamsOnlyException()
103
104 1939 aaronmk
class ProgressInputStream(TracedStream):
105
    '''Wraps an input stream, reporting the # lines read every n lines and after
106
    the last line is read.
107
    @param log The output stream for progress messages
108
    '''
109
    def __init__(self, stream, log, msg='Read %d line(s)', n=100):
110
        self.eof = False
111
112
        def trace(str_):
113
            msg_ = msg # may be modified, so can't have same name as outer var
114
            if str_ == None: # closed
115
                if self.eof: return # not closed prematurely
116
                str_ = '' # closed prematurely, so signal EOF
117
                msg_ += ' (not all input read)'
118
119
            self.eof = eof = str_ == ''
120
            if eof: line_ending = '\n'
121
            else: line_ending = '\r'
122
123
            line_ct = self.stream.line_num - 1 # make it 0-based
124
            if eof or line_ct % n == 0: log.write((msg_ % line_ct)+line_ending)
125
        self.trace = trace
126
127
        TracedStream.__init__(self, trace, LineCountInputStream(stream))
128
129
    def close(self):
130
        self.trace(None) # signal closed
131
        TracedStream.close(self)
132
133 1682 aaronmk
class CaptureStream(TracedStream):
134
    '''Wraps a stream, capturing matching text.
135 1706 aaronmk
    Matches can be retrieved from self.matches.'''
136 1682 aaronmk
    def __init__(self, stream, start_str, end_str):
137
        self.recording = False
138 1706 aaronmk
        self.matches = []
139 1682 aaronmk
140
        def trace(str_):
141 1707 aaronmk
            start_idx = 0
142
            if not self.recording:
143
                start_idx = str_.find(start_str)
144
                if start_idx >= 0:
145
                    self.recording = True
146
                    self.matches.append('')
147
            if self.recording:
148
                end_idx = str_.find(end_str)
149
                if end_idx >= 0:
150
                    self.recording = False
151
                    end_idx += len(end_str)
152
                else: end_idx = len(str_)
153
154
                self.matches[-1] += str_[start_idx:end_idx]
155 1682 aaronmk
156
        TracedStream.__init__(self, trace, stream)