Project

General

Profile

1
# I/O
2

    
3
import operator
4
import timeouts
5

    
6
##### Exceptions
7

    
8
class InputStreamsOnlyException(Exception):
9
    def __init__(self): Exception.__init__(self, 'Only input streams allowed')
10

    
11
##### Wrapped streams
12

    
13
class WrapStream:
14
    '''Forwards close() to the underlying stream'''
15
    def __init__(self, stream): self.stream = stream
16
    
17
    def close(self): self.stream.close()
18

    
19
##### Line iteration
20

    
21
class StreamIter(WrapStream):
22
    '''Iterates over the lines in a stream.
23
    Unlike stream.__iter__(), doesn't wait for EOF to start returning lines.
24
    Offers curr() method.
25
    '''
26
    def __init__(self, stream):
27
        WrapStream.__init__(self, stream)
28
        self.line = None
29
    
30
    def __iter__(self): return self
31
    
32
    def curr(self):
33
        if self.line == None: self.line = self.readline()
34
        if self.line == '': raise StopIteration
35
        return self.line
36
    
37
    def next(self):
38
        line = self.curr()
39
        self.line = None
40
        return line
41
    
42
    # Define as a separate method so it can be overridden
43
    def readline(self): return self.stream.readline()
44

    
45
def copy(from_, to):
46
    for line in StreamIter(from_): to.write(line)
47

    
48
def consume(in_):
49
    for line in StreamIter(in_): pass
50

    
51
def read_all(stream): return reduce(operator.add, StreamIter(stream), '')
52

    
53
def file_get_contents(file_path): return read_all(open(file_path))
54

    
55
##### Timeouts
56

    
57
class TimeoutInputStream(WrapStream):
58
    '''@param timeout sec'''
59
    def __init__(self, stream, timeout):
60
        WrapStream.__init__(self, stream)
61
        self.timeout = timeout
62
    
63
    def readline(self):
64
        return timeouts.run(lambda: self.stream.readline(), self.timeout)
65
    
66
    def write(self, str_): raise InputStreamsOnlyException()
67

    
68
##### Filtered/traced streams
69

    
70
class FilterStream(StreamIter):
71
    '''Wraps a stream, filtering each string read or written'''
72
    def __init__(self, filter_, stream):
73
        StreamIter.__init__(self, stream)
74
        self.filter = filter_
75
    
76
    def readline(self): return self.filter(StreamIter.readline(self))
77
    
78
    def read(self, n): return self.readline() # forward all reads to readline()
79
    
80
    def write(self, str_): return self.stream.write(self.filter(str_))
81

    
82
class TracedStream(FilterStream):
83
    '''Wraps a stream, running a trace function on each string read or written
84
    '''
85
    def __init__(self, trace, stream):
86
        def filter_(str_):
87
            trace(str_)
88
            return str_
89
        FilterStream.__init__(self, filter_, stream)
90

    
91
class LineCountStream(TracedStream):
92
    '''Wraps a unidirectional stream, making the current line number available.
93
    Lines start counting from 1, incrementing before each line is returned.'''
94
    def __init__(self, stream):
95
        self.line_num = 0
96
        def trace(str_): self.line_num += str_.count('\n')
97
        TracedStream.__init__(self, trace, stream)
98

    
99
class LineCountInputStream(TracedStream):
100
    '''Wraps an input stream, making the current line number available.
101
    Lines start counting from 1, incrementing before each line is returned.
102
    This is faster than LineCountStream for input streams, because all reading
103
    is done through readline() so newlines don't need to be explicitly counted.
104
    '''
105
    def __init__(self, stream):
106
        self.line_num = 0
107
        def trace(str_):
108
            if str_ != '': self.line_num += 1 # EOF is not a line
109
        TracedStream.__init__(self, trace, stream)
110
    
111
    def write(self, str_): raise InputStreamsOnlyException()
112

    
113
class ProgressInputStream(TracedStream):
114
    '''Wraps an input stream, reporting the # lines read every n lines and after
115
    the last line is read.
116
    @param log The output stream for progress messages
117
    '''
118
    def __init__(self, stream, log, msg='Read %d line(s)', n=100):
119
        self.eof = False
120
        
121
        def trace(str_):
122
            msg_ = msg # may be modified, so can't have same name as outer var
123
            if str_ == None: # closed
124
                if self.eof: return # not closed prematurely
125
                str_ = '' # closed prematurely, so signal EOF
126
                msg_ += ' (not all input read)'
127
            
128
            self.eof = eof = str_ == ''
129
            if eof: line_ending = '\n'
130
            else: line_ending = '\r'
131
            
132
            line_ct = self.stream.line_num
133
            if eof or (line_ct > 0 and line_ct % n == 0):
134
                log.write((msg_ % line_ct)+line_ending)
135
        self.trace = trace
136
        
137
        TracedStream.__init__(self, trace, LineCountInputStream(stream))
138
    
139
    def close(self):
140
        self.trace(None) # signal closed
141
        TracedStream.close(self)
142

    
143
class CaptureStream(TracedStream):
144
    '''Wraps a stream, capturing matching text.
145
    Matches can be retrieved from self.matches.'''
146
    def __init__(self, stream, start_str, end_str):
147
        self.recording = False
148
        self.matches = []
149
        
150
        def trace(str_):
151
            start_idx = 0
152
            if not self.recording:
153
                start_idx = str_.find(start_str)
154
                if start_idx >= 0:
155
                    self.recording = True
156
                    self.matches.append('')
157
            if self.recording:
158
                end_idx = str_.find(end_str)
159
                if end_idx >= 0:
160
                    self.recording = False
161
                    end_idx += len(end_str)
162
                else: end_idx = len(str_)
163
                
164
                self.matches[-1] += str_[start_idx:end_idx]
165
        
166
        TracedStream.__init__(self, trace, stream)
(38-38/49)