summaryrefslogtreecommitdiff
path: root/lib/python2.7/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/test/test_logging.py')
-rw-r--r--lib/python2.7/test/test_logging.py1995
1 files changed, 1995 insertions, 0 deletions
diff --git a/lib/python2.7/test/test_logging.py b/lib/python2.7/test/test_logging.py
new file mode 100644
index 0000000..31bc48e
--- /dev/null
+++ b/lib/python2.7/test/test_logging.py
@@ -0,0 +1,1995 @@
+#!/usr/bin/env python
+#
+# Copyright 2001-2013 by Vinay Sajip. All Rights Reserved.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose and without fee is hereby granted,
+# provided that the above copyright notice appear in all copies and that
+# both that copyright notice and this permission notice appear in
+# supporting documentation, and that the name of Vinay Sajip
+# not be used in advertising or publicity pertaining to distribution
+# of the software without specific, written prior permission.
+# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
+# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
+# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
+# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Test harness for the logging module. Run all tests.
+
+Copyright (C) 2001-2013 Vinay Sajip. All Rights Reserved.
+"""
+
+import logging
+import logging.handlers
+import logging.config
+
+import codecs
+import cPickle
+import cStringIO
+import gc
+import json
+import os
+import random
+import re
+import select
+import socket
+from SocketServer import ThreadingTCPServer, StreamRequestHandler
+import struct
+import sys
+import tempfile
+from test.test_support import captured_stdout, run_with_locale, run_unittest
+import textwrap
+import time
+import unittest
+import warnings
+import weakref
+try:
+ import threading
+except ImportError:
+ threading = None
+
+class BaseTest(unittest.TestCase):
+
+ """Base class for logging tests."""
+
+ log_format = "%(name)s -> %(levelname)s: %(message)s"
+ expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
+ message_num = 0
+
+ def setUp(self):
+ """Setup the default logging stream to an internal StringIO instance,
+ so that we can examine log output as we want."""
+ logger_dict = logging.getLogger().manager.loggerDict
+ logging._acquireLock()
+ try:
+ self.saved_handlers = logging._handlers.copy()
+ self.saved_handler_list = logging._handlerList[:]
+ self.saved_loggers = logger_dict.copy()
+ self.saved_level_names = logging._levelNames.copy()
+ finally:
+ logging._releaseLock()
+
+ # Set two unused loggers: one non-ASCII and one Unicode.
+ # This is to test correct operation when sorting existing
+ # loggers in the configuration code. See issue 8201.
+ logging.getLogger("\xab\xd7\xbb")
+ logging.getLogger(u"\u013f\u00d6\u0047")
+
+ self.root_logger = logging.getLogger("")
+ self.original_logging_level = self.root_logger.getEffectiveLevel()
+
+ self.stream = cStringIO.StringIO()
+ self.root_logger.setLevel(logging.DEBUG)
+ self.root_hdlr = logging.StreamHandler(self.stream)
+ self.root_formatter = logging.Formatter(self.log_format)
+ self.root_hdlr.setFormatter(self.root_formatter)
+ self.root_logger.addHandler(self.root_hdlr)
+
+ def tearDown(self):
+ """Remove our logging stream, and restore the original logging
+ level."""
+ self.stream.close()
+ self.root_logger.removeHandler(self.root_hdlr)
+ while self.root_logger.handlers:
+ h = self.root_logger.handlers[0]
+ self.root_logger.removeHandler(h)
+ h.close()
+ self.root_logger.setLevel(self.original_logging_level)
+ logging._acquireLock()
+ try:
+ logging._levelNames.clear()
+ logging._levelNames.update(self.saved_level_names)
+ logging._handlers.clear()
+ logging._handlers.update(self.saved_handlers)
+ logging._handlerList[:] = self.saved_handler_list
+ loggerDict = logging.getLogger().manager.loggerDict
+ loggerDict.clear()
+ loggerDict.update(self.saved_loggers)
+ finally:
+ logging._releaseLock()
+
+ def assert_log_lines(self, expected_values, stream=None):
+ """Match the collected log lines against the regular expression
+ self.expected_log_pat, and compare the extracted group values to
+ the expected_values list of tuples."""
+ stream = stream or self.stream
+ pat = re.compile(self.expected_log_pat)
+ try:
+ stream.reset()
+ actual_lines = stream.readlines()
+ except AttributeError:
+ # StringIO.StringIO lacks a reset() method.
+ actual_lines = stream.getvalue().splitlines()
+ self.assertEqual(len(actual_lines), len(expected_values))
+ for actual, expected in zip(actual_lines, expected_values):
+ match = pat.search(actual)
+ if not match:
+ self.fail("Log line does not match expected pattern:\n" +
+ actual)
+ self.assertEqual(tuple(match.groups()), expected)
+ s = stream.read()
+ if s:
+ self.fail("Remaining output at end of log stream:\n" + s)
+
+ def next_message(self):
+ """Generate a message consisting solely of an auto-incrementing
+ integer."""
+ self.message_num += 1
+ return "%d" % self.message_num
+
+
+class BuiltinLevelsTest(BaseTest):
+ """Test builtin levels and their inheritance."""
+
+ def test_flat(self):
+ #Logging levels in a flat logger namespace.
+ m = self.next_message
+
+ ERR = logging.getLogger("ERR")
+ ERR.setLevel(logging.ERROR)
+ INF = logging.getLogger("INF")
+ INF.setLevel(logging.INFO)
+ DEB = logging.getLogger("DEB")
+ DEB.setLevel(logging.DEBUG)
+
+ # These should log.
+ ERR.log(logging.CRITICAL, m())
+ ERR.error(m())
+
+ INF.log(logging.CRITICAL, m())
+ INF.error(m())
+ INF.warn(m())
+ INF.info(m())
+
+ DEB.log(logging.CRITICAL, m())
+ DEB.error(m())
+ DEB.warn (m())
+ DEB.info (m())
+ DEB.debug(m())
+
+ # These should not log.
+ ERR.warn(m())
+ ERR.info(m())
+ ERR.debug(m())
+
+ INF.debug(m())
+
+ self.assert_log_lines([
+ ('ERR', 'CRITICAL', '1'),
+ ('ERR', 'ERROR', '2'),
+ ('INF', 'CRITICAL', '3'),
+ ('INF', 'ERROR', '4'),
+ ('INF', 'WARNING', '5'),
+ ('INF', 'INFO', '6'),
+ ('DEB', 'CRITICAL', '7'),
+ ('DEB', 'ERROR', '8'),
+ ('DEB', 'WARNING', '9'),
+ ('DEB', 'INFO', '10'),
+ ('DEB', 'DEBUG', '11'),
+ ])
+
+ def test_nested_explicit(self):
+ # Logging levels in a nested namespace, all explicitly set.
+ m = self.next_message
+
+ INF = logging.getLogger("INF")
+ INF.setLevel(logging.INFO)
+ INF_ERR = logging.getLogger("INF.ERR")
+ INF_ERR.setLevel(logging.ERROR)
+
+ # These should log.
+ INF_ERR.log(logging.CRITICAL, m())
+ INF_ERR.error(m())
+
+ # These should not log.
+ INF_ERR.warn(m())
+ INF_ERR.info(m())
+ INF_ERR.debug(m())
+
+ self.assert_log_lines([
+ ('INF.ERR', 'CRITICAL', '1'),
+ ('INF.ERR', 'ERROR', '2'),
+ ])
+
+ def test_nested_inherited(self):
+ #Logging levels in a nested namespace, inherited from parent loggers.
+ m = self.next_message
+
+ INF = logging.getLogger("INF")
+ INF.setLevel(logging.INFO)
+ INF_ERR = logging.getLogger("INF.ERR")
+ INF_ERR.setLevel(logging.ERROR)
+ INF_UNDEF = logging.getLogger("INF.UNDEF")
+ INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
+ UNDEF = logging.getLogger("UNDEF")
+
+ # These should log.
+ INF_UNDEF.log(logging.CRITICAL, m())
+ INF_UNDEF.error(m())
+ INF_UNDEF.warn(m())
+ INF_UNDEF.info(m())
+ INF_ERR_UNDEF.log(logging.CRITICAL, m())
+ INF_ERR_UNDEF.error(m())
+
+ # These should not log.
+ INF_UNDEF.debug(m())
+ INF_ERR_UNDEF.warn(m())
+ INF_ERR_UNDEF.info(m())
+ INF_ERR_UNDEF.debug(m())
+
+ self.assert_log_lines([
+ ('INF.UNDEF', 'CRITICAL', '1'),
+ ('INF.UNDEF', 'ERROR', '2'),
+ ('INF.UNDEF', 'WARNING', '3'),
+ ('INF.UNDEF', 'INFO', '4'),
+ ('INF.ERR.UNDEF', 'CRITICAL', '5'),
+ ('INF.ERR.UNDEF', 'ERROR', '6'),
+ ])
+
+ def test_nested_with_virtual_parent(self):
+ # Logging levels when some parent does not exist yet.
+ m = self.next_message
+
+ INF = logging.getLogger("INF")
+ GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
+ CHILD = logging.getLogger("INF.BADPARENT")
+ INF.setLevel(logging.INFO)
+
+ # These should log.
+ GRANDCHILD.log(logging.FATAL, m())
+ GRANDCHILD.info(m())
+ CHILD.log(logging.FATAL, m())
+ CHILD.info(m())
+
+ # These should not log.
+ GRANDCHILD.debug(m())
+ CHILD.debug(m())
+
+ self.assert_log_lines([
+ ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
+ ('INF.BADPARENT.UNDEF', 'INFO', '2'),
+ ('INF.BADPARENT', 'CRITICAL', '3'),
+ ('INF.BADPARENT', 'INFO', '4'),
+ ])
+
+ def test_invalid_name(self):
+ self.assertRaises(TypeError, logging.getLogger, any)
+
+class BasicFilterTest(BaseTest):
+
+ """Test the bundled Filter class."""
+
+ def test_filter(self):
+ # Only messages satisfying the specified criteria pass through the
+ # filter.
+ filter_ = logging.Filter("spam.eggs")
+ handler = self.root_logger.handlers[0]
+ try:
+ handler.addFilter(filter_)
+ spam = logging.getLogger("spam")
+ spam_eggs = logging.getLogger("spam.eggs")
+ spam_eggs_fish = logging.getLogger("spam.eggs.fish")
+ spam_bakedbeans = logging.getLogger("spam.bakedbeans")
+
+ spam.info(self.next_message())
+ spam_eggs.info(self.next_message()) # Good.
+ spam_eggs_fish.info(self.next_message()) # Good.
+ spam_bakedbeans.info(self.next_message())
+
+ self.assert_log_lines([
+ ('spam.eggs', 'INFO', '2'),
+ ('spam.eggs.fish', 'INFO', '3'),
+ ])
+ finally:
+ handler.removeFilter(filter_)
+
+
+#
+# First, we define our levels. There can be as many as you want - the only
+# limitations are that they should be integers, the lowest should be > 0 and
+# larger values mean less information being logged. If you need specific
+# level values which do not fit into these limitations, you can use a
+# mapping dictionary to convert between your application levels and the
+# logging system.
+#
+SILENT = 120
+TACITURN = 119
+TERSE = 118
+EFFUSIVE = 117
+SOCIABLE = 116
+VERBOSE = 115
+TALKATIVE = 114
+GARRULOUS = 113
+CHATTERBOX = 112
+BORING = 111
+
+LEVEL_RANGE = range(BORING, SILENT + 1)
+
+#
+# Next, we define names for our levels. You don't need to do this - in which
+# case the system will use "Level n" to denote the text for the level.
+#
+my_logging_levels = {
+ SILENT : 'Silent',
+ TACITURN : 'Taciturn',
+ TERSE : 'Terse',
+ EFFUSIVE : 'Effusive',
+ SOCIABLE : 'Sociable',
+ VERBOSE : 'Verbose',
+ TALKATIVE : 'Talkative',
+ GARRULOUS : 'Garrulous',
+ CHATTERBOX : 'Chatterbox',
+ BORING : 'Boring',
+}
+
+class GarrulousFilter(logging.Filter):
+
+ """A filter which blocks garrulous messages."""
+
+ def filter(self, record):
+ return record.levelno != GARRULOUS
+
+class VerySpecificFilter(logging.Filter):
+
+ """A filter which blocks sociable and taciturn messages."""
+
+ def filter(self, record):
+ return record.levelno not in [SOCIABLE, TACITURN]
+
+
+class CustomLevelsAndFiltersTest(BaseTest):
+
+ """Test various filtering possibilities with custom logging levels."""
+
+ # Skip the logger name group.
+ expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+
+ def setUp(self):
+ BaseTest.setUp(self)
+ for k, v in my_logging_levels.items():
+ logging.addLevelName(k, v)
+
+ def log_at_all_levels(self, logger):
+ for lvl in LEVEL_RANGE:
+ logger.log(lvl, self.next_message())
+
+ def test_logger_filter(self):
+ # Filter at logger level.
+ self.root_logger.setLevel(VERBOSE)
+ # Levels >= 'Verbose' are good.
+ self.log_at_all_levels(self.root_logger)
+ self.assert_log_lines([
+ ('Verbose', '5'),
+ ('Sociable', '6'),
+ ('Effusive', '7'),
+ ('Terse', '8'),
+ ('Taciturn', '9'),
+ ('Silent', '10'),
+ ])
+
+ def test_handler_filter(self):
+ # Filter at handler level.
+ self.root_logger.handlers[0].setLevel(SOCIABLE)
+ try:
+ # Levels >= 'Sociable' are good.
+ self.log_at_all_levels(self.root_logger)
+ self.assert_log_lines([
+ ('Sociable', '6'),
+ ('Effusive', '7'),
+ ('Terse', '8'),
+ ('Taciturn', '9'),
+ ('Silent', '10'),
+ ])
+ finally:
+ self.root_logger.handlers[0].setLevel(logging.NOTSET)
+
+ def test_specific_filters(self):
+ # Set a specific filter object on the handler, and then add another
+ # filter object on the logger itself.
+ handler = self.root_logger.handlers[0]
+ specific_filter = None
+ garr = GarrulousFilter()
+ handler.addFilter(garr)
+ try:
+ self.log_at_all_levels(self.root_logger)
+ first_lines = [
+ # Notice how 'Garrulous' is missing
+ ('Boring', '1'),
+ ('Chatterbox', '2'),
+ ('Talkative', '4'),
+ ('Verbose', '5'),
+ ('Sociable', '6'),
+ ('Effusive', '7'),
+ ('Terse', '8'),
+ ('Taciturn', '9'),
+ ('Silent', '10'),
+ ]
+ self.assert_log_lines(first_lines)
+
+ specific_filter = VerySpecificFilter()
+ self.root_logger.addFilter(specific_filter)
+ self.log_at_all_levels(self.root_logger)
+ self.assert_log_lines(first_lines + [
+ # Not only 'Garrulous' is still missing, but also 'Sociable'
+ # and 'Taciturn'
+ ('Boring', '11'),
+ ('Chatterbox', '12'),
+ ('Talkative', '14'),
+ ('Verbose', '15'),
+ ('Effusive', '17'),
+ ('Terse', '18'),
+ ('Silent', '20'),
+ ])
+ finally:
+ if specific_filter:
+ self.root_logger.removeFilter(specific_filter)
+ handler.removeFilter(garr)
+
+
+class MemoryHandlerTest(BaseTest):
+
+ """Tests for the MemoryHandler."""
+
+ # Do not bother with a logger name group.
+ expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+
+ def setUp(self):
+ BaseTest.setUp(self)
+ self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
+ self.root_hdlr)
+ self.mem_logger = logging.getLogger('mem')
+ self.mem_logger.propagate = 0
+ self.mem_logger.addHandler(self.mem_hdlr)
+
+ def tearDown(self):
+ self.mem_hdlr.close()
+ BaseTest.tearDown(self)
+
+ def test_flush(self):
+ # The memory handler flushes to its target handler based on specific
+ # criteria (message count and message level).
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines([])
+ self.mem_logger.info(self.next_message())
+ self.assert_log_lines([])
+ # This will flush because the level is >= logging.WARNING
+ self.mem_logger.warn(self.next_message())
+ lines = [
+ ('DEBUG', '1'),
+ ('INFO', '2'),
+ ('WARNING', '3'),
+ ]
+ self.assert_log_lines(lines)
+ for n in (4, 14):
+ for i in range(9):
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines(lines)
+ # This will flush because it's the 10th message since the last
+ # flush.
+ self.mem_logger.debug(self.next_message())
+ lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
+ self.assert_log_lines(lines)
+
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines(lines)
+
+
+class ExceptionFormatter(logging.Formatter):
+ """A special exception formatter."""
+ def formatException(self, ei):
+ return "Got a [%s]" % ei[0].__name__
+
+
+class ConfigFileTest(BaseTest):
+
+ """Reading logging config from a .ini-style config file."""
+
+ expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+
+ # config0 is a standard configuration.
+ config0 = """
+ [loggers]
+ keys=root
+
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=WARNING
+ handlers=hand1
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+ """
+
+ # config1 adds a little to the standard configuration.
+ config1 = """
+ [loggers]
+ keys=root,parser
+
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=WARNING
+ handlers=
+
+ [logger_parser]
+ level=DEBUG
+ handlers=hand1
+ propagate=1
+ qualname=compiler.parser
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+ """
+
+ # config1a moves the handler to the root.
+ config1a = """
+ [loggers]
+ keys=root,parser
+
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=WARNING
+ handlers=hand1
+
+ [logger_parser]
+ level=DEBUG
+ handlers=
+ propagate=1
+ qualname=compiler.parser
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+ """
+
+ # config2 has a subtle configuration error that should be reported
+ config2 = config1.replace("sys.stdout", "sys.stbout")
+
+ # config3 has a less subtle configuration error
+ config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
+
+ # config4 specifies a custom formatter class to be loaded
+ config4 = """
+ [loggers]
+ keys=root
+
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=NOTSET
+ handlers=hand1
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ class=""" + __name__ + """.ExceptionFormatter
+ format=%(levelname)s:%(name)s:%(message)s
+ datefmt=
+ """
+
+ # config5 specifies a custom handler class to be loaded
+ config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
+
+ # config6 uses ', ' delimiters in the handlers and formatters sections
+ config6 = """
+ [loggers]
+ keys=root,parser
+
+ [handlers]
+ keys=hand1, hand2
+
+ [formatters]
+ keys=form1, form2
+
+ [logger_root]
+ level=WARNING
+ handlers=
+
+ [logger_parser]
+ level=DEBUG
+ handlers=hand1
+ propagate=1
+ qualname=compiler.parser
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [handler_hand2]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stderr,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+
+ [formatter_form2]
+ format=%(message)s
+ datefmt=
+ """
+
+ # config7 adds a compiler logger.
+ config7 = """
+ [loggers]
+ keys=root,parser,compiler
+
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=WARNING
+ handlers=hand1
+
+ [logger_compiler]
+ level=DEBUG
+ handlers=
+ propagate=1
+ qualname=compiler
+
+ [logger_parser]
+ level=DEBUG
+ handlers=
+ propagate=1
+ qualname=compiler.parser
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+ """
+
+ disable_test = """
+ [loggers]
+ keys=root
+
+ [handlers]
+ keys=screen
+
+ [formatters]
+ keys=
+
+ [logger_root]
+ level=DEBUG
+ handlers=screen
+
+ [handler_screen]
+ level=DEBUG
+ class=StreamHandler
+ args=(sys.stdout,)
+ formatter=
+ """
+
+ def apply_config(self, conf, **kwargs):
+ file = cStringIO.StringIO(textwrap.dedent(conf))
+ logging.config.fileConfig(file, **kwargs)
+
+ def test_config0_ok(self):
+ # A simple config file which overrides the default settings.
+ with captured_stdout() as output:
+ self.apply_config(self.config0)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config1_ok(self, config=config1):
+ # A config file defining a sub-parser as well.
+ with captured_stdout() as output:
+ self.apply_config(config)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config2_failure(self):
+ # A simple config file which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config2)
+
+ def test_config3_failure(self):
+ # A simple config file which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config3)
+
+ def test_config4_ok(self):
+ # A config file specifying a custom formatter class.
+ with captured_stdout() as output:
+ self.apply_config(self.config4)
+ logger = logging.getLogger()
+ try:
+ raise RuntimeError()
+ except RuntimeError:
+ logging.exception("just testing")
+ sys.stdout.seek(0)
+ self.assertEqual(output.getvalue(),
+ "ERROR:root:just testing\nGot a [RuntimeError]\n")
+ # Original logger output is empty
+ self.assert_log_lines([])
+
+ def test_config5_ok(self):
+ self.test_config1_ok(config=self.config5)
+
+ def test_config6_ok(self):
+ self.test_config1_ok(config=self.config6)
+
+ def test_config7_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config1a)
+ logger = logging.getLogger("compiler.parser")
+ # See issue #11424. compiler-hyphenated sorts
+ # between compiler and compiler.xyz and this
+ # was preventing compiler.xyz from being included
+ # in the child loggers of compiler because of an
+ # overzealous loop termination condition.
+ hyphenated = logging.getLogger('compiler-hyphenated')
+ # All will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ hyphenated.critical(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ('CRITICAL', '3'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+ with captured_stdout() as output:
+ self.apply_config(self.config7)
+ logger = logging.getLogger("compiler.parser")
+ self.assertFalse(logger.disabled)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ logger = logging.getLogger("compiler.lexer")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ # Will not appear
+ hyphenated.critical(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '4'),
+ ('ERROR', '5'),
+ ('INFO', '6'),
+ ('ERROR', '7'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_logger_disabling(self):
+ self.apply_config(self.disable_test)
+ logger = logging.getLogger('foo')
+ self.assertFalse(logger.disabled)
+ self.apply_config(self.disable_test)
+ self.assertTrue(logger.disabled)
+ self.apply_config(self.disable_test, disable_existing_loggers=False)
+ self.assertFalse(logger.disabled)
+
+class LogRecordStreamHandler(StreamRequestHandler):
+
+ """Handler for a streaming logging request. It saves the log message in the
+ TCP server's 'log_output' attribute."""
+
+ TCP_LOG_END = "!!!END!!!"
+
+ def handle(self):
+ """Handle multiple requests - each expected to be of 4-byte length,
+ followed by the LogRecord in pickle format. Logs the record
+ according to whatever policy is configured locally."""
+ while True:
+ chunk = self.connection.recv(4)
+ if len(chunk) < 4:
+ break
+ slen = struct.unpack(">L", chunk)[0]
+ chunk = self.connection.recv(slen)
+ while len(chunk) < slen:
+ chunk = chunk + self.connection.recv(slen - len(chunk))
+ obj = self.unpickle(chunk)
+ record = logging.makeLogRecord(obj)
+ self.handle_log_record(record)
+
+ def unpickle(self, data):
+ return cPickle.loads(data)
+
+ def handle_log_record(self, record):
+ # If the end-of-messages sentinel is seen, tell the server to
+ # terminate.
+ if self.TCP_LOG_END in record.msg:
+ self.server.abort = 1
+ return
+ self.server.log_output += record.msg + "\n"
+
+
+class LogRecordSocketReceiver(ThreadingTCPServer):
+
+ """A simple-minded TCP socket-based logging receiver suitable for test
+ purposes."""
+
+ allow_reuse_address = 1
+ log_output = ""
+
+ def __init__(self, host='localhost',
+ port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+ handler=LogRecordStreamHandler):
+ ThreadingTCPServer.__init__(self, (host, port), handler)
+ self.abort = False
+ self.timeout = 0.1
+ self.finished = threading.Event()
+
+ def serve_until_stopped(self):
+ while not self.abort:
+ rd, wr, ex = select.select([self.socket.fileno()], [], [],
+ self.timeout)
+ if rd:
+ self.handle_request()
+ # Notify the main thread that we're about to exit
+ self.finished.set()
+ # close the listen socket
+ self.server_close()
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SocketHandlerTest(BaseTest):
+
+ """Test for SocketHandler objects."""
+
+ def setUp(self):
+ """Set up a TCP server to receive log messages, and a SocketHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ self.tcpserver = LogRecordSocketReceiver(port=0)
+ self.port = self.tcpserver.socket.getsockname()[1]
+ self.threads = [
+ threading.Thread(target=self.tcpserver.serve_until_stopped)]
+ for thread in self.threads:
+ thread.start()
+
+ self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
+ self.sock_hdlr.setFormatter(self.root_formatter)
+ self.root_logger.removeHandler(self.root_logger.handlers[0])
+ self.root_logger.addHandler(self.sock_hdlr)
+
+ def tearDown(self):
+ """Shutdown the TCP server."""
+ try:
+ self.tcpserver.abort = True
+ del self.tcpserver
+ self.root_logger.removeHandler(self.sock_hdlr)
+ self.sock_hdlr.close()
+ for thread in self.threads:
+ thread.join(2.0)
+ finally:
+ BaseTest.tearDown(self)
+
+ def get_output(self):
+ """Get the log output as received by the TCP server."""
+ # Signal the TCP receiver and wait for it to terminate.
+ self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
+ self.tcpserver.finished.wait(2.0)
+ return self.tcpserver.log_output
+
+ def test_output(self):
+ # The log message sent to the SocketHandler is properly received.
+ logger = logging.getLogger("tcp")
+ logger.error("spam")
+ logger.debug("eggs")
+ self.assertEqual(self.get_output(), "spam\neggs\n")
+
+
+class MemoryTest(BaseTest):
+
+ """Test memory persistence of logger objects."""
+
+ def setUp(self):
+ """Create a dict to remember potentially destroyed objects."""
+ BaseTest.setUp(self)
+ self._survivors = {}
+
+ def _watch_for_survival(self, *args):
+ """Watch the given objects for survival, by creating weakrefs to
+ them."""
+ for obj in args:
+ key = id(obj), repr(obj)
+ self._survivors[key] = weakref.ref(obj)
+
+ def _assertTruesurvival(self):
+ """Assert that all objects watched for survival have survived."""
+ # Trigger cycle breaking.
+ gc.collect()
+ dead = []
+ for (id_, repr_), ref in self._survivors.items():
+ if ref() is None:
+ dead.append(repr_)
+ if dead:
+ self.fail("%d objects should have survived "
+ "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
+
+ def test_persistent_loggers(self):
+ # Logger objects are persistent and retain their configuration, even
+ # if visible references are destroyed.
+ self.root_logger.setLevel(logging.INFO)
+ foo = logging.getLogger("foo")
+ self._watch_for_survival(foo)
+ foo.setLevel(logging.DEBUG)
+ self.root_logger.debug(self.next_message())
+ foo.debug(self.next_message())
+ self.assert_log_lines([
+ ('foo', 'DEBUG', '2'),
+ ])
+ del foo
+ # foo has survived.
+ self._assertTruesurvival()
+ # foo has retained its settings.
+ bar = logging.getLogger("foo")
+ bar.debug(self.next_message())
+ self.assert_log_lines([
+ ('foo', 'DEBUG', '2'),
+ ('foo', 'DEBUG', '3'),
+ ])
+
+
+class EncodingTest(BaseTest):
+ def test_encoding_plain_file(self):
+ # In Python 2.x, a plain file object is treated as having no encoding.
+ log = logging.getLogger("test")
+ fn = tempfile.mktemp(".log")
+ # the non-ascii data we write to the log.
+ data = "foo\x80"
+ try:
+ handler = logging.FileHandler(fn)
+ log.addHandler(handler)
+ try:
+ # write non-ascii data to the log.
+ log.warning(data)
+ finally:
+ log.removeHandler(handler)
+ handler.close()
+ # check we wrote exactly those bytes, ignoring trailing \n etc
+ f = open(fn)
+ try:
+ self.assertEqual(f.read().rstrip(), data)
+ finally:
+ f.close()
+ finally:
+ if os.path.isfile(fn):
+ os.remove(fn)
+
+ def test_encoding_cyrillic_unicode(self):
+ log = logging.getLogger("test")
+ #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
+ message = u'\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
+ #Ensure it's written in a Cyrillic encoding
+ writer_class = codecs.getwriter('cp1251')
+ writer_class.encoding = 'cp1251'
+ stream = cStringIO.StringIO()
+ writer = writer_class(stream, 'strict')
+ handler = logging.StreamHandler(writer)
+ log.addHandler(handler)
+ try:
+ log.warning(message)
+ finally:
+ log.removeHandler(handler)
+ handler.close()
+ # check we wrote exactly those bytes, ignoring trailing \n etc
+ s = stream.getvalue()
+ #Compare against what the data should be when encoded in CP-1251
+ self.assertEqual(s, '\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
+
+
+class WarningsTest(BaseTest):
+
+ def test_warnings(self):
+ with warnings.catch_warnings():
+ logging.captureWarnings(True)
+ try:
+ warnings.filterwarnings("always", category=UserWarning)
+ file = cStringIO.StringIO()
+ h = logging.StreamHandler(file)
+ logger = logging.getLogger("py.warnings")
+ logger.addHandler(h)
+ warnings.warn("I'm warning you...")
+ logger.removeHandler(h)
+ s = file.getvalue()
+ h.close()
+ self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+
+ #See if an explicit file uses the original implementation
+ file = cStringIO.StringIO()
+ warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
+ file, "Dummy line")
+ s = file.getvalue()
+ file.close()
+ self.assertEqual(s,
+ "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
+ finally:
+ logging.captureWarnings(False)
+
+
+def formatFunc(format, datefmt=None):
+ return logging.Formatter(format, datefmt)
+
+def handlerFunc():
+ return logging.StreamHandler()
+
+class CustomHandler(logging.StreamHandler):
+ pass
+
+class ConfigDictTest(BaseTest):
+
+ """Reading logging config from a dictionary."""
+
+ expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+
+ # config0 is a standard configuration.
+ config0 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # config1 adds a little to the standard configuration.
+ config1 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config2 has a subtle configuration error that should be reported
+ config2 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdbout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #As config1 but with a misspelt level on a handler
+ config2a = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NTOSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+
+ #As config1 but with a misspelt level on a logger
+ config2b = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WRANING',
+ },
+ }
+
+ # config3 has a less subtle configuration error
+ config3 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'misspelled_name',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config4 specifies a custom formatter class to be loaded
+ config4 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ '()' : __name__ + '.ExceptionFormatter',
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'root' : {
+ 'level' : 'NOTSET',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # As config4 but using an actual callable rather than a string
+ config4a = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ '()' : ExceptionFormatter,
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ 'form2' : {
+ '()' : __name__ + '.formatFunc',
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ 'form3' : {
+ '()' : formatFunc,
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ 'hand2' : {
+ '()' : handlerFunc,
+ },
+ },
+ 'root' : {
+ 'level' : 'NOTSET',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # config5 specifies a custom handler class to be loaded
+ config5 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : __name__ + '.CustomHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config6 specifies a custom handler class to be loaded
+ # but has bad arguments
+ config6 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : __name__ + '.CustomHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ '9' : 'invalid parameter name',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #config 7 does not define compiler.parser but defines compiler.lexer
+ #so compiler.parser should be disabled after applying it
+ config7 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.lexer' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ config8 = {
+ 'version': 1,
+ 'disable_existing_loggers' : False,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ 'compiler.lexer' : {
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ config9 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'WARNING',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'NOTSET',
+ },
+ }
+
+ config9a = {
+ 'version': 1,
+ 'incremental' : True,
+ 'handlers' : {
+ 'hand1' : {
+ 'level' : 'WARNING',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'INFO',
+ },
+ },
+ }
+
+ config9b = {
+ 'version': 1,
+ 'incremental' : True,
+ 'handlers' : {
+ 'hand1' : {
+ 'level' : 'INFO',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'INFO',
+ },
+ },
+ }
+
+ #As config1 but with a filter added
+ config10 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'filters' : {
+ 'filt1' : {
+ 'name' : 'compiler.parser',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ 'filters' : ['filt1'],
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'filters' : ['filt1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ #As config1 but using cfg:// references
+ config11 = {
+ 'version': 1,
+ 'true_formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handler_configs': {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'formatters' : 'cfg://true_formatters',
+ 'handlers' : {
+ 'hand1' : 'cfg://handler_configs[hand1]',
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #As config11 but missing the version key
+ config12 = {
+ 'true_formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handler_configs': {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'formatters' : 'cfg://true_formatters',
+ 'handlers' : {
+ 'hand1' : 'cfg://handler_configs[hand1]',
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #As config11 but using an unsupported version
+ config13 = {
+ 'version': 2,
+ 'true_formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handler_configs': {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'formatters' : 'cfg://true_formatters',
+ 'handlers' : {
+ 'hand1' : 'cfg://handler_configs[hand1]',
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ out_of_order = {
+ "version": 1,
+ "formatters": {
+ "mySimpleFormatter": {
+ "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s"
+ }
+ },
+ "handlers": {
+ "fileGlobal": {
+ "class": "logging.StreamHandler",
+ "level": "DEBUG",
+ "formatter": "mySimpleFormatter"
+ },
+ "bufferGlobal": {
+ "class": "logging.handlers.MemoryHandler",
+ "capacity": 5,
+ "formatter": "mySimpleFormatter",
+ "target": "fileGlobal",
+ "level": "DEBUG"
+ }
+ },
+ "loggers": {
+ "mymodule": {
+ "level": "DEBUG",
+ "handlers": ["bufferGlobal"],
+ "propagate": "true"
+ }
+ }
+ }
+
+ def apply_config(self, conf):
+ logging.config.dictConfig(conf)
+
+ def test_config0_ok(self):
+ # A simple config which overrides the default settings.
+ with captured_stdout() as output:
+ self.apply_config(self.config0)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config1_ok(self, config=config1):
+ # A config defining a sub-parser as well.
+ with captured_stdout() as output:
+ self.apply_config(config)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config2_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config2)
+
+ def test_config2a_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config2a)
+
+ def test_config2b_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config2b)
+
+ def test_config3_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config3)
+
+ def test_config4_ok(self):
+ # A config specifying a custom formatter class.
+ with captured_stdout() as output:
+ self.apply_config(self.config4)
+ #logger = logging.getLogger()
+ try:
+ raise RuntimeError()
+ except RuntimeError:
+ logging.exception("just testing")
+ sys.stdout.seek(0)
+ self.assertEqual(output.getvalue(),
+ "ERROR:root:just testing\nGot a [RuntimeError]\n")
+ # Original logger output is empty
+ self.assert_log_lines([])
+
+ def test_config4a_ok(self):
+ # A config specifying a custom formatter class.
+ with captured_stdout() as output:
+ self.apply_config(self.config4a)
+ #logger = logging.getLogger()
+ try:
+ raise RuntimeError()
+ except RuntimeError:
+ logging.exception("just testing")
+ sys.stdout.seek(0)
+ self.assertEqual(output.getvalue(),
+ "ERROR:root:just testing\nGot a [RuntimeError]\n")
+ # Original logger output is empty
+ self.assert_log_lines([])
+
+ def test_config5_ok(self):
+ self.test_config1_ok(config=self.config5)
+
+ def test_config6_failure(self):
+ self.assertRaises(StandardError, self.apply_config, self.config6)
+
+ def test_config7_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config1)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+ with captured_stdout() as output:
+ self.apply_config(self.config7)
+ logger = logging.getLogger("compiler.parser")
+ self.assertTrue(logger.disabled)
+ logger = logging.getLogger("compiler.lexer")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ #Same as test_config_7_ok but don't disable old loggers.
+ def test_config_8_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config1)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+ with captured_stdout() as output:
+ self.apply_config(self.config8)
+ logger = logging.getLogger("compiler.parser")
+ self.assertFalse(logger.disabled)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ logger = logging.getLogger("compiler.lexer")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ('INFO', '5'),
+ ('ERROR', '6'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config_9_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config9)
+ logger = logging.getLogger("compiler.parser")
+ #Nothing will be output since both handler and logger are set to WARNING
+ logger.info(self.next_message())
+ self.assert_log_lines([], stream=output)
+ self.apply_config(self.config9a)
+ #Nothing will be output since both handler is still set to WARNING
+ logger.info(self.next_message())
+ self.assert_log_lines([], stream=output)
+ self.apply_config(self.config9b)
+ #Message should now be output
+ logger.info(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ], stream=output)
+
+ def test_config_10_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config10)
+ logger = logging.getLogger("compiler.parser")
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler.lexer')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger("compiler.parser.codegen")
+ #Output, as not filtered
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('WARNING', '1'),
+ ('ERROR', '4'),
+ ], stream=output)
+
+ def test_config11_ok(self):
+ self.test_config1_ok(self.config11)
+
+ def test_config12_failure(self):
+ self.assertRaises(StandardError, self.apply_config, self.config12)
+
+ def test_config13_failure(self):
+ self.assertRaises(StandardError, self.apply_config, self.config13)
+
+ @unittest.skipUnless(threading, 'listen() needs threading to work')
+ def setup_via_listener(self, text):
+ # Ask for a randomly assigned port (by using port 0)
+ t = logging.config.listen(0)
+ t.start()
+ t.ready.wait()
+ # Now get the port allocated
+ port = t.port
+ t.ready.clear()
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(2.0)
+ sock.connect(('localhost', port))
+
+ slen = struct.pack('>L', len(text))
+ s = slen + text
+ sentsofar = 0
+ left = len(s)
+ while left > 0:
+ sent = sock.send(s[sentsofar:])
+ sentsofar += sent
+ left -= sent
+ sock.close()
+ finally:
+ t.ready.wait(2.0)
+ logging.config.stopListening()
+ t.join(2.0)
+
+ def test_listen_config_10_ok(self):
+ with captured_stdout() as output:
+ self.setup_via_listener(json.dumps(self.config10))
+ logger = logging.getLogger("compiler.parser")
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler.lexer')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger("compiler.parser.codegen")
+ #Output, as not filtered
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('WARNING', '1'),
+ ('ERROR', '4'),
+ ], stream=output)
+
+ def test_listen_config_1_ok(self):
+ with captured_stdout() as output:
+ self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_out_of_order(self):
+ self.apply_config(self.out_of_order)
+ handler = logging.getLogger('mymodule').handlers[0]
+ self.assertIsInstance(handler.target, logging.Handler)
+
+class ManagerTest(BaseTest):
+ def test_manager_loggerclass(self):
+ logged = []
+
+ class MyLogger(logging.Logger):
+ def _log(self, level, msg, args, exc_info=None, extra=None):
+ logged.append(msg)
+
+ man = logging.Manager(None)
+ self.assertRaises(TypeError, man.setLoggerClass, int)
+ man.setLoggerClass(MyLogger)
+ logger = man.getLogger('test')
+ logger.warning('should appear in logged')
+ logging.warning('should not appear in logged')
+
+ self.assertEqual(logged, ['should appear in logged'])
+
+
+class ChildLoggerTest(BaseTest):
+ def test_child_loggers(self):
+ r = logging.getLogger()
+ l1 = logging.getLogger('abc')
+ l2 = logging.getLogger('def.ghi')
+ c1 = r.getChild('xyz')
+ c2 = r.getChild('uvw.xyz')
+ self.assertTrue(c1 is logging.getLogger('xyz'))
+ self.assertTrue(c2 is logging.getLogger('uvw.xyz'))
+ c1 = l1.getChild('def')
+ c2 = c1.getChild('ghi')
+ c3 = l1.getChild('def.ghi')
+ self.assertTrue(c1 is logging.getLogger('abc.def'))
+ self.assertTrue(c2 is logging.getLogger('abc.def.ghi'))
+ self.assertTrue(c2 is c3)
+
+
+class HandlerTest(BaseTest):
+
+ @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_race(self):
+ # Issue #14632 refers.
+ def remove_loop(fname, tries):
+ for _ in range(tries):
+ try:
+ os.unlink(fname)
+ except OSError:
+ pass
+ time.sleep(0.004 * random.randint(0, 4))
+
+ del_count = 500
+ log_count = 500
+
+ for delay in (False, True):
+ fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
+ os.close(fd)
+ remover = threading.Thread(target=remove_loop, args=(fn, del_count))
+ remover.daemon = True
+ remover.start()
+ h = logging.handlers.WatchedFileHandler(fn, delay=delay)
+ f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
+ h.setFormatter(f)
+ try:
+ for _ in range(log_count):
+ time.sleep(0.005)
+ r = logging.makeLogRecord({'msg': 'testing' })
+ h.handle(r)
+ finally:
+ remover.join()
+ try:
+ h.close()
+ except ValueError:
+ pass
+ if os.path.exists(fn):
+ os.unlink(fn)
+
+
+# Set the locale to the platform-dependent default. I have no idea
+# why the test does this, but in any case we save the current locale
+# first and restore it at the end.
+@run_with_locale('LC_ALL', '')
+def test_main():
+ run_unittest(BuiltinLevelsTest, BasicFilterTest,
+ CustomLevelsAndFiltersTest, MemoryHandlerTest,
+ ConfigFileTest, SocketHandlerTest, MemoryTest,
+ EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
+ ChildLoggerTest, HandlerTest)
+
+if __name__ == "__main__":
+ test_main()