summaryrefslogtreecommitdiff
path: root/lib/python2.7/mimetools.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/mimetools.py')
-rw-r--r--lib/python2.7/mimetools.py250
1 files changed, 250 insertions, 0 deletions
diff --git a/lib/python2.7/mimetools.py b/lib/python2.7/mimetools.py
new file mode 100644
index 0000000..71ca8f8
--- /dev/null
+++ b/lib/python2.7/mimetools.py
@@ -0,0 +1,250 @@
+"""Various tools used by MIME-reading or MIME-writing programs."""
+
+
+import os
+import sys
+import tempfile
+from warnings import filterwarnings, catch_warnings
+with catch_warnings():
+ if sys.py3kwarning:
+ filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning)
+ import rfc822
+
+from warnings import warnpy3k
+warnpy3k("in 3.x, mimetools has been removed in favor of the email package",
+ stacklevel=2)
+
+__all__ = ["Message","choose_boundary","encode","decode","copyliteral",
+ "copybinary"]
+
+class Message(rfc822.Message):
+ """A derived class of rfc822.Message that knows about MIME headers and
+ contains some hooks for decoding encoded and multipart messages."""
+
+ def __init__(self, fp, seekable = 1):
+ rfc822.Message.__init__(self, fp, seekable)
+ self.encodingheader = \
+ self.getheader('content-transfer-encoding')
+ self.typeheader = \
+ self.getheader('content-type')
+ self.parsetype()
+ self.parseplist()
+
+ def parsetype(self):
+ str = self.typeheader
+ if str is None:
+ str = 'text/plain'
+ if ';' in str:
+ i = str.index(';')
+ self.plisttext = str[i:]
+ str = str[:i]
+ else:
+ self.plisttext = ''
+ fields = str.split('/')
+ for i in range(len(fields)):
+ fields[i] = fields[i].strip().lower()
+ self.type = '/'.join(fields)
+ self.maintype = fields[0]
+ self.subtype = '/'.join(fields[1:])
+
+ def parseplist(self):
+ str = self.plisttext
+ self.plist = []
+ while str[:1] == ';':
+ str = str[1:]
+ if ';' in str:
+ # XXX Should parse quotes!
+ end = str.index(';')
+ else:
+ end = len(str)
+ f = str[:end]
+ if '=' in f:
+ i = f.index('=')
+ f = f[:i].strip().lower() + \
+ '=' + f[i+1:].strip()
+ self.plist.append(f.strip())
+ str = str[end:]
+
+ def getplist(self):
+ return self.plist
+
+ def getparam(self, name):
+ name = name.lower() + '='
+ n = len(name)
+ for p in self.plist:
+ if p[:n] == name:
+ return rfc822.unquote(p[n:])
+ return None
+
+ def getparamnames(self):
+ result = []
+ for p in self.plist:
+ i = p.find('=')
+ if i >= 0:
+ result.append(p[:i].lower())
+ return result
+
+ def getencoding(self):
+ if self.encodingheader is None:
+ return '7bit'
+ return self.encodingheader.lower()
+
+ def gettype(self):
+ return self.type
+
+ def getmaintype(self):
+ return self.maintype
+
+ def getsubtype(self):
+ return self.subtype
+
+
+
+
+# Utility functions
+# -----------------
+
+try:
+ import thread
+except ImportError:
+ import dummy_thread as thread
+_counter_lock = thread.allocate_lock()
+del thread
+
+_counter = 0
+def _get_next_counter():
+ global _counter
+ _counter_lock.acquire()
+ _counter += 1
+ result = _counter
+ _counter_lock.release()
+ return result
+
+_prefix = None
+
+def choose_boundary():
+ """Return a string usable as a multipart boundary.
+
+ The string chosen is unique within a single program run, and
+ incorporates the user id (if available), process id (if available),
+ and current time. So it's very unlikely the returned string appears
+ in message text, but there's no guarantee.
+
+ The boundary contains dots so you have to quote it in the header."""
+
+ global _prefix
+ import time
+ if _prefix is None:
+ import socket
+ try:
+ hostid = socket.gethostbyname(socket.gethostname())
+ except socket.gaierror:
+ hostid = '127.0.0.1'
+ try:
+ uid = repr(os.getuid())
+ except AttributeError:
+ uid = '1'
+ try:
+ pid = repr(os.getpid())
+ except AttributeError:
+ pid = '1'
+ _prefix = hostid + '.' + uid + '.' + pid
+ return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())
+
+
+# Subroutines for decoding some common content-transfer-types
+
+def decode(input, output, encoding):
+ """Decode common content-transfer-encodings (base64, quopri, uuencode)."""
+ if encoding == 'base64':
+ import base64
+ return base64.decode(input, output)
+ if encoding == 'quoted-printable':
+ import quopri
+ return quopri.decode(input, output)
+ if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
+ import uu
+ return uu.decode(input, output)
+ if encoding in ('7bit', '8bit'):
+ return output.write(input.read())
+ if encoding in decodetab:
+ pipethrough(input, decodetab[encoding], output)
+ else:
+ raise ValueError, \
+ 'unknown Content-Transfer-Encoding: %s' % encoding
+
+def encode(input, output, encoding):
+ """Encode common content-transfer-encodings (base64, quopri, uuencode)."""
+ if encoding == 'base64':
+ import base64
+ return base64.encode(input, output)
+ if encoding == 'quoted-printable':
+ import quopri
+ return quopri.encode(input, output, 0)
+ if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
+ import uu
+ return uu.encode(input, output)
+ if encoding in ('7bit', '8bit'):
+ return output.write(input.read())
+ if encoding in encodetab:
+ pipethrough(input, encodetab[encoding], output)
+ else:
+ raise ValueError, \
+ 'unknown Content-Transfer-Encoding: %s' % encoding
+
+# The following is no longer used for standard encodings
+
+# XXX This requires that uudecode and mmencode are in $PATH
+
+uudecode_pipe = '''(
+TEMP=/tmp/@uu.$$
+sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
+cat $TEMP
+rm $TEMP
+)'''
+
+decodetab = {
+ 'uuencode': uudecode_pipe,
+ 'x-uuencode': uudecode_pipe,
+ 'uue': uudecode_pipe,
+ 'x-uue': uudecode_pipe,
+ 'quoted-printable': 'mmencode -u -q',
+ 'base64': 'mmencode -u -b',
+}
+
+encodetab = {
+ 'x-uuencode': 'uuencode tempfile',
+ 'uuencode': 'uuencode tempfile',
+ 'x-uue': 'uuencode tempfile',
+ 'uue': 'uuencode tempfile',
+ 'quoted-printable': 'mmencode -q',
+ 'base64': 'mmencode -b',
+}
+
+def pipeto(input, command):
+ pipe = os.popen(command, 'w')
+ copyliteral(input, pipe)
+ pipe.close()
+
+def pipethrough(input, command, output):
+ (fd, tempname) = tempfile.mkstemp()
+ temp = os.fdopen(fd, 'w')
+ copyliteral(input, temp)
+ temp.close()
+ pipe = os.popen(command + ' <' + tempname, 'r')
+ copybinary(pipe, output)
+ pipe.close()
+ os.unlink(tempname)
+
+def copyliteral(input, output):
+ while 1:
+ line = input.readline()
+ if not line: break
+ output.write(line)
+
+def copybinary(input, output):
+ BUFSIZE = 8192
+ while 1:
+ line = input.read(BUFSIZE)
+ if not line: break
+ output.write(line)