diff --git a/agent/core.py b/agent/core.py index 8561b56..f9caf4a 100644 --- a/agent/core.py +++ b/agent/core.py @@ -99,3 +99,27 @@ class BaseTransmitter(metaclass=ABCMeta): :param count: количество пингов :return: None если не пингуется, иначе кортеж, в котором (сколько вернулось, сколько было отправлено) """ + + @abstractmethod + def read_users(self): + """ + Читаем пользователей с NAS + :return: список AbonStruct + """ + + def _diff_users(self, users_from_db): + """ + :param users_from_db: QuerySet всех абонентов у которых может быть обслуживание + :return: на выходе получаем абонентов которых надо добавить в nas и которых надо удалить + """ + users_from_db = [ab.build_agent_struct() for ab in users_from_db if ab.is_access()] + users_from_db = set([ab for ab in users_from_db if ab is not None and ab.tariff is not None]) + users_from_nas = set(self.read_users()) + list_for_del = (users_from_db ^ users_from_nas) - users_from_db + list_for_add = users_from_db - users_from_nas + return list_for_add, list_for_del + + def sync_nas(self, users_from_db): + list_for_add, list_for_del = self._diff_users(users_from_db) + self.remove_user_range( list_for_del ) + self.add_user_range( list_for_add ) diff --git a/agent/mod_mikrotik.py b/agent/mod_mikrotik.py index 72f136e..05cf43f 100644 --- a/agent/mod_mikrotik.py +++ b/agent/mod_mikrotik.py @@ -5,7 +5,7 @@ from abc import ABCMeta from hashlib import md5 from .core import BaseTransmitter, NasFailedResult, NasNetworkError from mydefs import ping -from .structs import TariffStruct, AbonStruct, IpStruct, ShapeItem +from .structs import TariffStruct, AbonStruct, IpStruct from . import settings from djing.settings import DEBUG import re @@ -33,7 +33,7 @@ class ApiRos: md.update(bytes(pwd, 'utf-8')) md.update(chal) for r in self.talk_iter(["/login", "=name=" + username, - "=response=00" + binascii.hexlify(md.digest()).decode('utf-8')]): pass + "=response=00" + binascii.hexlify(md.digest()).decode('utf-8')]): pass def talk_iter(self, words): if self.writeSentence(words) == 0: return @@ -190,28 +190,26 @@ class TransmitterManager(BaseTransmitter, metaclass=ABCMeta): res = text_speed_digit elif text_append == 'k': res = text_speed_digit / 1000 - #elif text_append == 'G': + # elif text_append == 'G': # res = text_speed_digit * 0x400 else: - res = float(re.sub(r'[a-zA-Z]', '', text_speed)) / 1000**2 + res = float(re.sub(r'[a-zA-Z]', '', text_speed)) / 1000 ** 2 return res - try: - speeds = info['=max-limit'].split('/') - t = TariffStruct( - speedIn=parse_speed(speeds[1]), - speedOut=parse_speed(speeds[0]) - ) - a = AbonStruct( - uid=int(info['=name'][3:]), - #FIXME: тут в разных микротиках или =target-addresses или =target - ip=info['=target'][:-3], - tariff=t, - is_active=False if info['=disabled'] == 'false' else True - ) - return ShapeItem(abon=a, sid=info['=.id'].replace('*', '')) - except KeyError: - return + speeds = info['=max-limit'].split('/') + t = TariffStruct( + speedIn=parse_speed(speeds[1]), + speedOut=parse_speed(speeds[0]) + ) + a = AbonStruct( + uid=int(info['=name'][3:]), + # FIXME: тут в разных микротиках или =target-addresses или =target + ip=info['=target'][:-3], + tariff=t, + is_active=False if info['=disabled'] == 'false' else True + ) + a.queue_id = info['=.id'] + return a class QueueManager(TransmitterManager, metaclass=ABCMeta): @@ -223,15 +221,16 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta): def add(self, user): assert isinstance(user, AbonStruct) - assert isinstance(user.tariff, TariffStruct) + if user.tariff is None or not isinstance(user.tariff, TariffStruct): + return return self._exec_cmd(['/queue/simple/add', - '=name=uid%d' % user.uid, - #FIXME: тут в разных микротиках или =target-addresses или =target - '=target=%s' % user.ip.get_str(), - '=max-limit=%.3fM/%.3fM' % (user.tariff.speedOut, user.tariff.speedIn), - '=queue=MikroBILL_SFQ/MikroBILL_SFQ', - '=burst-time=1/1' - ]) + '=name=uid%d' % user.uid, + # FIXME: тут в разных микротиках или =target-addresses или =target + '=target=%s' % str(user.ip), + '=max-limit=%.3fM/%.3fM' % (user.tariff.speedOut, user.tariff.speedIn), + '=queue=MikroBILL_SFQ/MikroBILL_SFQ', + '=burst-time=1/1' + ]) def remove(self, user): assert isinstance(user, AbonStruct) @@ -240,11 +239,13 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta): return self._exec_cmd(['/queue/simple/remove', '=.id=*' + str(q.sid)]) def remove_range(self, q_ids): - names = ['%d' % usr for usr in q_ids] - return self._exec_cmd(['/queue/simple/remove'] + names) + if q_ids is not None and len(q_ids) > 0: + return self._exec_cmd(['/queue/simple/remove', '=numbers=' + ','.join(q_ids)]) def update(self, user): assert isinstance(user, AbonStruct) + if user.tariff is None or not isinstance(user.tariff, TariffStruct): + return queue = self.find('uid%d' % user.uid) if queue is None: # не нашли запись в шейпере об абоненте, добавим @@ -253,20 +254,22 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta): mk_id = queue.sid # обновляем шейпер абонента return self._exec_cmd(['/queue/simple/set', '=.id=*' + mk_id, - '=name=uid%d' % user.uid, - '=max-limit=%.3fM/%.3fM' % (user.tariff.speedOut, user.tariff.speedIn), - #FIXME: тут в разных микротиках или =target-addresses или =target - '=target=%s' % user.ip.get_str(), - '=queue=MikroBILL_SFQ/MikroBILL_SFQ', - '=burst-time=1/1' - ]) + '=name=uid%d' % user.uid, + '=max-limit=%.3fM/%.3fM' % (user.tariff.speedOut, user.tariff.speedIn), + # FIXME: тут в разных микротиках или =target-addresses или =target + '=target=%s' % user.ip.get_str(), + '=queue=MikroBILL_SFQ/MikroBILL_SFQ', + '=burst-time=1/1' + ]) # читаем шейпер, возващаем записи о шейпере def read_queue_iter(self): queues = self._exec_cmd_iter(['/queue/simple/print', '=detail']) for queue in queues: if queue[0] == '!done': return - yield self._build_shape_obj(queue[1]) + sobj = self._build_shape_obj(queue[1]) + if sobj is not None: + yield sobj # то же что и выше, только получаем только номера в микротике def read_mikroids_iter(self): @@ -294,6 +297,12 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta): return self._exec_cmd(['/queue/simple/enable', '=.id=*' + q.sid]) +class IpAddressListObj(IpStruct): + def __init__(self, ip, mk_id): + super(IpAddressListObj, self).__init__(ip) + self.mk_id = str(mk_id).replace('*', '') + + class IpAddressListManager(TransmitterManager, metaclass=ABCMeta): def add(self, list_name, ip, timeout=None): @@ -301,7 +310,7 @@ class IpAddressListManager(TransmitterManager, metaclass=ABCMeta): commands = [ '/ip/firewall/address-list/add', '=list=%s' % list_name, - '=address=%s' % ip.get_str() + '=address=%s' % str(ip) ] if type(timeout) is int: commands.append('=timeout=%d' % timeout) @@ -311,7 +320,7 @@ class IpAddressListManager(TransmitterManager, metaclass=ABCMeta): if timeout is not None: commands = [ '/ip/firewall/address-list/set', '=.id=' + str(mk_id), - '=timeout=%d' % timeout + '=timeout=%d' % timeout ] return self._exec_cmd(commands) @@ -321,12 +330,26 @@ class IpAddressListManager(TransmitterManager, metaclass=ABCMeta): '=.id=*' + str(mk_id).replace('*', '') ]) + def remove_range(self, items): + ids = [ip.mk_id for ip in items if isinstance(ip, IpAddressListObj)] + if len(ids) > 0: + return self._exec_cmd([ + '/ip/firewall/address-list/remove', + 'numbers=' + ','.join(ids) + ]) + def find(self, ip, list_name): assert isinstance(ip, IpStruct) return self._exec_cmd([ '/ip/firewall/address-list/print', 'where', '?list=%s' % list_name, - '?address=%s' % ip.get_str() + '?address=%s' % str(ip) + ]) + + def read_ips_iter(self, list_name): + return self._exec_cmd([ + '/ip/firewall/address-list/print', 'where', + '?list=%s' % list_name ]) def disable(self, user): @@ -352,21 +375,24 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager): def add_user_range(self, user_list): for usr in user_list: - self.add_user(usr) + if hasattr(usr, 'is_dhcp') and not usr.is_dhcp(): + self.add_user(usr) def remove_user_range(self, users): - queues = [QueueManager.find(self, 'uid%d' % user.uid) for user in users if isinstance(user, AbonStruct)] - queue_names = ["uid%d" % queue.sid for queue in queues] - QueueManager.remove_range(self, queue_names) + queue_ids = [usr.queue_id for usr in users if usr is not None] + QueueManager.remove_range(self, queue_ids) ips = [user.ip for user in users if isinstance(user, AbonStruct)] for ip in ips: ip_list_entity = IpAddressListManager.find(self, ip, LIST_USERS_ALLOWED) - if len(ip_list_entity) > 1: + if ip_list_entity is not None and len(ip_list_entity) > 1: IpAddressListManager.remove(self, ip_list_entity[0]['=.id']) def add_user(self, user, ip_timeout=None): - assert isinstance(user.tariff, TariffStruct) assert isinstance(user.ip, IpStruct) + if user.tariff is None or not isinstance(user.tariff, TariffStruct): + return + if not user.is_access(): + return QueueManager.add(self, user) IpAddressListManager.add(self, LIST_USERS_ALLOWED, user.ip, ip_timeout) # удаляем из списка заблокированных абонентов @@ -377,12 +403,11 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager): def remove_user(self, user): QueueManager.remove(self, user) firewall_ip_list_obj = IpAddressListManager.find(self, user.ip, LIST_USERS_ALLOWED) - if len(firewall_ip_list_obj) > 1: + if firewall_ip_list_obj is not None and len(firewall_ip_list_obj) > 1: IpAddressListManager.remove(self, firewall_ip_list_obj[0]['=.id']) # обновляем основную инфу абонента def update_user(self, user, ip_timeout=None): - assert isinstance(user.tariff, TariffStruct) assert isinstance(user.ip, IpStruct) # ищем ip абонента в списке ip @@ -396,6 +421,13 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager): IpAddressListManager.remove(self, find_res[0]['=.id']) return + # если нет услуги то её не должно быть и в nas + if user.tariff is None or not isinstance(user.tariff, TariffStruct): + queue = QueueManager.find(self, 'uid%d' % user.uid) + if queue is not None: + QueueManager.remove(self, user) + return + # если не найден (mikrotik возвращает пустой словарь в списке если ничего нет) if len(find_res) < 2: # добавим запись об абоненте @@ -423,7 +455,7 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager): interface = r[0]['=interface'] r = self._exec_cmd([ '/ping', '=address=%s' % host, '=arp-ping=yes', '=interval=100ms', '=count=%d' % count, - '=interface=%s' % interface + '=interface=%s' % interface ]) received, sent = int(r[-2:][0]['=received']), int(r[-2:][0]['=sent']) return received, sent @@ -454,3 +486,9 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager): def remove_tariff(self, tid): pass + + def read_users(self): + # shapes is ShapeItem + # allowed_ips = IpAddressListManager.read_ips_iter(self, LIST_USERS_ALLOWED) + queues = QueueManager.read_queue_iter(self) + return queues diff --git a/agent/structs.py b/agent/structs.py index eda9d10..849a296 100644 --- a/agent/structs.py +++ b/agent/structs.py @@ -34,9 +34,6 @@ class IpStruct(BaseStruct): self.__ip = int(dt[0]) return self - def get_str(self): - return int2ip(self.__ip) - def get_int(self): return self.__ip @@ -44,22 +41,32 @@ class IpStruct(BaseStruct): assert isinstance(other, IpStruct) return self.__ip == other.__ip + def __int__(self): + return self.__ip + def __str__(self): return int2ip(self.__ip) + def __hash__(self): + return hash(self.__ip) + # Как обслуживается абонент class TariffStruct(BaseStruct): def __init__(self, tariff_id=0, speedIn=None, speedOut=None): - self.tid = tariff_id - self.speedIn = speedIn if speedIn is not None else 0.001 - self.speedOut = speedOut if speedOut is not None else 0.001 + self.tid = int(tariff_id) + self.speedIn = float(speedIn if speedIn is not None else 0.001) + self.speedOut = float(speedOut if speedOut is not None else 0.001) def serialize(self): dt = pack("!Iff", int(self.tid), float(self.speedIn), float(self.speedOut)) return dt + # Да, если все значения нулевые + def is_empty(self): + return self.tid == 0 and self.speedIn == 0.001 and self.speedOut == 0.001 + def deserialize(self, data, *args): dt = unpack("!Iff", data) self.tid = int(dt[0]) @@ -68,7 +75,6 @@ class TariffStruct(BaseStruct): return self def __eq__(self, other): - assert isinstance(other, TariffStruct) # не сравниваем id, т.к. тарифы с одинаковыми скоростями для NAS одинаковы # Да и иногда не удобно доставать из nas id тарифы из базы return self.speedIn == other.speedIn and self.speedOut == other.speedOut @@ -76,6 +82,11 @@ class TariffStruct(BaseStruct): def __str__(self): return "Id=%d, speedIn=%.2f, speedOut=%.2f" % (self.tid, self.speedIn, self.speedOut) + # нужно чтоб хеши тарифов In10,Out20 и In20,Out10 были разными + # поэтому сначала float->str и потом хеш + def __hash__(self): + return hash(str(self.speedIn) + str(self.speedOut)) + # Абонент из базы class AbonStruct(BaseStruct): @@ -83,14 +94,15 @@ class AbonStruct(BaseStruct): def __init__(self, uid=None, ip=None, tariff=None, is_active=True): self.uid = int(uid) self.ip = IpStruct(ip) - assert isinstance(tariff, TariffStruct) self.tariff = tariff self.is_active = is_active def serialize(self): + if self.tariff is None: + return assert isinstance(self.tariff, TariffStruct) assert isinstance(self.ip, IpStruct) - dt = pack("!LII?", self.uid, self.ip.get_int(), self.tariff.tid, self.is_active) + dt = pack("!LII?", self.uid, int(self.ip), self.tariff.tid, self.is_active) return dt def deserialize(self, data, tariff=None): @@ -110,7 +122,10 @@ class AbonStruct(BaseStruct): return r def __str__(self): - return "uid=%d, ip=%s, tariff=%s" % (self.uid, self.ip, self.tariff) + return "uid=%d, ip=%s, tariff=%s" % (self.uid, self.ip, self.tariff or '') + + def __hash__(self): + return hash(int(self.ip) + hash(self.tariff)) if self.tariff is not None else 0 # Правило шейпинга в фаере, или ещё можно сказать услуга абонента на NAS diff --git a/cron.py b/cron.py index e42d2d8..3b9cc7b 100755 --- a/cron.py +++ b/cron.py @@ -10,40 +10,17 @@ from mydefs import LogicError def main(): - tm = None - - users = Abon.objects.all() - for user in users: - try: - # бдим за услугами абонента: просроченные отключить, заказанные подключить - user.activate_next_tariff(user) - - # если нет ip то и нет смысла лезть в NAS - if user.ip_address is None: - continue - - # а есть-ли у абонента доступ к услуге - if not user.is_access(): - continue - - # строим структуру агента - ab = user.build_agent_struct() - if ab is None: - # если не построилась структура агента, значит нет ip - # а если нет ip то и синхронизировать абонента без ip нельзя - continue - - # обновляем абонента если он статический. Иначе его обновит dhcp - if user.opt82 is None: - if tm is None: - tm = Transmitter() - tm.update_user(ab) - - except (NasNetworkError, NasFailedResult) as er: - print("Error:", er) - except LogicError as er: - print("Notice:", er) - + try: + users = Abon.objects.filter(is_active=True, is_admin=False).exclude(ip_address=None) + tm = Transmitter() + tm.sync_nas(users) + + except (NasNetworkError, NasFailedResult) as er: + print("Error:", er) + exit(1) + except LogicError as er: + print("Notice:", er) + exit(1) if __name__ == "__main__": try: