Merge branch 'trunk' into patch-3
This commit is contained in:
commit
50cf8b3261
|
@ -575,8 +575,9 @@ class POP3(basic.LineOnlyReceiver, policies.TimeoutMixin):
|
|||
return self.processCommand(*line.split(b' '))
|
||||
except (ValueError, AttributeError, POP3Error, TypeError) as e:
|
||||
log.err()
|
||||
self.failResponse('bad protocol or server: {}: {}'.format(
|
||||
e.__class__.__name__, e))
|
||||
self.failResponse(b': '.join([b'bad protocol or server',
|
||||
e.__class__.__name__.encode('utf-8'),
|
||||
b''.join(e.args)]))
|
||||
|
||||
|
||||
def processCommand(self, command, *args):
|
||||
|
@ -862,7 +863,7 @@ class POP3(basic.LineOnlyReceiver, policies.TimeoutMixin):
|
|||
self.successResponse(b'USER accepted, send PASS')
|
||||
|
||||
|
||||
def do_PASS(self, password):
|
||||
def do_PASS(self, password, *words):
|
||||
"""
|
||||
Handle a PASS command.
|
||||
|
||||
|
@ -874,15 +875,19 @@ class POP3(basic.LineOnlyReceiver, policies.TimeoutMixin):
|
|||
|
||||
@type password: L{bytes}
|
||||
@param password: A password.
|
||||
|
||||
@type words: L{tuple} of L{bytes}
|
||||
@param words: Other parts of the password split by spaces.
|
||||
"""
|
||||
if self._userIs is None:
|
||||
self.failResponse(b"USER required before PASS")
|
||||
return
|
||||
user = self._userIs
|
||||
self._userIs = None
|
||||
password = b' '.join((password,) + words)
|
||||
d = defer.maybeDeferred(self.authenticateUserPASS, user, password)
|
||||
d.addCallbacks(self._cbMailbox, self._ebMailbox, callbackArgs=(user,)
|
||||
).addErrback(self._ebUnexpected)
|
||||
d.addCallbacks(self._cbMailbox, self._ebMailbox,
|
||||
callbackArgs=(user,)).addErrback(self._ebUnexpected)
|
||||
|
||||
|
||||
def _longOperation(self, d):
|
||||
|
|
|
@ -431,6 +431,18 @@ class DummyPOP3(pop3.POP3):
|
|||
|
||||
|
||||
|
||||
class DummyPOP3Auth(DummyPOP3):
|
||||
"""
|
||||
Class to test successful authentication in twisted.mail.pop3.POP3.
|
||||
"""
|
||||
def __init__(self, user, password):
|
||||
self.portal = cred.portal.Portal(TestRealm())
|
||||
ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
||||
ch.addUser(user, password)
|
||||
self.portal.registerChecker(ch)
|
||||
|
||||
|
||||
|
||||
class DummyMailbox(pop3.Mailbox):
|
||||
"""
|
||||
An in-memory L{pop3.IMailbox} implementation.
|
||||
|
@ -499,7 +511,7 @@ class AnotherPOP3Tests(unittest.TestCase):
|
|||
"""
|
||||
Additional L{pop3.POP3} tests.
|
||||
"""
|
||||
def runTest(self, lines, expectedOutput):
|
||||
def runTest(self, lines, expectedOutput, protocolInstance=None):
|
||||
"""
|
||||
Assert that when C{lines} are delivered to L{pop3.POP3} it responds
|
||||
with C{expectedOutput}.
|
||||
|
@ -510,10 +522,13 @@ class AnotherPOP3Tests(unittest.TestCase):
|
|||
@param expectedOutput: A sequence of L{bytes} representing the
|
||||
expected response from the server.
|
||||
|
||||
@param protocolInstance: Instance of L{twisted.mail.pop3.POP3} or
|
||||
L{None}. If L{None}, a new DummyPOP3 will be used.
|
||||
|
||||
@return: A L{Deferred} that fires when the lines have been delivered
|
||||
and the output checked.
|
||||
"""
|
||||
dummy = DummyPOP3()
|
||||
dummy = protocolInstance if protocolInstance else DummyPOP3()
|
||||
client = LineSendingProtocol(lines)
|
||||
d = loopback.loopbackAsync(dummy, client)
|
||||
return d.addCallback(self._cbRunTest, client, dummy, expectedOutput)
|
||||
|
@ -579,6 +594,24 @@ class AnotherPOP3Tests(unittest.TestCase):
|
|||
b'+OK '])
|
||||
|
||||
|
||||
def test_badUTF8CharactersInCommand(self):
|
||||
"""
|
||||
Sending a command with invalid UTF-8 characters
|
||||
will raise a L{pop3.POP3Error}.
|
||||
"""
|
||||
error = b'not authenticated yet: cannot do \x81PASS'
|
||||
d = self.runTest(
|
||||
[b'\x81PASS',
|
||||
b'QUIT'],
|
||||
[b'+OK <moshez>',
|
||||
b"-ERR bad protocol or server: POP3Error: " +
|
||||
error,
|
||||
b'+OK '])
|
||||
errors = self.flushLoggedErrors(pop3.POP3Error)
|
||||
self.assertEqual(len(errors), 1)
|
||||
return d
|
||||
|
||||
|
||||
def test_authListing(self):
|
||||
"""
|
||||
L{pop3.POP3} responds to an I{AUTH} command with a list of supported
|
||||
|
@ -604,70 +637,195 @@ class AnotherPOP3Tests(unittest.TestCase):
|
|||
self.assertEqual(client.response[5], b".")
|
||||
|
||||
|
||||
def test_illegalPASS(self):
|
||||
def run_PASS(self, real_user, real_password,
|
||||
tried_user=None, tried_password=None,
|
||||
after_auth_input=[], after_auth_output=[]):
|
||||
"""
|
||||
L{pop3.POP3} handles a I{PASS} command before a I{USER} command with an
|
||||
error indicating the out-of-sequence operation.
|
||||
Test a login with PASS.
|
||||
|
||||
If L{real_user} matches L{tried_user} and L{real_password} matches
|
||||
L{tried_password}, a successful login will be expected.
|
||||
Otherwise an unsuccessful login will be expected.
|
||||
|
||||
@type real_user: L{bytes}
|
||||
@param real_user: The user to test.
|
||||
|
||||
@type real_password: L{bytes}
|
||||
@param real_password: The password of the test user.
|
||||
|
||||
@type tried_user: L{bytes} or L{None}
|
||||
@param tried_user: The user to call USER with.
|
||||
If None, real_user will be used.
|
||||
|
||||
@type tried_password: L{bytes} or L{None}
|
||||
@param tried_password: The password to call PASS with.
|
||||
If None, real_password will be used.
|
||||
|
||||
@type after_auth_input: L{list} of l{bytes}
|
||||
@param after_auth_input: Extra protocol input after authentication.
|
||||
|
||||
@type after_auth_output: L{list} of l{bytes}
|
||||
@param after_auth_output: Extra protocol output after authentication.
|
||||
"""
|
||||
dummy = DummyPOP3()
|
||||
client = LineSendingProtocol([
|
||||
b"PASS fooz",
|
||||
b"QUIT"
|
||||
])
|
||||
d = loopback.loopbackAsync(dummy, client)
|
||||
return d.addCallback(self._cbTestIllegalPASS, client, dummy)
|
||||
if not tried_user:
|
||||
tried_user = real_user
|
||||
if not tried_password:
|
||||
tried_password = real_password
|
||||
response = [b'+OK <moshez>',
|
||||
b'+OK USER accepted, send PASS',
|
||||
b'-ERR Authentication failed']
|
||||
if real_user == tried_user and real_password == tried_password:
|
||||
response = [b'+OK <moshez>',
|
||||
b'+OK USER accepted, send PASS',
|
||||
b'+OK Authentication succeeded']
|
||||
fullInput = [b' '.join([b'USER', tried_user]),
|
||||
b' '.join([b'PASS', tried_password])]
|
||||
|
||||
fullInput += after_auth_input + [b'QUIT']
|
||||
response += after_auth_output + [b'+OK ']
|
||||
|
||||
return self.runTest(
|
||||
fullInput,
|
||||
response,
|
||||
protocolInstance=DummyPOP3Auth(real_user, real_password))
|
||||
|
||||
|
||||
def _cbTestIllegalPASS(self, ignored, client, dummy):
|
||||
expectedOutput = (
|
||||
b'+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n')
|
||||
self.assertEqual(expectedOutput,
|
||||
b'\r\n'.join(client.response) + b'\r\n')
|
||||
dummy.connectionLost(failure.Failure(
|
||||
Exception("Test harness disconnect")))
|
||||
|
||||
|
||||
def test_emptyPASS(self):
|
||||
def run_PASS_before_USER(self, password):
|
||||
"""
|
||||
L{pop3.POP3} handles a I{PASS} command with a password equal to C{""}
|
||||
as it would any other value.
|
||||
Test protocol violation produced by calling PASS before USER.
|
||||
@type password: L{bytes}
|
||||
@param password: A password to test.
|
||||
"""
|
||||
dummy = DummyPOP3()
|
||||
client = LineSendingProtocol([
|
||||
b"PASS ",
|
||||
b"QUIT"
|
||||
])
|
||||
d = loopback.loopbackAsync(dummy, client)
|
||||
return d.addCallback(self._cbTestEmptyPASS, client, dummy)
|
||||
|
||||
|
||||
def _cbTestEmptyPASS(self, ignored, client, dummy):
|
||||
expectedOutput = (
|
||||
b'+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n')
|
||||
self.assertEqual(expectedOutput,
|
||||
b'\r\n'.join(client.response) + b'\r\n')
|
||||
dummy.connectionLost(failure.Failure(
|
||||
Exception("Test harness disconnect")))
|
||||
|
||||
|
||||
def test_badUTF8CharactersInCommand(self):
|
||||
"""
|
||||
Sending a command with invalid UTF-8 characters
|
||||
will raise a L{pop3.POP3Error}.
|
||||
"""
|
||||
error = str(b'not authenticated yet: cannot do \x81PASS')
|
||||
if not isinstance(error, bytes):
|
||||
error = error.encode("utf-8")
|
||||
d = self.runTest(
|
||||
[b'\x81PASS',
|
||||
return self.runTest(
|
||||
[b' '.join([b'PASS', password]),
|
||||
b'QUIT'],
|
||||
[b'+OK <moshez>',
|
||||
b"-ERR bad protocol or server: POP3Error: " +
|
||||
error,
|
||||
b'-ERR USER required before PASS',
|
||||
b'+OK '])
|
||||
errors = self.flushLoggedErrors(pop3.POP3Error)
|
||||
self.assertEqual(len(errors), 1)
|
||||
return d
|
||||
|
||||
|
||||
def test_illegal_PASS_before_USER(self):
|
||||
"""
|
||||
Test PASS before USER with a wrong password.
|
||||
"""
|
||||
return self.run_PASS_before_USER(b'fooz')
|
||||
|
||||
|
||||
def test_empty_PASS_before_USER(self):
|
||||
"""
|
||||
Test PASS before USER with an empty password.
|
||||
"""
|
||||
return self.run_PASS_before_USER(b'')
|
||||
|
||||
|
||||
def test_one_space_PASS_before_USER(self):
|
||||
"""
|
||||
Test PASS before USER with an password that is a space.
|
||||
"""
|
||||
return self.run_PASS_before_USER(b' ')
|
||||
|
||||
|
||||
def test_space_PASS_before_USER(self):
|
||||
"""
|
||||
Test PASS before USER with a password containing a space.
|
||||
"""
|
||||
return self.run_PASS_before_USER(b'fooz barz')
|
||||
|
||||
|
||||
def test_multiple_spaces_PASS_before_USER(self):
|
||||
"""
|
||||
Test PASS before USER with a password containing multiple spaces.
|
||||
"""
|
||||
return self.run_PASS_before_USER(b'fooz barz asdf')
|
||||
|
||||
|
||||
def test_other_whitespace_PASS_before_USER(self):
|
||||
"""
|
||||
Test PASS before USER with a password containing tabs and spaces.
|
||||
"""
|
||||
return self.run_PASS_before_USER(b'fooz barz\tcrazy@! \t ')
|
||||
|
||||
|
||||
def test_good_PASS(self):
|
||||
"""
|
||||
Test PASS with a good password.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz')
|
||||
|
||||
|
||||
def test_space_PASS(self):
|
||||
"""
|
||||
Test PASS with a password containing a space.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz barz')
|
||||
|
||||
|
||||
def test_multiple_spaces_PASS(self):
|
||||
"""
|
||||
Test PASS with a password containing a space.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz barz asdf')
|
||||
|
||||
|
||||
def test_other_whitespace_PASS(self):
|
||||
"""
|
||||
Test PASS with a password containing tabs and spaces.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz barz\tcrazy@! \t ')
|
||||
|
||||
|
||||
def test_pass_wrong_user(self):
|
||||
"""
|
||||
Test PASS with a wrong user.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz',
|
||||
tried_user=b'wronguser')
|
||||
|
||||
|
||||
def test_wrong_PASS(self):
|
||||
"""
|
||||
Test PASS with a wrong password.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz',
|
||||
tried_password=b'barz')
|
||||
|
||||
|
||||
def test_wrong_space_PASS(self):
|
||||
"""
|
||||
Test PASS with a password containing a space.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz barz',
|
||||
tried_password=b'foozbarz ')
|
||||
|
||||
|
||||
def test_wrong_multiple_spaces_PASS(self):
|
||||
"""
|
||||
Test PASS with a password containing a space.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz barz asdf',
|
||||
tried_password=b'foozbarz ')
|
||||
|
||||
|
||||
def test_wrong_other_whitespace_PASS(self):
|
||||
"""
|
||||
Test PASS with a password containing tabs and spaces.
|
||||
"""
|
||||
return self.run_PASS(b'testuser', b'fooz barz\tcrazy@! \t ')
|
||||
|
||||
|
||||
def test_wrong_command(self):
|
||||
"""
|
||||
After logging in, test a dummy command that is not defined.
|
||||
"""
|
||||
extra_input = [b'DUMMY COMMAND']
|
||||
extra_output = [b' '.join([b'-ERR bad protocol or server: POP3Error:',
|
||||
b'Unknown protocol command: DUMMY'])]
|
||||
|
||||
return self.run_PASS(b'testuser', b'testpassword',
|
||||
after_auth_input=extra_input,
|
||||
after_auth_output=extra_output,
|
||||
).addCallback(self.flushLoggedErrors,
|
||||
pop3.POP3Error)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
The POP3 server implemented by twisted.mail.pop3 now accepts passwords that contain spaces.
|
|
@ -23,8 +23,8 @@ from twisted.internet.error import ConnectionRefusedError
|
|||
from twisted.python.failure import Failure
|
||||
from twisted.internet import error
|
||||
from twisted.internet import interfaces
|
||||
from twisted.internet.testing import MemoryReactorClock
|
||||
|
||||
from .proto_helpers import MemoryReactorClock
|
||||
|
||||
|
||||
class TLSNegotiation:
|
||||
|
|
|
@ -551,17 +551,17 @@ class TestLoader(object):
|
|||
warnings.warn("trial only supports doctesting modules")
|
||||
return
|
||||
extraArgs = {}
|
||||
if sys.version_info > (2, 4):
|
||||
# Work around Python issue2604: DocTestCase.tearDown clobbers globs
|
||||
def saveGlobals(test):
|
||||
"""
|
||||
Save C{test.globs} and replace it with a copy so that if
|
||||
necessary, the original will be available for the next test
|
||||
run.
|
||||
"""
|
||||
test._savedGlobals = getattr(test, '_savedGlobals', test.globs)
|
||||
test.globs = test._savedGlobals.copy()
|
||||
extraArgs['setUp'] = saveGlobals
|
||||
|
||||
# Work around Python issue2604: DocTestCase.tearDown clobbers globs
|
||||
def saveGlobals(test):
|
||||
"""
|
||||
Save C{test.globs} and replace it with a copy so that if
|
||||
necessary, the original will be available for the next test
|
||||
run.
|
||||
"""
|
||||
test._savedGlobals = getattr(test, '_savedGlobals', test.globs)
|
||||
test.globs = test._savedGlobals.copy()
|
||||
extraArgs['setUp'] = saveGlobals
|
||||
return doctest.DocTestSuite(module, **extraArgs)
|
||||
|
||||
def loadAnything(self, thing, recurse=False, parent=None, qualName=None):
|
||||
|
|
Loading…
Reference in New Issue