Project

General

Profile

1 1630 aaronmk
# I/O
2
3 4917 aaronmk
import operator
4 1673 aaronmk
import timeouts
5 1635 aaronmk
6 1928 aaronmk
##### Exceptions
7
8
class InputStreamsOnlyException(Exception):
    '''Signals that an output operation was attempted on an input-only stream.
    The message is always 'Only input streams allowed'.
    '''
    def __init__(self):
        Exception.__init__(self, 'Only input streams allowed')
10
11 1759 aaronmk
##### Wrapped streams
12
13 1635 aaronmk
class WrapStream:
    '''Wraps a stream and forwards close() to it.
    @param stream The underlying stream; kept as self.stream
    '''
    def __init__(self, stream):
        self.stream = stream

    def close(self):
        '''Closes the underlying stream'''
        self.stream.close()
18
19 1759 aaronmk
##### Line iteration
20
21 1635 aaronmk
class StreamIter(WrapStream):
    '''Iterates over the lines in a stream.
    Unlike stream.__iter__(), doesn't wait for EOF to start returning lines.
    Offers curr() method.
    '''
    def __init__(self, stream):
        WrapStream.__init__(self, stream)
        self.line = None # lookahead buffer; None means no line is buffered

    def __iter__(self):
        return self

    def curr(self):
        '''Returns the current line without consuming it, reading it first if
        no line is buffered.
        @raise StopIteration if readline() returned '' (EOF)
        '''
        if self.line is None:
            self.line = self.readline()
        if self.line == '':
            raise StopIteration
        return self.line

    def next(self):
        '''Returns the current line and clears the buffer, so that the
        following curr()/next() reads a new line.
        @raise StopIteration at EOF (the '' marker stays buffered)
        '''
        line = self.curr()
        self.line = None
        return line

    def __next__(self):
        '''Python 3 iterator protocol; delegates to next() so that subclass
        overrides of next() are honored.
        '''
        return self.next()

    # Define as a separate method so it can be overridden
    def readline(self):
        return self.stream.readline()
44 1635 aaronmk
45 1686 aaronmk
def copy(from_, to):
    '''Writes every line of from_ to to, one line at a time as it is read.
    @param from_ Stream to read lines from (via StreamIter)
    @param to Object with a write() method that receives each line
    '''
    for line in StreamIter(from_):
        to.write(line)
47
48 1759 aaronmk
def consume(in_):
    '''Reads in_ line by line until EOF, discarding every line.
    @param in_ Stream to drain (via StreamIter)
    '''
    for _line in StreamIter(in_):
        pass
50
51 4917 aaronmk
def read_all(stream):
    '''Reads stream line by line until EOF and returns the concatenated lines.
    Returns '' if the stream is already at EOF.
    '''
    # join() is linear in the total length, unlike repeated concatenation, and
    # does not depend on reduce() being a builtin
    return ''.join(StreamIter(stream))
52
53 1759 aaronmk
##### Timeouts
54
55 1635 aaronmk
class TimeoutInputStream(WrapStream):
    '''Wraps an input stream, running each readline() through timeouts.run().
    @param timeout sec
    '''
    def __init__(self, stream, timeout):
        WrapStream.__init__(self, stream)
        self.timeout = timeout

    def readline(self):
        '''Returns the result of timeouts.run() applied to the underlying
        stream's readline() with self.timeout.
        '''
        return timeouts.run(self.stream.readline, self.timeout)

    def write(self, str_):
        '''Always raises InputStreamsOnlyException (input streams only)'''
        raise InputStreamsOnlyException()
65 1630 aaronmk
66 1710 aaronmk
##### Filtered/traced streams
67 1686 aaronmk
68 1962 aaronmk
class FilterStream(StreamIter):
    '''Wraps a stream, filtering each string read or written.
    @param filter_ Function applied to each string; its return value is what
        is returned to the reader or written to the underlying stream
    '''
    def __init__(self, filter_, stream):
        StreamIter.__init__(self, stream)
        self.filter = filter_

    def readline(self):
        '''Returns the next underlying line after passing it through the
        filter
        '''
        raw_line = StreamIter.readline(self)
        return self.filter(raw_line)

    def read(self, n):
        '''Forwards all reads to readline(); n is ignored'''
        return self.readline()

    def write(self, str_):
        '''Writes the filtered str_ to the underlying stream and returns the
        underlying write()'s result
        '''
        filtered = self.filter(str_)
        return self.stream.write(filtered)
79 1630 aaronmk
80 1710 aaronmk
class TracedStream(FilterStream):
    '''Wraps a stream, running a trace function on each string read or written.
    The string itself is passed through unchanged; trace's return value is
    ignored.
    '''
    def __init__(self, trace, stream):
        def passthrough(str_):
            trace(str_)
            return str_

        FilterStream.__init__(self, passthrough, stream)
88
89 1684 aaronmk
class LineCountStream(TracedStream):
    '''Wraps a unidirectional stream, making the current line number available.
    Lines start counting from 1, incrementing before each line is returned.
    The count is kept in self.line_num and grows by the number of '\\n'
    characters in each string read or written.
    '''
    def __init__(self, stream):
        self.line_num = 0

        def count_newlines(str_):
            self.line_num += str_.count('\n')

        TracedStream.__init__(self, count_newlines, stream)
96
97 1928 aaronmk
class LineCountInputStream(TracedStream):
    '''Wraps an input stream, making the current line number available.
    Lines start counting from 1, incrementing before each line is returned.
    This is faster than LineCountStream for input streams, because all reading
    is done through readline() so newlines don't need to be explicitly counted.
    The count is kept in self.line_num.
    '''
    def __init__(self, stream):
        self.line_num = 0

        def count_line(str_):
            if str_ == '': return # EOF is not a line
            self.line_num += 1

        TracedStream.__init__(self, count_line, stream)

    def write(self, str_):
        '''Always raises InputStreamsOnlyException (input streams only)'''
        raise InputStreamsOnlyException()
110
111 1939 aaronmk
class ProgressInputStream(TracedStream):
    '''Wraps an input stream, reporting the # lines read every n lines and after
    the last line is read.
    @param log The output stream for progress messages
    @param msg Format string taking the line count via %d
    @param n Report interval, in lines

    Intermediate reports end with '\\r'; the final (EOF) report ends with '\\n'.
    If the stream is closed before EOF was seen, a final report with
    ' (not all input read)' appended is written.
    '''
    def __init__(self, stream, log, msg='Read %d line(s)', n=100):
        self.eof = False

        def trace(str_):
            msg_ = msg # may be modified, so can't have same name as outer var
            if str_ is None: # closed
                if self.eof: return # not closed prematurely
                str_ = '' # closed prematurely, so signal EOF
                msg_ += ' (not all input read)'

            eof = str_ == ''
            self.eof = eof
            if eof: line_ending = '\n'
            else: line_ending = '\r'

            # self.stream is the LineCountInputStream created below
            line_ct = self.stream.line_num
            if eof or (line_ct > 0 and line_ct % n == 0):
                log.write((msg_ % line_ct)+line_ending)
        self.trace = trace # kept so close() can signal closure

        TracedStream.__init__(self, trace, LineCountInputStream(stream))

    def close(self):
        '''Reports closure to the trace function, then closes the wrapped
        stream
        '''
        self.trace(None) # signal closed
        TracedStream.close(self)
140
141 1682 aaronmk
class CaptureStream(TracedStream):
    '''Wraps a stream, capturing matching text.
    Matches can be retrieved from self.matches.

    A match begins at start_str and runs through the next end_str (both
    included). It may span several strings read or written. At most one start
    and one end are processed per string.
    @param start_str Text that starts a capture
    @param end_str Text that ends a capture
    '''
    def __init__(self, stream, start_str, end_str):
        self.recording = False
        self.matches = []

        def trace(str_):
            start_idx = 0 # when already recording, capture from the beginning
            if not self.recording:
                start_idx = str_.find(start_str)
                if start_idx >= 0:
                    self.recording = True
                    self.matches.append('')
            if self.recording:
                # Search from start_idx so that an end_str located before the
                # start marker in the same string cannot end the capture
                end_idx = str_.find(end_str, start_idx)
                if end_idx >= 0:
                    self.recording = False
                    end_idx += len(end_str) # include the end marker
                else: end_idx = len(str_) # no end yet: capture to the end

                self.matches[-1] += str_[start_idx:end_idx]

        TracedStream.__init__(self, trace, stream)