Skip to content

Python SDK Integration

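This example demonstrates how to integrate with the Engagifii Bill Tracking API using Python: a small Flask application that proxies selected API endpoints through a `requests`-based client, with response caching, automatic retries, and an `aiohttp` client for concurrent calls.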

Code Example

python
#!/usr/bin/env python3
"""
Python Integration Example for Bill Tracking API
Complete Flask application with error handling, caching, and async support
"""

import os
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from functools import wraps
import asyncio
import aiohttp
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_caching import Cache
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configuration
class Config:
    """Settings for the example app, resolved once when the module is imported.

    The three BILLTRACKING_* values and LOG_LEVEL may be overridden through
    environment variables; the cache settings are fixed.
    """

    # Cache settings (constants)
    CACHE_DEFAULT_TIMEOUT = 300  # 5 minutes
    CACHE_TYPE = 'simple'

    # Upstream API settings (environment first, then the fallback shown)
    API_URL = os.environ.get(
        'BILLTRACKING_API_URL',
        'https://engagifii-billtracking.azurewebsites.net',
    )
    API_VERSION = os.environ.get('BILLTRACKING_API_VERSION', '1.0')
    TENANT_CODE = os.environ.get('BILLTRACKING_TENANT_CODE', 'YOUR_TENANT_CODE')

    # Name of a logging level, e.g. 'INFO'
    LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')

# Initialize Flask app
# The application object is created at import time and configured from the
# Config class defined above.
app = Flask(__name__)
app.config.from_object(Config)
# CORS is enabled with no options, so flask-cors' own defaults apply.
# NOTE(review): confirm those defaults (which origins are allowed) are
# acceptable before deploying this beyond a demo.
CORS(app)

# Initialize cache
# presumably Cache reads CACHE_TYPE / CACHE_DEFAULT_TIMEOUT from the app
# config loaded above — verify against the Flask-Caching documentation.
cache = Cache(app)

# Configure logging
# LOG_LEVEL comes from the environment, so normalise its case and fall back
# to INFO rather than raising AttributeError at import time when the value
# is lower-case ("debug") or not a logging level at all.
_log_level = getattr(logging, str(Config.LOG_LEVEL).upper(), None)
if not isinstance(_log_level, int):
    _log_level = logging.INFO

logging.basicConfig(
    level=_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class BillTrackingAPIClient:
    """
    Synchronous client for interacting with the Bill Tracking API.

    All calls go through one ``requests.Session`` that carries the
    ``api-version`` and ``tenant-code`` headers and a retry policy. Every
    public method returns the decoded JSON body of the response.

    Raises (from every public method):
        RateLimitError: the API still answered 429 after retries.
        APIError: any other HTTP error status, or a body that is not JSON.
        requests.exceptions.RequestException: transport failures (connection
            errors, timeouts) propagate unchanged.
    """

    # Seconds to wait on the API. requests applies no timeout by default, so
    # without this a stalled connection would block the caller forever.
    REQUEST_TIMEOUT = 30

    # Wait (seconds) reported when a 429 carries no usable Retry-After value.
    DEFAULT_RETRY_AFTER = 60

    def __init__(self):
        self.base_url = Config.API_URL
        self.api_version = Config.API_VERSION
        self.tenant_code = Config.TENANT_CODE
        self.session = self._create_session()

    def _create_session(self) -> "requests.Session":
        """Create a requests session with retry logic and default headers."""
        session = requests.Session()

        # Configure retry strategy.
        # raise_on_status=False makes the adapter hand back the final
        # response once retries are exhausted instead of raising RetryError,
        # so _handle_response can translate a 429 into RateLimitError.
        # NOTE(review): POST is retried as well; the save endpoints may then
        # be applied twice after a 5xx — confirm they are idempotent.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"],
            raise_on_status=False
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        # Set default headers
        session.headers.update({
            'api-version': self.api_version,
            'tenant-code': self.tenant_code,
            'Content-Type': 'application/json'
        })

        return session

    @staticmethod
    def _parse_retry_after(value: Any, default: int = 60) -> int:
        """Convert a Retry-After header value to whole seconds.

        Returns ``default`` when the header is missing, negative, or not a
        plain integer (for example the HTTP-date form).
        """
        try:
            seconds = int(str(value).strip())
        except (TypeError, ValueError):
            return default
        return seconds if seconds >= 0 else default

    def _handle_response(self, response: "requests.Response") -> Dict:
        """Return the JSON body of ``response`` or raise a client error.

        Raises:
            RateLimitError: status 429; ``retry_after`` is always an int.
            APIError: any other error status, or an undecodable body.
        """
        try:
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            if response.status_code == 429:
                retry_after = self._parse_retry_after(
                    response.headers.get('Retry-After'),
                    self.DEFAULT_RETRY_AFTER
                )
                raise RateLimitError(
                    f"Rate limit exceeded. Retry after {retry_after}s", retry_after
                ) from e
            raise APIError(f"API request failed: {e}") from e
        except ValueError as e:
            raise APIError("Invalid JSON response") from e

    def _url(self, path: str) -> str:
        """Build the absolute URL for an API path relative to the version root."""
        return f"{self.base_url}/api/{self.api_version}/{path}"

    def _get(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET ``path`` with optional query parameters and decode the reply."""
        response = self.session.get(
            self._url(path), params=params or {}, timeout=self.REQUEST_TIMEOUT
        )
        return self._handle_response(response)

    def _post(self, path: str, payload: Any) -> Any:
        """POST ``payload`` as JSON to ``path`` and decode the reply."""
        response = self.session.post(
            self._url(path), json=payload, timeout=self.REQUEST_TIMEOUT
        )
        return self._handle_response(response)

    def get_states(self) -> List[Dict]:
        """Get list of states"""
        return self._get("dropdown/states")

    def get_sessions(self, state_id: Optional[int] = None) -> List[Dict]:
        """Get list of sessions, optionally restricted to one state."""
        # Compare against None so that an id of 0 is still sent as a filter.
        params = {'stateId': state_id} if state_id is not None else {}
        return self._get("dropdown/sessions", params)

    def get_committees(self, state_id: Optional[int] = None,
                      session_id: Optional[int] = None) -> List[Dict]:
        """Get list of committees, optionally filtered by state and/or session."""
        params = {
            key: value
            for key, value in (('stateId', state_id), ('sessionId', session_id))
            if value is not None
        }
        return self._get("dropdown/committees", params)

    def get_calendar_events(self, filters: Dict) -> List[Dict]:
        """Get bill calendar events.

        ``filters`` is merged over the defaults: today through 30 days from
        now (local time), first page, 50 items per page.
        """
        # Read the clock once so both bounds derive from the same instant.
        today = datetime.now()
        default_filters = {
            'startDate': today.strftime('%Y-%m-%d'),
            'endDate': (today + timedelta(days=30)).strftime('%Y-%m-%d'),
            'pageNumber': 1,
            'pageSize': 50
        }
        default_filters.update(filters or {})

        return self._post("bill/event/calendar", default_filters)

    def save_activity_log(self, user_id: str, action: str,
                          entity_type: str, entity_id: str,
                          metadata: Optional[Dict] = None) -> bool:
        """Save activity log entry.

        Returns the decoded JSON body of the API reply.
        NOTE(review): the ``bool`` annotation assumes the API answers with a
        bare JSON boolean — confirm against the API documentation.
        """
        # Local import: the file header only imports datetime and timedelta.
        from datetime import timezone

        log_entry = {
            'userId': user_id,
            'action': action,
            'entityType': entity_type,
            'entityId': entity_id,
            'metadata': metadata or {},
            # Aware "now" with tzinfo stripped gives the same text as the
            # deprecated datetime.utcnow(), so the trailing 'Z' stays valid.
            'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
        }

        return self._post("activity/log/save", log_entry)

    def get_activity_logs(self, filters: Dict) -> Dict:
        """Get activity logs with pagination.

        ``filters`` is merged over the defaults: first page, 50 items,
        sorted by ``timestamp`` descending.
        """
        default_filters = {
            'pageNumber': 1,
            'pageSize': 50,
            'sortBy': 'timestamp',
            'isAscending': False
        }
        default_filters.update(filters or {})

        return self._post("activity/log/list", default_filters)

    def save_campaign_template(self, campaign_data: Dict) -> int:
        """Save campaign template.

        Returns the decoded JSON body of the API reply.
        NOTE(review): the ``int`` annotation assumes the API answers with the
        new template id as a bare number — confirm.
        """
        # NOTE(review): "Champaign" is kept exactly as found; it looks like a
        # typo but may be the route's real spelling — verify before changing.
        return self._post("Advocacy/saveChampaignTemplate", campaign_data)

    def get_campaign_template(self, template_id: int) -> Dict:
        """Get campaign template by ID"""
        return self._get(f"Advocacy/getChampaignTemplate/{template_id}")

    def list_campaigns(self, filters: Dict) -> Dict:
        """List campaigns with filtering.

        ``filters`` is merged over the defaults: first page, 20 items,
        active campaigns only.
        """
        default_filters = {
            'pageNumber': 1,
            'pageSize': 20,
            'isActive': True
        }
        default_filters.update(filters or {})

        return self._post("Advocacy/list", default_filters)

class AsyncBillTrackingClient:
    """
    aiohttp-based client that issues several API calls concurrently.

    Failures never propagate to the caller: a failed fetch yields ``{}`` and
    a failed save yields ``False``, each after an error is logged.
    """

    def __init__(self):
        self.tenant_code = Config.TENANT_CODE
        self.api_version = Config.API_VERSION
        self.base_url = Config.API_URL
        # Headers attached to every ClientSession this client opens.
        self.headers = {
            'api-version': self.api_version,
            'tenant-code': self.tenant_code,
            'Content-Type': 'application/json'
        }

    async def fetch_multiple_endpoints(self, endpoints: List[str]) -> List[Dict]:
        """GET every endpoint concurrently; results keep the input order."""
        async with aiohttp.ClientSession(headers=self.headers) as http:
            pending = []
            for endpoint in endpoints:
                pending.append(self._fetch(http, endpoint))
            gathered = await asyncio.gather(*pending)
        return gathered

    async def _fetch(self, session: aiohttp.ClientSession, endpoint: str) -> Dict:
        """GET one endpoint; return its JSON body, or ``{}`` on any failure."""
        target = f"{self.base_url}/api/{self.api_version}/{endpoint}"

        try:
            async with session.get(target) as reply:
                # Anything other than 200 is logged and reported as empty.
                if reply.status != 200:
                    logger.error(f"Error fetching {target}: {reply.status}")
                    return {}
                return await reply.json()
        except Exception as exc:
            logger.error(f"Exception fetching {target}: {exc}")
            return {}

    async def batch_save_activity_logs(self, log_entries: List[Dict]) -> List[bool]:
        """POST every log entry concurrently; one success flag per entry."""
        async with aiohttp.ClientSession(headers=self.headers) as http:
            pending = []
            for entry in log_entries:
                pending.append(self._save_log(http, entry))
            outcomes = await asyncio.gather(*pending)
        return outcomes

    async def _save_log(self, session: aiohttp.ClientSession, log_entry: Dict) -> bool:
        """POST one log entry; True only when the API answers 200."""
        target = f"{self.base_url}/api/{self.api_version}/activity/log/save"

        try:
            async with session.post(target, json=log_entry) as reply:
                saved = reply.status == 200
            return saved
        except Exception as exc:
            logger.error(f"Error saving log: {exc}")
            return False

# Custom exceptions
class APIError(Exception):
    """Common base class for the errors raised by the API clients."""


class RateLimitError(APIError):
    """APIError raised when the API reports that the rate limit was hit.

    Attributes:
        retry_after: the wait hint handed to the constructor.
    """

    def __init__(self, message: str, retry_after: int):
        self.retry_after = retry_after
        APIError.__init__(self, message)

# Initialize clients
# Module-level instances shared by every route below. Both constructors read
# their settings from Config, so they are fixed once this module is imported.
api_client = BillTrackingAPIClient()
async_client = AsyncBillTrackingClient()

# Decorators
def handle_api_errors(f):
    """Decorator that turns exceptions from a view into JSON error replies.

    Mapping:
        RateLimitError -> 429 with ``retry_after`` in the body and a
                          ``Retry-After`` header
        APIError       -> 500 with the error message
        anything else  -> 500 with a generic message (details only logged)

    On success the view's own return value is passed through untouched.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except RateLimitError as e:
            # Repeat the hint as a header so generic HTTP clients can honour
            # it without parsing the body.
            return jsonify({
                'success': False,
                'error': 'Rate limit exceeded',
                'retry_after': e.retry_after
            }), 429, {'Retry-After': str(e.retry_after)}
        except APIError as e:
            return jsonify({
                'success': False,
                'error': str(e)
            }), 500
        except Exception as e:
            # logger.exception records the traceback; the message alone is
            # rarely enough to diagnose an unexpected failure.
            logger.exception(f"Unexpected error: {e}")
            return jsonify({
                'success': False,
                'error': 'Internal server error'
            }), 500
    return decorated_function

# Flask Routes

@app.route('/health')
def health_check():
    """Report liveness together with the upstream API settings in use."""
    payload = {
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
    }
    # Echo the configured upstream so a deployment can be checked at a glance.
    payload['config'] = {
        'api_url': Config.API_URL,
        'api_version': Config.API_VERSION
    }
    return jsonify(payload)

@app.route('/api/states')
# Cache for 1 hour, but only successful replies: handle_api_errors returns a
# (response, status) tuple on failure, and caching that would keep replaying
# a transient upstream error for the whole hour.
@cache.cached(timeout=3600, response_filter=lambda rv: not isinstance(rv, tuple))
@handle_api_errors
def get_states():
    """Return the list of states as ``{'success': True, 'data': [...]}``."""
    states = api_client.get_states()
    return jsonify({'success': True, 'data': states})

@app.route('/api/sessions')
# Cache for 30 minutes per query string, but only successful replies:
# handle_api_errors returns a (response, status) tuple on failure, and
# caching that would keep replaying a transient upstream error.
@cache.cached(timeout=1800, query_string=True,
              response_filter=lambda rv: not isinstance(rv, tuple))
@handle_api_errors
def get_sessions():
    """Return the list of sessions, optionally filtered by ``stateId``."""
    # type=int yields None when the parameter is absent or not an integer.
    state_id = request.args.get('stateId', type=int)
    sessions = api_client.get_sessions(state_id)
    return jsonify({'success': True, 'data': sessions})

@app.route('/api/committees')
# Cache for 30 minutes per query string, but only successful replies:
# handle_api_errors returns a (response, status) tuple on failure, and
# caching that would keep replaying a transient upstream error.
@cache.cached(timeout=1800, query_string=True,
              response_filter=lambda rv: not isinstance(rv, tuple))
@handle_api_errors
def get_committees():
    """Return committees, optionally filtered by ``stateId`` / ``sessionId``."""
    # type=int yields None when a parameter is absent or not an integer.
    state_id = request.args.get('stateId', type=int)
    session_id = request.args.get('sessionId', type=int)
    committees = api_client.get_committees(state_id, session_id)
    return jsonify({'success': True, 'data': committees})

@app.route('/api/calendar/events', methods=['POST'])
@handle_api_errors
def get_calendar_events():
    """Return calendar events matching the optional JSON filter object.

    A missing or unparseable body means "no filters"; a JSON body that is
    not an object is rejected with 400.
    """
    # silent=True returns None instead of raising an HTTP exception, which
    # handle_api_errors would otherwise report as a 500.
    filters = request.get_json(silent=True) or {}
    if not isinstance(filters, dict):
        return jsonify({
            'success': False,
            'error': 'Request body must be a JSON object'
        }), 400
    events = api_client.get_calendar_events(filters)
    return jsonify({'success': True, 'data': events})

@app.route('/api/activity/log', methods=['POST'])
@handle_api_errors
def save_activity_log():
    """Save one activity log entry described by the JSON request body.

    Expects an object with ``userId``, ``action``, ``entityType`` and
    ``entityId`` (``metadata`` is optional). Replies 400 when the body is
    not a JSON object or a required field is absent.
    """
    # silent=True returns None instead of raising an HTTP exception, which
    # handle_api_errors would otherwise report as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Request body must be a JSON object'
        }), 400

    required = ('userId', 'action', 'entityType', 'entityId')
    missing = [field for field in required if data.get(field) is None]
    if missing:
        return jsonify({
            'success': False,
            'error': f"Missing required field(s): {', '.join(missing)}"
        }), 400

    result = api_client.save_activity_log(
        user_id=data.get('userId'),
        action=data.get('action'),
        entity_type=data.get('entityType'),
        entity_id=data.get('entityId'),
        metadata=data.get('metadata')
    )
    return jsonify({'success': True, 'data': result})

@app.route('/api/activity/list', methods=['POST'])
@handle_api_errors
def get_activity_logs():
    """Return activity logs matching the optional JSON filter object.

    A missing or unparseable body means "no filters"; a JSON body that is
    not an object is rejected with 400.
    """
    # silent=True returns None instead of raising an HTTP exception, which
    # handle_api_errors would otherwise report as a 500.
    filters = request.get_json(silent=True) or {}
    if not isinstance(filters, dict):
        return jsonify({
            'success': False,
            'error': 'Request body must be a JSON object'
        }), 400
    logs = api_client.get_activity_logs(filters)
    return jsonify({'success': True, 'data': logs})

@app.route('/api/campaigns', methods=['POST'])
@handle_api_errors
def save_campaign():
    """Save the campaign template given as the JSON request body.

    Replies 400 when the body is missing or is not a JSON object, instead
    of forwarding an empty payload upstream.
    """
    # silent=True returns None instead of raising an HTTP exception, which
    # handle_api_errors would otherwise report as a 500.
    campaign_data = request.get_json(silent=True)
    if not isinstance(campaign_data, dict):
        return jsonify({
            'success': False,
            'error': 'Request body must be a JSON object'
        }), 400
    campaign_id = api_client.save_campaign_template(campaign_data)
    return jsonify({'success': True, 'data': {'id': campaign_id}})

@app.route('/api/campaigns/<int:template_id>')
@handle_api_errors
def get_campaign(template_id):
    """Look up one campaign template by the integer id in the URL."""
    return jsonify({
        'success': True,
        'data': api_client.get_campaign_template(template_id),
    })

@app.route('/api/campaigns/list', methods=['POST'])
@handle_api_errors
def list_campaigns():
    """Return campaigns matching the optional JSON filter object.

    A missing or unparseable body means "no filters"; a JSON body that is
    not an object is rejected with 400.
    """
    # silent=True returns None instead of raising an HTTP exception, which
    # handle_api_errors would otherwise report as a 500.
    filters = request.get_json(silent=True) or {}
    if not isinstance(filters, dict):
        return jsonify({
            'success': False,
            'error': 'Request body must be a JSON object'
        }), 400
    campaigns = api_client.list_campaigns(filters)
    return jsonify({'success': True, 'data': campaigns})

@app.route('/api/batch/fetch', methods=['POST'])
@handle_api_errors
def batch_fetch():
    """Fetch several API endpoints concurrently.

    Expects ``{"endpoints": ["dropdown/states", ...]}`` and returns one
    result per endpoint, in the same order. Replies 400 when the body is
    not a JSON object or ``endpoints`` is not a list of strings.
    """
    # silent=True returns None instead of raising an HTTP exception, which
    # handle_api_errors would otherwise report as a 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({
            'success': False,
            'error': 'Request body must be a JSON object'
        }), 400

    endpoints = payload.get('endpoints', [])
    if not isinstance(endpoints, list) or not all(isinstance(ep, str) for ep in endpoints):
        return jsonify({
            'success': False,
            'error': "'endpoints' must be a list of strings"
        }), 400

    # NOTE(review): the endpoint paths are caller-supplied and forwarded to
    # the upstream API with this server's tenant headers; consider an
    # allow-list before exposing this route publicly.

    # asyncio.run creates a fresh event loop and closes it afterwards; the
    # previous new_event_loop()/run_until_complete() pair never closed the
    # loop, leaking one per request.
    results = asyncio.run(async_client.fetch_multiple_endpoints(endpoints))

    return jsonify({'success': True, 'data': results})

@app.route('/api/cache/clear', methods=['POST'])
def clear_cache():
    """Empty the application cache and confirm with a JSON message."""
    cache.clear()
    confirmation = {'success': True, 'message': 'Cache cleared successfully'}
    return jsonify(confirmation)

# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with the same JSON envelope the routes use."""
    body = jsonify({'success': False, 'error': 'Endpoint not found'})
    return body, 404

@app.errorhandler(500)
def internal_error(error):
    """Log a 500 error and answer with a generic JSON error body."""
    logger.error(f"Internal error: {error}")
    body = jsonify({'success': False, 'error': 'Internal server error'})
    return body, 500

# CLI commands for testing
if __name__ == '__main__':
    # Script entry point: either run a quick smoke test against the live API
    # (--test) or start the Flask development server.
    import argparse
    import sys

    parser = argparse.ArgumentParser(description='Bill Tracking API Integration')
    parser.add_argument('--test', action='store_true', help='Run API tests')
    parser.add_argument('--host', default='0.0.0.0',
                        help='Interface to bind to (default: all interfaces)')
    parser.add_argument('--port', type=int, default=5000, help='Port to run on')
    parser.add_argument('--debug', action='store_true', help='Run in debug mode')

    args = parser.parse_args()

    if args.test:
        # Run basic API tests
        print("Running API tests...")

        try:
            print("\n1. Testing states endpoint...")
            states = api_client.get_states()
            print(f"   ✓ Retrieved {len(states)} states")

            print("\n2. Testing sessions endpoint...")
            sessions = api_client.get_sessions()
            print(f"   ✓ Retrieved {len(sessions)} sessions")

            print("\n3. Testing activity log...")
            result = api_client.save_activity_log(
                user_id='test_user',
                action='TEST_ACTION',
                entity_type='TEST',
                entity_id='123'
            )
            print(f"   ✓ Activity log saved: {result}")

            print("\nAll tests passed!")

        except Exception as e:
            print(f"\n✗ Test failed: {e}")
            # Non-zero exit status so shells and CI can detect the failure.
            sys.exit(1)
    else:
        if args.debug and args.host not in ('127.0.0.1', 'localhost', '::1'):
            # The debug server includes an interactive debugger; warn when it
            # would be reachable from other machines.
            logger.warning(
                "Debug mode is enabled while listening on %s; "
                "pass --host 127.0.0.1 to restrict access to this machine.",
                args.host
            )

        # Run Flask app
        app.run(
            host=args.host,
            port=args.port,
            debug=args.debug
        )

Usage Notes

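  • Replace the placeholder values with your actual API credentials: set `BILLTRACKING_TENANT_CODE` (and, if needed, `BILLTRACKING_API_URL` and `BILLTRACKING_API_VERSION`) in the environment or in a `.env` file next to the script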
  • Install the required dependencies before running this code: `flask`, `flask-cors`, `flask-caching`, `requests`, `aiohttp`, and `python-dotenv`
  • Refer to the main API documentation for detailed endpoint information

Download

Download this example: python-example.py