399 lines
17 KiB
Python
399 lines
17 KiB
Python
from urllib.parse import quote_plus
|
|
|
|
from dcim.models import Device, Interface, MACAddress
|
|
from django.contrib import messages
|
|
from django.core.cache import cache
|
|
from django.db import transaction
|
|
from django.http import Http404, JsonResponse
|
|
from django.shortcuts import get_object_or_404, redirect
|
|
from django.urls import reverse
|
|
from django.views import View
|
|
from virtualization.models import VirtualMachine, VMInterface
|
|
|
|
from netbox_librenms_plugin.models import InterfaceTypeMapping
|
|
from netbox_librenms_plugin.utils import (
|
|
convert_speed_to_kbps,
|
|
get_interface_name_field,
|
|
get_librenms_device_id,
|
|
get_librenms_sync_device,
|
|
set_librenms_device_id,
|
|
)
|
|
from netbox_librenms_plugin.views.mixins import (
|
|
CacheMixin,
|
|
LibreNMSAPIMixin,
|
|
LibreNMSPermissionMixin,
|
|
NetBoxObjectPermissionMixin,
|
|
VlanAssignmentMixin,
|
|
)
|
|
|
|
|
|
class SyncInterfacesView(
    LibreNMSPermissionMixin, NetBoxObjectPermissionMixin, LibreNMSAPIMixin, VlanAssignmentMixin, CacheMixin, View
):
    """Sync selected interfaces from LibreNMS into NetBox.

    Handles a POST carrying the interface names the user selected, reads the
    previously cached LibreNMS port data for the object, and creates or updates
    the matching NetBox ``Interface`` / ``VMInterface`` records.
    """

    def get_required_permissions_for_object_type(self, object_type):
        """Return the required permissions based on object type.

        Args:
            object_type: ``"device"`` or ``"virtualmachine"``.

        Returns:
            A list of ``(action, model)`` tuples covering ``add`` and ``change``
            on the interface model that matches ``object_type``.

        Raises:
            Http404: If ``object_type`` is neither of the supported values.
        """
        if object_type == "device":
            return [("add", Interface), ("change", Interface)]
        elif object_type == "virtualmachine":
            return [("add", VMInterface), ("change", VMInterface)]
        else:
            raise Http404(f"Invalid object type: {object_type}")

    def post(self, request, object_type, object_id):
        """Sync selected interfaces from LibreNMS into NetBox.

        Args:
            request: The POST request. Reads ``server_key``, ``select`` and
                ``exclude_columns``; the per-interface helpers read further
                fields from ``self.request.POST``.
            object_type: ``"device"`` or ``"virtualmachine"``.
            object_id: Primary key of the Device or VirtualMachine.

        Returns:
            The error response from ``require_all_permissions`` when a
            permission check fails, otherwise a redirect back to the
            interfaces tab of the object's LibreNMS sync page.

        Raises:
            Http404: If ``object_type`` is invalid or the object does not exist.
        """
        # Set permissions dynamically based on object type
        self.required_object_permissions = {
            "POST": self.get_required_permissions_for_object_type(object_type),
        }

        # Check both plugin write and NetBox object permissions
        if error := self.require_all_permissions("POST"):
            return error

        url_name = (
            "dcim:device_librenms_sync"
            if object_type == "device"
            else "plugins:netbox_librenms_plugin:vm_librenms_sync"
        )
        obj = self.get_object(object_type, object_id)
        self.object = obj  # Store for use in sync methods

        # Read server_key from POST so we use the exact server the user was viewing
        server_key = request.POST.get("server_key") or self.librenms_api.server_key
        self._post_server_key = server_key

        interface_name_field = get_interface_name_field(request)
        self.interface_name_field = interface_name_field
        selected_interfaces = self.get_selected_interfaces(request, interface_name_field)
        exclude_columns = request.POST.getlist("exclude_columns")

        # Both query values are URL-encoded: they originate from the request and
        # must not be able to inject extra parameters or a fragment into the URL.
        redirect_url = (
            reverse(url_name, kwargs={"pk": object_id})
            + f"?tab=interfaces&interface_name_field={quote_plus(str(interface_name_field))}"
            + (f"&server_key={quote_plus(server_key)}" if server_key else "")
        )

        if selected_interfaces is None:
            # get_selected_interfaces() has already queued the error message.
            return redirect(redirect_url)

        ports_data = self.get_cached_ports_data(request, obj, server_key)
        if ports_data is None:
            # get_cached_ports_data() has already queued the warning message.
            return redirect(redirect_url)

        # Prepare VLAN lookup maps if VLAN sync is enabled
        vlan_groups = self.get_vlan_groups_for_device(obj)
        lookup_maps = self._build_vlan_lookup_maps(vlan_groups)
        self._lookup_maps = lookup_maps  # read later by _sync_interface_vlans()

        self.sync_selected_interfaces(obj, selected_interfaces, ports_data, exclude_columns, interface_name_field)

        messages.success(request, "Selected interfaces synced successfully.")
        return redirect(redirect_url)

    def get_object(self, object_type, object_id):
        """Return the Device or VirtualMachine for the given type and ID.

        Raises:
            Http404: If the object does not exist or ``object_type`` is invalid.
        """
        if object_type == "device":
            return get_object_or_404(Device, pk=object_id)
        if object_type == "virtualmachine":
            return get_object_or_404(VirtualMachine, pk=object_id)
        raise Http404("Invalid object type.")

    def get_selected_interfaces(self, request, interface_name_field):
        """Return the list of selected interface names from POST data.

        Args:
            request: The POST request; the ``select`` list is read from it.
            interface_name_field: Unused here; kept for interface compatibility.

        Returns:
            The list of selected names, or ``None`` (after queuing an error
            message) when nothing was selected.
        """
        selected_interfaces = request.POST.getlist("select")
        if not selected_interfaces:
            messages.error(request, "No interfaces selected for synchronization.")
            return None
        return selected_interfaces

    def get_cached_ports_data(self, request, obj, server_key=None):
        """Return cached LibreNMS port data for the given object.

        Args:
            request: Used only to queue a warning message on a cache miss.
            obj: The Device or VirtualMachine being synced.
            server_key: LibreNMS server key; defaults to
                ``self.librenms_api.server_key`` when ``None``.

        Returns:
            The cached ``"ports"`` list (``[]`` if the cache entry has no such
            key), or ``None`` when no cache entry exists.
        """
        if server_key is None:
            server_key = self.librenms_api.server_key
        # On VC member pages the GET tab writes ports under the resolved sync device's
        # cache key. Resolve the same device here so the POST path reads the same entry.
        cache_obj = obj
        if isinstance(obj, Device) and not get_librenms_device_id(obj, server_key, auto_save=False):
            sync_device = get_librenms_sync_device(obj, server_key=server_key)
            if sync_device is not None:
                cache_obj = sync_device
        cached_data = cache.get(self.get_cache_key(cache_obj, "ports", server_key))
        if not cached_data:
            messages.warning(
                request,
                "No cached data found. Please refresh the data before syncing.",
            )
            return None
        return cached_data.get("ports", [])

    def sync_selected_interfaces(
        self,
        obj,
        selected_interfaces,
        ports_data,
        exclude_columns,
        interface_name_field,
    ):
        """Create or update NetBox interfaces from LibreNMS port data.

        Every port in ``ports_data`` whose ``interface_name_field`` value is in
        ``selected_interfaces`` is synced. All writes happen inside one
        transaction, so an exception on any port rolls back the whole batch.
        """
        # Build the lookup once; membership is then O(1) per port.
        selected_names = frozenset(selected_interfaces)
        with transaction.atomic():
            for port in ports_data:
                port_name = port.get(interface_name_field)

                if port_name in selected_names:
                    self.sync_interface(obj, port, exclude_columns, interface_name_field)

    def sync_interface(self, obj, librenms_interface, exclude_columns, interface_name_field):
        """Create or update a single NetBox interface from LibreNMS data.

        Args:
            obj: The Device or VirtualMachine being synced.
            librenms_interface: Port data dict from the cached LibreNMS data.
            exclude_columns: Column names the user chose not to sync.
            interface_name_field: Key in ``librenms_interface`` holding the name.

        Raises:
            ValueError: If ``obj`` is neither a Device nor a VirtualMachine.
        """
        interface_name = librenms_interface.get(interface_name_field)

        if isinstance(obj, Device):
            device_selection_key = f"device_selection_{interface_name}"
            selected_device_id = self.request.POST.get(device_selection_key)

            if selected_device_id:
                try:
                    target_device = Device.objects.get(id=selected_device_id)
                    # Validate the target is the current device or a VC member;
                    # any other submitted id silently falls back to obj.
                    if hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
                        valid_ids = set(obj.virtual_chassis.members.values_list("id", flat=True))
                        if target_device.id not in valid_ids:
                            target_device = obj
                    elif target_device.id != obj.id:
                        target_device = obj
                except (Device.DoesNotExist, ValueError, TypeError):
                    # Unknown or malformed id: fall back to the current device.
                    target_device = obj
            else:
                target_device = obj

            interface, _ = Interface.objects.get_or_create(device=target_device, name=interface_name)
        elif isinstance(obj, VirtualMachine):
            interface, _ = VMInterface.objects.get_or_create(virtual_machine=obj, name=interface_name)
        else:
            raise ValueError("Invalid object type.")

        # Only device interfaces get a mapped type; VM interfaces pass None.
        netbox_type = None
        if isinstance(obj, Device):
            netbox_type = self.get_netbox_interface_type(librenms_interface)

        self.update_interface_attributes(
            interface,
            librenms_interface,
            netbox_type,
            exclude_columns,
            interface_name_field,
        )

        # Sync VLANs if not excluded
        if "vlans" not in exclude_columns:
            self._sync_interface_vlans(interface, librenms_interface, interface_name)

    def get_netbox_interface_type(self, librenms_interface):
        """Return the NetBox interface type mapped from LibreNMS type and speed.

        Looks up ``InterfaceTypeMapping`` rows for the port's ``ifType``. When a
        speed is available, the mapping with the highest ``librenms_speed`` not
        exceeding it wins; otherwise (or if none matches) the mapping without a
        speed is used. Returns ``"other"`` when no mapping exists.
        """
        speed = convert_speed_to_kbps(librenms_interface.get("ifSpeed"))
        mappings = InterfaceTypeMapping.objects.filter(librenms_type=librenms_interface.get("ifType"))

        if speed is not None:
            speed_mapping = mappings.filter(librenms_speed__lte=speed).order_by("-librenms_speed").first()
            mapping = speed_mapping or mappings.filter(librenms_speed__isnull=True).first()
        else:
            mapping = mappings.filter(librenms_speed__isnull=True).first()

        return mapping.netbox_type if mapping else "other"

    def handle_mac_address(self, interface, ifPhysAddress):
        """Assign or create the MAC address for the given interface.

        Does nothing when ``ifPhysAddress`` is empty. Otherwise reuses a MAC
        already attached to the interface or creates a new ``MACAddress``,
        attaches it, and sets it as ``primary_mac_address`` when the interface
        has that attribute. The caller is responsible for saving the interface.
        """
        if ifPhysAddress:
            existing_mac = interface.mac_addresses.filter(mac_address=ifPhysAddress).first()
            if existing_mac:
                mac_obj = existing_mac
            else:
                mac_obj = MACAddress.objects.create(mac_address=ifPhysAddress)

            interface.mac_addresses.add(mac_obj)
            if hasattr(interface, "primary_mac_address"):
                interface.primary_mac_address = mac_obj

    def update_interface_attributes(
        self,
        interface,
        librenms_interface,
        netbox_type,
        exclude_columns,
        interface_name_field,
    ):
        """Update interface fields from LibreNMS data, respecting excluded columns.

        Args:
            interface: The NetBox Interface or VMInterface to update.
            librenms_interface: Port data dict from LibreNMS.
            netbox_type: Mapped NetBox type; applied to device interfaces only.
            exclude_columns: NetBox field names that must not be overwritten.
            interface_name_field: Key in ``librenms_interface`` holding the name.

        The interface is saved once at the end.
        """
        is_device_interface = isinstance(interface, Interface)

        LIBRENMS_TO_NETBOX_MAPPING = {
            interface_name_field: "name",
            "ifType": "type",
            "ifSpeed": "speed",
            "ifAlias": "description",
            "ifMtu": "mtu",
        }

        for librenms_key, netbox_key in LIBRENMS_TO_NETBOX_MAPPING.items():
            if netbox_key in exclude_columns:
                continue

            if librenms_key == "ifSpeed":
                speed = convert_speed_to_kbps(librenms_interface.get(librenms_key))
                setattr(interface, netbox_key, speed)
            elif librenms_key == "ifType":
                if is_device_interface and hasattr(interface, netbox_key):
                    setattr(interface, netbox_key, netbox_type)
            elif librenms_key == "ifAlias":
                # Skip the alias when it merely repeats the interface name.
                interface_name = librenms_interface.get(interface_name_field)
                if librenms_interface.get("ifAlias") != interface_name:
                    setattr(interface, netbox_key, librenms_interface.get(librenms_key))
            else:
                setattr(interface, netbox_key, librenms_interface.get(librenms_key))

        # Record the LibreNMS port_id on the interface for the server in use.
        port_id = librenms_interface.get("port_id")
        if port_id is not None:
            server_key = getattr(self, "_post_server_key", None) or self.librenms_api.server_key
            set_librenms_device_id(interface, port_id, server_key)

        if "enabled" not in exclude_columns:
            admin_status = librenms_interface.get("ifAdminStatus")
            if admin_status is None:
                # No admin status reported: treat the interface as enabled.
                interface.enabled = True
            elif isinstance(admin_status, str):
                interface.enabled = admin_status.lower() == "up"
            else:
                interface.enabled = bool(admin_status)

        if "mac_address" not in exclude_columns:
            ifPhysAddress = librenms_interface.get("ifPhysAddress")
            self.handle_mac_address(interface, ifPhysAddress)

        interface.save()

    def _sync_interface_vlans(self, interface, librenms_port, interface_name):
        """
        Sync VLAN assignments from LibreNMS to NetBox interface.

        Collects the port's untagged and tagged VLAN IDs, reads the per-VLAN
        group selection submitted for each of them, and delegates the update to
        ``_update_interface_vlan_assignment``.

        Args:
            interface: NetBox Interface or VMInterface object
            librenms_port: Port data dict from LibreNMS with VLAN info
            interface_name: Original interface name for form field lookup
        """
        # Get per-VLAN group selections from form (safely handle special chars in name)
        safe_name = interface_name.replace("/", "_").replace(":", "_")

        # Build VLAN data from port. A missing or explicit-None "tagged_vlans"
        # is normalised to an empty list so the iteration below cannot fail.
        untagged_vlan = librenms_port.get("untagged_vlan")
        tagged_vlans = librenms_port.get("tagged_vlans") or []
        vlan_data = {
            "untagged_vlan": untagged_vlan,
            "tagged_vlans": tagged_vlans,
        }

        # Build per-VLAN group map from POST data
        all_vids = [str(untagged_vlan)] if untagged_vlan else []
        all_vids.extend(str(vid) for vid in tagged_vlans)

        vlan_group_map = {}
        for vid in all_vids:
            group_id = self.request.POST.get(f"vlan_group_{safe_name}_{vid}", "")
            if group_id:
                vlan_group_map[vid] = group_id

        # Use mixin method to update interface VLAN assignments
        self._update_interface_vlan_assignment(interface, vlan_data, vlan_group_map, self._lookup_maps)
|
|
|
|
|
|
class DeleteNetBoxInterfacesView(LibreNMSPermissionMixin, NetBoxObjectPermissionMixin, CacheMixin, View):
    """Delete interfaces that exist only in NetBox."""

    def get_required_permissions_for_object_type(self, object_type):
        """Return the required permissions based on object type.

        Args:
            object_type: ``"device"`` or ``"virtualmachine"``.

        Returns:
            A single-item list with the ``delete`` permission on the interface
            model that matches ``object_type``.

        Raises:
            Http404: If ``object_type`` is neither of the supported values.
        """
        if object_type == "device":
            return [("delete", Interface)]
        elif object_type == "virtualmachine":
            return [("delete", VMInterface)]
        else:
            raise Http404(f"Invalid object type: {object_type}")

    def post(self, request, object_type, object_id):
        """Delete selected NetBox-only interfaces not present in LibreNMS.

        Args:
            request: The POST request; interface primary keys are read from the
                ``interface_ids`` list.
            object_type: ``"device"`` or ``"virtualmachine"``.
            object_id: Primary key of the Device or VirtualMachine.

        Returns:
            A ``JsonResponse``. On completion the payload has ``status``,
            ``deleted_count`` and ``message``, plus ``errors`` when individual
            interfaces could not be deleted. Status 400 is returned for an
            empty selection and 500 if the surrounding transaction fails.

        Raises:
            Http404: If ``object_type`` is invalid or the object does not exist.
        """
        # Set permissions dynamically based on object type
        self.required_object_permissions = {
            "POST": self.get_required_permissions_for_object_type(object_type),
        }

        # Check both plugin write and NetBox object permissions
        if error := self.require_all_permissions_json("POST"):
            return error

        if object_type == "device":
            obj = get_object_or_404(Device, pk=object_id)
        elif object_type == "virtualmachine":
            obj = get_object_or_404(VirtualMachine, pk=object_id)
        else:
            return JsonResponse({"error": "Invalid object type"}, status=400)

        interface_ids = request.POST.getlist("interface_ids")

        if not interface_ids:
            return JsonResponse({"error": "No interfaces selected for deletion"}, status=400)

        deleted_count = 0
        errors = []

        try:
            with transaction.atomic():
                # Resolve virtual chassis membership once instead of per interface.
                # None means "no virtual chassis": only obj's own interfaces qualify.
                vc_member_ids = None
                if object_type == "device" and hasattr(obj, "virtual_chassis") and obj.virtual_chassis:
                    vc_member_ids = set(obj.virtual_chassis.members.values_list("id", flat=True))

                for interface_id in interface_ids:
                    interface_name = None
                    try:
                        # Each interface runs in its own savepoint. Without it, a
                        # database error swallowed by the handlers below would leave
                        # the outer transaction unusable for the remaining interfaces.
                        with transaction.atomic():
                            if object_type == "device":
                                interface = Interface.objects.get(id=interface_id)
                                interface_name = interface.name
                                if vc_member_ids is not None:
                                    if interface.device_id not in vc_member_ids:
                                        errors.append(
                                            f"Interface {interface.name} does not belong to this device "
                                            "or its virtual chassis"
                                        )
                                        continue
                                elif interface.device_id != obj.id:
                                    errors.append(f"Interface {interface.name} does not belong to this device")
                                    continue
                            else:
                                interface = VMInterface.objects.get(id=interface_id)
                                interface_name = interface.name
                                if interface.virtual_machine_id != obj.id:
                                    errors.append(f"Interface {interface.name} does not belong to this virtual machine")
                                    continue

                            interface.delete()

                        # Count only after the savepoint has been released.
                        deleted_count += 1

                    except (Interface.DoesNotExist, VMInterface.DoesNotExist):
                        errors.append(f"Interface with ID {interface_id} not found")
                        continue
                    except Exception as exc:  # pragma: no cover - defensive
                        errors.append(f"Error deleting interface {interface_name or interface_id}: {str(exc)}")
                        continue

        except Exception as exc:  # pragma: no cover
            return JsonResponse({"error": f"Transaction failed: {str(exc)}"}, status=500)

        # The response reports "success" even when some interfaces failed;
        # per-interface failures are listed under "errors".
        response_data = {
            "status": "success",
            "deleted_count": deleted_count,
            "message": f"Successfully deleted {deleted_count} interface(s)",
        }

        if errors:
            response_data["errors"] = errors
            response_data["message"] += f" with {len(errors)} error(s)"

        return JsonResponse(response_data)
|