Merge branch 'trunk' into patch-2

This commit is contained in:
Glyph 2019-08-04 17:30:49 -07:00 committed by GitHub
commit 5885bf7c22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 238 additions and 74 deletions

View File

@ -575,8 +575,9 @@ class POP3(basic.LineOnlyReceiver, policies.TimeoutMixin):
return self.processCommand(*line.split(b' '))
except (ValueError, AttributeError, POP3Error, TypeError) as e:
log.err()
self.failResponse('bad protocol or server: {}: {}'.format(
e.__class__.__name__, e))
self.failResponse(b': '.join([b'bad protocol or server',
e.__class__.__name__.encode('utf-8'),
b''.join(e.args)]))
def processCommand(self, command, *args):
@ -862,7 +863,7 @@ class POP3(basic.LineOnlyReceiver, policies.TimeoutMixin):
self.successResponse(b'USER accepted, send PASS')
def do_PASS(self, password):
def do_PASS(self, password, *words):
"""
Handle a PASS command.
@ -874,15 +875,19 @@ class POP3(basic.LineOnlyReceiver, policies.TimeoutMixin):
@type password: L{bytes}
@param password: A password.
@type words: L{tuple} of L{bytes}
@param words: Other parts of the password split by spaces.
"""
if self._userIs is None:
self.failResponse(b"USER required before PASS")
return
user = self._userIs
self._userIs = None
password = b' '.join((password,) + words)
d = defer.maybeDeferred(self.authenticateUserPASS, user, password)
d.addCallbacks(self._cbMailbox, self._ebMailbox, callbackArgs=(user,)
).addErrback(self._ebUnexpected)
d.addCallbacks(self._cbMailbox, self._ebMailbox,
callbackArgs=(user,)).addErrback(self._ebUnexpected)
def _longOperation(self, d):

View File

@ -431,6 +431,18 @@ class DummyPOP3(pop3.POP3):
class DummyPOP3Auth(DummyPOP3):
"""
Class to test successful authentication in twisted.mail.pop3.POP3.
"""
def __init__(self, user, password):
self.portal = cred.portal.Portal(TestRealm())
ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
ch.addUser(user, password)
self.portal.registerChecker(ch)
class DummyMailbox(pop3.Mailbox):
"""
An in-memory L{pop3.IMailbox} implementation.
@ -499,7 +511,7 @@ class AnotherPOP3Tests(unittest.TestCase):
"""
Additional L{pop3.POP3} tests.
"""
def runTest(self, lines, expectedOutput):
def runTest(self, lines, expectedOutput, protocolInstance=None):
"""
Assert that when C{lines} are delivered to L{pop3.POP3} it responds
with C{expectedOutput}.
@ -510,10 +522,13 @@ class AnotherPOP3Tests(unittest.TestCase):
@param expectedOutput: A sequence of L{bytes} representing the
expected response from the server.
@param protocolInstance: Instance of L{twisted.mail.pop3.POP3} or
L{None}. If L{None}, a new DummyPOP3 will be used.
@return: A L{Deferred} that fires when the lines have been delivered
and the output checked.
"""
dummy = DummyPOP3()
dummy = protocolInstance if protocolInstance else DummyPOP3()
client = LineSendingProtocol(lines)
d = loopback.loopbackAsync(dummy, client)
return d.addCallback(self._cbRunTest, client, dummy, expectedOutput)
@ -579,6 +594,24 @@ class AnotherPOP3Tests(unittest.TestCase):
b'+OK '])
def test_badUTF8CharactersInCommand(self):
"""
Sending a command with invalid UTF-8 characters
will raise a L{pop3.POP3Error}.
"""
error = b'not authenticated yet: cannot do \x81PASS'
d = self.runTest(
[b'\x81PASS',
b'QUIT'],
[b'+OK <moshez>',
b"-ERR bad protocol or server: POP3Error: " +
error,
b'+OK '])
errors = self.flushLoggedErrors(pop3.POP3Error)
self.assertEqual(len(errors), 1)
return d
def test_authListing(self):
"""
L{pop3.POP3} responds to an I{AUTH} command with a list of supported
@ -604,70 +637,195 @@ class AnotherPOP3Tests(unittest.TestCase):
self.assertEqual(client.response[5], b".")
def test_illegalPASS(self):
def run_PASS(self, real_user, real_password,
tried_user=None, tried_password=None,
after_auth_input=[], after_auth_output=[]):
"""
L{pop3.POP3} handles a I{PASS} command before a I{USER} command with an
error indicating the out-of-sequence operation.
Test a login with PASS.
If L{real_user} matches L{tried_user} and L{real_password} matches
L{tried_password}, a successful login will be expected.
Otherwise an unsuccessful login will be expected.
@type real_user: L{bytes}
@param real_user: The user to test.
@type real_password: L{bytes}
@param real_password: The password of the test user.
@type tried_user: L{bytes} or L{None}
@param tried_user: The user to call USER with.
If None, real_user will be used.
@type tried_password: L{bytes} or L{None}
@param tried_password: The password to call PASS with.
If None, real_password will be used.
@type after_auth_input: L{list} of l{bytes}
@param after_auth_input: Extra protocol input after authentication.
@type after_auth_output: L{list} of l{bytes}
@param after_auth_output: Extra protocol output after authentication.
"""
dummy = DummyPOP3()
client = LineSendingProtocol([
b"PASS fooz",
b"QUIT"
])
d = loopback.loopbackAsync(dummy, client)
return d.addCallback(self._cbTestIllegalPASS, client, dummy)
if not tried_user:
tried_user = real_user
if not tried_password:
tried_password = real_password
response = [b'+OK <moshez>',
b'+OK USER accepted, send PASS',
b'-ERR Authentication failed']
if real_user == tried_user and real_password == tried_password:
response = [b'+OK <moshez>',
b'+OK USER accepted, send PASS',
b'+OK Authentication succeeded']
fullInput = [b' '.join([b'USER', tried_user]),
b' '.join([b'PASS', tried_password])]
fullInput += after_auth_input + [b'QUIT']
response += after_auth_output + [b'+OK ']
return self.runTest(
fullInput,
response,
protocolInstance=DummyPOP3Auth(real_user, real_password))
def _cbTestIllegalPASS(self, ignored, client, dummy):
expectedOutput = (
b'+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n')
self.assertEqual(expectedOutput,
b'\r\n'.join(client.response) + b'\r\n')
dummy.connectionLost(failure.Failure(
Exception("Test harness disconnect")))
def test_emptyPASS(self):
def run_PASS_before_USER(self, password):
"""
L{pop3.POP3} handles a I{PASS} command with a password equal to C{""}
as it would any other value.
Test protocol violation produced by calling PASS before USER.
@type password: L{bytes}
@param password: A password to test.
"""
dummy = DummyPOP3()
client = LineSendingProtocol([
b"PASS ",
b"QUIT"
])
d = loopback.loopbackAsync(dummy, client)
return d.addCallback(self._cbTestEmptyPASS, client, dummy)
def _cbTestEmptyPASS(self, ignored, client, dummy):
expectedOutput = (
b'+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n')
self.assertEqual(expectedOutput,
b'\r\n'.join(client.response) + b'\r\n')
dummy.connectionLost(failure.Failure(
Exception("Test harness disconnect")))
def test_badUTF8CharactersInCommand(self):
"""
Sending a command with invalid UTF-8 characters
will raise a L{pop3.POP3Error}.
"""
error = str(b'not authenticated yet: cannot do \x81PASS')
if not isinstance(error, bytes):
error = error.encode("utf-8")
d = self.runTest(
[b'\x81PASS',
return self.runTest(
[b' '.join([b'PASS', password]),
b'QUIT'],
[b'+OK <moshez>',
b"-ERR bad protocol or server: POP3Error: " +
error,
b'-ERR USER required before PASS',
b'+OK '])
errors = self.flushLoggedErrors(pop3.POP3Error)
self.assertEqual(len(errors), 1)
return d
def test_illegal_PASS_before_USER(self):
"""
Test PASS before USER with a wrong password.
"""
return self.run_PASS_before_USER(b'fooz')
def test_empty_PASS_before_USER(self):
"""
Test PASS before USER with an empty password.
"""
return self.run_PASS_before_USER(b'')
def test_one_space_PASS_before_USER(self):
"""
Test PASS before USER with an password that is a space.
"""
return self.run_PASS_before_USER(b' ')
def test_space_PASS_before_USER(self):
"""
Test PASS before USER with a password containing a space.
"""
return self.run_PASS_before_USER(b'fooz barz')
def test_multiple_spaces_PASS_before_USER(self):
"""
Test PASS before USER with a password containing multiple spaces.
"""
return self.run_PASS_before_USER(b'fooz barz asdf')
def test_other_whitespace_PASS_before_USER(self):
"""
Test PASS before USER with a password containing tabs and spaces.
"""
return self.run_PASS_before_USER(b'fooz barz\tcrazy@! \t ')
def test_good_PASS(self):
"""
Test PASS with a good password.
"""
return self.run_PASS(b'testuser', b'fooz')
def test_space_PASS(self):
"""
Test PASS with a password containing a space.
"""
return self.run_PASS(b'testuser', b'fooz barz')
def test_multiple_spaces_PASS(self):
"""
Test PASS with a password containing a space.
"""
return self.run_PASS(b'testuser', b'fooz barz asdf')
def test_other_whitespace_PASS(self):
"""
Test PASS with a password containing tabs and spaces.
"""
return self.run_PASS(b'testuser', b'fooz barz\tcrazy@! \t ')
def test_pass_wrong_user(self):
"""
Test PASS with a wrong user.
"""
return self.run_PASS(b'testuser', b'fooz',
tried_user=b'wronguser')
def test_wrong_PASS(self):
"""
Test PASS with a wrong password.
"""
return self.run_PASS(b'testuser', b'fooz',
tried_password=b'barz')
def test_wrong_space_PASS(self):
"""
Test PASS with a password containing a space.
"""
return self.run_PASS(b'testuser', b'fooz barz',
tried_password=b'foozbarz ')
def test_wrong_multiple_spaces_PASS(self):
"""
Test PASS with a password containing a space.
"""
return self.run_PASS(b'testuser', b'fooz barz asdf',
tried_password=b'foozbarz ')
def test_wrong_other_whitespace_PASS(self):
"""
Test PASS with a password containing tabs and spaces.
"""
return self.run_PASS(b'testuser', b'fooz barz\tcrazy@! \t ')
def test_wrong_command(self):
"""
After logging in, test a dummy command that is not defined.
"""
extra_input = [b'DUMMY COMMAND']
extra_output = [b' '.join([b'-ERR bad protocol or server: POP3Error:',
b'Unknown protocol command: DUMMY'])]
return self.run_PASS(b'testuser', b'testpassword',
after_auth_input=extra_input,
after_auth_output=extra_output,
).addCallback(self.flushLoggedErrors,
pop3.POP3Error)

View File

@ -0,0 +1 @@
The POP3 server implemented by twisted.mail.pop3 now accepts passwords that contain spaces.

View File

View File

View File

@ -23,8 +23,8 @@ from twisted.internet.error import ConnectionRefusedError
from twisted.python.failure import Failure
from twisted.internet import error
from twisted.internet import interfaces
from twisted.internet.testing import MemoryReactorClock
from .proto_helpers import MemoryReactorClock
class TLSNegotiation:

View File

@ -551,17 +551,17 @@ class TestLoader(object):
warnings.warn("trial only supports doctesting modules")
return
extraArgs = {}
if sys.version_info > (2, 4):
# Work around Python issue2604: DocTestCase.tearDown clobbers globs
def saveGlobals(test):
"""
Save C{test.globs} and replace it with a copy so that if
necessary, the original will be available for the next test
run.
"""
test._savedGlobals = getattr(test, '_savedGlobals', test.globs)
test.globs = test._savedGlobals.copy()
extraArgs['setUp'] = saveGlobals
# Work around Python issue2604: DocTestCase.tearDown clobbers globs
def saveGlobals(test):
"""
Save C{test.globs} and replace it with a copy so that if
necessary, the original will be available for the next test
run.
"""
test._savedGlobals = getattr(test, '_savedGlobals', test.globs)
test.globs = test._savedGlobals.copy()
extraArgs['setUp'] = saveGlobals
return doctest.DocTestSuite(module, **extraArgs)
def loadAnything(self, thing, recurse=False, parent=None, qualName=None):