Appearance
Python SDK Integration
This example demonstrates how to integrate with the Engagifii API using Python.
Code Example
python
#!/usr/bin/env python3
"""
Python Integration Example for Bill Tracking API
Complete Flask application with error handling, caching, and async support
"""
import os
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from functools import wraps
import asyncio
import aiohttp
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_caching import Cache
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from dotenv import load_dotenv
# Load environment variables
# This must run before Config is defined: Config reads os.getenv() while its
# class body executes, so values supplied via dotenv have to be in place first.
load_dotenv()
# Configuration
class Config:
    """
    Settings for the Flask app and the API clients.

    Values are resolved once, when this class body runs. Upper-case
    attributes are also copied into ``app.config`` via ``from_object``.
    """

    # Upstream Bill Tracking API location and version
    API_URL = os.environ.get(
        'BILLTRACKING_API_URL',
        'https://engagifii-billtracking.azurewebsites.net',
    )
    API_VERSION = os.environ.get('BILLTRACKING_API_VERSION', '1.0')

    # Tenant identifier sent with every request; the default is a placeholder
    TENANT_CODE = os.environ.get('BILLTRACKING_TENANT_CODE', 'YOUR_TENANT_CODE')

    # Flask-Caching settings
    CACHE_TYPE = 'simple'
    CACHE_DEFAULT_TIMEOUT = 5 * 60  # 5 minutes, in seconds

    # Name of a logging level (e.g. INFO, DEBUG)
    LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
# Initialize Flask app
app = Flask(__name__)
app.config.from_object(Config)
# NOTE(review): CORS is enabled without any options — presumably this allows
# every origin; confirm and restrict before exposing the app publicly.
CORS(app)
# Initialize cache
cache = Cache(app)
# Configure logging
# LOG_LEVEL comes from the environment, so accept any letter case and fall back
# to INFO for names that are not integer logging levels instead of crashing
# at import time.
_log_level = getattr(logging, str(Config.LOG_LEVEL).upper(), None)
if not isinstance(_log_level, int):
    _log_level = logging.INFO
logging.basicConfig(
    level=_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class BillTrackingAPIClient:
    """
    Synchronous client for the Bill Tracking API.

    All calls share one ``requests.Session`` that carries the ``api-version``
    and ``tenant-code`` headers and retries transient failures. Public methods
    return the decoded JSON body of the response. Failures are raised as
    ``APIError``, or ``RateLimitError`` for HTTP 429.
    """

    # Seconds reported to callers when a 429 response carries no usable
    # Retry-After header.
    DEFAULT_RETRY_AFTER = 60

    def __init__(self, timeout: float = 30.0):
        """
        Read connection settings from ``Config`` and build the HTTP session.

        Args:
            timeout: Seconds to wait on each request. Without a timeout a
                stalled connection would block the caller indefinitely.
        """
        # Drop a trailing slash so URL building never produces "//api/...".
        self.base_url = Config.API_URL.rstrip('/')
        self.api_version = Config.API_VERSION
        self.tenant_code = Config.TENANT_CODE
        self.timeout = timeout
        self.session = self._create_session()

    def _create_session(self) -> "requests.Session":
        """Create a requests session with retry logic and default headers."""
        session = requests.Session()
        # Configure retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # NOTE(review): POST is not idempotent; retrying it on a 5xx may
            # repeat a write (e.g. an activity-log save). Kept as in the
            # original example — confirm this is acceptable for the API.
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"],
            # Hand back the last response once retries are exhausted instead
            # of raising, so _handle_response can map 429 to RateLimitError.
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        # Set default headers
        session.headers.update({
            'api-version': self.api_version,
            'tenant-code': self.tenant_code,
            'Content-Type': 'application/json'
        })
        return session

    @classmethod
    def _parse_retry_after(cls, raw: Any) -> int:
        """Return Retry-After as whole seconds, or the default if unusable."""
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            # Header missing, or not a plain number of seconds.
            return cls.DEFAULT_RETRY_AFTER
        return seconds if seconds >= 0 else cls.DEFAULT_RETRY_AFTER

    @staticmethod
    def _with_defaults(defaults: Dict, filters: Optional[Dict]) -> Dict:
        """Return ``defaults`` overridden by ``filters`` (``None`` means no overrides)."""
        return {**defaults, **(filters or {})}

    def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """
        Send one request to ``/api/<version>/<path>`` and decode the response.

        Raises:
            RateLimitError: The API answered with HTTP 429.
            APIError: Transport failure, other HTTP error, or non-JSON body.
        """
        url = f"{self.base_url}/api/{self.api_version}/{path}"
        kwargs.setdefault('timeout', self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
        except requests.exceptions.RequestException as e:
            # Connection errors and timeouts must surface as APIError so
            # callers need to catch only one exception family.
            logger.error(f"Request error: {e}")
            raise APIError(f"API request failed: {e}") from e
        return self._handle_response(response)

    def _handle_response(self, response: "requests.Response") -> Any:
        """
        Return the decoded JSON body, translating failures to API errors.

        Raises:
            RateLimitError: Status 429; ``retry_after`` is an int of seconds.
            APIError: Any other HTTP error status, or a body that is not JSON.
        """
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            if response.status_code == 429:
                retry_after = self._parse_retry_after(response.headers.get('Retry-After'))
                raise RateLimitError(
                    f"Rate limit exceeded. Retry after {retry_after}s", retry_after
                ) from e
            raise APIError(f"API request failed: {e}") from e
        try:
            return response.json()
        except ValueError as e:
            raise APIError("Invalid JSON response") from e

    def get_states(self) -> List[Dict]:
        """Get list of states"""
        return self._request('GET', 'dropdown/states')

    def get_sessions(self, state_id: Optional[int] = None) -> List[Dict]:
        """Get list of sessions, optionally restricted to one state."""
        # Compare with None so that an id of 0 is still sent.
        params = {} if state_id is None else {'stateId': state_id}
        return self._request('GET', 'dropdown/sessions', params=params)

    def get_committees(self, state_id: Optional[int] = None,
                       session_id: Optional[int] = None) -> List[Dict]:
        """Get list of committees, optionally filtered by state and/or session."""
        params = {}
        if state_id is not None:
            params['stateId'] = state_id
        if session_id is not None:
            params['sessionId'] = session_id
        return self._request('GET', 'dropdown/committees', params=params)

    def get_calendar_events(self, filters: Optional[Dict] = None) -> List[Dict]:
        """
        Get bill calendar events.

        Args:
            filters: Overrides for the default request body, which covers
                today through 30 days ahead, page 1, 50 items per page.
        """
        today = datetime.now()
        # Set default filters
        default_filters = {
            'startDate': today.strftime('%Y-%m-%d'),
            'endDate': (today + timedelta(days=30)).strftime('%Y-%m-%d'),
            'pageNumber': 1,
            'pageSize': 50
        }
        payload = self._with_defaults(default_filters, filters)
        return self._request('POST', 'bill/event/calendar', json=payload)

    def save_activity_log(self, user_id: str, action: str,
                          entity_type: str, entity_id: str,
                          metadata: Optional[Dict] = None) -> Any:
        """
        Save activity log entry stamped with the current UTC time.

        Returns:
            The decoded JSON response body (its exact shape is defined by the
            API and is not visible in this file).
        """
        # Local import keeps this block self-contained; datetime.utcnow() is
        # deprecated, so build the same naive-UTC ISO string from an aware value.
        from datetime import timezone
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        log_entry = {
            'userId': user_id,
            'action': action,
            'entityType': entity_type,
            'entityId': entity_id,
            'metadata': metadata or {},
            'timestamp': now_utc.isoformat() + 'Z'
        }
        return self._request('POST', 'activity/log/save', json=log_entry)

    def get_activity_logs(self, filters: Optional[Dict] = None) -> Dict:
        """
        Get activity logs with pagination.

        Args:
            filters: Overrides for the defaults (page 1, 50 per page, sorted
                by ``timestamp`` descending).
        """
        default_filters = {
            'pageNumber': 1,
            'pageSize': 50,
            'sortBy': 'timestamp',
            'isAscending': False
        }
        payload = self._with_defaults(default_filters, filters)
        return self._request('POST', 'activity/log/list', json=payload)

    def save_campaign_template(self, campaign_data: Dict) -> Any:
        """
        Save campaign template.

        Returns:
            The decoded JSON response body (presumably the template id —
            verify against the API documentation).
        """
        # NOTE(review): "Champaign" spelling kept as-is; presumably it matches
        # the upstream route name — confirm before "fixing" it.
        return self._request('POST', 'Advocacy/saveChampaignTemplate', json=campaign_data)

    def get_campaign_template(self, template_id: int) -> Dict:
        """Get campaign template by ID"""
        return self._request('GET', f'Advocacy/getChampaignTemplate/{template_id}')

    def list_campaigns(self, filters: Optional[Dict] = None) -> Dict:
        """
        List campaigns with filtering.

        Args:
            filters: Overrides for the defaults (page 1, 20 per page, active
                campaigns only).
        """
        default_filters = {
            'pageNumber': 1,
            'pageSize': 20,
            'isActive': True
        }
        payload = self._with_defaults(default_filters, filters)
        return self._request('POST', 'Advocacy/list', json=payload)
class AsyncBillTrackingClient:
    """
    Async client for running many API calls concurrently.

    Both batch methods are best-effort: a failing item is logged and reported
    as ``{}`` (fetch) or ``False`` (save) without aborting the other items.
    """

    def __init__(self):
        """Read connection settings from ``Config`` and prepare request headers."""
        # Drop a trailing slash so URL building never produces "//api/...".
        self.base_url = Config.API_URL.rstrip('/')
        self.api_version = Config.API_VERSION
        self.tenant_code = Config.TENANT_CODE
        self.headers = {
            'api-version': self.api_version,
            'tenant-code': self.tenant_code,
            'Content-Type': 'application/json'
        }

    def _url(self, endpoint: str) -> str:
        """Build the full URL for ``endpoint``, tolerating a leading slash."""
        return f"{self.base_url}/api/{self.api_version}/{str(endpoint).lstrip('/')}"

    @staticmethod
    def _is_success(status: int) -> bool:
        """Treat any 2xx status as success, matching the synchronous client."""
        return 200 <= status < 300

    async def fetch_multiple_endpoints(self, endpoints: List[str]) -> List[Dict]:
        """
        Fetch data from multiple endpoints concurrently.

        Returns:
            One entry per endpoint, in input order; ``{}`` marks a failure.
        """
        if not endpoints:
            # Nothing to fetch: skip opening an HTTP session.
            return []
        async with aiohttp.ClientSession(headers=self.headers) as session:
            tasks = [self._fetch(session, endpoint) for endpoint in endpoints]
            return await asyncio.gather(*tasks)

    async def _fetch(self, session: "aiohttp.ClientSession", endpoint: str) -> Dict:
        """Fetch data from a single endpoint; return ``{}`` on any failure."""
        url = self._url(endpoint)
        try:
            async with session.get(url) as response:
                if self._is_success(response.status):
                    return await response.json()
                logger.error(f"Error fetching {url}: {response.status}")
                return {}
        except Exception as e:
            # Deliberately broad: one bad endpoint must not fail the batch.
            logger.error(f"Exception fetching {url}: {e}")
            return {}

    async def batch_save_activity_logs(self, log_entries: List[Dict]) -> List[bool]:
        """
        Save multiple activity logs concurrently.

        Returns:
            One boolean per entry, in input order; ``True`` means a 2xx reply.
        """
        if not log_entries:
            return []
        async with aiohttp.ClientSession(headers=self.headers) as session:
            tasks = [self._save_log(session, entry) for entry in log_entries]
            return await asyncio.gather(*tasks)

    async def _save_log(self, session: "aiohttp.ClientSession", log_entry: Dict) -> bool:
        """Save a single activity log; return whether the API accepted it."""
        url = self._url('activity/log/save')
        try:
            async with session.post(url, json=log_entry) as response:
                return self._is_success(response.status)
        except Exception as e:
            # Deliberately broad: one failed save must not fail the batch.
            logger.error(f"Error saving log: {e}")
            return False
# Custom exceptions
class APIError(Exception):
    """Base class for errors raised by the Bill Tracking API clients."""


class RateLimitError(APIError):
    """
    Rate limit exceeded error.

    Attributes:
        retry_after: Seconds to wait before retrying. Integer-like values
            (including numeric strings such as a raw ``Retry-After`` header)
            are stored as ``int``; anything else is stored unchanged.
    """

    def __init__(self, message: str, retry_after: int):
        super().__init__(message)
        try:
            # Header values arrive as strings; normalise so callers and JSON
            # responses always see a number when one was provided.
            self.retry_after = int(retry_after)
        except (TypeError, ValueError):
            self.retry_after = retry_after
# Initialize clients
# Module-level instances shared by the Flask routes below: api_client serves
# the single-request routes, async_client serves the concurrent batch route.
api_client = BillTrackingAPIClient()
async_client = AsyncBillTrackingClient()
# Decorators
def handle_api_errors(f):
    """
    Decorator that converts exceptions from a view into JSON error responses.

    Mapping:
        RateLimitError -> 429 with ``retry_after``
        APIError       -> 500 with the error message
        anything else  -> 500 with a generic message (details only in the log)
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except RateLimitError as e:
            # Must come before APIError: RateLimitError is a subclass of it.
            return jsonify({
                'success': False,
                'error': 'Rate limit exceeded',
                'retry_after': e.retry_after
            }), 429
        except APIError as e:
            return jsonify({
                'success': False,
                'error': str(e)
            }), 500
        except Exception as e:
            # logger.exception records the traceback, which is the only place
            # the real cause is visible since the client gets a generic message.
            logger.exception(f"Unexpected error: {e}")
            return jsonify({
                'success': False,
                'error': 'Internal server error'
            }), 500
    return decorated_function
# Flask Routes
@app.route('/health')
def health_check():
    """
    Health check endpoint.

    Returns a JSON document with a fixed ``healthy`` status, the current UTC
    time (ISO 8601, no offset suffix) and the configured API URL and version.
    """
    # Local import keeps this block self-contained. datetime.utcnow() is
    # deprecated; dropping tzinfo afterwards keeps the emitted string identical.
    from datetime import timezone
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return jsonify({
        'status': 'healthy',
        'timestamp': now_utc.isoformat(),
        'config': {
            'api_url': Config.API_URL,
            'api_version': Config.API_VERSION
        }
    })
@app.route('/api/states')
@handle_api_errors
@cache.cached(timeout=3600)  # Cache for 1 hour
def get_states():
    """
    Get list of states.

    Decorator order matters: the cache wraps the raw view and the error
    handler wraps the cache. An upstream failure therefore propagates as an
    exception through the cache (nothing is stored) and only then becomes an
    error response. With the opposite order a 429/500 response would be
    cached and replayed for the full hour.
    """
    states = api_client.get_states()
    return jsonify({'success': True, 'data': states})
@app.route('/api/sessions')
@handle_api_errors
@cache.cached(timeout=1800, query_string=True)  # Cache for 30 minutes
def get_sessions():
    """
    Get list of sessions, optionally filtered by the ``stateId`` query arg.

    The cache sits inside the error handler so that failed upstream calls
    raise through it and are never cached; only successful responses are
    stored (keyed by query string).
    """
    state_id = request.args.get('stateId', type=int)
    sessions = api_client.get_sessions(state_id)
    return jsonify({'success': True, 'data': sessions})
@app.route('/api/committees')
@handle_api_errors
@cache.cached(timeout=1800, query_string=True)  # Cache for 30 minutes
def get_committees():
    """
    Get list of committees, optionally filtered by ``stateId``/``sessionId``.

    The cache sits inside the error handler so that failed upstream calls
    raise through it and are never cached; only successful responses are
    stored (keyed by query string).
    """
    state_id = request.args.get('stateId', type=int)
    session_id = request.args.get('sessionId', type=int)
    committees = api_client.get_committees(state_id, session_id)
    return jsonify({'success': True, 'data': committees})
@app.route('/api/calendar/events', methods=['POST'])
@handle_api_errors
def get_calendar_events():
    """
    Get calendar events.

    The JSON body is optional; when present it must be an object whose keys
    override the client's default filters.
    """
    # silent=True: a missing or non-JSON body means "no filters" rather than
    # an HTTP exception that the error decorator would report as a 500.
    filters = request.get_json(silent=True) or {}
    if not isinstance(filters, dict):
        return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
    events = api_client.get_calendar_events(filters)
    return jsonify({'success': True, 'data': events})
@app.route('/api/activity/log', methods=['POST'])
@handle_api_errors
def save_activity_log():
    """
    Save activity log.

    Expects a JSON object; ``userId``, ``action``, ``entityType``,
    ``entityId`` and ``metadata`` are read from it and forwarded (absent keys
    are forwarded as ``None``). Responds 400 when the body is not a JSON
    object.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Previously a missing body raised AttributeError and surfaced as a 500.
        return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
    result = api_client.save_activity_log(
        user_id=data.get('userId'),
        action=data.get('action'),
        entity_type=data.get('entityType'),
        entity_id=data.get('entityId'),
        metadata=data.get('metadata')
    )
    return jsonify({'success': True, 'data': result})
@app.route('/api/activity/list', methods=['POST'])
@handle_api_errors
def get_activity_logs():
    """
    Get activity logs.

    The JSON body is optional; when present it must be an object whose keys
    override the client's default paging and sorting filters.
    """
    # silent=True: a missing or non-JSON body means "no filters" rather than
    # an HTTP exception that the error decorator would report as a 500.
    filters = request.get_json(silent=True) or {}
    if not isinstance(filters, dict):
        return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
    logs = api_client.get_activity_logs(filters)
    return jsonify({'success': True, 'data': logs})
@app.route('/api/campaigns', methods=['POST'])
@handle_api_errors
def save_campaign():
    """
    Save campaign template.

    Expects the template as a JSON object and returns the client's result
    under ``data.id``. Responds 400 when the body is not a JSON object.
    """
    campaign_data = request.get_json(silent=True)
    if not isinstance(campaign_data, dict):
        # Avoid forwarding an empty/invalid body upstream as JSON null.
        return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
    campaign_id = api_client.save_campaign_template(campaign_data)
    return jsonify({'success': True, 'data': {'id': campaign_id}})
@app.route('/api/campaigns/<int:template_id>')
@handle_api_errors
def get_campaign(template_id):
    """Look up one campaign template by its integer id and return it as JSON."""
    payload = {
        'success': True,
        'data': api_client.get_campaign_template(template_id),
    }
    return jsonify(payload)
@app.route('/api/campaigns/list', methods=['POST'])
@handle_api_errors
def list_campaigns():
    """
    List campaigns.

    The JSON body is optional; when present it must be an object whose keys
    override the client's default paging and ``isActive`` filters.
    """
    # silent=True: a missing or non-JSON body means "no filters" rather than
    # an HTTP exception that the error decorator would report as a 500.
    filters = request.get_json(silent=True) or {}
    if not isinstance(filters, dict):
        return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
    campaigns = api_client.list_campaigns(filters)
    return jsonify({'success': True, 'data': campaigns})
@app.route('/api/batch/fetch', methods=['POST'])
@handle_api_errors
def batch_fetch():
    """
    Fetch data from multiple endpoints concurrently.

    Expects a JSON object with an optional ``endpoints`` list of path strings
    (relative to ``/api/<version>/``). Responds 400 for any other body shape.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
    endpoints = payload.get('endpoints', [])
    if not isinstance(endpoints, list) or not all(isinstance(ep, str) for ep in endpoints):
        return jsonify({'success': False, 'error': "'endpoints' must be a list of strings"}), 400
    # NOTE(review): the caller chooses which upstream paths are requested with
    # this service's tenant headers — consider restricting to an allow-list.
    # asyncio.run creates a fresh event loop and always closes it; the earlier
    # new_event_loop()/run_until_complete() pair leaked one loop per request.
    results = asyncio.run(async_client.fetch_multiple_endpoints(endpoints))
    return jsonify({'success': True, 'data': results})
@app.route('/api/cache/clear', methods=['POST'])
def clear_cache():
    """Empty the application cache and confirm with a JSON message."""
    cache.clear()
    confirmation = {'success': True, 'message': 'Cache cleared successfully'}
    return jsonify(confirmation)
# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer requests for unknown routes with a JSON 404 body."""
    body = jsonify({'success': False, 'error': 'Endpoint not found'})
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Log the failure, then answer with a generic JSON 500 body."""
    logger.error(f"Internal error: {error}")
    body = jsonify({'success': False, 'error': 'Internal server error'})
    return body, 500
# CLI commands for testing
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Bill Tracking API Integration')
parser.add_argument('--test', action='store_true', help='Run API tests')
parser.add_argument('--port', type=int, default=5000, help='Port to run on')
parser.add_argument('--debug', action='store_true', help='Run in debug mode')
args = parser.parse_args()
if args.test:
# Run basic API tests
print("Running API tests...")
try:
print("\n1. Testing states endpoint...")
states = api_client.get_states()
print(f" ✓ Retrieved {len(states)} states")
print("\n2. Testing sessions endpoint...")
sessions = api_client.get_sessions()
print(f" ✓ Retrieved {len(sessions)} sessions")
print("\n3. Testing activity log...")
result = api_client.save_activity_log(
user_id='test_user',
action='TEST_ACTION',
entity_type='TEST',
entity_id='123'
)
print(f" ✓ Activity log saved: {result}")
print("\nAll tests passed!")
except Exception as e:
print(f"\n✗ Test failed: {e}")
else:
# Run Flask app
app.run(
host='0.0.0.0',
port=args.port,
debug=args.debug
)Usage Notes
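- Replace the placeholder values (for example `YOUR_TENANT_CODE`) with your actual API credentials, either in the code or through the `BILLTRACKING_API_URL`, `BILLTRACKING_API_VERSION`, and `BILLTRACKING_TENANT_CODE` environment variables (a `.env` file is loaded automatically)
- Install the required dependencies before running this code: `flask`, `flask-cors`, `flask-caching`, `requests`, `aiohttp`, and `python-dotenv`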
- Refer to the main API documentation for detailed endpoint information
Download
Download this example: python-example.py
