# Copyright (C) 2017 Jan Jancar
#
# This file is a part of the Mailman PGP plugin.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""RFC1847 and RFC3156 compliant message wrapped."""
import copy
from email import message_from_string
from email.encoders import encode_7or8bit
from email.iterators import walk
from email.mime.application import MIMEApplication
from email.utils import collapse_rfc2231_value
from mailman.email.message import Message
from pgpy import PGPMessage, PGPSignature
from pgpy.constants import HashAlgorithm, SymmetricKeyAlgorithm
from public import public
from mailman_pgp.pgp.base import BaseWrapper
from mailman_pgp.utils.email import copy_headers
from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob
@public
class MIMEWrapper(BaseWrapper):
    """PGP/MIME (RFC1847 + RFC3156) compliant wrapper.

    Operates on the wrapped message ``self.msg``. Most mutating methods
    modify ``self.msg`` in place and return ``self`` so calls can be chained.
    """

    _signature_subtype = 'pgp-signature'
    _encryption_subtype = 'pgp-encrypted'
    _keys_subtype = 'pgp-keys'

    _signed_type = 'application/' + _signature_subtype
    _encrypted_type = 'application/' + _encryption_subtype
    _keys_type = 'application/' + _keys_subtype

    _signed_multipart = 'multipart/signed'
    _encrypted_multipart = 'multipart/encrypted'

    _signature_preamble = \
        'This is an OpenPGP/MIME signed message (RFC 4880 and 3156)'
    _encryption_preamble = \
        'This is an OpenPGP/MIME encrypted message (RFC 4880 and 3156)'

    def get_payload(self):
        """
        Get the payload that signing/encryption operates on.

        :return: A generator yielding the whole message serialized
                 as a string.
        :rtype: typing.Generator[str]
        """
        yield self.msg.as_string()

    def _is_mime(self):
        """
        Whether the message is a multipart with exactly two subparts.

        :rtype: bool
        """
        if not self.msg.is_multipart():
            return False
        return len(self.msg.get_payload()) == 2

    def _leaf_parts(self):
        """
        Iterate over all non-multipart parts of the message.

        :return: A generator of the leaf (non-multipart) parts.
        """
        return (part for part in walk(self.msg) if not part.is_multipart())

    def is_signed(self):
        """
        Whether the whole message is MIME signed as per RFC3156 section 5.

        :return: If the message is MIME signed.
        :rtype: bool
        """
        if not self._is_mime():
            return False
        second_type = self.msg.get_payload(1).get_content_type()
        protocol_param = collapse_rfc2231_value(
            self.msg.get_param('protocol', ''))
        content_subtype = self.msg.get_content_subtype()

        return (second_type == MIMEWrapper._signed_type and
                content_subtype == 'signed' and
                protocol_param == MIMEWrapper._signed_type)

    def has_signature(self):
        """
        Whether the message has a signature; same as :meth:`is_signed`.

        :rtype: bool
        """
        return self.is_signed()

    def get_signed(self):
        """
        Get the signed part of the message.

        :return: A generator yielding the first subpart serialized
                 as a string.
        :rtype: typing.Generator[str]
        """
        yield self.msg.get_payload(0).as_string()

    def get_signature(self):
        """
        Get the signature stored in the second subpart.

        :return: A generator yielding the parsed signature, or nothing if
                 the second subpart can not be read or parsed.
        :rtype: typing.Generator[pgpy.PGPSignature]
        """
        try:
            sig = PGPSignature.from_blob(
                self.msg.get_payload(1).get_payload())
        except Exception:
            # Best effort: a missing or malformed signature yields nothing.
            return
        yield sig

    def strip_signature(self):
        """
        Replace the message with its first (signed) subpart.

        Headers of the first subpart are copied onto the message and the
        message payload is replaced by that subpart's payload.

        :return: This wrapper.
        :rtype: MIMEWrapper
        """
        inner = self.msg.get_payload(0)
        # NOTE(review): the third argument presumably means "overwrite
        # existing headers" -- confirm against copy_headers.
        copy_headers(inner, self.msg, True)
        self.msg.set_payload(inner.get_payload())
        return self

    def is_encrypted(self):
        """
        Whether the whole message is MIME encrypted as per RFC3156 section 4.

        :return: If the message is MIME encrypted.
        :rtype: bool
        """
        if not self._is_mime():
            return False
        first = self.msg.get_payload(0)
        first_part = first.as_string()
        first_type = first.get_content_type()
        second_type = self.msg.get_payload(1).get_content_type()
        content_subtype = self.msg.get_content_subtype()
        protocol_param = collapse_rfc2231_value(
            self.msg.get_param('protocol', ''))

        return ('Version: 1' in first_part and
                first_type == MIMEWrapper._encrypted_type and
                second_type == 'application/octet-stream' and
                content_subtype == 'encrypted' and
                protocol_param == MIMEWrapper._encrypted_type)

    def has_encryption(self):
        """
        Whether the message is encrypted; same as :meth:`is_encrypted`.

        :rtype: bool
        """
        return self.is_encrypted()

    def get_encrypted(self):
        """
        Get the encrypted PGP message stored in the second subpart.

        :return: A generator yielding the parsed PGP message, or nothing if
                 the second subpart can not be read or parsed.
        :rtype: typing.Generator[pgpy.PGPMessage]
        """
        try:
            msg = PGPMessage.from_blob(self.msg.get_payload(1).get_payload())
        except Exception:
            # Best effort: a missing or malformed blob yields nothing.
            return
        yield msg

    def is_keys(self):
        """
        Whether the message has only keys as per RFC3156 section 7.

        :return: If the message is keys.
        :rtype: bool
        """
        return all(part.get_content_type() == MIMEWrapper._keys_type
                   for part in self._leaf_parts())

    def has_keys(self):
        """
        Whether the message contains keys as per RFC3156 section 7.

        :return: If the message contains keys.
        :rtype: bool
        """
        return any(part.get_content_type() == MIMEWrapper._keys_type
                   for part in self._leaf_parts())

    def keys(self):
        """
        Get the collection of keys in this message.

        Parts of the keys content type that fail to parse are skipped.

        :return: A collection of keys.
        """
        for part in self._leaf_parts():
            if part.get_content_type() != MIMEWrapper._keys_type:
                continue
            try:
                key = key_from_blob(part.get_payload())
            except Exception:
                # Skip parts that do not hold a parsable key.
                continue
            yield key

    def _ensure_multipart_mixed(self):
        """
        Make sure the message is a multipart/mixed container.

        If it is not, a deep copy of the current message becomes the only
        subpart of a new multipart/mixed payload, so that further parts
        can be attached next to it.
        """
        if self.msg.get_content_type() == 'multipart/mixed':
            return
        payload = copy.deepcopy(self.msg)
        self.msg.set_payload([])
        self.msg.set_type('multipart/mixed')
        # set_type() may already have (re)set MIME-Version, and header
        # assignment appends rather than replaces; delete first so the
        # header is never duplicated.
        del self.msg['MIME-Version']
        self.msg['MIME-Version'] = '1.0'
        self.msg.attach(payload)

    def _attach_key_part(self, obj, name, description):
        """
        Attach ``str(obj)`` as an attachment part of the keys content type.

        :param obj: The object to attach, serialized through ``str()``.
        :param name: The attachment (file) name.
        :type name: str
        :param description: Value of the Content-Description header.
        :type description: str
        """
        key_part = MIMEApplication(_data=str(obj),
                                   _subtype=MIMEWrapper._keys_subtype,
                                   _encoder=encode_7or8bit,
                                   name=name)
        key_part.add_header('Content-Description', description)
        key_part.add_header('Content-Disposition', 'attachment',
                            filename=name)
        self.msg.attach(key_part)

    def attach_keys(self, *keys):
        """
        Attach a key to this message, as per RFC3156 section 7.

        Does nothing when no keys are given.

        :param keys: Keys to attach.
        :type keys: pgpy.PGPKey
        :return: This wrapper.
        :rtype: MIMEWrapper
        """
        if not keys:
            return self
        self._ensure_multipart_mixed()
        for key in keys:
            filename = '0x' + key.fingerprint.keyid + '.asc'
            self._attach_key_part(key, filename, 'OpenPGP key')
        return self

    def _is_revoc(self, part):
        """
        Whether a message part holds a key revocation signature.

        Only a ValueError from parsing is treated as "not a revocation";
        any other exception propagates to the caller.

        :param part: The message part to check.
        :rtype: bool
        """
        if part.get_content_type() != MIMEWrapper._keys_type:
            return False
        try:
            revoc_from_blob(part.get_payload())
        except ValueError:
            return False
        return True

    def is_revocs(self):
        """
        Whether every non-multipart part is a key revocation signature.

        :rtype: bool
        """
        return all(self._is_revoc(part) for part in self._leaf_parts())

    def has_revocs(self):
        """
        Whether any non-multipart part is a key revocation signature.

        :rtype: bool
        """
        return any(self._is_revoc(part) for part in self._leaf_parts())

    def revocs(self):
        """
        Get the key revocation signatures in this message.

        Parts of the keys content type that fail to parse are skipped.

        :return: A generator of the parsed revocation signatures.
        """
        for part in self._leaf_parts():
            if part.get_content_type() != MIMEWrapper._keys_type:
                continue
            try:
                revoc = revoc_from_blob(part.get_payload())
            except Exception:
                # Skip parts that do not hold a parsable revocation.
                continue
            yield revoc

    def attach_revocs(self, *key_revocations):
        """
        Attach a key revocation signature to the message, as a key subpart.

        Does nothing when no revocations are given.

        :param key_revocations: A key revocation signature to attach.
        :type key_revocations: pgpy.PGPSignature
        :return: This wrapper.
        :rtype: MIMEWrapper
        """
        if not key_revocations:
            return self
        self._ensure_multipart_mixed()
        for key_revocation in key_revocations:
            filename = '0x' + key_revocation.signer + '.asc'
            self._attach_key_part(key_revocation, filename,
                                  'OpenPGP key revocation')
        return self

    def verify(self, key):
        """
        Verify the signature of this message with key.

        Yields nothing when the signature can not be obtained or when the
        verification itself raises.

        :param key: The key to verify with.
        :type key: pgpy.PGPKey
        :return: The verified signature.
        :rtype: Generator[pgpy.types.SignatureVerification]
        """
        # Use a default with next(): a StopIteration escaping inside a
        # generator body is turned into RuntimeError (PEP 479).
        clear_text = next(iter(self.get_signed()), None)
        signature = next(iter(self.get_signature()), None)
        if clear_text is None or signature is None:
            return
        try:
            verification = key.verify(clear_text, signature)
        except Exception:
            return
        yield verification

    def _micalg(self, hash_algo):
        """
        Get the value of the ``micalg`` parameter for a hash algorithm.

        :param hash_algo: The hash algorithm of the signature.
        :type hash_algo: pgpy.constants.HashAlgorithm
        :return: The algorithm name prefixed with ``pgp-``.
        :rtype: str
        :raises KeyError: If the algorithm is not in the table below.
        """
        algs = {
            HashAlgorithm.MD5: 'md5',
            HashAlgorithm.SHA1: 'sha1',
            HashAlgorithm.RIPEMD160: 'ripemd160',
            HashAlgorithm.SHA256: 'sha256',
            HashAlgorithm.SHA384: 'sha384',
            HashAlgorithm.SHA512: 'sha512',
            HashAlgorithm.SHA224: 'sha224'
        }
        return 'pgp-' + algs[hash_algo]

    def _wrap_signed(self, msg, signature):
        """
        Turn the message into multipart/signed, as per RFC1847 and RFC3156.

        The payload becomes ``[msg, signature part]``.

        :param msg: The signed message, attached as the first subpart.
        :param signature: The signature, attached as the second subpart.
        :type signature: pgpy.PGPSignature
        """
        self.msg.set_payload([])
        self.msg.attach(msg)
        self.msg.set_type(MIMEWrapper._signed_multipart)
        self.msg.set_param('micalg', self._micalg(signature.hash_algorithm))
        self.msg.set_param('protocol', MIMEWrapper._signed_type)
        self.msg.preamble = MIMEWrapper._signature_preamble

        second_part = MIMEApplication(_data=str(signature),
                                      _subtype=MIMEWrapper._signature_subtype,
                                      _encoder=encode_7or8bit,
                                      name='signature.asc')
        second_part.add_header('Content-Description',
                               'OpenPGP digital signature')
        second_part.add_header('Content-Disposition', 'attachment',
                               filename='signature.asc')
        self.msg.attach(second_part)

    def sign(self, key, **kwargs):
        """
        Sign a message with key.

        :param key: The key to sign with.
        :type key: pgpy.PGPKey
        :param kwargs: Passed through to ``key.sign``.
        :return: This wrapper.
        :rtype: MIMEWrapper
        """
        payload = next(iter(self.get_payload()))
        signature = key.sign(payload, **kwargs)
        # Copy first: _wrap_signed() rewrites self.msg in place.
        original_msg = copy.deepcopy(self.msg)
        self._wrap_signed(original_msg, signature)
        return self

    def decrypt(self, key):
        """
        Decrypt this message with key.

        The message is modified in place. If the decrypted content carries
        a signature, the message becomes multipart/signed; otherwise its
        payload and headers are replaced by the decrypted ones.

        :param key: The key to decrypt with.
        :type key: pgpy.PGPKey
        :return: This wrapper, holding the decrypted message.
        :rtype: MIMEWrapper
        :raises StopIteration: If no encrypted PGP message can be obtained
                               from the second subpart.
        """
        pmsg = next(iter(self.get_encrypted()))
        decrypted = key.decrypt(pmsg)

        dmsg = decrypted.message
        if isinstance(dmsg, bytearray):
            dmsg = dmsg.decode(decrypted.charset or 'utf-8')

        out = message_from_string(dmsg, _class=Message)
        if decrypted.is_signed:
            # rewrap, self.msg should be multipart/signed,
            # headers from out should overwrite those from self.msg
            # self.msg payload should be [out, sig]
            signature = next(iter(decrypted.signatures))
            self._wrap_signed(out, signature)
        else:
            # self.msg payload should be out.get_payload
            # headers from out should overwrite those from self.msg
            self.msg.set_payload(out.get_payload())
            copy_headers(out, self.msg, True)
        return self

    def _encrypt(self, pmsg, *keys, cipher, **kwargs):
        """
        Encrypt a PGP message to one or more keys.

        With several keys, one session key is generated and reused for
        every recipient.

        :param pmsg: The message to encrypt; a shallow copy is encrypted.
        :type pmsg: pgpy.PGPMessage
        :param keys: The key/s to encrypt with.
        :type keys: pgpy.PGPKey
        :param cipher: The symmetric cipher to use.
        :type cipher: pgpy.constants.SymmetricKeyAlgorithm
        :param kwargs: Passed through to ``key.encrypt``.
        :return: The encrypted message.
        """
        emsg = copy.copy(pmsg)
        if len(keys) == 1:
            emsg = keys[0].encrypt(emsg, cipher=cipher, **kwargs)
        else:
            session_key = cipher.gen_key()
            for key in keys:
                emsg = key.encrypt(emsg, cipher=cipher,
                                   sessionkey=session_key,
                                   **kwargs)
            # Drop the local reference to the session key once done.
            del session_key
        return emsg

    def _wrap_encrypted(self, payload):
        """
        Turn the message into multipart/encrypted.

        The payload becomes ``[version part, encrypted part]``.

        :param payload: The encrypted content, serialized through ``str()``.
        """
        self.msg.set_payload([])
        self.msg.set_type(MIMEWrapper._encrypted_multipart)
        self.msg.set_param('protocol', MIMEWrapper._encrypted_type)
        self.msg.preamble = MIMEWrapper._encryption_preamble

        first_part = MIMEApplication(_data='Version: 1',
                                     _subtype=MIMEWrapper._encryption_subtype,
                                     _encoder=encode_7or8bit)
        first_part.add_header('Content-Description',
                              'PGP/MIME version identification')
        self.msg.attach(first_part)

        second_part = MIMEApplication(_data=str(payload),
                                      _subtype='octet-stream',
                                      _encoder=encode_7or8bit,
                                      name='encrypted.asc')
        second_part.add_header('Content-Description',
                               'OpenPGP encrypted message')
        second_part.add_header('Content-Disposition', 'inline',
                               filename='encrypted.asc')
        self.msg.attach(second_part)

    def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256,
                **kwargs):
        """
        Encrypt the message with key/s, using cipher.

        If the message is MIME signed, the signed part and its signature
        are combined into one PGP message before encryption.

        :param keys: The key/s to encrypt with.
        :type keys: pgpy.PGPKey
        :param cipher: The symmetric cipher to use.
        :type cipher: pgpy.constants.SymmetricKeyAlgorithm
        :param kwargs: Passed through to ``key.encrypt``.
        :return: This wrapper.
        :rtype: MIMEWrapper
        :raises ValueError: If no keys are given.
        :raises StopIteration: If the message is MIME signed but its
                               signature can not be parsed.
        """
        if len(keys) == 0:
            raise ValueError('At least one key necessary.')

        if self.is_signed():
            # self.msg payload should be [ version_1, encrypted]
            # headers should remain the same, except Content-Type
            # signature should be combined into the PGP blob
            pmsg = PGPMessage.new(next(iter(self.get_signed())))
            pmsg |= next(iter(self.get_signature()))
        else:
            # self.msg payload should be [ version_1, encrypted]
            # headers should remain the same, except Content-Type
            pmsg = PGPMessage.new(next(iter(self.get_payload())))
        pmsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
        self._wrap_encrypted(pmsg)
        return self

    def sign_encrypt(self, key, *keys, hash=None,
                     cipher=SymmetricKeyAlgorithm.AES256,
                     **kwargs):
        """
        Sign and encrypt the message, in one go.

        This is as per RFC 3156 section 6.2 - Combined method.

        :param key: The key to sign with.
        :type key: pgpy.PGPKey
        :param keys: The key/s to encrypt with.
        :type keys: pgpy.PGPKey
        :param hash: The hash algorithm passed to ``key.sign``.
        :type hash: pgpy.constants.HashAlgorithm
        :param cipher: The symmetric cipher to use.
        :type cipher: pgpy.constants.SymmetricKeyAlgorithm
        :param kwargs: Passed through to ``key.encrypt``.
        :return: This wrapper.
        :rtype: MIMEWrapper
        :raises ValueError: If no encryption keys are given.
        """
        if len(keys) == 0:
            raise ValueError('At least one key necessary.')

        payload = next(iter(self.get_payload()))
        pmsg = PGPMessage.new(payload)
        pmsg |= key.sign(pmsg, hash=hash)
        pmsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
        self._wrap_encrypted(pmsg)
        return self