123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472 |
- """Temporary files.
- This module provides generic, low- and high-level interfaces for
- creating temporary files and directories. The interfaces listed
- as "safe" just below can be used without fear of race conditions.
- Those listed as "unsafe" cannot, and are provided for backward
- compatibility only.
- This module also provides some data items to the user:
- TMP_MAX - maximum number of names that will be tried before
- giving up.
- template - the default prefix for all temporary names.
- You may change this to control the default prefix.
- tempdir - If this is set to a string before the first use of
- any routine from this module, it will be considered as
- another candidate location to store temporary files.
- """
- __all__ = [
- "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
- "mkstemp", "mkdtemp", # low level safe interfaces
- "mktemp", # deprecated unsafe interface
- "TMP_MAX", "gettempprefix", # constants
- "tempdir", "gettempdir"
- ]
- # Imports.
- import os as _os
- import errno as _errno
- from random import Random as _Random
- if _os.name == 'mac':
- import Carbon.Folder as _Folder
- import Carbon.Folders as _Folders
- try:
- import fcntl as _fcntl
- except ImportError:
- def _set_cloexec(fd):
- pass
- else:
- def _set_cloexec(fd):
- try:
- flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
- except IOError:
- pass
- else:
- # flags read successfully, modify
- flags |= _fcntl.FD_CLOEXEC
- _fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
- try:
- import thread as _thread
- except ImportError:
- import dummy_thread as _thread
- _allocate_lock = _thread.allocate_lock
- _text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
- if hasattr(_os, 'O_NOINHERIT'):
- _text_openflags |= _os.O_NOINHERIT
- if hasattr(_os, 'O_NOFOLLOW'):
- _text_openflags |= _os.O_NOFOLLOW
- _bin_openflags = _text_openflags
- if hasattr(_os, 'O_BINARY'):
- _bin_openflags |= _os.O_BINARY
- if hasattr(_os, 'TMP_MAX'):
- TMP_MAX = _os.TMP_MAX
- else:
- TMP_MAX = 10000
- template = "tmp"
- tempdir = None
- # Internal routines.
- _once_lock = _allocate_lock()
- if hasattr(_os, "lstat"):
- _stat = _os.lstat
- elif hasattr(_os, "stat"):
- _stat = _os.stat
- else:
- # Fallback. All we need is something that raises os.error if the
- # file doesn't exist.
- def _stat(fn):
- try:
- f = open(fn)
- except IOError:
- raise _os.error
- f.close()
- def _exists(fn):
- try:
- _stat(fn)
- except _os.error:
- return False
- else:
- return True
- class _RandomNameSequence:
- """An instance of _RandomNameSequence generates an endless
- sequence of unpredictable strings which can safely be incorporated
- into file names. Each string is six characters long. Multiple
- threads can safely use the same instance at the same time.
- _RandomNameSequence is an iterator."""
- characters = ("abcdefghijklmnopqrstuvwxyz" +
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
- "0123456789-_")
- def __init__(self):
- self.mutex = _allocate_lock()
- self.rng = _Random()
- self.normcase = _os.path.normcase
- def __iter__(self):
- return self
- def next(self):
- m = self.mutex
- c = self.characters
- choose = self.rng.choice
- m.acquire()
- try:
- letters = [choose(c) for dummy in "123456"]
- finally:
- m.release()
- return self.normcase(''.join(letters))
- def _candidate_tempdir_list():
- """Generate a list of candidate temporary directories which
- _get_default_tempdir will try."""
- dirlist = []
- # First, try the environment.
- for envname in 'TMPDIR', 'TEMP', 'TMP':
- dirname = _os.getenv(envname)
- if dirname: dirlist.append(dirname)
- # Failing that, try OS-specific locations.
- if _os.name == 'mac':
- try:
- fsr = _Folder.FSFindFolder(_Folders.kOnSystemDisk,
- _Folders.kTemporaryFolderType, 1)
- dirname = fsr.as_pathname()
- dirlist.append(dirname)
- except _Folder.error:
- pass
- elif _os.name == 'riscos':
- dirname = _os.getenv('Wimp$ScrapDir')
- if dirname: dirlist.append(dirname)
- elif _os.name == 'nt':
- dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
- else:
- dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
- # As a last resort, the current directory.
- try:
- dirlist.append(_os.getcwd())
- except (AttributeError, _os.error):
- dirlist.append(_os.curdir)
- return dirlist
- def _get_default_tempdir():
- """Calculate the default directory to use for temporary files.
- This routine should be called exactly once.
- We determine whether or not a candidate temp dir is usable by
- trying to create and write to a file in that directory. If this
- is successful, the test file is deleted. To prevent denial of
- service, the name of the test file must be randomized."""
- namer = _RandomNameSequence()
- dirlist = _candidate_tempdir_list()
- flags = _text_openflags
- for dir in dirlist:
- if dir != _os.curdir:
- dir = _os.path.normcase(_os.path.abspath(dir))
- # Try only a few names per directory.
- for seq in xrange(100):
- name = namer.next()
- filename = _os.path.join(dir, name)
- try:
- fd = _os.open(filename, flags, 0600)
- fp = _os.fdopen(fd, 'w')
- fp.write('blat')
- fp.close()
- _os.unlink(filename)
- del fp, fd
- return dir
- except (OSError, IOError), e:
- if e[0] != _errno.EEXIST:
- break # no point trying more names in this directory
- pass
- raise IOError, (_errno.ENOENT,
- ("No usable temporary directory found in %s" % dirlist))
- _name_sequence = None
- def _get_candidate_names():
- """Common setup sequence for all user-callable interfaces."""
- global _name_sequence
- if _name_sequence is None:
- _once_lock.acquire()
- try:
- if _name_sequence is None:
- _name_sequence = _RandomNameSequence()
- finally:
- _once_lock.release()
- return _name_sequence
- def _mkstemp_inner(dir, pre, suf, flags):
- """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""
- names = _get_candidate_names()
- for seq in xrange(TMP_MAX):
- name = names.next()
- file = _os.path.join(dir, pre + name + suf)
- try:
- fd = _os.open(file, flags, 0600)
- _set_cloexec(fd)
- return (fd, _os.path.abspath(file))
- except OSError, e:
- if e.errno == _errno.EEXIST:
- continue # try again
- raise
- raise IOError, (_errno.EEXIST, "No usable temporary file name found")
- # User visible interfaces.
- def gettempprefix():
- """Accessor for tempdir.template."""
- return template
- tempdir = None
- def gettempdir():
- """Accessor for tempdir.tempdir."""
- global tempdir
- if tempdir is None:
- _once_lock.acquire()
- try:
- if tempdir is None:
- tempdir = _get_default_tempdir()
- finally:
- _once_lock.release()
- return tempdir
- def mkstemp(suffix="", prefix=template, dir=None, text=False):
- """mkstemp([suffix, [prefix, [dir, [text]]]])
- User-callable function to create and return a unique temporary
- file. The return value is a pair (fd, name) where fd is the
- file descriptor returned by os.open, and name is the filename.
- If 'suffix' is specified, the file name will end with that suffix,
- otherwise there will be no suffix.
- If 'prefix' is specified, the file name will begin with that prefix,
- otherwise a default prefix is used.
- If 'dir' is specified, the file will be created in that directory,
- otherwise a default directory is used.
- If 'text' is specified and true, the file is opened in text
- mode. Else (the default) the file is opened in binary mode. On
- some operating systems, this makes no difference.
- The file is readable and writable only by the creating user ID.
- If the operating system uses permission bits to indicate whether a
- file is executable, the file is executable by no one. The file
- descriptor is not inherited by children of this process.
- Caller is responsible for deleting the file when done with it.
- """
- if dir is None:
- dir = gettempdir()
- if text:
- flags = _text_openflags
- else:
- flags = _bin_openflags
- return _mkstemp_inner(dir, prefix, suffix, flags)
- def mkdtemp(suffix="", prefix=template, dir=None):
- """mkdtemp([suffix, [prefix, [dir]]])
- User-callable function to create and return a unique temporary
- directory. The return value is the pathname of the directory.
- Arguments are as for mkstemp, except that the 'text' argument is
- not accepted.
- The directory is readable, writable, and searchable only by the
- creating user.
- Caller is responsible for deleting the directory when done with it.
- """
- if dir is None:
- dir = gettempdir()
- names = _get_candidate_names()
- for seq in xrange(TMP_MAX):
- name = names.next()
- file = _os.path.join(dir, prefix + name + suffix)
- try:
- _os.mkdir(file, 0700)
- return file
- except OSError, e:
- if e.errno == _errno.EEXIST:
- continue # try again
- raise
- raise IOError, (_errno.EEXIST, "No usable temporary directory name found")
- def mktemp(suffix="", prefix=template, dir=None):
- """mktemp([suffix, [prefix, [dir]]])
- User-callable function to return a unique temporary file name. The
- file is not created.
- Arguments are as for mkstemp, except that the 'text' argument is
- not accepted.
- This function is unsafe and should not be used. The file name
- refers to a file that did not exist at some point, but by the time
- you get around to creating it, someone else may have beaten you to
- the punch.
- """
- ## from warnings import warn as _warn
- ## _warn("mktemp is a potential security risk to your program",
- ## RuntimeWarning, stacklevel=2)
- if dir is None:
- dir = gettempdir()
- names = _get_candidate_names()
- for seq in xrange(TMP_MAX):
- name = names.next()
- file = _os.path.join(dir, prefix + name + suffix)
- if not _exists(file):
- return file
- raise IOError, (_errno.EEXIST, "No usable temporary filename found")
- class _TemporaryFileWrapper:
- """Temporary file wrapper
- This class provides a wrapper around files opened for
- temporary use. In particular, it seeks to automatically
- remove the file when it is no longer needed.
- """
- def __init__(self, file, name):
- self.file = file
- self.name = name
- self.close_called = False
- def __getattr__(self, name):
- file = self.__dict__['file']
- a = getattr(file, name)
- if type(a) != type(0):
- setattr(self, name, a)
- return a
- # NT provides delete-on-close as a primitive, so we don't need
- # the wrapper to do anything special. We still use it so that
- # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
- if _os.name != 'nt':
- # Cache the unlinker so we don't get spurious errors at
- # shutdown when the module-level "os" is None'd out. Note
- # that this must be referenced as self.unlink, because the
- # name TemporaryFileWrapper may also get None'd out before
- # __del__ is called.
- unlink = _os.unlink
- def close(self):
- if not self.close_called:
- self.close_called = True
- self.file.close()
- self.unlink(self.name)
- def __del__(self):
- self.close()
- def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
- prefix=template, dir=None):
- """Create and return a temporary file.
- Arguments:
- 'prefix', 'suffix', 'dir' -- as for mkstemp.
- 'mode' -- the mode argument to os.fdopen (default "w+b").
- 'bufsize' -- the buffer size argument to os.fdopen (default -1).
- The file is created as mkstemp() would do it.
- Returns an object with a file-like interface; the name of the file
- is accessible as file.name. The file will be automatically deleted
- when it is closed.
- """
- if dir is None:
- dir = gettempdir()
- if 'b' in mode:
- flags = _bin_openflags
- else:
- flags = _text_openflags
- # Setting O_TEMPORARY in the flags causes the OS to delete
- # the file when it is closed. This is only supported by Windows.
- if _os.name == 'nt':
- flags |= _os.O_TEMPORARY
- (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
- file = _os.fdopen(fd, mode, bufsize)
- return _TemporaryFileWrapper(file, name)
- if _os.name != 'posix' or _os.sys.platform == 'cygwin':
- # On non-POSIX and Cygwin systems, assume that we cannot unlink a file
- # while it is open.
- TemporaryFile = NamedTemporaryFile
- else:
- def TemporaryFile(mode='w+b', bufsize=-1, suffix="",
- prefix=template, dir=None):
- """Create and return a temporary file.
- Arguments:
- 'prefix', 'suffix', 'directory' -- as for mkstemp.
- 'mode' -- the mode argument to os.fdopen (default "w+b").
- 'bufsize' -- the buffer size argument to os.fdopen (default -1).
- The file is created as mkstemp() would do it.
- Returns an object with a file-like interface. The file has no
- name, and will cease to exist when it is closed.
- """
- if dir is None:
- dir = gettempdir()
- if 'b' in mode:
- flags = _bin_openflags
- else:
- flags = _text_openflags
- (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
- try:
- _os.unlink(name)
- return _os.fdopen(fd, mode, bufsize)
- except:
- _os.close(fd)
- raise
|