#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: vault.py
#
# Copyright 2023 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for vault.
.. _Google Python Style Guide:
https://google.github.io/styleguide/pyguide.html
"""
from collections import defaultdict
from dataclasses import dataclass
import json
import logging
import re
import time
from hashlib import sha256, pbkdf2_hmac
from pathlib import Path
from Crypto.Cipher import PKCS1_OAEP
from binascii import hexlify
from .datamodels import Folder, FolderMetadata, NeverUrl, EquivalentDomain, SharedFolder, UrlRule
from .dataschemas import SecretSchema, SharedFolderSchema, AttachmentSchema
from .encryption import Blob, EncryptManager, Stream
from .secrets import Password, SECRET_NOTE_CLASS_MAPPING, Attachment, Custom, FolderEntry
LOGGER_BASENAME = 'vault'
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())
__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''08-02-2023'''
__copyright__ = '''Copyright 2023, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos", "Yorick Hoorneman"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
[docs]class Vault:
"""Models the encrypted vault and implements decryption of all items and connection everything appropriately."""
def __init__(self, lastpass_instance, password):
self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}')
self._lastpass = lastpass_instance
self.username = lastpass_instance.username.encode('utf-8')
self.password = password.encode('utf-8')
self.key_iteration_count = lastpass_instance.iteration_count
self._key = None
self._hash = None
self._blob = None
self.unable_to_decrypt = []
@property
def key(self):
"""The encryption key of the vault."""
if self._key is None:
if self.key_iteration_count == 1:
self._key = sha256(f'{self.username}{self.password}').digest()
else:
self._key = pbkdf2_hmac('sha256', self.password, self.username, self.key_iteration_count, 32)
return self._key
@property
def hash(self):
"""The hash of the vault."""
if self._hash is None:
if self.key_iteration_count == 1:
self._hash = bytearray(sha256(hexlify(self.key) + self.password).hexdigest(), 'ascii')
else:
self._hash = hexlify(pbkdf2_hmac('sha256', self.key, self.password, 1, 32))
return self._hash
@property
def blob(self):
if self._blob is None:
params = {'mobile': 1,
'b64': 1,
'hash': 0.0,
'hasplugin': '3.0.23',
'requestsrc': 'android'}
url = f'{self._lastpass.host}/getaccts.php'
response = self._lastpass.session.get(url, params=params)
if not response.ok:
response.raise_for_status()
self._blob = response.content
return self._blob
@staticmethod
def _get_attachments_by_parent_id(id_, attachments):
return [attachment for attachment in attachments if attachment.get('parent_id') == id_]
@staticmethod
def _get_chunks_by_id(blob, chunk_id):
return [chunk for chunk in blob.chunks if chunk.id == chunk_id.encode('utf-8')]
@staticmethod
def _get_chunk_by_id(blob, chunk_id):
return next((chunk for chunk in blob.chunks if chunk.id == chunk_id.encode('utf-8')), None)
[docs] def decrypt_blob(self, data):
blob = Blob(data)
encrypted_username = Vault._get_chunk_by_id(blob, 'ENCU').payload.decode('utf-8')
attachments_data = self._get_attachments(blob)
never_urls = self._get_never_urls(blob)
equivalent_domains = self._get_eqdns(blob)
url_rules = self._get_url_rules(blob)
secrets, attachments, \
folder_entries, shared_folders = self._get_secrets_folders_and_attachments(blob, attachments_data)
encryption_key = self.key
return DecryptedVault(self._lastpass,
encrypted_username,
attachments,
never_urls,
equivalent_domains,
url_rules, secrets,
encryption_key,
folder_entries,
shared_folders)
@staticmethod
def _get_attribute_payload_data(stream, attributes):
return {attribute: stream.get_payload_by_size(stream.read_byte_size(4)) for attribute in attributes}
@staticmethod
def _parse_secret_type(payload, encryption_key):
"""Parses an account chunk, decrypts and creates an Account object.
All secure notes are ACCTs but not all of them store account information.
"""
stream = Stream(payload)
secret = SecretSchema()
data = Vault._get_attribute_payload_data(stream, secret.attributes)
data.update(Vault._transform_data_attributes(data,
secret.plain_encrypted,
EncryptManager.decrypt_aes256_auto,
arguments={'encryption_key': encryption_key}))
data.update(Vault._transform_data_attributes(data,
secret.base_64_encrypted,
EncryptManager.decrypt_aes256_auto,
arguments={'encryption_key': encryption_key,
'base64': True}))
data.update(Vault._transform_data_attributes(data,
secret.hex_decoded,
EncryptManager.decode_hex))
data.update(Vault._transform_data_attributes(data,
secret.decoded_attributes,
lambda x: x.decode('utf-8')))
data.update(Vault._transform_data_attributes(data,
secret.boolean_values,
lambda x: bool(int(x))))
data['encryption_key'] = encryption_key
if data.get('is_secure_note'):
return Vault._parse_secure_note(data)
if all([not any([data.get('username'),
data.get('password'),
data.get('name'),
data.get('notes')]),
data.get('url') == 'http://group']):
return FolderEntry, data
return Password, data
def _parse_secret(self, # pylint: disable=too-many-arguments
chunk,
key,
shared_folder,
attachments_data,
attachments,
secrets,
folder_entries):
class_type, data = self._parse_secret_type(chunk.payload, key)
secret = class_type(self._lastpass, data, shared_folder)
if class_type is FolderEntry:
folder_entries.append(secret)
else:
if secret.has_attachment:
for attachment_data in self._get_attachments_by_parent_id(secret.id, attachments_data):
attachment_data['decryption_key'] = secret.attachment_encryption_key
attachment = Attachment(self._lastpass, attachment_data)
secret.add_attachment(attachment)
attachments.append(attachment)
secrets.append(secret)
return secrets, attachments, folder_entries
def _get_secrets_folders_and_attachments(self, blob, attachments_data):
secrets = []
attachments = []
folder_entries = []
shared_folders = []
key = self.key
rsa_private_key = None
shared_folder = None
for chunk in blob.chunks:
if chunk.id == b'ACCT':
try:
secrets, attachments, folder_entries = self._parse_secret(chunk,
key,
shared_folder,
attachments_data,
attachments,
secrets,
folder_entries)
# We want to skip any possible error so the process completes and we gather the errors so they can be
# troubleshoot
except Exception: # noqa
self._logger.exception('Unable to decrypt chunk, adding to the error list.')
self.unable_to_decrypt.append((chunk, key))
continue
elif chunk.id == b'PRIK':
rsa_private_key = EncryptManager.decrypt_rsa_key(chunk.payload, self.key)
elif chunk.id == b'SHAR':
# After SHAR chunk all the following accounts are encrypted with a new key.
# SHAR chunks hold shared folders so shared folders are passed into all accounts under them.
data = self._parse_shared_folder(chunk.payload, self.key, rsa_private_key)
shared_folder_data = self._lastpass._get_shared_folder_data_by_id(data.get('id')) # pylint: disable=protected-access
if shared_folder_data:
shared_folder_data['shared_name'] = data.get('name')
shared_folder = SharedFolder(*shared_folder_data.values())
shared_folders.append(shared_folder)
key = data.get('key')
return secrets, attachments, folder_entries, shared_folders
@staticmethod
def _parse_url_rules(payload):
stream = Stream(payload)
attributes = ['url', 'exact_host', 'exact_port', 'case_insensitive']
data = Vault._get_attribute_payload_data(stream, attributes)
data['url'] = EncryptManager.decode_hex(data['url'])
data.update(Vault._transform_data_attributes(data,
attributes,
lambda x: x.decode('utf-8')))
bools = attributes[1:]
data.update(Vault._transform_data_attributes(data,
bools,
lambda x: bool(int(x))))
return UrlRule(**data)
@staticmethod
def _get_url_rules(blob):
urul_chunks = Vault._get_chunks_by_id(blob, 'URUL')
return [Vault._parse_url_rules(chunk.payload) for chunk in urul_chunks]
@staticmethod
def _parse_eqdns(payload):
stream = Stream(payload)
attributes = ['id', 'url']
data = Vault._get_attribute_payload_data(stream, attributes)
data.update(Vault._transform_data_attributes(data,
attributes,
lambda x: x.decode('utf-8')))
return EquivalentDomain(int(data.get('id')),
EncryptManager.decode_hex(data.get('url')).decode('utf-8'))
@staticmethod
def _get_eqdns(blob):
eqdn_chunks = Vault._get_chunks_by_id(blob, 'EQDN')
return [Vault._parse_eqdns(chunk.payload) for chunk in eqdn_chunks]
@staticmethod
def _parse_never_urls(payload):
stream = Stream(payload)
attributes = ['id', 'url']
data = Vault._get_attribute_payload_data(stream, attributes)
data.update(Vault._transform_data_attributes(data,
attributes,
lambda x: x.decode('utf-8')))
return NeverUrl(int(data.get('id')),
EncryptManager.decode_hex(data.get('url')).decode('utf-8'))
@staticmethod
def _get_never_urls(blob):
never_urls_chunks = Vault._get_chunks_by_id(blob, 'NEVR')
return [Vault._parse_never_urls(chunk.payload) for chunk in never_urls_chunks]
@staticmethod
def _parse_attachment(payload):
stream = Stream(payload)
attachment = AttachmentSchema()
data = Vault._get_attribute_payload_data(stream, attachment.attributes)
data.update(Vault._transform_data_attributes(data,
attachment.decoded_attributes,
lambda x: x.decode('utf-8')))
return data
@staticmethod
def _get_attachments(blob):
attachment_chunks = Vault._get_chunks_by_id(blob, 'ATTA')
return [Vault._parse_attachment(chunk.payload) for chunk in attachment_chunks]
@staticmethod
def _transform_data_attributes(data, attributes, transformation, arguments=None):
id_ = data.get('id')
arguments = arguments if arguments else {}
transformed_data = {}
for attribute in attributes:
value = data.get(attribute)
try:
transformed_data[attribute] = transformation(value, **arguments)
except Exception: # noqa
LOGGER.error(f'Attribute :{attribute} with value: {value} for secret :{id_} cannot be transformed.')
return transformed_data
@staticmethod
def _get_class_and_key_mapping(data):
note_type = data.get('note_type') or 'Generic'
class_type = SECRET_NOTE_CLASS_MAPPING.get(note_type, SECRET_NOTE_CLASS_MAPPING.get('Custom'))
key_mapping = class_type.attribute_mapping
if data.get('note_type').startswith('Custom'):
# this needs work as the attributes are not part of the class.
data = json.loads(data.get('custom_note_definition_json'))
attributes = [entry.get('text') for entry in data.get('fields')]
key_mapping = {attribute: Vault._sanitize_to_attribute(attribute) for attribute in attributes}
return class_type, key_mapping
@staticmethod
def _sanitize_to_attribute(value):
delimiters = [';', ',', '_', '-', '*', ' '] # noqa
regex_pattern = '|'.join(map(re.escape, delimiters))
value = re.split(regex_pattern, value)
return '_'.join([part.lower() for part in value])
@staticmethod
def _parse_secure_note(data):
secret_name = data.get('name')
class_type, key_mapping = Vault._get_class_and_key_mapping(data)
note_data = {'original_notes': data.get('notes', '')}
try:
valid_lines = [line for line in data.get('notes').split('\n')
if not any([not line, ':' not in line])]
for line in valid_lines:
# Split only once so that strings like "Hostname:host.example.com:80" get interpreted correctly
key, value = line.split(':', 1)
entry = key_mapping.get(key)
if entry:
note_data[entry] = value
data.update(note_data)
if class_type == Custom:
data['custom_attribute_mapping'] = key_mapping
except TypeError:
LOGGER.error(f'Could not identify valid lines in the note of secret {secret_name} maybe it is corrupt?')
return class_type, data
@staticmethod
def _parse_shared_folder(payload, encryption_key, rsa_key):
stream = Stream(payload)
folder = SharedFolderSchema()
data = Vault._get_attribute_payload_data(stream, folder.attributes)
data.update(Vault._transform_data_attributes(data,
folder.hex_decoded,
EncryptManager.decode_hex))
key = data.get('key')
# Shared folder encryption key might come already in pre-decrypted form,
# where it's only AES encrypted with the regular encryption key.
# When the key is blank, then there's an RSA encrypted key, which has to
# be decrypted first before use.
if not key:
hex_key = PKCS1_OAEP.new(rsa_key).decrypt(data.get('encrypted_key'))
else:
hex_key = EncryptManager.decrypt_aes256_auto(key, encryption_key)
key = EncryptManager.decode_hex(hex_key)
data['key'] = key
data['name'] = EncryptManager.decrypt_aes256_auto(data.get('encrypted_name'), key, base64=True).decode('utf-8')
data.update(Vault._transform_data_attributes(data,
folder.decoded_attributes,
lambda x: x.decode('utf-8')))
return data
[docs] def refresh(self):
"""Refreshes the vault by cleaning up the encrypted blob and the decrypted secrets and forcing the retrieval."""
self._logger.info('Cleaning up local blob.')
self._blob = None
self._logger.info('Retrieving remote blob and decrypting secrets.')
try:
_ = self.blob
except Exception: # noqa
self._logger.exception('Problem retrieving remote blob.')
return False
return True
[docs] def save(self, path='.', name='vault.blob', timestamp=True):
"""Can save the downloaded blob.
Args:
path: The path to save the blob to, defaults to local directory.
name: The name to save the blob as, defaults to "vault.blob".
Returns:
None.
"""
name = f'{f"{int(time.time() * 10)}-" if timestamp else ""}{name}'
with open(Path(path, name), 'wb', encoding='utf8') as ofile:
ofile.write(self.blob)
[docs]@dataclass
class DecryptedVault:
def __init__(self, # pylint: disable=too-many-arguments
lastpass_instance,
encrypted_username,
attachments,
never_urls,
equivalent_domains,
url_rules,
secrets,
encryption_key,
folder_entries,
shared_folders):
self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}')
self._lastpass = lastpass_instance
self.encrypted_username = encrypted_username
self.attachments = attachments
self.never_urls = never_urls
self.equivalent_domains = equivalent_domains
self.url_rules = url_rules
self.secrets = secrets
self.encryption_key = encryption_key
self.folder_entries = folder_entries
self._folders = None
self._shared_folders = shared_folders
def _get_shared_folder_by_id(self, folder_id):
return next((folder for folder in self._shared_folders if folder.id == folder_id), None)
[docs] def delete_secret_by_id(self, id_):
self.secrets = [secret for secret in self.secrets if secret.id != id_]
self.clear_folders()
[docs] def create_secret(self, secret_type, data):
shared_folder = self._get_shared_folder_by_id(data.get('shared_folder_id'))
secret = secret_type(self._lastpass, data, shared_folder)
self.secrets.append(secret)
self.clear_folders()
return True
[docs] def clear_folders(self):
self._folders = None
@property
def folders(self):
"""All the folders of the vault.
Returns:
A list of all the folders of the vault.
"""
if self._folders is None:
root_folder_data, folders_data = self._parse_folder_groups(self.secrets)
root_folder = Folder('\\',
('\\',),
id=None,
encryption_key=self.encryption_key,
is_personal=True)
root_folder.secrets.extend(root_folder_data.get('\\'))
all_folders = [root_folder]
all_folders.extend(self._get_folder_objects(folders_data, root_folder))
self._folders = all_folders
return self._folders
@staticmethod
def _parse_folder_groups(secrets):
"""Parses all folder structures by iterating over all secrets.
There are three levels of folders. One is the root one that could hold parentless secrets, the second is the
personal folders that only exist for the user and the third is the shared folders that are shared.
Args:
secrets: All the secrets to iterate over and deduct their directory structure.
Returns:
tuple: Data for the root folder, the personal folders and the shared folders.
"""
root_folder_data = {'\\': []}
folders_data = defaultdict(list)
for secret in secrets:
if not any([secret.group_id, secret.shared_folder]):
root_folder_data['\\'].append(secret)
continue
if secret.group_id:
split_path = tuple(secret.group.split('\\'))
is_personal = True
group_id = secret.group_id
if secret.shared_folder:
split_path = (tuple([secret.shared_folder.shared_name] + secret.group.split('\\'))
if secret.group else tuple([secret.shared_folder.shared_name]))
is_personal = False
group_id = secret.shared_folder.id
folder_metadata = FolderMetadata(split_path,
group_id,
secret.encryption_key,
is_personal=is_personal)
folders_data[folder_metadata].append(secret)
return root_folder_data, folders_data
@staticmethod
def _get_parent_folder(folder, folders):
"""Tries to identify the parent folder of a provided folder and return that from a list of folders.
Args:
folder: The folder to look the parent for.
folders: A list of all the folders.
Returns:
The parent folder of the mentioned folder if a match is found, else None.
"""
return next((parent_folder for parent_folder in folders
if tuple(folder.path[:-1]) == parent_folder.path), None) # noqa
@staticmethod
def _get_folder_objects(secrets_by_path, root_folder=None):
folder_objects = []
for folder_metadata, secrets in sorted(secrets_by_path.items()):
folder = Folder(folder_metadata.path[-1],
folder_metadata.path,
folder_metadata.id,
folder_metadata.encryption_key,
is_personal=folder_metadata.is_personal)
folder.add_secrets(secrets)
if len(folder.path) > 1:
folder_parent = DecryptedVault._get_parent_folder(folder, folder_objects)
if not folder_parent:
folder_parent = Folder(folder.path[-2],
tuple(folder.path[:-1]),
id=None,
encryption_key=folder.encryption_key,
is_personal=folder_metadata.is_personal)
folder_grandparent = DecryptedVault._get_parent_folder(folder_parent, folder_objects)
if folder_grandparent:
folder_grandparent.add_folder(folder_parent)
folder_parent.parent = folder_grandparent
folder_objects.append(folder_parent)
folder.parent = folder_parent
folder_parent.add_folder(folder)
else:
if root_folder:
folder.parent = root_folder
root_folder.add_folder(folder)
folder_objects.append(folder)
return folder_objects