Source code for alex.utils.fs

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# This code is PEP8-compliant. See http://www.python.org/dev/peps/pep-0008.
"""
Filesystem utility functions.

"""
from __future__ import unicode_literals

import fnmatch
import os
import os.path
import re
from multiprocessing import Process
from select import select
from time import sleep

DEBUG = False
# DEBUG = True


def normalise_path(path):
    """
    Normalises a filesystem path using tilde expansion, absolutising and
    normalising the path, and resolving symlinks.

    """
    return os.path.realpath(os.path.abspath(os.path.expanduser(path)))


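# Example (a minimal sketch; the path below and the expansion of `~' are
# hypothetical and depend on the current user's home directory):
#
#     normalise_path('~/some/./dir/../file.txt')
#     # -> '/home/<user>/some/file.txt', with any symlinks resolved

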
def find(dir_, glob_, mindepth=2, maxdepth=6, ignore_globs=list(),
         ignore_paths=None, follow_symlinks=True, prune=False, rx=None,
         notrx=None):
    """
    A simplified version of the GNU `find' utility.  Lists files with basename
    matching `glob_' found in `dir_' in depth between `mindepth' and
    `maxdepth'.

    The `ignore_globs' argument specifies a glob for basenames of files to be
    ignored.  The `ignore_paths' argument specifies a collection of real
    absolute pathnames that are pruned from the search.  For efficiency
    reasons, it should be a set.

    In the current implementation, the traversal resolves symlinks before the
    file name is checked.  However, taking symlinks into account can be
    forbidden altogether by specifying `follow_symlinks=False'.  Cycles during
    the traversal are avoided.

    - prune: whether to prune the subtree below a matching directory
    - rx: regexp to use as an additional matching criterion apart from
          `glob_'; the `re.match' function is used (as opposed to `re.search')
    - notrx: like `rx' but this specifies the regexp that must NOT match

    The returned set of files consists of real absolute pathnames of those
    files.

    """
    # Check the arguments.
    if (isinstance(mindepth, int) and isinstance(maxdepth, int)
            and mindepth > maxdepth):
        return set()

    # Normalise the paths specified.
    dir_ = normalise_path(dir_)
    if ignore_paths:
        ignore_paths = map(normalise_path, ignore_paths)

    # Special case: mindepth == 0.
    ret = set()  # Return an empty set by default.
    if mindepth is None or mindepth == 0:
        dirname = os.path.basename(dir_)
        # Apply the path filter.
        if ignore_paths and dir_ in ignore_paths:
            return set()
        # Apply the glob filter.
        for ignore_glob in ignore_globs:
            if fnmatch.fnmatch(dirname, ignore_glob):
                return set()
        # Match the glob.
        if (fnmatch.fnmatch(dirname, glob_)
                and (rx is None or re.match(rx, dirname))
                and (notrx is None or not re.match(notrx, dirname))):
            ret = set((dir_, ))

    # Special case: also maxdepth == 0.  Or we prune the whole subtree.
    if maxdepth == 0 or (ret and prune):
        return ret

    # If ignored paths are specified and non-empty,
    if ignore_paths:
        # build a function filtering out the specified ignored paths.
        ignore_path_filter = lambda path_data: path_data[1] not in ignore_paths
    else:
        # Build an all-true filter.
        ignore_path_filter = lambda path_data: True

    # Call the implementation with the filter function.
    ret.update(_find_ignorefunc(dir_, glob_, mindepth, maxdepth, ignore_globs,
                                ignore_path_filter, follow_symlinks, prune,
                                rx, notrx, set())[0])
    return ret


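# Example usage (a minimal sketch; the directory and the globs are
# hypothetical):
#
#     wav_files = find('~/data/corpus', '*.wav', mindepth=1, maxdepth=4,
#                      ignore_globs=['.git', '*~'])
#     # `wav_files' is a set of real absolute paths to all matching files.

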
def _find_ignorefunc(dir_, glob_, mindepth, maxdepth, ignore_globs=list(),
                     ignore_path_filter=lambda _: True, follow_symlinks=True,
                     prune=False, rx=None, notrx=None, visited=set()):
    """
    Implements the same functionality as the `find' function in this module.
    The difference is that the ignored paths are specified by a function.

    `ignore_path_filter' is a function that takes a tuple
    (basename, normalised name) and returns False iff the file should be
    ignored.  `visited' is the set of normalised names of directories visited
    before this (recursive) call.

    This function cannot handle the depth of 0.  That is handled by the
    wrapper function `find'.

    """
    # List files/dirs in this directory, and remove symlinks if asked to.
    if os.path.isdir(dir_):
        try:
            children = os.listdir(dir_)
        except OSError as e:
            import sys
            print >>sys.stderr, e
            matched = set()
            return matched, visited
    else:
        children = [dir_]
    if follow_symlinks is False:
        # Check for symlinks relative to `dir_', since `children' holds
        # basenames at this point.
        children = filter(
            lambda path: not os.path.islink(os.path.join(dir_, path)),
            children)

    # Resolve symlinks and store both the real path and the basename for each
    # child.
    children = map(
        lambda basename: normalise_path(os.path.join(dir_, basename)),
        children)
    children = map(
        lambda realpath: (os.path.basename(realpath), realpath), children)

    # Filter out ignored paths.
    children = filter(ignore_path_filter, children)
    # Apply ignore globs.
    for ignore_glob in ignore_globs:
        children = filter(
            lambda child: not fnmatch.fnmatch(child[0], ignore_glob),
            children)

    # Match children by the glob.
    if mindepth is None or mindepth < 2:
        matched = filter(
            lambda child: (fnmatch.fnmatch(child[0], glob_)
                           and (rx is None or re.match(rx, child[0]))
                           and (notrx is None
                                or not re.match(notrx, child[0]))),
            children)
        matched = set(map(lambda child: child[1], matched))
    else:
        matched = set()

    # Recur to subdirectories.
    if maxdepth is None or maxdepth > 1:
        subdirs = set(map(
            lambda path_data: path_data[1],
            filter(lambda child: os.path.isdir(child[1]), children))) \
            - visited
        if prune:
            subdirs -= matched
        for subdir in subdirs:
            visited.add(dir_)
            more_matched, more_visited = _find_ignorefunc(
                subdir, glob_,
                mindepth - 1 if isinstance(mindepth, int) else None,
                maxdepth - 1 if isinstance(maxdepth, int) else None,
                ignore_globs, ignore_path_filter, follow_symlinks, prune,
                rx, notrx, visited)
            matched.update(more_matched)
            visited.update(more_visited)

    # Return.
    return matched, visited


class GrepFilter(Process):

    def __init__(self, stdin, stdout, breakchar='\n'):
        Process.__init__(self)
        self.stdin = stdin
        self.stdout = stdout
        self.buf = ''
        self.breakchar = breakchar
        self.listeners = list()  # :: [(regex, callback)]
        self.valid = list()      # :: [?is_listener_valid]
        self.closed = False

    def add_listener(self, regex, callback):
        """
        Adds a listener to the output strings.

        Arguments:
            regex -- the compiled regular expression to look for
                (using `regex.search') in any piece of output
            callback -- a callable that is invoked for output where `regex'
                was found.  This will be called like this:

                    outputting &= callback(output_unicode_str)

                That means the callback should take the unicode string
                argument containing what would have been output, and return a
                boolean value which is True iff outputting should continue.

        Returns the index of the listener for later reference.

        """
        if all(self.valid):
            idx = len(self.listeners)
            self.listeners.append((regex, callback))
            self.valid.append(True)
        else:
            idx = self.valid.index(False)
            self.listeners[idx] = (regex, callback)
            self.valid[idx] = True
        return idx

    def remove_listener(self, listener_idx):
        if listener_idx >= len(self.listeners):
            raise IndexError('Listener index out of bounds.')
        if listener_idx == len(self.listeners) - 1:
            del self.listeners[-1]
            del self.valid[-1]
        else:
            self.valid[listener_idx] = False

    def flush(self, force=True):
        flushing = False
        indata = ''
        while True:
            ready_inputs, _, _ = select((self.stdin, ), tuple(), tuple(), 0)
            if not ready_inputs:
                break
            indata = self.stdin.read()
            if not indata:
                break
            flushing |= self.breakchar in indata
            self.buf += indata

        if DEBUG:
            print "grep: stdin received:", indata
            print "flushing:", flushing
            print "self.buf:", self.buf

        if force and self.buf and self.buf[-1] != self.breakchar:
            self.buf += self.breakchar
            flushing = True

        if flushing:
            chunks = self.buf.split(self.breakchar)
            if DEBUG:
                print "chunks:", chunks
            self.buf = chunks[-1]
            del chunks[-1]
            map(self.write, chunks)

    def write(self, unistr):
        # DEBUG
        if DEBUG:
            print
            print 'Outputting "{}"'.format(unistr)

        outputting = True
        # Consult only listeners that have not been removed (still marked
        # valid).
        for (regex, callback), valid in zip(self.listeners, self.valid):
            if valid and regex.search(unistr):
                outputting &= callback(unistr)

        # DEBUG
        if DEBUG:
            print 'outputting={}'.format(outputting)
            print

        if outputting:
            if self.closed:
                raise Exception('The output stream has already been closed!')
            self.stdout.write(unistr + self.breakchar)
        else:
            self.closed = True

    def run(self):
        while True:
            self.flush(False)
            sleep(.1)


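# Example usage (a minimal sketch; the child command and the pattern are
# hypothetical):
#
#     from subprocess import Popen, PIPE
#     import re
#     import sys
#
#     proc = Popen(['some_command'], stdout=PIPE)
#     grep = GrepFilter(proc.stdout, sys.stdout)
#
#     def on_error(line):
#         print "(II) error line seen:", line
#         return True      # True => keep forwarding output to stdout
#
#     grep.add_listener(re.compile('ERROR'), on_error)
#     grep.start()
#     # ... let the producer run ...
#     grep.flush()
#     grep.terminate()

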
# TODO Fix this and remove this file from nose2.cfg.
# XXX This is very provisional.
def test_grep_filter():
    # from StringIO import StringIO
    from subprocess import Popen, PIPE
    import re
    import sys

    devnull = open('/dev/null', 'a+')
    producer = Popen(['head', '-n', '3', __file__],
                     stdout=PIPE, stderr=devnull)
    # sio = StringIO()
    sio = sys.stdout
    gio = GrepFilter(producer.stdout, sio)

    def gfunc(outstr):
        print "(II) match found:", outstr
        # return False
        return True

    rx = re.compile('ho', re.I)
    gio.add_listener(rx, gfunc)
    gio.start()

    if DEBUG:
        print producer.stdout
        sleep(1.)
        from pprint import pprint
        pprint(producer.__dict__.items())

    gio.flush()
    gio.terminate()


# XXX This is very provisional.
if __name__ == "__main__":
    test_grep_filter()