You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
330 lines
10 KiB
330 lines
10 KiB
# See LICENSE
|
|
"""Classes for sending SMS"""
|
|
|
|
from datetime import datetime, timedelta
|
|
import re
|
|
|
|
from messaging.sms import consts
|
|
from messaging.utils import (debug, encode_str, clean_number,
|
|
pack_8bits_to_ucs2, pack_8bits_to_7bits,
|
|
pack_8bits_to_8bit,
|
|
timedelta_to_relative_validity,
|
|
datetime_to_absolute_validity)
|
|
from messaging.sms.base import SmsBase
|
|
from messaging.sms.gsm0338 import is_gsm_text
|
|
from messaging.sms.pdu import Pdu
|
|
|
|
VALID_NUMBER = re.compile("^\+?\d{3,20}$")
|
|
|
|
|
|
class SmsSubmit(SmsBase):
|
|
"""I am a SMS ready to be sent"""
|
|
|
|
def __init__(self, number, text):
|
|
super(SmsSubmit, self).__init__()
|
|
self._number = None
|
|
self._csca = None
|
|
self._klass = None
|
|
self._validity = None
|
|
self.request_status = False
|
|
self.ref = None
|
|
self.rand_id = None
|
|
self.id_list = range(0, 255)
|
|
self.msgvp = 0xaa
|
|
self.pid = 0x00
|
|
|
|
self.number = number
|
|
self.text = text
|
|
self.text_gsm = None
|
|
|
|
def _set_number(self, number):
|
|
if number and not VALID_NUMBER.match(number):
|
|
raise ValueError("Invalid number format: %s" % number)
|
|
|
|
self._number = number
|
|
|
|
number = property(lambda self: self._number, _set_number)
|
|
|
|
def _set_csca(self, csca):
|
|
if csca and not VALID_NUMBER.match(csca):
|
|
raise ValueError("Invalid csca format: %s" % csca)
|
|
|
|
self._csca = csca
|
|
|
|
csca = property(lambda self: self._csca, _set_csca)
|
|
|
|
def _set_validity(self, validity):
|
|
if validity is None or isinstance(validity, (timedelta, datetime)):
|
|
# valid values are None, timedelta and datetime
|
|
self._validity = validity
|
|
else:
|
|
raise TypeError("Don't know what to do with %s" % validity)
|
|
|
|
validity = property(lambda self: self._validity, _set_validity)
|
|
|
|
def _set_klass(self, klass):
|
|
if not isinstance(klass, int):
|
|
raise TypeError("_set_klass only accepts int objects")
|
|
|
|
if klass not in [0, 1, 2, 3]:
|
|
raise ValueError("class must be between 0 and 3")
|
|
|
|
self._klass = klass
|
|
|
|
klass = property(lambda self: self._klass, _set_klass)
|
|
|
|
def to_pdu(self):
|
|
"""Returns a list of :class:`~messaging.pdu.Pdu` objects"""
|
|
smsc_pdu = self._get_smsc_pdu()
|
|
sms_submit_pdu = self._get_sms_submit_pdu()
|
|
tpmessref_pdu = self._get_tpmessref_pdu()
|
|
sms_phone_pdu = self._get_phone_pdu()
|
|
tppid_pdu = self._get_tppid_pdu()
|
|
sms_msg_pdu = self._get_msg_pdu()
|
|
|
|
if len(sms_msg_pdu) == 1:
|
|
pdu = smsc_pdu
|
|
len_smsc = len(smsc_pdu) / 2
|
|
pdu += sms_submit_pdu
|
|
pdu += tpmessref_pdu
|
|
pdu += sms_phone_pdu
|
|
pdu += tppid_pdu
|
|
pdu += sms_msg_pdu[0]
|
|
debug("smsc_pdu: %s" % smsc_pdu)
|
|
debug("sms_submit_pdu: %s" % sms_submit_pdu)
|
|
debug("tpmessref_pdu: %s" % tpmessref_pdu)
|
|
debug("sms_phone_pdu: %s" % sms_phone_pdu)
|
|
debug("tppid_pdu: %s" % tppid_pdu)
|
|
debug("sms_msg_pdu: %s" % sms_msg_pdu)
|
|
debug("-" * 20)
|
|
debug("full_pdu: %s" % pdu)
|
|
debug("full_text: %s" % self.text)
|
|
debug("-" * 20)
|
|
return [Pdu(pdu, len_smsc)]
|
|
|
|
# multipart SMS
|
|
sms_submit_pdu = self._get_sms_submit_pdu(udh=True)
|
|
pdu_list = []
|
|
cnt = len(sms_msg_pdu)
|
|
for i, sms_msg_pdu_item in enumerate(sms_msg_pdu):
|
|
pdu = smsc_pdu
|
|
len_smsc = len(smsc_pdu) / 2
|
|
pdu += sms_submit_pdu
|
|
pdu += tpmessref_pdu
|
|
pdu += sms_phone_pdu
|
|
pdu += tppid_pdu
|
|
pdu += sms_msg_pdu_item
|
|
debug("smsc_pdu: %s" % smsc_pdu)
|
|
debug("sms_submit_pdu: %s" % sms_submit_pdu)
|
|
debug("tpmessref_pdu: %s" % tpmessref_pdu)
|
|
debug("sms_phone_pdu: %s" % sms_phone_pdu)
|
|
debug("tppid_pdu: %s" % tppid_pdu)
|
|
debug("sms_msg_pdu: %s" % sms_msg_pdu_item)
|
|
debug("-" * 20)
|
|
debug("full_pdu: %s" % pdu)
|
|
debug("full_text: %s" % self.text)
|
|
debug("-" * 20)
|
|
|
|
pdu_list.append(Pdu(pdu, len_smsc, cnt=cnt, seq=i + 1))
|
|
|
|
return pdu_list
|
|
|
|
def _get_smsc_pdu(self):
|
|
if not self.csca or not self.csca.strip():
|
|
return "00"
|
|
|
|
number = clean_number(self.csca)
|
|
ptype = 0x81 # set to unknown number by default
|
|
if number[0] == '+':
|
|
number = number[1:]
|
|
ptype = 0x91
|
|
|
|
if len(number) % 2:
|
|
number += 'F'
|
|
|
|
ps = chr(ptype)
|
|
for n in range(0, len(number), 2):
|
|
num = number[n + 1] + number[n]
|
|
ps += chr(int(num, 16))
|
|
|
|
pl = len(ps)
|
|
ps = chr(pl) + ps
|
|
|
|
return encode_str(ps)
|
|
|
|
def _get_tpmessref_pdu(self):
|
|
if self.ref is None:
|
|
self.ref = self._get_rand_id()
|
|
|
|
self.ref &= 0xFF
|
|
return encode_str(chr(self.ref))
|
|
|
|
def _get_phone_pdu(self):
|
|
number = clean_number(self.number)
|
|
ptype = 0x81
|
|
if number[0] == '+':
|
|
number = number[1:]
|
|
ptype = 0x91
|
|
|
|
pl = len(number)
|
|
if len(number) % 2:
|
|
number += 'F'
|
|
|
|
ps = chr(ptype)
|
|
for n in range(0, len(number), 2):
|
|
num = number[n + 1] + number[n]
|
|
ps += chr(int(num, 16))
|
|
|
|
ps = chr(pl) + ps
|
|
return encode_str(ps)
|
|
|
|
def _get_tppid_pdu(self):
|
|
return encode_str(chr(self.pid))
|
|
|
|
def _get_sms_submit_pdu(self, udh=False):
|
|
sms_submit = 0x1
|
|
if self.validity is None:
|
|
# handle no validity
|
|
pass
|
|
elif isinstance(self.validity, datetime):
|
|
# handle absolute validity
|
|
sms_submit |= 0x18
|
|
elif isinstance(self.validity, timedelta):
|
|
# handle relative validity
|
|
sms_submit |= 0x10
|
|
|
|
if self.request_status:
|
|
sms_submit |= 0x20
|
|
|
|
if udh:
|
|
sms_submit |= 0x40
|
|
|
|
return encode_str(chr(sms_submit))
|
|
|
|
def _get_msg_pdu(self):
|
|
# Data coding scheme
|
|
if self.fmt is None:
|
|
if is_gsm_text(self.text):
|
|
self.fmt = 0x00
|
|
else:
|
|
self.fmt = 0x08
|
|
|
|
self.dcs = self.fmt
|
|
|
|
if self.klass is not None:
|
|
if self.klass == 0:
|
|
self.dcs |= 0x10
|
|
elif self.klass == 1:
|
|
self.dcs |= 0x11
|
|
elif self.klass == 2:
|
|
self.dcs |= 0x12
|
|
elif self.klass == 3:
|
|
self.dcs |= 0x13
|
|
|
|
dcs_pdu = encode_str(chr(self.dcs))
|
|
|
|
# Validity period
|
|
msgvp_pdu = ""
|
|
if self.validity is None:
|
|
# handle no validity
|
|
pass
|
|
|
|
elif isinstance(self.validity, timedelta):
|
|
# handle relative
|
|
msgvp = timedelta_to_relative_validity(self.validity)
|
|
msgvp_pdu = encode_str(chr(msgvp))
|
|
|
|
elif isinstance(self.validity, datetime):
|
|
# handle absolute
|
|
msgvp = datetime_to_absolute_validity(self.validity)
|
|
msgvp_pdu = ''.join(map(encode_str, map(chr, msgvp)))
|
|
|
|
# UDL + UD
|
|
message_pdu = ""
|
|
|
|
if self.fmt == 0x00:
|
|
self.text_gsm = self.text.encode("gsm0338")
|
|
if len(self.text_gsm) <= consts.SEVENBIT_SIZE:
|
|
message_pdu = [pack_8bits_to_7bits(self.text_gsm)]
|
|
else:
|
|
message_pdu = self._split_sms_message(self.text_gsm)
|
|
elif self.fmt == 0x04:
|
|
if len(self.text) <= consts.EIGHTBIT_SIZE:
|
|
message_pdu = [pack_8bits_to_8bit(self.text)]
|
|
else:
|
|
message_pdu = self._split_sms_message(self.text)
|
|
elif self.fmt == 0x08:
|
|
if len(self.text) <= consts.UCS2_SIZE:
|
|
message_pdu = [pack_8bits_to_ucs2(self.text)]
|
|
else:
|
|
message_pdu = self._split_sms_message(self.text)
|
|
else:
|
|
raise ValueError("Unknown data coding scheme: %d" % self.fmt)
|
|
|
|
ret = []
|
|
for msg in message_pdu:
|
|
ret.append(dcs_pdu + msgvp_pdu + msg)
|
|
|
|
return ret
|
|
|
|
def _split_sms_message(self, text):
|
|
if self.fmt == 0x00:
|
|
len_without_udh = consts.SEVENBIT_MP_SIZE
|
|
limit = consts.SEVENBIT_SIZE
|
|
packing_func = pack_8bits_to_7bits
|
|
total_len = len(self.text_gsm)
|
|
|
|
elif self.fmt == 0x04:
|
|
len_without_udh = consts.EIGHTBIT_MP_SIZE
|
|
limit = consts.EIGHTBIT_SIZE
|
|
packing_func = pack_8bits_to_8bit
|
|
total_len = len(self.text)
|
|
|
|
elif self.fmt == 0x08:
|
|
len_without_udh = consts.UCS2_MP_SIZE
|
|
limit = consts.UCS2_SIZE
|
|
packing_func = pack_8bits_to_ucs2
|
|
total_len = len(self.text)
|
|
|
|
msgs = []
|
|
pi, pe = 0, len_without_udh
|
|
|
|
while pi < total_len:
|
|
if text[pi:pe][-1] == '\x1b':
|
|
pe -= 1
|
|
|
|
msgs.append(text[pi:pe])
|
|
pi = pe
|
|
pe += len_without_udh
|
|
|
|
pdu_msgs = []
|
|
|
|
udh_len = 0x05
|
|
mid = 0x00
|
|
data_len = 0x03
|
|
|
|
sms_ref = self._get_rand_id() if self.rand_id is None else self.rand_id
|
|
sms_ref &= 0xFF
|
|
|
|
for i, msg in enumerate(msgs):
|
|
i += 1
|
|
total_parts = len(msgs)
|
|
if limit == consts.SEVENBIT_SIZE:
|
|
udh = (chr(udh_len) + chr(mid) + chr(data_len) +
|
|
chr(sms_ref) + chr(total_parts) + chr(i))
|
|
padding = " "
|
|
else:
|
|
udh = (unichr(int("%04x" % ((udh_len << 8) | mid), 16)) +
|
|
unichr(int("%04x" % ((data_len << 8) | sms_ref), 16)) +
|
|
unichr(int("%04x" % ((total_parts << 8) | i), 16)))
|
|
padding = ""
|
|
|
|
pdu_msgs.append(packing_func(padding + msg, udh))
|
|
|
|
return pdu_msgs
|
|
|
|
def _get_rand_id(self):
|
|
if not self.id_list:
|
|
self.id_list = range(0, 255)
|
|
|
|
return list(self.id_list).pop(0)
|