# Copyright (C) 2017 Jan Jancar
#
# This file is a part of the Mailman PGP plugin.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
import unittest
from contextlib import suppress
from unittest.mock import patch
from mailman.app.lifecycle import create_list
from mailman.interfaces.pending import IPendings
from mailman.interfaces.usermanager import IUserManager
from mailman.interfaces.workflows import IWorkflow
from mailman.testing.helpers import get_queue_messages
from mailman.workflows.common import SubscriptionBase
from zope.component import getUtility
from zope.interface import implementer
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
from mailman_pgp.workflows.base import (PGPMixin)
from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
from mailman_pgp.workflows.key_set import KEY_REQUEST, SetPubkeyMixin
[docs]class PubkeyMixinTestSetup():
[docs] def setUp(self):
with mm_transaction():
self.mlist = create_list('test@example.com',
style_name='pgp-default')
self.list_key = load_key('ecc_p256.priv.asc')
with transaction():
self.pgp_list = PGPMailingList.for_list(self.mlist)
self.pgp_list.key = self.list_key
self.um = getUtility(IUserManager)
self.sender_key = load_key('rsa_1024.priv.asc')
self.sender = self.um.create_address('anne@example.org')
@implementer(IWorkflow)
[docs]class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin,
ConfirmPubkeyMixin):
name = 'test-workflow'
description = ''
initial_state = 'prepare'
save_attributes = (
'pubkey_key',
'pubkey_confirmed',
'address_key',
'subscriber_key',
'user_key',
'token_owner_key'
)
def __init__(self, mlist, subscriber=None, *, pubkey=None,
pubkey_pre_confirmed=False):
SubscriptionBase.__init__(self, mlist, subscriber)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
PGPMixin.__init__(self, mlist)
def _step_prepare(self):
self.push('do_subscription')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
self.push('create_address')
self.push('sanity_checks')
[docs]class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
[docs] def test_create_address(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
workflow.run_thru('create_address')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
[docs] def test_address_existing(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
t.add(pgp_address)
workflow.run_thru('create_address')
still = PGPAddress.for_address(self.sender)
self.assertIsNotNone(still)
[docs]class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
[docs] def test_key_request_sent(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
list(workflow)
items = get_queue_messages('virgin', expected_count=1)
message = items[0].msg
token = workflow.token
self.assertEqual(message['Subject'], 'key set {}'.format(token))
self.assertEqual(message.get_payload(), KEY_REQUEST)
[docs] def test_receive_key(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
list(workflow)
with transaction():
pgp_address = PGPAddress.for_address(self.sender)
pgp_address.key = self.sender_key.pubkey
receive_workflow = PGPTestWorkflow(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key')
[docs] def test_set_pubkey(self):
workflow = PGPTestWorkflow(self.mlist, self.sender,
pubkey=self.sender_key.pubkey)
workflow.run_thru('pubkey_checks')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
self.assertIsNotNone(pgp_address.key)
self.assertEqual(pgp_address.key_fingerprint,
self.sender_key.fingerprint)
[docs] def test_pubkey_set(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
pgp_address.key = self.sender_key.pubkey
t.add(pgp_address)
workflow.run_thru('pubkey_checks')
self.assertEqual(pgp_address.key_fingerprint,
self.sender_key.fingerprint)
[docs]class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
[docs] def test_key_request_pubkey_set(self):
workflow = PGPTestWorkflow(self.mlist, self.sender,
pubkey=self.sender_key.pubkey,
pubkey_pre_confirmed=True)
workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
[docs] def test_send_key_confirm_request(self):
workflow = PGPTestWorkflow(self.mlist, self.sender,
pubkey=self.sender_key.pubkey,
pubkey_pre_confirmed=False)
list(workflow)
items = get_queue_messages('virgin', expected_count=1)
message = items[0].msg
token = workflow.token
self.assertEqual(message['Subject'], 'key confirm {}'.format(token))
wrapped = PGPWrapper(message)
self.assertTrue(wrapped.is_encrypted())
[docs] def test_receive_confirmation(self):
workflow = PGPTestWorkflow(self.mlist, self.sender,
pubkey=self.sender_key.pubkey,
pubkey_pre_confirmed=False)
list(workflow)
receive_workflow = PGPTestWorkflow(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key_confirmation')
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
[docs]class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
[docs] def test_pended_data_key_request(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
with suppress(StopIteration):
workflow.run_thru('send_key_request')
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
[docs] def test_pended_data_key_confirmation(self):
workflow = PGPTestWorkflow(self.mlist, self.sender,
pubkey=self.sender_key.pubkey)
with suppress(StopIteration):
workflow.run_thru('send_key_confirm_request')
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
[docs] def test_exisitng_pgp_address(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
pgp_address.key = self.sender_key.pubkey
pgp_address.key_confirmed = True
t.add(pgp_address)
workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
[docs] def test_exisitng_pgp_address_not_confirmed(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
pgp_address.key = self.sender_key.pubkey
t.add(pgp_address)
workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_send_key_confirm_request') as step:
next(workflow)
step.assert_called_once_with()
[docs] def test_exisitng_pgp_address_no_key(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
t.add(pgp_address)
workflow.run_thru('pubkey_checks')
with patch.object(workflow, '_step_send_key_request') as step:
next(workflow)
step.assert_called_once_with()