1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
"""
The main file. The AmpacheClient class allows us to make requests to the
API.
"""
from datetime import datetime
from ampache import models, exceptions
import requests
def get_utc_offset(time):
    """
    Normalise the UTC offset at the end of an API timestamp.

    The API yields datetimes in the form yyyy-mm-ddThh:mm:ss-00:00, where
    the trailing "-00:00" is the UTC offset. This helper removes the colon
    from that offset ("-00:00" becomes "-0000") so the value can be parsed
    with the '%z' strptime directive.

    time: the timestamp string to normalise.

    Returns the timestamp with a colon-free offset. A string that does not
    end in a sign followed by hh:mm (for instance one that is already
    colon-free, ends in "Z", or carries no offset at all) is returned
    unchanged, so the seconds field is never mangled by mistake.
    """
    # Only touch the string when its last six characters look like "+hh:mm"
    # or "-hh:mm"; otherwise the colon three characters from the end belongs
    # to the time of day, not to the offset.
    has_colon_offset = (
        len(time) >= 6 and time[-6] in '+-' and time[-3] == ':'
    )
    if not has_colon_offset:
        return time
    return time[:-3] + time[-2:]
class AmpacheClient:
    """
    The ampache client class. Responsible for making API calls to your ampache
    instance through its JSON endpoint (/server/json.server.php).
    """

    def __init__(self, host, api_key, version, timeout=None):
        """
        Store the connection settings and authenticate straight away.

        host: url of your ampache instance.
        api_key: your api key (you can get it under settings->account). It's
        preferable that you provide an admin's api key.
        version: the API version string sent with every request.
        timeout: optional timeout forwarded to every HTTP request. The
        default, None, keeps the previous behaviour of waiting indefinitely.
        """
        self._endpoint = '/server/json.server.php?'
        self._version = version
        self._host = host
        self._api_key = api_key
        self._timeout = timeout
        self.authenticate()

    def raise_by_status(self, error):
        """
        Raise the exception that matches the error code the API returned.

        The ampache API has a bunch of error codes that can help us diagnose
        the problem. Depending on the error code, we raise an appropriate
        exception carrying the API's error message.

        error: an object exposing `code` and `message` attributes.
        """
        code = error.code
        if code == exceptions.ACCESS_ERROR_CODE:
            raise exceptions.AccessException(error.message)
        if code == exceptions.AUTHENTICATION_ERROR_CODE:
            raise exceptions.AuthenticationException(error.message)
        if code == exceptions.ACCESS_DENIED_ERROR_CODE:
            raise exceptions.AccessDeniedException(error.message)
        if code == exceptions.NOT_FOUND_ERROR_CODE:
            raise exceptions.NotFoundException(error.message)
        if code == exceptions.MISSING_ERROR_CODE:
            raise exceptions.MissingMethodException(error.message)
        if code == exceptions.DEPRECIATED_ERROR_CODE:
            raise exceptions.DepreciatedMethodException(error.message)
        if code == exceptions.BAD_REQUEST_ERROR_CODE:
            raise exceptions.BadRequestException(error.message)
        if code == exceptions.FAILED_ACCESS_ERROR_CODE:
            raise exceptions.FailedAccessException(error.message)
        # NOTE(review): a code matching none of the constants above falls
        # through without raising, so the caller receives the error payload
        # as if it were data. Confirm whether a generic fallback is wanted.

    def request(self, params, headers):
        """
        All in one function to pass requests to the API.

        params: the query parameters of the call.
        headers: the HTTP headers of the call. When 'Content-type' is
        'application/json' the body is decoded and checked for an API error.

        Returns the decoded JSON for JSON calls, otherwise the raw response
        object. Raises the HTTP error reported by the requests library for
        an unsuccessful status, or whatever raise_by_status raises when the
        JSON payload carries an 'error' entry.
        """
        response = requests.get(self._host + self._endpoint, params=params,
                                headers=headers, timeout=self._timeout)
        # raise_for_status() is a no-op on success, so no `ok` check is needed.
        response.raise_for_status()
        # Tolerate a missing header (or no headers at all) instead of failing
        # with a KeyError/TypeError after the request has already been made.
        content_type = (headers or {}).get('Content-type')
        if content_type != 'application/json':
            return response
        payload = response.json()
        if 'error' in payload:
            api_error = payload['error']
            self.raise_by_status(models.Error(int(api_error['errorCode']),
                                              api_error['errorMessage']))
        return payload

    def authenticate(self):
        """
        Authenticate with the API, setting the token.

        Performs the 'handshake' action with the api key and stores the
        returned session token together with its expiry time.
        """
        params = {
            'action': 'handshake',
            'auth': self._api_key,
            'version': self._version
        }
        headers = {
            'Content-type': 'application/json'
        }
        data = self.request(params, headers)
        expires = datetime.strptime(get_utc_offset(data['session_expire']),
                                    '%Y-%m-%dT%H:%M:%S%z')
        self._auth = models.AuthToken(expires, data['auth'])

    def get_song(self, song_id):
        """
        Get the detail of a song given its primary key.

        Returns a models.Song built from the id, title, album name and album
        artist name found in the response.
        """
        self.renew_token()
        params = {
            'action': 'song',
            'auth': self._auth.auth,
            'filter': song_id,
            'version': self._version
        }
        headers = {
            'Content-type': 'application/json'
        }
        data = self.request(params, headers)
        return models.Song(data['id'], data['title'], data['album']['name'],
                           data['albumartist']['name'])

    def get_album(self, album_id):
        """
        Get an album, with its songs, given the album's primary key.

        Returns the models.Album produced by process_album from the
        'album_songs' response.
        """
        # Renew first, like get_song does, so an expired token is never sent.
        self.renew_token()
        params = {
            'action': 'album_songs',
            'auth': self._auth.auth,
            'filter': album_id,
            'version': self._version
        }
        headers = {
            'Content-type': 'application/json'
        }
        data = self.request(params, headers)
        return self.process_album(data)

    def process_album(self, album_data):
        """
        Build a models.Album out of an 'album_songs' response.

        album_data: the decoded response; its 'song' entry is the list of
        songs. The album id, title and artist are read from the first song.

        Raises exceptions.NotFoundException when the song list is empty,
        since there is then nothing to read the album details from.
        """
        tracks = album_data['song']
        if not tracks:
            raise exceptions.NotFoundException('The album contains no songs.')
        first_track = tracks[0]
        album_id = first_track['album']['id']
        album_title = first_track['album']['name']
        # NOTE(review): get_song reads 'albumartist' while this reads the
        # first song's 'artist'; confirm which one is intended for albums.
        album_artist = first_track['artist']['name']
        songs = [
            models.Song(track['id'], track['title'], album_title, album_artist)
            for track in tracks
        ]
        return models.Album(album_id, album_title, album_artist, songs)

    def download(self, song_id, destination):
        """
        Download a song to destination.

        song_id: primary key of the song to download.
        destination: path of the file the mp3 data is written to.
        """
        # Renew first, like get_song does, so an expired token is never sent.
        self.renew_token()
        params = {
            'action': 'download',
            'auth': self._auth.auth,
            'type': 'song',
            'id': song_id,
            'version': self._version,
            'format': 'mp3'
        }
        headers = {
            'Content-type': 'audio/mpeg'
        }
        # The file is only opened once the request succeeded, so a failed
        # download does not leave an empty file behind.
        data = self.request(params, headers)
        with open(destination, 'wb') as song:
            song.write(data.content)

    def renew_token(self):
        """
        Authenticate again when the stored token has expired.

        The current time is taken in the timezone of the stored expiry so
        that the two datetimes can be compared.
        """
        if datetime.now(self._auth.expires.tzinfo) > self._auth.expires:
            self.authenticate()

    def get_token(self):
        """
        Return the authentication token currently held by the client.
        """
        return self._auth
|