# Copyright (C) 2017 Jan Jancar
#
# This file is a part of the Mailman PGP plugin.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Strict inline PGP message wrapper."""
import copy
from email.iterators import walk
from email.mime.text import MIMEText
from pgpy import PGPMessage
from pgpy.constants import SymmetricKeyAlgorithm
from public import public
from mailman_pgp.pgp.base import BaseWrapper
from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob
@public
[docs]class InlineWrapper(BaseWrapper):
"""Inline PGP wrapper."""
[docs] def get_payload(self):
for part in walk(self.msg):
if not part.is_multipart():
yield part.get_payload()
def _walk(self, walk_fn, *args, **kwargs):
for part in walk(self.msg):
if not part.is_multipart():
yield walk_fn(part, *args, **kwargs)
def _is_signed(self, part):
try:
msg = PGPMessage.from_blob(part.get_payload())
return msg.is_signed
except:
pass
return False
[docs] def is_signed(self):
"""
Whether the message is inline signed.
:return: If the message is inline signed.
:rtype: bool
"""
return all(self._walk(self._is_signed))
[docs] def has_signature(self):
"""
Whether some parts of the message are inline signed.
:return: If some parts of the message are inline signed.
:rtype: bool
"""
return any(self._walk(self._is_signed))
[docs] def get_signed(self):
"""
:return:
"""
for part in walk(self.msg):
if not part.is_multipart() and self._is_signed(part):
try:
msg = PGPMessage.from_blob(part.get_payload()).message
except:
continue
yield msg
[docs] def get_signature(self):
"""
:return:
:rtype: typing.Generator[pgpy.PGPMessage]
"""
for part in walk(self.msg):
if not part.is_multipart():
try:
msg = PGPMessage.from_blob(part.get_payload())
except:
continue
yield msg
[docs] def strip_signature(self):
for part in walk(self.msg):
if not part.is_multipart() and self._is_signed(part):
try:
msg = PGPMessage.from_blob(part.get_payload())
except:
continue
part.set_payload(msg.message)
return self
def _is_encrypted(self, part):
try:
msg = PGPMessage.from_blob(part.get_payload())
return msg.is_encrypted
except:
pass
return False
[docs] def is_encrypted(self):
"""
Whether the message is inline encrypted.
:return: If the message is inline encrypted.
:rtype: bool
"""
return all(self._walk(self._is_encrypted))
[docs] def has_encryption(self):
"""
Whether some parts of the message are inline encrypted.
:return: If some parts of the message are inline encrypted.
:rtype: bool
"""
return any(self._walk(self._is_encrypted))
[docs] def get_encrypted(self):
"""
:return:
:rtype: typing.Generator[pgpy.PGPMessage]
"""
for part in walk(self.msg):
if not part.is_multipart():
try:
msg = PGPMessage.from_blob(part.get_payload())
except:
continue
yield msg
def _has_keys(self, part):
try:
key_from_blob(part.get_payload())
return True
except:
pass
return False
[docs] def is_keys(self):
"""
Whether the message is all keys (all parts).
:return: If the message is keys.
:rtype: bool
"""
return all(self._walk(self._has_keys))
[docs] def has_keys(self):
"""
Whether the message contains public or private keys.
:return: If the message contains keys.
:rtype: bool
"""
return any(self._walk(self._has_keys))
[docs] def keys(self):
"""
Get the collection of keys in this message.
:return: A collection of keys.
:rtype: Generator[pgpy.PGPKey]
"""
for part in walk(self.msg):
if not part.is_multipart():
try:
key = key_from_blob(part.get_payload())
except:
continue
yield key
def _is_revoc(self, part):
try:
revoc_from_blob(part.get_payload())
except ValueError:
return False
return True
[docs] def is_revocs(self):
for part in walk(self.msg):
if (not part.is_multipart() and not self._is_revoc(part)):
return False
return True
[docs] def has_revocs(self):
for part in walk(self.msg):
if (not part.is_multipart() and self._is_revoc(part)):
return True
return False
[docs] def revocs(self):
for part in walk(self.msg):
if not part.is_multipart():
try:
revoc = revoc_from_blob(part.get_payload())
except:
continue
yield revoc
[docs] def attach_revocs(self, *key_revocations):
"""
Attach a key revocation signature to the message.
:param key_revocations: A key revocation signature to attach.
:type key_revocations: pgpy.PGPSignature
:return:
:rtype: InlineWrapper
"""
if self.msg.get_content_type() != 'multipart/mixed':
# wrap in multipart/mixed
payload = copy.deepcopy(self.msg)
self.msg.set_payload([])
self.msg.set_type('multipart/mixed')
self.msg['MIME-Version'] = '1.0'
self.msg.attach(payload)
for key_revocation in key_revocations:
revoc_part = MIMEText(str(key_revocation))
self.msg.attach(revoc_part)
return self
[docs] def verify(self, key):
"""
Verify the signatures of this message with key.
:param key: The key to verify with.
:type key: pgpy.PGPKey
:return: The verified signatures.
:rtype: Generator[pgpy.types.SignatureVerification]
"""
yield from map(key.verify, self.get_signature())
def _sign(self, pmsg, key, **kwargs):
smsg = copy.copy(pmsg)
smsg |= key.sign(smsg, **kwargs)
return smsg
[docs] def sign(self, key, **kwargs):
"""
Sign a message with key.
:param key: The key to sign with.
:type key: pgpy.PGPKey
:return:
:rtype: InlineWrapper
"""
for part in walk(self.msg):
if not part.is_multipart():
if self._is_signed(part):
pmsg = PGPMessage.from_blob(part.get_payload())
else:
payload = str(part.get_payload())
pmsg = PGPMessage.new(payload, cleartext=True)
smsg = self._sign(pmsg, key, **kwargs)
part.set_payload(str(smsg))
return self
def _decrypt(self, part, key):
message = PGPMessage.from_blob(part.get_payload())
decrypted = key.decrypt(message)
if decrypted.is_signed:
part.set_payload(str(decrypted))
else:
dmsg = decrypted.message
if isinstance(dmsg, bytearray):
dmsg = dmsg.decode(decrypted.charset or 'utf-8')
part.set_payload(dmsg)
[docs] def decrypt(self, key):
"""
Decrypt this message with key.
:param key: The key to decrypt with.
:type key: pgpy.PGPKey
:return:
:rtype: InlineWrapper
"""
for part in walk(self.msg):
if not part.is_multipart() and self._is_encrypted(part):
self._decrypt(part, key)
return self
def _encrypt(self, pmsg, *keys, cipher, **kwargs):
emsg = copy.copy(pmsg)
if len(keys) == 1:
emsg = keys[0].encrypt(emsg, cipher=cipher, **kwargs)
else:
session_key = cipher.gen_key()
for key in keys:
emsg = key.encrypt(emsg, cipher=cipher,
sessionkey=session_key,
**kwargs)
del session_key
return emsg
[docs] def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256,
**kwargs):
"""
Encrypt the message with key/s, using cipher.
:param keys: The key/s to encrypt with.
:type keys: pgpy.PGPKey
:param cipher: The symmetric cipher to use.
:type cipher: SymmetricKeyAlgorithm
:return:
:rtype: InlineWrapper
"""
if len(keys) == 0:
raise ValueError('At least one key necessary.')
for part in walk(self.msg):
if not part.is_multipart():
payload = str(part.get_payload())
pmsg = PGPMessage.new(payload)
emsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
part.set_payload(str(emsg))
return self
[docs] def sign_encrypt(self, key, *keys, hash=None,
cipher=SymmetricKeyAlgorithm.AES256,
**kwargs):
"""
Sign and encrypt the message, in one go.
:param key: The key to sign with.
:type key: pgpy.PGPKey
:param keys: The key/s to encrypt with.
:type keys: pgpy.PGPKey
:param hash:
:type hash: pgpy.constants.HashAlgorithm
:param cipher:
:type cipher: pgpy.constants.SymmetricKeyAlgorithm
:return:
:rtype: InlineWrapper
"""
if len(keys) == 0:
raise ValueError('At least one key necessary.')
for part in walk(self.msg):
if not part.is_multipart():
if self._is_signed(part):
pmsg = PGPMessage.from_blob(part.get_payload())
else:
payload = str(part.get_payload())
pmsg = PGPMessage.new(payload)
smsg = self._sign(pmsg, key, hash=hash)
emsg = self._encrypt(smsg, *keys, cipher=cipher, **kwargs)
part.set_payload(str(emsg))
return self