Skip to content

Commit

Permalink
Dev: corosync: add subcommands 'crm corosync link ...' for managing m…
Browse files Browse the repository at this point in the history
…ulti-links in knet (jsc#PED-8083) (ClusterLabs#1471)

Knet transport in corosync provides flexible configurations of
multi-link connectivity. This pull request adds some subcommands to
manage these configurations, including:

1. `crm corosync link show` to list configured links
2. `crm corosync link update <linknumber> [<nodename>=<new_address> ...] [options <option>=[value] ...]` to modify an existing link
3. `crm corosync link add <nodename>=<new_address> ... [options <option>=[value] ...]` to add a new link.
4. `crm corosync link remove <linknumber>` to remove an existing link.
  • Loading branch information
nicholasyang2022 authored Sep 6, 2024
2 parents 6f44c70 + 30eb55e commit af15166
Show file tree
Hide file tree
Showing 15 changed files with 1,520 additions and 311 deletions.
7 changes: 3 additions & 4 deletions crmsh/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class Context(object):
Context object used to avoid having to pass these variables
to every bootstrap method.
"""
MAX_LINK_NUM = 8
DEFAULT_PROFILE_NAME = "default"
KNET_DEFAULT_PROFILE_NAME = "knet-default"
S390_PROFILE_NAME = "s390"
Expand Down Expand Up @@ -205,8 +204,8 @@ def _validate_network_options(self):
Validation.valid_admin_ip(self.admin_ip)
if self.type == "init" and self.transport != "knet" and len(self.nic_addr_list) > 1:
utils.fatal(f"Only one link is allowed for the '{self.transport}' transport type")
if len(self.nic_addr_list) > self.MAX_LINK_NUM:
utils.fatal(f"Maximum number of interfaces is {self.MAX_LINK_NUM}")
if len(self.nic_addr_list) > corosync.KNET_LINK_NUM_LIMIT:
utils.fatal(f"Maximum number of interfaces is {corosync.KNET_LINK_NUM_LIMIT}")
if self.transport == "udp":
cloud_type = utils.detect_cloud()
if cloud_type:
Expand Down Expand Up @@ -1330,7 +1329,7 @@ def get_address_list() -> typing.List[str]:
loop_count = len(_context.default_ip_list)
else:
# interactive mode and without -i option specified
loop_count = min(Context.MAX_LINK_NUM, len(_context.interfaces_inst.nic_list))
loop_count = min(corosync.KNET_LINK_NUM_LIMIT, len(_context.interfaces_inst.nic_list))

ringXaddr_list = []
for i in range(loop_count):
Expand Down
316 changes: 312 additions & 4 deletions crmsh/corosync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
Functions that abstract creating and editing the corosync.conf
configuration file, and also the corosync-* utilities.
'''

import dataclasses
import ipaddress
import itertools
import os
import re
import typing
Expand Down Expand Up @@ -40,6 +42,7 @@
timestamp: on
}
"""
KNET_LINK_NUM_LIMIT = 8


def is_knet() -> bool:
Expand Down Expand Up @@ -367,9 +370,8 @@ class ConfParser(object):
"""
COROSYNC_KNOWN_SEC_NAMES_WITH_LIST = {("totem", "interface"), ("nodelist", "node")}

def __init__(self, config_file=None, config_data=None, sec_names_with_list=()):
def __init__(self, config_file=None, config_data=None):
self._config_file = config_file
self._sec_names_with_list = set(sec_names_with_list) if sec_names_with_list else self.COROSYNC_KNOWN_SEC_NAMES_WITH_LIST
if config_data is not None:
self._dom = corosync_config_format.DomParser(StringIO(config_data)).dom()
else:
Expand Down Expand Up @@ -436,7 +438,7 @@ def _raw_set(self, path, value, index):
else:
match node[key]:
case dict(_) as next_node:
if index > 0 and path_stack in self._sec_names_with_list:
if index > 0 and path_stack in self.COROSYNC_KNOWN_SEC_NAMES_WITH_LIST:
if index == 1:
new_node = dict()
node[key] = [next_node, new_node]
Expand Down Expand Up @@ -512,3 +514,309 @@ def remove_key(cls, path, index=0):
inst = cls()
inst.remove(path, index)
inst.save()

@classmethod
def transform_dom_with_list_schema(cls, dom):
    """Wrap known multi-value sections of ``dom`` in lists, in place.

    For every section path listed in ``COROSYNC_KNOWN_SEC_NAMES_WITH_LIST``
    that exists in ``dom``, a value that is not already a list is replaced
    by a one-element list holding it. Sections that do not exist are left
    alone. Nothing is returned.
    """
    query = corosync_config_format.DomQuery(dom)
    for section_path in cls.COROSYNC_KNOWN_SEC_NAMES_WITH_LIST:
        leaf = section_path[-1]
        try:
            parent = query.get(section_path[:-1])
            current = parent[leaf]
        except KeyError:
            # this section is not present in the configuration
            continue
        if not isinstance(current, list):
            parent[leaf] = [current]


@dataclasses.dataclass
class LinkNode:
    """One cluster node as seen on a single link."""
    nodeid: int  # the node's `nodeid`
    name: str    # the node's `name`
    addr: str    # the node's address on the link (its `ringX_addr`); '' if not assigned yet


@dataclasses.dataclass
class Link:
    """A link: its number, the nodes on it, and its per-link options.

    Every option field is optional; ``None`` means the option is not set.
    """
    linknumber: int = -1
    nodes: list[LinkNode] = dataclasses.field(default_factory=list)
    mcastport: typing.Optional[int] = None
    knet_link_priority: typing.Optional[int] = None
    knet_ping_interval: typing.Optional[int] = None
    knet_ping_timeout: typing.Optional[int] = None
    knet_ping_precision: typing.Optional[int] = None
    knet_pong_count: typing.Optional[int] = None
    knet_transport: typing.Optional[str] = None
    # UDP only
    # bindnet_addr: typing.Optional[str] = None
    # broadcast: typing.Optional[bool] = None
    # mcastaddr: typing.Optional[str] = None
    # ttl: typing.Optional[int] = None

    def load_options(self, options: dict[str, str]):
        """Copy the entries of ``options`` that correspond to fields of this link.

        Keys that are not field names are ignored, and ``nodes`` is never
        loaded from ``options``. String values are converted to the declared
        type of the field. A ``None`` value unsets an optional field.

        Returns: ``self``, so that calls can be chained.
        Raises:
            ValueError: a value cannot be converted to the field type, or
                ``None`` is given for ``linknumber``.
        """
        for field in dataclasses.fields(self):
            if field.name == 'nodes':
                continue
            self.__load_option(options, field)
        return self

    def __load_option(self, data: dict[str, str], field: dataclasses.Field):
        """Load a single field from ``data`` if ``data`` has a value for it."""
        try:
            value = data[field.name]
        except KeyError:
            return
        if value is None:
            # Raise instead of assert: the check must survive `python -O`.
            if field.name in {'linknumber', 'nodes'}:
                raise ValueError(f'Option "{field.name}" cannot be unset.')
            setattr(self, field.name, None)
            return
        tpe = field.type
        if typing.get_origin(tpe) is typing.Union:  # Optional[A] is Union[A, NoneType]
            # Compare against the real NoneType. A bare name in a `case`
            # pattern would be a capture pattern and match anything.
            non_none = [arg for arg in typing.get_args(tpe) if arg is not type(None)]
            if len(non_none) != 1:
                raise TypeError(f'Unsupported type of field "{field.name}": {tpe!r}')
            tpe = non_none[0]
        if tpe is not str:
            value = tpe(value)
        setattr(self, field.name, value)


class LinkManager:
    """Inspect and edit the links described by a corosync configuration dom.

    The dom is a nested dict. Links are derived from the ``ringX_addr`` keys
    of ``nodelist.node`` and the entries of ``totem.interface``. Both are
    expected to be lists (see ``load_config_file``). Mutating methods change
    the dom held by the instance and return it; nothing is written to disk
    until ``write_config_file`` is called.
    """

    class LinkManageException(Exception):
        """Base class of the link-management specific errors."""

    @dataclasses.dataclass
    class MissingNodesException(LinkManageException):
        """Raised by ``add_link`` when some nodes get no address on the new link."""
        nodeids: list[int]

    @dataclasses.dataclass
    class DuplicatedNodeAddressException(LinkManageException):
        """Raised when an address to be assigned to ``node1`` is already used by ``node2``."""
        address: str
        node1: int
        node2: int

    # Every Link field except the identity ('linknumber') and the node list.
    LINK_OPTIONS_UPDATABLE = {
        field.name
        for field in dataclasses.fields(Link)
        if field.name not in {'linknumber', 'nodes'}
    }

    def __init__(self, config: dict):
        """Wrap an already parsed configuration dom."""
        self._config = config

    @staticmethod
    def load_config_file(path=None):
        """Parse a configuration file and return a LinkManager for it.

        Parameters:
            * path: the file to read. Falls back to ``conf()`` when empty.
        Raises:
            ValueError: the file cannot be read or parsed.
        """
        if not path:
            path = conf()
        try:
            with open(path, 'r', encoding='utf-8') as f:
                dom = corosync_config_format.DomParser(f).dom()
                # links() and the mutators rely on multi-value sections being lists
                ConfParser.transform_dom_with_list_schema(dom)
                return LinkManager(dom)
        except (OSError, corosync_config_format.ParserException) as e:
            raise ValueError(str(e)) from None

    @staticmethod
    def write_config_file(dom, path=None, file_mode=0o644):
        """Serialize ``dom`` to ``path`` (default: ``conf()``) and set its file mode."""
        if not path:
            path = conf()
        with utils.open_atomic(path, 'w', fsync=True, encoding='utf-8') as f:
            # NOTE(review): presumably DomSerializer writes on construction, since
            # the instance is discarded - confirm in corosync_config_format.
            corosync_config_format.DomSerializer(dom, f)
            os.fchmod(f.fileno(), file_mode)

    def totem_transport(self):
        """Return ``totem.transport``, or 'knet' when it is not configured."""
        try:
            return self._config['totem']['transport']
        except KeyError:
            return 'knet'

    def links(self) -> list[typing.Optional[Link]]:
        """Returns a list of links, indexed by linknumber.
        The length of returned list is always KNET_LINK_NUM_LIMIT.
        If a link with certain linknumber does not exist, the corresponding list item is None.

        Raises:
            ValueError: a link is configured on the first node but some other
                node lacks the corresponding ringX_addr.
        """
        assert self.totem_transport() == 'knet'
        links: list[typing.Optional[Link]] = [None] * KNET_LINK_NUM_LIMIT
        try:
            nodelist = self._config['nodelist']['node']
        except KeyError:
            # No nodes, hence no links. Keep the documented fixed length so
            # that callers can index the result by linknumber safely.
            return links
        assert isinstance(nodelist, list)
        assert nodelist
        assert all('nodeid' in node for node in nodelist)
        assert all('name' in node for node in nodelist)
        ids = [int(node['nodeid']) for node in nodelist]
        names = [node['name'] for node in nodelist]
        for i in range(KNET_LINK_NUM_LIMIT):
            # enumerate ringX_addr for X = 0, 1, ...
            # each ringX_addr is corresponding to a link
            addr_key = f'ring{i}_addr'
            if addr_key not in nodelist[0]:
                continue
            # ringX_addr is required for every node once the link exists
            missing = [node['name'] for node in nodelist if addr_key not in node]
            if missing:
                raise ValueError('{} is not configured for node(s): {}'.format(addr_key, ', '.join(missing)))
            addrs = [node[addr_key] for node in nodelist]
            link_nodes = sorted(
                (LinkNode(*x) for x in zip(ids, names, addrs)),
                key=lambda node: node.nodeid,
            )
            links[i] = Link(linknumber=i, nodes=link_nodes)
        try:
            interfaces = self._config['totem']['interface']
        except KeyError:
            return links
        assert isinstance(interfaces, list)
        # An interface section without a linknumber is keyed by -1 and thus never matches.
        options_by_linknumber = {Link().load_options(x).linknumber: x for x in interfaces}
        for i, link in enumerate(links):
            if link is not None and i in options_by_linknumber:
                link.load_options(options_by_linknumber[i])
        return links

    def __existing_link(self, linknumber: int) -> list[typing.Optional[Link]]:
        """Parse the links and make sure that ``linknumber`` names an existing one.

        The explicit range check keeps negative or too large numbers from
        being used as list indexes.

        Returns: the parsed links, as returned by ``links()``.
        Raises:
            ValueError: the link does not exist.
        """
        links = self.links()
        if not 0 <= linknumber < len(links) or links[linknumber] is None:
            raise ValueError(f'Link {linknumber} does not exist.')
        return links

    @classmethod
    def __validate_options(cls, options: dict[str, str|None]):
        """Reject unknown option names and values that cannot be converted.

        Raises:
            ValueError: an option is not updatable or its value is malformed.
        """
        if 'nodes' in options:
            raise ValueError('Unknown option "nodes".')
        for option in options:
            if option not in cls.LINK_OPTIONS_UPDATABLE:
                raise ValueError('Updating option "{}" is not supported. Updatable options: {}'.format(
                    option,
                    # sorted: a set has no stable order, the message should
                    ', '.join(sorted(cls.LINK_OPTIONS_UPDATABLE)),
                ))
        # Dry run on a scratch object, so that a malformed value is reported
        # before any real state is touched.
        Link().load_options(options)

    def update_link(self, linknumber: int, options: dict[str, str|None]) -> dict:
        """update link options
        Parameters:
            * linknumber: the link to update
            * options: specify the options to update. Not specified options will not be changed.
                       Specify None value will reset the option to its default value.
        Returns: updated configuration dom. The internal state of LinkManager is also updated.
        Raises:
            ValueError: the link does not exist, or an option is unknown or malformed.
        """
        links = self.__existing_link(linknumber)
        self.__validate_options(options)
        link = links[linknumber]
        link.load_options(options)
        assert 'totem' in self._config
        totem = self._config['totem']
        try:
            interfaces = totem['interface']
        except KeyError:
            interfaces = []
        else:
            assert isinstance(interfaces, list)
        linknumber_str = str(linknumber)
        interface_index = next(
            (i for i, x in enumerate(interfaces) if x.get('linknumber', -1) == linknumber_str),
            -1,
        )
        if interface_index == -1:
            interface = {'linknumber': linknumber_str}
        else:
            interface = interfaces[interface_index]
        # Walk the fields in declaration order so that new keys are added in a stable order.
        for field in dataclasses.fields(link):
            if field.name not in self.LINK_OPTIONS_UPDATABLE:
                continue
            value = getattr(link, field.name)
            if value is None:
                interface.pop(field.name, None)
            else:
                interface[field.name] = str(value)
        if len(interface) == 1:
            # only the linknumber is left: the section carries no information
            assert 'linknumber' in interface
            if interface_index != -1:
                del interfaces[interface_index]
        elif interface_index == -1:
            interfaces.append(interface)
        # Never leave an empty interface list behind.
        if interfaces:
            totem['interface'] = interfaces
        else:
            totem.pop('interface', None)
        return self._config

    def update_node_addr(self, linknumber: int, node_addresses: typing.Mapping[int, str]) -> dict:
        """Update the network addresses of the specified nodes on the specified link.
        Parameters:
            * linknumber: the link to update
            * node_addresses: a mapping of nodeid->addr
        Returns: updated configuration dom. The internal state of LinkManager is also updated.
        Raises:
            ValueError: the link or a nodeid does not exist.
            DuplicatedNodeAddressException: an address is already used by a node.
        """
        links = self.__existing_link(linknumber)
        return self.__upsert_node_addr_impl(self._config, links, linknumber, node_addresses)

    @staticmethod
    def __upsert_node_addr_impl(
            config: dict, links: typing.Sequence[typing.Optional[Link]],
            linknumber: int, node_addresses: typing.Mapping[int, str],
    ) -> dict:
        """Add a new link or update the node addresses of an existing link.

        All addresses are checked first; ``config`` is modified only after
        every check has passed.

        Args:
            config: [in/out] the configuration dom
            links: [in] parsed link data; ``links[linknumber]`` must not be None
            linknumber: [in] the linknumber to add or update
            node_addresses: [in] a mapping from nodeid to node address.
        Returns:
            a reference to in/out arg `config`
        Raises:
            ValueError: a nodeid is not on the link.
            DuplicatedNodeAddressException: an address is already used by a node.
        """
        # canonical address -> nodeid, over all links; '' marks a not yet assigned address
        existing_addr_node_map = {
            utils.IP(node.addr).ip_address: node.nodeid
            for link in links if link is not None
            for node in link.nodes
            if node.addr != ''
        }
        for nodeid, addr in node_addresses.items():
            found = next((node for node in links[linknumber].nodes if node.nodeid == nodeid), None)
            if found is None:
                raise ValueError(f'Unknown nodeid {nodeid}.')
            canonical_addr = utils.IP(addr).ip_address
            if (
                    found.addr == ''    # adding a new addr
                    or utils.IP(found.addr).ip_address != canonical_addr    # updating a addr and the new value is not the same as the old value
            ):
                # the address changes, so its uniqueness has to be checked
                existing = existing_addr_node_map.get(canonical_addr, None)
                if existing is not None:
                    raise LinkManager.DuplicatedNodeAddressException(addr, nodeid, existing)
                found.addr = addr
                existing_addr_node_map[canonical_addr] = found.nodeid
        nodes = config['nodelist']['node']
        assert isinstance(nodes, list)
        for node in nodes:
            updated_addr = node_addresses.get(int(node['nodeid']), None)
            if updated_addr is not None:
                node[f'ring{linknumber}_addr'] = updated_addr
        return config

    def add_link(self, node_addresses: typing.Mapping[int, str], options: dict[str, str|None]) -> dict:
        """Add a new link using the lowest unused linknumber.
        Parameters:
            * node_addresses: a mapping of nodeid->addr. Every node must be given an address.
            * options: the options of the new link, see update_link().
        Returns: updated configuration dom. The internal state of LinkManager is also updated.
        Raises:
            ValueError: no linknumber is free, there is no existing link to
                take the nodes from, or an option is unknown or malformed.
            MissingNodesException: some nodes are not given an address.
            DuplicatedNodeAddressException: an address is already used by a node.
        """
        links = self.links()
        next_linknumber = next((i for i, link in enumerate(links) if link is None), -1)
        if next_linknumber == -1:
            raise ValueError(f'Cannot add a new link. The maximum number of links supported is {KNET_LINK_NUM_LIMIT}.')
        # The node set of the new link is copied from any existing link.
        template = next((link for link in links if link is not None), None)
        if template is None:
            raise ValueError('Cannot add a new link. No existing link is found in the nodelist.')
        nodes = template.nodes
        unspecified_nodes = [node.nodeid for node in nodes if node.nodeid not in node_addresses]
        if unspecified_nodes:
            raise self.MissingNodesException(unspecified_nodes)
        # Validate the options before the dom is touched, so that a bad option
        # does not leave a half-added link behind.
        self.__validate_options(options)
        links[next_linknumber] = Link(next_linknumber, [dataclasses.replace(node, addr='') for node in nodes])
        self.__upsert_node_addr_impl(self._config, links, next_linknumber, node_addresses)
        return self.update_link(next_linknumber, options)

    def remove_link(self, linknumber: int) -> dict:
        """Remove the specified link.
        Parameters:
            * linknumber: the link to remove
        Returns: updated configuration dom. The internal state of LinkManager is also updated.
        Raises:
            ValueError: the link does not exist or it is the last link.
        """
        links = self.__existing_link(linknumber)
        if sum(1 for link in links if link is not None) <= 1:
            raise ValueError('Cannot remove the last link.')
        nodes = self._config['nodelist']['node']
        assert isinstance(nodes, list)
        # checked before the nodelist is modified
        assert 'totem' in self._config
        totem = self._config['totem']
        for node in nodes:
            del node[f'ring{linknumber}_addr']
        if 'interface' not in totem:
            return self._config
        interfaces = totem['interface']
        assert isinstance(interfaces, list)
        # An interface section without a linknumber never matches, as in links().
        remaining = [
            interface for interface in interfaces
            if int(interface.get('linknumber', -1)) != linknumber
        ]
        # Never leave an empty interface list behind, as in update_link().
        if remaining:
            totem['interface'] = remaining
        else:
            del totem['interface']
        return self._config
Loading

0 comments on commit af15166

Please sign in to comment.