238 lines
9.9 KiB
Python
238 lines
9.9 KiB
Python
import logging
|
|
from urllib.parse import quote_plus
|
|
|
|
from dcim.models import Cable, Device, Interface
|
|
from django.contrib import messages
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.cache import cache
|
|
from django.db import transaction
|
|
from django.db.models import Q
|
|
from django.shortcuts import get_object_or_404, redirect
|
|
from django.urls import reverse
|
|
from django.views import View
|
|
|
|
from netbox_librenms_plugin.utils import get_librenms_sync_device
|
|
from netbox_librenms_plugin.views.mixins import (
|
|
CacheMixin,
|
|
LibreNMSAPIMixin,
|
|
LibreNMSPermissionMixin,
|
|
NetBoxObjectPermissionMixin,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SyncCablesView(LibreNMSPermissionMixin, NetBoxObjectPermissionMixin, LibreNMSAPIMixin, CacheMixin, View):
|
|
"""Create NetBox cables using cached LibreNMS link data."""
|
|
|
|
required_object_permissions = {
|
|
"POST": [
|
|
("add", Cable),
|
|
("change", Cable),
|
|
],
|
|
}
|
|
|
|
def get_selected_interfaces(self, request, initial_device):
|
|
"""
|
|
Return selected interface entries from POST data.
|
|
|
|
Each ``select`` value is a ``local_port_id`` (stable LibreNMS identifier)
|
|
so that matching against cached link data is user-preference agnostic.
|
|
"""
|
|
selected_interfaces = []
|
|
selected_data = [x for x in request.POST.getlist("select") if x]
|
|
|
|
if not selected_data:
|
|
return None
|
|
|
|
for port_id in selected_data:
|
|
override = request.POST.get(f"device_selection_{port_id}")
|
|
device_id = override or initial_device.id
|
|
selected_interfaces.append({"device_id": device_id, "local_port_id": port_id})
|
|
|
|
return selected_interfaces
|
|
|
|
def get_cached_links_data(self, request, obj):
|
|
"""Return cached LibreNMS link data for the given object."""
|
|
server_key = getattr(self, "_post_server_key", None) or self.librenms_api.server_key
|
|
cache_obj = get_librenms_sync_device(obj, server_key=server_key) or obj
|
|
cached_data = cache.get(self.get_cache_key(cache_obj, "links", server_key))
|
|
if not cached_data:
|
|
return None
|
|
return cached_data.get("links", [])
|
|
|
|
def create_cable(self, local_interface, remote_interface, request):
|
|
"""
|
|
Create a cable between local and remote interfaces.
|
|
|
|
Returns:
|
|
True on success, False on failure.
|
|
"""
|
|
try:
|
|
Cable.objects.create(
|
|
a_terminations=[local_interface],
|
|
b_terminations=[remote_interface],
|
|
status="connected",
|
|
)
|
|
return True
|
|
except Exception as exc: # pragma: no cover - protects UX
|
|
messages.error(request, f"Failed to create cable: {str(exc)}")
|
|
return False
|
|
|
|
def check_existing_cable(self, local_interface, remote_interface):
|
|
"""Return True if a cable already exists for either interface."""
|
|
from dcim.models import Interface as DCIMInterface
|
|
|
|
interface_ct = ContentType.objects.get_for_model(DCIMInterface)
|
|
return Cable.objects.filter(
|
|
Q(
|
|
terminations__termination_type=interface_ct,
|
|
terminations__termination_id=local_interface.pk,
|
|
)
|
|
| Q(
|
|
terminations__termination_type=interface_ct,
|
|
terminations__termination_id=remote_interface.pk,
|
|
)
|
|
).exists()
|
|
|
|
def validate_prerequisites(self, cached_links, selected_interfaces):
|
|
"""Validate that cached data and selections are present before sync."""
|
|
if not cached_links:
|
|
messages.error(
|
|
self.request,
|
|
"Cache has expired. Please refresh the cable data before syncing.",
|
|
)
|
|
return False
|
|
|
|
if selected_interfaces is None:
|
|
messages.error(self.request, "No interfaces selected for synchronization.")
|
|
return False
|
|
|
|
return True
|
|
|
|
def process_single_interface(self, interface, cached_links):
|
|
"""Process cable creation for a single interface from cached link data."""
|
|
port_id = str(interface.get("local_port_id", ""))
|
|
try:
|
|
link_data = next(link for link in cached_links if str(link.get("local_port_id", "")) == port_id)
|
|
# Apply posted device_id (VC member selection) without mutating the cached list.
|
|
link_data = {**link_data, "device_id": interface.get("device_id", link_data.get("device_id"))}
|
|
return self.handle_cable_creation(link_data, interface)
|
|
except StopIteration:
|
|
return {"status": "invalid", "interface": port_id}
|
|
|
|
def verify_cable_creation_requirements(self, link_data):
|
|
"""Return True if all required NetBox IDs are present in link data."""
|
|
required_fields = [
|
|
"netbox_local_interface_id",
|
|
"netbox_remote_interface_id",
|
|
]
|
|
|
|
return all(link_data.get(field) for field in required_fields)
|
|
|
|
def handle_cable_creation(self, link_data, interface):
|
|
"""Create a cable from link data and return the operation result."""
|
|
display_name = link_data.get("local_port") or interface.get("local_port_id", "")
|
|
if not self.verify_cable_creation_requirements(link_data):
|
|
if not link_data.get("netbox_remote_device_id") or not link_data.get("netbox_remote_interface_id"):
|
|
return {"status": "missing_remote", "interface": display_name}
|
|
return {"status": "invalid", "interface": display_name}
|
|
|
|
try:
|
|
local_interface = Interface.objects.get(pk=link_data["netbox_local_interface_id"])
|
|
|
|
# Honour user's VC member selection: if the selected device_id differs from
|
|
# the cached interface's device, look up the same port name on that device.
|
|
selected_device_id = interface.get("device_id")
|
|
if selected_device_id and str(local_interface.device_id) != str(selected_device_id):
|
|
port_name = link_data.get("local_port") or local_interface.name
|
|
try:
|
|
local_interface = Interface.objects.get(device_id=selected_device_id, name=port_name)
|
|
except Interface.DoesNotExist:
|
|
logger.debug(
|
|
"Port %s not found on device %s; falling back to cached interface",
|
|
port_name,
|
|
selected_device_id,
|
|
)
|
|
|
|
remote_interface = Interface.objects.get(pk=link_data["netbox_remote_interface_id"])
|
|
|
|
if self.check_existing_cable(local_interface, remote_interface):
|
|
return {"status": "duplicate", "interface": display_name}
|
|
|
|
if self.create_cable(local_interface, remote_interface, self.request):
|
|
return {"status": "valid", "interface": display_name}
|
|
return {"status": "invalid", "interface": display_name} # pragma: no cover
|
|
|
|
except Interface.DoesNotExist:
|
|
return {"status": "missing_remote", "interface": display_name}
|
|
|
|
def process_interface_sync(self, selected_interfaces, cached_links):
|
|
"""
|
|
Process cable sync for all selected interfaces and return results.
|
|
|
|
Each interface is processed in its own atomic block so individual
|
|
failures roll back only that cable without affecting others.
|
|
"""
|
|
results = {"valid": [], "invalid": [], "duplicate": [], "missing_remote": []}
|
|
|
|
for interface in selected_interfaces:
|
|
try:
|
|
with transaction.atomic():
|
|
result = self.process_single_interface(interface, cached_links)
|
|
results[result["status"]].append(result.get("interface", ""))
|
|
except Exception:
|
|
logger.exception("Failed to sync cable for port_id %s", interface.get("local_port_id", ""))
|
|
results["invalid"].append(interface.get("local_port_id", ""))
|
|
|
|
return results
|
|
|
|
def post(self, request, pk):
|
|
"""Sync selected cable connections from LibreNMS into NetBox."""
|
|
# Check both plugin write and NetBox object permissions
|
|
if error := self.require_all_permissions("POST"):
|
|
return error
|
|
|
|
initial_device = get_object_or_404(Device, pk=pk)
|
|
server_key = request.POST.get("server_key") or self.librenms_api.server_key
|
|
self._post_server_key = server_key
|
|
self._initial_device = initial_device
|
|
redirect_url = (
|
|
f"{reverse('plugins:netbox_librenms_plugin:device_librenms_sync', args=[initial_device.pk])}?tab=cables"
|
|
+ (f"&server_key={quote_plus(server_key)}" if server_key else "")
|
|
)
|
|
|
|
selected_interfaces = self.get_selected_interfaces(request, initial_device)
|
|
cached_links = self.get_cached_links_data(request, initial_device)
|
|
|
|
if not self.validate_prerequisites(cached_links, selected_interfaces):
|
|
return redirect(redirect_url)
|
|
|
|
results = self.process_interface_sync(selected_interfaces, cached_links)
|
|
self.display_sync_results(request, results)
|
|
|
|
return redirect(redirect_url)
|
|
|
|
def display_sync_results(self, request, results):
|
|
"""Display flash messages summarizing the cable sync results."""
|
|
if results["missing_remote"]:
|
|
messages.error(
|
|
request,
|
|
f"Remote device or interface not found in NetBox for: {', '.join(results['missing_remote'])}",
|
|
)
|
|
if results["invalid"]:
|
|
messages.error(
|
|
request,
|
|
f"No LibreNMS link data found for interfaces: {', '.join(results['invalid'])}",
|
|
)
|
|
if results["duplicate"]:
|
|
messages.warning(
|
|
request,
|
|
f"Cable already exists for interfaces: {', '.join(results['duplicate'])}",
|
|
)
|
|
if results["valid"]:
|
|
messages.success(
|
|
request,
|
|
f"Successfully created cable for interfaces: {', '.join(results['valid'])}",
|
|
)
|