Files
Vlastislav Svatek 673e67106e
Some checks failed
ci / deploy (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (javascript-typescript) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
first commit
2026-06-05 10:39:05 +02:00

238 lines
9.9 KiB
Python

import logging
from urllib.parse import quote_plus
from dcim.models import Cable, Device, Interface
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.db import transaction
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.views import View
from netbox_librenms_plugin.utils import get_librenms_sync_device
from netbox_librenms_plugin.views.mixins import (
CacheMixin,
LibreNMSAPIMixin,
LibreNMSPermissionMixin,
NetBoxObjectPermissionMixin,
)
logger = logging.getLogger(__name__)
class SyncCablesView(LibreNMSPermissionMixin, NetBoxObjectPermissionMixin, LibreNMSAPIMixin, CacheMixin, View):
"""Create NetBox cables using cached LibreNMS link data."""
required_object_permissions = {
"POST": [
("add", Cable),
("change", Cable),
],
}
def get_selected_interfaces(self, request, initial_device):
"""
Return selected interface entries from POST data.
Each ``select`` value is a ``local_port_id`` (stable LibreNMS identifier)
so that matching against cached link data is user-preference agnostic.
"""
selected_interfaces = []
selected_data = [x for x in request.POST.getlist("select") if x]
if not selected_data:
return None
for port_id in selected_data:
override = request.POST.get(f"device_selection_{port_id}")
device_id = override or initial_device.id
selected_interfaces.append({"device_id": device_id, "local_port_id": port_id})
return selected_interfaces
def get_cached_links_data(self, request, obj):
"""Return cached LibreNMS link data for the given object."""
server_key = getattr(self, "_post_server_key", None) or self.librenms_api.server_key
cache_obj = get_librenms_sync_device(obj, server_key=server_key) or obj
cached_data = cache.get(self.get_cache_key(cache_obj, "links", server_key))
if not cached_data:
return None
return cached_data.get("links", [])
def create_cable(self, local_interface, remote_interface, request):
"""
Create a cable between local and remote interfaces.
Returns:
True on success, False on failure.
"""
try:
Cable.objects.create(
a_terminations=[local_interface],
b_terminations=[remote_interface],
status="connected",
)
return True
except Exception as exc: # pragma: no cover - protects UX
messages.error(request, f"Failed to create cable: {str(exc)}")
return False
def check_existing_cable(self, local_interface, remote_interface):
"""Return True if a cable already exists for either interface."""
from dcim.models import Interface as DCIMInterface
interface_ct = ContentType.objects.get_for_model(DCIMInterface)
return Cable.objects.filter(
Q(
terminations__termination_type=interface_ct,
terminations__termination_id=local_interface.pk,
)
| Q(
terminations__termination_type=interface_ct,
terminations__termination_id=remote_interface.pk,
)
).exists()
def validate_prerequisites(self, cached_links, selected_interfaces):
"""Validate that cached data and selections are present before sync."""
if not cached_links:
messages.error(
self.request,
"Cache has expired. Please refresh the cable data before syncing.",
)
return False
if selected_interfaces is None:
messages.error(self.request, "No interfaces selected for synchronization.")
return False
return True
def process_single_interface(self, interface, cached_links):
"""Process cable creation for a single interface from cached link data."""
port_id = str(interface.get("local_port_id", ""))
try:
link_data = next(link for link in cached_links if str(link.get("local_port_id", "")) == port_id)
# Apply posted device_id (VC member selection) without mutating the cached list.
link_data = {**link_data, "device_id": interface.get("device_id", link_data.get("device_id"))}
return self.handle_cable_creation(link_data, interface)
except StopIteration:
return {"status": "invalid", "interface": port_id}
def verify_cable_creation_requirements(self, link_data):
"""Return True if all required NetBox IDs are present in link data."""
required_fields = [
"netbox_local_interface_id",
"netbox_remote_interface_id",
]
return all(link_data.get(field) for field in required_fields)
def handle_cable_creation(self, link_data, interface):
"""Create a cable from link data and return the operation result."""
display_name = link_data.get("local_port") or interface.get("local_port_id", "")
if not self.verify_cable_creation_requirements(link_data):
if not link_data.get("netbox_remote_device_id") or not link_data.get("netbox_remote_interface_id"):
return {"status": "missing_remote", "interface": display_name}
return {"status": "invalid", "interface": display_name}
try:
local_interface = Interface.objects.get(pk=link_data["netbox_local_interface_id"])
# Honour user's VC member selection: if the selected device_id differs from
# the cached interface's device, look up the same port name on that device.
selected_device_id = interface.get("device_id")
if selected_device_id and str(local_interface.device_id) != str(selected_device_id):
port_name = link_data.get("local_port") or local_interface.name
try:
local_interface = Interface.objects.get(device_id=selected_device_id, name=port_name)
except Interface.DoesNotExist:
logger.debug(
"Port %s not found on device %s; falling back to cached interface",
port_name,
selected_device_id,
)
remote_interface = Interface.objects.get(pk=link_data["netbox_remote_interface_id"])
if self.check_existing_cable(local_interface, remote_interface):
return {"status": "duplicate", "interface": display_name}
if self.create_cable(local_interface, remote_interface, self.request):
return {"status": "valid", "interface": display_name}
return {"status": "invalid", "interface": display_name} # pragma: no cover
except Interface.DoesNotExist:
return {"status": "missing_remote", "interface": display_name}
def process_interface_sync(self, selected_interfaces, cached_links):
"""
Process cable sync for all selected interfaces and return results.
Each interface is processed in its own atomic block so individual
failures roll back only that cable without affecting others.
"""
results = {"valid": [], "invalid": [], "duplicate": [], "missing_remote": []}
for interface in selected_interfaces:
try:
with transaction.atomic():
result = self.process_single_interface(interface, cached_links)
results[result["status"]].append(result.get("interface", ""))
except Exception:
logger.exception("Failed to sync cable for port_id %s", interface.get("local_port_id", ""))
results["invalid"].append(interface.get("local_port_id", ""))
return results
def post(self, request, pk):
"""Sync selected cable connections from LibreNMS into NetBox."""
# Check both plugin write and NetBox object permissions
if error := self.require_all_permissions("POST"):
return error
initial_device = get_object_or_404(Device, pk=pk)
server_key = request.POST.get("server_key") or self.librenms_api.server_key
self._post_server_key = server_key
self._initial_device = initial_device
redirect_url = (
f"{reverse('plugins:netbox_librenms_plugin:device_librenms_sync', args=[initial_device.pk])}?tab=cables"
+ (f"&server_key={quote_plus(server_key)}" if server_key else "")
)
selected_interfaces = self.get_selected_interfaces(request, initial_device)
cached_links = self.get_cached_links_data(request, initial_device)
if not self.validate_prerequisites(cached_links, selected_interfaces):
return redirect(redirect_url)
results = self.process_interface_sync(selected_interfaces, cached_links)
self.display_sync_results(request, results)
return redirect(redirect_url)
def display_sync_results(self, request, results):
"""Display flash messages summarizing the cable sync results."""
if results["missing_remote"]:
messages.error(
request,
f"Remote device or interface not found in NetBox for: {', '.join(results['missing_remote'])}",
)
if results["invalid"]:
messages.error(
request,
f"No LibreNMS link data found for interfaces: {', '.join(results['invalid'])}",
)
if results["duplicate"]:
messages.warning(
request,
f"Cable already exists for interfaces: {', '.join(results['duplicate'])}",
)
if results["valid"]:
messages.success(
request,
f"Successfully created cable for interfaces: {', '.join(results['valid'])}",
)