You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

612 lines
22 KiB

import os
import re
from typing import AnyStr, Iterable, Optional, Dict
from datetime import timedelta
from easysnmp import EasySNMPTimeoutError
from pexpect import TIMEOUT
from transliterate import translit
from django.utils.translation import gettext_lazy as _, gettext
from django.conf import settings
from djing.lib import RuTimedelta, safe_int
from devapp.expect_scripts import register_f601_onu, register_f660_onu, ExpectValidationError, OnuZteRegisterError
from .base_intr import (
DevBase, SNMPBaseWorker, BasePort, DeviceImplementationError,
ListOrError, DeviceConfigurationError
)
def _norm_name(name: str, replreg=None):
if replreg is None:
return re.sub(pattern='\W{1,255}', repl='', string=name, flags=re.IGNORECASE)
return replreg.sub('', name)
def plain_ip_device_mon_template(device) -> Optional[AnyStr]:
if not device:
raise ValueError
parent_host_name = _norm_name("%d%s" % (
device.parent_dev.pk, translit(device.parent_dev.comment, language_code='ru', reversed=True)
)) if device.parent_dev else None
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
mac_addr = device.mac_addr
r = (
"define host{",
"\tuse generic-switch",
"\thost_name %s" % host_name,
"\taddress %s" % device.ip_address,
"\tparents %s" % parent_host_name if parent_host_name is not None else '',
"\t_mac_addr %s" % mac_addr if mac_addr is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
def ex_expect(filename, params=()):
base_dir = getattr(settings, 'BASE_DIR')
if base_dir is not None:
exec_file = os.path.join(base_dir, 'devapp', 'expect_scripts', filename)
if os.path.isfile(exec_file) and os.access(path=exec_file, mode=os.X_OK):
params = ' '.join(str(p) for p in params)
if params:
return os.system('%s %s' % (exec_file, params))
else:
return os.system(exec_file)
class DLinkPort(BasePort):
def __init__(self, snmp_worker, *args, **kwargs):
BasePort.__init__(self, writable=True, *args, **kwargs)
if not issubclass(snmp_worker.__class__, SNMPBaseWorker):
raise TypeError
self.snmp_worker = snmp_worker
def disable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 2
)
def enable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 1
)
class DLinkDevice(DevBase, SNMPBaseWorker):
has_attachable_to_subscriber = True
tech_code = 'dlink_sw'
description = _('DLink switch')
is_use_device_port = True
def __init__(self, dev_instance):
DevBase.__init__(self, dev_instance)
SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2)
def reboot(self, save_before_reboot=False):
dat = self.db_instance.extra_data
if dat is None:
return
login = dat.get('login')
passw = dat.get('password')
if login and passw:
return ex_expect('dlink_DGS1100_reboot.exp', (
self.db_instance.ip_address,
login, passw,
1 if save_before_reboot else 0
))
def get_ports(self) -> ListOrError:
interfaces_count = safe_int(self.get_item('.1.3.6.1.2.1.2.1.0'))
nams = tuple(self.get_list('.1.3.6.1.4.1.171.10.134.2.1.1.100.2.1.3'))
stats = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.7'))
macs = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.6'))
speeds = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.5'))
try:
for n in range(interfaces_count):
status = True if int(stats[n]) == 1 else False
yield DLinkPort(
num=n + 1,
name=nams[n] if len(nams) > 0 else '',
status=status,
mac=macs[n] if len(macs) > 0 else _('does not fetch the mac'),
speed=int(speeds[n]) if len(speeds) > 0 else 0,
snmp_worker=self)
except IndexError:
return DeviceImplementationError('Dlink port index error')
def get_device_name(self):
return self.get_item('.1.3.6.1.2.1.1.1.0')
def uptime(self) -> timedelta:
uptimestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.8.0'))
tm = RuTimedelta(timedelta(seconds=uptimestamp / 100)) or RuTimedelta(timedelta())
return tm
def get_template_name(self):
return 'generic_switch.html'
@staticmethod
def validate_extra_snmp_info(v: str) -> None:
# Dlink has no require snmp info
pass
def monitoring_template(self, *args, **kwargs) -> Optional[str]:
device = self.db_instance
return plain_ip_device_mon_template(device)
def register_device(self, extra_data: Dict):
pass
class ONUdev(BasePort):
def __init__(self, signal, snmp_worker, *args, **kwargs):
super(ONUdev, self).__init__(*args, **kwargs)
if not issubclass(snmp_worker.__class__, SNMPBaseWorker):
raise TypeError
self.snmp_worker = snmp_worker
self.signal = signal
def disable(self):
pass
def enable(self):
pass
def __str__(self):
return "%d: '%s' %s" % (self.num, self.nm, self.mac())
class OLTDevice(DevBase, SNMPBaseWorker):
has_attachable_to_subscriber = False
description = 'PON OLT'
is_use_device_port = False
def __init__(self, dev_instance):
DevBase.__init__(self, dev_instance)
SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2)
def reboot(self, save_before_reboot=False):
pass
def get_ports(self) -> ListOrError:
nms = self.get_list('.1.3.6.1.4.1.3320.101.10.1.1.79')
res = []
try:
for nm in nms:
n = int(nm)
status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % n)
signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % n)
onu = ONUdev(
num=n,
name=self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % n),
status=True if status == '3' else False,
mac=self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % n),
speed=0,
signal=int(signal or 0),
snmp_worker=self)
res.append(onu)
except EasySNMPTimeoutError as e:
return EasySNMPTimeoutError(
"%s (%s)" % (gettext('wait for a reply from the SNMP Timeout'), e)
), res
return res
def get_device_name(self):
return self.get_item('.1.3.6.1.2.1.1.5.0')
def uptime(self):
up_timestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.9.1.4.1'))
tm = RuTimedelta(timedelta(seconds=up_timestamp / 100)) or RuTimedelta(timedelta())
return tm
def get_template_name(self):
return 'olt.html'
@staticmethod
def validate_extra_snmp_info(v: str) -> None:
# Olt has no require snmp info
pass
def monitoring_template(self, *args, **kwargs) -> Optional[str]:
device = self.db_instance
return plain_ip_device_mon_template(device)
def register_device(self, extra_data: Dict):
pass
class OnuDevice(DevBase, SNMPBaseWorker):
has_attachable_to_subscriber = True
description = 'PON ONU BDCOM'
tech_code = 'bdcom_onu'
is_use_device_port = False
def __init__(self, dev_instance):
DevBase.__init__(self, dev_instance)
dev_ip_addr = None
if dev_instance.ip_address:
dev_ip_addr = dev_instance.ip_address
else:
parent_device = dev_instance.parent_dev
if parent_device is not None and parent_device.ip_address:
dev_ip_addr = parent_device.ip_address
if dev_ip_addr is None:
raise DeviceImplementationError(gettext(
'Ip address or parent device with ip address required for ONU device'
))
SNMPBaseWorker.__init__(self, dev_ip_addr, dev_instance.man_passw, 2)
def reboot(self, save_before_reboot=False):
pass
def get_ports(self) -> ListOrError:
return ()
def get_device_name(self):
pass
def uptime(self):
pass
def get_template_name(self):
return "onu.html"
def get_details(self):
if self.db_instance is None:
return
num = safe_int(self.db_instance.snmp_extra)
if num == 0:
return
try:
status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % num)
signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % num)
distance = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.27.%d' % num)
mac = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % num)
if mac is not None:
mac = ':'.join('%x' % ord(i) for i in mac)
# uptime = self.get_item('.1.3.6.1.2.1.2.2.1.9.%d' % num)
signal = safe_int(signal)
if status is not None and status.isdigit():
return {
'status': status,
'signal': signal / 10 if signal != 0 else 0,
'name': self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % num),
'mac': mac,
'distance': int(distance) / 10 if distance.isdigit() else 0
}
except EasySNMPTimeoutError as e:
return {'err': "%s: %s" % (_('ONU not connected'), e)}
@staticmethod
def validate_extra_snmp_info(v: str) -> None:
# DBCOM Onu have en integer snmp port
try:
int(v)
except ValueError:
raise ExpectValidationError(_('Onu snmp field must be en integer'))
def monitoring_template(self, *args, **kwargs) -> Optional[str]:
device = self.db_instance
if not device:
return
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
snmp_item = device.snmp_extra
mac = device.mac_addr
if device.ip_address:
address = device.ip_address
elif device.parent_dev:
address = device.parent_dev.ip_address
else:
address = None
r = (
"define host{",
"\tuse device-onu",
"\thost_name %s" % host_name,
"\taddress %s" % address if address else None,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
def register_device(self, extra_data: Dict):
pass
class EltexPort(BasePort):
def __init__(self, snmp_worker, *args, **kwargs):
BasePort.__init__(self, *args, **kwargs)
if not issubclass(snmp_worker.__class__, SNMPBaseWorker):
raise TypeError
self.snmp_worker = snmp_worker
def disable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num),
2
)
def enable(self):
self.snmp_worker.set_int_value(
"%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num),
1
)
class EltexSwitch(DLinkDevice):
description = _('Eltex switch')
is_use_device_port = False
has_attachable_to_subscriber = True
tech_code = 'eltex_sw'
def get_ports(self) -> ListOrError:
for i, n in enumerate(range(49, 77), 1):
speed = self.get_item('.1.3.6.1.2.1.2.2.1.5.%d' % n)
yield EltexPort(self,
num=i,
name=self.get_item('.1.3.6.1.2.1.31.1.1.1.18.%d' % n),
status=self.get_item('.1.3.6.1.2.1.2.2.1.8.%d' % n),
mac=self.get_item('.1.3.6.1.2.1.2.2.1.6.%d' % n),
speed=int(speed or 0)
)
def get_device_name(self):
return self.get_item('.1.3.6.1.2.1.1.5.0')
def uptime(self):
uptimestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0'))
tm = RuTimedelta(timedelta(seconds=uptimestamp / 100)) or RuTimedelta(timedelta())
return tm
def monitoring_template(self, *args, **kwargs) -> Optional[str]:
device = self.db_instance
return plain_ip_device_mon_template(device)
def reboot(self, save_before_reboot=False):
pass
def conv_signal(lvl: int) -> float:
if lvl == 65535: return 0.0
r = 0
if 0 < lvl < 30000:
r = lvl * 0.002 - 30
elif 60000 < lvl < 65534:
r = (lvl - 65534) * 0.002 - 30
return round(r, 2)
class Olt_ZTE_C320(OLTDevice):
description = 'OLT ZTE C320'
def get_fibers(self):
fibers = ({
'fb_id': fiber_id,
'fb_name': fiber_name,
'fb_onu_num': safe_int(self.get_item('.1.3.6.1.4.1.3902.1012.3.13.1.1.13.%d' % int(fiber_id)))
} for fiber_name, fiber_id in self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.1.1.1'))
return fibers
def get_ports_on_fiber(self, fiber_num: int) -> Iterable:
onu_types = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%d' % fiber_num)
onu_ports = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.2.%d' % fiber_num)
onu_signals = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%d' % fiber_num)
# Real sn in last 3 octets
onu_sns = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%d' % fiber_num)
onu_prefixs = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.11.2.1.1.%d' % fiber_num)
onu_list = ({
'onu_type': onu_type_num[0],
'onu_port': onu_port,
'onu_signal': conv_signal(safe_int(onu_signal)),
'onu_sn': onu_prefix + ''.join('%.2X' % ord(i) for i in onu_sn[-4:]), # Real sn in last 4 octets,
'snmp_extra': "%d.%d" % (fiber_num, safe_int(onu_type_num[1])),
} for onu_type_num, onu_port, onu_signal, onu_sn, onu_prefix in zip(
onu_types, onu_ports, onu_signals, onu_sns, onu_prefixs
))
return onu_list
def get_units_unregistered(self, fiber_num: int) -> Iterable:
sn_num_list = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.3.1.2.%d' % fiber_num)
firmware_ver = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.11.%d' % fiber_num)
loid_passws = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.9.%d' % fiber_num)
loids = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.8.%d' % fiber_num)
return ({
'mac': ':'.join('%x' % ord(i) for i in sn[-6:]),
'firmware_ver': frm_ver,
'loid_passw': loid_passw,
'loid': loid,
'sn': sn
} for frm_ver, loid_passw, loid, (sn, num) in zip(
firmware_ver, loid_passws, loids, sn_num_list
))
def uptime(self):
up_timestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0'))
tm = RuTimedelta(timedelta(seconds=up_timestamp / 100)) or RuTimedelta(timedelta())
return tm
def get_long_description(self):
return self.get_item('.1.3.6.1.2.1.1.1.0')
def get_hostname(self):
return self.get_item('.1.3.6.1.2.1.1.5.0')
def get_template_name(self):
return 'olt_ztec320.html'
def _reg_dev_zte(device, extra_data: Dict, reg_func):
if not extra_data:
raise DeviceConfigurationError(_('You have not info in extra_data '
'field, please fill it in JSON'))
ip = None
if device.ip_address:
ip = device.ip_address
elif device.parent_dev:
ip = device.parent_dev.ip_address
if ip:
mac = str(device.mac_addr) if device.mac_addr else None
# Format serial number from mac address
# because saved mac address was make from serial number
sn = "ZTEG%s" % ''.join('%.2X' % int(x, base=16) for x in mac.split(':')[-4:])
telnet = extra_data.get('telnet')
try:
onu_snmp = reg_func(
onu_mac=mac,
serial=sn,
zte_ip_addr=str(ip),
telnet_login=telnet.get('login'),
telnet_passw=telnet.get('password'),
telnet_prompt=telnet.get('prompt'),
onu_vlan=extra_data.get('default_vid')
)
if onu_snmp is not None:
device.snmp_extra = onu_snmp
device.save(update_fields=('snmp_extra',))
else:
raise DeviceConfigurationError('unregistered onu not found, sn=%s' % sn)
except TIMEOUT as e:
raise OnuZteRegisterError(e)
else:
raise DeviceConfigurationError('not have ip')
class ZteOnuDevice(OnuDevice):
description = 'Zte ONU F660'
tech_code = 'zte_onu'
def get_details(self) -> Optional[Dict]:
if self.db_instance is None:
return
snmp_extra = self.db_instance.snmp_extra
if not snmp_extra:
return
try:
fiber_num, onu_num = snmp_extra.split('.')
fiber_num, onu_num = int(fiber_num), int(onu_num)
fiber_addr = '%d.%d' % (fiber_num, onu_num)
status = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.1.%s.1' % fiber_addr)
signal = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%s.1' % fiber_addr)
distance = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.18.%s.1' % fiber_addr)
ip_addr = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.16.1.1.10.%s' % fiber_addr)
vlans = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.15.100.1.1.7.%s.1.1' % fiber_addr)
int_name = self.get_item('.1.3.6.1.4.1.3902.1012.3.28.1.1.3.%s' % fiber_addr)
onu_type = self.get_item('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%s' % fiber_addr)
sn = self.get_item('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%s' % fiber_addr)
if sn is not None:
sn = 'ZTEG%s' % ''.join('%.2X' % ord(x) for x in sn[-4:])
return {
'status': status,
'signal': conv_signal(safe_int(signal)),
'distance': safe_int(distance) / 10,
'ip_addr': ip_addr,
'vlans': vlans,
'serial': sn,
'int_name': int_name,
'onu_type': onu_type
}
except IndexError:
pass
def get_template_name(self):
return 'onu_for_zte.html'
@staticmethod
def validate_extra_snmp_info(v: str) -> None:
# for example 268501760.5
try:
fiber_num, onu_port = v.split('.')
int(fiber_num), int(onu_port)
except ValueError:
raise ExpectValidationError(_('Zte onu snmp field must be two dot separated integers'))
def monitoring_template(self, *args, **kwargs) -> Optional[str]:
device = self.db_instance
if not device:
return
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
snmp_item = device.snmp_extra
mac = device.mac_addr
if device.ip_address:
address = device.ip_address
elif device.parent_dev:
address = device.parent_dev.ip_address
else:
address = None
r = (
"define host{",
"\tuse dev-onu-zte-f660",
"\thost_name %s" % host_name,
"\taddress %s" % address if address else None,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
def register_device(self, extra_data: Dict):
_reg_dev_zte(self.db_instance, extra_data, register_f660_onu)
def get_fiber_str(self):
dev = self.db_instance
if not dev:
return
dat = dev.snmp_extra
if dat and '.' in dat:
snmp_fiber_num, onu_port_num = dat.split('.')
snmp_fiber_num = int(snmp_fiber_num)
bin_snmp_fiber_num = bin(snmp_fiber_num)[2:]
rack_num = int(bin_snmp_fiber_num[5:13], 2)
fiber_num = int(bin_snmp_fiber_num[13:21], 2)
return 'gpon-onu_1/%d/%d:%s' % (
rack_num, fiber_num, onu_port_num
)
class ZteF601(ZteOnuDevice):
description = 'Zte ONU F601'
def register_device(self, extra_data: Dict):
_reg_dev_zte(self.db_instance, extra_data, register_f601_onu)
class HuaweiSwitch(EltexSwitch):
description = _('Huawei switch')
is_use_device_port = True
has_attachable_to_subscriber = True
tech_code = 'huawei_s2300'
def get_ports(self):
interfaces_ids = self.get_list('.1.3.6.1.2.1.17.1.4.1.2')
if interfaces_ids is None:
raise DeviceImplementationError('Switch returned null')
for i, n in enumerate(interfaces_ids):
n = int(n)
speed = self.get_item('.1.3.6.1.2.1.2.2.1.5.%d' % n)
oper_status = safe_int(self.get_item('.1.3.6.1.2.1.2.2.1.7.%d' % n))
oper_status = True if oper_status == 1 else False
link_status = safe_int(self.get_item('.1.3.6.1.2.1.2.2.1.8.%d' % n))
link_status = True if link_status == 1 else False
ep = EltexPort(
self,
num=i+1,
snmp_num=n,
name=self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % n), # name
status=oper_status, # status
mac='', # self.get_item('.1.3.6.1.2.1.2.2.1.6.%d' % n), # mac
speed=0 if not link_status else safe_int(speed) # speed
)
ep.writable = True
yield ep
def reboot(self, save_before_reboot=False):
pass