"""HMRC API session with authorization support"""
from datetime import datetime
import getpass
import os
from pathlib import Path
import platform
import socket
from urllib.parse import urljoin, quote
import uuid
import psutil
from requests.adapters import HTTPAdapter
from requests_oauthlib import OAuth2Session
from urllib3.util.retry import Retry
__all__ = [
'HmrcSession',
]
OAUTHLIB_INSECURE_TRANSPORT = 'OAUTHLIB_INSECURE_TRANSPORT'
"""Environment variable required for out-of-band authorization"""
UUID_NS = uuid.UUID('c9da8da2-c7e0-4873-97fc-6d783e908751')
"""Namespace for fraud prevention client identifiers"""
CLIENT_ID = 'Oo4p6xztJXMIMhpZQgLR3UccFnQN'
"""Client ID issued by HMRC"""
CLIENT_NON_SECRET = '114e4a63-6172-4348-81ec-c0282838fa12'
"""Client "secret" issued by HMRC
The client "secret" is associated with the codebase (rather than with
any individual user). It does not itself grant access to any user
data: it merely allows the codebase to access the authentication
endpoints and to therefore direct the user through the real
authentication process.
There is zero point to this client "secret" from a security
perspective, but it is required by the HMRC API design.
Interested readers are politely requested to steal a client secret
from elsewhere, to avoid disruption to any users of this open-source
codebase. HMRC publishes a list of authorised closed-source
applications that run locally (e.g. as VBA macros within an Excel
spreadsheet): please steal a secret from one of those instead.
"""
[docs]class HmrcSession(OAuth2Session):
"""HMRC API session"""
BASE_URI = 'https://api.service.hmrc.gov.uk'
BASE_TEST_URI = 'https://test-api.service.hmrc.gov.uk'
AUTH_URI = '/oauth/authorize'
TOKEN_URI = '/oauth/token'
OOB_REDIRECT_URI = 'urn:ietf:wg:oauth:2.0:oob'
def __init__(self, client_id=None, *, client_secret=None, test=False,
uri=None, token=None, storage=None, gdpr_consent=False,
**kwargs):
# Construct base URI
self.test = test
if uri is None:
self.uri = self.BASE_TEST_URI if self.test else self.BASE_URI
# Set default out-of-band redirect URI
kwargs.setdefault('redirect_uri', self.OOB_REDIRECT_URI)
# Use default client credentials if none provided
if client_id is None:
client_id = CLIENT_ID
if client_secret is None:
client_secret = CLIENT_NON_SECRET
self.client_secret = client_secret
# Configure automatic token refresh
kwargs.setdefault('auto_refresh_url',
urljoin(self.uri, self.TOKEN_URI))
kwargs.setdefault('auto_refresh_kwargs', {
'client_id': client_id,
'client_secret': client_secret,
})
# Allow server token to be passed as a plain string
if isinstance(token, str):
token = {'access_token': token, 'token_type': 'bearer'}
# Use token storage if provided
self.storage = storage
if self.storage is not None:
if token is None:
token = self.storage.token
kwargs.setdefault('token_updater', self.storage.save)
# Use existing token's scope, if applicable
scope = [] if token is None else token.get('scope', [])
# Record GDPR consent status
self.gdpr_consent = gdpr_consent or test
# Call superclass
super().__init__(client_id, scope=scope, token=token, **kwargs)
# Configure automatic retries since API is unreliable
retries = Retry(status_forcelist=[503])
adapter = HTTPAdapter(max_retries=retries)
self.mount('https://', adapter)
self.mount('http://', adapter)
def __repr__(self):
return '%s(%r, uri=%r, scope=%r)' % (
self.__class__.__name__, self.client_id, self.uri, self.scope
)
[docs] def extend_scope(self, scope):
"""Extend OAuth2 scope"""
current = set(self.scope)
self.scope = self.scope + [x for x in scope if x not in current]
[docs] def authorization_url(self, url=None, **kwargs):
"""Form an authorization URL"""
# pylint: disable=arguments-differ
if url is None:
url = urljoin(self.uri, self.AUTH_URI)
return super().authorization_url(url, **kwargs)
[docs] def fetch_token(self, url=None, **kwargs):
"""Fetch an access token"""
# pylint: disable=arguments-differ
# Use default token URI if none provided
if url is None:
url = urljoin(self.uri, self.TOKEN_URI)
# Use stored client secret if available
kwargs.setdefault('client_secret', self.client_secret)
# Force client_id to be included in request body
kwargs.setdefault('include_client_id', True)
# Include authorization URI to support test user flow
kwargs.setdefault('auth_uri', self.authorization_url()[0])
# Fetch token, allowing for use of out-of-band redirect URI
saved = os.environ.get(OAUTHLIB_INSECURE_TRANSPORT)
try:
# Allow use of out-of-band redirect URI if applicable
if self.redirect_uri == self.OOB_REDIRECT_URI:
os.environ[OAUTHLIB_INSECURE_TRANSPORT] = '1'
# Fetch token
token = super().fetch_token(url, **kwargs)
finally:
# Restore environment
if saved is None:
del os.environ[OAUTHLIB_INSECURE_TRANSPORT]
else:
os.environ[OAUTHLIB_INSECURE_TRANSPORT] = saved
# Store token if storage is available
if self.storage:
self.storage.save(token)
return token
[docs] def request(self, method, url, params=None, data=None, headers=None,
**kwargs):
"""Send request"""
# pylint: disable=arguments-differ,too-many-arguments
headers = {} if headers is None else headers.copy()
headers.update(self.defraud())
return super().request(method, url, params=params, data=data,
headers=headers, **kwargs)
[docs] @staticmethod
def dmifile(filename, default='Unknown'):
"""Read DMI file contents"""
try:
path = Path('/sys/devices/virtual/dmi/id/%s' % filename)
return path.read_text(encoding='utf8').strip() or default
except FileNotFoundError:
return default
[docs] def defraud(self):
"""Construct fraud prevention headers"""
timestamp = datetime.utcnow().isoformat(timespec='milliseconds') + 'Z'
headers = {
'Gov-Client-Connection-Method': 'DESKTOP_APP_DIRECT',
'Gov-Client-Device-ID': str(UUID_NS),
'Gov-Client-Local-IPs': '127.0.0.1',
'Gov-Client-Local-IPs-Timestamp': timestamp,
'Gov-Client-MAC-Addresses': quote('52:54:00:12:34:56'),
'Gov-Client-Multi-Factor': '',
'Gov-Client-Screens': '&'.join([
'width=1920',
'height=1080',
'scaling-factor=1',
'colour-depth=24',
]),
'Gov-Client-Timezone': 'UTC+00:00',
'Gov-Client-User-Agent': '&'.join([
'os-family=Linux',
'os-version=1',
'device-manufacturer=Intel',
'device-model=Computer',
]),
'Gov-Client-User-IDs': 'os=user',
'Gov-Client-Window-Size': 'width=640&height=480',
'Gov-Vendor-License-IDs': 'hmrc=497427732047504C2C20626974636821',
'Gov-Vendor-Product-Name': quote('Python API'),
'Gov-Vendor-Version': 'hmrc=1.1.3',
}
if self.gdpr_consent:
nics = psutil.net_if_addrs()
headers['Gov-Client-Local-IPs'] = ','.join(sorted(
quote(addr.address) for nic in nics.values() for addr in nic
if addr.family == socket.AF_INET
))
headers['Gov-Client-MAC-Addresses'] = ','.join(sorted(
quote(addr.address) for nic in nics.values() for addr in nic
if addr.family == psutil.AF_LINK
))
headers['Gov-Client-Device-ID'] = str(uuid.uuid5(
UUID_NS, headers['Gov-Client-MAC-Addresses']
))
tzsec = datetime.now().astimezone().utcoffset().total_seconds()
tzmin = tzsec / 60
if tzmin >= 0:
(tzhour, tzmin) = divmod(tzmin, 60)
else:
(tzhour, tzmin) = divmod(-tzmin, 60)
tzhour = -tzhour
headers['Gov-Client-Timezone'] = 'UTC%+03d:%02d' % (tzhour, tzmin)
headers['Gov-Client-User-Agent'] = '&'.join([
'os-family=%s' % quote(platform.system()),
'os-version=%s' % quote(platform.release()),
'device-manufacturer=%s' % quote(self.dmifile('sys_vendor')),
'device-model=%s' % quote(self.dmifile('product_family')),
])
headers['Gov-Client-User-IDs'] = 'os=%s' % getpass.getuser()
return headers