From 4c8879210c5b844df31464c4dd1d769571452cd3 Mon Sep 17 00:00:00 2001 From: gaima8 <7595658+gaima8@users.noreply.github.com> Date: Thu, 23 Nov 2023 22:15:08 +0000 Subject: [PATCH 1/2] feat: Enable using only the SSH agent if asked. --- lib/jnpr/junos/device.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/lib/jnpr/junos/device.py b/lib/jnpr/junos/device.py index a4d09a5de..421a6eeb1 100644 --- a/lib/jnpr/junos/device.py +++ b/lib/jnpr/junos/device.py @@ -1219,6 +1219,12 @@ def __init__(self, *vargs, **kvargs): *OPTIONAL* To disable public key authentication. default is ``None``. + :param bool allow_agent: + *OPTIONAL* If ``True`` then the SSH config file is not parsed by PyEZ + and passed down to ncclient. If ``False`` then the SSH config file will + be parsed by PyEZ. If option is not provided will fallback to default + behavior. This option is passed down to the ncclient as is, if it is + present in the kwargs. """ # ---------------------------------------- @@ -1237,6 +1243,7 @@ def __init__(self, *vargs, **kvargs): self._huge_tree = kvargs.get("huge_tree", False) self._conn_open_timeout = kvargs.get("conn_open_timeout", 30) self._look_for_keys = kvargs.get("look_for_keys", None) + self._allow_agent = kvargs.get('allow_agent', False) if self._fact_style != "new": warnings.warn( "fact-style %s will be removed in a future " @@ -1273,9 +1280,12 @@ def __init__(self, *vargs, **kvargs): self._auth_user = ( kvargs.get("user") or self._conf_auth_user or self._auth_user ) - self._ssh_private_key_file = ( - kvargs.get("ssh_private_key_file") or self._conf_ssh_private_key_file - ) + if self._allow_agent: + self._ssh_private_key_file = kvargs.get('ssh_private_key_file') + else: + self._ssh_private_key_file = ( + kvargs.get("ssh_private_key_file") or self._conf_ssh_private_key_file + ) self._auth_password = kvargs.get("password") or kvargs.get("passwd") # ----------------------------- @@ -1354,14 +1364,16 @@ def open(self, *vargs, **kvargs): try: ts_start = datetime.datetime.now() - # we want to enable the ssh-agent if-and-only-if we are + # enable the ssh-agent if asked, or if we are # not given a password or an ssh key file. # in this condition it means we want to query the agent # for available ssh keys - - allow_agent = bool( - (self._auth_password is None) and (self._ssh_private_key_file is None) - ) + if self._allow_agent is False: + allow_agent = bool( + (self._auth_password is None) and (self._ssh_private_key_file is None) + ) + else: + allow_agent = self._allow_agent # option to disable ncclient transport ssh authentication # using public keys look_for_keys=False From 389a285c6d5fc65c7b8252abe886b20b36ca117a Mon Sep 17 00:00:00 2001 From: gaima8 <7595658+gaima8@users.noreply.github.com> Date: Wed, 29 Nov 2023 22:26:38 +0000 Subject: [PATCH 2/2] add tests? --- lib/jnpr/junos/device.py | 1 + tests/unit/test_device.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/lib/jnpr/junos/device.py b/lib/jnpr/junos/device.py index 421a6eeb1..5caef714e 100644 --- a/lib/jnpr/junos/device.py +++ b/lib/jnpr/junos/device.py @@ -1225,6 +1225,7 @@ def __init__(self, *vargs, **kvargs): be parsed by PyEZ. If option is not provided will fallback to default behavior. This option is passed down to the ncclient as is, if it is present in the kwargs. + default is ``False``. """ # ---------------------------------------- diff --git a/tests/unit/test_device.py b/tests/unit/test_device.py index 2a3f9a0a2..ddda04717 100644 --- a/tests/unit/test_device.py +++ b/tests/unit/test_device.py @@ -494,6 +494,36 @@ def test_device_open_with_look_for_keys_True(self, mock_connect, mock_execute): self.dev2.open() self.assertEqual(self.dev2.connected, True) + @patch("ncclient.manager.connect") + @patch("jnpr.junos.Device.execute") + def test_device_open_with_allow_agent_False(self, mock_connect, mock_execute): + with patch("jnpr.junos.utils.fs.FS.cat") as mock_cat: + mock_cat.return_value = """ + + domain jls.net + + """ + mock_connect.side_effect = self._mock_manager + mock_execute.side_effect = self._mock_manager + self.dev2 = Device(host="2.2.2.2", user="test", password="password123", allow_agent=False) + self.dev2.open() + self.assertEqual(self.dev2.connected, True) + + @patch("ncclient.manager.connect") + @patch("jnpr.junos.Device.execute") + def test_device_open_with_allow_agent_True(self, mock_connect, mock_execute): + with patch("jnpr.junos.utils.fs.FS.cat") as mock_cat: + mock_cat.return_value = """ + + domain jls.net + + """ + mock_connect.side_effect = self._mock_manager + mock_execute.side_effect = self._mock_manager + self.dev2 = Device(host="2.2.2.2", user="test", password="password123", allow_agent=True) + self.dev2.open() + self.assertEqual(self.dev2.connected, True) + @patch("ncclient.manager.connect") @patch("jnpr.junos.Device.execute") def test_device_outbound(self, mock_connect, mock_execute):