# -*- coding: utf-8 -*-
"""walkdir - iterative tools for working with os.walk() and similar interfaces
"""
import fnmatch
import os.path
import sys
# Should be compatible with 2.7 and 3.2+
try:
_str_base = basestring
except NameError:
_str_base = str
# Filtering for inclusion
def _make_include_filter(patterns):
"""Create a filtering function from a collection of inclusion patterns"""
# Trivial case: exclude everything
if not patterns:
def _filter(names):
return names[0:0]
return _filter
# Use fnmatch.filter if it's applicable
if len(patterns) == 1:
def _filter(names):
return fnmatch.filter(names, patterns[0])
return _filter
# Handle the general case for inclusion
def _should_include(name):
return any(fnmatch.fnmatch(name, pattern)
for pattern in patterns)
def _filter(names):
for name in names:
if _should_include(name):
yield name
return _filter
def include_dirs(walk_iter, *include_filters):
    """Use :func:`fnmatch.fnmatch` patterns to select directories of interest

    Inclusion filters are passed directly as arguments.

    Each entry's subdirectory list is pruned in place so that only names
    accepted by the inclusion filter remain. Since the pruning is done on
    the list produced by the underlying iterator, a top-down traversal of
    the directory hierarchy is required.
    """
    keep_matching = _make_include_filter(include_filters)
    for entry in walk_iter:
        # Slice assignment rewrites the very list the underlying walk holds
        entry[1][:] = keep_matching(entry[1])
        yield entry
def include_files(walk_iter, *include_filters):
    """Use :func:`fnmatch.fnmatch` patterns to select files of interest

    Inclusion filters are passed directly as arguments.

    Only the file list of each entry is rewritten (in place); the
    subdirectory lists are left alone, so both top-down and bottom-up
    traversals of the directory hierarchy are supported.
    """
    keep_matching = _make_include_filter(include_filters)
    for entry in walk_iter:
        # Replace the contents of the file list, keeping the same list object
        entry[2][:] = keep_matching(entry[2])
        yield entry
# Filtering for exclusion
def _make_exclude_filter(patterns):
"""Create a filtering function from a collection of exclusion patterns"""
# Trivial case: include everything
if not patterns:
def _filter(names):
return names
return _filter
# Handle the general case for exclusion
def _should_exclude(name):
return any(fnmatch.fnmatch(name, pattern)
for pattern in patterns)
def _filter(names):
for name in names:
if not _should_exclude(name):
yield name
return _filter
def exclude_dirs(walk_iter, *exclude_filters):
    """Use :func:`fnmatch.fnmatch` patterns to skip irrelevant directories

    Exclusion filters are passed directly as arguments.

    Each entry's subdirectory list is pruned in place so that names
    rejected by the exclusion filter are removed. Since the pruning is done
    on the list produced by the underlying iterator, a top-down traversal
    of the directory hierarchy is required.
    """
    drop_matching = _make_exclude_filter(exclude_filters)
    for entry in walk_iter:
        # Slice assignment rewrites the very list the underlying walk holds
        entry[1][:] = drop_matching(entry[1])
        yield entry
def exclude_files(walk_iter, *exclude_filters):
    """Use :func:`fnmatch.fnmatch` patterns to skip irrelevant files

    Exclusion filters are passed directly as arguments.

    Only the file list of each entry is rewritten (in place); the
    subdirectory lists are left alone, so both top-down and bottom-up
    traversals of the directory hierarchy are supported.
    """
    drop_matching = _make_exclude_filter(exclude_filters)
    for entry in walk_iter:
        # Replace the contents of the file list, keeping the same list object
        entry[2][:] = drop_matching(entry[2])
        yield entry
# Depth limiting
def limit_depth(walk_iter, depth):
    """Limit the depth of recursion into subdirectories.

    A *depth* of 0 limits the walk to the top level directory, a *depth* of 1
    includes subdirectories, etc.

    Path depth is calculated by counting directory separators, using the
    depth of the first path produced by the underlying iterator as a
    reference point. Trailing separators are ignored when counting, so
    a walk started at ``"top/"`` is limited exactly like one started at
    ``"top"``.

    Every entry is yielded *before* its subdirectory list is emptied, so
    consumers still see the names of the subdirectories found at the depth
    limit.

    This filter works by modifying the subdirectory lists produced by the
    underlying iterator, and hence requires a top-down traversal of the
    directory hierarchy.

    Raises :exc:`ValueError` (on first iteration) if *depth* is negative.
    """
    if depth < 0:
        msg = "Depth limit less than 0 ({0!r} provided)"
        raise ValueError(msg.format(depth))
    sep = os.sep

    def _count_levels(path):
        # A trailing separator does not add a level: os.walk("top/") joins
        # children as "top/child", which has as many separators as "top/"
        return path.rstrip(sep).count(sep)

    # A single loop (rather than "peek at the first entry, then loop again")
    # means re-iterable inputs such as lists are traversed exactly once
    initial_depth = None
    for dir_entry in walk_iter:
        dirpath = dir_entry[0]
        subdirs = dir_entry[1]
        levels = _count_levels(dirpath)
        if initial_depth is None:
            # The first path produced is the reference point for all others
            initial_depth = levels
        current_depth = levels - initial_depth
        yield dir_entry
        # Prune only after the consumer has seen the full subdirectory list
        if current_depth >= depth:
            subdirs[:] = []
def min_depth(walk_iter, depth):
    """Only process subdirectories beyond a minimum depth

    A *depth* of 1 omits the top level directory, a *depth* of 2
    starts with subdirectories 2 levels down, etc.

    Path depth is calculated by counting directory separators, using the
    depth of the first path produced by the underlying iterator as a
    reference point. Trailing separators are ignored when counting, so
    a walk started at ``"top/"`` is filtered exactly like one started at
    ``"top"``.

    .. note:: Since this filter *doesn't yield* higher level directories, any
       subsequent directory filtering that relies on updating the subdirectory
       list will have no effect at the minimum depth. Accordingly, this filter
       should only be applied *after* any directory filtering operations.

    .. note:: The result of using this filter is effectively the same as
       chaining multiple independent :func:`os.walk` iterators using
       :func:`itertools.chain`. For example, given the following directory
       tree::

           >>> tree test
           test
           ├── file1.txt
           ├── file2.txt
           ├── test2
           │   ├── file1.txt
           │   ├── file2.txt
           │   └── test3
           └── test4
               ├── file1.txt
               └── test5

       Then ``min_depth(os.walk("test"), depth=1)`` will produce the same
       output as
       ``itertools.chain(os.walk("test/test2"), os.walk("test/test4"))``.

    This filter does not modify the entries it receives, but it does rely on
    the first path produced being the top of the walk, and hence requires a
    top-down traversal of the directory hierarchy.

    Raises :exc:`ValueError` (on first iteration) if *depth* is less than 1.
    """
    if depth < 1:
        msg = "Minimum depth less than 1 ({0!r} provided)"
        raise ValueError(msg.format(depth))
    sep = os.sep

    def _count_levels(path):
        # A trailing separator does not add a level: os.walk("top/") joins
        # children as "top/child", which has as many separators as "top/"
        return path.rstrip(sep).count(sep)

    initial_depth = None
    for dir_entry in walk_iter:
        levels = _count_levels(dir_entry[0])
        if initial_depth is None:
            # The first path produced is the reference point; it sits at
            # relative depth 0 and so is never yielded (depth is >= 1)
            initial_depth = levels
        if levels - initial_depth >= depth:
            yield dir_entry
# Symlink loop handling
def handle_symlink_loops(walk_iter, onloop=None):
    """Handle symlink loops when following symlinks during a walk

    By default, prints a warning and then skips processing
    the directory a second time.

    This can be overridden by providing the `onloop` callback, which
    accepts the offending symlink as a parameter. Returning a true value
    from this callback will mean that the directory is still processed,
    otherwise it will be skipped.

    The first entry produced by the underlying iterator is taken to be the
    top of the walk and is always passed through unchecked.

    This filter skips processing subdirectories by modifying the subdirectory
    lists produced by the underlying iterator, and hence requires a
    top-down traversal of the directory hierarchy.
    """
    if onloop is None:
        def onloop(dirpath):
            # Implicitly returns None, so the looping directory is skipped
            msg = "Symlink {0!r} refers to a parent directory, skipping\n"
            sys.stderr.write(msg.format(dirpath))
            sys.stderr.flush()
    sep = os.sep
    # Both loops below must share one iterator; without this, a re-iterable
    # input (e.g. a list) would restart and yield its first entry twice
    walk_iter = iter(walk_iter)
    for dir_entry in walk_iter:
        yield dir_entry
        top = dir_entry[0]
        real_top = os.path.abspath(os.path.realpath(top))
        break
    for dir_entry in walk_iter:
        dirpath = dir_entry[0]
        subdirs = dir_entry[1]
        if os.path.islink(dirpath):
            # We just descended into a directory via a symbolic link
            # Check if we're referring to a directory that is
            # a parent of our nominal directory
            relative = os.path.relpath(dirpath, top)
            nominal_path = os.path.join(real_top, relative)
            real_path = os.path.abspath(os.path.realpath(dirpath))
            path_fragments = zip(nominal_path.split(sep), real_path.split(sep))
            for nominal, real in path_fragments:
                if nominal != real:
                    # The paths diverge, so this link is not a loop
                    break
            else:
                # Every paired component matched: one path is a prefix of
                # the other, i.e. the link leads back up its own ancestry
                if not onloop(dirpath):
                    subdirs[:] = []
                    continue
        yield dir_entry
# Convenience function that puts together an iterator pipeline
def filtered_walk(top, included_files=None, included_dirs=None,
                  excluded_files=None, excluded_dirs=None,
                  depth=None, followlinks=False, min_depth=None):
    """This is a wrapper around ``os.walk()`` and other filesystem traversal
    iterators, with these additional features:

    - *top* may be either a path (a string or an object implementing the
      ``os.PathLike`` protocol, which will be passed to ``os.walk()``)
      or any iterable that produces sequences with ``path, subdirs, files``
      as the first three elements in the sequence
    - allows independent glob-style filters for filenames and subdirectories
    - allows a recursion depth limit to be specified
    - allows a minimum depth to be specified to report only subdirectory
      contents
    - emits a message to stderr and skips the directory if a symlink loop
      is encountered when following links

    Filtered walks created by passing in a path are always top down, as
    the subdirectory listings must be altered to provide a number of the
    above features.

    *included_files*, *included_dirs*, *excluded_files* and *excluded_dirs*
    are iterables of glob patterns used to apply the relevant filtering steps
    to the walk. Each one is skipped entirely when left as ``None``.

    A *depth* of ``None`` (the default) disables depth limiting. Otherwise,
    *depth* must be at least zero and indicates how far to descend into the
    directory hierarchy. A depth of zero is useful to get separate filtered
    subdirectory and file listings for *top*.

    Setting *min_depth* allows directories higher in the tree to be
    excluded from the walk (e.g. a *min_depth* of 1 excludes *top*, but
    any subdirectories will still be processed)

    *followlinks* enables symbolic loop detection (when set to ``True``)
    and is also passed to ``os.walk()`` when top is a path
    """
    # ``_str_base`` also covers ``unicode`` on Python 2, and ``__fspath__``
    # identifies os.PathLike objects such as pathlib.Path; anything else is
    # assumed to already be a walk-style iterable
    if isinstance(top, _str_base) or hasattr(top, "__fspath__"):
        walk_iter = os.walk(top, followlinks=followlinks)
    else:
        walk_iter = top
    # Depth limiting first, since it can cut great swathes from the tree
    if depth is not None:
        walk_iter = limit_depth(walk_iter, depth)
    # Next we do our path based filtering that can skip directories
    if included_dirs is not None:
        walk_iter = include_dirs(walk_iter, *included_dirs)
    if excluded_dirs is not None:
        walk_iter = exclude_dirs(walk_iter, *excluded_dirs)
    # And then we check the filesystem for symlink loops
    if followlinks:
        walk_iter = handle_symlink_loops(walk_iter)
    # Now that all other directory filtering has been handled, we can apply
    # the minimum depth check
    if min_depth is not None:
        # The *min_depth* parameter shadows the module level function of the
        # same name, so the function has to be looked up via globals()
        walk_iter = globals()["min_depth"](walk_iter, min_depth)
    # Finally, apply the file filters that can't alter the shape of the tree
    if included_files is not None:
        walk_iter = include_files(walk_iter, *included_files)
    if excluded_files is not None:
        walk_iter = exclude_files(walk_iter, *excluded_files)
    # Explicit loop rather than ``yield from`` to stay compatible with 2.7
    for triple in walk_iter:
        yield triple
# Iterators that flatten the output into a series of paths
def dir_paths(walk_iter):
    """Iterate over the directories visited by the underlying walk

    Yields the first element (the directory path) of each entry, in the
    order the entries are produced, so the underlying walk may be either
    top-down or bottom-up.
    """
    for visited in walk_iter:
        dirpath = visited[0]
        yield dirpath
def all_dir_paths(walk_iter):
    """Iterate over all directories reachable through the underlying walk

    This covers:

    * all visited directories (similar to dir_paths)
    * all reported subdirectories of visited directories (even if not
      otherwise visited)

    Example cases where the output may differ from dir_paths:

    * all_dir_paths always includes symlinks to directories even when the
      underlying iterator doesn't follow symlinks
    * all_dir_paths will include subdirectories of directories at the maximum
      depth in a depth limited walk

    *walk_iter* may be any iterable of walk entries, not just an iterator.

    This iterator expects new root directories to be emitted by the underlying
    walk before any of their contents, and hence requires a top-down traversal
    of the directory hierarchy.
    """
    top = None
    for dir_entry in walk_iter:
        dirpath = dir_entry[0]
        # Visited directories below the current root were already reported
        # as subdirectories of their parent, so only new roots are emitted.
        # NOTE: this is a plain string prefix test, so a new root whose path
        # merely extends the previous root's text (e.g. "test2" after
        # "test") is not recognised as a new root.
        if top is None or not dirpath.startswith(top):
            yield dirpath
            top = dirpath
        for subdir in dir_entry[1]:
            yield os.path.join(dirpath, subdir)
def file_paths(walk_iter):
    """Iterate over the files in directories visited by the underlying walk

    Each file name in an entry is joined onto that entry's directory path.
    Directory contents are emitted in the order visited, so the underlying
    walk may be either top-down or bottom-up.
    """
    for visited in walk_iter:
        dirpath, filenames = visited[0], visited[2]
        for filename in filenames:
            yield os.path.join(dirpath, filename)
def all_paths(walk_iter):
    """Iterate over all paths reachable through the underlying walk

    This covers:

    * all visited directories
    * all files in visited directories
    * all reported subdirectories of visited directories (even if not
      otherwise visited)

    For each entry, files are emitted before subdirectories.

    *walk_iter* may be any iterable of walk entries, not just an iterator.

    This iterator expects new root directories to be emitted by the underlying
    walk before any of their contents, and hence requires a top-down traversal
    of the directory hierarchy.
    """
    top = None
    for dir_entry in walk_iter:
        dirpath = dir_entry[0]
        # Visited directories below the current root were already reported
        # as subdirectories of their parent, so only new roots are emitted.
        # NOTE: this is a plain string prefix test, so a new root whose path
        # merely extends the previous root's text (e.g. "test2" after
        # "test") is not recognised as a new root.
        if top is None or not dirpath.startswith(top):
            yield dirpath
            top = dirpath
        for fname in dir_entry[2]:
            yield os.path.join(dirpath, fname)
        for subdir in dir_entry[1]:
            yield os.path.join(dirpath, subdir)
# Legacy API
# Aliases for the flattening iterators under their ``iter_`` prefixed names;
# each one is bound to the very same function object as its replacement
iter_dir_paths = dir_paths
iter_file_paths = file_paths
iter_paths = all_paths