Files
Vlastislav Svatek 673e67106e
Some checks failed
ci / deploy (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (javascript-typescript) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
first commit
2026-06-05 10:39:05 +02:00

548 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
from django.conf import settings as django_settings
from django.shortcuts import get_object_or_404, render
from netbox.views import generic
from netbox_librenms_plugin.forms import AddToLIbreSNMPV1V2, AddToLIbreSNMPV3
from netbox_librenms_plugin.import_utils import _determine_device_name
from netbox_librenms_plugin.import_utils.virtual_chassis import _generate_vc_member_name
from netbox_librenms_plugin.utils import (
get_interface_name_field,
get_librenms_device_id,
get_librenms_sync_device,
match_librenms_hardware_to_device_type,
resolve_naming_preferences,
)
from netbox_librenms_plugin.views.mixins import LibreNMSAPIMixin, LibreNMSPermissionMixin
class BaseLibreNMSSyncView(LibreNMSPermissionMixin, LibreNMSAPIMixin, generic.ObjectListView):
"""
Base view for LibreNMS sync information.
"""
queryset = None # Will be set in subclasses
model = None # Will be set in subclasses
tab = None # Will be set in subclasses
template_name = "netbox_librenms_plugin/librenms_sync_base.html"
def get(self, request, pk, context=None):
"""Handle GET request for the LibreNMS sync view."""
obj = get_object_or_404(self.model, pk=pk)
# For Virtual Chassis members, always delegate to get_librenms_sync_device() so
# self._librenms_lookup_device and self.librenms_id are consistent with the
# helper-based VC status computed in get_context_data(). A legacy bare-int mapping
# on the viewed member must not shadow an explicit per-server mapping on another
# member — get_librenms_sync_device() applies the full priority order.
librenms_lookup_device = obj
if hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
sync_device = get_librenms_sync_device(obj, server_key=self.librenms_api.server_key)
if sync_device:
librenms_lookup_device = sync_device
# Store for use in get_context_data (badge generation needs the same object)
self._librenms_lookup_device = librenms_lookup_device
# Get librenms_id using the determined lookup device
self.librenms_id = self.librenms_api.get_librenms_id(librenms_lookup_device)
context = self.get_context_data(request, obj)
return render(request, self.template_name, context)
def get_context_data(self, request, obj):
"""Get the context data for the LibreNMS sync view."""
# Get context from parent classes (including LibreNMSAPIMixin)
context = super().get_context_data()
# Add our specific context
context.update(
{
"object": obj,
"tab": self.tab,
"has_librenms_id": bool(self.librenms_id),
}
)
if hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
# Use helper function to determine the sync device
librenms_sync_device = get_librenms_sync_device(obj, server_key=self.librenms_api.server_key)
# Determine sync device status
sync_device_has_librenms_id = False
sync_device_has_primary_ip = False
if librenms_sync_device:
sync_device_has_librenms_id = (
get_librenms_device_id(librenms_sync_device, self.librenms_api.server_key, auto_save=False)
is not None
)
sync_device_has_primary_ip = bool(librenms_sync_device.primary_ip)
context.update(
{
"is_vc_member": True,
"sync_device_has_primary_ip": sync_device_has_primary_ip,
"librenms_sync_device": librenms_sync_device,
"sync_device_has_librenms_id": sync_device_has_librenms_id,
}
)
librenms_info = self.get_librenms_device_info(obj, request)
interface_context = self.get_interface_context(request, obj)
cable_context = self.get_cable_context(request, obj)
ip_context = self.get_ip_context(request, obj)
vlan_context = self.get_vlan_context(request, obj)
module_context = self.get_module_context(request, obj)
interface_name_field = get_interface_name_field(request)
# Get platform info for display and sync
platform_info = self._get_platform_info(librenms_info, obj)
# Get manufacturers for platform creation modal
from dcim.models import Manufacturer
manufacturers = Manufacturer.objects.all().order_by("name")
# Detect legacy bare-int librenms_id format for conversion badge
_lookup_device = getattr(self, "_librenms_lookup_device", obj)
_raw_cf = _lookup_device.cf.get("librenms_id") if _lookup_device else None
librenms_id_is_legacy = (isinstance(_raw_cf, int) and not isinstance(_raw_cf, bool)) or (
isinstance(_raw_cf, str) and _raw_cf.isdigit()
)
# Determine if serial match allows legacy ID conversion.
# VMs have no serial field in NetBox; skip the gate so the Convert ID button is enabled.
_librenms_serial = librenms_info["librenms_device_details"].get("librenms_device_serial", "-")
_netbox_serial = getattr(_lookup_device, "serial", "") or ""
_lookup_is_vm = _lookup_device._meta.model_name == "virtualmachine" if _lookup_device else False
librenms_id_serial_confirmed = _lookup_is_vm or bool(
_librenms_serial and _librenms_serial != "-" and _netbox_serial and _librenms_serial == _netbox_serial
)
context.update(
{
"interface_sync": interface_context,
"cable_sync": cable_context,
"ip_sync": ip_context,
"vlan_sync": vlan_context,
"module_sync": module_context,
"v1v2form": AddToLIbreSNMPV1V2(prefix="v1v2"),
"v3form": AddToLIbreSNMPV3(prefix="v3"),
"librenms_device_id": self.librenms_id,
"found_in_librenms": librenms_info.get("found_in_librenms"),
"librenms_device_details": librenms_info.get("librenms_device_details"),
"mismatched_device": librenms_info.get("mismatched_device"),
**librenms_info["librenms_device_details"],
"interface_name_field": interface_name_field,
"platform_info": platform_info,
"vc_inventory_serials": librenms_info["librenms_device_details"].get("vc_inventory_serials", []),
"manufacturers": manufacturers,
"all_server_mappings": self._build_all_server_mappings(_lookup_device, self.librenms_api.server_key),
"librenms_id_is_legacy": librenms_id_is_legacy,
"librenms_id_serial_confirmed": librenms_id_serial_confirmed,
# Lookup device may differ from object (e.g. VC master vs member).
# Used by the Remove server mapping form to post to the correct device.
"lookup_device_pk": _lookup_device.pk if _lookup_device else obj.pk,
"lookup_device_model_name": (
_lookup_device._meta.model_name if _lookup_device else obj._meta.model_name
),
"object_model_name": obj._meta.model_name,
}
)
return context
@staticmethod
def _build_all_server_mappings(obj, active_server_key):
"""
Build a list of all LibreNMS server mappings for the given device.
Each entry describes one server<->ID mapping stored in the ``librenms_id``
custom field:
* ``server_key`` the key as stored in the CF dict.
* ``display_name`` human-readable name from PLUGINS_CONFIG, or the key.
* ``librenms_url`` base URL of that server (``None`` when not configured).
* ``device_id`` the integer device ID on that server.
* ``device_url`` direct URL to the device page on that server (or ``None``).
* ``is_configured`` True when the server key exists in current plugin config.
* ``is_active`` True when this is the currently active server.
Returns ``None`` for legacy bare-int format (no per-server info to show)
and ``None`` when the CF is absent/invalid.
"""
cf_value = obj.custom_field_data.get("librenms_id")
if not isinstance(cf_value, dict) or not cf_value:
return None
plugins_cfg = getattr(django_settings, "PLUGINS_CONFIG", {}).get("netbox_librenms_plugin", {})
servers_config = plugins_cfg.get("servers") or {}
if not isinstance(servers_config, dict):
servers_config = {}
result = []
for sk, did in cf_value.items():
# Validate device ID — accept int or digit-string, skip bool/None/junk.
if isinstance(did, bool) or did is None:
continue
if isinstance(did, str):
if not did.isdigit():
continue
did = int(did)
elif not isinstance(did, int):
continue
srv_cfg = servers_config.get(sk)
# Legacy single-server config: "default" key with no matching servers entry —
# fall back to root-level librenms_url/display_name in plugins_cfg.
if srv_cfg is None and sk == "default":
legacy_url = plugins_cfg.get("librenms_url")
if legacy_url:
srv_cfg = {
"librenms_url": legacy_url,
"display_name": plugins_cfg.get("display_name") or f"Default Server ({legacy_url})",
}
is_configured = srv_cfg is not None
# Treat malformed (non-dict) server config entries as unconfigured
if srv_cfg is not None and not isinstance(srv_cfg, dict):
srv_cfg = None
is_configured = False
librenms_url = srv_cfg.get("librenms_url") if srv_cfg else None
display_name = (srv_cfg.get("display_name") or sk) if srv_cfg else sk
device_url = f"{librenms_url}/device/device={did}/" if librenms_url else None
result.append(
{
"server_key": sk,
"display_name": display_name,
"librenms_url": librenms_url,
"device_id": did,
"device_url": device_url,
"is_configured": is_configured,
"is_active": sk == active_server_key,
}
)
# Sort: active first, then configured, then orphaned
result.sort(key=lambda e: 0 if e["is_active"] else (1 if e["is_configured"] else 2))
return result or None
def get_librenms_device_info(self, obj, request=None):
"""Get the LibreNMS device information for the given object."""
found_in_librenms = False
mismatched_device = False
librenms_device_details = {
"librenms_device_url": None,
"librenms_device_hardware": "-",
"librenms_device_serial": "-",
"librenms_device_os": "-",
"librenms_device_version": "-",
"librenms_device_features": "-",
"librenms_device_location": "-",
"librenms_device_hardware_match": None,
"vc_inventory_serials": [],
}
if self.librenms_id:
success, device_info = self.librenms_api.get_device_info(self.librenms_id)
if success and device_info:
# Get NetBox device details
netbox_ip = str(obj.primary_ip.address.ip).lower() if obj.primary_ip else None
netbox_name = obj.name
# Get LibreNMS device details
librenms_sysname = device_info.get("sysName")
librenms_ip = device_info.get("ip")
# Extract new fields
hardware = device_info.get("hardware", "-")
serial = device_info.get("serial", "-")
os_name = device_info.get("os", "-")
version = device_info.get("version", "-")
features = device_info.get("features", "-")
# Try to match hardware to NetBox DeviceType
hardware_match = match_librenms_hardware_to_device_type(hardware)
# Compute resolved name using naming preferences
resolved_name = None
if request:
use_sysname, strip_domain = resolve_naming_preferences(request)
resolved_name = _determine_device_name(
device_info,
use_sysname=use_sysname,
strip_domain=strip_domain,
device_id=self.librenms_id,
)
# For VC members, generate the expected VC member name
if (
resolved_name
and hasattr(obj, "virtual_chassis")
and obj.virtual_chassis is not None
and obj.vc_position is not None
):
resolved_name = _generate_vc_member_name(
resolved_name,
obj.vc_position,
serial=getattr(obj, "serial", None),
)
# Update device details regardless of match
librenms_device_details.update(
{
"librenms_device_url": f"{self.librenms_api.librenms_url}/device/device={self.librenms_id}/",
"librenms_device_hardware": hardware,
"librenms_device_serial": serial,
"librenms_device_os": os_name,
"librenms_device_version": version,
"librenms_device_features": features,
"librenms_device_location": device_info.get("location", "-"),
"librenms_device_ip": librenms_ip,
"sysName": librenms_sysname,
"resolved_name": resolved_name or librenms_sysname,
"librenms_device_hostname": device_info.get("hostname", "-"),
"librenms_device_hardware_match": hardware_match,
}
)
# For Virtual Chassis, fetch inventory
if hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
vc_serials = self._get_vc_inventory_serials(obj)
librenms_device_details["vc_inventory_serials"] = vc_serials
# Device was retrieved successfully via librenms_id — trust the ID
found_in_librenms = True
# Normalise the NetBox name once for comparisons
netbox_name_norm = netbox_name.lower() if netbox_name else None
if netbox_name_norm:
# Strip VC member suffix like " (1)" before comparing
netbox_name_norm = re.sub(r"\s*\(\d+\)$", "", netbox_name_norm)
# Also strip the VC member naming pattern from settings
# (e.g. "-M2", " (2)", "-SW3") to recover the base device name
netbox_name_vc_stripped = None
if netbox_name_norm:
netbox_name_vc_stripped = self._strip_vc_pattern(netbox_name_norm)
# Collect all NetBox identity values to compare against
netbox_dns_name = (
obj.primary_ip.dns_name.lower() if obj.primary_ip and obj.primary_ip.dns_name else None
)
netbox_identities = {
v
for v in [
netbox_name_norm,
netbox_ip,
netbox_dns_name,
netbox_name_vc_stripped,
]
if v
}
# Collect all LibreNMS identity values, including
# domain-stripped short names (e.g. "sw01.example.net" → "sw01")
librenms_hostname = device_info.get("hostname")
librenms_values = []
for val in [librenms_sysname, librenms_hostname, librenms_ip]:
if val:
lower_val = val.lower()
librenms_values.append(lower_val)
# Add short name (strip domain) if it looks like an FQDN
short = lower_val.split(".")[0]
if short != lower_val:
librenms_values.append(short)
librenms_identities = set(librenms_values)
# A device is considered matched when ANY NetBox identity
# appears in the LibreNMS identities. This covers:
# - NetBox name == sysName or hostname
# - NetBox primary IP == LibreNMS hostname (added by IP)
# - NetBox DNS name == sysName or hostname (FQDN match)
if netbox_identities & librenms_identities:
mismatched_device = False
else:
mismatched_device = True
librenms_device_details["netbox_dns_name"] = netbox_dns_name or "-"
return {
"found_in_librenms": found_in_librenms,
"librenms_device_details": librenms_device_details,
"mismatched_device": mismatched_device,
}
def get_interface_context(self, request, obj):
"""
Get the context data for interface sync.
Subclasses should override this method.
"""
return None
def get_cable_context(self, request, obj):
"""
Get the context data for cable sync.
Subclasses should override this method if applicable.
"""
return None
def get_ip_context(self, request, obj):
"""
Get the context data for IP address sync.
Subclasses should override this method.
"""
return None
def get_vlan_context(self, request, obj):
"""
Get the context data for VLAN sync.
Subclasses should override this method.
"""
return None
def get_module_context(self, request, obj):
"""
Get the context data for module sync.
Subclasses should override this method if applicable (e.g. VMs return None).
"""
return None
@staticmethod
def _strip_vc_pattern(name):
"""
Strip the VC member naming suffix from a device name.
Uses the vc_member_name_pattern from LibreNMSSettings to build a
regex that removes the suffix. For example, with the default
pattern ``-M{position}`` and name ``switch01-m2``, this returns
``switch01``.
Returns the stripped name, or None if it equals the original
(i.e. no suffix was found).
"""
try:
from netbox_librenms_plugin.models import LibreNMSSettings
settings = LibreNMSSettings.objects.first()
pattern = (
settings.vc_member_name_pattern
if settings and isinstance(settings.vc_member_name_pattern, str)
else "-M{position}"
)
if not isinstance(pattern, str):
pattern = "-M{position}"
# Turn the pattern into a regex by replacing placeholders
# {position} → \d+ {serial} → .+
regex_suffix = re.escape(pattern)
regex_suffix = regex_suffix.replace(re.escape("{position}"), r"\d+")
regex_suffix = regex_suffix.replace(re.escape("{serial}"), r".+")
stripped = re.sub(regex_suffix + "$", "", name, flags=re.IGNORECASE)
return stripped if stripped != name else None
except Exception:
return None
def _get_vc_inventory_serials(self, obj):
"""
Fetch inventory serials for Virtual Chassis members.
Args:
obj: NetBox device object (VC member)
Returns:
list: [
{
'description': 'Chassis component description',
'serial': 'serial number',
'model': 'model name',
'assigned_member': Device object or None (if serial matches existing assignment)
}
]
"""
success, inventory = self.librenms_api.get_device_inventory(self.librenms_id)
if not success:
return []
# Filter for chassis components
chassis_components = [item for item in inventory if item.get("entPhysicalClass") == "chassis"]
# Get all VC members
vc_members = obj.virtual_chassis.members.all()
result = []
for component in chassis_components:
serial = component.get("entPhysicalSerialNum", "-")
if not serial or serial == "-":
continue
# Check if this serial is already assigned to a VC member
assigned_member = None
for member in vc_members:
if member.serial and member.serial.strip() == serial.strip():
assigned_member = member
break
result.append(
{
"description": component.get("entPhysicalDescr", "-"),
"serial": serial,
"model": component.get("entPhysicalModelName", "-"),
"assigned_member": assigned_member,
}
)
return result
def _get_platform_info(self, librenms_info, obj):
"""
Get platform information from LibreNMS.
Platform matching is based on OS name only (not version).
Version is displayed separately as informational data.
Args:
librenms_info: Dictionary with LibreNMS device info
obj: NetBox device object
Returns:
dict: {
'netbox_platform': Platform object or None,
'librenms_os': str (OS name),
'librenms_version': str (OS version),
'platform_exists': bool (whether OS platform exists in NetBox),
'platform_name': str (OS name for platform matching),
'matching_platform': Platform object or None
}
"""
from dcim.models import Platform
librenms_os = librenms_info["librenms_device_details"].get("librenms_device_os", "-")
librenms_version = librenms_info["librenms_device_details"].get("librenms_device_version", "-")
# Platform name is just the OS (not OS + version)
platform_name = librenms_os if librenms_os != "-" else None
# Check if platform exists (match by OS name only)
platform_exists = False
matching_platform = None
if platform_name:
try:
matching_platform = Platform.objects.get(name__iexact=platform_name)
platform_exists = True
except Platform.DoesNotExist:
pass
return {
"netbox_platform": obj.platform,
"librenms_os": librenms_os,
"librenms_version": librenms_version,
"platform_exists": platform_exists,
"platform_name": platform_name,
"matching_platform": matching_platform,
}