Project

General

Profile

1 1630 aaronmk
# I/O
2
3 4917 aaronmk
import operator
4 1673 aaronmk
import timeouts
5 1635 aaronmk
6 1928 aaronmk
##### Exceptions
7
8
class InputStreamsOnlyException(Exception):
9
    def __init__(self): Exception.__init__(self, 'Only input streams allowed')
10
11 1759 aaronmk
##### Wrapped streams
12
13 1635 aaronmk
class WrapStream:
14
    '''Forwards close() to the underlying stream'''
15
    def __init__(self, stream): self.stream = stream
16
17
    def close(self): self.stream.close()
18
19 1759 aaronmk
##### Line iteration
20
21 1635 aaronmk
class StreamIter(WrapStream):
22 1630 aaronmk
    '''Iterates over the lines in a stream.
23
    Unlike stream.__iter__(), doesn't wait for EOF to start returning lines.
24
    Offers curr() method.
25
    '''
26
    def __init__(self, stream):
27 1635 aaronmk
        WrapStream.__init__(self, stream)
28 1630 aaronmk
        self.line = None
29
30
    def __iter__(self): return self
31
32
    def curr(self):
33 1964 aaronmk
        if self.line == None: self.line = self.readline()
34 1630 aaronmk
        if self.line == '': raise StopIteration
35
        return self.line
36
37
    def next(self):
38
        line = self.curr()
39
        self.line = None
40
        return line
41 1964 aaronmk
42
    # Define as a separate method so it can be overridden
43
    def readline(self): return self.stream.readline()
44 1635 aaronmk
45 1686 aaronmk
def copy(from_, to):
46
    for line in StreamIter(from_): to.write(line)
47
48 1759 aaronmk
def consume(in_):
49
    for line in StreamIter(in_): pass
50
51 4917 aaronmk
def read_all(stream): return reduce(operator.add, StreamIter(stream), '')
52
53 1759 aaronmk
##### Timeouts
54
55 1635 aaronmk
class TimeoutInputStream(WrapStream):
56 1673 aaronmk
    '''@param timeout sec'''
57
    def __init__(self, stream, timeout):
58 1635 aaronmk
        WrapStream.__init__(self, stream)
59 1673 aaronmk
        self.timeout = timeout
60 1630 aaronmk
61 1635 aaronmk
    def readline(self):
62 1673 aaronmk
        return timeouts.run(lambda: self.stream.readline(), self.timeout)
63 1928 aaronmk
64
    def write(self, str_): raise InputStreamsOnlyException()
65 1630 aaronmk
66 1710 aaronmk
##### Filtered/traced streams
67 1686 aaronmk
68 1962 aaronmk
class FilterStream(StreamIter):
69 1710 aaronmk
    '''Wraps a stream, filtering each string read or written'''
70
    def __init__(self, filter_, stream):
71 1962 aaronmk
        StreamIter.__init__(self, stream)
72 1710 aaronmk
        self.filter = filter_
73 1630 aaronmk
74 1964 aaronmk
    def readline(self): return self.filter(StreamIter.readline(self))
75 1682 aaronmk
76 1762 aaronmk
    def read(self, n): return self.readline() # forward all reads to readline()
77
78 1710 aaronmk
    def write(self, str_): return self.stream.write(self.filter(str_))
79 1630 aaronmk
80 1710 aaronmk
class TracedStream(FilterStream):
81
    '''Wraps a stream, running a trace function on each string read or written
82
    '''
83
    def __init__(self, trace, stream):
84
        def filter_(str_):
85
            trace(str_)
86
            return str_
87
        FilterStream.__init__(self, filter_, stream)
88
89 1684 aaronmk
class LineCountStream(TracedStream):
90
    '''Wraps a unidirectional stream, making the current line number available.
91 1630 aaronmk
    Lines start counting from 1.'''
92
    def __init__(self, stream):
93
        self.line_num = 1
94
        def trace(str_): self.line_num += str_.count('\n')
95 1682 aaronmk
        TracedStream.__init__(self, trace, stream)
96
97 1928 aaronmk
class LineCountInputStream(TracedStream):
98
    '''Wraps an input stream, making the current line number available.
99
    Lines start counting from 1.
100
    This is faster than LineCountStream for input streams, because all reading
101
    is done through readline() so newlines don't need to be explicitly counted.
102
    '''
103
    def __init__(self, stream):
104
        self.line_num = 1
105 2672 aaronmk
        def trace(str_):
106
            if str_ != '': self.line_num += 1 # EOF is not a line
107 1928 aaronmk
        TracedStream.__init__(self, trace, stream)
108
109
    def write(self, str_): raise InputStreamsOnlyException()
110
111 1939 aaronmk
class ProgressInputStream(TracedStream):
112
    '''Wraps an input stream, reporting the # lines read every n lines and after
113
    the last line is read.
114
    @param log The output stream for progress messages
115
    '''
116
    def __init__(self, stream, log, msg='Read %d line(s)', n=100):
117
        self.eof = False
118
119
        def trace(str_):
120
            msg_ = msg # may be modified, so can't have same name as outer var
121
            if str_ == None: # closed
122
                if self.eof: return # not closed prematurely
123
                str_ = '' # closed prematurely, so signal EOF
124
                msg_ += ' (not all input read)'
125
126
            self.eof = eof = str_ == ''
127
            if eof: line_ending = '\n'
128
            else: line_ending = '\r'
129
130
            line_ct = self.stream.line_num - 1 # make it 0-based
131
            if eof or line_ct % n == 0: log.write((msg_ % line_ct)+line_ending)
132
        self.trace = trace
133
134
        TracedStream.__init__(self, trace, LineCountInputStream(stream))
135
136
    def close(self):
137
        self.trace(None) # signal closed
138
        TracedStream.close(self)
139
140 1682 aaronmk
class CaptureStream(TracedStream):
141
    '''Wraps a stream, capturing matching text.
142 1706 aaronmk
    Matches can be retrieved from self.matches.'''
143 1682 aaronmk
    def __init__(self, stream, start_str, end_str):
144
        self.recording = False
145 1706 aaronmk
        self.matches = []
146 1682 aaronmk
147
        def trace(str_):
148 1707 aaronmk
            start_idx = 0
149
            if not self.recording:
150
                start_idx = str_.find(start_str)
151
                if start_idx >= 0:
152
                    self.recording = True
153
                    self.matches.append('')
154
            if self.recording:
155
                end_idx = str_.find(end_str)
156
                if end_idx >= 0:
157
                    self.recording = False
158
                    end_idx += len(end_str)
159
                else: end_idx = len(str_)
160
161
                self.matches[-1] += str_[start_idx:end_idx]
162 1682 aaronmk
163
        TracedStream.__init__(self, trace, stream)