Files
Vlastislav Svatek 673e67106e
Some checks failed
ci / deploy (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (javascript-typescript) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
first commit
2026-06-05 10:39:05 +02:00

399 lines
17 KiB
Python

from urllib.parse import quote_plus
from dcim.models import Device, Interface, MACAddress
from django.contrib import messages
from django.core.cache import cache
from django.db import transaction
from django.http import Http404, JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.views import View
from virtualization.models import VirtualMachine, VMInterface
from netbox_librenms_plugin.models import InterfaceTypeMapping
from netbox_librenms_plugin.utils import (
convert_speed_to_kbps,
get_interface_name_field,
get_librenms_device_id,
get_librenms_sync_device,
set_librenms_device_id,
)
from netbox_librenms_plugin.views.mixins import (
CacheMixin,
LibreNMSAPIMixin,
LibreNMSPermissionMixin,
NetBoxObjectPermissionMixin,
VlanAssignmentMixin,
)
class SyncInterfacesView(
LibreNMSPermissionMixin, NetBoxObjectPermissionMixin, LibreNMSAPIMixin, VlanAssignmentMixin, CacheMixin, View
):
"""Sync selected interfaces from LibreNMS into NetBox."""
def get_required_permissions_for_object_type(self, object_type):
"""Return the required permissions based on object type."""
if object_type == "device":
return [("add", Interface), ("change", Interface)]
elif object_type == "virtualmachine":
return [("add", VMInterface), ("change", VMInterface)]
else:
raise Http404(f"Invalid object type: {object_type}")
def post(self, request, object_type, object_id):
"""Sync selected interfaces from LibreNMS into NetBox."""
# Set permissions dynamically based on object type
self.required_object_permissions = {
"POST": self.get_required_permissions_for_object_type(object_type),
}
# Check both plugin write and NetBox object permissions
if error := self.require_all_permissions("POST"):
return error
url_name = (
"dcim:device_librenms_sync"
if object_type == "device"
else "plugins:netbox_librenms_plugin:vm_librenms_sync"
)
obj = self.get_object(object_type, object_id)
self.object = obj # Store for use in sync methods
# Read server_key from POST so we use the exact server the user was viewing
server_key = request.POST.get("server_key") or self.librenms_api.server_key
self._post_server_key = server_key
interface_name_field = get_interface_name_field(request)
self.interface_name_field = interface_name_field
selected_interfaces = self.get_selected_interfaces(request, interface_name_field)
exclude_columns = request.POST.getlist("exclude_columns")
redirect_url = (
reverse(url_name, kwargs={"pk": object_id})
+ f"?tab=interfaces&interface_name_field={interface_name_field}"
+ (f"&server_key={quote_plus(server_key)}" if server_key else "")
)
if selected_interfaces is None:
return redirect(redirect_url)
ports_data = self.get_cached_ports_data(request, obj, server_key)
if ports_data is None:
return redirect(redirect_url)
# Prepare VLAN lookup maps if VLAN sync is enabled
vlan_groups = self.get_vlan_groups_for_device(obj)
lookup_maps = self._build_vlan_lookup_maps(vlan_groups)
self._lookup_maps = lookup_maps
self.sync_selected_interfaces(obj, selected_interfaces, ports_data, exclude_columns, interface_name_field)
messages.success(request, "Selected interfaces synced successfully.")
return redirect(redirect_url)
def get_object(self, object_type, object_id):
"""Return the Device or VirtualMachine for the given type and ID."""
if object_type == "device":
return get_object_or_404(Device, pk=object_id)
if object_type == "virtualmachine":
return get_object_or_404(VirtualMachine, pk=object_id)
raise Http404("Invalid object type.")
def get_selected_interfaces(self, request, interface_name_field):
"""Return the list of selected interface names from POST data."""
selected_interfaces = request.POST.getlist("select")
if not selected_interfaces:
messages.error(request, "No interfaces selected for synchronization.")
return None
return selected_interfaces
def get_cached_ports_data(self, request, obj, server_key=None):
"""Return cached LibreNMS port data for the given object."""
if server_key is None:
server_key = self.librenms_api.server_key
# On VC member pages the GET tab writes ports under the resolved sync device's
# cache key. Resolve the same device here so the POST path reads the same entry.
cache_obj = obj
if isinstance(obj, Device) and not get_librenms_device_id(obj, server_key, auto_save=False):
sync_device = get_librenms_sync_device(obj, server_key=server_key)
if sync_device is not None:
cache_obj = sync_device
cached_data = cache.get(self.get_cache_key(cache_obj, "ports", server_key))
if not cached_data:
messages.warning(
request,
"No cached data found. Please refresh the data before syncing.",
)
return None
return cached_data.get("ports", [])
def sync_selected_interfaces(
self,
obj,
selected_interfaces,
ports_data,
exclude_columns,
interface_name_field,
):
"""Create or update NetBox interfaces from LibreNMS port data."""
with transaction.atomic():
for port in ports_data:
port_name = port.get(interface_name_field)
if port_name in selected_interfaces:
self.sync_interface(obj, port, exclude_columns, interface_name_field)
def sync_interface(self, obj, librenms_interface, exclude_columns, interface_name_field):
"""Create or update a single NetBox interface from LibreNMS data."""
interface_name = librenms_interface.get(interface_name_field)
if isinstance(obj, Device):
device_selection_key = f"device_selection_{interface_name}"
selected_device_id = self.request.POST.get(device_selection_key)
if selected_device_id:
try:
target_device = Device.objects.get(id=selected_device_id)
# Validate the target is the current device or a VC member
if hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
valid_ids = set(obj.virtual_chassis.members.values_list("id", flat=True))
if target_device.id not in valid_ids:
target_device = obj
elif target_device.id != obj.id:
target_device = obj
except (Device.DoesNotExist, ValueError, TypeError):
target_device = obj
else:
target_device = obj
interface, _ = Interface.objects.get_or_create(device=target_device, name=interface_name)
elif isinstance(obj, VirtualMachine):
interface, _ = VMInterface.objects.get_or_create(virtual_machine=obj, name=interface_name)
else:
raise ValueError("Invalid object type.")
netbox_type = None
if isinstance(obj, Device):
netbox_type = self.get_netbox_interface_type(librenms_interface)
self.update_interface_attributes(
interface,
librenms_interface,
netbox_type,
exclude_columns,
interface_name_field,
)
# Sync VLANs if not excluded
if "vlans" not in exclude_columns:
self._sync_interface_vlans(interface, librenms_interface, interface_name)
def get_netbox_interface_type(self, librenms_interface):
"""Return the NetBox interface type mapped from LibreNMS type and speed."""
speed = convert_speed_to_kbps(librenms_interface.get("ifSpeed"))
mappings = InterfaceTypeMapping.objects.filter(librenms_type=librenms_interface.get("ifType"))
if speed is not None:
speed_mapping = mappings.filter(librenms_speed__lte=speed).order_by("-librenms_speed").first()
mapping = speed_mapping or mappings.filter(librenms_speed__isnull=True).first()
else:
mapping = mappings.filter(librenms_speed__isnull=True).first()
return mapping.netbox_type if mapping else "other"
def handle_mac_address(self, interface, ifPhysAddress):
"""Assign or create the MAC address for the given interface."""
if ifPhysAddress:
existing_mac = interface.mac_addresses.filter(mac_address=ifPhysAddress).first()
if existing_mac:
mac_obj = existing_mac
else:
mac_obj = MACAddress.objects.create(mac_address=ifPhysAddress)
interface.mac_addresses.add(mac_obj)
if hasattr(interface, "primary_mac_address"):
interface.primary_mac_address = mac_obj
def update_interface_attributes(
self,
interface,
librenms_interface,
netbox_type,
exclude_columns,
interface_name_field,
):
"""Update interface fields from LibreNMS data, respecting excluded columns."""
is_device_interface = isinstance(interface, Interface)
LIBRENMS_TO_NETBOX_MAPPING = {
interface_name_field: "name",
"ifType": "type",
"ifSpeed": "speed",
"ifAlias": "description",
"ifMtu": "mtu",
}
for librenms_key, netbox_key in LIBRENMS_TO_NETBOX_MAPPING.items():
if netbox_key in exclude_columns:
continue
if librenms_key == "ifSpeed":
speed = convert_speed_to_kbps(librenms_interface.get(librenms_key))
setattr(interface, netbox_key, speed)
elif librenms_key == "ifType":
if is_device_interface and hasattr(interface, netbox_key):
setattr(interface, netbox_key, netbox_type)
elif librenms_key == "ifAlias":
interface_name = librenms_interface.get(interface_name_field)
if librenms_interface.get("ifAlias") != interface_name:
setattr(interface, netbox_key, librenms_interface.get(librenms_key))
else:
setattr(interface, netbox_key, librenms_interface.get(librenms_key))
port_id = librenms_interface.get("port_id")
if port_id is not None:
server_key = getattr(self, "_post_server_key", None) or self.librenms_api.server_key
set_librenms_device_id(interface, port_id, server_key)
if "enabled" not in exclude_columns:
admin_status = librenms_interface.get("ifAdminStatus")
interface.enabled = (
True
if admin_status is None
else (admin_status.lower() == "up" if isinstance(admin_status, str) else bool(admin_status))
)
if "mac_address" not in exclude_columns:
ifPhysAddress = librenms_interface.get("ifPhysAddress")
self.handle_mac_address(interface, ifPhysAddress)
interface.save()
def _sync_interface_vlans(self, interface, librenms_port, interface_name):
"""
Sync VLAN assignments from LibreNMS to NetBox interface.
Sets mode, untagged_vlan, and tagged_vlans based on LibreNMS data.
Args:
interface: NetBox Interface or VMInterface object
librenms_port: Port data dict from LibreNMS with VLAN info
interface_name: Original interface name for form field lookup
"""
# Get per-VLAN group selections from form (safely handle special chars in name)
safe_name = interface_name.replace("/", "_").replace(":", "_")
# Build VLAN data from port
vlan_data = {
"untagged_vlan": librenms_port.get("untagged_vlan"),
"tagged_vlans": librenms_port.get("tagged_vlans", []),
}
# Build per-VLAN group map from POST data
vlan_group_map = {}
all_vids = []
if vlan_data["untagged_vlan"]:
all_vids.append(str(vlan_data["untagged_vlan"]))
for vid in vlan_data.get("tagged_vlans", []):
all_vids.append(str(vid))
for vid in all_vids:
group_id = self.request.POST.get(f"vlan_group_{safe_name}_{vid}", "")
if group_id:
vlan_group_map[vid] = group_id
# Use mixin method to update interface VLAN assignments
self._update_interface_vlan_assignment(interface, vlan_data, vlan_group_map, self._lookup_maps)
class DeleteNetBoxInterfacesView(LibreNMSPermissionMixin, NetBoxObjectPermissionMixin, CacheMixin, View):
"""Delete interfaces that exist only in NetBox."""
def get_required_permissions_for_object_type(self, object_type):
"""Return the required permissions based on object type."""
if object_type == "device":
return [("delete", Interface)]
elif object_type == "virtualmachine":
return [("delete", VMInterface)]
else:
raise Http404(f"Invalid object type: {object_type}")
def post(self, request, object_type, object_id):
"""Delete selected NetBox-only interfaces not present in LibreNMS."""
# Set permissions dynamically based on object type
self.required_object_permissions = {
"POST": self.get_required_permissions_for_object_type(object_type),
}
# Check both plugin write and NetBox object permissions
if error := self.require_all_permissions_json("POST"):
return error
if object_type == "device":
obj = get_object_or_404(Device, pk=object_id)
elif object_type == "virtualmachine":
obj = get_object_or_404(VirtualMachine, pk=object_id)
else:
return JsonResponse({"error": "Invalid object type"}, status=400)
interface_ids = request.POST.getlist("interface_ids")
if not interface_ids:
return JsonResponse({"error": "No interfaces selected for deletion"}, status=400)
deleted_count = 0
errors = []
interface_name = None
try:
with transaction.atomic():
for interface_id in interface_ids:
interface_name = None
try:
if object_type == "device":
interface = Interface.objects.get(id=interface_id)
interface_name = interface.name
if hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
valid_device_ids = [member.id for member in obj.virtual_chassis.members.all()]
if interface.device_id not in valid_device_ids:
errors.append(
"Interface {} does not belong to this device or its virtual chassis".format(
interface.name
)
)
continue
elif interface.device_id != obj.id:
errors.append(f"Interface {interface.name} does not belong to this device")
continue
else:
interface = VMInterface.objects.get(id=interface_id)
interface_name = interface.name
if interface.virtual_machine_id != obj.id:
errors.append(f"Interface {interface.name} does not belong to this virtual machine")
continue
interface.delete()
deleted_count += 1
except (Interface.DoesNotExist, VMInterface.DoesNotExist):
errors.append(f"Interface with ID {interface_id} not found")
continue
except Exception as exc: # pragma: no cover - defensive
errors.append(f"Error deleting interface {interface_name or interface_id}: {str(exc)}")
continue
except Exception as exc: # pragma: no cover
return JsonResponse({"error": f"Transaction failed: {str(exc)}"}, status=500)
response_data = {
"status": "success",
"deleted_count": deleted_count,
"message": f"Successfully deleted {deleted_count} interface(s)",
}
if errors:
response_data["errors"] = errors
response_data["message"] += f" with {len(errors)} error(s)"
return JsonResponse(response_data)