You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

464 lines
16 KiB

import re
from typing import AnyStr, Iterable, Optional, Dict
from datetime import timedelta
from easysnmp import EasySNMPTimeoutError
from transliterate import translit
from django.utils.translation import gettext_lazy as _, gettext
from djing.lib import RuTimedelta, safe_int
from djing.lib.tln.tln import ValidationError as TlnValidationError
from .base_intr import DevBase, SNMPBaseWorker, BasePort, DeviceImplementationError, ListOrError
def _norm_name(name: str, replreg=None):
if replreg is None:
return re.sub(pattern='\W{1,255}', repl='', string=name, flags=re.IGNORECASE)
return replreg.sub('', name)
def plain_ip_device_mon_template(device) -> Optional[AnyStr]:
if not device:
raise ValueError
parent_host_name = _norm_name("%d%s" % (
device.parent_dev.pk, translit(device.parent_dev.comment, language_code='ru', reversed=True)
)) if device.parent_dev else None
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
mac_addr = device.mac_addr
r = (
"define host{",
"\tuse generic-switch",
"\thost_name %s" % host_name,
"\taddress %s" % device.ip_address,
"\tparents %s" % parent_host_name if parent_host_name is not None else '',
"\t_mac_addr %s" % mac_addr if mac_addr is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
class DLinkPort(BasePort):
def __init__(self, num, name, status, mac, speed, snmp_worker):
BasePort.__init__(self, num, name, status, mac, speed)
if not issubclass(snmp_worker.__class__, SNMPBaseWorker):
raise TypeError
self.snmp_worker = snmp_worker
def disable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 2
)
def enable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 1
)
class DLinkDevice(DevBase, SNMPBaseWorker):
def __init__(self, dev_instance):
DevBase.__init__(self, dev_instance)
SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2)
@staticmethod
def description():
return _('DLink switch')
def reboot(self):
return self.get_item('.1.3.6.1.4.1.2021.8.1.101.1')
def get_ports(self) -> ListOrError:
interfaces_count = safe_int(self.get_item('.1.3.6.1.2.1.2.1.0'))
nams = list(self.get_list('.1.3.6.1.4.1.171.10.134.2.1.1.100.2.1.3'))
stats = list(self.get_list('.1.3.6.1.2.1.2.2.1.7'))
macs = list(self.get_list('.1.3.6.1.2.1.2.2.1.6'))
speeds = list(self.get_list('.1.3.6.1.2.1.2.2.1.5'))
res = []
try:
for n in range(interfaces_count):
status = True if int(stats[n]) == 1 else False
res.append(DLinkPort(
n + 1,
nams[n] if len(nams) > 0 else '',
status,
macs[n] if len(macs) > 0 else _('does not fetch the mac'),
int(speeds[n]) if len(speeds) > 0 else 0,
self))
return res
except IndexError:
return DeviceImplementationError('Dlink port index error'), res
def get_device_name(self):
return self.get_item('.1.3.6.1.2.1.1.1.0')
def uptime(self) -> timedelta:
uptimestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.8.0'))
tm = RuTimedelta(timedelta(seconds=uptimestamp / 100)) or RuTimedelta(timedelta())
return tm
def get_template_name(self):
return 'ports.html'
@staticmethod
def has_attachable_to_subscriber():
return True
@staticmethod
def is_use_device_port():
return True
def validate_extra_snmp_info(self, v: str) -> None:
# Dlink has no require snmp info
pass
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
return plain_ip_device_mon_template(device, *args, **kwargs)
class ONUdev(BasePort):
def __init__(self, num, name, status, mac, speed, signal, snmp_worker):
super(ONUdev, self).__init__(num, name, status, mac, speed)
if not issubclass(snmp_worker.__class__, SNMPBaseWorker):
raise TypeError
self.snmp_worker = snmp_worker
self.signal = signal
def disable(self):
pass
def enable(self):
pass
def __str__(self):
return "%d: '%s' %s" % (self.num, self.nm, self.mac())
class OLTDevice(DevBase, SNMPBaseWorker):
def __init__(self, dev_instance):
DevBase.__init__(self, dev_instance)
SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2)
@staticmethod
def description():
return gettext('PON OLT')
def reboot(self):
pass
def get_ports(self) -> ListOrError:
nms = self.get_list('.1.3.6.1.4.1.3320.101.10.1.1.79')
res = []
try:
for nm in nms:
n = int(nm)
status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % n)
signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % n)
onu = ONUdev(
num=n,
name=self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % n),
status=True if status == '3' else False,
mac=self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % n),
speed=0,
signal=int(signal) / 10 if signal != 'NOSUCHINSTANCE' else 0,
snmp_worker=self)
res.append(onu)
except EasySNMPTimeoutError as e:
return EasySNMPTimeoutError(
"%s (%s)" % (gettext('wait for a reply from the SNMP Timeout'), e)
), res
return res
def get_device_name(self):
return self.get_item('.1.3.6.1.2.1.1.5.0')
def uptime(self):
up_timestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.9.1.4.1'))
tm = RuTimedelta(timedelta(seconds=up_timestamp / 100)) or RuTimedelta(timedelta())
return tm
def get_template_name(self):
return 'olt.html'
@staticmethod
def has_attachable_to_subscriber():
return False
@staticmethod
def is_use_device_port():
return False
def validate_extra_snmp_info(self, v: str) -> None:
# Olt has no require snmp info
pass
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
return plain_ip_device_mon_template(device)
class OnuDevice(DevBase, SNMPBaseWorker):
def __init__(self, dev_instance):
DevBase.__init__(self, dev_instance)
dev_ip_addr = None
if dev_instance.ip_address:
dev_ip_addr = dev_instance.ip_address
else:
parent_device = dev_instance.parent_dev
if parent_device is not None and parent_device.ip_address:
dev_ip_addr = parent_device.ip_address
if dev_ip_addr is None:
raise DeviceImplementationError(gettext(
'Ip address or parent device with ip address required for ONU device'
))
SNMPBaseWorker.__init__(self, dev_ip_addr, dev_instance.man_passw, 2)
@staticmethod
def description() -> AnyStr:
return gettext('PON ONU')
def reboot(self):
pass
def get_ports(self) -> ListOrError:
return []
def get_device_name(self):
pass
def uptime(self):
pass
def get_template_name(self):
return "onu.html"
@staticmethod
def has_attachable_to_subscriber():
return True
@staticmethod
def is_use_device_port():
return False
def get_details(self):
if self.db_instance is None:
return
num = safe_int(self.db_instance.snmp_extra)
if num == 0:
return
try:
status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % num)
signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % num)
distance = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.27.%d' % num)
mac = ':'.join('%x' % ord(i) for i in self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % num))
# uptime = self.get_item('.1.3.6.1.2.1.2.2.1.9.%d' % num)
return {
'status': status,
'signal': int(signal) / 10 if signal != 'NOSUCHINSTANCE' else 0,
'name': self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % num),
'mac': mac,
'distance': int(distance) / 10 if distance != 'NOSUCHINSTANCE' else 0
}
except EasySNMPTimeoutError as e:
return {'err': "%s: %s" % (_('ONU not connected'), e)}
def validate_extra_snmp_info(self, v: str) -> None:
# DBCOM Onu have en integer snmp port
try:
int(v)
except ValueError:
raise TlnValidationError(_('Onu snmp field must be en integer'))
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
if not device:
return
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
snmp_item = device.snmp_extra
mac = device.mac_addr
r = (
"define host{",
"\tuse device-onu",
"\thost_name %s" % host_name,
# "\taddress %s" % device.ip_address,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
class EltexPort(BasePort):
def __init__(self, snmp_worker, *args, **kwargs):
BasePort.__init__(self, *args, **kwargs)
if not issubclass(snmp_worker.__class__, SNMPBaseWorker):
raise TypeError
self.snmp_worker = snmp_worker
def disable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num),
2
)
def enable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num),
1
)
class EltexSwitch(DLinkDevice):
@staticmethod
def description():
return _('Eltex switch')
def get_ports(self) -> ListOrError:
res = []
for i, n in enumerate(range(49, 77), 1):
speed = self.get_item('.1.3.6.1.2.1.2.2.1.5.%d' % n)
res.append(EltexPort(self,
i,
self.get_item('.1.3.6.1.2.1.31.1.1.1.18.%d' % n),
self.get_item('.1.3.6.1.2.1.2.2.1.8.%d' % n),
self.get_item('.1.3.6.1.2.1.2.2.1.6.%d' % n),
int(speed),
))
return res
def get_device_name(self):
return self.get_item('.1.3.6.1.2.1.1.5.0')
def uptime(self):
uptimestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0'))
tm = RuTimedelta(timedelta(seconds=uptimestamp / 100)) or RuTimedelta(timedelta())
return tm
@staticmethod
def has_attachable_to_subscriber():
return True
@staticmethod
def is_use_device_port():
return False
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
return plain_ip_device_mon_template(device)
def conv_signal(lvl: int) -> float:
if lvl == 65535: return 0.0
r = 0
if 0 < lvl < 30000:
r = lvl * 0.002 - 30
elif 60000 < lvl < 65534:
r = (lvl - 65534) * 0.002 - 30
return round(r, 2)
class Olt_ZTE_C320(OLTDevice):
@staticmethod
def description():
return gettext('OLT ZTE C320')
def get_fibers(self):
fibers = ({
'fb_id': fiber_id,
'fb_name': fiber_name,
'fb_onu_num': safe_int(self.get_item('.1.3.6.1.4.1.3902.1012.3.13.1.1.13.%d' % int(fiber_id)))
} for fiber_name, fiber_id in self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.1.1.1'))
return fibers
def get_ports_on_fiber(self, fiber_num: int) -> Iterable:
onu_types = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%d' % fiber_num)
onu_ports = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.2.%d' % fiber_num)
onu_signals = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%d' % fiber_num)
# Real sn in last 3 octets
onu_sns = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%d' % fiber_num)
onu_prefixs = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.11.2.1.1.%d' % fiber_num)
onu_list = ({
'onu_type': onu_type_num[0],
'onu_port': onu_port,
'onu_signal': conv_signal(safe_int(onu_signal)),
'onu_sn': onu_prefix + ''.join('%.2X' % ord(i) for i in onu_sn[-4:]), # Real sn in last 4 octets,
'snmp_extra': "%d.%d" % (fiber_num, safe_int(onu_type_num[1])),
} for onu_type_num, onu_port, onu_signal, onu_sn, onu_prefix in zip(
onu_types, onu_ports, onu_signals, onu_sns, onu_prefixs
))
return onu_list
def uptime(self):
up_timestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0'))
tm = RuTimedelta(timedelta(seconds=up_timestamp / 100)) or RuTimedelta(timedelta())
return tm
def get_long_description(self):
return self.get_item('.1.3.6.1.2.1.1.1.0')
def get_hostname(self):
return self.get_item('.1.3.6.1.2.1.1.5.0')
def get_template_name(self):
return 'olt_ztec320.html'
class ZteOnuDevice(OnuDevice):
@staticmethod
def description():
return _('ZTE PON ONU')
def get_details(self) -> Optional[Dict]:
if self.db_instance is None:
return
snmp_extra = self.db_instance.snmp_extra
if not snmp_extra:
return
try:
fiber_num, onu_num = snmp_extra.split('.')
fiber_num, onu_num = int(fiber_num), int(onu_num)
status = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.1.%d.%d.1' % (fiber_num, onu_num))
signal = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%d.%d.1' % (fiber_num, onu_num))
distance = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.18.%d.%d.1' % (fiber_num, onu_num))
name = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.11.2.1.1.%d.%d' % (fiber_num, onu_num))
return {
'status': status,
'signal': conv_signal(safe_int(signal)),
'name': name,
'distance': int(distance) / 10 if distance != 'NOSUCHINSTANCE' else 0
}
except ValueError:
pass
def get_template_name(self):
return 'onu_for_zte.html'
def validate_extra_snmp_info(self, v: str) -> None:
# for example 268501760.5
try:
fiber_num, onu_port = v.split('.')
int(fiber_num), int(onu_port)
except ValueError:
raise TlnValidationError(_('Zte onu snmp field must be two dot separated integers'))
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
if not device:
return
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
snmp_item = device.snmp_extra
mac = device.mac_addr
r = (
"define host{",
"\tuse dev-onu-zte-f660",
"\thost_name %s" % host_name,
# "\taddress %s" % device.ip_address,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)