aboutsummaryrefslogtreecommitdiffstats
path: root/gsxws/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'gsxws/core.py')
-rw-r--r--gsxws/core.py469
1 files changed, 469 insertions, 0 deletions
diff --git a/gsxws/core.py b/gsxws/core.py
new file mode 100644
index 0000000..0c55a61
--- /dev/null
+++ b/gsxws/core.py
@@ -0,0 +1,469 @@
+"""
+Copyright (c) 2013, Filipp Lepalaan All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer.
+- Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""
+
+import re
+import json
+import base64
+import shelve
+import os.path
+import hashlib
+import logging
+import httplib
+import tempfile
+from urlparse import urlparse
+import xml.etree.ElementTree as ET
+
+from datetime import date, time, datetime, timedelta
+
+GSX_ENV = "it"
+GSX_LANG = "en"
+GSX_REGION = "emea"
+GSX_SESSION = None
+GSX_LOCALE = "en_XXX"
+
+GSX_TIMEZONES = (
+ ('GMT', "UTC (Greenwich Mean Time)"),
+ ('PDT', "UTC - 7h (Pacific Daylight Time)"),
+ ('PST', "UTC - 8h (Pacific Standard Time)"),
+ ('CDT', "UTC - 5h (Central Daylight Time)"),
+ ('CST', "UTC - 6h (Central Standard Time)"),
+ ('EDT', "UTC - 4h (Eastern Daylight Time)"),
+ ('EST', "UTC - 5h (Eastern Standard Time)"),
+ ('CEST', "UTC + 2h (Central European Summer Time)"),
+ ('CET', "UTC + 1h (Central European Time)"),
+ ('JST', "UTC + 9h (Japan Standard Time)"),
+ ('IST', "UTC + 5.5h (Indian Standard Time)"),
+ ('CCT', "UTC + 8h (Chinese Coast Time)"),
+ ('AEST', "UTC + 10h (Australian Eastern Standard Time)"),
+ ('AEDT', "UTC + 11h (Australian Eastern Daylight Time)"),
+ ('ACST', "UTC + 9.5h (Austrailian Central Standard Time)"),
+ ('ACDT', "UTC + 10.5h (Australian Central Daylight Time)"),
+ ('NZST', "UTC + 12h (New Zealand Standard Time)"),
+)
+
+GSX_REGIONS = (
+ ('002', "Asia/Pacific"),
+ ('003', "Japan"),
+ ('004', "Europe"),
+ ('005', "United States"),
+ ('006', "Canadia"),
+ ('007', "Latin America"),
+)
+
+REGION_CODES = ('apac', 'am', 'la', 'emea',)
+
+ENVIRONMENTS = (
+ ('pr', "Production"),
+ ('ut', "Development"),
+ ('it', "Testing"),
+)
+
+
+def validate(value, what=None):
+ """
+ Tries to guess the meaning of value or validate that
+ value looks like what it's supposed to be.
+ """
+ result = None
+
+ if not isinstance(value, basestring):
+ raise ValueError('%s is not valid input')
+
+ rex = {
+ 'partNumber': r'^([A-Z]{1,2})?\d{3}\-?(\d{4}|[A-Z]{2})(/[A-Z])?$',
+ 'serialNumber': r'^[A-Z0-9]{11,12}$',
+ 'eeeCode': r'^[A-Z0-9]{3,4}$',
+ 'returnOrder': r'^7\d{9}$',
+ 'repairNumber': r'^\d{12}$',
+ 'dispatchId': r'^G\d{9}$',
+ 'alternateDeviceId': r'^\d{15}$',
+ 'diagnosticEventNumber': r'^\d{23}$',
+ 'productName': r'^i?Mac',
+ }
+
+ for k, v in rex.items():
+ if re.match(v, value):
+ result = k
+
+ return (result == what) if what else result
+
+
+def get_formats(locale=GSX_LOCALE):
+ filepath = os.path.join(os.path.dirname(__file__), 'langs.json')
+ df = open(filepath, 'r')
+ return json.load(df).get(locale)
+
+
+class GsxError(Exception):
+ def __init__(self, message=None, xml=None):
+
+ if message is not None:
+ raise ValueError(message)
+
+ if xml is not None:
+ el = ET.fromstring(xml)
+ self.code = el.findtext("*//faultcode")
+
+ if self.code is None:
+ raise ValueError("An unexpected error occured")
+
+ self.message = el.findtext("*//faultstring")
+
+ def __unicode__(self):
+ return self.message
+
+ def __str__(self):
+ return self.message
+
+
+class GsxCache(object):
+ """
+ >>> GsxCache('spam').set('eggs').get()
+ 'eggs'
+ """
+ shelf = None
+ tmpdir = tempfile.gettempdir()
+ expires = timedelta(minutes=20)
+ filename = os.path.join(tmpdir, "gsxws.tmp")
+
+ def __init__(self, key):
+ self.key = key
+ self.shelf = shelve.open(self.filename, protocol=-1)
+ self.now = datetime.now()
+
+ if not self.shelf.get(key):
+ # Initialize the key
+ self.set(None)
+
+ def get(self):
+ try:
+ d = self.shelf[self.key]
+ if d['expires'] > self.now:
+ return d['value']
+ else:
+ del self.shelf[self.key]
+ except KeyError:
+ return None
+
+ def set(self, value):
+ d = {
+ 'value': value,
+ 'expires': self.now + self.expires
+ }
+
+ self.shelf[self.key] = d
+ return self
+
+
+class GsxRequest(object):
+ "Creates and submits the SOAP envelope"
+ env = None
+ obj = None # The GsxObject being submitted
+ data = None # The GsxObject payload in XML format
+ body = None # The Body part of the SOAP envelope
+
+ _request = ""
+ _response = ""
+
+ envs = ('pr', 'it', 'ut',)
+ REGION_CODES = ('apac', 'am', 'la', 'emea',)
+
+ hosts = {'pr': 'ws2', 'it': 'wsit', 'ut': 'wsut'}
+ url = "https://gsx{env}.apple.com/gsx-ws/services/{region}/asp"
+
+ def __init__(self, **kwargs):
+ "Construct the SOAP envelope"
+ self.objects = []
+ self.env = ET.Element("soapenv:Envelope")
+ self.env.set("xmlns:core", "http://gsxws.apple.com/elements/core")
+ self.env.set("xmlns:glob", "http://gsxws.apple.com/elements/global")
+ self.env.set("xmlns:asp", "http://gsxws.apple.com/elements/core/asp")
+ self.env.set("xmlns:soapenv", "http://schemas.xmlsoap.org/soap/envelope/")
+
+ ET.SubElement(self.env, "soapenv:Header")
+ self.body = ET.SubElement(self.env, "soapenv:Body")
+
+ for k, v in kwargs.items():
+ self.obj = v
+ self._request = k
+ self.data = v.to_xml(self._request)
+ self._response = k.replace("Request", "Response")
+
+ def _submit(self, method, response=None):
+ "Construct and submit the final SOAP message"
+ global GSX_ENV, GSX_REGION, GSX_SESSION
+
+ root = ET.SubElement(self.body, self.obj._namespace + method)
+ url = self.url.format(env=self.hosts[GSX_ENV], region=GSX_REGION)
+
+ if method is "Authenticate":
+ root.append(self.data)
+ else:
+ request_name = method + "Request"
+ request = ET.SubElement(root, request_name)
+ request.append(GSX_SESSION)
+
+ if self._request == request_name:
+ "Some requests don't have a top-level container."
+ self.data = list(self.data)[0]
+
+ request.append(self.data)
+
+ data = ET.tostring(self.env, "UTF-8")
+ logging.debug(data)
+
+ parsed = urlparse(url)
+
+ ws = httplib.HTTPSConnection(parsed.netloc)
+ ws.putrequest("POST", parsed.path)
+ ws.putheader("User-Agent", "py-gsxws 0.9")
+ ws.putheader("Content-type", 'text/xml; charset="UTF-8"')
+ ws.putheader("Content-length", "%d" % len(data))
+ ws.putheader("SOAPAction", '"%s"' % method)
+ ws.endheaders()
+ ws.send(data)
+
+ res = ws.getresponse()
+ xml = res.read()
+
+ if res.status > 200:
+ raise GsxError(xml=xml)
+
+ logging.debug("Response: %s %s %s" % (res.status, res.reason, xml))
+ response = response or self._response
+
+ for r in ET.fromstring(xml).findall("*//%s" % response):
+ self.objects.append(GsxObject.from_xml(r))
+
+ return self.objects
+
+ def __str__(self):
+ return ET.tostring(self.env)
+
+
+class GsxObject(object):
+ "XML/SOAP representation of a GSX object"
+ _data = {}
+
+ def __init__(self, *args, **kwargs):
+ self._data = {}
+ self._formats = get_formats()
+
+ for a in args:
+ k = validate(a)
+ if k is not None:
+ kwargs[k] = a
+
+ for k, v in kwargs.items():
+ self.__setattr__(k, v)
+
+ def __setattr__(self, name, value):
+ if name.startswith("_"):
+ super(GsxObject, self).__setattr__(name, value)
+ return
+
+ if isinstance(value, int):
+ value = str(value)
+ if isinstance(value, date):
+ value = value.strftime(self._formats['df'])
+ if isinstance(value, time):
+ value = value.strftime(self._formats['tf'])
+ if isinstance(value, bool):
+ value = 'Y' if value else 'N'
+ if isinstance(value, date):
+ value = value.strftime(self._formats['df'])
+
+ self._data[name] = value
+
+ def __getattr__(self, name):
+ try:
+ return self._data[name]
+ except KeyError:
+ raise AttributeError("Invalid attribute: %s" % name)
+
+ def _submit(self, arg, method, ret=None):
+ self._req = GsxRequest(**{arg: self})
+ result = self._req._submit(method, ret)
+ return result if len(result) > 1 else result[0]
+
+ def to_xml(self, root):
+ "Returns this object as an XML element"
+ root = ET.Element(root)
+ for k, v in self._data.items():
+ el = ET.SubElement(root, k)
+ el.text = v
+
+ return root
+
+ @classmethod
+ def from_xml(cls, el):
+ obj = GsxObject()
+
+ for r in el:
+ newitem = cls.from_xml(r)
+ k, v = r.tag, r.text
+
+ if hasattr(obj, k):
+
+ # found duplicate tag %s" % k
+ attr = obj.__getattr__(k)
+
+ if isinstance(attr, list):
+ # append to existing list
+ newattr = attr.append(newitem)
+ setattr(obj, k, newattr)
+ else:
+ # convert to list
+ setattr(obj, k, [v, newitem])
+ else:
+ # unique tag %s -> set the dictionary" % k
+ setattr(obj, k, newitem)
+
+ if k in ["partsInfo"]:
+ # found new list item %s" % k
+ attr = []
+ attr.append(GsxObject.from_xml(r))
+
+ setattr(obj, k, attr)
+
+ if k in ['packingList', 'proformaFileData', 'returnLabelFileData']:
+ v = base64.b64decode(v)
+
+ if isinstance(v, basestring):
+ # convert dates to native Python type
+ if re.search('^\d{2}/\d{2}/\d{2}$', v):
+ m, d, y = v.split('/')
+ v = date(2000+int(y), int(m), int(d)).isoformat()
+
+ # strip currency prefix and munge into float
+ if re.search('Price$', k):
+ v = float(re.sub('[A-Z ,]', '', v))
+
+ # Convert timestamps to native Python type
+ # 18-Jan-13 14:38:04
+ if re.search('TimeStamp$', k):
+ v = datetime.strptime(v, '%d-%b-%y %H:%M:%S')
+
+ # convert Y and N to corresponding boolean
+ if re.search('^[YN]$', k):
+ v = (v == 'Y')
+
+ setattr(obj, k, v)
+
+ return obj
+
+
+class GsxSession(GsxObject):
+ userId = ""
+ password = ""
+ languageCode = ""
+ userTimeZone = ""
+ serviceAccountNo = ""
+
+ _cache = None
+ _cache_key = ""
+ _session_id = ""
+ _namespace = "glob:"
+
+ def __init__(self, user_id, password, sold_to, language, timezone):
+ self.userId = user_id
+ self.password = password
+ self.languageCode = language
+ self.userTimeZone = timezone
+ self.serviceAccountNo = str(sold_to)
+
+ md5 = hashlib.md5()
+ md5.update(user_id + self.serviceAccountNo)
+
+ self._cache_key = md5.hexdigest()
+ self._cache = GsxCache(self._cache_key)
+
+ def get_session(self):
+ session = ET.Element("userSession")
+ session_id = ET.Element("userSessionId")
+ session_id.text = self._session_id
+ session.append(session_id)
+ return session
+
+ def login(self):
+ global GSX_SESSION
+
+ if not self._cache.get() is None:
+ GSX_SESSION = self._cache.get()
+ else:
+ #result = self._submit("AuthenticateRequest", "Authenticated")
+ self._req = GsxRequest(AuthenticateRequest=self)
+ result = self._req._submit("Authenticate")
+ self._session_id = result[0].userSessionId
+ GSX_SESSION = self.get_session()
+ self._cache.set(GSX_SESSION)
+
+ return GSX_SESSION
+
+ def logout(self):
+ return GsxRequest(LogoutRequest=self)
+
+
+def connect(user_id, password, sold_to,
+ environment='it',
+ language='en',
+ timezone='CEST',
+ region='emea',
+ locale='en_XXX'):
+ """
+ Establishes connection with GSX Web Services.
+ Returns the session ID of the new connection.
+ """
+ global GSX_ENV
+ global GSX_LANG
+ global GSX_LOCALE
+ global GSX_REGION
+
+ GSX_LANG = language
+ GSX_REGION = region
+ GSX_LOCALE = locale
+ GSX_ENV = environment
+
+ act = GsxSession(user_id, password, sold_to, language, timezone)
+ return act.login()
+
+
+if __name__ == '__main__':
+ import doctest
+ import argparse
+
+ parser = argparse.ArgumentParser(description='Communicate with GSX Web Services')
+
+ parser.add_argument('user_id')
+ parser.add_argument('password')
+ parser.add_argument('sold_to')
+ parser.add_argument('--language', default='en')
+ parser.add_argument('--timezone', default='CEST')
+ parser.add_argument('--environment', default='pr')
+ parser.add_argument('--region', default='emea')
+
+ args = parser.parse_args()
+ logging.basicConfig(level=logging.DEBUG)
+ connect(**vars(args))
+ doctest.testmod()