summaryrefslogtreecommitdiff
path: root/src/ampache/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ampache/client.py')
-rw-r--r--src/ampache/client.py170
1 files changed, 170 insertions, 0 deletions
diff --git a/src/ampache/client.py b/src/ampache/client.py
new file mode 100644
index 0000000..b58ba36
--- /dev/null
+++ b/src/ampache/client.py
@@ -0,0 +1,170 @@
+"""
+The main file. The AmpacheClient class allows us to make requests to the
+API.
+"""
+from datetime import datetime
+from ampache import models, exceptions
+import requests
+
+
+def get_utc_offset(time):
+ """
+ All the datetimes the API yields are in the form yyyy-mm-ddthh:mm:ss-00:00,
+ where the last 5 characters are the UTC offset. Python datetime objects can
+ parse this data, but in the form 0000 (sans the :) so we have to remove it.
+ """
+ prefix = time[:len(time) - 5]
+ suffix = time[len(time) - 5:].replace(':', '')
+
+ return prefix + suffix
+
+
+class AmpacheClient:
+ """
+ The ampache client class. Responsible for making API calls to your ampache
+ instance.
+ """
+ def __init__(self, host, api_key, version):
+ """
+ Host: url of your apache instance.
+ api_key: your api key (you can get it under settings->account). It's
+ preferable that you provide an admin's api key.
+ """
+ self._endpoint = '/server/json.server.php?'
+ self._version = version
+ self._host = host
+ self._api_key = api_key
+ self.authenticate()
+
+ def raise_by_status(self, error):
+ """
+ The ampache API has a bunch of error codes that can help us diagnose
+ the problem. Depending of the error code, we'll raise an appropiate exception.
+ """
+ if error.code == exceptions.ACCESS_ERROR_CODE:
+ raise exceptions.AccessException(error.message)
+ elif error.code == exceptions.AUTHENTICATION_ERROR_CODE:
+ raise exceptions.AuthenticationException(error.message)
+ elif error.code == exceptions.ACCESS_DENIED_ERROR_CODE:
+ raise exceptions.AccessDeniedException(error.message)
+ elif error.code == exceptions.NOT_FOUND_ERROR_CODE:
+ raise exceptions.NotFoundException(error.message)
+ elif error.code == exceptions.MISSING_ERROR_CODE:
+ raise exceptions.MissingMethodException(error.message)
+ elif error.code == exceptions.DEPRECIATED_ERROR_CODE:
+ raise exceptions.DepreciatedMethodException(error.message)
+ elif error.code == exceptions.BAD_REQUEST_ERROR_CODE:
+ raise exceptions.BadRequestException(error.message)
+ elif error.code == exceptions.FAILED_ACCESS_ERROR_CODE:
+ raise exceptions.FailedAccessException(error.message)
+
+ def request(self, params, headers):
+ """
+ All in one function to pass requests to the API.
+ """
+ response = requests.get(self._host + self._endpoint, params=params,
+ headers=headers)
+ if not response.ok:
+ response.raise_for_status()
+
+ if headers['Content-type'] == 'application/json':
+ response = response.json()
+
+ if 'error' in response:
+ self.raise_by_status(models.Error(int(response['error']['errorCode']),
+ response['error']['errorMessage']))
+
+ return response
+
+ def authenticate(self):
+ """
+ Authenticate with the API, setting the token.
+ """
+ params = {
+ 'action': 'handshake',
+ 'auth': self._api_key,
+ 'version': self._version
+ }
+ headers = {
+ 'Content-type': 'application/json'
+ }
+
+ data = self.request(params, headers)
+ self._auth = models.AuthToken(datetime.strptime(get_utc_offset(data['session_expire']), '%Y-%m-%dT%H:%M:%S%z'),
+ data['auth'])
+
+ def get_song(self, song_id):
+ """
+ Get the detail of a song given its primary key.
+ """
+ self.renew_token()
+
+ params = {
+ 'action': 'song',
+ 'auth': self._auth.auth,
+ 'filter': song_id,
+ 'version': self._version
+ }
+ headers = {
+ 'Content-type': 'application/json'
+ }
+ data = self.request(params, headers)
+ song = models.Song(data['id'], data['title'], data['album']['name'],
+ data['albumartist']['name'])
+
+ return song
+
+ def get_album(self, album_id):
+ params = {
+ 'action': 'album_songs',
+ 'auth': self._auth.auth,
+ 'filter': album_id,
+ 'version': self._version
+ }
+ headers = {
+ 'Content-type': 'application/json'
+ }
+ data = self.request(params, headers)
+ album = self.process_album(data)
+
+ return album
+
+ def process_album(self, album_data):
+ album_id = album_data['song'][0]['album']['id']
+ album_title = album_data['song'][0]['album']['name']
+ album_artist = album_data['song'][0]['artist']['name']
+ songs = []
+
+ for song in album_data['song']:
+ new_song = models.Song(song['id'], song['title'], album_title,
+ album_artist)
+ songs.append(new_song)
+
+ return models.Album(album_id, album_title, album_artist, songs)
+
+ def download(self, song_id, destination):
+ """
+ Download a song to destination.
+ """
+ params = {
+ 'action': 'download',
+ 'auth': self._auth.auth,
+ 'type': 'song',
+ 'id': song_id,
+ 'version': self._version,
+ 'format': 'mp3'
+ }
+ headers = {
+ 'Content-type': 'audio/mpeg'
+ }
+ data = self.request(params, headers)
+
+ with open(destination, 'wb') as song:
+ song.write(data.content)
+
+ def renew_token(self):
+ if datetime.now(self._auth.expires.tzinfo) > self._auth.expires:
+ self.authenticate()
+
+ def get_token(self):
+ return self._auth