diff --git a/agent/core.py b/agent/core.py new file mode 100644 index 0000000..748042d --- /dev/null +++ b/agent/core.py @@ -0,0 +1,79 @@ +# -*- coding: utf8 -*- +from abc import ABCMeta, abstractmethod +from structs import AbonStruct, TariffStruct + + +# Всплывает если из NAS вернулся не удачный результат +class NasFailedResult(BaseException): + pass + + +# Проверяет входной тип на принадлежность классу. +# Можно передать объект или коллекцию объектов +# В общем желание организовать строгую типизацию :) +def check_input_type(class_or_type): + def real_check(fn): + def wrapped(self, user): + try: + for usr in user: + assert isinstance(usr, class_or_type) + except TypeError: + assert isinstance(user, class_or_type) + return fn(self, user) + return wrapped + return real_check + + +# Общается с NAS'ом +class BaseTransmitter(object): + __metaclass__ = ABCMeta + + @abstractmethod + @check_input_type(AbonStruct) + def add_user_range(self, user_list): + """добавляем список абонентов в NAS""" + + @abstractmethod + @check_input_type(AbonStruct) + def remove_user_range(self, user_list): + """удаляем список абонентов""" + + @abstractmethod + @check_input_type(AbonStruct) + def add_user(self, user): + """добавляем абонента""" + + @abstractmethod + @check_input_type(AbonStruct) + def remove_user(self, user): + """удаляем абонента""" + + @abstractmethod + @check_input_type(AbonStruct) + def update_user(self, user): + """чтоб обновить абонента надо изменить всё кроме его uid, по uid абонент будет найден""" + + @abstractmethod + @check_input_type(TariffStruct) + def add_tariff_range(self, tariff_list): + """добавляем список тарифов в NAS""" + + @abstractmethod + @check_input_type(TariffStruct) + def remove_tariff_range(self, tariff_list): + """удаляем список тарифов по уникальным идентификаторам""" + + @abstractmethod + @check_input_type(TariffStruct) + def add_tariff(self, tariff): + """добавляем тариф""" + + @abstractmethod + @check_input_type(TariffStruct) + def update_tariff(self, tariff): + """чтоб обновить тариф надо изменить всё кроме его tid, по tid тариф будет найден""" + + @abstractmethod + @check_input_type(TariffStruct) + def remove_tariff(self, tid): + """удаляем тариф""" diff --git a/agent/mod_mikrotik.py b/agent/mod_mikrotik.py new file mode 100644 index 0000000..5d98a30 --- /dev/null +++ b/agent/mod_mikrotik.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +import socket +import binascii +from hashlib import md5 +from core import BaseTransmitter, NasFailedResult +from structs import TariffStruct, IpStruct + + +class ApiRos: + "Routeros api" + + def __init__(self, sk): + self.sk = sk + self.currenttag = 0 + + def login(self, username, pwd): + for repl, attrs in self.talk(["/login"]): + chal = binascii.unhexlify(attrs['=ret']) + md = md5() + md.update('\x00') + md.update(pwd) + md.update(chal) + self.talk(["/login", "=name=" + username, + "=response=00" + binascii.hexlify(md.digest())]) + + def talk(self, words): + if self.writeSentence(words) == 0: return + r = [] + while 1: + i = self.readSentence() + if len(i) == 0: continue + reply = i[0] + attrs = {} + for w in i[1:]: + j = w.find('=', 1) + if j == -1: + attrs[w] = '' + else: + attrs[w[:j]] = w[j + 1:] + r.append((reply, attrs)) + if reply == '!done': return r + + def writeSentence(self, words): + ret = 0 + for w in words: + self.writeWord(w) + ret += 1 + self.writeWord('') + return ret + + def readSentence(self): + r = [] + while 1: + w = self.readWord() + if w == '': return r + r.append(w) + + def writeWord(self, w): + print "<<< " + w + self.writeLen(len(w)) + self.writeStr(w) + + def readWord(self): + ret = self.readStr(self.readLen()) + print ">>> " + ret + return ret + + def writeLen(self, l): + if l < 0x80: + self.writeStr(chr(l)) + elif l < 0x4000: + l |= 0x8000 + self.writeStr(chr((l >> 8) & 0xFF)) + self.writeStr(chr(l & 0xFF)) + elif l < 0x200000: + l |= 0xC00000 + self.writeStr(chr((l >> 16) & 0xFF)) + self.writeStr(chr((l >> 8) & 0xFF)) + self.writeStr(chr(l & 0xFF)) + elif l < 0x10000000: + l |= 0xE0000000 + self.writeStr(chr((l >> 24) & 0xFF)) + self.writeStr(chr((l >> 16) & 0xFF)) + self.writeStr(chr((l >> 8) & 0xFF)) + self.writeStr(chr(l & 0xFF)) + else: + self.writeStr(chr(0xF0)) + self.writeStr(chr((l >> 24) & 0xFF)) + self.writeStr(chr((l >> 16) & 0xFF)) + self.writeStr(chr((l >> 8) & 0xFF)) + self.writeStr(chr(l & 0xFF)) + + def readLen(self): + c = ord(self.readStr(1)) + if (c & 0x80) == 0x00: + pass + elif (c & 0xC0) == 0x80: + c &= ~0xC0 + c <<= 8 + c += ord(self.readStr(1)) + elif (c & 0xE0) == 0xC0: + c &= ~0xE0 + c <<= 8 + c += ord(self.readStr(1)) + c <<= 8 + c += ord(self.readStr(1)) + elif (c & 0xF0) == 0xE0: + c &= ~0xF0 + c <<= 8 + c += ord(self.readStr(1)) + c <<= 8 + c += ord(self.readStr(1)) + c <<= 8 + c += ord(self.readStr(1)) + elif (c & 0xF8) == 0xF0: + c = ord(self.readStr(1)) + c <<= 8 + c += ord(self.readStr(1)) + c <<= 8 + c += ord(self.readStr(1)) + c <<= 8 + c += ord(self.readStr(1)) + return c + + def writeStr(self, text): + n = 0 + while n < len(text): + r = self.sk.send(text[n:]) + if r == 0: raise RuntimeError, "connection closed by remote end" + n += r + + def readStr(self, length): + ret = '' + while len(ret) < length: + s = self.sk.recv(length - len(ret)) + if s == '': raise RuntimeError, "connection closed by remote end" + ret += s + return ret + + +class MikrotikTransmitter(BaseTransmitter): + def __init__(self, login, password, ip, port=8728): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((ip, port)) + self.ar = ApiRos(s) + self.ar.login(login, password) + + def _exec_cmd(self, cmd): + assert isinstance(cmd, list) + result = self.ar.talk(cmd) + for rt in result: + if rt[0] == '!trap': + raise NasFailedResult, rt[1]['=message'] + return result + + # ищем правило по имени, и возвращаем всю инфу о найденном правиле + def _find_queue(self, name): + ret = self._exec_cmd(['/queue/simple/print', '?name=%s' % name]) + return ret[0][1] + + def add_user_range(self, user_list): + return map(self.add_user, user_list) + + def remove_user_range(self, user_list): + names = ['uid%d' % usr.uid for usr in user_list] + return self._exec_cmd(['/queue/simple/remove', '=.id=%s' % ','.join(names)]) + + # добавляем правило шейпинга для указанного ip и со скоростью max-limit=Upload/Download + # Мы уверены что user это инстанс класса agent.structs.AbonStruct + def add_user(self, user): + assert isinstance(user.tariff, TariffStruct) + assert isinstance(user.ip, IpStruct) + return self._exec_cmd(['/queue/simple/add', + '=name=uid%d' % user.uid, + '=target=%s/32' % user.ip.get_str(), + '=max-limit=%fM/%fM' % (user.tariff.speedOut, user.tariff.speedIn) + ]) + + # удаляем правило шейпера по имени правила + def remove_user(self, user): + uid = user if type(user) is int else user.uid + self._exec_cmd(['/queue/simple/remove', '=.id=uid%d' % uid]) + + # обновляем основную инфу абонента + def update_user(self, user): + assert isinstance(user.tariff, TariffStruct) + assert isinstance(user.ip, IpStruct) + self._exec_cmd(['/queue/simple/set', '=.id=uid%d' % user.uid, + '=max-limit=%fM/%fM' % (user.tariff.speedOut, user.tariff.speedIn), + '=target=%s/32' % user.ip.get_str() + ]) + + # Тарифы хранить нам не надо, так что методы тарифов ниже не реализуем + def add_tariff_range(self, tariff_list): + pass + + # todo: реальзовать + def remove_tariff_range(self, tariff_list): + pass + + # todo: реальзовать + def add_tariff(self, tariff): + pass + + # todo: реальзовать + def update_tariff(self, tariff): + pass + + # todo: реальзовать + def remove_tariff(self, tid): + pass diff --git a/agent/structs.py b/agent/structs.py new file mode 100644 index 0000000..3049ae0 --- /dev/null +++ b/agent/structs.py @@ -0,0 +1,78 @@ +# -*- coding: utf8 -*- +from abc import ABCMeta, abstractmethod +from struct import pack, unpack +from utils import int2ip, ip2int + + +class BaseStruct(object): + __metaclass__ = ABCMeta + + @abstractmethod + def serialize(self): + """привращаем инфу в бинарную строку""" + + @abstractmethod + def deserialize(self, data, *args): + """создаём объект из бинарной строки""" + + +class IpStruct(object): + + def __init__(self, ip): + if type(ip) is int: + self.__ip = ip + else: + self.__ip = ip2int(ip) + + def get_str(self): + return int2ip(self.__ip) + + def get_int(self): + return self.__ip + + +# Как обслуживается абонент +class TariffStruct(BaseStruct): + + def __init__(self, tariff_id=0, speedIn=None, speedOut=None): + self.tid = tariff_id + self.speedIn = speedIn + self.speedOut = speedOut + + def serialize(self): + dt = pack("!Iff", int(self.tid), float(self.speedIn), float(self.speedOut)) + return dt + + def deserialize(self, data, *args): + dt = unpack("!Iff", data) + self.tid = int(dt[0]) + self.speedIn = float(dt[1]) + self.speedOut = float(dt[2]) + return self + + +# Абонент из базы +class AbonStruct(BaseStruct): + + def __init__(self, uid=None, ip=None, tariff=None): + self.uid = long(uid) + self.ip = IpStruct(ip) + assert isinstance(tariff, TariffStruct) + self.tariff = tariff + + def serialize(self): + assert isinstance(self.tariff, TariffStruct) + assert isinstance(self.ip, IpStruct) + dt = pack("!LII", self.uid, self.ip.get_int(), self.tariff.tid) + return dt + + def deserialize(self, data, all_tarifs=None): + dt = unpack("!LII", data) + self.uid = dt[0] + self.ip = IpStruct(dt[1]) + tarifs = filter(lambda trf: trf.tid == dt[2], all_tarifs) + if len(tarifs) < 1: + raise IndexError + assert isinstance(tarifs[0], TariffStruct) + self.tariff = tarifs[0] + return self diff --git a/agent/utils.py b/agent/utils.py new file mode 100644 index 0000000..d36a69a --- /dev/null +++ b/agent/utils.py @@ -0,0 +1,17 @@ +# -*- coding: utf8 -*- +import socket +import struct + + +def ip2int(addr): + try: + return struct.unpack("!I", socket.inet_aton(addr))[0] + except: + return 0 + + +def int2ip(addr): + try: + return socket.inet_ntoa(struct.pack("!I", addr)) + except: + return ''