Browse Source

Теперь крон будет наблюдать за тем чтоб то что есть в биллинге соответствовало тому что есть в nas'е

devel
bashmak 9 years ago
parent
commit
d3298a719c
  1. 24
      agent/core.py
  2. 86
      agent/mod_mikrotik.py
  3. 35
      agent/structs.py
  4. 31
      cron.py

24
agent/core.py

@ -99,3 +99,27 @@ class BaseTransmitter(metaclass=ABCMeta):
:param count: количество пингов
:return: None если не пингуется, иначе кортеж, в котором (сколько вернулось, сколько было отправлено)
"""
@abstractmethod
def read_users(self):
"""
Читаем пользователей с NAS
:return: список AbonStruct
"""
def _diff_users(self, users_from_db):
"""
:param users_from_db: QuerySet всех абонентов у которых может быть обслуживание
:return: на выходе получаем абонентов которых надо добавить в nas и которых надо удалить
"""
users_from_db = [ab.build_agent_struct() for ab in users_from_db if ab.is_access()]
users_from_db = set([ab for ab in users_from_db if ab is not None and ab.tariff is not None])
users_from_nas = set(self.read_users())
list_for_del = (users_from_db ^ users_from_nas) - users_from_db
list_for_add = users_from_db - users_from_nas
return list_for_add, list_for_del
def sync_nas(self, users_from_db):
list_for_add, list_for_del = self._diff_users(users_from_db)
self.remove_user_range( list_for_del )
self.add_user_range( list_for_add )

86
agent/mod_mikrotik.py

@ -5,7 +5,7 @@ from abc import ABCMeta
from hashlib import md5
from .core import BaseTransmitter, NasFailedResult, NasNetworkError
from mydefs import ping
from .structs import TariffStruct, AbonStruct, IpStruct, ShapeItem
from .structs import TariffStruct, AbonStruct, IpStruct
from . import settings
from djing.settings import DEBUG
import re
@ -190,13 +190,12 @@ class TransmitterManager(BaseTransmitter, metaclass=ABCMeta):
res = text_speed_digit
elif text_append == 'k':
res = text_speed_digit / 1000
#elif text_append == 'G':
# elif text_append == 'G':
# res = text_speed_digit * 0x400
else:
res = float(re.sub(r'[a-zA-Z]', '', text_speed)) / 1000**2
res = float(re.sub(r'[a-zA-Z]', '', text_speed)) / 1000 ** 2
return res
try:
speeds = info['=max-limit'].split('/')
t = TariffStruct(
speedIn=parse_speed(speeds[1]),
@ -204,14 +203,13 @@ class TransmitterManager(BaseTransmitter, metaclass=ABCMeta):
)
a = AbonStruct(
uid=int(info['=name'][3:]),
#FIXME: тут в разных микротиках или =target-addresses или =target
# FIXME: тут в разных микротиках или =target-addresses или =target
ip=info['=target'][:-3],
tariff=t,
is_active=False if info['=disabled'] == 'false' else True
)
return ShapeItem(abon=a, sid=info['=.id'].replace('*', ''))
except KeyError:
return
a.queue_id = info['=.id']
return a
class QueueManager(TransmitterManager, metaclass=ABCMeta):
@ -223,11 +221,12 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta):
def add(self, user):
assert isinstance(user, AbonStruct)
assert isinstance(user.tariff, TariffStruct)
if user.tariff is None or not isinstance(user.tariff, TariffStruct):
return
return self._exec_cmd(['/queue/simple/add',
'=name=uid%d' % user.uid,
#FIXME: тут в разных микротиках или =target-addresses или =target
'=target=%s' % user.ip.get_str(),
# FIXME: тут в разных микротиках или =target-addresses или =target
'=target=%s' % str(user.ip),
'=max-limit=%.3fM/%.3fM' % (user.tariff.speedOut, user.tariff.speedIn),
'=queue=MikroBILL_SFQ/MikroBILL_SFQ',
'=burst-time=1/1'
@ -240,11 +239,13 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta):
return self._exec_cmd(['/queue/simple/remove', '=.id=*' + str(q.sid)])
def remove_range(self, q_ids):
names = ['%d' % usr for usr in q_ids]
return self._exec_cmd(['/queue/simple/remove'] + names)
if q_ids is not None and len(q_ids) > 0:
return self._exec_cmd(['/queue/simple/remove', '=numbers=' + ','.join(q_ids)])
def update(self, user):
assert isinstance(user, AbonStruct)
if user.tariff is None or not isinstance(user.tariff, TariffStruct):
return
queue = self.find('uid%d' % user.uid)
if queue is None:
# не нашли запись в шейпере об абоненте, добавим
@ -255,7 +256,7 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta):
return self._exec_cmd(['/queue/simple/set', '=.id=*' + mk_id,
'=name=uid%d' % user.uid,
'=max-limit=%.3fM/%.3fM' % (user.tariff.speedOut, user.tariff.speedIn),
#FIXME: тут в разных микротиках или =target-addresses или =target
# FIXME: тут в разных микротиках или =target-addresses или =target
'=target=%s' % user.ip.get_str(),
'=queue=MikroBILL_SFQ/MikroBILL_SFQ',
'=burst-time=1/1'
@ -266,7 +267,9 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta):
queues = self._exec_cmd_iter(['/queue/simple/print', '=detail'])
for queue in queues:
if queue[0] == '!done': return
yield self._build_shape_obj(queue[1])
sobj = self._build_shape_obj(queue[1])
if sobj is not None:
yield sobj
# то же что и выше, только получаем только номера в микротике
def read_mikroids_iter(self):
@ -294,6 +297,12 @@ class QueueManager(TransmitterManager, metaclass=ABCMeta):
return self._exec_cmd(['/queue/simple/enable', '=.id=*' + q.sid])
class IpAddressListObj(IpStruct):
def __init__(self, ip, mk_id):
super(IpAddressListObj, self).__init__(ip)
self.mk_id = str(mk_id).replace('*', '')
class IpAddressListManager(TransmitterManager, metaclass=ABCMeta):
def add(self, list_name, ip, timeout=None):
@ -301,7 +310,7 @@ class IpAddressListManager(TransmitterManager, metaclass=ABCMeta):
commands = [
'/ip/firewall/address-list/add',
'=list=%s' % list_name,
'=address=%s' % ip.get_str()
'=address=%s' % str(ip)
]
if type(timeout) is int:
commands.append('=timeout=%d' % timeout)
@ -321,12 +330,26 @@ class IpAddressListManager(TransmitterManager, metaclass=ABCMeta):
'=.id=*' + str(mk_id).replace('*', '')
])
def remove_range(self, items):
ids = [ip.mk_id for ip in items if isinstance(ip, IpAddressListObj)]
if len(ids) > 0:
return self._exec_cmd([
'/ip/firewall/address-list/remove',
'numbers=' + ','.join(ids)
])
def find(self, ip, list_name):
assert isinstance(ip, IpStruct)
return self._exec_cmd([
'/ip/firewall/address-list/print', 'where',
'?list=%s' % list_name,
'?address=%s' % ip.get_str()
'?address=%s' % str(ip)
])
def read_ips_iter(self, list_name):
return self._exec_cmd([
'/ip/firewall/address-list/print', 'where',
'?list=%s' % list_name
])
def disable(self, user):
@ -352,21 +375,24 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager):
def add_user_range(self, user_list):
for usr in user_list:
if hasattr(usr, 'is_dhcp') and not usr.is_dhcp():
self.add_user(usr)
def remove_user_range(self, users):
queues = [QueueManager.find(self, 'uid%d' % user.uid) for user in users if isinstance(user, AbonStruct)]
queue_names = ["uid%d" % queue.sid for queue in queues]
QueueManager.remove_range(self, queue_names)
queue_ids = [usr.queue_id for usr in users if usr is not None]
QueueManager.remove_range(self, queue_ids)
ips = [user.ip for user in users if isinstance(user, AbonStruct)]
for ip in ips:
ip_list_entity = IpAddressListManager.find(self, ip, LIST_USERS_ALLOWED)
if len(ip_list_entity) > 1:
if ip_list_entity is not None and len(ip_list_entity) > 1:
IpAddressListManager.remove(self, ip_list_entity[0]['=.id'])
def add_user(self, user, ip_timeout=None):
assert isinstance(user.tariff, TariffStruct)
assert isinstance(user.ip, IpStruct)
if user.tariff is None or not isinstance(user.tariff, TariffStruct):
return
if not user.is_access():
return
QueueManager.add(self, user)
IpAddressListManager.add(self, LIST_USERS_ALLOWED, user.ip, ip_timeout)
# удаляем из списка заблокированных абонентов
@ -377,12 +403,11 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager):
def remove_user(self, user):
QueueManager.remove(self, user)
firewall_ip_list_obj = IpAddressListManager.find(self, user.ip, LIST_USERS_ALLOWED)
if len(firewall_ip_list_obj) > 1:
if firewall_ip_list_obj is not None and len(firewall_ip_list_obj) > 1:
IpAddressListManager.remove(self, firewall_ip_list_obj[0]['=.id'])
# обновляем основную инфу абонента
def update_user(self, user, ip_timeout=None):
assert isinstance(user.tariff, TariffStruct)
assert isinstance(user.ip, IpStruct)
# ищем ip абонента в списке ip
@ -396,6 +421,13 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager):
IpAddressListManager.remove(self, find_res[0]['=.id'])
return
# если нет услуги то её не должно быть и в nas
if user.tariff is None or not isinstance(user.tariff, TariffStruct):
queue = QueueManager.find(self, 'uid%d' % user.uid)
if queue is not None:
QueueManager.remove(self, user)
return
# если не найден (mikrotik возвращает пустой словарь в списке если ничего нет)
if len(find_res) < 2:
# добавим запись об абоненте
@ -454,3 +486,9 @@ class MikrotikTransmitter(QueueManager, IpAddressListManager):
def remove_tariff(self, tid):
pass
def read_users(self):
# shapes is ShapeItem
# allowed_ips = IpAddressListManager.read_ips_iter(self, LIST_USERS_ALLOWED)
queues = QueueManager.read_queue_iter(self)
return queues

35
agent/structs.py

@ -34,9 +34,6 @@ class IpStruct(BaseStruct):
self.__ip = int(dt[0])
return self
def get_str(self):
return int2ip(self.__ip)
def get_int(self):
return self.__ip
@ -44,22 +41,32 @@ class IpStruct(BaseStruct):
assert isinstance(other, IpStruct)
return self.__ip == other.__ip
def __int__(self):
return self.__ip
def __str__(self):
return int2ip(self.__ip)
def __hash__(self):
return hash(self.__ip)
# Как обслуживается абонент
class TariffStruct(BaseStruct):
def __init__(self, tariff_id=0, speedIn=None, speedOut=None):
self.tid = tariff_id
self.speedIn = speedIn if speedIn is not None else 0.001
self.speedOut = speedOut if speedOut is not None else 0.001
self.tid = int(tariff_id)
self.speedIn = float(speedIn if speedIn is not None else 0.001)
self.speedOut = float(speedOut if speedOut is not None else 0.001)
def serialize(self):
dt = pack("!Iff", int(self.tid), float(self.speedIn), float(self.speedOut))
return dt
# Да, если все значения нулевые
def is_empty(self):
return self.tid == 0 and self.speedIn == 0.001 and self.speedOut == 0.001
def deserialize(self, data, *args):
dt = unpack("!Iff", data)
self.tid = int(dt[0])
@ -68,7 +75,6 @@ class TariffStruct(BaseStruct):
return self
def __eq__(self, other):
assert isinstance(other, TariffStruct)
# не сравниваем id, т.к. тарифы с одинаковыми скоростями для NAS одинаковы
# Да и иногда не удобно доставать из nas id тарифы из базы
return self.speedIn == other.speedIn and self.speedOut == other.speedOut
@ -76,6 +82,11 @@ class TariffStruct(BaseStruct):
def __str__(self):
return "Id=%d, speedIn=%.2f, speedOut=%.2f" % (self.tid, self.speedIn, self.speedOut)
# нужно чтоб хеши тарифов In10,Out20 и In20,Out10 были разными
# поэтому сначала float->str и потом хеш
def __hash__(self):
return hash(str(self.speedIn) + str(self.speedOut))
# Абонент из базы
class AbonStruct(BaseStruct):
@ -83,14 +94,15 @@ class AbonStruct(BaseStruct):
def __init__(self, uid=None, ip=None, tariff=None, is_active=True):
self.uid = int(uid)
self.ip = IpStruct(ip)
assert isinstance(tariff, TariffStruct)
self.tariff = tariff
self.is_active = is_active
def serialize(self):
if self.tariff is None:
return
assert isinstance(self.tariff, TariffStruct)
assert isinstance(self.ip, IpStruct)
dt = pack("!LII?", self.uid, self.ip.get_int(), self.tariff.tid, self.is_active)
dt = pack("!LII?", self.uid, int(self.ip), self.tariff.tid, self.is_active)
return dt
def deserialize(self, data, tariff=None):
@ -110,7 +122,10 @@ class AbonStruct(BaseStruct):
return r
def __str__(self):
return "uid=%d, ip=%s, tariff=%s" % (self.uid, self.ip, self.tariff)
return "uid=%d, ip=%s, tariff=%s" % (self.uid, self.ip, self.tariff or '<No Service>')
def __hash__(self):
return hash(int(self.ip) + hash(self.tariff)) if self.tariff is not None else 0
# Правило шейпинга в фаере, или ещё можно сказать услуга абонента на NAS

31
cron.py

@ -10,40 +10,17 @@ from mydefs import LogicError
def main():
tm = None
users = Abon.objects.all()
for user in users:
try:
# бдим за услугами абонента: просроченные отключить, заказанные подключить
user.activate_next_tariff(user)
# если нет ip то и нет смысла лезть в NAS
if user.ip_address is None:
continue
# а есть-ли у абонента доступ к услуге
if not user.is_access():
continue
# строим структуру агента
ab = user.build_agent_struct()
if ab is None:
# если не построилась структура агента, значит нет ip
# а если нет ip то и синхронизировать абонента без ip нельзя
continue
# обновляем абонента если он статический. Иначе его обновит dhcp
if user.opt82 is None:
if tm is None:
users = Abon.objects.filter(is_active=True, is_admin=False).exclude(ip_address=None)
tm = Transmitter()
tm.update_user(ab)
tm.sync_nas(users)
except (NasNetworkError, NasFailedResult) as er:
print("Error:", er)
exit(1)
except LogicError as er:
print("Notice:", er)
exit(1)
if __name__ == "__main__":
try:

Loading…
Cancel
Save