Source code for pysnc.client

from io import BytesIO

import requests
from requests.auth import AuthBase
import re
import logging
import base64
from typing import Callable, no_type_check

from requests.cookies import MockRequest, MockResponse
from requests.structures import CaseInsensitiveDict
from requests.utils import get_encoding_from_headers
from requests.adapters import HTTPAdapter, Retry

from .exceptions import *
from .record import GlideRecord
from .attachment import Attachment
from .utils import get_instance, MockHeaders
from .auth import ServiceNowFlow


[docs]class ServiceNowClient(object): """ ServiceNow Python Client :param str instance: The instance to connect to e.g. ``https://dev00000.service-now.com`` or ``dev000000`` :param auth: Username password combination ``(name,pass)`` or :class:`pysnc.ServiceNowOAuth2` or ``requests.sessions.Session`` or ``requests.auth.AuthBase`` object :param proxy: HTTP(s) proxy to use as a str ``'http://proxy:8080`` or dict ``{'http':'http://proxy:8080'}`` :param bool verify: Verify the SSL/TLS certificate OR the certificate to use. Useful if you're using a self-signed HTTPS proxy. :param cert: if String, path to ssl client cert file (.pem). If Tuple, (‘cert’, ‘key’) pair. """ def __init__(self, instance, auth, proxy=None, verify=None, cert=None, auto_retry=True): self._log = logging.getLogger(__name__) self.__instance = get_instance(instance) if proxy: if type(proxy) != dict: proxies = dict(http=proxy, https=proxy) else: proxies = proxy self.__proxies = proxies if verify is None: verify = True # default to verify with proxy else: self.__proxies = None if auth is not None and cert is not None: raise AuthenticationException('Cannot specify both auth and cert') elif isinstance(auth, (list, tuple)) and len(auth) == 2: self.__user = auth[0] auth = requests.auth.HTTPBasicAuth(auth[0], auth[1]) self.__session = requests.session() self.__session.auth = auth elif isinstance(auth, AuthBase): self.__session = requests.session() self.__session.auth = auth elif isinstance(auth, requests.sessions.Session): # maybe we've got an oauth token? Let this be permissive self.__session = auth elif isinstance(auth, ServiceNowFlow): self.__session = auth.authenticate(self.__instance, proxies=self.__proxies, verify=verify) elif cert is not None: self.__session.cert = cert else: raise AuthenticationException('No valid authentication method provided') if proxy: self.__session.proxies = self.__proxies if verify is not None: self.__session.verify = verify self.__session.headers.update(dict(Accept="application/json")) if auto_retry is True: # https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry retry = Retry(total=4, backoff_factor=0.2, status_forcelist=[429, 500, 502, 503]) self.__session.mount(self.__instance, HTTPAdapter(max_retries=retry)) self.table_api = TableAPI(self) self.attachment_api = AttachmentAPI(self) self.batch_api = BatchAPI(self)
[docs] def GlideRecord(self, table, batch_size=100, rewindable=True) -> GlideRecord: """ Create a :class:`pysnc.GlideRecord` for a given table against the current client :param str table: The table name e.g. ``problem`` :param int batch_size: Batch size (items returned per HTTP request). Default is ``100``. :param bool rewindable: If we can rewind the record. Default is ``True``. If ``False`` then we cannot rewind the record, which means as an Iterable this object will be 'spent' after iteration. This is normally the default behavior expected for a python Iterable, but not a GlideRecord. When ``False`` less memory will be consumed, as each previous record will be collected. :return: :class:`pysnc.GlideRecord` """ return GlideRecord(self, table, batch_size, rewindable)
[docs] def Attachment(self, table) -> Attachment: """ Create an Attachment object for the current client :return: :class:`pysnc.Attachment` """ return Attachment(self, table)
@property def instance(self) -> str: """ The instance we're associated with. :return: Instance URI :rtype: str """ return self.__instance @property def session(self): """ :return: The requests session """ return self.__session
[docs] @staticmethod def guess_is_sys_id(value) -> bool: """ Attempt to guess if this is a probable sys_id :param str value: the value to check :return: If this is probably a sys_id :rtype: bool """ return re.match(r'^[A-Za-z0-9]{32}$', value) is not None
[docs]class API(object): def __init__(self, client): self._client = client @property def session(self): return self._client.session # noinspection PyMethodMayBeStatic def _set_params(self, record=None): params = {} if record is None else record._parameters() if 'sysparm_display_value' not in params: params['sysparm_display_value'] = 'all' if 'sysparm_exclude_reference_link' not in params: params['sysparm_exclude_reference_link'] = 'true' # Scratch it!, by default params['sysparm_suppress_pagination_header'] = 'true' # Required for large queries return params # noinspection PyMethodMayBeStatic def _validate_response(self, response: requests.Response) -> None: assert response is not None, f"response argument required" code = response.status_code if code >= 400: try: rjson = response.json() if code == 404: raise NotFoundException(rjson) if code == 403: raise RoleException(rjson) if code == 401: raise AuthenticationException(rjson) raise RequestException(rjson) except requests.exceptions.JSONDecodeError: raise RequestException(response.text) def _send(self, req, stream=False) -> requests.Response: # https://stackoverflow.com/a/55889308/253594 # if we're oauth, we have to do magic for prepared requests if hasattr(self.session, 'token'): try: req.url, req.headers, req.data = self.session._client.add_token( req.url, http_method=req.method, body=req.data, headers=req.headers ) except Exception as e: if e.__class__.__name__ == 'TokenExpiredError': # use refresh token to get new token if self.session.auto_refresh_url: if hasattr(req, 'auth'): req.auth = None self.session.refresh_token(self.session.auto_refresh_url) else: raise e else: raise e request = self.session.prepare_request(req) # Merge environment settings into session settings = self.session.merge_environment_settings(request.url, {}, stream, None, None) r = self.session.send(request, **settings) self._validate_response(r) return r
[docs]class TableAPI(API): def _target(self, table, sys_id=None) -> str: target = "{url}/api/now/table/{table}".format(url=self._client.instance, table=table) if sys_id: target = "{}/{}".format(target, sys_id) return target
[docs] def list(self, record: GlideRecord) -> requests.Response: params = self._set_params(record) target_url = self._target(record.table) req = requests.Request('GET', target_url, params=params) return self._send(req)
[docs] def get(self, record: GlideRecord, sys_id: str) -> requests.Response: params = self._set_params(record) # delete extra stuff if 'sysparm_offset' in params: del params['sysparm_offset'] target_url = self._target(record.table, sys_id) req = requests.Request('GET', target_url, params=params) return self._send(req)
[docs] def put(self, record: GlideRecord) -> requests.Response: return self.patch(record)
[docs] def patch(self, record: GlideRecord) -> requests.Response: body = record.serialize(changes_only=True) params = self._set_params() target_url = self._target(record.table, record.sys_id) req = requests.Request('PATCH', target_url, params=params, json=body) return self._send(req)
[docs] def post(self, record: GlideRecord) -> requests.Response: body = record.serialize() params = self._set_params() target_url = self._target(record.table) req = requests.Request('POST', target_url, params=params, json=body) return self._send(req)
[docs] def delete(self, record: GlideRecord) -> requests.Response: target_url = self._target(record.table, record.sys_id) req = requests.Request('DELETE', target_url) return self._send(req)
[docs]class AttachmentAPI(API): API_VERSION = 'v1' def _target(self, sys_id=None): target = "{url}/api/now/{version}/attachment".format(url=self._client.instance, version=self.API_VERSION) if sys_id: target = "{}/{}".format(target, sys_id) return target
[docs] def get(self, sys_id=None): target_url = self._target(sys_id) req = requests.Request('GET', target_url, params={}) return self._send(req)
[docs] def get_file(self, sys_id, stream=True): """ This may be dangerous, as stream is true and if not fully read could leave open handles One should always ``with api.get_file(sys_id) as f:`` """ target_url = "{}/file".format(self._target(sys_id)) req = requests.Request('GET', target_url) return self._send(req, stream=stream)
[docs] def list(self, attachment: Attachment): params = self._set_params(attachment) url = self._target() req = requests.Request('GET', url, params=params, headers=dict(Accept="application/json")) return self._send(req)
[docs] def upload_file(self, file_name, table_name, table_sys_id, file, content_type=None, encryption_context=None): url = f"{self._target()}/file" params = {'file_name': file_name, 'table_name': table_name, 'table_sys_id': f"{table_sys_id}"} if encryption_context: params['encryption_context'] = encryption_context if not content_type: content_type = 'application/octet-stream' headers = {'Content-Type': content_type} req = requests.Request('POST', url, params=params, headers=headers, data=file) return self._send(req)
[docs] def delete(self, sys_id): target_url = self._target(sys_id) req = requests.Request('DELETE', target_url) return self._send(req)
[docs]class BatchAPI(API): API_VERSION = 'v1' def __init__(self, client): API.__init__(self, client) self.__requests = [] self.__stored_requests = {} self.__hooks = {} self.__request_id = 0 def _batch_target(self): return "{url}/api/now/{version}/batch".format(url=self._client.instance, version=self.API_VERSION) def _table_target(self, table, sys_id=None): # note: the instance is still in here so requests behaves normally when preparing requests target = "{url}/api/now/table/{table}".format(url=self._client.instance, table=table) if sys_id: target = "{}/{}".format(target, sys_id) return target def _next_id(self): self.__request_id += 1 return self.__request_id def _add_request(self, request: requests.Request, hook: Callable): prepared = request.prepare() request_id = str(id(prepared)) headers = [{'name': k, 'value': v} for (k,v) in prepared.headers.items()] relative_url = prepared.url[prepared.url.index('/', 8):] # type: ignore ## slice from the first non https:// slash now_request = { 'id': request_id, 'method': prepared.method, 'url': relative_url, 'headers': headers, #'exclude_response_headers': False } if prepared.body: now_request['body'] = base64.b64encode(prepared.body).decode() # type: ignore ## could theoretically do us dirty self.__hooks[request_id] = hook self.__stored_requests[request_id] = prepared self.__requests.append(now_request) @no_type_check def _transform_response(self, req: requests.PreparedRequest, serviced_request) -> requests.Response: # modeled after requests.adapters.HttpAdapter.build_response response = requests.Response() response.status_code = serviced_request['status_code'] headers = {k: v for (k, v) in [(e['name'], e['value']) for e in serviced_request.get("headers", [])]} response.headers = CaseInsensitiveDict(headers) response.encoding = get_encoding_from_headers(response.headers) body = base64.b64decode(serviced_request.get('body', '')) response.raw = BytesIO(body) if isinstance(req.url, bytes): response.url = req.url.decode("utf-8") else: response.url = req.url # type: ignore # cookies - kinda hack an adapter in req = MockRequest(req) res = MockResponse(MockHeaders(headers)) response.cookies.extract_cookies(res, req) response.request = req # response.connection = None return response
[docs] def execute(self, attempt=0): if attempt > 2: # just give up and tell em we tried for h in self.__hooks: self.__hooks[h](None) self.__hooks = {} self.__requests = [] self.__stored_requests = {} bid = self._next_id() body = { 'batch_request_id': bid, 'rest_requests': self.__requests } r = self.session.post(self._batch_target(), json=body) self._validate_response(r) data = r.json() assert str(bid) == data['batch_request_id'], f"How did we get a response id different from {bid}" for response in data['serviced_requests']: response_id = response['id'] assert response_id in self.__hooks, f"Somehow has no hook for {response_id}" assert response_id in self.__stored_requests, f"Somehow we did not store request for {response_id}" self.__hooks[response['id']](self._transform_response(self.__stored_requests.pop(response_id), response)) del self.__hooks[response_id] self.__requests = list(filter(lambda x: x['id'] != response_id, self.__requests)) if len(data['unserviced_requests']) > 0: self.execute(attempt=attempt+1)
[docs] def get(self, record: GlideRecord, sys_id: str, hook: Callable) -> None: params = self._set_params(record) if 'sysparm_offset' in params: del params['sysparm_offset'] target_url = self._table_target(record.table, sys_id) req = requests.Request('GET', target_url, params=params) self._add_request(req, hook)
[docs] def put(self, record: GlideRecord, hook: Callable) -> None: self.patch(record, hook)
[docs] def patch(self, record: GlideRecord, hook: Callable) -> None: body = record.serialize(changes_only=True) params = self._set_params() target_url = self._table_target(record.table, record.sys_id) req = requests.Request('PATCH', target_url, params=params, json=body) self._add_request(req, hook)
[docs] def post(self, record: GlideRecord, hook: Callable): body = record.serialize() params = self._set_params() target_url = self._table_target(record.table) req = requests.Request('POST', target_url, params=params, json=body) self._add_request(req, hook)
[docs] def delete(self, record: GlideRecord, hook: Callable): target_url = self._table_target(record.table, record.sys_id) req = requests.Request('DELETE', target_url) self._add_request(req, hook)
[docs] def list(self, record: GlideRecord, hook: Callable): params = self._set_params(record) target_url = self._table_target(record.table) req = requests.Request('GET', target_url, params=params) self._add_request(req, hook)