summaryrefslogtreecommitdiff
path: root/src/ampache/ampache.py
blob: 64939d9772317bb8934da9ee61f01b362c047c98 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""
The main file. The AmpacheClient class allows us to make requests to the
API.
"""
from datetime import datetime
from sys import version
from ampache import models, exceptions
import requests


def get_utc_offset(time):
    """
    Normalise the UTC offset at the end of an API timestamp.

    The API yields datetimes shaped like yyyy-mm-ddthh:mm:ss-00:00, where the
    trailing 5 characters hold the UTC offset. strptime's %z wants that
    offset without the colon (0000), so every ':' found in the last 5
    characters is dropped while the rest of the string is left untouched.
    """
    # Index where the 5-character tail begins.
    cut = len(time) - 5
    head, tail = time[:cut], time[cut:]

    return ''.join((head, tail.replace(':', '')))


class AmpacheClient:
    """
    The ampache client class. Responsible for making API calls to your ampache
    instance.

    A session token is obtained on construction (handshake) and is renewed
    automatically before each API call once it has expired.
    """
    def __init__(self, host, api_key, version):
        """
        host: url of your ampache instance.
        api_key: your api key (you can get it under settings->account). It's
        preferable that you provide an admin's api key.
        version: API version string sent with every request.

        Constructing the client performs the handshake right away, so it does
        a network request and may raise the same errors as request().
        """
        self._endpoint = '/server/json.server.php?'
        self._version = version
        self._host = host
        self._api_key = api_key
        self.authenticate()

    def raise_by_status(self, error):
        """
        The ampache API has a bunch of error codes that can help us diagnose
        the problem. Depending on the error code, we'll raise an appropriate
        exception carrying the error message.

        error: object exposing `code` and `message` attributes.

        If the code matches none of the known error codes, nothing is raised.
        """
        if error.code == exceptions.ACCESS_ERROR_CODE:
            raise exceptions.AccessException(error.message)
        elif error.code == exceptions.AUTHENTICATION_ERROR_CODE:
            raise exceptions.AuthenticationException(error.message)
        elif error.code == exceptions.ACCESS_DENIED_ERROR_CODE:
            raise exceptions.AccessDeniedException(error.message)
        elif error.code == exceptions.NOT_FOUND_ERROR_CODE:
            raise exceptions.NotFoundException(error.message)
        elif error.code == exceptions.MISSING_ERROR_CODE:
            raise exceptions.MissingMethodException(error.message)
        elif error.code == exceptions.DEPRECIATED_ERROR_CODE:
            raise exceptions.DepreciatedMethodException(error.message)
        elif error.code == exceptions.BAD_REQUEST_ERROR_CODE:
            raise exceptions.BadRequestException(error.message)
        elif error.code == exceptions.FAILED_ACCESS_ERROR_CODE:
            raise exceptions.FailedAccessException(error.message)

    def request(self, params, headers):
        """
        All in one function to pass requests to the API.

        params: query parameters of the API call.
        headers: HTTP headers; when 'Content-type' is 'application/json' the
        body is decoded and returned as Python data, otherwise the raw
        requests response object is returned.

        Raises requests.HTTPError on a non-successful HTTP status, and one of
        the ampache exceptions (see raise_by_status) when the decoded JSON
        carries an 'error' object.
        """
        response = requests.get(self._host + self._endpoint, params=params,
                                headers=headers)
        response.raise_for_status()

        # Binary/other payloads are handed back untouched. Probing a raw
        # response with `in` would walk the whole body chunk by chunk without
        # ever being able to match, so the error check is JSON-only.
        if headers.get('Content-type') != 'application/json':
            return response

        payload = response.json()

        # Only a JSON object can carry the API's error envelope.
        if isinstance(payload, dict) and 'error' in payload:
            api_error = payload['error']
            self.raise_by_status(models.Error(int(api_error['errorCode']),
                                              api_error['errorMessage']))

        return payload

    def authenticate(self):
        """
        Authenticate with the API (handshake), setting the token and its
        expiration date.
        """
        params = {
            'action': 'handshake',
            'auth': self._api_key,
            'version': self._version
        }
        headers = {
            'Content-type': 'application/json'
        }

        data = self.request(params, headers)
        # The offset comes as -00:00 but %z is fed the colon-less form.
        expires = datetime.strptime(get_utc_offset(data['session_expire']),
                                    '%Y-%m-%dT%H:%M:%S%z')
        self._auth = models.AuthToken(expires, data['auth'])

    def get_song(self, song_id):
        """
        Get the detail of a song given its primary key.
        """
        self.renew_token()

        params = {
            'action': 'song',
            'auth': self._auth.auth,
            'filter': song_id,
            'version': self._version
        }
        headers = {
            'Content-type': 'application/json'
        }
        data = self.request(params, headers)
        song = models.Song(data['id'], data['title'], data['album']['name'],
                           data['albumartist']['name'])

        return song

    def get_album(self, album_id):
        """
        Get an album (with its songs) given the album's primary key.
        """
        # Renew before building params so the fresh token is the one sent.
        self.renew_token()

        params = {
            'action': 'album_songs',
            'auth': self._auth.auth,
            'filter': album_id,
            'version': self._version
        }
        headers = {
            'Content-type': 'application/json'
        }
        data = self.request(params, headers)
        album = self.process_album(data)

        return album

    def process_album(self, album_data):
        """
        Build an Album model out of the response of an 'album_songs' call.

        album_data: decoded response; expected to hold a non-empty list of
        songs under the 'song' key. The album id, title and artist are taken
        from the first song, so an empty list raises IndexError.
        """
        first_song = album_data['song'][0]
        album_id = first_song['album']['id']
        album_title = first_song['album']['name']
        album_artist = first_song['artist']['name']

        songs = [
            models.Song(song['id'], song['title'], album_title, album_artist)
            for song in album_data['song']
        ]

        return models.Album(album_id, album_title, album_artist, songs)

    def download(self, song_id, destination):
        """
        Download a song (requested as mp3) to destination.

        song_id: primary key of the song.
        destination: path of the file to write; it is overwritten if it
        already exists.
        """
        # Renew before building params so the fresh token is the one sent.
        self.renew_token()

        params = {
            'action': 'download',
            'auth': self._auth.auth,
            'type': 'song',
            'id': song_id,
            'version': self._version,
            'format': 'mp3'
        }
        headers = {
            'Content-type': 'audio/mpeg'
        }
        data = self.request(params, headers)

        with open(destination, 'wb') as song:
            song.write(data.content)

    def renew_token(self):
        """
        Authenticate again if the current token has already expired.
        """
        # Compare in the token's own timezone to keep both sides tz-aware.
        if datetime.now(self._auth.expires.tzinfo) > self._auth.expires:
            self.authenticate()

    def get_token(self):
        """
        Return the current auth token object.
        """
        return self._auth