You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

330 lines
10 KiB

# See LICENSE
"""Classes for sending SMS"""
from datetime import datetime, timedelta
import re
from messaging.sms import consts
from messaging.utils import (debug, encode_str, clean_number,
pack_8bits_to_ucs2, pack_8bits_to_7bits,
pack_8bits_to_8bit,
timedelta_to_relative_validity,
datetime_to_absolute_validity)
from messaging.sms.base import SmsBase
from messaging.sms.gsm0338 import is_gsm_text
from messaging.sms.pdu import Pdu
VALID_NUMBER = re.compile("^\+?\d{3,20}$")
class SmsSubmit(SmsBase):
"""I am a SMS ready to be sent"""
def __init__(self, number, text):
super(SmsSubmit, self).__init__()
self._number = None
self._csca = None
self._klass = None
self._validity = None
self.request_status = False
self.ref = None
self.rand_id = None
self.id_list = range(0, 255)
self.msgvp = 0xaa
self.pid = 0x00
self.number = number
self.text = text
self.text_gsm = None
def _set_number(self, number):
if number and not VALID_NUMBER.match(number):
raise ValueError("Invalid number format: %s" % number)
self._number = number
number = property(lambda self: self._number, _set_number)
def _set_csca(self, csca):
if csca and not VALID_NUMBER.match(csca):
raise ValueError("Invalid csca format: %s" % csca)
self._csca = csca
csca = property(lambda self: self._csca, _set_csca)
def _set_validity(self, validity):
if validity is None or isinstance(validity, (timedelta, datetime)):
# valid values are None, timedelta and datetime
self._validity = validity
else:
raise TypeError("Don't know what to do with %s" % validity)
validity = property(lambda self: self._validity, _set_validity)
def _set_klass(self, klass):
if not isinstance(klass, int):
raise TypeError("_set_klass only accepts int objects")
if klass not in [0, 1, 2, 3]:
raise ValueError("class must be between 0 and 3")
self._klass = klass
klass = property(lambda self: self._klass, _set_klass)
def to_pdu(self):
"""Returns a list of :class:`~messaging.pdu.Pdu` objects"""
smsc_pdu = self._get_smsc_pdu()
sms_submit_pdu = self._get_sms_submit_pdu()
tpmessref_pdu = self._get_tpmessref_pdu()
sms_phone_pdu = self._get_phone_pdu()
tppid_pdu = self._get_tppid_pdu()
sms_msg_pdu = self._get_msg_pdu()
if len(sms_msg_pdu) == 1:
pdu = smsc_pdu
len_smsc = len(smsc_pdu) / 2
pdu += sms_submit_pdu
pdu += tpmessref_pdu
pdu += sms_phone_pdu
pdu += tppid_pdu
pdu += sms_msg_pdu[0]
debug("smsc_pdu: %s" % smsc_pdu)
debug("sms_submit_pdu: %s" % sms_submit_pdu)
debug("tpmessref_pdu: %s" % tpmessref_pdu)
debug("sms_phone_pdu: %s" % sms_phone_pdu)
debug("tppid_pdu: %s" % tppid_pdu)
debug("sms_msg_pdu: %s" % sms_msg_pdu)
debug("-" * 20)
debug("full_pdu: %s" % pdu)
debug("full_text: %s" % self.text)
debug("-" * 20)
return [Pdu(pdu, len_smsc)]
# multipart SMS
sms_submit_pdu = self._get_sms_submit_pdu(udh=True)
pdu_list = []
cnt = len(sms_msg_pdu)
for i, sms_msg_pdu_item in enumerate(sms_msg_pdu):
pdu = smsc_pdu
len_smsc = len(smsc_pdu) / 2
pdu += sms_submit_pdu
pdu += tpmessref_pdu
pdu += sms_phone_pdu
pdu += tppid_pdu
pdu += sms_msg_pdu_item
debug("smsc_pdu: %s" % smsc_pdu)
debug("sms_submit_pdu: %s" % sms_submit_pdu)
debug("tpmessref_pdu: %s" % tpmessref_pdu)
debug("sms_phone_pdu: %s" % sms_phone_pdu)
debug("tppid_pdu: %s" % tppid_pdu)
debug("sms_msg_pdu: %s" % sms_msg_pdu_item)
debug("-" * 20)
debug("full_pdu: %s" % pdu)
debug("full_text: %s" % self.text)
debug("-" * 20)
pdu_list.append(Pdu(pdu, len_smsc, cnt=cnt, seq=i + 1))
return pdu_list
def _get_smsc_pdu(self):
if not self.csca or not self.csca.strip():
return "00"
number = clean_number(self.csca)
ptype = 0x81 # set to unknown number by default
if number[0] == '+':
number = number[1:]
ptype = 0x91
if len(number) % 2:
number += 'F'
ps = chr(ptype)
for n in range(0, len(number), 2):
num = number[n + 1] + number[n]
ps += chr(int(num, 16))
pl = len(ps)
ps = chr(pl) + ps
return encode_str(ps)
def _get_tpmessref_pdu(self):
if self.ref is None:
self.ref = self._get_rand_id()
self.ref &= 0xFF
return encode_str(chr(self.ref))
def _get_phone_pdu(self):
number = clean_number(self.number)
ptype = 0x81
if number[0] == '+':
number = number[1:]
ptype = 0x91
pl = len(number)
if len(number) % 2:
number += 'F'
ps = chr(ptype)
for n in range(0, len(number), 2):
num = number[n + 1] + number[n]
ps += chr(int(num, 16))
ps = chr(pl) + ps
return encode_str(ps)
def _get_tppid_pdu(self):
return encode_str(chr(self.pid))
def _get_sms_submit_pdu(self, udh=False):
sms_submit = 0x1
if self.validity is None:
# handle no validity
pass
elif isinstance(self.validity, datetime):
# handle absolute validity
sms_submit |= 0x18
elif isinstance(self.validity, timedelta):
# handle relative validity
sms_submit |= 0x10
if self.request_status:
sms_submit |= 0x20
if udh:
sms_submit |= 0x40
return encode_str(chr(sms_submit))
def _get_msg_pdu(self):
# Data coding scheme
if self.fmt is None:
if is_gsm_text(self.text):
self.fmt = 0x00
else:
self.fmt = 0x08
self.dcs = self.fmt
if self.klass is not None:
if self.klass == 0:
self.dcs |= 0x10
elif self.klass == 1:
self.dcs |= 0x11
elif self.klass == 2:
self.dcs |= 0x12
elif self.klass == 3:
self.dcs |= 0x13
dcs_pdu = encode_str(chr(self.dcs))
# Validity period
msgvp_pdu = ""
if self.validity is None:
# handle no validity
pass
elif isinstance(self.validity, timedelta):
# handle relative
msgvp = timedelta_to_relative_validity(self.validity)
msgvp_pdu = encode_str(chr(msgvp))
elif isinstance(self.validity, datetime):
# handle absolute
msgvp = datetime_to_absolute_validity(self.validity)
msgvp_pdu = ''.join(map(encode_str, map(chr, msgvp)))
# UDL + UD
message_pdu = ""
if self.fmt == 0x00:
self.text_gsm = self.text.encode("gsm0338")
if len(self.text_gsm) <= consts.SEVENBIT_SIZE:
message_pdu = [pack_8bits_to_7bits(self.text_gsm)]
else:
message_pdu = self._split_sms_message(self.text_gsm)
elif self.fmt == 0x04:
if len(self.text) <= consts.EIGHTBIT_SIZE:
message_pdu = [pack_8bits_to_8bit(self.text)]
else:
message_pdu = self._split_sms_message(self.text)
elif self.fmt == 0x08:
if len(self.text) <= consts.UCS2_SIZE:
message_pdu = [pack_8bits_to_ucs2(self.text)]
else:
message_pdu = self._split_sms_message(self.text)
else:
raise ValueError("Unknown data coding scheme: %d" % self.fmt)
ret = []
for msg in message_pdu:
ret.append(dcs_pdu + msgvp_pdu + msg)
return ret
def _split_sms_message(self, text):
if self.fmt == 0x00:
len_without_udh = consts.SEVENBIT_MP_SIZE
limit = consts.SEVENBIT_SIZE
packing_func = pack_8bits_to_7bits
total_len = len(self.text_gsm)
elif self.fmt == 0x04:
len_without_udh = consts.EIGHTBIT_MP_SIZE
limit = consts.EIGHTBIT_SIZE
packing_func = pack_8bits_to_8bit
total_len = len(self.text)
elif self.fmt == 0x08:
len_without_udh = consts.UCS2_MP_SIZE
limit = consts.UCS2_SIZE
packing_func = pack_8bits_to_ucs2
total_len = len(self.text)
msgs = []
pi, pe = 0, len_without_udh
while pi < total_len:
if text[pi:pe][-1] == '\x1b':
pe -= 1
msgs.append(text[pi:pe])
pi = pe
pe += len_without_udh
pdu_msgs = []
udh_len = 0x05
mid = 0x00
data_len = 0x03
sms_ref = self._get_rand_id() if self.rand_id is None else self.rand_id
sms_ref &= 0xFF
for i, msg in enumerate(msgs):
i += 1
total_parts = len(msgs)
if limit == consts.SEVENBIT_SIZE:
udh = (chr(udh_len) + chr(mid) + chr(data_len) +
chr(sms_ref) + chr(total_parts) + chr(i))
padding = " "
else:
udh = (unichr(int("%04x" % ((udh_len << 8) | mid), 16)) +
unichr(int("%04x" % ((data_len << 8) | sms_ref), 16)) +
unichr(int("%04x" % ((total_parts << 8) | i), 16)))
padding = ""
pdu_msgs.append(packing_func(padding + msg, udh))
return pdu_msgs
def _get_rand_id(self):
if not self.id_list:
self.id_list = range(0, 255)
return list(self.id_list).pop(0)