Files
netbox-librenms-plugin/netbox_librenms_plugin/librenms_api.py
Vlastislav Svatek 673e67106e
Some checks failed
ci / deploy (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (javascript-typescript) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
first commit
2026-06-05 10:39:05 +02:00

1099 lines
41 KiB
Python

import logging
import urllib.parse
import requests
from django.core.cache import cache
from netbox.plugins import get_plugin_config
# HTTP request timeout constants (in seconds)
DEFAULT_API_TIMEOUT = 10
EXTENDED_API_TIMEOUT = 20 # For endpoints that may take longer (e.g., device listing)
logger = logging.getLogger(__name__)
class LibreNMSAPI:
"""
Client to interact with the LibreNMS API and retrieve interface data for devices.
"""
def __init__(self, server_key=None):
"""
Initialize LibreNMS API client with support for multiple servers.
Args:
server_key: Key for specific server configuration. If None, uses selected server or default.
"""
# If no server_key is provided, try to get the selected server from settings
if not server_key:
try:
from netbox_librenms_plugin.models import LibreNMSSettings
settings = LibreNMSSettings.objects.first()
if settings:
server_key = settings.selected_server
except (ImportError, AttributeError):
pass
# Default to 'default' if still no server_key
server_key = server_key or "default"
# Get server configuration
servers_config = get_plugin_config("netbox_librenms_plugin", "servers")
# If the requested server_key doesn't exist but there are configured servers,
# only fall back to the first available server when using the auto-default key.
# If a specific (non-default) server_key was requested but not found, raise
# immediately to avoid silently using the wrong LibreNMS instance.
if servers_config and isinstance(servers_config, dict) and server_key not in servers_config:
if server_key != "default":
available = list(servers_config.keys())
raise KeyError(
f"Server '{server_key}' not found in LibreNMS plugin configuration. Available servers: {available}"
)
first_key = next(iter(servers_config), None)
if first_key:
logger.info(
"Server '%s' not found in config, falling back to '%s'",
server_key,
first_key,
)
server_key = first_key
self.server_key = server_key
if servers_config and isinstance(servers_config, dict) and server_key in servers_config:
# Multi-server configuration
config = servers_config[server_key]
self.librenms_url = config["librenms_url"]
self.api_token = config["api_token"]
self.cache_timeout = config.get("cache_timeout", 300)
self.verify_ssl = config.get("verify_ssl", True)
else:
# Fallback to legacy single-server configuration
self.librenms_url = get_plugin_config("netbox_librenms_plugin", "librenms_url")
self.api_token = get_plugin_config("netbox_librenms_plugin", "api_token")
self.cache_timeout = get_plugin_config("netbox_librenms_plugin", "cache_timeout", 300)
self.verify_ssl = get_plugin_config("netbox_librenms_plugin", "verify_ssl", True)
if not self.librenms_url or not self.api_token:
raise ValueError(f"LibreNMS URL or API token is not configured for server '{server_key}'.")
self.headers = {"X-Auth-Token": self.api_token}
def test_connection(self):
"""
Test connection to LibreNMS server by calling the /system endpoint.
Returns:
dict: System information if successful, error dict if failed
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/system",
headers=self.headers,
verify=self.verify_ssl,
timeout=DEFAULT_API_TIMEOUT,
)
if response.status_code == 200:
data = response.json()
if data.get("status") == "ok" and data.get("system"):
return data["system"][0] if data["system"] else None
# Handle different HTTP status codes with user-friendly messages
if response.status_code == 401:
return {
"error": True,
"message": "Authentication failed - check API token",
}
elif response.status_code == 403:
return {
"error": True,
"message": "Access forbidden - check API token permissions",
}
elif response.status_code == 404:
return {
"error": True,
"message": "API endpoint not found - check LibreNMS URL",
}
elif response.status_code >= 500:
return {
"error": True,
"message": "LibreNMS server error - check server status",
}
else:
return {
"error": True,
"message": f"HTTP {response.status_code} - unexpected server response",
}
except requests.exceptions.SSLError:
return {
"error": True,
"message": "SSL certificate verification failed - try setting verify_ssl to false",
}
except requests.exceptions.ConnectionError:
return {
"error": True,
"message": "Connection failed - check server URL and network connectivity",
}
except requests.exceptions.Timeout:
return {
"error": True,
"message": "Connection timeout - server may be slow or unreachable",
}
except Exception as e:
return {"error": True, "message": f"Unexpected error: {str(e)}"}
@classmethod
def get_available_servers(cls):
"""
Get list of available server configurations.
Returns:
dict: Dictionary of server keys and their display names
"""
servers_config = get_plugin_config("netbox_librenms_plugin", "servers")
if servers_config and isinstance(servers_config, dict):
# Multi-server configuration
result = {}
for key, config in servers_config.items():
display_name = config.get("display_name", key)
result[key] = display_name
return result
else:
# Legacy single-server configuration
legacy_url = get_plugin_config("netbox_librenms_plugin", "librenms_url")
if legacy_url:
return {"default": f"Default Server ({legacy_url})"}
return {"default": "Default Server"}
def get_librenms_id(self, obj):
"""
Args:
obj: NetBox device or VM object
Returns:
int: LibreNMS device ID if found, None otherwise
Notes:
Lookup order:
1. Custom field 'librenms_id' on object
2. Cached librenms_id value
3. API lookup using:
a. primary_ip
b. primary IP's DNS name
c. hostname if FQDN
If found via API, stores ID in custom field if available,
otherwise caches the value.
"""
from netbox_librenms_plugin.utils import get_librenms_device_id
librenms_id = get_librenms_device_id(obj, self.server_key, auto_save=False)
if librenms_id is not None:
return librenms_id
# Check cache
cache_key = self._get_cache_key(obj)
librenms_id = cache.get(cache_key)
if librenms_id is not None:
return librenms_id
# Determine dynamically from API
ip_address = obj.primary_ip.address.ip if obj.primary_ip else None
dns_name = obj.primary_ip.dns_name if obj.primary_ip else None
hostname = obj.name if obj.name else None
# Try IP address
if ip_address:
librenms_id = self._normalize_librenms_id(self.get_device_id_by_ip(ip_address))
if librenms_id is not None:
self._store_librenms_id(obj, librenms_id)
return librenms_id
# Try primary IP's DNS name
if dns_name:
librenms_id = self._normalize_librenms_id(self.get_device_id_by_hostname(dns_name))
if librenms_id is not None:
self._store_librenms_id(obj, librenms_id)
return librenms_id
# Try hostname if FQDN
if hostname:
librenms_id = self._normalize_librenms_id(self.get_device_id_by_hostname(hostname))
if librenms_id is not None:
self._store_librenms_id(obj, librenms_id)
return librenms_id
return None
@staticmethod
def _normalize_librenms_id(value):
"""Coerce a raw LibreNMS ID value to int or None.
Booleans are rejected because bool is a subclass of int in Python,
so int(True) silently becomes 1 — a valid-looking device ID.
"""
if value is None or isinstance(value, bool):
return None
try:
return int(value)
except (ValueError, TypeError):
return None
def _get_cache_key(self, obj):
"""
Generate a unique cache key for an object.
Args:
obj: NetBox device or VM object
Returns:
str: Cache key
"""
object_type = obj._meta.model_name
server_key = getattr(self, "server_key", "default")
return f"librenms_device_id_{object_type}_{obj.pk}_{server_key}"
def _store_librenms_id(self, obj, librenms_id):
"""
Store in custom field if available
Args:
obj: NetBox device or VM object
librenms_id: LibreNMS device ID
Returns:
None
"""
if "librenms_id" in obj.cf:
from netbox_librenms_plugin.utils import set_librenms_device_id
set_librenms_device_id(obj, librenms_id, self.server_key)
obj.save(update_fields=["custom_field_data"])
else:
# Use cache as fallback
cache_key = self._get_cache_key(obj)
cache.set(cache_key, librenms_id, timeout=self.cache_timeout)
def get_device_id_by_ip(self, ip_address):
"""
Retrieve the device ID using the device's IP address.
Args:
ip_address: Device IP address
Retruns:
int: LibreNMS device ID if found, None otherwise
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/devices/{ip_address}",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
device_data = response.json()["devices"][0]
return device_data["device_id"]
except (requests.exceptions.RequestException, ValueError, IndexError, KeyError, TypeError):
return None
def get_device_id_by_hostname(self, hostname):
"""
Retrieve the device ID using the device's hostname.
Args:
hostname: Device hostname
Returns:
int: LibreNMS device ID if found, None otherwise
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/devices/{hostname}",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
device_data = response.json()["devices"][0]
return device_data["device_id"]
except (requests.exceptions.RequestException, ValueError, IndexError, KeyError, TypeError):
return None
def get_device_info(self, device_id):
"""
Fetch device information from LibreNMS using its primary IP.
Args:
device_id: LibreNMS device ID
Returns:
tuple: (success: bool, data: dict)
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/devices/{device_id}",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
if response.status_code == 200:
device_data = response.json()["devices"][0]
if not isinstance(device_data, dict):
return False, None
return True, device_data
return False, None
except (requests.exceptions.RequestException, ValueError, IndexError, KeyError, TypeError):
return False, None
def get_ports(self, device_id, with_vlans=True):
"""
Fetch ports data from LibreNMS for a device using its primary IP.
Includes VLAN assignment data (ifVlan, ifTrunk) for interface VLAN sync.
When with_vlans=True, includes detailed VLAN associations (tagged/untagged)
for all ports in a single API call (requires LibreNMS 24.2.0+).
Args:
device_id: LibreNMS device ID
with_vlans: Include detailed VLAN data for all ports (default: True)
Returns:
tuple: (success: bool, data: dict)
"""
try:
params = {
"columns": "port_id,ifName,ifType,ifSpeed,ifAdminStatus,ifDescr,ifAlias,ifPhysAddress,ifMtu,ifVlan,ifTrunk"
}
if with_vlans:
params["with"] = "vlans"
response = requests.get(
f"{self.librenms_url}/api/v0/devices/{device_id}/ports",
headers=self.headers,
params=params,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
data = response.json()
return True, data
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return False, "Device not found in LibreNMS"
return False, f"HTTP error: {str(e)}"
except requests.exceptions.RequestException as e:
return False, f"Error connecting to LibreNMS: {str(e)}"
def add_device(self, data):
"""
Add a device to LibreNMS.
Args:
Dictionary containing device data including:
- hostname: Device hostname or IP
- snmp_version: SNMP version (v1, v2c, or v3)
- force_add: Skip checks for duplicate device and SNMP reachability (optional, default False)
- port: SNMP port (optional, defaults to config value)
- transport: SNMP transport protocol (optional: udp, tcp, udp6, tcp6)
- port_association_mode: Port identification method (optional: ifIndex, ifName, ifDescr, ifAlias)
- poller_group: Poller group ID (optional, defaults to 0)
- community: SNMP community string (for v1 or v2c)
- authlevel, authname, authpass, authalgo, cryptopass, cryptoalgo: SNMP v3 parameters
Returns:
tuple: (success: bool, message: str)
"""
payload = {
"hostname": data["hostname"],
"snmpver": data["snmp_version"],
"force_add": data.get("force_add", False),
}
# Add optional common fields if provided
if data.get("port"):
payload["port"] = data["port"]
if data.get("transport"):
payload["transport"] = data["transport"]
if data.get("port_association_mode"):
payload["port_association_mode"] = data["port_association_mode"]
if data.get("poller_group") is not None:
payload["poller_group"] = data["poller_group"]
if data["snmp_version"] in ("v1", "v2c"):
payload["community"] = data["community"]
elif data["snmp_version"] == "v3":
payload.update(
{
"authlevel": data["authlevel"],
"authname": data["authname"],
"authpass": data["authpass"],
"authalgo": data["authalgo"],
"cryptopass": data["cryptopass"],
"cryptoalgo": data["cryptoalgo"],
}
)
try:
response = requests.post(
f"{self.librenms_url}/api/v0/devices",
headers=self.headers,
json=payload,
timeout=EXTENDED_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if result.get("status") == "ok":
return True, "Device added successfully."
else:
return False, result.get("message", "Unknown error.")
except requests.exceptions.RequestException as e:
return False, str(e)
def update_device_field(self, device_id, field_data):
"""
Update a specific field for a device in LibreNMS.
Args:
device_id: LibreNMS device ID
field_data: Dictionary containing field name and value
e.g {
"field": ["location", "override_sysLocation"],
"data": [device.site.name, "1"]
Returns:
tuple (success: bool, message: str)
"""
try:
response = requests.patch(
f"{self.librenms_url}/api/v0/devices/{device_id}",
headers=self.headers,
json=field_data,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if result.get("status") == "ok":
return True, "Device fields updated successfully"
else:
return False, result.get("message", "Unknown error occurred")
except requests.exceptions.RequestException as e:
error_message = str(e)
if hasattr(e.response, "json"):
error_details = e.response.json()
error_message = error_details.get("message", error_message)
return False, error_message
def get_locations(self):
"""
Fetch locations data from LibreNMS.
Args:
None
Returns:
tuple: (success: bool, data: dict)
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/resources/locations",
headers=self.headers,
timeout=EXTENDED_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if "locations" in result:
return True, result["locations"]
else:
return False, "No locations found or unexpected response format"
except requests.exceptions.RequestException as e:
error_message = str(e)
return False, error_message
def add_location(self, location_data):
"""
Add a location to LibreNMS.
Args:
location_data: Dictionary containing location data
e.g location_data = {
"location": site.name,
"lat": str(site.latitude),
"lng": str(site.longitude)
}
Return:
tuple: (success: bool, message: str)
"""
try:
response = requests.post(
f"{self.librenms_url}/api/v0/locations",
headers=self.headers,
json=location_data,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if result.get("status") == "ok":
location_id = result["message"].split("#")[-1]
return True, {"id": location_id, "message": result["message"]}
else:
return False, result.get("message") or "Unexpected response format"
except requests.exceptions.RequestException as e:
error_message = str(e)
if hasattr(e.response, "json"):
error_details = e.response.json()
error_message = error_details.get("message", error_message)
return False, error_message
def update_location(self, location_name, location_data):
"""
Update a location in LibreNMS.
Args:
location_name: LibreNMS Location name
location_data: Dictionary containing location data
e.g location_data = {
"lat": str(site.latitude),
"lng": str(site.longitude)
}
Returns:
tuple: (success: bool, message: str)
"""
try:
encoded_location_name = urllib.parse.quote(location_name)
response = requests.patch(
f"{self.librenms_url}/api/v0/locations/{encoded_location_name}",
headers=self.headers,
json=location_data,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if result.get("status") == "ok":
return True, result["message"]
else:
return False, result.get("message") or "Unexpected response format"
except requests.exceptions.RequestException as e:
error_message = str(e)
if hasattr(e.response, "json"):
error_details = e.response.json()
error_message = error_details.get("message", error_message)
return False, error_message
def get_device_links(self, device_id):
"""
Get links for a specific device from LibreNMS.
Args:
hostname: LibreNMS Device ID
Returns:
tuple: (success: bool, data: dict)
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/devices/{device_id}/links",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
return True, response.json()
except requests.exceptions.RequestException as e:
return False, str(e)
def get_device_ips(self, device_id):
"""
Fetch IP address data for a specific device from LibreNMS.
Args:
device_id: LibreNMS Device ID
Returns:
tuple: (success: bool, data: dict)
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/devices/{device_id}/ip",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
data = response.json()
addresses = data.get("addresses") if isinstance(data, dict) else None
if not isinstance(addresses, list):
message = data.get("message") if isinstance(data, dict) else None
return False, message or "Unexpected response format: 'addresses' must be a list"
return True, addresses
except (requests.exceptions.RequestException, ValueError) as e:
return False, str(e)
def get_port_by_id(self, port_id):
"""
Fetch specific port data from LibreNMS using port ID.
Args:
port_id: LibreNMS Port ID
Returns:
tuple: (success: bool, data: dict)
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/ports/{port_id}",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
return True, response.json()
except requests.exceptions.RequestException as e:
return False, str(e)
def get_device_inventory(self, device_id):
"""
Fetch complete inventory for a device from LibreNMS.
Useful for getting component details like chassis serial numbers for Virtual Chassis.
Route: /api/v0/inventory/{device_id}/all
Args:
device_id: LibreNMS device ID
Returns:
tuple: (success: bool, data: list)
Example inventory item:
{
"entPhysicalDescr": "Chassis Component",
"entPhysicalClass": "chassis",
"entPhysicalSerialNum": "ABC123456",
"entPhysicalModelName": "EX4300-48P",
...
}
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/inventory/{device_id}/all",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
inventory_data = response.json()
inventory = inventory_data.get("inventory") if isinstance(inventory_data, dict) else None
if not isinstance(inventory, list) or any(not isinstance(item, dict) for item in inventory):
msg = inventory_data.get("message", "") if isinstance(inventory_data, dict) else ""
logger.warning(f"Unexpected inventory response for device {device_id}: {inventory_data}")
return False, msg or "Unexpected response format: invalid 'inventory' payload"
return True, inventory
except (requests.exceptions.RequestException, ValueError) as e:
return False, str(e)
def get_poller_groups(self):
"""
Fetch all poller groups from LibreNMS.
Route: /api/v0/poller_group
Returns:
tuple: (success: bool, data: list)
Example poller group:
{
"id": 1,
"group_name": "test",
"descr": "test group"
}
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/poller_group",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if isinstance(result, dict) and result.get("status") == "ok":
poller_groups = result.get("get_poller_group")
if not isinstance(poller_groups, list):
return False, result.get("message") or "Unexpected response format: missing 'get_poller_group' list"
if not all(isinstance(item, dict) for item in poller_groups):
return False, "Unexpected response format: invalid item shape in 'get_poller_group'"
return True, poller_groups
if isinstance(result, dict):
return False, result.get("message") or "Unexpected response format"
return False, "Unexpected response format: non-object JSON"
except (requests.exceptions.RequestException, ValueError) as e:
return False, str(e)
def get_inventory_filtered(self, device_id, ent_physical_class=None, ent_physical_contained_in=None):
"""
Fetch filtered inventory from LibreNMS with optional filtering.
Uses query parameters if supported, falls back to client-side filtering.
Route: /api/v0/inventory/{device_id}
Args:
device_id: LibreNMS device ID
ent_physical_class: Filter by entPhysicalClass (e.g., 'chassis', 'stack')
ent_physical_contained_in: Filter by entPhysicalContainedIn (0=root, 1=first level, etc.)
Returns:
tuple: (success: bool, inventory: list)
Example:
>>> api.get_inventory_filtered(22, ent_physical_class='chassis', ent_physical_contained_in=1)
(True, [{'entPhysicalClass': 'chassis', ...}, ...])
"""
logger.debug(
f"get_inventory_filtered: device={device_id}, "
f"class={ent_physical_class}, contained_in={ent_physical_contained_in}"
)
try:
# Build query parameters for API filtering
params = {}
if ent_physical_class is not None:
params["entPhysicalClass"] = ent_physical_class
if ent_physical_contained_in is not None:
params["entPhysicalContainedIn"] = str(ent_physical_contained_in)
# Try the filtered endpoint first (non-/all)
response = requests.get(
f"{self.librenms_url}/api/v0/inventory/{device_id}",
headers=self.headers,
params=params,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
data = response.json()
if isinstance(data, dict) and data.get("status") == "ok":
inventory = data.get("inventory")
if not isinstance(inventory, list) or any(not isinstance(item, dict) for item in inventory):
msg = data.get("message")
return False, msg or "Unexpected response format: invalid 'inventory' payload"
logger.debug(f"API returned {len(inventory)} items")
# If we got results or didn't specify filters, return
if inventory or not params:
return True, inventory
# If filtered endpoint returned empty but we have filters,
# try /all endpoint and filter client-side
if params:
logger.debug("Filtered inventory API returned no results, falling back to client-side filtering")
success, all_inventory = self.get_device_inventory(device_id)
if not success:
return False, all_inventory
# Apply client-side filters
filtered = all_inventory
if ent_physical_class:
filtered = [item for item in filtered if item.get("entPhysicalClass") == ent_physical_class]
if ent_physical_contained_in is not None:
filtered = [
item
for item in filtered
if str(item.get("entPhysicalContainedIn")) == str(ent_physical_contained_in)
]
return True, filtered
# LibreNMS API v0 always returns JSON objects, so data is always
# a dict here; the isinstance guard is purely defensive.
if isinstance(data, dict):
return False, data.get("message") or "Unexpected response format"
return False, "Unexpected response format"
except (requests.exceptions.RequestException, ValueError) as e:
logger.warning(f"Failed to fetch filtered inventory: {e}")
return False, str(e)
def list_devices(self, filters=None):
"""
List all devices from LibreNMS with optional filtering.
Route: /api/v0/devices
Args:
filters (dict, optional): Filter parameters:
- type: Device type filter (e.g., 'network', 'server', 'storage')
- location_id: Location ID filter (numeric ID from LibreNMS)
- hostname: Hostname filter (partial match)
- os: Operating system filter
- version: OS version filter
- hardware: Hardware model filter
- features: Features filter
- device_id: Specific device ID
- query: Search query (searches across multiple fields)
Returns:
tuple: (success: bool, data: list)
Example device:
{
"device_id": 1,
"hostname": "router01.example.com",
"sysName": "router01",
"ip": "192.168.1.1",
"hardware": "Cisco C9300-48P",
"version": "IOS 16.9.4",
"location": "Datacenter 1",
"status": 1,
"status_reason": "",
"ignore": 0,
"disabled": 0,
"uptime": 3153600,
"os": "ios",
"type": "network",
"serial": "ABC123456789",
"icon": "cisco.svg",
...
}
"""
try:
params = {}
if filters:
# Build query parameters from filters
for key, value in filters.items():
if value is not None and value != "":
params[key] = value
response = requests.get(
f"{self.librenms_url}/api/v0/devices",
headers=self.headers,
params=params,
timeout=EXTENDED_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if isinstance(result, dict) and result.get("status") == "ok":
devices = result.get("devices")
if not isinstance(devices, list):
msg = result.get("message")
return False, msg or "Unexpected response format: missing 'devices' list"
if not all(isinstance(item, dict) for item in devices):
return False, "Unexpected response format: invalid item shape in 'devices'"
return True, devices
# LibreNMS API v0 always returns JSON objects, so result is always
# a dict here; the isinstance guard is purely defensive.
if isinstance(result, dict):
return False, result.get("message") or "Unexpected response format"
return False, "Unexpected response format"
except (requests.exceptions.RequestException, ValueError) as e:
return False, str(e)
# =========================================================================
# VLAN Methods
# =========================================================================
def get_device_vlans(self, device_id: int) -> tuple[bool, list | str]:
"""
Fetch all VLANs configured on a device using the resources endpoint.
This method uses /api/v0/resources/vlans which includes the vlan_id
primary key, unlike /api/v0/devices/{device_id}/vlans which omits it.
Route: /api/v0/resources/vlans
Args:
device_id: LibreNMS device ID
Returns:
tuple: (success: bool, data: list of VLAN dicts or error string)
Example VLAN:
{
"vlan_id": 123,
"device_id": 1,
"vlan_vlan": 50,
"vlan_domain": 1,
"vlan_name": "ORG_DATA",
"vlan_type": "ethernet",
"vlan_state": 1
}
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/resources/vlans",
headers=self.headers,
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if isinstance(result, dict) and result.get("status") == "ok":
all_vlans = result.get("vlans")
if not isinstance(all_vlans, list):
msg = result.get("message")
return False, msg or "Unexpected response format: missing 'vlans' list"
# Filter VLANs by device_id since resources endpoint returns all VLANs
device_vlans = [
v for v in all_vlans if isinstance(v, dict) and str(v.get("device_id")) == str(device_id)
]
return True, device_vlans
if isinstance(result, dict):
return False, result.get("message") or "Unexpected response format"
return False, "Unexpected response format"
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return False, "VLANs resource not found"
return False, f"HTTP error: {str(e)}"
except (requests.exceptions.RequestException, ValueError) as e:
return False, f"Error connecting to LibreNMS: {str(e)}"
def get_port_vlan_details(self, port_id: int) -> tuple[bool, dict | str]:
"""
Fetch detailed VLAN associations for a single port.
Required for trunk ports to get the tagged VLANs list.
Route: /api/v0/ports/{port_id}?with=vlans
Args:
port_id: LibreNMS port ID
Returns:
tuple: (success: bool, data: port dict with vlans array or error string)
Example port:
{
"port_id": 227011,
"ifName": "Te1/1/1",
"ifVlan": "90",
"ifTrunk": "dot1Q",
"vlans": [
{"vlan": 90, "untagged": 1, "state": "unknown"},
{"vlan": 50, "untagged": 0, "state": "forwarding"}
]
}
"""
try:
response = requests.get(
f"{self.librenms_url}/api/v0/ports/{port_id}",
headers=self.headers,
params={"with": "vlans"},
timeout=DEFAULT_API_TIMEOUT,
verify=self.verify_ssl,
)
response.raise_for_status()
if response.status_code == 200:
result = response.json()
if not isinstance(result, dict):
return False, "Unexpected response format"
port_data = result.get("port")
if not isinstance(port_data, list):
return False, result.get("message", "Unexpected response format: missing 'port' list")
if not port_data:
return False, "Port not found"
if not isinstance(port_data[0], dict):
return False, "Unexpected response format: invalid 'port' entry"
return True, port_data[0]
return False, f"HTTP {response.status_code}"
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return False, "Port not found in LibreNMS"
return False, f"HTTP error: {str(e)}"
except (requests.exceptions.RequestException, ValueError) as e:
return False, f"Error connecting to LibreNMS: {str(e)}"
def parse_port_vlan_data(self, port_data: dict, interface_name_field: str = "ifName") -> dict:
"""
Transform LibreNMS port VLAN data into normalized structure.
Args:
port_data: Raw port dict from LibreNMS API
interface_name_field: Field to use for interface name ('ifName' or 'ifDescr')
Returns:
dict: Normalized structure with:
- port_id: int
- interface_name: str (value from interface_name_field)
- ifName: str (always included for reference)
- ifDescr: str (always included for reference)
- mode: 'access' | 'tagged' | None
- untagged_vlan: int | None
- tagged_vlans: list[int]
"""
port_id = port_data.get("port_id")
if_name = port_data.get("ifName", "")
if_descr = port_data.get("ifDescr", "")
interface_name = port_data.get(interface_name_field, "") or if_name
if_vlan = port_data.get("ifVlan", "")
if_trunk = port_data.get("ifTrunk")
# Determine 802.1Q mode
if not if_vlan:
mode = None
elif if_trunk == "dot1Q":
mode = "tagged"
else:
mode = "access"
# Parse VLAN assignments from vlans array if present
vlans_data = port_data.get("vlans", [])
untagged_vlan = None
tagged_vlans = []
if isinstance(vlans_data, list) and vlans_data:
# Parse from detailed vlans array
for vlan_entry in vlans_data:
if not isinstance(vlan_entry, dict):
continue
vlan_id = vlan_entry.get("vlan")
if vlan_id is None:
continue
try:
vlan_id = int(vlan_id)
except (ValueError, TypeError):
continue
if vlan_entry.get("untagged") == 1:
untagged_vlan = vlan_id
else:
tagged_vlans.append(vlan_id)
elif if_vlan:
# Fallback to ifVlan field for basic port info
try:
untagged_vlan = int(if_vlan)
except (ValueError, TypeError):
pass
return {
"port_id": port_id,
"interface_name": interface_name,
"ifName": if_name,
"ifDescr": if_descr,
"mode": mode,
"untagged_vlan": untagged_vlan,
"tagged_vlans": sorted(tagged_vlans),
}