Source code for mailman_pgp.rules.signature

# Copyright (C) 2017 Jan Jancar
#
# This file is a part of the Mailman PGP plugin.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.

"""Signature checking rule for the pgp-posting-chain."""
from operator import attrgetter

from mailman.core.i18n import _
from mailman.interfaces.action import Action
from mailman.interfaces.chain import AcceptEvent
from mailman.interfaces.rules import IRule
from mailman.interfaces.usermanager import IUserManager
from public import public
from zope.component import getUtility
from zope.event import classhandler
from zope.interface import implementer

from mailman_pgp.config import config
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.model.sighash import PGPSigHash
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.utils.email import get_email
from mailman_pgp.utils.moderation import record_action
from mailman_pgp.utils.pgp import expired, hashes, revoked, verifies


@public
@implementer(IRule)
[docs]class Signature: """The signature checking rule.""" name = 'pgp-signature' description = _( 'A rule which enforces PGP enabled list signature configuration.') record = False
[docs] def check(self, mlist, msg, msgdata): """See `IRule`.""" # Find the `PGPMailingList` this is for. pgp_list = PGPMailingList.for_list(mlist) if pgp_list is None: return False email = get_email(msg) # Wrap the message to work with it. wrapped = PGPWrapper(msg) # Take unsigned_msg_action if unsigned. if not wrapped.is_signed(): action = pgp_list.unsigned_msg_action if action != Action.defer: record_action(msg, msgdata, action, email, 'The message is unsigned.') return True # Take `inline_pgp_action` if inline signed. if wrapped.inline.is_signed(): action = pgp_list.inline_pgp_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Inline PGP is not allowed.') return True # Lookup the address by sender, and its corresponding `PGPAddress`. user_manager = getUtility(IUserManager) address = user_manager.get_address(email) pgp_address = PGPAddress.for_address(address) if pgp_address is None: # Just let it continue. return False # See if we have a key. key = pgp_address.key if key is None: record_action(msg, msgdata, Action.reject, email, 'No key set for address {}.'.format(email)) return True if not pgp_address.key_confirmed: record_action(msg, msgdata, Action.reject, email, 'Key not confirmed.') return True verifications = list(wrapped.verify(key)) # verifications is a list of SignatureVerification, only contains # sigs that appear to be by the pgp_address.key if expired(verifications): action = pgp_list.expired_sig_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Signature is expired.') return True if revoked(verifications): action = pgp_list.revoked_sig_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Signature is made by a revoked key.') return True # Take the `invalid_sig_action` if the verification failed. if not verifies(verifications): action = pgp_list.invalid_sig_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Signature did not verify.') return True sig_hashes = set(hashes(verifications)) duplicates = set(PGPSigHash.hashes(sig_hashes)) if duplicates: fingerprints = map(attrgetter('fingerprint'), duplicates) if key.fingerprint in fingerprints: action = pgp_list.duplicate_sig_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Signature duplicate.') return True msgdata['pgp_sig_hashes'] = sig_hashes return False
@classhandler.handler(AcceptEvent) def on_message_posting(event): """ Add sig hashes to sighash table. :param event: :type event: AcceptEvent """ if not config.get_value('misc', 'collect_sig_hashes'): return pgp_list = PGPMailingList.for_list(event.mlist) if pgp_list is None: return address = getUtility(IUserManager).get_address(get_email(event.msg)) pgp_address = PGPAddress.for_address(address) if pgp_address is None or pgp_address.key_fingerprint is None: return for sig_hash in event.msgdata['pgp_sig_hashes']: with transaction() as t: pgp_hash = PGPSigHash(hash=sig_hash, fingerprint=pgp_address.key_fingerprint) t.add(pgp_hash)