Keeper SDK for Python 3
Project description
Keeper SDK for Python
Overview
The Keeper SDK for Python provides developers with a comprehensive toolkit for integrating Keeper Security's password management and secrets management capabilities into Python applications. This repository contains two primary packages:
- Keeper SDK (
keepersdk): A Python library for programmatic access to Keeper Vault, enabling developers to build custom integrations, automate password management workflows, and manage enterprise console operations.
Table of Contents
Prerequisites
Before installing the Keeper SDK, ensure your system meets the following requirements:
- Python Version: Python 3.10 or higher
- Operating System: Windows, macOS, or Linux
- Package Manager: pip (Python package installer)
- Virtual Environment (recommended):
venvorvirtualenv
To verify your Python version:
python3 --version
Keeper SDK
About Keeper SDK
The Keeper SDK is a Python library that provides programmatic access to Keeper Security's platform. It enables developers to:
- Authenticate users and manage sessions
- Access and manipulate vault records (passwords, files, custom fields)
- Manage folders and shared folders
- Administer enterprise console operations (users, teams, roles, nodes)
- Integrate Keeper's zero-knowledge security architecture into applications
- Automate password rotation and secrets management workflows
SDK Installation
From PyPI (Recommended)
Install the latest stable release from the Python Package Index:
pip install keepersdk
From Source
To install from source for development or testing purposes:
# Clone the repository
git clone https://github.com/Keeper-Security/keeper-sdk-python
cd keeper-sdk-python/keepersdk-package
# Install the SDK
pip install -e .
SDK Environment Setup
For optimal development practices, it's recommended to use a virtual environment:
Step 1: Create a Virtual Environment
# On macOS/Linux
python3 -m venv venv
# On Windows
python -m venv venv
Step 2: Activate the Virtual Environment
# On macOS/Linux
source venv/bin/activate
# On Windows
venv\Scripts\activate
Step 3: Install Keeper SDK dependencies
pip install -e keepersdk-package
Step 4: Install keepersdk into the venv
python keepersdk-package/setup.py install
Your environment is now ready for SDK development.
SDK Configuration
The Keeper SDK uses a configuration storage system to manage authentication settings and endpoints. You can use:
- JsonConfigurationStorage: Stores configuration in JSON format (default)
- InMemoryConfigurationStorage: Temporary in-memory storage for testing
- Custom implementations: Implement your own configuration storage
Requirement for client
You can create a config.json file to load default credentials safely. This ensures that the client code doesn't contain any hardcoded credentials. Create the .json file in .keeper folder of current user (you might also need to create a .keeper folder if it is not present).
Path - Users//keeper/config.json
Alternatively you can run the sample login script shown in the next section to enter server, username and password during execution.
A sample showing the structure of the config.json needed is shown below:
{
"users": [
{
"user": "username@yourcompany.com",
"password":"yourpassword",
"server": "keepersecurity.com",
"last_device": {
"device_token": ""
}
}
],
"servers": [
{
"server": "keepersecurity.com",
"server_key_id": 10
}
],
"devices": [
{
"device_token": "",
"private_key": "",
"server_info": [
{
"server": "keepersecurity.com",
"clone_code": ""
}
]
}
],
"last_login": "username@yourcompany.com",
"last_server": "keepersecurity.com"
}
SDK Persistent Login Flow
The persistent login flow allows you to authenticate once and remain logged in for a specified timeout period without requiring re-authentication. This is particularly useful for automated scripts and long-running applications.
Key Features:
- One-time setup: Configure persistent login on a new device with a single execution
- Automatic session management: No need to re-authenticate during the timeout period
- Configurable timeout: Default is 30 days, but can be customized
- Device registration: Registers the device's data key for secure authentication
When to Use:
- Automated scripts and background services
- Long-running applications that need continuous access
- Development and testing environments
- Applications where user interaction is not always possible
Important Notes:
- Persistent login must be enabled on first-time device setup
- The device data key registration is a one-time operation per device
- Enterprise policies may restrict persistent login usage
- Always follow your organization's security policies when using persistent login
Example: Setting Up Persistent Login
This example demonstrates how to enable persistent login on a new device. Run this script once to configure persistent login for subsequent sessions:
import getpass
import json
import logging
from typing import Dict, Optional
import fido2
import webbrowser
from keepersdk import errors, utils
from keepersdk.authentication import (
configuration,
endpoint,
keeper_auth,
login_auth,
)
from keepersdk.authentication.yubikey import (
IKeeperUserInteraction,
yubikey_authenticate,
)
from keepersdk.constants import KEEPER_PUBLIC_HOSTS
try:
import pyperclip
except ImportError:
pyperclip = None
logger = utils.get_logger()
logger.setLevel(logging.INFO)
if not logger.handlers:
_handler = logging.StreamHandler()
_handler.setLevel(logging.INFO)
_handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
)
logger.addHandler(_handler)
class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction):
def output_text(self, text: str) -> None:
print(text)
def prompt_up(self) -> None:
print(
"\nTouch the flashing Security key to authenticate or "
"press Ctrl-C to resume with the primary two factor authentication..."
)
def request_pin(self, permissions, rd_id):
return getpass.getpass("Enter Security Key PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
# Two-factor duration codes (used by LoginFlow)
_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = {
login_auth.TwoFactorDuration.EveryLogin: "login",
login_auth.TwoFactorDuration.Every12Hours: "12_hours",
login_auth.TwoFactorDuration.EveryDay: "24_hours",
login_auth.TwoFactorDuration.Every30Days: "30_days",
login_auth.TwoFactorDuration.Forever: "forever",
}
class LoginFlow:
"""
Handles the full login process: server selection, username, password,
device approval, 2FA, SSO data key, and SSO token.
"""
def __init__(self) -> None:
self._config = configuration.JsonConfigurationStorage()
self._logged_in_with_persistent = True
self._endpoint: Optional[endpoint.KeeperEndpoint] = None
@property
def endpoint(self) -> Optional[endpoint.KeeperEndpoint]:
return self._endpoint
@property
def logged_in_with_persistent(self) -> bool:
"""True if login succeeded by resuming an existing persistent session (no step loop)."""
return self._logged_in_with_persistent
def run(self) -> Optional[keeper_auth.KeeperAuth]:
"""
Run the login flow.
Returns:
Authenticated Keeper context, or None if login fails.
"""
server = self._ensure_server()
keeper_endpoint = endpoint.KeeperEndpoint(self._config, server)
self._endpoint = keeper_endpoint
login_auth_context = login_auth.LoginAuth(keeper_endpoint)
username = self._config.get().last_login or input("Enter username: ")
login_auth_context.resume_session = True
login_auth_context.login(username)
while not login_auth_context.login_step.is_final():
step = login_auth_context.login_step
if isinstance(step, login_auth.LoginStepDeviceApproval):
self._handle_device_approval(step)
elif isinstance(step, login_auth.LoginStepPassword):
self._handle_password(step)
elif isinstance(step, login_auth.LoginStepTwoFactor):
self._handle_two_factor(step)
elif isinstance(step, login_auth.LoginStepSsoDataKey):
self._handle_sso_data_key(step)
elif isinstance(step, login_auth.LoginStepSsoToken):
self._handle_sso_token(step)
elif isinstance(step, login_auth.LoginStepError):
print(f"Login error: ({step.code}) {step.message}")
return None
else:
raise NotImplementedError(
f"Unsupported login step type: {type(step).__name__}"
)
self._logged_in_with_persistent = False
if self._logged_in_with_persistent:
print("Successfully logged in with persistent login")
if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected):
return login_auth_context.login_step.take_keeper_auth()
return None
def _ensure_server(self) -> str:
if not self._config.get().last_server:
print("Available server options:")
for region, host in KEEPER_PUBLIC_HOSTS.items():
print(f" {region}: {host}")
server = (
input("Enter server (default: keepersecurity.com): ").strip()
or "keepersecurity.com"
)
self._config.get().last_server = server
else:
server = self._config.get().last_server
return server
def _handle_device_approval(
self, step: login_auth.LoginStepDeviceApproval
) -> None:
step.send_push(login_auth.DeviceApprovalChannel.KeeperPush)
print(
"Device approval request sent. Login to existing vault/console or "
"ask admin to approve this device and then press return/enter to resume"
)
input()
step.resume()
def _handle_password(self, step: login_auth.LoginStepPassword) -> None:
password = getpass.getpass("Enter password: ")
step.verify_password(password)
def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None:
channels = [
x
for x in step.get_channels()
if x.channel_type != login_auth.TwoFactorChannel.Other
]
menu = []
for i, channel in enumerate(channels):
desc = self._two_factor_channel_desc(channel.channel_type)
menu.append(
(
str(i + 1),
f"{desc} {channel.channel_name} {channel.phone}",
)
)
menu.append(("q", "Quit authentication attempt and return to Commander prompt."))
lines = ["", "This account requires 2FA Authentication"]
lines.extend(f" {a}. {t}" for a, t in menu)
print("\n".join(lines))
while True:
selection = input("Selection: ")
if selection is None:
return
if selection in ("q", "Q"):
raise KeyboardInterrupt()
try:
assert selection.isnumeric()
idx = 1 if not selection else int(selection)
assert 1 <= idx <= len(channels)
channel = channels[idx - 1]
desc = self._two_factor_channel_desc(channel.channel_type)
print(f"Selected {idx}. {desc}")
except AssertionError:
print(
"Invalid entry, additional factors of authentication shown "
"may be configured if not currently enabled."
)
continue
if channel.channel_type in (
login_auth.TwoFactorChannel.TextMessage,
login_auth.TwoFactorChannel.KeeperDNA,
login_auth.TwoFactorChannel.DuoSecurity,
):
action = next(
(
x
for x in step.get_channel_push_actions(channel.channel_uid)
if x
in (
login_auth.TwoFactorPushAction.TextMessage,
login_auth.TwoFactorPushAction.KeeperDna,
)
),
None,
)
if action:
step.send_push(channel.channel_uid, action)
if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey:
try:
challenge = json.loads(channel.challenge)
signature = yubikey_authenticate(challenge, FidoCliInteraction())
if signature:
print("Verified Security Key.")
step.send_code(channel.channel_uid, signature)
return
except Exception as e:
logger.error(e)
continue
# 2FA code path
step.duration = min(step.duration, channel.max_expiration)
available_dura = sorted(
x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration
)
available_codes = [
_TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura
]
while True:
mfa_desc = self._two_factor_duration_desc(step.duration)
prompt_exp = (
f"\n2FA Code Duration: {mfa_desc}.\n"
f"To change duration: 2fa_duration={'|'.join(available_codes)}"
)
print(prompt_exp)
selection = input("\nEnter 2FA Code or Duration: ")
if not selection:
return
if selection in available_codes:
step.duration = self._two_factor_code_to_duration(selection)
elif selection.startswith("2fa_duration="):
code = selection[len("2fa_duration=") :]
if code in available_codes:
step.duration = self._two_factor_code_to_duration(code)
else:
print(f"Invalid 2FA duration: {code}")
else:
try:
step.send_code(channel.channel_uid, selection)
print("Successfully verified 2FA Code.")
return
except errors.KeeperApiError as kae:
print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}")
def _handle_sso_data_key(
self, step: login_auth.LoginStepSsoDataKey
) -> None:
menu = [
("1", "Keeper Push. Send a push notification to your device."),
("2", "Admin Approval. Request your admin to approve this device."),
("r", "Resume SSO authentication after device is approved."),
("q", "Quit SSO authentication attempt and return to Commander prompt."),
]
lines = ["Approve this device by selecting a method below:"]
lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu)
print("\n".join(lines))
while True:
answer = input("Selection: ")
if answer is None:
return
if answer == "q":
raise KeyboardInterrupt()
if answer == "r":
step.resume()
break
if answer in ("1", "2"):
step.request_data_key(
login_auth.DataKeyShareChannel.KeeperPush
if answer == "1"
else login_auth.DataKeyShareChannel.AdminApproval
)
else:
print(f'Action "{answer}" is not supported.')
def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None:
menu = [
("a", "SSO User with a Master Password."),
]
if pyperclip:
menu.append(("c", "Copy SSO Login URL to clipboard."))
else:
menu.append(("u", "Show SSO Login URL."))
try:
wb = webbrowser.get()
menu.append(("o", "Navigate to SSO Login URL with the default web browser."))
except Exception:
wb = None
if pyperclip:
menu.append(("p", "Paste SSO Token from clipboard."))
menu.append(("t", "Enter SSO Token manually."))
menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt."))
lines = [
"",
"SSO Login URL:",
step.sso_login_url,
"Navigate to SSO Login URL with your browser and complete authentication.",
"Copy a returned SSO Token into clipboard."
+ (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."),
'NOTE: To copy SSO Token please click "Copy authentication token" '
'button on "SSO Connect" page.',
"",
]
lines.extend(f" {a:>3}. {t}" for a, t in menu)
print("\n".join(lines))
while True:
token = input("Selection: ")
if token == "q":
raise KeyboardInterrupt()
if token == "a":
step.login_with_password()
return
if token == "c":
token = None
if pyperclip:
try:
pyperclip.copy(step.sso_login_url)
print("SSO Login URL is copied to clipboard.")
except Exception:
print("Failed to copy SSO Login URL to clipboard.")
else:
print("Clipboard not available (install pyperclip).")
elif token == "u":
token = None
if not pyperclip:
print("\nSSO Login URL:", step.sso_login_url, "\n")
else:
print("Unsupported menu option (use 'c' to copy URL).")
elif token == "o":
token = None
if wb:
try:
wb.open_new_tab(step.sso_login_url)
except Exception:
print("Failed to open web browser.")
elif token == "p":
if pyperclip:
try:
token = pyperclip.paste()
except Exception:
token = ""
print("Failed to paste from clipboard")
else:
token = None
print("Clipboard not available (use 't' to enter token manually).")
elif token == "t":
token = getpass.getpass("Enter SSO Token: ").strip()
else:
if len(token) < 10:
print(f"Unsupported menu option: {token}")
continue
if token:
try:
step.set_sso_token(token)
break
except errors.KeeperApiError as kae:
print(f"SSO Login error: ({kae.result_code}) {kae.message}")
@staticmethod
def _two_factor_channel_desc(
channel_type: login_auth.TwoFactorChannel,
) -> str:
return {
login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)",
login_auth.TwoFactorChannel.TextMessage: "Send SMS Code",
login_auth.TwoFactorChannel.DuoSecurity: "DUO",
login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID",
login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)",
login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)",
login_auth.TwoFactorChannel.Backup: "Backup Code",
}.get(channel_type, "Not Supported")
@staticmethod
def _two_factor_duration_desc(
duration: login_auth.TwoFactorDuration,
) -> str:
return {
login_auth.TwoFactorDuration.EveryLogin: "Require Every Login",
login_auth.TwoFactorDuration.Forever: "Save on this Device Forever",
login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours",
login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours",
login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days",
}.get(duration, "Require Every Login")
@staticmethod
def _two_factor_code_to_duration(
text: str,
) -> login_auth.TwoFactorDuration:
for dura, code in _TWO_FACTOR_DURATION_CODES.items():
if code == text:
return dura
return login_auth.TwoFactorDuration.EveryLogin
def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None:
"""
Enable persistent login and register data key for device.
Sets persistent_login to on and logout_timer to 30 days.
"""
keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1')
keeper_auth.register_data_key_for_device(keeper_auth_context)
mins_per_day = 60 * 24
timeout_in_minutes = mins_per_day * 30 # 30 days
keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes))
print("Persistent login turned on successfully and device registered")
def login():
"""
Handle the login process including server selection, authentication,
and multi-factor authentication steps (device approval, password, 2FA
with channel selection and Security Key, SSO data key, SSO token).
Returns:
tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails.
"""
flow = LoginFlow()
keeper_auth_context = flow.run()
if keeper_auth_context and not flow.logged_in_with_persistent:
enable_persistent_login(keeper_auth_context)
keeper_endpoint = flow.endpoint if keeper_auth_context else None
return keeper_auth_context, keeper_endpoint
def display_login_info(keeper_auth_context: keeper_auth.KeeperAuth, keeper_endpoint: endpoint.KeeperEndpoint):
"""
Display login success information.
Args:
keeper_auth_context: The authenticated Keeper context.
keeper_endpoint: The Keeper endpoint with server information.
"""
print("\n" + "=" * 50)
print("LOGIN SUCCESSFUL")
print("=" * 50)
print(f"Username: {keeper_auth_context.auth_context.username}")
print(f"Server: {keeper_endpoint.server}")
print(f"Enterprise Admin: {keeper_auth_context.auth_context.is_enterprise_admin}")
if keeper_auth_context.auth_context.enterprise_id:
print(f"Enterprise ID: {keeper_auth_context.auth_context.enterprise_id}")
print("=" * 50)
keeper_auth_context.close()
def main():
"""
Main entry point for the login script.
Performs login and displays login information.
"""
keeper_auth_context, keeper_endpoint = login()
if keeper_auth_context:
display_login_info(keeper_auth_context, keeper_endpoint)
else:
print("Login failed.")
if __name__ == "__main__":
main()
SDK Usage Example
Below is a complete example demonstrating authentication, vault synchronization, and record retrieval:
import getpass
import json
import logging
import sqlite3
from typing import Dict, Optional
import fido2
import webbrowser
from keepersdk import errors, utils
from keepersdk.authentication import (
configuration,
endpoint,
keeper_auth,
login_auth,
)
from keepersdk.authentication.yubikey import (
IKeeperUserInteraction,
yubikey_authenticate,
)
from keepersdk.constants import KEEPER_PUBLIC_HOSTS
from keepersdk.vault import sqlite_storage, vault_online, vault_record
try:
import pyperclip
except ImportError:
pyperclip = None
logger = utils.get_logger()
logger.setLevel(logging.INFO)
if not logger.handlers:
_handler = logging.StreamHandler()
_handler.setLevel(logging.INFO)
_handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
)
logger.addHandler(_handler)
class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction):
def output_text(self, text: str) -> None:
print(text)
def prompt_up(self) -> None:
print(
"\nTouch the flashing Security key to authenticate or "
"press Ctrl-C to resume with the primary two factor authentication..."
)
def request_pin(self, permissions, rd_id):
return getpass.getpass("Enter Security Key PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
# Two-factor duration codes (used by LoginFlow)
_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = {
login_auth.TwoFactorDuration.EveryLogin: "login",
login_auth.TwoFactorDuration.Every12Hours: "12_hours",
login_auth.TwoFactorDuration.EveryDay: "24_hours",
login_auth.TwoFactorDuration.Every30Days: "30_days",
login_auth.TwoFactorDuration.Forever: "forever",
}
class LoginFlow:
"""
Handles the full login process: server selection, username, password,
device approval, 2FA, SSO data key, and SSO token.
"""
def __init__(self) -> None:
self._config = configuration.JsonConfigurationStorage()
self._logged_in_with_persistent = True
self._endpoint: Optional[endpoint.KeeperEndpoint] = None
@property
def endpoint(self) -> Optional[endpoint.KeeperEndpoint]:
return self._endpoint
@property
def logged_in_with_persistent(self) -> bool:
"""True if login succeeded by resuming an existing persistent session (no step loop)."""
return self._logged_in_with_persistent
def run(self) -> Optional[keeper_auth.KeeperAuth]:
"""
Run the login flow.
Returns:
Authenticated Keeper context, or None if login fails.
"""
server = self._ensure_server()
keeper_endpoint = endpoint.KeeperEndpoint(self._config, server)
self._endpoint = keeper_endpoint
login_auth_context = login_auth.LoginAuth(keeper_endpoint)
username = self._config.get().last_login or input("Enter username: ")
login_auth_context.resume_session = True
login_auth_context.login(username)
while not login_auth_context.login_step.is_final():
step = login_auth_context.login_step
if isinstance(step, login_auth.LoginStepDeviceApproval):
self._handle_device_approval(step)
elif isinstance(step, login_auth.LoginStepPassword):
self._handle_password(step)
elif isinstance(step, login_auth.LoginStepTwoFactor):
self._handle_two_factor(step)
elif isinstance(step, login_auth.LoginStepSsoDataKey):
self._handle_sso_data_key(step)
elif isinstance(step, login_auth.LoginStepSsoToken):
self._handle_sso_token(step)
elif isinstance(step, login_auth.LoginStepError):
print(f"Login error: ({step.code}) {step.message}")
return None
else:
raise NotImplementedError(
f"Unsupported login step type: {type(step).__name__}"
)
self._logged_in_with_persistent = False
if self._logged_in_with_persistent:
print("Successfully logged in with persistent login")
if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected):
return login_auth_context.login_step.take_keeper_auth()
return None
def _ensure_server(self) -> str:
if not self._config.get().last_server:
print("Available server options:")
for region, host in KEEPER_PUBLIC_HOSTS.items():
print(f" {region}: {host}")
server = (
input("Enter server (default: keepersecurity.com): ").strip()
or "keepersecurity.com"
)
self._config.get().last_server = server
else:
server = self._config.get().last_server
return server
def _handle_device_approval(
self, step: login_auth.LoginStepDeviceApproval
) -> None:
step.send_push(login_auth.DeviceApprovalChannel.KeeperPush)
print(
"Device approval request sent. Login to existing vault/console or "
"ask admin to approve this device and then press return/enter to resume"
)
input()
step.resume()
def _handle_password(self, step: login_auth.LoginStepPassword) -> None:
password = getpass.getpass("Enter password: ")
step.verify_password(password)
def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None:
channels = [
x
for x in step.get_channels()
if x.channel_type != login_auth.TwoFactorChannel.Other
]
menu = []
for i, channel in enumerate(channels):
desc = self._two_factor_channel_desc(channel.channel_type)
menu.append(
(
str(i + 1),
f"{desc} {channel.channel_name} {channel.phone}",
)
)
menu.append(("q", "Quit authentication attempt and return to Commander prompt."))
lines = ["", "This account requires 2FA Authentication"]
lines.extend(f" {a}. {t}" for a, t in menu)
print("\n".join(lines))
while True:
selection = input("Selection: ")
if selection is None:
return
if selection in ("q", "Q"):
raise KeyboardInterrupt()
try:
assert selection.isnumeric()
idx = 1 if not selection else int(selection)
assert 1 <= idx <= len(channels)
channel = channels[idx - 1]
desc = self._two_factor_channel_desc(channel.channel_type)
print(f"Selected {idx}. {desc}")
except AssertionError:
print(
"Invalid entry, additional factors of authentication shown "
"may be configured if not currently enabled."
)
continue
if channel.channel_type in (
login_auth.TwoFactorChannel.TextMessage,
login_auth.TwoFactorChannel.KeeperDNA,
login_auth.TwoFactorChannel.DuoSecurity,
):
action = next(
(
x
for x in step.get_channel_push_actions(channel.channel_uid)
if x
in (
login_auth.TwoFactorPushAction.TextMessage,
login_auth.TwoFactorPushAction.KeeperDna,
)
),
None,
)
if action:
step.send_push(channel.channel_uid, action)
if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey:
try:
challenge = json.loads(channel.challenge)
signature = yubikey_authenticate(challenge, FidoCliInteraction())
if signature:
print("Verified Security Key.")
step.send_code(channel.channel_uid, signature)
return
except Exception as e:
logger.error(e)
continue
# 2FA code path
step.duration = min(step.duration, channel.max_expiration)
available_dura = sorted(
x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration
)
available_codes = [
_TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura
]
while True:
mfa_desc = self._two_factor_duration_desc(step.duration)
prompt_exp = (
f"\n2FA Code Duration: {mfa_desc}.\n"
f"To change duration: 2fa_duration={'|'.join(available_codes)}"
)
print(prompt_exp)
selection = input("\nEnter 2FA Code or Duration: ")
if not selection:
return
if selection in available_codes:
step.duration = self._two_factor_code_to_duration(selection)
elif selection.startswith("2fa_duration="):
code = selection[len("2fa_duration=") :]
if code in available_codes:
step.duration = self._two_factor_code_to_duration(code)
else:
print(f"Invalid 2FA duration: {code}")
else:
try:
step.send_code(channel.channel_uid, selection)
print("Successfully verified 2FA Code.")
return
except errors.KeeperApiError as kae:
print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}")
def _handle_sso_data_key(
self, step: login_auth.LoginStepSsoDataKey
) -> None:
menu = [
("1", "Keeper Push. Send a push notification to your device."),
("2", "Admin Approval. Request your admin to approve this device."),
("r", "Resume SSO authentication after device is approved."),
("q", "Quit SSO authentication attempt and return to Commander prompt."),
]
lines = ["Approve this device by selecting a method below:"]
lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu)
print("\n".join(lines))
while True:
answer = input("Selection: ")
if answer is None:
return
if answer == "q":
raise KeyboardInterrupt()
if answer == "r":
step.resume()
break
if answer in ("1", "2"):
step.request_data_key(
login_auth.DataKeyShareChannel.KeeperPush
if answer == "1"
else login_auth.DataKeyShareChannel.AdminApproval
)
else:
print(f'Action "{answer}" is not supported.')
def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None:
menu = [
("a", "SSO User with a Master Password."),
]
if pyperclip:
menu.append(("c", "Copy SSO Login URL to clipboard."))
else:
menu.append(("u", "Show SSO Login URL."))
try:
wb = webbrowser.get()
menu.append(("o", "Navigate to SSO Login URL with the default web browser."))
except Exception:
wb = None
if pyperclip:
menu.append(("p", "Paste SSO Token from clipboard."))
menu.append(("t", "Enter SSO Token manually."))
menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt."))
lines = [
"",
"SSO Login URL:",
step.sso_login_url,
"Navigate to SSO Login URL with your browser and complete authentication.",
"Copy a returned SSO Token into clipboard."
+ (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."),
'NOTE: To copy SSO Token please click "Copy authentication token" '
'button on "SSO Connect" page.',
"",
]
lines.extend(f" {a:>3}. {t}" for a, t in menu)
print("\n".join(lines))
while True:
token = input("Selection: ")
if token == "q":
raise KeyboardInterrupt()
if token == "a":
step.login_with_password()
return
if token == "c":
token = None
if pyperclip:
try:
pyperclip.copy(step.sso_login_url)
print("SSO Login URL is copied to clipboard.")
except Exception:
print("Failed to copy SSO Login URL to clipboard.")
else:
print("Clipboard not available (install pyperclip).")
elif token == "u":
token = None
if not pyperclip:
print("\nSSO Login URL:", step.sso_login_url, "\n")
else:
print("Unsupported menu option (use 'c' to copy URL).")
elif token == "o":
token = None
if wb:
try:
wb.open_new_tab(step.sso_login_url)
except Exception:
print("Failed to open web browser.")
elif token == "p":
if pyperclip:
try:
token = pyperclip.paste()
except Exception:
token = ""
print("Failed to paste from clipboard")
else:
token = None
print("Clipboard not available (use 't' to enter token manually).")
elif token == "t":
token = getpass.getpass("Enter SSO Token: ").strip()
else:
if len(token) < 10:
print(f"Unsupported menu option: {token}")
continue
if token:
try:
step.set_sso_token(token)
break
except errors.KeeperApiError as kae:
print(f"SSO Login error: ({kae.result_code}) {kae.message}")
@staticmethod
def _two_factor_channel_desc(
channel_type: login_auth.TwoFactorChannel,
) -> str:
return {
login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)",
login_auth.TwoFactorChannel.TextMessage: "Send SMS Code",
login_auth.TwoFactorChannel.DuoSecurity: "DUO",
login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID",
login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)",
login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)",
login_auth.TwoFactorChannel.Backup: "Backup Code",
}.get(channel_type, "Not Supported")
@staticmethod
def _two_factor_duration_desc(
duration: login_auth.TwoFactorDuration,
) -> str:
return {
login_auth.TwoFactorDuration.EveryLogin: "Require Every Login",
login_auth.TwoFactorDuration.Forever: "Save on this Device Forever",
login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours",
login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours",
login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days",
}.get(duration, "Require Every Login")
@staticmethod
def _two_factor_code_to_duration(
text: str,
) -> login_auth.TwoFactorDuration:
for dura, code in _TWO_FACTOR_DURATION_CODES.items():
if code == text:
return dura
return login_auth.TwoFactorDuration.EveryLogin
def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None:
"""
Enable persistent login and register data key for device.
Sets persistent_login to on and logout_timer to 30 days.
"""
keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1')
keeper_auth.register_data_key_for_device(keeper_auth_context)
mins_per_day = 60 * 24
timeout_in_minutes = mins_per_day * 30 # 30 days
keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes))
print("Persistent login turned on successfully and device registered")
def login():
"""
Handle the login process including server selection, authentication,
and multi-factor authentication steps (device approval, password, 2FA
with channel selection and Security Key, SSO data key, SSO token).
Returns:
tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails.
"""
flow = LoginFlow()
keeper_auth_context = flow.run()
if keeper_auth_context and not flow.logged_in_with_persistent:
enable_persistent_login(keeper_auth_context)
keeper_endpoint = flow.endpoint if keeper_auth_context else None
return keeper_auth_context, keeper_endpoint
def list_records(keeper_auth_context: keeper_auth.KeeperAuth) -> None:
"""
List all records in the vault.
Args:
keeper_auth_context: The authenticated Keeper context.
"""
conn = sqlite3.Connection("file::memory:", uri=True)
vault_storage = sqlite_storage.SqliteVaultStorage(
lambda: conn,
vault_owner=bytes(keeper_auth_context.auth_context.username, "utf-8"),
)
vault = vault_online.VaultOnline(keeper_auth_context, vault_storage)
vault.sync_down()
print("Vault Records:")
print("-" * 50)
for record in vault.vault_data.records():
print(f"Title: {record.title}")
print(f"Record UID: {record.record_uid}")
print(f"Record Type: {record.record_type}")
if record.version == 2:
legacy_record = vault.vault_data.load_record(record.record_uid)
if isinstance(legacy_record, vault_record.PasswordRecord):
print(f"Username: {legacy_record.login}")
print(f"URL: {legacy_record.link}")
print("-" * 50)
vault.close()
keeper_auth_context.close()
def main() -> None:
"""Run login and list all vault records."""
keeper_auth_context, _ = login()
if keeper_auth_context:
list_records(keeper_auth_context)
else:
print("Login failed. Unable to list records.")
if __name__ == "__main__":
main()
Important Security Notes:
- Never hardcode credentials in production code
- Always implement proper two-factor authentication
- Use device approval flows for enhanced security
- Consider using environment variables or secure vaults for credential management
Contributing
We welcome contributions from the community! Please feel free to submit pull requests, report issues, or suggest enhancements through our GitHub repository.
License
This project is licensed under the MIT License. See the LICENSE file for details.
Support
For support, documentation, and additional resources:
- Documentation: Keeper Security Developer Portal
- Support: Keeper Security Support
- Community: Keeper Security GitHub
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file keepersdk-1.2.2-py3-none-any.whl.
File metadata
- Download URL: keepersdk-1.2.2-py3-none-any.whl
- Upload date:
- Size: 671.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
135a8c06ac50baf346abf542129ed2ef42cc5a13e35399c07f9ec3860c53ea90
|
|
| MD5 |
728aa2b9dfbf683ac1002dbd510f80a4
|
|
| BLAKE2b-256 |
26d3296ba747f0bd6686cdebf62e87ddef6f7a6ad22d3b6bb9d7f99f7a7f728c
|