Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable using only the SSH agent if asked. #1284

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions lib/jnpr/junos/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,13 @@ def __init__(self, *vargs, **kvargs):
*OPTIONAL* To disable public key authentication.
default is ``None``.

:param bool allow_agent:
*OPTIONAL* If ``True`` then the SSH config file is not parsed by PyEZ
and passed down to ncclient. If ``False`` then the SSH config file will
be parsed by PyEZ. If option is not provided will fallback to default
behavior. This option is passed down to the ncclient as is, if it is
present in the kwargs.
default is ``False``.
"""

# ----------------------------------------
Expand All @@ -1237,6 +1244,7 @@ def __init__(self, *vargs, **kvargs):
self._huge_tree = kvargs.get("huge_tree", False)
self._conn_open_timeout = kvargs.get("conn_open_timeout", 30)
self._look_for_keys = kvargs.get("look_for_keys", None)
self._allow_agent = kvargs.get('allow_agent', False)
if self._fact_style != "new":
warnings.warn(
"fact-style %s will be removed in a future "
Expand Down Expand Up @@ -1273,9 +1281,12 @@ def __init__(self, *vargs, **kvargs):
self._auth_user = (
kvargs.get("user") or self._conf_auth_user or self._auth_user
)
self._ssh_private_key_file = (
kvargs.get("ssh_private_key_file") or self._conf_ssh_private_key_file
)
if self._allow_agent:
self._ssh_private_key_file = kvargs.get('ssh_private_key_file')
else:
self._ssh_private_key_file = (
kvargs.get("ssh_private_key_file") or self._conf_ssh_private_key_file
)
self._auth_password = kvargs.get("password") or kvargs.get("passwd")

# -----------------------------
Expand Down Expand Up @@ -1354,14 +1365,16 @@ def open(self, *vargs, **kvargs):
try:
ts_start = datetime.datetime.now()

# we want to enable the ssh-agent if-and-only-if we are
# enable the ssh-agent if asked, or if we are
# not given a password or an ssh key file.
# in this condition it means we want to query the agent
# for available ssh keys

allow_agent = bool(
(self._auth_password is None) and (self._ssh_private_key_file is None)
)
if self._allow_agent is False:
allow_agent = bool(
(self._auth_password is None) and (self._ssh_private_key_file is None)
)
else:
allow_agent = self._allow_agent

# option to disable ncclient transport ssh authentication
# using public keys look_for_keys=False
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,36 @@ def test_device_open_with_look_for_keys_True(self, mock_connect, mock_execute):
self.dev2.open()
self.assertEqual(self.dev2.connected, True)

@patch("ncclient.manager.connect")
@patch("jnpr.junos.Device.execute")
def test_device_open_with_allow_agent_False(self, mock_connect, mock_execute):
with patch("jnpr.junos.utils.fs.FS.cat") as mock_cat:
mock_cat.return_value = """

domain jls.net

"""
mock_connect.side_effect = self._mock_manager
mock_execute.side_effect = self._mock_manager
self.dev2 = Device(host="2.2.2.2", user="test", password="password123", allow_agent=False)
self.dev2.open()
self.assertEqual(self.dev2.connected, True)

@patch("ncclient.manager.connect")
@patch("jnpr.junos.Device.execute")
def test_device_open_with_allow_agent_True(self, mock_connect, mock_execute):
with patch("jnpr.junos.utils.fs.FS.cat") as mock_cat:
mock_cat.return_value = """

domain jls.net

"""
mock_connect.side_effect = self._mock_manager
mock_execute.side_effect = self._mock_manager
self.dev2 = Device(host="2.2.2.2", user="test", password="password123", allow_agent=True)
self.dev2.open()
self.assertEqual(self.dev2.connected, True)

@patch("ncclient.manager.connect")
@patch("jnpr.junos.Device.execute")
def test_device_outbound(self, mock_connect, mock_execute):
Expand Down