""" The main file. The AmpacheClient class allows us to make requests to the API. """ from datetime import datetime from sys import version from ampache import models, exceptions import requests def get_utc_offset(time): """ All the datetimes the API yields are in the form yyyy-mm-ddthh:mm:ss-00:00, where the last 5 characters are the UTC offset. Python datetime objects can parse this data, but in the form 0000 (sans the :) so we have to remove it. """ prefix = time[:len(time) - 5] suffix = time[len(time) - 5:].replace(':', '') return prefix + suffix class AmpacheClient: """ The ampache client class. Responsible for making API calls to your ampache instance. """ def __init__(self, host, api_key, version): """ Host: url of your apache instance. api_key: your api key (you can get it under settings->account). It's preferable that you provide an admin's api key. """ self._endpoint = '/server/json.server.php?' self._version = version self._host = host self._api_key = api_key self.authenticate() def raise_by_status(self, error): """ The ampache API has a bunch of error codes that can help us diagnose the problem. Depending of the error code, we'll raise an appropiate exception. """ if error.code == exceptions.ACCESS_ERROR_CODE: raise exceptions.AccessException(error.message) elif error.code == exceptions.AUTHENTICATION_ERROR_CODE: raise exceptions.AuthenticationException(error.message) elif error.code == exceptions.ACCESS_DENIED_ERROR_CODE: raise exceptions.AccessDeniedException(error.message) elif error.code == exceptions.NOT_FOUND_ERROR_CODE: raise exceptions.NotFoundException(error.message) elif error.code == exceptions.MISSING_ERROR_CODE: raise exceptions.MissingMethodException(error.message) elif error.code == exceptions.DEPRECIATED_ERROR_CODE: raise exceptions.DepreciatedMethodException(error.message) elif error.code == exceptions.BAD_REQUEST_ERROR_CODE: raise exceptions.BadRequestException(error.message) elif error.code == exceptions.FAILED_ACCESS_ERROR_CODE: raise exceptions.FailedAccessException(error.message) def request(self, params, headers): """ All in one function to pass JSON requests to the API. """ response = requests.get(self._host + self._endpoint, params=params, headers=headers) if not response.ok: response.raise_for_status() if headers['Content-type'] == 'application/json': response = response.json() if 'error' in response: self.raise_by_status(models.Error(int(response['error']['errorCode']), response['error']['errorMessage'])) return response def authenticate(self): """ Authenticate with the API, setting the token. """ params = { 'action': 'handshake', 'auth': self._api_key, 'version': self._version } headers = { 'Content-type': 'application/json' } data = self.request(params, headers) self._auth = models.AuthToken(datetime.strptime(get_utc_offset(data['session_expire']), '%Y-%m-%dT%H:%M:%S%z'), data['auth']) def get_song(self, song_id): """ Get the detail of a song given its primary key. """ self.renew_token() params = { 'action': 'song', 'auth': self._auth.auth, 'filter': song_id, 'version': self._version } headers = { 'Content-type': 'application/json' } data = self.request(params, headers) song = models.Song(data['id'], data['title'], data['album']['name'], data['albumartist']['name']) return song def get_album(self, album_id): params = { 'action': 'album_songs', 'auth': self._auth.auth, 'filter': album_id, 'version': self._version } headers = { 'Content-type': 'application/json' } data = self.request(params, headers) album = self.process_album(data) return album def process_album(self, album_data): album_id = album_data['song'][0]['album']['id'] album_title = album_data['song'][0]['album']['name'] album_artist = album_data['song'][0]['artist']['name'] songs = [] for song in album_data['song']: new_song = models.Song(song['id'], song['title'], album_title, album_artist) songs.append(new_song) return models.Album(album_id, album_title, album_artist, songs) def download(self, song_id, destination): """ Download a song to destination. """ params = { 'action': 'download', 'auth': self._auth.auth, 'type': 'song', 'id': song_id, 'version': self._version, 'format': 'mp3' } headers = { 'Content-type': 'audio/mpeg' } data = self.request(params, headers) with open(destination, 'wb') as song: song.write(data.content) def renew_token(self): if datetime.now(self._auth.expires.tzinfo) > self._auth.expires: self.authenticate() def get_token(self): return self._auth