diff options
Diffstat (limited to 'lib/python2.7/test/test_logging.py')
-rw-r--r-- | lib/python2.7/test/test_logging.py | 1995 |
1 files changed, 0 insertions, 1995 deletions
diff --git a/lib/python2.7/test/test_logging.py b/lib/python2.7/test/test_logging.py deleted file mode 100644 index 31bc48e..0000000 --- a/lib/python2.7/test/test_logging.py +++ /dev/null @@ -1,1995 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2001-2013 by Vinay Sajip. All Rights Reserved. -# -# Permission to use, copy, modify, and distribute this software and its -# documentation for any purpose and without fee is hereby granted, -# provided that the above copyright notice appear in all copies and that -# both that copyright notice and this permission notice appear in -# supporting documentation, and that the name of Vinay Sajip -# not be used in advertising or publicity pertaining to distribution -# of the software without specific, written prior permission. -# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING -# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL -# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR -# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER -# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT -# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -"""Test harness for the logging module. Run all tests. - -Copyright (C) 2001-2013 Vinay Sajip. All Rights Reserved. -""" - -import logging -import logging.handlers -import logging.config - -import codecs -import cPickle -import cStringIO -import gc -import json -import os -import random -import re -import select -import socket -from SocketServer import ThreadingTCPServer, StreamRequestHandler -import struct -import sys -import tempfile -from test.test_support import captured_stdout, run_with_locale, run_unittest -import textwrap -import time -import unittest -import warnings -import weakref -try: - import threading -except ImportError: - threading = None - -class BaseTest(unittest.TestCase): - - """Base class for logging tests.""" - - log_format = "%(name)s -> %(levelname)s: %(message)s" - expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" - message_num = 0 - - def setUp(self): - """Setup the default logging stream to an internal StringIO instance, - so that we can examine log output as we want.""" - logger_dict = logging.getLogger().manager.loggerDict - logging._acquireLock() - try: - self.saved_handlers = logging._handlers.copy() - self.saved_handler_list = logging._handlerList[:] - self.saved_loggers = logger_dict.copy() - self.saved_level_names = logging._levelNames.copy() - finally: - logging._releaseLock() - - # Set two unused loggers: one non-ASCII and one Unicode. - # This is to test correct operation when sorting existing - # loggers in the configuration code. See issue 8201. - logging.getLogger("\xab\xd7\xbb") - logging.getLogger(u"\u013f\u00d6\u0047") - - self.root_logger = logging.getLogger("") - self.original_logging_level = self.root_logger.getEffectiveLevel() - - self.stream = cStringIO.StringIO() - self.root_logger.setLevel(logging.DEBUG) - self.root_hdlr = logging.StreamHandler(self.stream) - self.root_formatter = logging.Formatter(self.log_format) - self.root_hdlr.setFormatter(self.root_formatter) - self.root_logger.addHandler(self.root_hdlr) - - def tearDown(self): - """Remove our logging stream, and restore the original logging - level.""" - self.stream.close() - self.root_logger.removeHandler(self.root_hdlr) - while self.root_logger.handlers: - h = self.root_logger.handlers[0] - self.root_logger.removeHandler(h) - h.close() - self.root_logger.setLevel(self.original_logging_level) - logging._acquireLock() - try: - logging._levelNames.clear() - logging._levelNames.update(self.saved_level_names) - logging._handlers.clear() - logging._handlers.update(self.saved_handlers) - logging._handlerList[:] = self.saved_handler_list - loggerDict = logging.getLogger().manager.loggerDict - loggerDict.clear() - loggerDict.update(self.saved_loggers) - finally: - logging._releaseLock() - - def assert_log_lines(self, expected_values, stream=None): - """Match the collected log lines against the regular expression - self.expected_log_pat, and compare the extracted group values to - the expected_values list of tuples.""" - stream = stream or self.stream - pat = re.compile(self.expected_log_pat) - try: - stream.reset() - actual_lines = stream.readlines() - except AttributeError: - # StringIO.StringIO lacks a reset() method. - actual_lines = stream.getvalue().splitlines() - self.assertEqual(len(actual_lines), len(expected_values)) - for actual, expected in zip(actual_lines, expected_values): - match = pat.search(actual) - if not match: - self.fail("Log line does not match expected pattern:\n" + - actual) - self.assertEqual(tuple(match.groups()), expected) - s = stream.read() - if s: - self.fail("Remaining output at end of log stream:\n" + s) - - def next_message(self): - """Generate a message consisting solely of an auto-incrementing - integer.""" - self.message_num += 1 - return "%d" % self.message_num - - -class BuiltinLevelsTest(BaseTest): - """Test builtin levels and their inheritance.""" - - def test_flat(self): - #Logging levels in a flat logger namespace. - m = self.next_message - - ERR = logging.getLogger("ERR") - ERR.setLevel(logging.ERROR) - INF = logging.getLogger("INF") - INF.setLevel(logging.INFO) - DEB = logging.getLogger("DEB") - DEB.setLevel(logging.DEBUG) - - # These should log. - ERR.log(logging.CRITICAL, m()) - ERR.error(m()) - - INF.log(logging.CRITICAL, m()) - INF.error(m()) - INF.warn(m()) - INF.info(m()) - - DEB.log(logging.CRITICAL, m()) - DEB.error(m()) - DEB.warn (m()) - DEB.info (m()) - DEB.debug(m()) - - # These should not log. - ERR.warn(m()) - ERR.info(m()) - ERR.debug(m()) - - INF.debug(m()) - - self.assert_log_lines([ - ('ERR', 'CRITICAL', '1'), - ('ERR', 'ERROR', '2'), - ('INF', 'CRITICAL', '3'), - ('INF', 'ERROR', '4'), - ('INF', 'WARNING', '5'), - ('INF', 'INFO', '6'), - ('DEB', 'CRITICAL', '7'), - ('DEB', 'ERROR', '8'), - ('DEB', 'WARNING', '9'), - ('DEB', 'INFO', '10'), - ('DEB', 'DEBUG', '11'), - ]) - - def test_nested_explicit(self): - # Logging levels in a nested namespace, all explicitly set. - m = self.next_message - - INF = logging.getLogger("INF") - INF.setLevel(logging.INFO) - INF_ERR = logging.getLogger("INF.ERR") - INF_ERR.setLevel(logging.ERROR) - - # These should log. - INF_ERR.log(logging.CRITICAL, m()) - INF_ERR.error(m()) - - # These should not log. - INF_ERR.warn(m()) - INF_ERR.info(m()) - INF_ERR.debug(m()) - - self.assert_log_lines([ - ('INF.ERR', 'CRITICAL', '1'), - ('INF.ERR', 'ERROR', '2'), - ]) - - def test_nested_inherited(self): - #Logging levels in a nested namespace, inherited from parent loggers. - m = self.next_message - - INF = logging.getLogger("INF") - INF.setLevel(logging.INFO) - INF_ERR = logging.getLogger("INF.ERR") - INF_ERR.setLevel(logging.ERROR) - INF_UNDEF = logging.getLogger("INF.UNDEF") - INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") - UNDEF = logging.getLogger("UNDEF") - - # These should log. - INF_UNDEF.log(logging.CRITICAL, m()) - INF_UNDEF.error(m()) - INF_UNDEF.warn(m()) - INF_UNDEF.info(m()) - INF_ERR_UNDEF.log(logging.CRITICAL, m()) - INF_ERR_UNDEF.error(m()) - - # These should not log. - INF_UNDEF.debug(m()) - INF_ERR_UNDEF.warn(m()) - INF_ERR_UNDEF.info(m()) - INF_ERR_UNDEF.debug(m()) - - self.assert_log_lines([ - ('INF.UNDEF', 'CRITICAL', '1'), - ('INF.UNDEF', 'ERROR', '2'), - ('INF.UNDEF', 'WARNING', '3'), - ('INF.UNDEF', 'INFO', '4'), - ('INF.ERR.UNDEF', 'CRITICAL', '5'), - ('INF.ERR.UNDEF', 'ERROR', '6'), - ]) - - def test_nested_with_virtual_parent(self): - # Logging levels when some parent does not exist yet. - m = self.next_message - - INF = logging.getLogger("INF") - GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") - CHILD = logging.getLogger("INF.BADPARENT") - INF.setLevel(logging.INFO) - - # These should log. - GRANDCHILD.log(logging.FATAL, m()) - GRANDCHILD.info(m()) - CHILD.log(logging.FATAL, m()) - CHILD.info(m()) - - # These should not log. - GRANDCHILD.debug(m()) - CHILD.debug(m()) - - self.assert_log_lines([ - ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), - ('INF.BADPARENT.UNDEF', 'INFO', '2'), - ('INF.BADPARENT', 'CRITICAL', '3'), - ('INF.BADPARENT', 'INFO', '4'), - ]) - - def test_invalid_name(self): - self.assertRaises(TypeError, logging.getLogger, any) - -class BasicFilterTest(BaseTest): - - """Test the bundled Filter class.""" - - def test_filter(self): - # Only messages satisfying the specified criteria pass through the - # filter. - filter_ = logging.Filter("spam.eggs") - handler = self.root_logger.handlers[0] - try: - handler.addFilter(filter_) - spam = logging.getLogger("spam") - spam_eggs = logging.getLogger("spam.eggs") - spam_eggs_fish = logging.getLogger("spam.eggs.fish") - spam_bakedbeans = logging.getLogger("spam.bakedbeans") - - spam.info(self.next_message()) - spam_eggs.info(self.next_message()) # Good. - spam_eggs_fish.info(self.next_message()) # Good. - spam_bakedbeans.info(self.next_message()) - - self.assert_log_lines([ - ('spam.eggs', 'INFO', '2'), - ('spam.eggs.fish', 'INFO', '3'), - ]) - finally: - handler.removeFilter(filter_) - - -# -# First, we define our levels. There can be as many as you want - the only -# limitations are that they should be integers, the lowest should be > 0 and -# larger values mean less information being logged. If you need specific -# level values which do not fit into these limitations, you can use a -# mapping dictionary to convert between your application levels and the -# logging system. -# -SILENT = 120 -TACITURN = 119 -TERSE = 118 -EFFUSIVE = 117 -SOCIABLE = 116 -VERBOSE = 115 -TALKATIVE = 114 -GARRULOUS = 113 -CHATTERBOX = 112 -BORING = 111 - -LEVEL_RANGE = range(BORING, SILENT + 1) - -# -# Next, we define names for our levels. You don't need to do this - in which -# case the system will use "Level n" to denote the text for the level. -# -my_logging_levels = { - SILENT : 'Silent', - TACITURN : 'Taciturn', - TERSE : 'Terse', - EFFUSIVE : 'Effusive', - SOCIABLE : 'Sociable', - VERBOSE : 'Verbose', - TALKATIVE : 'Talkative', - GARRULOUS : 'Garrulous', - CHATTERBOX : 'Chatterbox', - BORING : 'Boring', -} - -class GarrulousFilter(logging.Filter): - - """A filter which blocks garrulous messages.""" - - def filter(self, record): - return record.levelno != GARRULOUS - -class VerySpecificFilter(logging.Filter): - - """A filter which blocks sociable and taciturn messages.""" - - def filter(self, record): - return record.levelno not in [SOCIABLE, TACITURN] - - -class CustomLevelsAndFiltersTest(BaseTest): - - """Test various filtering possibilities with custom logging levels.""" - - # Skip the logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" - - def setUp(self): - BaseTest.setUp(self) - for k, v in my_logging_levels.items(): - logging.addLevelName(k, v) - - def log_at_all_levels(self, logger): - for lvl in LEVEL_RANGE: - logger.log(lvl, self.next_message()) - - def test_logger_filter(self): - # Filter at logger level. - self.root_logger.setLevel(VERBOSE) - # Levels >= 'Verbose' are good. - self.log_at_all_levels(self.root_logger) - self.assert_log_lines([ - ('Verbose', '5'), - ('Sociable', '6'), - ('Effusive', '7'), - ('Terse', '8'), - ('Taciturn', '9'), - ('Silent', '10'), - ]) - - def test_handler_filter(self): - # Filter at handler level. - self.root_logger.handlers[0].setLevel(SOCIABLE) - try: - # Levels >= 'Sociable' are good. - self.log_at_all_levels(self.root_logger) - self.assert_log_lines([ - ('Sociable', '6'), - ('Effusive', '7'), - ('Terse', '8'), - ('Taciturn', '9'), - ('Silent', '10'), - ]) - finally: - self.root_logger.handlers[0].setLevel(logging.NOTSET) - - def test_specific_filters(self): - # Set a specific filter object on the handler, and then add another - # filter object on the logger itself. - handler = self.root_logger.handlers[0] - specific_filter = None - garr = GarrulousFilter() - handler.addFilter(garr) - try: - self.log_at_all_levels(self.root_logger) - first_lines = [ - # Notice how 'Garrulous' is missing - ('Boring', '1'), - ('Chatterbox', '2'), - ('Talkative', '4'), - ('Verbose', '5'), - ('Sociable', '6'), - ('Effusive', '7'), - ('Terse', '8'), - ('Taciturn', '9'), - ('Silent', '10'), - ] - self.assert_log_lines(first_lines) - - specific_filter = VerySpecificFilter() - self.root_logger.addFilter(specific_filter) - self.log_at_all_levels(self.root_logger) - self.assert_log_lines(first_lines + [ - # Not only 'Garrulous' is still missing, but also 'Sociable' - # and 'Taciturn' - ('Boring', '11'), - ('Chatterbox', '12'), - ('Talkative', '14'), - ('Verbose', '15'), - ('Effusive', '17'), - ('Terse', '18'), - ('Silent', '20'), - ]) - finally: - if specific_filter: - self.root_logger.removeFilter(specific_filter) - handler.removeFilter(garr) - - -class MemoryHandlerTest(BaseTest): - - """Tests for the MemoryHandler.""" - - # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" - - def setUp(self): - BaseTest.setUp(self) - self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, - self.root_hdlr) - self.mem_logger = logging.getLogger('mem') - self.mem_logger.propagate = 0 - self.mem_logger.addHandler(self.mem_hdlr) - - def tearDown(self): - self.mem_hdlr.close() - BaseTest.tearDown(self) - - def test_flush(self): - # The memory handler flushes to its target handler based on specific - # criteria (message count and message level). - self.mem_logger.debug(self.next_message()) - self.assert_log_lines([]) - self.mem_logger.info(self.next_message()) - self.assert_log_lines([]) - # This will flush because the level is >= logging.WARNING - self.mem_logger.warn(self.next_message()) - lines = [ - ('DEBUG', '1'), - ('INFO', '2'), - ('WARNING', '3'), - ] - self.assert_log_lines(lines) - for n in (4, 14): - for i in range(9): - self.mem_logger.debug(self.next_message()) - self.assert_log_lines(lines) - # This will flush because it's the 10th message since the last - # flush. - self.mem_logger.debug(self.next_message()) - lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] - self.assert_log_lines(lines) - - self.mem_logger.debug(self.next_message()) - self.assert_log_lines(lines) - - -class ExceptionFormatter(logging.Formatter): - """A special exception formatter.""" - def formatException(self, ei): - return "Got a [%s]" % ei[0].__name__ - - -class ConfigFileTest(BaseTest): - - """Reading logging config from a .ini-style config file.""" - - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" - - # config0 is a standard configuration. - config0 = """ - [loggers] - keys=root - - [handlers] - keys=hand1 - - [formatters] - keys=form1 - - [logger_root] - level=WARNING - handlers=hand1 - - [handler_hand1] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stdout,) - - [formatter_form1] - format=%(levelname)s ++ %(message)s - datefmt= - """ - - # config1 adds a little to the standard configuration. - config1 = """ - [loggers] - keys=root,parser - - [handlers] - keys=hand1 - - [formatters] - keys=form1 - - [logger_root] - level=WARNING - handlers= - - [logger_parser] - level=DEBUG - handlers=hand1 - propagate=1 - qualname=compiler.parser - - [handler_hand1] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stdout,) - - [formatter_form1] - format=%(levelname)s ++ %(message)s - datefmt= - """ - - # config1a moves the handler to the root. - config1a = """ - [loggers] - keys=root,parser - - [handlers] - keys=hand1 - - [formatters] - keys=form1 - - [logger_root] - level=WARNING - handlers=hand1 - - [logger_parser] - level=DEBUG - handlers= - propagate=1 - qualname=compiler.parser - - [handler_hand1] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stdout,) - - [formatter_form1] - format=%(levelname)s ++ %(message)s - datefmt= - """ - - # config2 has a subtle configuration error that should be reported - config2 = config1.replace("sys.stdout", "sys.stbout") - - # config3 has a less subtle configuration error - config3 = config1.replace("formatter=form1", "formatter=misspelled_name") - - # config4 specifies a custom formatter class to be loaded - config4 = """ - [loggers] - keys=root - - [handlers] - keys=hand1 - - [formatters] - keys=form1 - - [logger_root] - level=NOTSET - handlers=hand1 - - [handler_hand1] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stdout,) - - [formatter_form1] - class=""" + __name__ + """.ExceptionFormatter - format=%(levelname)s:%(name)s:%(message)s - datefmt= - """ - - # config5 specifies a custom handler class to be loaded - config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') - - # config6 uses ', ' delimiters in the handlers and formatters sections - config6 = """ - [loggers] - keys=root,parser - - [handlers] - keys=hand1, hand2 - - [formatters] - keys=form1, form2 - - [logger_root] - level=WARNING - handlers= - - [logger_parser] - level=DEBUG - handlers=hand1 - propagate=1 - qualname=compiler.parser - - [handler_hand1] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stdout,) - - [handler_hand2] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stderr,) - - [formatter_form1] - format=%(levelname)s ++ %(message)s - datefmt= - - [formatter_form2] - format=%(message)s - datefmt= - """ - - # config7 adds a compiler logger. - config7 = """ - [loggers] - keys=root,parser,compiler - - [handlers] - keys=hand1 - - [formatters] - keys=form1 - - [logger_root] - level=WARNING - handlers=hand1 - - [logger_compiler] - level=DEBUG - handlers= - propagate=1 - qualname=compiler - - [logger_parser] - level=DEBUG - handlers= - propagate=1 - qualname=compiler.parser - - [handler_hand1] - class=StreamHandler - level=NOTSET - formatter=form1 - args=(sys.stdout,) - - [formatter_form1] - format=%(levelname)s ++ %(message)s - datefmt= - """ - - disable_test = """ - [loggers] - keys=root - - [handlers] - keys=screen - - [formatters] - keys= - - [logger_root] - level=DEBUG - handlers=screen - - [handler_screen] - level=DEBUG - class=StreamHandler - args=(sys.stdout,) - formatter= - """ - - def apply_config(self, conf, **kwargs): - file = cStringIO.StringIO(textwrap.dedent(conf)) - logging.config.fileConfig(file, **kwargs) - - def test_config0_ok(self): - # A simple config file which overrides the default settings. - with captured_stdout() as output: - self.apply_config(self.config0) - logger = logging.getLogger() - # Won't output anything - logger.info(self.next_message()) - # Outputs a message - logger.error(self.next_message()) - self.assert_log_lines([ - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_config1_ok(self, config=config1): - # A config file defining a sub-parser as well. - with captured_stdout() as output: - self.apply_config(config) - logger = logging.getLogger("compiler.parser") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '1'), - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_config2_failure(self): - # A simple config file which overrides the default settings. - self.assertRaises(StandardError, self.apply_config, self.config2) - - def test_config3_failure(self): - # A simple config file which overrides the default settings. - self.assertRaises(StandardError, self.apply_config, self.config3) - - def test_config4_ok(self): - # A config file specifying a custom formatter class. - with captured_stdout() as output: - self.apply_config(self.config4) - logger = logging.getLogger() - try: - raise RuntimeError() - except RuntimeError: - logging.exception("just testing") - sys.stdout.seek(0) - self.assertEqual(output.getvalue(), - "ERROR:root:just testing\nGot a [RuntimeError]\n") - # Original logger output is empty - self.assert_log_lines([]) - - def test_config5_ok(self): - self.test_config1_ok(config=self.config5) - - def test_config6_ok(self): - self.test_config1_ok(config=self.config6) - - def test_config7_ok(self): - with captured_stdout() as output: - self.apply_config(self.config1a) - logger = logging.getLogger("compiler.parser") - # See issue #11424. compiler-hyphenated sorts - # between compiler and compiler.xyz and this - # was preventing compiler.xyz from being included - # in the child loggers of compiler because of an - # overzealous loop termination condition. - hyphenated = logging.getLogger('compiler-hyphenated') - # All will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - hyphenated.critical(self.next_message()) - self.assert_log_lines([ - ('INFO', '1'), - ('ERROR', '2'), - ('CRITICAL', '3'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - with captured_stdout() as output: - self.apply_config(self.config7) - logger = logging.getLogger("compiler.parser") - self.assertFalse(logger.disabled) - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - logger = logging.getLogger("compiler.lexer") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - # Will not appear - hyphenated.critical(self.next_message()) - self.assert_log_lines([ - ('INFO', '4'), - ('ERROR', '5'), - ('INFO', '6'), - ('ERROR', '7'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_logger_disabling(self): - self.apply_config(self.disable_test) - logger = logging.getLogger('foo') - self.assertFalse(logger.disabled) - self.apply_config(self.disable_test) - self.assertTrue(logger.disabled) - self.apply_config(self.disable_test, disable_existing_loggers=False) - self.assertFalse(logger.disabled) - -class LogRecordStreamHandler(StreamRequestHandler): - - """Handler for a streaming logging request. It saves the log message in the - TCP server's 'log_output' attribute.""" - - TCP_LOG_END = "!!!END!!!" - - def handle(self): - """Handle multiple requests - each expected to be of 4-byte length, - followed by the LogRecord in pickle format. Logs the record - according to whatever policy is configured locally.""" - while True: - chunk = self.connection.recv(4) - if len(chunk) < 4: - break - slen = struct.unpack(">L", chunk)[0] - chunk = self.connection.recv(slen) - while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unpickle(chunk) - record = logging.makeLogRecord(obj) - self.handle_log_record(record) - - def unpickle(self, data): - return cPickle.loads(data) - - def handle_log_record(self, record): - # If the end-of-messages sentinel is seen, tell the server to - # terminate. - if self.TCP_LOG_END in record.msg: - self.server.abort = 1 - return - self.server.log_output += record.msg + "\n" - - -class LogRecordSocketReceiver(ThreadingTCPServer): - - """A simple-minded TCP socket-based logging receiver suitable for test - purposes.""" - - allow_reuse_address = 1 - log_output = "" - - def __init__(self, host='localhost', - port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler=LogRecordStreamHandler): - ThreadingTCPServer.__init__(self, (host, port), handler) - self.abort = False - self.timeout = 0.1 - self.finished = threading.Event() - - def serve_until_stopped(self): - while not self.abort: - rd, wr, ex = select.select([self.socket.fileno()], [], [], - self.timeout) - if rd: - self.handle_request() - # Notify the main thread that we're about to exit - self.finished.set() - # close the listen socket - self.server_close() - - -@unittest.skipUnless(threading, 'Threading required for this test.') -class SocketHandlerTest(BaseTest): - - """Test for SocketHandler objects.""" - - def setUp(self): - """Set up a TCP server to receive log messages, and a SocketHandler - pointing to that server's address and port.""" - BaseTest.setUp(self) - self.tcpserver = LogRecordSocketReceiver(port=0) - self.port = self.tcpserver.socket.getsockname()[1] - self.threads = [ - threading.Thread(target=self.tcpserver.serve_until_stopped)] - for thread in self.threads: - thread.start() - - self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) - self.sock_hdlr.setFormatter(self.root_formatter) - self.root_logger.removeHandler(self.root_logger.handlers[0]) - self.root_logger.addHandler(self.sock_hdlr) - - def tearDown(self): - """Shutdown the TCP server.""" - try: - self.tcpserver.abort = True - del self.tcpserver - self.root_logger.removeHandler(self.sock_hdlr) - self.sock_hdlr.close() - for thread in self.threads: - thread.join(2.0) - finally: - BaseTest.tearDown(self) - - def get_output(self): - """Get the log output as received by the TCP server.""" - # Signal the TCP receiver and wait for it to terminate. - self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) - self.tcpserver.finished.wait(2.0) - return self.tcpserver.log_output - - def test_output(self): - # The log message sent to the SocketHandler is properly received. - logger = logging.getLogger("tcp") - logger.error("spam") - logger.debug("eggs") - self.assertEqual(self.get_output(), "spam\neggs\n") - - -class MemoryTest(BaseTest): - - """Test memory persistence of logger objects.""" - - def setUp(self): - """Create a dict to remember potentially destroyed objects.""" - BaseTest.setUp(self) - self._survivors = {} - - def _watch_for_survival(self, *args): - """Watch the given objects for survival, by creating weakrefs to - them.""" - for obj in args: - key = id(obj), repr(obj) - self._survivors[key] = weakref.ref(obj) - - def _assertTruesurvival(self): - """Assert that all objects watched for survival have survived.""" - # Trigger cycle breaking. - gc.collect() - dead = [] - for (id_, repr_), ref in self._survivors.items(): - if ref() is None: - dead.append(repr_) - if dead: - self.fail("%d objects should have survived " - "but have been destroyed: %s" % (len(dead), ", ".join(dead))) - - def test_persistent_loggers(self): - # Logger objects are persistent and retain their configuration, even - # if visible references are destroyed. - self.root_logger.setLevel(logging.INFO) - foo = logging.getLogger("foo") - self._watch_for_survival(foo) - foo.setLevel(logging.DEBUG) - self.root_logger.debug(self.next_message()) - foo.debug(self.next_message()) - self.assert_log_lines([ - ('foo', 'DEBUG', '2'), - ]) - del foo - # foo has survived. - self._assertTruesurvival() - # foo has retained its settings. - bar = logging.getLogger("foo") - bar.debug(self.next_message()) - self.assert_log_lines([ - ('foo', 'DEBUG', '2'), - ('foo', 'DEBUG', '3'), - ]) - - -class EncodingTest(BaseTest): - def test_encoding_plain_file(self): - # In Python 2.x, a plain file object is treated as having no encoding. - log = logging.getLogger("test") - fn = tempfile.mktemp(".log") - # the non-ascii data we write to the log. - data = "foo\x80" - try: - handler = logging.FileHandler(fn) - log.addHandler(handler) - try: - # write non-ascii data to the log. - log.warning(data) - finally: - log.removeHandler(handler) - handler.close() - # check we wrote exactly those bytes, ignoring trailing \n etc - f = open(fn) - try: - self.assertEqual(f.read().rstrip(), data) - finally: - f.close() - finally: - if os.path.isfile(fn): - os.remove(fn) - - def test_encoding_cyrillic_unicode(self): - log = logging.getLogger("test") - #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) - message = u'\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' - #Ensure it's written in a Cyrillic encoding - writer_class = codecs.getwriter('cp1251') - writer_class.encoding = 'cp1251' - stream = cStringIO.StringIO() - writer = writer_class(stream, 'strict') - handler = logging.StreamHandler(writer) - log.addHandler(handler) - try: - log.warning(message) - finally: - log.removeHandler(handler) - handler.close() - # check we wrote exactly those bytes, ignoring trailing \n etc - s = stream.getvalue() - #Compare against what the data should be when encoded in CP-1251 - self.assertEqual(s, '\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') - - -class WarningsTest(BaseTest): - - def test_warnings(self): - with warnings.catch_warnings(): - logging.captureWarnings(True) - try: - warnings.filterwarnings("always", category=UserWarning) - file = cStringIO.StringIO() - h = logging.StreamHandler(file) - logger = logging.getLogger("py.warnings") - logger.addHandler(h) - warnings.warn("I'm warning you...") - logger.removeHandler(h) - s = file.getvalue() - h.close() - self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) - - #See if an explicit file uses the original implementation - file = cStringIO.StringIO() - warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, - file, "Dummy line") - s = file.getvalue() - file.close() - self.assertEqual(s, - "dummy.py:42: UserWarning: Explicit\n Dummy line\n") - finally: - logging.captureWarnings(False) - - -def formatFunc(format, datefmt=None): - return logging.Formatter(format, datefmt) - -def handlerFunc(): - return logging.StreamHandler() - -class CustomHandler(logging.StreamHandler): - pass - -class ConfigDictTest(BaseTest): - - """Reading logging config from a dictionary.""" - - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" - - # config0 is a standard configuration. - config0 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'root' : { - 'level' : 'WARNING', - 'handlers' : ['hand1'], - }, - } - - # config1 adds a little to the standard configuration. - config1 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - # config2 has a subtle configuration error that should be reported - config2 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdbout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - #As config1 but with a misspelt level on a handler - config2a = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NTOSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - - #As config1 but with a misspelt level on a logger - config2b = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WRANING', - }, - } - - # config3 has a less subtle configuration error - config3 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'misspelled_name', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - # config4 specifies a custom formatter class to be loaded - config4 = { - 'version': 1, - 'formatters': { - 'form1' : { - '()' : __name__ + '.ExceptionFormatter', - 'format' : '%(levelname)s:%(name)s:%(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'root' : { - 'level' : 'NOTSET', - 'handlers' : ['hand1'], - }, - } - - # As config4 but using an actual callable rather than a string - config4a = { - 'version': 1, - 'formatters': { - 'form1' : { - '()' : ExceptionFormatter, - 'format' : '%(levelname)s:%(name)s:%(message)s', - }, - 'form2' : { - '()' : __name__ + '.formatFunc', - 'format' : '%(levelname)s:%(name)s:%(message)s', - }, - 'form3' : { - '()' : formatFunc, - 'format' : '%(levelname)s:%(name)s:%(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - 'hand2' : { - '()' : handlerFunc, - }, - }, - 'root' : { - 'level' : 'NOTSET', - 'handlers' : ['hand1'], - }, - } - - # config5 specifies a custom handler class to be loaded - config5 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : __name__ + '.CustomHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - # config6 specifies a custom handler class to be loaded - # but has bad arguments - config6 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : __name__ + '.CustomHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - '9' : 'invalid parameter name', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - #config 7 does not define compiler.parser but defines compiler.lexer - #so compiler.parser should be disabled after applying it - config7 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.lexer' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - config8 = { - 'version': 1, - 'disable_existing_loggers' : False, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - 'compiler.lexer' : { - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - config9 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'WARNING', - 'stream' : 'ext://sys.stdout', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'WARNING', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'NOTSET', - }, - } - - config9a = { - 'version': 1, - 'incremental' : True, - 'handlers' : { - 'hand1' : { - 'level' : 'WARNING', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'INFO', - }, - }, - } - - config9b = { - 'version': 1, - 'incremental' : True, - 'handlers' : { - 'hand1' : { - 'level' : 'INFO', - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'INFO', - }, - }, - } - - #As config1 but with a filter added - config10 = { - 'version': 1, - 'formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'filters' : { - 'filt1' : { - 'name' : 'compiler.parser', - }, - }, - 'handlers' : { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - 'filters' : ['filt1'], - }, - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'filters' : ['filt1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - 'handlers' : ['hand1'], - }, - } - - #As config1 but using cfg:// references - config11 = { - 'version': 1, - 'true_formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handler_configs': { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'formatters' : 'cfg://true_formatters', - 'handlers' : { - 'hand1' : 'cfg://handler_configs[hand1]', - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - #As config11 but missing the version key - config12 = { - 'true_formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handler_configs': { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'formatters' : 'cfg://true_formatters', - 'handlers' : { - 'hand1' : 'cfg://handler_configs[hand1]', - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - #As config11 but using an unsupported version - config13 = { - 'version': 2, - 'true_formatters': { - 'form1' : { - 'format' : '%(levelname)s ++ %(message)s', - }, - }, - 'handler_configs': { - 'hand1' : { - 'class' : 'logging.StreamHandler', - 'formatter' : 'form1', - 'level' : 'NOTSET', - 'stream' : 'ext://sys.stdout', - }, - }, - 'formatters' : 'cfg://true_formatters', - 'handlers' : { - 'hand1' : 'cfg://handler_configs[hand1]', - }, - 'loggers' : { - 'compiler.parser' : { - 'level' : 'DEBUG', - 'handlers' : ['hand1'], - }, - }, - 'root' : { - 'level' : 'WARNING', - }, - } - - out_of_order = { - "version": 1, - "formatters": { - "mySimpleFormatter": { - "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s" - } - }, - "handlers": { - "fileGlobal": { - "class": "logging.StreamHandler", - "level": "DEBUG", - "formatter": "mySimpleFormatter" - }, - "bufferGlobal": { - "class": "logging.handlers.MemoryHandler", - "capacity": 5, - "formatter": "mySimpleFormatter", - "target": "fileGlobal", - "level": "DEBUG" - } - }, - "loggers": { - "mymodule": { - "level": "DEBUG", - "handlers": ["bufferGlobal"], - "propagate": "true" - } - } - } - - def apply_config(self, conf): - logging.config.dictConfig(conf) - - def test_config0_ok(self): - # A simple config which overrides the default settings. - with captured_stdout() as output: - self.apply_config(self.config0) - logger = logging.getLogger() - # Won't output anything - logger.info(self.next_message()) - # Outputs a message - logger.error(self.next_message()) - self.assert_log_lines([ - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_config1_ok(self, config=config1): - # A config defining a sub-parser as well. - with captured_stdout() as output: - self.apply_config(config) - logger = logging.getLogger("compiler.parser") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '1'), - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_config2_failure(self): - # A simple config which overrides the default settings. - self.assertRaises(StandardError, self.apply_config, self.config2) - - def test_config2a_failure(self): - # A simple config which overrides the default settings. - self.assertRaises(StandardError, self.apply_config, self.config2a) - - def test_config2b_failure(self): - # A simple config which overrides the default settings. - self.assertRaises(StandardError, self.apply_config, self.config2b) - - def test_config3_failure(self): - # A simple config which overrides the default settings. - self.assertRaises(StandardError, self.apply_config, self.config3) - - def test_config4_ok(self): - # A config specifying a custom formatter class. - with captured_stdout() as output: - self.apply_config(self.config4) - #logger = logging.getLogger() - try: - raise RuntimeError() - except RuntimeError: - logging.exception("just testing") - sys.stdout.seek(0) - self.assertEqual(output.getvalue(), - "ERROR:root:just testing\nGot a [RuntimeError]\n") - # Original logger output is empty - self.assert_log_lines([]) - - def test_config4a_ok(self): - # A config specifying a custom formatter class. - with captured_stdout() as output: - self.apply_config(self.config4a) - #logger = logging.getLogger() - try: - raise RuntimeError() - except RuntimeError: - logging.exception("just testing") - sys.stdout.seek(0) - self.assertEqual(output.getvalue(), - "ERROR:root:just testing\nGot a [RuntimeError]\n") - # Original logger output is empty - self.assert_log_lines([]) - - def test_config5_ok(self): - self.test_config1_ok(config=self.config5) - - def test_config6_failure(self): - self.assertRaises(StandardError, self.apply_config, self.config6) - - def test_config7_ok(self): - with captured_stdout() as output: - self.apply_config(self.config1) - logger = logging.getLogger("compiler.parser") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '1'), - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - with captured_stdout() as output: - self.apply_config(self.config7) - logger = logging.getLogger("compiler.parser") - self.assertTrue(logger.disabled) - logger = logging.getLogger("compiler.lexer") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '3'), - ('ERROR', '4'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - #Same as test_config_7_ok but don't disable old loggers. - def test_config_8_ok(self): - with captured_stdout() as output: - self.apply_config(self.config1) - logger = logging.getLogger("compiler.parser") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '1'), - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - with captured_stdout() as output: - self.apply_config(self.config8) - logger = logging.getLogger("compiler.parser") - self.assertFalse(logger.disabled) - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - logger = logging.getLogger("compiler.lexer") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '3'), - ('ERROR', '4'), - ('INFO', '5'), - ('ERROR', '6'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_config_9_ok(self): - with captured_stdout() as output: - self.apply_config(self.config9) - logger = logging.getLogger("compiler.parser") - #Nothing will be output since both handler and logger are set to WARNING - logger.info(self.next_message()) - self.assert_log_lines([], stream=output) - self.apply_config(self.config9a) - #Nothing will be output since both handler is still set to WARNING - logger.info(self.next_message()) - self.assert_log_lines([], stream=output) - self.apply_config(self.config9b) - #Message should now be output - logger.info(self.next_message()) - self.assert_log_lines([ - ('INFO', '3'), - ], stream=output) - - def test_config_10_ok(self): - with captured_stdout() as output: - self.apply_config(self.config10) - logger = logging.getLogger("compiler.parser") - logger.warning(self.next_message()) - logger = logging.getLogger('compiler') - #Not output, because filtered - logger.warning(self.next_message()) - logger = logging.getLogger('compiler.lexer') - #Not output, because filtered - logger.warning(self.next_message()) - logger = logging.getLogger("compiler.parser.codegen") - #Output, as not filtered - logger.error(self.next_message()) - self.assert_log_lines([ - ('WARNING', '1'), - ('ERROR', '4'), - ], stream=output) - - def test_config11_ok(self): - self.test_config1_ok(self.config11) - - def test_config12_failure(self): - self.assertRaises(StandardError, self.apply_config, self.config12) - - def test_config13_failure(self): - self.assertRaises(StandardError, self.apply_config, self.config13) - - @unittest.skipUnless(threading, 'listen() needs threading to work') - def setup_via_listener(self, text): - # Ask for a randomly assigned port (by using port 0) - t = logging.config.listen(0) - t.start() - t.ready.wait() - # Now get the port allocated - port = t.port - t.ready.clear() - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(2.0) - sock.connect(('localhost', port)) - - slen = struct.pack('>L', len(text)) - s = slen + text - sentsofar = 0 - left = len(s) - while left > 0: - sent = sock.send(s[sentsofar:]) - sentsofar += sent - left -= sent - sock.close() - finally: - t.ready.wait(2.0) - logging.config.stopListening() - t.join(2.0) - - def test_listen_config_10_ok(self): - with captured_stdout() as output: - self.setup_via_listener(json.dumps(self.config10)) - logger = logging.getLogger("compiler.parser") - logger.warning(self.next_message()) - logger = logging.getLogger('compiler') - #Not output, because filtered - logger.warning(self.next_message()) - logger = logging.getLogger('compiler.lexer') - #Not output, because filtered - logger.warning(self.next_message()) - logger = logging.getLogger("compiler.parser.codegen") - #Output, as not filtered - logger.error(self.next_message()) - self.assert_log_lines([ - ('WARNING', '1'), - ('ERROR', '4'), - ], stream=output) - - def test_listen_config_1_ok(self): - with captured_stdout() as output: - self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) - logger = logging.getLogger("compiler.parser") - # Both will output a message - logger.info(self.next_message()) - logger.error(self.next_message()) - self.assert_log_lines([ - ('INFO', '1'), - ('ERROR', '2'), - ], stream=output) - # Original logger output is empty. - self.assert_log_lines([]) - - def test_out_of_order(self): - self.apply_config(self.out_of_order) - handler = logging.getLogger('mymodule').handlers[0] - self.assertIsInstance(handler.target, logging.Handler) - -class ManagerTest(BaseTest): - def test_manager_loggerclass(self): - logged = [] - - class MyLogger(logging.Logger): - def _log(self, level, msg, args, exc_info=None, extra=None): - logged.append(msg) - - man = logging.Manager(None) - self.assertRaises(TypeError, man.setLoggerClass, int) - man.setLoggerClass(MyLogger) - logger = man.getLogger('test') - logger.warning('should appear in logged') - logging.warning('should not appear in logged') - - self.assertEqual(logged, ['should appear in logged']) - - -class ChildLoggerTest(BaseTest): - def test_child_loggers(self): - r = logging.getLogger() - l1 = logging.getLogger('abc') - l2 = logging.getLogger('def.ghi') - c1 = r.getChild('xyz') - c2 = r.getChild('uvw.xyz') - self.assertTrue(c1 is logging.getLogger('xyz')) - self.assertTrue(c2 is logging.getLogger('uvw.xyz')) - c1 = l1.getChild('def') - c2 = c1.getChild('ghi') - c3 = l1.getChild('def.ghi') - self.assertTrue(c1 is logging.getLogger('abc.def')) - self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) - self.assertTrue(c2 is c3) - - -class HandlerTest(BaseTest): - - @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') - @unittest.skipUnless(threading, 'Threading required for this test.') - def test_race(self): - # Issue #14632 refers. - def remove_loop(fname, tries): - for _ in range(tries): - try: - os.unlink(fname) - except OSError: - pass - time.sleep(0.004 * random.randint(0, 4)) - - del_count = 500 - log_count = 500 - - for delay in (False, True): - fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') - os.close(fd) - remover = threading.Thread(target=remove_loop, args=(fn, del_count)) - remover.daemon = True - remover.start() - h = logging.handlers.WatchedFileHandler(fn, delay=delay) - f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') - h.setFormatter(f) - try: - for _ in range(log_count): - time.sleep(0.005) - r = logging.makeLogRecord({'msg': 'testing' }) - h.handle(r) - finally: - remover.join() - try: - h.close() - except ValueError: - pass - if os.path.exists(fn): - os.unlink(fn) - - -# Set the locale to the platform-dependent default. I have no idea -# why the test does this, but in any case we save the current locale -# first and restore it at the end. -@run_with_locale('LC_ALL', '') -def test_main(): - run_unittest(BuiltinLevelsTest, BasicFilterTest, - CustomLevelsAndFiltersTest, MemoryHandlerTest, - ConfigFileTest, SocketHandlerTest, MemoryTest, - EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, - ChildLoggerTest, HandlerTest) - -if __name__ == "__main__": - test_main() |