A comprehensive Python client library for Unstract APIHUB services that provides clean, Pythonic interfaces for multiple document processing APIs including table extraction, document splitting, and generic document processing with dynamic endpoints.
- Multi-Client Architecture: Three specialized clients for different use cases
  - `ApiHubClient`: Table extraction and discovery APIs
  - `DocSplitterClient`: Document splitting and chunking services
  - `GenericUnstractClient`: Dynamic endpoint processing (invoice, contract, receipt, etc.)
- File Processing: Support for document processing with file uploads across all clients
- Status Monitoring: Track processing status with polling capabilities
- Error Handling: Comprehensive exception handling with meaningful messages
- Flexible Parameters: Support for custom parameters and configurations
- Automatic Polling: Optional wait-for-completion functionality
- Type Safety: Full type hints for better development experience
- Batch Processing: Built-in support for processing multiple documents
- Integration Ready: Easy integration between different client services
pip install apihub-python-client
Or install from source:
git clone https://github.com/Zipstack/apihub-python-client.git
cd apihub-python-client
pip install -e .
from apihub_client import ApiHubClient

# Initialize the client
client = ApiHubClient(
    api_key="your-api-key-here",
    base_url="https://api-hub.us-central.unstract.com/api/v1"
)

# Process a document with automatic completion waiting
result = client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="statement.pdf",
    wait_for_completion=True,
    polling_interval=3  # Check status every 3 seconds
)
print("Processing completed!")
print(result)
Split documents into smaller parts using the doc-splitter service:
from apihub_client import DocSplitterClient

# Initialize the doc-splitter client
doc_client = DocSplitterClient(
    api_key="your-api-key-here",
    base_url="http://localhost:8005"
)

# Simple upload and wait for completion
result = doc_client.upload(
    file_path="large_document.pdf",
    wait_for_completion=True,
    polling_interval=5  # Check status every 5 seconds
)

# Download the split result
output_file = doc_client.download_result(
    job_id=result["job_id"],
    output_path="split_result.zip"
)
print(f"Downloaded result to: {output_file}")
# Step 1: Upload document
upload_result = doc_client.upload(file_path="document.pdf")
job_id = upload_result["job_id"]
print(f"Upload completed. Job ID: {job_id}")

# Step 2: Monitor status manually
status = doc_client.get_job_status(job_id)
print(f"Current status: {status['status']}")

# Step 3: Wait for completion (with custom timeout)
final_result = doc_client.wait_for_completion(
    job_id=job_id,
    timeout=600,  # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)

# Step 4: Download the processed result
downloaded_file = doc_client.download_result(
    job_id=job_id,
    output_path="processed_document.zip"
)
print(f"Processing complete! Downloaded: {downloaded_file}")
from pathlib import Path

def process_documents_batch(file_paths):
    """Process multiple documents with doc-splitter."""
    results = []
    for file_path in file_paths:
        try:
            print(f"Processing {file_path}...")
            # Upload and wait for completion
            result = doc_client.upload(
                file_path=file_path,
                wait_for_completion=True,
                polling_interval=5
            )
            # Generate output filename
            input_name = Path(file_path).stem
            output_path = f"{input_name}_split.zip"
            # Download result
            downloaded_file = doc_client.download_result(
                job_id=result["job_id"],
                output_path=output_path
            )
            results.append({
                "input": file_path,
                "output": downloaded_file,
                "job_id": result["job_id"],
                "success": True
            })
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            results.append({
                "input": file_path,
                "error": str(e),
                "success": False
            })
    return results

# Process multiple files
files = ["document1.pdf", "document2.pdf", "document3.pdf"]
results = process_documents_batch(files)

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
Process documents using dynamic endpoints like invoice, contract, receipt, etc.:
from apihub_client import GenericUnstractClient

# Initialize the generic client
client = GenericUnstractClient(
    api_key="your-api-key-here",
    base_url="http://localhost:8005"
)

# Simple processing with automatic completion waiting
result = client.process(
    endpoint="invoice",
    file_path="invoice.pdf",
    wait_for_completion=True,
    polling_interval=5  # Check status every 5 seconds
)
print("Invoice processing completed:", result)
# Step 1: Start processing
process_result = client.process(
    endpoint="contract",
    file_path="contract.pdf"
)
execution_id = process_result["execution_id"]
print(f"Processing started. Execution ID: {execution_id}")

# Step 2: Check status manually
status = client.check_status("contract", execution_id)
print(f"Current status: {status}")

# Step 3: Wait for completion (with custom timeout)
final_result = client.wait_for_completion(
    endpoint="contract",
    execution_id=execution_id,
    timeout=600,  # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)

# Step 4: Get result later (if needed)
result = client.get_result("contract", execution_id)
print("Processing complete:", result)
def process_documents_batch(endpoint, file_paths):
    """Process multiple documents with the same endpoint."""
    results = []
    for file_path in file_paths:
        try:
            print(f"Processing {file_path} with {endpoint} endpoint...")
            # Process and wait for completion
            result = client.process(
                endpoint=endpoint,
                file_path=file_path,
                wait_for_completion=True,
                polling_interval=5
            )
            results.append({
                "input": file_path,
                "execution_id": result["execution_id"],
                "result": result,
                "success": True
            })
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            results.append({
                "input": file_path,
                "error": str(e),
                "success": False
            })
    return results

# Process multiple invoices
invoice_files = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]
results = process_documents_batch("invoice", invoice_files)

# Process multiple contracts
contract_files = ["contract1.pdf", "contract2.pdf"]
contract_results = process_documents_batch("contract", contract_files)

# Summary (invoice batch)
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
Combine doc-splitter with extraction APIs for complete document processing:
from apihub_client import ApiHubClient, DocSplitterClient

# Initialize both clients
api_client = ApiHubClient(
    api_key="your-api-key",
    base_url="https://api-hub.us-central.unstract.com/api/v1"
)
doc_splitter = DocSplitterClient(
    api_key="your-api-key",
    base_url="http://localhost:8005"
)

# Step 1: Split the large document
split_result = doc_splitter.upload(
    file_path="large_contract.pdf",
    wait_for_completion=True
)

# Step 2: Download split result
doc_splitter.download_result(
    job_id=split_result["job_id"],
    output_path="split_documents.zip"
)

# Step 3: Process individual documents (example with one document)
# (assuming you extract individual PDFs from the zip)
table_result = api_client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="individual_page.pdf",
    wait_for_completion=True
)
print("Extracted data:", table_result)
from apihub_client import ApiHubClient, DocSplitterClient, GenericUnstractClient

# Initialize all clients
api_client = ApiHubClient(
    api_key="your-api-key",
    base_url="https://api-hub.us-central.unstract.com/api/v1"
)
doc_splitter = DocSplitterClient(
    api_key="your-api-key",
    base_url="http://localhost:8005"
)
generic_client = GenericUnstractClient(
    api_key="your-api-key",
    base_url="http://localhost:8005"
)

# Workflow: Split → Extract → Process with Generic API
# Step 1: Split large document
split_result = doc_splitter.upload(
    file_path="large_document.pdf",
    wait_for_completion=True
)

# Step 2: Extract tables from split documents
# (after extracting individual files from the zip)
table_result = api_client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="split_page_1.pdf",
    wait_for_completion=True
)

# Step 3: Process with generic invoice API
invoice_result = generic_client.process(
    endpoint="invoice",
    file_path="split_page_2.pdf",
    wait_for_completion=True
)

print("Complete workflow finished!")
print("Tables extracted:", len(table_result.get('data', [])))
print("Invoice processed:", invoice_result.get('execution_id'))
import json

# Step 1: Discover tables from the uploaded PDF
initial_result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    ext_cache_result="true",
    ext_cache_text="true",
    file_path="statement.pdf"
)
file_hash = initial_result.get("file_hash")
print(f"File hash: {file_hash}")

discover_tables_result = client.wait_for_complete(
    file_hash,
    timeout=600,  # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)
tables = json.loads(discover_tables_result['data'])
print(f"Total tables in this document: {len(tables)}")

all_table_result = []

# Step 2: Extract each table
for i, table in enumerate(tables):
    table_result = client.extract(
        endpoint="extract_table",
        vertical="table",
        sub_vertical="extract_table",
        file_hash=file_hash,
        ext_table_no=i,  # Extract the i-th table
        wait_for_completion=True
    )
    print(f"Extracted table: {table['table_name']}")
    all_table_result.append({table["table_name"]: table_result})

print("All table results:")
print(all_table_result)
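To keep the extracted tables around for later use, the collected results can be written out with the `json` module (a small sketch, assuming the results are JSON-serializable dicts):

```python
# Persist the collected results for later use
with open("all_tables.json", "w") as f:
    json.dump(all_table_result, f, indent=2)
```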
# Process bank statement
result = client.extract(
    endpoint="bank_statement",
    vertical="table",
    sub_vertical="bank_statement",
    file_path="bank_statement.pdf",
    wait_for_completion=True,
    polling_interval=3
)
print("Bank statement processed:", result)
# Step 1: Start processing
initial_result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="document.pdf"
)
file_hash = initial_result["file_hash"]
print(f"Processing started with hash: {file_hash}")

# Step 2: Monitor status
status = client.get_status(file_hash)
print(f"Current status: {status['status']}")

# Step 3: Wait for completion (using wait_for_complete method)
final_result = client.wait_for_complete(
    file_hash=file_hash,
    timeout=600,  # Wait up to 10 minutes
    polling_interval=3  # Check every 3 seconds
)
print("Final result:", final_result)
Once a file has been processed, you can reuse it by file hash:
# Process a different operation on the same file
table_result = client.extract(
    endpoint="extract_table",
    vertical="table",
    sub_vertical="extract_table",
    file_hash="previously-obtained-hash",
    ext_table_no=1,  # Extract the second table; indexing starts at 0
    wait_for_completion=True
)
Create a `.env` file:
API_KEY=your_api_key_here
BASE_URL=https://api.example.com
LOG_LEVEL=INFO
Then load in your code:
import os
from dotenv import load_dotenv
from apihub_client import ApiHubClient

load_dotenv()

client = ApiHubClient(
    api_key=os.getenv("API_KEY"),
    base_url=os.getenv("BASE_URL")
)
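Since `os.getenv` returns None for unset variables, it can help to fail fast with a clear message before constructing the client (a small sketch):

```python
import os

api_key = os.getenv("API_KEY")
base_url = os.getenv("BASE_URL")
if not api_key or not base_url:
    # Fail fast instead of sending requests with missing credentials
    raise RuntimeError("API_KEY and BASE_URL must be set (see your .env file)")

client = ApiHubClient(api_key=api_key, base_url=base_url)
```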
The main client class for interacting with the ApiHub service.
client = ApiHubClient(api_key: str, base_url: str)
Parameters:
- `api_key` (str): Your API key for authentication
- `base_url` (str): The base URL of the ApiHub service
Client for interacting with doc-splitter APIs for document splitting operations.
doc_client = DocSplitterClient(api_key: str, base_url: str)
Parameters:
- `api_key` (str): Your API key for authentication
- `base_url` (str): The base URL of the doc-splitter service
Client for interacting with generic Unstract APIs using dynamic endpoints.
generic_client = GenericUnstractClient(api_key: str, base_url: str)
Parameters:
- `api_key` (str): Your API key for authentication
- `base_url` (str): The base URL of the Unstract service
Start a document processing operation.
extract(
    endpoint: str,
    vertical: str,
    sub_vertical: str,
    file_path: str | None = None,
    file_hash: str | None = None,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
    **kwargs
) -> dict
Parameters:
- `endpoint` (str): The API endpoint to call (e.g., "discover_tables", "extract_table")
- `vertical` (str): The processing vertical
- `sub_vertical` (str): The processing sub-vertical
- `file_path` (str, optional): Path to file for upload (for new files)
- `file_hash` (str, optional): Hash of previously uploaded file (for cached operations)
- `wait_for_completion` (bool): If True, polls until completion and returns final result
- `polling_interval` (int): Seconds between status checks when waiting (default: 5)
- `**kwargs`: Additional parameters specific to the endpoint
Returns:
- `dict`: API response containing processing results or file hash for tracking
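For example, the `ext_cache_*` flags used in the table extraction examples above are passed straight through `**kwargs`:

```python
# Endpoint-specific options are forwarded via **kwargs
result = client.extract(
    endpoint="discover_tables",
    vertical="table",
    sub_vertical="discover_tables",
    file_path="statement.pdf",
    ext_cache_result="true",
    ext_cache_text="true"
)
```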
Check the status of a processing job.
get_status(file_hash: str) -> dict
Parameters:
- `file_hash` (str): The file hash returned from extract()
Returns:
- `dict`: Status information including current processing state
Get the final results of a completed processing job.
retrieve(file_hash: str) -> dict
Parameters:
- `file_hash` (str): The file hash of the completed job
Returns:
- `dict`: Final processing results
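A usage sketch: fetch the results of a job that has already finished, without polling again. The "COMPLETED" status string is an assumption here; check your deployment's actual status values:

```python
# Fetch results for a job that already finished
status = client.get_status(file_hash)
if status["status"] == "COMPLETED":  # status value is an assumption
    result = client.retrieve(file_hash)
    print(result)
```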
Wait for a processing job to complete by polling its status.
wait_for_complete(
    file_hash: str,
    timeout: int = 600,
    polling_interval: int = 3
) -> dict
Parameters:
- `file_hash` (str): The file hash of the job to wait for
- `timeout` (int): Maximum time to wait in seconds (default: 600)
- `polling_interval` (int): Seconds between status checks (default: 3)
Returns:
- `dict`: Final processing results when completed
Raises:
- `ApiHubClientException`: If processing fails or times out
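Since `wait_for_complete` raises `ApiHubClientException` on failure or timeout, long waits are typically wrapped in a try/except (sketch):

```python
from apihub_client import ApiHubClientException

try:
    final_result = client.wait_for_complete(file_hash=file_hash, timeout=600)
except ApiHubClientException as e:
    print(f"Job did not complete: {e.message} (status code: {e.status_code})")
```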
Upload a document for splitting.
upload(
    file_path: str,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
) -> dict
Parameters:
- `file_path` (str): Path to the file to upload
- `wait_for_completion` (bool): If True, polls until completion and returns final result
- `polling_interval` (int): Seconds between status checks when waiting (default: 5)
Returns:
- `dict`: Response containing job_id and status information
Check the status of a splitting job.
get_job_status(job_id: str) -> dict
Parameters:
- `job_id` (str): The job ID to check status for
Returns:
- `dict`: Status information including current processing state
Download the result of a completed splitting job.
download_result(
    job_id: str,
    output_path: str | None = None
) -> str
Parameters:
- `job_id` (str): The job ID to download results for
- `output_path` (str, optional): Path where to save the downloaded file. If None, uses 'result_{job_id}.zip'
Returns:
- `str`: Path to the downloaded file
Wait for a splitting job to complete by polling its status.
wait_for_completion(
    job_id: str,
    timeout: int = 600,
    polling_interval: int = 3
) -> dict
Parameters:
- `job_id` (str): The job ID to wait for
- `timeout` (int): Maximum time to wait in seconds (default: 600)
- `polling_interval` (int): Seconds between status checks (default: 3)
Returns:
- `dict`: Final job status information when completed
Raises:
- `ApiHubClientException`: If processing fails or times out
Process a document using the specified endpoint.
process(
    endpoint: str,
    file_path: str,
    wait_for_completion: bool = False,
    polling_interval: int = 5,
    timeout: int = 600,
) -> dict
Parameters:
- `endpoint` (str): The endpoint name (e.g., 'invoice', 'contract', 'receipt')
- `file_path` (str): Path to the file to upload
- `wait_for_completion` (bool): If True, polls until completion and returns final result
- `polling_interval` (int): Seconds between status checks when waiting (default: 5)
- `timeout` (int): Maximum time to wait for completion in seconds (default: 600)
Returns:
- `dict`: Response containing execution_id and processing information
Get the result of a processing operation.
get_result(endpoint: str, execution_id: str) -> dict
Parameters:
- `endpoint` (str): The endpoint name used for processing
- `execution_id` (str): The execution ID to get results for
Returns:
- `dict`: Processing result or status information
Wait for a processing operation to complete by polling its status.
wait_for_completion(
    endpoint: str,
    execution_id: str,
    timeout: int = 600,
    polling_interval: int = 3,
) -> dict
Parameters:
- `endpoint` (str): The endpoint name used for processing
- `execution_id` (str): The execution ID to wait for
- `timeout` (int): Maximum time to wait in seconds (default: 600)
- `polling_interval` (int): Seconds between status checks (default: 3)
Returns:
- `dict`: Final processing result when completed
Check the current status of a processing operation.
check_status(endpoint: str, execution_id: str) -> str | None
Parameters:
- `endpoint` (str): The endpoint name used for processing
- `execution_id` (str): The execution ID to check status for
Returns:
- `str | None`: Current status string, or None if not available
Raises:
- `ApiHubClientException`: If processing fails or times out
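`check_status` and `get_result` can be combined into a manual polling loop, roughly what `wait_for_completion` does internally. This sketch reuses `generic_client` and an `execution_id` from the earlier examples; the terminal status strings are assumptions:

```python
import time

# Manual polling sketch; "COMPLETED" and "FAILED" are assumed status values
while True:
    status = generic_client.check_status("invoice", execution_id)
    if status == "COMPLETED":
        result = generic_client.get_result("invoice", execution_id)
        break
    if status == "FAILED":
        raise RuntimeError("Processing failed")
    time.sleep(3)
```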
All clients (`ApiHubClient`, `DocSplitterClient`, and `GenericUnstractClient`) use the same exception handling:
from apihub_client import ApiHubClientException, GenericUnstractClient

generic_client = GenericUnstractClient(api_key="key", base_url="http://localhost:8005")

try:
    result = generic_client.process(
        endpoint="invoice",
        file_path="invoice.pdf",
        wait_for_completion=True
    )
    print("Processing completed:", result["execution_id"])
except ApiHubClientException as e:
    print(f"Error: {e.message}")
    print(f"Status Code: {e.status_code}")
def process_documents(file_paths, endpoint):
    results = []
    for file_path in file_paths:
        try:
            print(f"Processing {file_path}...")
            # Start processing
            initial_result = client.extract(
                endpoint=endpoint,
                vertical="table",
                sub_vertical=endpoint,
                file_path=file_path
            )
            # Wait for completion with custom settings
            result = client.wait_for_complete(
                file_hash=initial_result["file_hash"],
                timeout=900,  # 15 minutes for batch processing
                polling_interval=5  # Less frequent polling for batch
            )
            results.append({"file": file_path, "result": result, "success": True})
        except ApiHubClientException as e:
            print(f"Failed to process {file_path}: {e.message}")
            results.append({"file": file_path, "error": str(e), "success": False})
    return results

# Process multiple files
file_paths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
results = process_documents(file_paths, "bank_statement")

# Summary
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(f"Processed: {len(successful)} successful, {len(failed)} failed")
Run the test suite:
# Install development dependencies
pip install -e ".[dev]"
# Run all tests
pytest
# Run tests with coverage
pytest --cov=apihub_client --cov-report=html
# Run specific test files
pytest test/test_client.py -v
pytest test/test_integration.py -v
For integration tests with a real API:
# Create .env file with real credentials
cp .env.example .env
# Edit .env with your API credentials
# Run integration tests
pytest test/test_integration.py -v
Enable debug logging to see detailed request/response information:
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
client = ApiHubClient(api_key="your-key", base_url="https://api.example.com")
# Now all API calls will show detailed logs
result = client.extract(...)
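Debug output is verbose, so it is often routed to a file to keep the console usable (standard logging configuration, nothing client-specific):

```python
import logging

# Send detailed logs to a file instead of the console
logging.basicConfig(
    level=logging.DEBUG,
    filename="apihub_debug.log",
    format="%(asctime)s %(name)s %(levelname)s %(message)s"
)
```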
This project uses automated releases through GitHub Actions with PyPI Trusted Publishers for secure publishing.
- Go to GitHub Actions → "Release Tag and Publish Package"
- Click "Run workflow" and configure:
- Version bump:
patch
(bug fixes),minor
(new features), ormajor
(breaking changes) - Pre-release: Check for beta/alpha versions
- Release notes: Optional custom notes
- Version bump:
- Click "Run workflow" - the automation handles the rest!
The workflow will automatically:
- Update version in the code
- Create Git tags and GitHub releases
- Run all tests and quality checks
- Publish to PyPI using `uv publish` with Trusted Publishers
For more details, see Release Documentation.
We welcome contributions! Please see our Contributing Guide for details.
# Clone the repository
git clone https://github.com/Zipstack/apihub-python-client.git
cd apihub-python-client
# Install dependencies using uv (required - do not use pip)
uv sync
# Install pre-commit hooks
uv run --frozen pre-commit install
# Run tests
uv run --frozen pytest
# Run linting and formatting
uv run --frozen ruff check .
uv run --frozen ruff format .
# Run type checking
uv run --frozen mypy src/
# Run all pre-commit hooks manually
uv run --frozen pre-commit run --all-files
This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: GitHub Issues
- Documentation: Check this README and inline code documentation
- Examples: See the `examples/` directory for more usage patterns
- Initial release
- Basic client functionality with extract, status, and retrieve operations
- File upload support
- Automatic polling with wait_for_completion
- Comprehensive test suite
Made with ❤️ by the Unstract team