Hey there, fellow developer! Ready to dive into the world of Affinity API integration? You're in for a treat. Affinity's API is a powerhouse for managing relationships and data, and we're about to harness that power with Python. Let's get cracking!
Before we jump in, make sure you've got:
requests
library installed (pip install requests
)First things first, let's get you authenticated:
import requests API_KEY = 'your_api_key_here' headers = { 'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json' }
Pro tip: Keep that API key safe! Consider using environment variables for production.
Affinity's API is RESTful, so we'll be working with standard HTTP methods. Here's a quick example:
BASE_URL = 'https://api.affinity.co/api/v1' def make_request(endpoint, method='GET', data=None): url = f'{BASE_URL}/{endpoint}' response = requests.request(method, url, headers=headers, json=data) response.raise_for_status() return response.json()
Let's tackle some basic operations:
def get_lists(): return make_request('lists') lists = get_lists() print(f"You have {len(lists)} lists.")
def get_person(person_id): return make_request(f'persons/{person_id}') person = get_person(12345) print(f"Hello, {person['first_name']}!")
def create_person(data): return make_request('persons', method='POST', data=data) new_person = create_person({ 'first_name': 'John', 'last_name': 'Doe', 'emails': [{'email': '[email protected]'}] }) print(f"Created person with ID: {new_person['id']}")
Affinity uses cursor-based pagination. Here's how to handle it:
def get_all_items(endpoint): items = [] next_cursor = None while True: params = {'cursor': next_cursor} if next_cursor else {} response = make_request(endpoint, params=params) items.extend(response['items']) next_cursor = response.get('next_cursor') if not next_cursor: break return items
Always be prepared for the unexpected:
import time def make_request_with_retry(endpoint, method='GET', data=None, max_retries=3): for attempt in range(max_retries): try: return make_request(endpoint, method, data) except requests.exceptions.HTTPError as e: if e.response.status_code == 429: # Rate limit exceeded wait_time = int(e.response.headers.get('Retry-After', 60)) print(f"Rate limit hit. Waiting {wait_time} seconds...") time.sleep(wait_time) else: raise raise Exception("Max retries exceeded")
Once you've got your data, it's time to put it to work:
def process_persons(persons): for person in persons: # Do something with each person print(f"Processing {person['first_name']} {person['last_name']}") # Integrate with your app's logic here all_persons = get_all_items('persons') process_persons(all_persons)
Want to level up? Try implementing search functionality:
def search_persons(query): return make_request('persons/search', method='POST', data={'query': query}) results = search_persons('John') print(f"Found {len(results)} matches.")
Don't forget to test your integration thoroughly:
import unittest class TestAffinityIntegration(unittest.TestCase): def test_get_lists(self): lists = get_lists() self.assertIsInstance(lists, list) self.assertTrue(len(lists) > 0) if __name__ == '__main__': unittest.main()
And there you have it! You've just built a solid foundation for your Affinity API integration. Remember, this is just the beginning. The Affinity API has a lot more to offer, so don't be afraid to explore and expand on what we've covered here.
Keep coding, keep learning, and most importantly, have fun with it! If you run into any snags, the Affinity API docs are your best friend. Now go forth and build something awesome!