# Source code for walkdir

# -*- coding: utf-8 -*-
"""walkdir - iterative tools for working with os.walk() and similar interfaces
"""
import fnmatch
import os.path
import sys

# Should be compatible with 2.7 and 3.2+
# Python 2 has ``basestring`` as the common base of its text types, while
# Python 3 only has ``str``; the conditional expression never evaluates the
# ``basestring`` name on Python 3.
_str_base = str if sys.version_info[0] >= 3 else basestring  # noqa: F821


# Filtering for inclusion


def _make_include_filter(patterns):
    """Build a callable that keeps only the names matching *patterns*

    *patterns* is a sequence of :mod:`fnmatch` style globs. The returned
    callable accepts a sequence of names and produces the matching ones:
    an empty slice of the input when there are no patterns, a list when
    there is exactly one pattern, and a generator otherwise.
    """
    if not patterns:
        # Nothing can match, so hand back an empty slice of the input
        return lambda names: names[0:0]

    if len(patterns) == 1:
        # A single pattern maps directly onto fnmatch.filter
        def _single_pattern_filter(names):
            return fnmatch.filter(names, patterns[0])

        return _single_pattern_filter

    # General case: a name is kept as soon as any one pattern accepts it
    def _multi_pattern_filter(names):
        for candidate in names:
            for glob in patterns:
                if fnmatch.fnmatch(candidate, glob):
                    yield candidate
                    break

    return _multi_pattern_filter


def include_dirs(walk_iter, *include_filters):
    """Use :func:`fnmatch.fnmatch` patterns to select directories of interest

    Inclusion filters are passed directly as arguments.

    This filter works by modifying the subdirectory lists produced by the
    underlying iterator, and hence requires a top-down traversal of the
    directory hierarchy.
    """
    filter_subdirs = _make_include_filter(include_filters)
    for dir_entry in walk_iter:
        subdirs = dir_entry[1]
        # Slice assignment updates the list object owned by the underlying
        # iterator rather than rebinding a local name
        subdirs[:] = filter_subdirs(subdirs)
        yield dir_entry


def include_files(walk_iter, *include_filters):
    """Use :func:`fnmatch.fnmatch` patterns to select files of interest

    Inclusion filters are passed directly as arguments.

    This filter does not modify the subdirectory lists produced by the
    underlying iterator, and hence supports both top-down and bottom-up
    traversal of the directory hierarchy.
    """
    filter_files = _make_include_filter(include_filters)
    for dir_entry in walk_iter:
        files = dir_entry[2]
        # Update the file list in place so the yielded entry reflects it
        files[:] = filter_files(files)
        yield dir_entry


# Filtering for exclusion


def _make_exclude_filter(patterns):
    """Create a filtering function from a collection of exclusion patterns

    The returned callable accepts a sequence of names. With no patterns it
    returns the input object unchanged; otherwise it returns a generator
    over the names that match none of the patterns.
    """
    # Trivial case: include everything
    if not patterns:
        def _filter(names):
            return names

        return _filter

    # Handle the general case for exclusion
    def _should_exclude(name):
        return any(fnmatch.fnmatch(name, pattern)
                   for pattern in patterns)

    def _filter(names):
        for name in names:
            if not _should_exclude(name):
                yield name

    return _filter


def exclude_dirs(walk_iter, *exclude_filters):
    """Use :func:`fnmatch.fnmatch` patterns to skip irrelevant directories

    Exclusion filters are passed directly as arguments.

    This filter works by modifying the subdirectory lists produced by the
    underlying iterator, and hence requires a top-down traversal of the
    directory hierarchy.
    """
    filter_subdirs = _make_exclude_filter(exclude_filters)
    for dir_entry in walk_iter:
        subdirs = dir_entry[1]
        # Slice assignment updates the list object owned by the underlying
        # iterator rather than rebinding a local name
        subdirs[:] = filter_subdirs(subdirs)
        yield dir_entry


def exclude_files(walk_iter, *exclude_filters):
    """Use :func:`fnmatch.fnmatch` patterns to skip irrelevant files

    Exclusion filters are passed directly as arguments.

    This filter does not modify the subdirectory lists produced by the
    underlying iterator, and hence supports both top-down and bottom-up
    traversal of the directory hierarchy.
    """
    filter_files = _make_exclude_filter(exclude_filters)
    for dir_entry in walk_iter:
        files = dir_entry[2]
        # Update the file list in place so the yielded entry reflects it
        files[:] = filter_files(files)
        yield dir_entry


# Depth limiting
def limit_depth(walk_iter, depth):
    """Limit the depth of recursion into subdirectories.

    A *depth* of 0 limits the walk to the top level directory, a *depth* of 1
    includes subdirectories, etc.

    Path depth is calculated by counting directory separators, using the
    depth of the first path produced by the underlying iterator as a
    reference point. Trailing separators are ignored when counting, so a
    top of ``"dir/"`` is measured the same way as ``"dir"``.

    This filter works by modifying the subdirectory lists produced by the
    underlying iterator, and hence requires a top-down traversal of the
    directory hierarchy.

    Raises :exc:`ValueError` (when iteration starts) if *depth* is negative.
    """
    if depth < 0:
        msg = "Depth limit less than 0 ({0!r} provided)"
        raise ValueError(msg.format(depth))
    sep = os.sep
    # Both loops below must share one iterator; looping over a re-iterable
    # such as a list twice would restart from the first entry.
    walk_iter = iter(walk_iter)
    for dir_entry in walk_iter:
        yield dir_entry
        top = dir_entry[0]
        subdirs = dir_entry[1]
        # Without the rstrip, a top such as "dir/" or "/" has the same
        # separator count as its children and the limit is off by one
        initial_depth = top.rstrip(sep).count(sep)
        if depth == 0:
            subdirs[:] = []
        break
    for dir_entry in walk_iter:
        dirpath = dir_entry[0]
        subdirs = dir_entry[1]
        current_depth = dirpath.rstrip(sep).count(sep) - initial_depth
        yield dir_entry
        # Prune only after yielding so the consumer still sees the
        # subdirectory names at the depth limit
        if current_depth >= depth:
            subdirs[:] = []


def min_depth(walk_iter, depth):
    """Only process subdirectories beyond a minimum depth

    A *depth* of 1 omits the top level directory, a *depth* of 2
    starts with subdirectories 2 levels down, etc.

    Path depth is calculated by counting directory separators, using the
    depth of the first path produced by the underlying iterator as a
    reference point. Trailing separators are ignored when counting, so a
    top of ``"dir/"`` is measured the same way as ``"dir"``.

    .. note:: Since this filter *doesn't yield* higher level directories,
       any subsequent directory filtering that relies on updating the
       subdirectory list will have no effect at the minimum depth.
       Accordingly, this filter should only be applied *after* any
       directory filtering operations.

    .. note:: The result of using this filter is effectively the same as
       chaining multiple independent :func:`os.walk` iterators using
       :func:`itertools.chain`. For example, given the following directory
       tree::

           >>> tree test
           test
           ├── file1.txt
           ├── file2.txt
           ├── test2
           │   ├── file1.txt
           │   ├── file2.txt
           │   └── test3
           └── test4
               ├── file1.txt
               └── test5

       Then ``min_depth(os.walk("test"), depth=1)`` will produce the same
       output as
       ``itertools.chain(os.walk("test/test2"), os.walk("test/test4"))``.

    This filter uses the first path produced by the underlying iterator as
    its reference point, and hence requires a top-down traversal of the
    directory hierarchy. It does not modify the subdirectory lists itself.

    Raises :exc:`ValueError` (when iteration starts) if *depth* is less
    than 1.
    """
    if depth < 1:
        msg = "Minimum depth less than 1 ({0!r} provided)"
        raise ValueError(msg.format(depth))
    sep = os.sep
    # Both loops below must share one iterator so the reference entry is
    # consumed exactly once, even when given a re-iterable such as a list
    walk_iter = iter(walk_iter)
    for dir_entry in walk_iter:
        # The first entry is only a reference point and is never yielded.
        # Without the rstrip, a top such as "dir/" has the same separator
        # count as its children and they would be wrongly dropped.
        initial_depth = dir_entry[0].rstrip(sep).count(sep)
        break
    for dir_entry in walk_iter:
        dirpath = dir_entry[0]
        current_depth = dirpath.rstrip(sep).count(sep) - initial_depth
        if current_depth >= depth:
            yield dir_entry


# Symlink loop handling


# Convenience function that puts together an iterator pipeline
def filtered_walk(top, included_files=None, included_dirs=None,
                  excluded_files=None, excluded_dirs=None,
                  depth=None, followlinks=False, min_depth=None):
    """This is a wrapper around ``os.walk()`` and other filesystem traversal
    iterators, with these additional features:

    - *top* may be either a string or path-like object (which will be
      passed to ``os.walk()``) or any iterable that produces sequences with
      ``path, subdirs, files`` as the first three elements in the sequence
    - allows independent glob-style filters for filenames and subdirectories
    - allows a recursion depth limit to be specified
    - allows a minimum depth to be specified to report only subdirectory
      contents
    - emits a message to stderr and skips the directory if a symlink loop
      is encountered when following links

    Filtered walks created by passing in a string are always top down, as
    the subdirectory listings must be altered to provide a number of the
    above features.

    *included_files*, *included_dirs*, *excluded_files* and *excluded_dirs*
    are sequences of glob patterns used to apply the relevant filtering
    steps to the walk. ``None`` (the default) skips the corresponding step.

    A *depth* of ``None`` (the default) disables depth limiting. Otherwise,
    *depth* must be at least zero and indicates how far to descend into the
    directory hierarchy. A depth of zero is useful to get separate filtered
    subdirectory and file listings for *top*.

    Setting *min_depth* allows directories higher in the tree to be
    excluded from the walk (e.g. a *min_depth* of 1 excludes *top*, but
    any subdirectories will still be processed).

    *followlinks* enables symbolic loop detection (when set to ``True``)
    and is also passed to ``os.walk()`` when top is a string.
    """
    # ``_str_base`` covers ``unicode`` on Python 2 as well as ``str``; the
    # ``__fspath__`` check lets os.PathLike objects reach os.walk() instead
    # of being mistaken for a walk iterator.
    if isinstance(top, _str_base) or hasattr(top, "__fspath__"):
        walk_iter = os.walk(top, followlinks=followlinks)
    else:
        walk_iter = top
    # Depth limiting first, since it can cut great swathes from the tree
    if depth is not None:
        walk_iter = limit_depth(walk_iter, depth)
    # Next we do our path based filtering that can skip directories
    if included_dirs is not None:
        walk_iter = include_dirs(walk_iter, *included_dirs)
    if excluded_dirs is not None:
        walk_iter = exclude_dirs(walk_iter, *excluded_dirs)
    # And then we check the filesystem for symlink loops
    if followlinks:
        # NOTE(review): handle_symlink_loops is not defined in the part of
        # the module visible here; confirm it is provided elsewhere.
        walk_iter = handle_symlink_loops(walk_iter)
    # Now that all other directory filtering has been handled, we can apply
    # the minimum depth check
    if min_depth is not None:
        # The *min_depth* parameter shadows the module level function of
        # the same name, so the function is looked up via globals()
        walk_iter = globals()["min_depth"](walk_iter, min_depth)
    # Finally, apply the file filters that can't alter the shape of the tree
    if included_files is not None:
        walk_iter = include_files(walk_iter, *included_files)
    if excluded_files is not None:
        walk_iter = exclude_files(walk_iter, *excluded_files)
    for triple in walk_iter:
        yield triple


# Iterators that flatten the output into a series of paths
def dir_paths(walk_iter):
    """Iterate over the directories visited by the underlying walk

    Directories are emitted in the order visited, so the underlying walk
    may be either top-down or bottom-up.
    """
    for dir_entry in walk_iter:
        # The first element of each entry is the visited directory's path
        yield dir_entry[0]


def all_dir_paths(walk_iter):
    """Iterate over all directories reachable through the underlying walk

    This covers:

    * all visited directories (similar to dir_paths)
    * all reported subdirectories of visited directories (even if not
      otherwise visited)

    Example cases where the output may differ from dir_paths:

    * all_dir_paths always includes symlinks to directories even when the
      underlying iterator doesn't follow symlinks
    * all_dir_paths will include subdirectories of directories at the
      maximum depth in a depth limited walk

    This iterator expects new root directories to be emitted by the
    underlying walk before any of their contents, and hence requires a
    top-down traversal of the directory hierarchy.
    """
    seps = tuple(s for s in (os.sep, os.altsep) if s)

    def _prefixes(root):
        # Prefixes that identify paths located *inside* root. Comparing
        # against "root + separator" stops a sibling such as "r/ab" from
        # being mistaken for a child of "r/a".
        if not root or root.endswith(seps):
            return (root,)
        return tuple(root + s for s in seps)

    # Accept any iterable, not just iterators, before calling next()
    walk_iter = iter(walk_iter)
    dir_entry = next(walk_iter, None)
    if dir_entry is None:
        return
    top = dir_entry[0]
    inside_top = _prefixes(top)
    yield top
    while dir_entry:
        dirpath = dir_entry[0]
        if dirpath != top and not dirpath.startswith(inside_top):
            # A path outside the current root starts a new root; it has
            # not been reported as anyone's subdirectory, so emit it here
            yield dirpath
            top = dirpath
            inside_top = _prefixes(top)
        for subdir in dir_entry[1]:
            yield os.path.join(dirpath, subdir)
        dir_entry = next(walk_iter, None)


def file_paths(walk_iter):
    """Iterate over the files in directories visited by the underlying walk

    Directory contents are emitted in the order visited, so the underlying
    walk may be either top-down or bottom-up.
    """
    for dir_entry in walk_iter:
        for fname in dir_entry[2]:
            # Join each file name onto the path of its containing directory
            yield os.path.join(dir_entry[0], fname)


def all_paths(walk_iter):
    """Iterate over all paths reachable through the underlying walk

    This covers:

    * all visited directories
    * all files in visited directories
    * all reported subdirectories of visited directories (even if not
      otherwise visited)

    This iterator expects new root directories to be emitted by the
    underlying walk before any of their contents, and hence requires a
    top-down traversal of the directory hierarchy.
    """
    seps = tuple(s for s in (os.sep, os.altsep) if s)

    def _prefixes(root):
        # Prefixes that identify paths located *inside* root. Comparing
        # against "root + separator" stops a sibling such as "r/ab" from
        # being mistaken for a child of "r/a".
        if not root or root.endswith(seps):
            return (root,)
        return tuple(root + s for s in seps)

    # Accept any iterable, not just iterators, before calling next()
    walk_iter = iter(walk_iter)
    dir_entry = next(walk_iter, None)
    if dir_entry is None:
        return
    top = dir_entry[0]
    inside_top = _prefixes(top)
    yield top
    while dir_entry:
        dirpath = dir_entry[0]
        if dirpath != top and not dirpath.startswith(inside_top):
            # A path outside the current root starts a new root; it has
            # not been reported as anyone's subdirectory, so emit it here
            yield dirpath
            top = dirpath
            inside_top = _prefixes(top)
        # Files are reported before subdirectories for each visited directory
        for fname in dir_entry[2]:
            yield os.path.join(dirpath, fname)
        for subdir in dir_entry[1]:
            yield os.path.join(dirpath, subdir)
        dir_entry = next(walk_iter, None)


# Legacy API
# Older names kept as aliases of the current path iterators
iter_dir_paths = dir_paths
iter_file_paths = file_paths
iter_paths = all_paths