# Copyright (C) 2006-2007 Robey Pointer
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License.  Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.

"""
Attempt to generalize the "feeder" part of a Channel: an object which can be
read from and closed, but is reading from a buffer fed by another thread.  The
read operations are blocking and can have a timeout set.
"""

import array
import threading
import time


# array.array gained frombytes()/tobytes() in Python 3.2 and lost the old
# fromstring()/tostring() spellings in Python 3.9.  Detect once at import time
# which pair is available so the pipe works on either interpreter line.
_HAS_BYTES_API = hasattr(array.array, 'frombytes')


def _append_bytes(buf, data):
    """Append the byte string C{data} to the byte array C{buf}."""
    if _HAS_BYTES_API:
        buf.frombytes(data)
    else:
        buf.fromstring(data)


def _array_to_bytes(buf):
    """Return the contents of the byte array C{buf} as a byte string."""
    if _HAS_BYTES_API:
        return buf.tobytes()
    return buf.tostring()


class PipeTimeout (IOError):
    """
    Indicates that a timeout was reached on a read from a L{BufferedPipe}.
    """
    pass


class BufferedPipe (object):
    """
    A buffer that obeys normal read (with timeout) & close semantics for a
    file or socket, but is fed data from another thread.  This is used by
    L{Channel}.
    """

    def __init__(self):
        # One lock guards the buffer, the closed flag and the event; the
        # condition variable shares it so readers can sleep until fed/closed.
        self._lock = threading.Lock()
        self._cv = threading.Condition(self._lock)
        self._event = None
        self._buffer = array.array('B')
        self._closed = False

    def set_event(self, event):
        """
        Set an event on this buffer.  When data is ready to be read (or the
        buffer has been closed), the event will be set.  When no data is
        ready, the event will be cleared.

        @param event: the event to set/clear
        @type event: Event
        """
        with self._lock:
            self._event = event
            # A closed pipe must start in the "set" state: nothing will ever
            # feed it again, so a cleared event would never be signalled and
            # anything waiting on it would block forever.
            if self._closed or len(self._buffer) > 0:
                event.set()
            else:
                event.clear()

    def feed(self, data):
        """
        Feed new data into this pipe.  This method is assumed to be called
        from a separate thread, so synchronization is done.

        @param data: the data to add
        @type data: str (a byte string; C{bytes} on Python 3)
        """
        with self._lock:
            if self._event is not None:
                self._event.set()
            _append_bytes(self._buffer, data)
            # Wake every blocked reader; each re-checks the buffer itself.
            self._cv.notify_all()

    def read_ready(self):
        """
        Returns true if data is buffered and ready to be read from this
        feeder.  A C{False} result does not mean that the feeder has closed;
        it means you may need to wait before more data arrives.

        @return: C{True} if a L{read} call would immediately return at least
            one byte; C{False} otherwise.
        @rtype: bool
        """
        with self._lock:
            return len(self._buffer) > 0

    def read(self, nbytes, timeout=None):
        """
        Read data from the pipe.  The return value is a string representing
        the data received.  The maximum amount of data to be received at once
        is specified by C{nbytes}.  If a string of length zero is returned,
        the pipe has been closed.

        The optional C{timeout} argument can be a nonnegative float expressing
        seconds, or C{None} for no timeout.  If a float is given, a
        C{PipeTimeout} will be raised if the timeout period value has
        elapsed before any data arrives.

        @param nbytes: maximum number of bytes to read
        @type nbytes: int
        @param timeout: maximum seconds to wait (or C{None}, the default, to
            wait forever)
        @type timeout: float
        @return: data
        @rtype: str (a byte string; C{bytes} on Python 3)

        @raise PipeTimeout: if a timeout was specified and no data was ready
            before that timeout
        """
        with self._lock:
            if len(self._buffer) == 0:
                if self._closed:
                    return b''
                # A zero timeout means "poll": never block.
                if timeout == 0.0:
                    raise PipeTimeout()
                # loop here in case we get woken up but a different thread has
                # grabbed everything in the buffer.
                while len(self._buffer) == 0 and not self._closed:
                    then = time.time()
                    self._cv.wait(timeout)
                    if timeout is not None:
                        timeout -= time.time() - then
                        # Only give up if there is still nothing to hand
                        # back: data (or a close) that arrived right at the
                        # deadline must be delivered, not turned into a
                        # timeout.
                        if (timeout <= 0.0 and len(self._buffer) == 0
                                and not self._closed):
                            raise PipeTimeout()

            # something's in the buffer (or the pipe closed) and we have the
            # lock!
            if len(self._buffer) <= nbytes:
                out = _array_to_bytes(self._buffer)
                del self._buffer[:]
                # Drained: clear the event, unless the pipe is closed, in
                # which case it stays set so waiters see the EOF.
                if (self._event is not None) and not self._closed:
                    self._event.clear()
            else:
                out = _array_to_bytes(self._buffer[:nbytes])
                del self._buffer[:nbytes]
        return out

    def empty(self):
        """
        Clear out the buffer and return all data that was in it.

        @return: any data that was in the buffer prior to clearing it out
        @rtype: str (a byte string; C{bytes} on Python 3)
        """
        with self._lock:
            out = _array_to_bytes(self._buffer)
            del self._buffer[:]
            if (self._event is not None) and not self._closed:
                self._event.clear()
            return out

    def close(self):
        """
        Close this pipe object.  Future calls to L{read} after the buffer
        has been emptied will return immediately with an empty string.
        """
        with self._lock:
            self._closed = True
            # Release any reader blocked in read() so it can observe the EOF.
            self._cv.notify_all()
            if self._event is not None:
                self._event.set()

    def __len__(self):
        """
        Return the number of bytes buffered.

        @return: number of bytes buffered
        @rtype: int
        """
        with self._lock:
            return len(self._buffer)