14 changed files with 478 additions and 356 deletions
-
2devapp/base_intr.py
-
96devapp/dev_types.py
-
4devapp/forms.py
-
16devapp/locale/ru/LC_MESSAGES/django.po
-
1devapp/models.py
-
6devapp/onu_config/__init__.py
-
86devapp/onu_config/base.py
-
165devapp/onu_config/f601.py
-
141devapp/onu_config/f660.py
-
31devapp/views.py
-
4djing/lib/tln/__init__.py
-
274djing/lib/tln/tln.py
-
3docs/extra_func.md
-
5requirements.txt
@ -0,0 +1,6 @@ |
|||||
|
from .f601 import register_onu as register_f601_onu |
||||
|
from .f660 import register_onu as register_f660_onu |
||||
|
from .base import ( |
||||
|
ZteOltConsoleError, OnuZteRegisterError, |
||||
|
ZTEFiberIsFull, ZteOltLoginFailed, ExpectValidationError |
||||
|
) |
||||
@ -0,0 +1,86 @@ |
|||||
|
import re |
||||
|
import sys |
||||
|
from pexpect import spawn |
||||
|
|
||||
|
|
||||
|
class ZteOltConsoleError(Exception): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class OnuZteRegisterError(ZteOltConsoleError): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class ZTEFiberIsFull(ZteOltConsoleError): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class ZteOltLoginFailed(ZteOltConsoleError): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class ExpectValidationError(ValueError): |
||||
|
pass |
||||
|
|
||||
|
|
||||
|
class MySpawn(spawn): |
||||
|
def __init__(self, *args, **kwargs): |
||||
|
super(MySpawn, self).__init__(encoding='utf-8', *args, **kwargs) |
||||
|
self.logfile = sys.stdout |
||||
|
|
||||
|
def do_cmd(self, c, prompt): |
||||
|
self.sendline(c) |
||||
|
return self.expect_exact(prompt) |
||||
|
|
||||
|
def get_lines(self): |
||||
|
return self.buffer.split('\r\n') |
||||
|
|
||||
|
def get_lines_before(self): |
||||
|
return self.before.split('\r\n') |
||||
|
|
||||
|
|
||||
|
def parse_onu_name(onu_name: str, name_regexp=re.compile('[/:_]')): |
||||
|
gpon_onu, stack_num, rack_num, fiber_num, onu_num = name_regexp.split(onu_name) |
||||
|
return { |
||||
|
'stack_num': stack_num, |
||||
|
'rack_num': rack_num, |
||||
|
'fiber_num': fiber_num, |
||||
|
'onu_num': onu_num |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def get_unregistered_onu(lines, serial): |
||||
|
for line in lines: |
||||
|
if line.startswith('gpon-onu_'): |
||||
|
spls = re.split(r'\s+', line) |
||||
|
if len(spls) > 2: |
||||
|
if serial == spls[1]: |
||||
|
onu_index, sn, state = spls[:3] |
||||
|
return parse_onu_name(onu_index) |
||||
|
|
||||
|
|
||||
|
def get_free_registered_onu_number(lines): |
||||
|
onu_type_regexp = re.compile(r'^\s{1,5}onu \d{1,3} type [-\w\d]{4,64} sn \w{4,64}$') |
||||
|
onu_olt_num = None |
||||
|
i = 0 |
||||
|
for l in lines: |
||||
|
if onu_type_regexp.match(l): |
||||
|
# match line |
||||
|
i += 1 |
||||
|
onu, num, onu_type, onu_type, sn, onu_sn = l.split() |
||||
|
onu_olt_num = int(num) |
||||
|
if onu_olt_num > i: |
||||
|
return i |
||||
|
return onu_olt_num + 1 |
||||
|
|
||||
|
|
||||
|
def sn_to_mac(sn: str): |
||||
|
t = sn[4:].lower() |
||||
|
r = tuple(t[i:i + 2] for i in range(0, len(t), 2)) |
||||
|
return '45:47:%s' % ':'.join(r) |
||||
|
|
||||
|
|
||||
|
def onu_conv(rack_num: int, fiber_num: int, port_num: int): |
||||
|
r = "10000{0:08b}{1:08b}00000000".format(rack_num, fiber_num) |
||||
|
snmp_fiber_num = int(r, base=2) |
||||
|
return "%d.%d" % (snmp_fiber_num, port_num) |
||||
@ -0,0 +1,165 @@ |
|||||
|
import re |
||||
|
from typing import Optional |
||||
|
from djing.lib import process_lock |
||||
|
from . import base |
||||
|
|
||||
|
|
||||
|
def get_onu_template(vlan_id: int, mac_addr: str): |
||||
|
template = ( |
||||
|
'sn-bind enable sn', |
||||
|
'tcont 1 profile HSI_100', |
||||
|
'gemport 1 unicast tcont 1 dir both', |
||||
|
'switchport mode hybrid vport 1', |
||||
|
'switchport vlan %d tag vport 1' % vlan_id, |
||||
|
'port-location format flexible-syntax vport 1', |
||||
|
'port-location sub-option remote-id enable vport 1', |
||||
|
'port-location sub-option remote-id name %s vport 1' % mac_addr, |
||||
|
'dhcp-option82 enable vport 1', |
||||
|
'dhcp-option82 trust true replace vport 1', |
||||
|
'ip dhcp snooping enable vport 1' |
||||
|
) |
||||
|
return template |
||||
|
|
||||
|
|
||||
|
def get_pon_mng_template(vlan_id: int): |
||||
|
template = ( |
||||
|
'service HSI type internet gemport 1 vlan %d' % vlan_id, |
||||
|
'loop-detect ethuni eth_0/1 enable', |
||||
|
'vlan port eth_0/1 mode tag vlan %d' % vlan_id, |
||||
|
'dhcp-ip ethuni eth_0/1 from-internet' |
||||
|
) |
||||
|
return template |
||||
|
|
||||
|
|
||||
|
def appy_config(onu_mac: str, sn: str, hostname: str, login: str, password: str, prompt: str, vlan: int): |
||||
|
onu_type = 'ZTE-F601' |
||||
|
|
||||
|
# Входим |
||||
|
ch = base.MySpawn('telnet %s' % hostname) |
||||
|
ch.timeout = 15 |
||||
|
ch.expect_exact('Username:') |
||||
|
ch.do_cmd(login, 'Password:') |
||||
|
|
||||
|
choice = ch.do_cmd(password, ['bad password.', '%s#' % prompt]) |
||||
|
if choice == 0: |
||||
|
raise base.ZteOltLoginFailed |
||||
|
|
||||
|
ch.do_cmd('terminal length 0', '%s#' % prompt) |
||||
|
choice = ch.do_cmd('show gpon onu uncfg', ['No related information to show', '%s#' % prompt]) |
||||
|
|
||||
|
if choice == 0: |
||||
|
ch.close() |
||||
|
raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) |
||||
|
elif choice == 1: |
||||
|
# Получим незареганные onu |
||||
|
unregistered_onu = base.get_unregistered_onu( |
||||
|
lines=ch.get_lines_before(), |
||||
|
serial=sn |
||||
|
) |
||||
|
if unregistered_onu is None: |
||||
|
ch.close() |
||||
|
raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) |
||||
|
|
||||
|
stack_num = int(unregistered_onu.get('stack_num')) |
||||
|
rack_num = int(unregistered_onu.get('rack_num')) |
||||
|
fiber_num = int(unregistered_onu.get('fiber_num')) |
||||
|
|
||||
|
# Получим последнюю зарегистрированную onu |
||||
|
ch.do_cmd('show run int gpon-olt_%(stack)s/%(rack)s/%(fiber)s' % { |
||||
|
'stack': stack_num, |
||||
|
'rack': rack_num, |
||||
|
'fiber': fiber_num |
||||
|
}, '%s#' % prompt) |
||||
|
free_onu_number = base.get_free_registered_onu_number( |
||||
|
ch.get_lines_before() |
||||
|
) |
||||
|
|
||||
|
if free_onu_number > 126: |
||||
|
ch.close() |
||||
|
raise base.ZTEFiberIsFull('olt fiber %d is full' % fiber_num) |
||||
|
|
||||
|
# enter to config |
||||
|
ch.do_cmd('conf t', '%s(config)#' % prompt) |
||||
|
|
||||
|
int_addr = '%d/%d/%d' % ( |
||||
|
stack_num, |
||||
|
rack_num, |
||||
|
fiber_num |
||||
|
) |
||||
|
|
||||
|
# go to olt interface |
||||
|
ch.do_cmd('interface gpon-olt_%s' % int_addr, '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# register onu on olt interface |
||||
|
ch.do_cmd('onu %d type %s sn %s' % ( |
||||
|
free_onu_number, |
||||
|
onu_type, |
||||
|
sn |
||||
|
), '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# Exit from int olt |
||||
|
ch.do_cmd('exit', '%s(config)#' % prompt) |
||||
|
|
||||
|
# Enter to int onu |
||||
|
ch.do_cmd('int gpon-onu_%(int_addr)s:%(onu_num)d' % { |
||||
|
'int_addr': int_addr, |
||||
|
'onu_num': free_onu_number |
||||
|
}, '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# Apply int onu config |
||||
|
template = get_onu_template(vlan, onu_mac) |
||||
|
for line in template: |
||||
|
ch.do_cmd(line, '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# Exit |
||||
|
ch.do_cmd('exit', '%s(config)#' % prompt) |
||||
|
|
||||
|
# Enter to pon-onu-mng |
||||
|
ch.do_cmd('pon-onu-mng gpon-onu_%(int_addr)s:%(onu_num)d' % { |
||||
|
'int_addr': int_addr, |
||||
|
'onu_num': free_onu_number |
||||
|
}, '%s(gpon-onu-mng)#' % prompt) |
||||
|
|
||||
|
# Apply config to pon-onu-mng |
||||
|
for line in get_pon_mng_template(vlan): |
||||
|
ch.do_cmd(line, '%s(gpon-onu-mng)#' % prompt) |
||||
|
|
||||
|
# Exit |
||||
|
ch.do_cmd('exit', '%s(config)#' % prompt) |
||||
|
|
||||
|
ch.close() |
||||
|
return base.onu_conv( |
||||
|
rack_num=rack_num, |
||||
|
fiber_num=fiber_num, |
||||
|
port_num=free_onu_number |
||||
|
) |
||||
|
else: |
||||
|
ch.close() |
||||
|
raise base.ZteOltConsoleError("I don't know what choice:", choice) |
||||
|
|
||||
|
|
||||
|
# Main Entry point |
||||
|
@process_lock |
||||
|
def register_onu(onu_mac: Optional[str], serial: str, zte_ip_addr: str, telnet_login: str, |
||||
|
telnet_passw: str, telnet_prompt: str, onu_vlan: int): |
||||
|
|
||||
|
if not re.match(r'^ZTEG[0-9A-F]{8}$', serial): |
||||
|
raise base.ExpectValidationError('Serial not valid, match: ^ZTEG[0-9A-F]{8}$') |
||||
|
|
||||
|
if not isinstance(onu_vlan, int): |
||||
|
onu_vlan = int(onu_vlan) |
||||
|
|
||||
|
if onu_mac is None: |
||||
|
onu_mac = base.sn_to_mac(serial) |
||||
|
|
||||
|
IP4_ADDR_REGEX = ( |
||||
|
r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
||||
|
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
||||
|
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
||||
|
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' |
||||
|
) |
||||
|
if not re.match(IP4_ADDR_REGEX, zte_ip_addr): |
||||
|
raise base.ExpectValidationError('ip address for zte not valid') |
||||
|
|
||||
|
return appy_config(onu_mac, serial, zte_ip_addr, telnet_login, |
||||
|
telnet_passw, telnet_prompt, onu_vlan) |
||||
@ -0,0 +1,141 @@ |
|||||
|
import re |
||||
|
from typing import Optional |
||||
|
|
||||
|
from djing.lib import process_lock |
||||
|
from . import base |
||||
|
|
||||
|
|
||||
|
def get_onu_template(vlan_id: int, mac_addr: str): |
||||
|
template = ( |
||||
|
'switchport mode hybrid vport 1', |
||||
|
'switchport vlan %d tag vport 1' % vlan_id, |
||||
|
'port-location format flexible-syntax vport 1', |
||||
|
'port-location sub-option remote-id enable vport 1', |
||||
|
'port-location sub-option remote-id name %s vport 1' % mac_addr, |
||||
|
'dhcp-option82 enable vport 1', |
||||
|
'dhcp-option82 trust true replace vport 1', |
||||
|
'ip dhcp snooping enable vport 1' |
||||
|
) |
||||
|
return template |
||||
|
|
||||
|
|
||||
|
def appy_config(onu_mac: str, sn: str, hostname: str, login: str, password: str, prompt: str, vlan: int): |
||||
|
onu_type = 'ZTE-F660' |
||||
|
|
||||
|
# Входим |
||||
|
ch = base.MySpawn('telnet %s' % hostname) |
||||
|
ch.timeout = 15 |
||||
|
ch.expect_exact('Username:') |
||||
|
ch.do_cmd(login, 'Password:') |
||||
|
|
||||
|
choice = ch.do_cmd(password, ['bad password.', '%s#' % prompt]) |
||||
|
if choice == 0: |
||||
|
raise base.ZteOltLoginFailed |
||||
|
|
||||
|
ch.do_cmd('terminal length 0', '%s#' % prompt) |
||||
|
choice = ch.do_cmd('show gpon onu uncfg', ['No related information to show', '%s#' % prompt]) |
||||
|
if choice == 0: |
||||
|
ch.close() |
||||
|
raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) |
||||
|
elif choice == 1: |
||||
|
# Получим незареганные onu |
||||
|
unregistered_onu = base.get_unregistered_onu( |
||||
|
lines=ch.get_lines_before(), |
||||
|
serial=sn |
||||
|
) |
||||
|
if unregistered_onu is None: |
||||
|
ch.close() |
||||
|
raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) |
||||
|
stack_num = int(unregistered_onu.get('stack_num')) |
||||
|
rack_num = int(unregistered_onu.get('rack_num')) |
||||
|
fiber_num = int(unregistered_onu.get('fiber_num')) |
||||
|
|
||||
|
# Получим последнюю зарегистрированную onu |
||||
|
ch.do_cmd('show run int gpon-olt_%(stack)s/%(rack)s/%(fiber)s' % { |
||||
|
'stack': stack_num, |
||||
|
'rack': rack_num, |
||||
|
'fiber': fiber_num |
||||
|
}, '%s#' % prompt) |
||||
|
free_onu_number = base.get_free_registered_onu_number( |
||||
|
ch.get_lines_before() |
||||
|
) |
||||
|
if free_onu_number > 126: |
||||
|
ch.close() |
||||
|
raise base.ZTEFiberIsFull('olt fiber %d is full' % fiber_num) |
||||
|
|
||||
|
# enter to config |
||||
|
ch.do_cmd('conf t', '%s(config)#' % prompt) |
||||
|
int_addr = '%d/%d/%d' % ( |
||||
|
stack_num, |
||||
|
rack_num, |
||||
|
fiber_num |
||||
|
) |
||||
|
|
||||
|
# go to olt interface |
||||
|
ch.do_cmd('interface gpon-olt_%s' % int_addr, '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# register onu on olt interface |
||||
|
ch.do_cmd('onu %d type %s sn %s' % ( |
||||
|
free_onu_number, |
||||
|
onu_type, |
||||
|
sn |
||||
|
), '%s(config-if)#' % prompt) |
||||
|
# register onu profile on olt interface |
||||
|
ch.do_cmd( |
||||
|
'onu %d profile line ZTE-F660-LINE remote ZTE-F660-ROUTER' % free_onu_number, |
||||
|
'%s(config-if)#' % prompt |
||||
|
) |
||||
|
|
||||
|
# Exit from int olt |
||||
|
ch.do_cmd('exit', '%s(config)#' % prompt) |
||||
|
|
||||
|
# Enter to int onu |
||||
|
ch.do_cmd('int gpon-onu_%(int_addr)s:%(onu_num)d' % { |
||||
|
'int_addr': int_addr, |
||||
|
'onu_num': free_onu_number |
||||
|
}, '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# Apply int onu config |
||||
|
template = get_onu_template(vlan, onu_mac) |
||||
|
for line in template: |
||||
|
ch.do_cmd(line, '%s(config-if)#' % prompt) |
||||
|
|
||||
|
# Exit |
||||
|
ch.do_cmd('exit', '%s(config)#' % prompt) |
||||
|
ch.do_cmd('exit', '%s#' % prompt) |
||||
|
ch.close() |
||||
|
return base.onu_conv( |
||||
|
rack_num=rack_num, |
||||
|
fiber_num=fiber_num, |
||||
|
port_num=free_onu_number |
||||
|
) |
||||
|
else: |
||||
|
ch.close() |
||||
|
raise base.ZteOltConsoleError("I don't know what choice:", choice) |
||||
|
|
||||
|
|
||||
|
# Main Entry point |
||||
|
@process_lock |
||||
|
def register_onu(onu_mac: Optional[str], serial: str, zte_ip_addr: str, telnet_login: str, |
||||
|
telnet_passw: str, telnet_prompt: str, onu_vlan: int): |
||||
|
|
||||
|
if not re.match(r'^ZTEG[0-9A-F]{8}$', serial): |
||||
|
raise base.ExpectValidationError('Serial not valid, match: ^ZTEG[0-9A-F]{8}$') |
||||
|
|
||||
|
if not isinstance(onu_vlan, int): |
||||
|
onu_vlan = int(onu_vlan) |
||||
|
|
||||
|
if onu_mac is None: |
||||
|
onu_mac = base.sn_to_mac(serial) |
||||
|
|
||||
|
IP4_ADDR_REGEX = ( |
||||
|
r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
||||
|
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
||||
|
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
||||
|
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' |
||||
|
) |
||||
|
if not re.match(IP4_ADDR_REGEX, zte_ip_addr): |
||||
|
raise base.ExpectValidationError('ip address for zte not valid') |
||||
|
|
||||
|
return appy_config(onu_mac, serial, zte_ip_addr, telnet_login, |
||||
|
telnet_passw, telnet_prompt, onu_vlan) |
||||
@ -1,4 +0,0 @@ |
|||||
from .tln import * |
|
||||
|
|
||||
__all__ = ('TelnetApi', 'ValidationError', 'ZTEFiberIsFull', 'ZteOltLoginFailed', |
|
||||
'OnuZteRegisterError', 'ZteOltConsoleError', 'register_onu_ZTE_F660') |
|
||||
@ -1,274 +0,0 @@ |
|||||
#!/usr/bin/env python3 |
|
||||
import re |
|
||||
import struct |
|
||||
from telnetlib import Telnet |
|
||||
from time import sleep |
|
||||
from typing import Generator, Dict, Optional, Tuple |
|
||||
|
|
||||
from djing.lib import process_lock |
|
||||
|
|
||||
|
|
||||
class ZteOltConsoleError(Exception): |
|
||||
pass |
|
||||
|
|
||||
|
|
||||
class OnuZteRegisterError(ZteOltConsoleError): |
|
||||
pass |
|
||||
|
|
||||
|
|
||||
class ZTEFiberIsFull(ZteOltConsoleError): |
|
||||
pass |
|
||||
|
|
||||
|
|
||||
class ZteOltLoginFailed(ZteOltConsoleError): |
|
||||
pass |
|
||||
|
|
||||
|
|
||||
class ValidationError(ValueError): |
|
||||
pass |
|
||||
|
|
||||
|
|
||||
MAC_ADDR_REGEX = b'^([0-9A-Fa-f]{1,2}[:-]){5}([0-9A-Fa-f]{1,2})$' |
|
||||
IP_ADDR_REGEX = ( |
|
||||
'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
|
||||
'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
|
||||
'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' |
|
||||
'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' |
|
||||
) |
|
||||
ONU_SN_REGEX = b'^ZTEG[A-F\d]{8}$' |
|
||||
|
|
||||
|
|
||||
class TelnetApi(Telnet): |
|
||||
|
|
||||
def __init__(self, prompt_string: bytes, *args, **kwargs): |
|
||||
timeout = kwargs.get('timeout') |
|
||||
if timeout: |
|
||||
self._timeout = timeout |
|
||||
self._prompt_string = prompt_string or b'ZTE#' |
|
||||
self.config_level = [] |
|
||||
super().__init__(*args, **kwargs) |
|
||||
|
|
||||
def write(self, buffer: bytes) -> None: |
|
||||
buffer = buffer + b'\n' |
|
||||
print('>>', buffer) |
|
||||
super().write(buffer) |
|
||||
|
|
||||
def resize_screen(self, width: int, height: int): |
|
||||
naws_cmd = struct.pack('>BBBHHBB', |
|
||||
255, 250, 31, # IAC SB NAWS |
|
||||
width, height, |
|
||||
255, 240 # IAC SE |
|
||||
) |
|
||||
sock = self.get_socket() |
|
||||
sock.send(naws_cmd) |
|
||||
|
|
||||
def read_lines(self) -> Generator: |
|
||||
while True: |
|
||||
line = self.read_until(b'\r\n', timeout=self._timeout) |
|
||||
line = line.replace(b'\r\n', b'') |
|
||||
if self._prompt_string == line: |
|
||||
break |
|
||||
if line == b'': |
|
||||
continue |
|
||||
yield line |
|
||||
|
|
||||
def command_to(self, cmd: bytes) -> Generator: |
|
||||
self.write(cmd) |
|
||||
return self.read_lines() |
|
||||
|
|
||||
def set_prompt_string(self, prompt_string: bytes) -> None: |
|
||||
self.config_level.append(prompt_string) |
|
||||
self._prompt_string = prompt_string |
|
||||
|
|
||||
def level_exit(self) -> Optional[Tuple]: |
|
||||
if len(self.config_level) < 2: |
|
||||
print('We are in root') |
|
||||
return |
|
||||
self.config_level.pop() |
|
||||
self.set_prompt_string(self.config_level[-1]) |
|
||||
return tuple(self.command_to(b'exit')) |
|
||||
|
|
||||
def __del__(self): |
|
||||
if self.sock: |
|
||||
self.write(b'exit') |
|
||||
super().__del__() |
|
||||
|
|
||||
|
|
||||
def parse_onu_name(onu_name: bytes, name_regexp=re.compile(b'[/:_]')) -> Dict[str, bytes]: |
|
||||
gpon_onu, stack_num, rack_num, fiber_num, onu_num = name_regexp.split(onu_name) |
|
||||
return { |
|
||||
'stack_num': stack_num, |
|
||||
'rack_num': rack_num, |
|
||||
'fiber_num': fiber_num, |
|
||||
'onu_num': onu_num |
|
||||
} |
|
||||
|
|
||||
|
|
||||
class OltZTERegister(TelnetApi): |
|
||||
|
|
||||
def __init__(self, screen_size: Tuple[int, int], prompt_title: bytes, *args, **kwargs): |
|
||||
super().__init__(prompt_string=prompt_title, *args, **kwargs) |
|
||||
self.prompt_title = prompt_title |
|
||||
self.set_prompt_string(b'%s#' % prompt_title) |
|
||||
self.resize_screen(*screen_size) |
|
||||
|
|
||||
def enter(self, username: bytes, passw: bytes) -> None: |
|
||||
self.read_until(b'Username:') |
|
||||
self.write(username) |
|
||||
self.read_until(b'Password:') |
|
||||
self.write(passw) |
|
||||
for l in self.read_lines(): |
|
||||
if b'bad password' in l: |
|
||||
raise ZteOltLoginFailed |
|
||||
|
|
||||
def get_unregistered_onu(self, sn: bytes) -> Optional[Dict]: |
|
||||
lines = tuple(self.command_to(b'show gpon onu uncfg')) |
|
||||
if len(lines) > 3: |
|
||||
# devices available |
|
||||
# find onu by sn |
|
||||
line = tuple(ln for ln in lines if sn.lower() in ln.lower()) |
|
||||
if len(line) > 0: |
|
||||
line = line[0] |
|
||||
onu_name, onu_sn, onu_state = line.split() |
|
||||
onu_numbers = parse_onu_name(onu_name) |
|
||||
onu_numbers.update({ |
|
||||
'onu_name': onu_name, |
|
||||
'onu_sn': onu_sn, |
|
||||
'onu_state': onu_state |
|
||||
}) |
|
||||
return onu_numbers |
|
||||
|
|
||||
def get_last_registered_onu_number(self, stack_num: int, rack_num: int, fiber_num: int) -> int: |
|
||||
registered_lines = self.command_to(b'show run int gpon-olt_%d/%d/%d' % ( |
|
||||
stack_num, |
|
||||
rack_num, |
|
||||
fiber_num |
|
||||
)) |
|
||||
onu_type_regexp = re.compile(b'^\s{2}onu \d{1,3} type [-\w\d]{4,64} sn \w{4,64}$') |
|
||||
last_onu = 0 |
|
||||
for rl in registered_lines: |
|
||||
if rl == b' --More--': |
|
||||
self.write(b' ') |
|
||||
if onu_type_regexp.match(rl): |
|
||||
_onu, num, _type, onu_type, _sn, onu_sn = rl.split() |
|
||||
last_onu = int(num) |
|
||||
return last_onu |
|
||||
|
|
||||
def enter_to_config_mode(self) -> bool: |
|
||||
prompt = b'%s(config)#' % self.prompt_title |
|
||||
self.set_prompt_string(prompt) |
|
||||
res = tuple(self.command_to(b'config terminal')) |
|
||||
if res[1].startswith(b'Enter configuration commands'): |
|
||||
# ok, we in the config mode |
|
||||
return True |
|
||||
return False |
|
||||
|
|
||||
def go_to_olt_interface(self, stack_num: int, rack_num: int, fiber_num: int) -> Tuple: |
|
||||
self.set_prompt_string(b'%s(config-if)#' % self.prompt_title) |
|
||||
return tuple(self.command_to(b'interface gpon-olt_%d/%d/%d' % ( |
|
||||
stack_num, |
|
||||
rack_num, |
|
||||
fiber_num |
|
||||
))) |
|
||||
|
|
||||
def go_to_onu_interface(self, stack_num: int, rack_num: int, fiber_num: int, onu_port_num: int) -> Tuple: |
|
||||
self.set_prompt_string(b'%s(config-if)#' % self.prompt_title) |
|
||||
return tuple(self.command_to(b'interface gpon-onu_%d/%d/%d:%d' % ( |
|
||||
stack_num, |
|
||||
rack_num, |
|
||||
fiber_num, |
|
||||
onu_port_num |
|
||||
))) |
|
||||
|
|
||||
def apply_conf_to_onu(self, mac_addr: bytes, vlan_id: int) -> None: |
|
||||
tmpl = ( |
|
||||
b'switchport vlan %d tag vport 1' % vlan_id, |
|
||||
b'port-location format flexible-syntax vport 1', |
|
||||
b'port-location sub-option remote-id enable vport 1', |
|
||||
b'port-location sub-option remote-id name %s vport 1' % mac_addr, |
|
||||
b'dhcp-option82 enable vport 1', |
|
||||
b'dhcp-option82 trust true replace vport 1', |
|
||||
b'ip dhcp snooping enable vport 1' |
|
||||
) |
|
||||
for conf_line in tmpl: |
|
||||
self.write(conf_line) |
|
||||
|
|
||||
def register_onu_on_olt_fiber(self, onu_type: bytes, new_onu_num: int, onu_sn: bytes, line_profile: bytes, |
|
||||
remote_profile: bytes) -> Tuple: |
|
||||
# ok, we in interface |
|
||||
tpl = b'onu %d type %s sn %s' % (new_onu_num, onu_type, onu_sn) |
|
||||
r = tuple(self.command_to(tpl)) |
|
||||
return tuple(self.command_to(b'onu %d profile line %s remote %s' % ( |
|
||||
new_onu_num, |
|
||||
line_profile, |
|
||||
remote_profile |
|
||||
))) + r |
|
||||
|
|
||||
|
|
||||
@process_lock |
|
||||
def register_onu_ZTE_F660(olt_ip: str, onu_sn: bytes, login_passwd: Tuple[bytes, bytes], onu_mac: bytes, prompt_title: bytes, vlan_id: int) -> Tuple: |
|
||||
onu_type = b'ZTE-F660' |
|
||||
line_profile = b'ZTE-F660-LINE' |
|
||||
remote_profile = b'ZTE-F660-ROUTER' |
|
||||
if not re.match(MAC_ADDR_REGEX, onu_mac): |
|
||||
raise ValidationError |
|
||||
if not re.match(IP_ADDR_REGEX, olt_ip): |
|
||||
raise ValidationError |
|
||||
if not re.match(ONU_SN_REGEX, onu_sn): |
|
||||
raise ValidationError |
|
||||
|
|
||||
tn = OltZTERegister(host=olt_ip, timeout=2, screen_size=(120, 128), prompt_title=prompt_title) |
|
||||
tn.enter(*login_passwd) |
|
||||
|
|
||||
unregistered_onu = tn.get_unregistered_onu(onu_sn) |
|
||||
if unregistered_onu is None: |
|
||||
raise OnuZteRegisterError('unregistered onu not found, sn=%s' % onu_sn.decode('utf-8')) |
|
||||
|
|
||||
stack_num = int(unregistered_onu['stack_num']) |
|
||||
rack_num = int(unregistered_onu['rack_num']) |
|
||||
fiber_num = int(unregistered_onu['fiber_num']) |
|
||||
|
|
||||
last_onu_number = tn.get_last_registered_onu_number( |
|
||||
stack_num, rack_num, fiber_num |
|
||||
) |
|
||||
|
|
||||
if last_onu_number > 126: |
|
||||
raise ZTEFiberIsFull('olt fiber %d is full' % fiber_num) |
|
||||
|
|
||||
# enter to config |
|
||||
if not tn.enter_to_config_mode(): |
|
||||
raise ZteOltConsoleError('Failed to enter to config mode') |
|
||||
|
|
||||
# go to olt interface |
|
||||
if not tn.go_to_olt_interface(stack_num, rack_num, fiber_num): |
|
||||
raise ZteOltConsoleError('Failed to enter in olt fiber port') |
|
||||
|
|
||||
# new onu port number |
|
||||
new_onu_port_num = last_onu_number + 1 |
|
||||
|
|
||||
# register onu on olt interface |
|
||||
r = tn.register_onu_on_olt_fiber(onu_type, new_onu_port_num, onu_sn, line_profile, remote_profile) |
|
||||
print(r) |
|
||||
|
|
||||
# exit from olt interface |
|
||||
tn.level_exit() |
|
||||
|
|
||||
r = tn.go_to_onu_interface(stack_num, rack_num, fiber_num, new_onu_port_num) |
|
||||
print(r) |
|
||||
|
|
||||
tn.apply_conf_to_onu(onu_mac, vlan_id) |
|
||||
sleep(1) |
|
||||
return stack_num, rack_num, fiber_num, new_onu_port_num |
|
||||
|
|
||||
|
|
||||
if __name__ == '__main__': |
|
||||
ip = '192.168.0.100' |
|
||||
try: |
|
||||
register_onu_ZTE_F660( |
|
||||
olt_ip=ip, onu_sn=b'ZTEG^#*$&@&', login_passwd=(b'login', b'password'), |
|
||||
onu_mac=b'MAC' |
|
||||
) |
|
||||
except ZteOltConsoleError as e: |
|
||||
print(e) |
|
||||
except ConnectionRefusedError: |
|
||||
print('ERROR: connection refused', ip) |
|
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue