# Source code for mailman_pgp.runners.incoming
# Copyright (C) 2017 Jan Jancar
#
# This file is a part of the Mailman PGP plugin.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""The encryption-aware incoming runner."""
import logging
from mailman.config import config as mailman_config
from mailman.core.runner import Runner
from mailman.email.message import Message
from mailman.interfaces.action import Action
from mailman.model.mailinglist import MailingList
from pgpy.errors import PGPError
from public import public
from mailman_pgp.config import config
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.utils.moderation import record_action
log = logging.getLogger('mailman.plugin.pgp')
def _pass_default(msg, msgdata, listid):
    """Enqueue the message on the queue configured as ``queues.in``.

    The queue name is read from the plugin configuration and the matching
    Mailman switchboard receives the message together with its metadata.

    :param msg: The message to hand over.
    :param msgdata: The metadata dictionary travelling with the message.
    :param listid: The list-id, forwarded as the ``listid`` keyword.
    """
    queue_name = config.get_value('queues', 'in')
    switchboard = mailman_config.switchboards[queue_name]
    switchboard.enqueue(msg, msgdata, listid=listid)
@public
class PGPIncomingRunner(Runner):
    """The encryption-aware incoming runner.

    Messages for PGP-enabled lists are checked (and decrypted when they are
    encrypted) before being handed on via `_pass_default`; messages for all
    other lists are handed on untouched.
    """

    @staticmethod
    def _flag_for_moderation(msg, msgdata, action, reason):
        """Log and record `action` for `msg`, then flag it for moderation.

        Only sets data for the `encryption` rule, which will jump to the
        moderation chain if ``pgp_moderate`` is True.

        :param msg: The message the action applies to.
        :param msgdata: The message metadata; gains ``pgp_moderate = True``.
        :param action: The `Action` to log and record.
        :param reason: Human readable explanation of the action.
        """
        entry = '[pgp] {}{}: {}'.format(
            action.name, msg.get('message-id', 'n/a'), reason)
        log.info(entry)
        record_action(msg, msgdata, action, msg.sender, reason)
        msgdata['pgp_moderate'] = True

    def _dispose(self, mlist: MailingList, msg: Message, msgdata: dict):
        """See `IRunner`.

        Returns True only when the message is encrypted but the list key is
        not available, so that the message is kept; in every other case the
        message is passed on with `_pass_default` and False is returned.
        """
        pgp_list = PGPMailingList.for_list(mlist)
        if not pgp_list:
            # Not an encrypted mailing list: the default incoming runner
            # takes it from here.
            _pass_default(msg, msgdata, mlist.list_id)
            return False

        wrapper = PGPWrapper(msg)
        if not wrapper.is_encrypted():
            # Take the `nonencrypted_msg_action`, unless it is a defer.
            action = pgp_list.nonencrypted_msg_action
            if action != Action.defer:
                self._flag_for_moderation(
                    msg, msgdata, action, 'Message was not encrypted.')
        else:
            key = pgp_list.key
            if key is None:
                # Keep the message and hope the key becomes available.
                return True
            try:
                # Decrypt it; the decrypted message is what gets passed on.
                msg = wrapper.decrypt(key).msg
            except PGPError:
                # `msg` is still the original message at this point.
                self._flag_for_moderation(
                    msg, msgdata, Action.reject,
                    'Message could not be decrypted.')

        _pass_default(msg, msgdata, mlist.list_id)
        return False