Batch Processing API¶
The Batch Processing API provides a high-level interface for bulk operations with automatic async executor configuration, progress tracking, and error handling.
Overview¶
The BatchContext class simplifies bulk data operations by:
- Automatic Configuration: Sets up async executor with optimal defaults
- Progress Tracking: Optional progress bar with `tqdm` integration
- Error Handling: Collects errors without stopping execution
- Context Manager: Automatic cleanup and completion waiting
- High Performance: 50,000-200,000 records/sec throughput
Embedded note: For embedded workloads, explicit chunked transactions (`with db.transaction():` over fixed-size slices) currently outperform `batch_context`. Use `batch_context` primarily for API demonstrations or tests until performance parity is improved.
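A minimal sketch of that chunked-transaction pattern, assuming `records` is a list of property dicts and using an illustrative slice size of 10,000:

```python
# Chunked transactions: commit fixed-size slices explicitly.
# `records` and the 10,000-row slice size are illustrative assumptions.
CHUNK = 10_000
for start in range(0, len(records), CHUNK):
    with db.transaction():
        for props in records[start:start + CHUNK]:
            v = db.new_vertex("User")
            for key, value in props.items():
                v.set(key, value)
            v.save()
```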
Class: BatchContext¶
High-level batch processing context manager.
Constructor¶
```python
db.batch_context(
    batch_size: int = 5000,
    parallel: int = 4,
    use_wal: bool = True,
    back_pressure: int = 50,
    progress: bool = False,
    progress_desc: str = "Processing"
) -> BatchContext
```
Parameters:
- `batch_size` (int): Auto-commit every N operations (default: 5000)
- `parallel` (int): Number of parallel worker threads, 1-16 (default: 4)
- `use_wal` (bool): Enable Write-Ahead Log for durability (default: True)
- `back_pressure` (int): Queue back-pressure threshold, 0-100 (default: 50)
- `progress` (bool): Enable progress tracking with tqdm (default: False)
- `progress_desc` (str): Description for progress bar (default: "Processing")
Returns:
BatchContext: Context manager for batch operations
Basic Usage¶
Simple Batch Processing¶
```python
import arcadedb_embedded as arcadedb

db = arcadedb.create_database("./mydb")

# Create schema
db.schema.create_vertex_type("User")
db.schema.create_property("User", "userId", "LONG")
db.schema.create_property("User", "name", "STRING")
db.schema.create_index("User", ["userId"], unique=True)

# Batch create vertices
with db.batch_context(batch_size=10000, parallel=8) as batch:
    for i in range(100000):
        batch.create_vertex("User", userId=i, name=f"User {i}")

# All operations complete automatically on context exit
db.close()
```
With Progress Tracking¶
```python
# Enable progress bar (requires tqdm package)
with db.batch_context(batch_size=5000, progress=True) as batch:
    # Set total for accurate progress
    batch.set_total(len(dataset))
    for record in dataset:
        batch.create_vertex("User", **record)
# Progress bar closes automatically
```
With Error Handling¶
```python
with db.batch_context(batch_size=5000) as batch:
    for record in dataset:
        batch.create_document("LogEntry", **record)

# Check for errors after completion
if batch.get_errors():
    print(f"Encountered {len(batch.get_errors())} errors")
    for error in batch.get_errors():
        print(f"Error: {error}")
else:
    print(f"Successfully processed {batch.get_success_count()} records")
```
Methods¶
create_vertex¶
Create a vertex asynchronously.
Parameters:
- `type_name` (str): Vertex type name
- `callback` (Optional[Callable]): Success callback
- `**properties`: Vertex properties as keyword arguments
Example:
```python
with db.batch_context() as batch:
    batch.create_vertex("Person", name="Alice", age=30)
    batch.create_vertex("Person", name="Bob", age=25)
```
create_document¶
Create a document asynchronously.
Parameters:
- `type_name` (str): Document type name
- `callback` (Optional[Callable]): Success callback
- `**properties`: Document properties as keyword arguments
Example:
```python
from datetime import datetime

with db.batch_context() as batch:
    for i in range(10000):
        batch.create_document(
            "LogEntry",
            timestamp=datetime.now(),
            level="INFO",
            message=f"Log {i}"
        )
```
create_edge¶
```python
batch.create_edge(
    from_vertex,
    to_vertex,
    edge_type: str,
    callback: Optional[Callable] = None,
    **properties
)
```
Create an edge synchronously (edges persist immediately).
Parameters:
- `from_vertex`: Source vertex (Java vertex object)
- `to_vertex`: Destination vertex (Java vertex object)
- `edge_type` (str): Edge type name
- `callback` (Optional[Callable]): Success callback
- `**properties`: Edge properties as keyword arguments
Example:
```python
with db.batch_context() as batch:
    # First create vertices (async)
    alice = db.new_vertex("Person")
    alice.set("name", "Alice")
    alice.save()

    bob = db.new_vertex("Person")
    bob.set("name", "Bob")
    bob.save()

    # Then create edge (sync - persists immediately)
    batch.create_edge(alice, bob, "KNOWS", since=2020)
```
Edge Creation is Synchronous
Unlike vertex/document creation, edge creation with create_edge() is synchronous and immediately persists the edge. The callback is invoked immediately after creation.
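Because the call is synchronous, the callback fires before control returns; a short sketch reusing `alice` and `bob` from the example above (the callback signature is an assumption):

```python
# Assumption: the callback receives the persisted edge object.
def on_edge(edge):
    print("Edge persisted:", edge)

with db.batch_context() as batch:
    batch.create_edge(alice, bob, "KNOWS", callback=on_edge, since=2020)
    # create_edge is synchronous, so on_edge has already run here
```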
delete_record¶
Delete a record asynchronously.
Parameters:
- `record`: Java record object to delete
- `callback` (Optional[Callable]): Success callback
Example:
```python
# Query records to delete
to_delete = list(db.query("sql", "SELECT FROM User WHERE inactive = true"))

with db.batch_context() as batch:
    for result in to_delete:
        element = result.get_element()
        batch.delete_record(element)
```
set_total¶
Set total number of operations for progress tracking.
Parameters:
- `total` (int): Total number of operations
Example:
```python
with db.batch_context(progress=True) as batch:
    batch.set_total(len(dataset))
    for item in dataset:
        batch.create_vertex("Item", **item)
```
wait_completion¶
Manually wait for all operations to complete.
Parameters:
- `timeout` (Optional[float]): Max wait time in seconds (None = wait forever)
Note: Usually not needed as context manager waits automatically on exit.
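A minimal sketch of an explicit mid-batch drain, assuming a `Task` vertex type exists (the halfway checkpoint and 30-second timeout are illustrative):

```python
with db.batch_context() as batch:
    for i in range(50000):
        batch.create_vertex("Task", taskId=i)
        if i == 25000:
            # Illustrative: drain queued operations halfway through,
            # waiting at most 30 seconds
            batch.wait_completion(timeout=30.0)
```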
is_pending¶
Check if there are pending operations.
Returns:
bool: True if operations are still pending
Example:
```python
with db.batch_context() as batch:
    for i in range(1000):
        batch.create_vertex("Task", taskId=i)

    if batch.is_pending():
        print("Operations still in progress...")
```
get_errors¶
Get list of errors collected during batch processing.
Returns:
List[Exception]: List of exceptions that occurred
get_success_count¶
Get count of successful operations.
Returns:
int: Number of successful operations
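A short accounting sketch combining the two accessors; both remain usable after the context exits, as in the error-handling example above:

```python
with db.batch_context() as batch:
    for record in dataset:
        batch.create_document("LogEntry", **record)

# Summarize the run once the context has exited
succeeded = batch.get_success_count()
failed = batch.get_errors()
print(f"{succeeded} succeeded, {len(failed)} failed")
```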
Performance Optimization¶
Tuning Parameters¶
```python
# For maximum throughput
with db.batch_context(
    batch_size=10000,   # Larger batches = fewer commits
    parallel=16,        # More workers (match CPU cores)
    back_pressure=75    # Higher back-pressure tolerance
) as batch:
    # Your bulk operations
    pass
```
Choosing Batch Size¶
| Records | Recommended batch_size |
|---|---|
| < 10K | 1000-2000 |
| 10K-100K | 5000 |
| 100K-1M | 10000-20000 |
| > 1M | 20000+ |
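A hypothetical helper (`pick_batch_size` is our name, not part of the API) that encodes the table above:

```python
def pick_batch_size(n_records: int) -> int:
    # Hypothetical helper encoding the batch-size table above
    if n_records < 10_000:
        return 2_000
    if n_records < 100_000:
        return 5_000
    if n_records < 1_000_000:
        return 10_000
    return 20_000

with db.batch_context(batch_size=pick_batch_size(len(dataset))) as batch:
    for record in dataset:
        batch.create_vertex("User", **record)
```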
Choosing Parallel Level¶
- CPU-bound: Set to number of CPU cores (4-16)
- I/O-bound: Can exceed CPU cores (8-32)
- Default: 4 (good for most cases)
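For CPU-bound loads, one common pattern is to derive `parallel` from the machine and clamp it to the documented 1-16 range:

```python
import os

# Match worker count to available cores, clamped to the documented 1-16 range
workers = min(os.cpu_count() or 4, 16)

with db.batch_context(parallel=workers) as batch:
    ...  # bulk operations
```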
Complete Example¶
```python
import arcadedb_embedded as arcadedb
import time

db = arcadedb.create_database("./bulk_db")

# Create schema
db.schema.create_vertex_type("Product")
db.schema.create_property("Product", "productId", "LONG")
db.schema.create_property("Product", "name", "STRING")
db.schema.create_property("Product", "price", "DECIMAL")
db.schema.create_index("Product", ["productId"], unique=True)

# Generate sample data
products = [
    {"productId": i, "name": f"Product {i}", "price": i * 10.5}
    for i in range(100000)
]

start = time.time()

# Batch import with progress
with db.batch_context(
    batch_size=10000,
    parallel=8,
    progress=True,
    progress_desc="Importing products"
) as batch:
    batch.set_total(len(products))
    for product in products:
        batch.create_vertex("Product", **product)

elapsed = time.time() - start
throughput = len(products) / elapsed

print(f"\n✅ Imported {len(products):,} products")
print(f"⏱️ Time: {elapsed:.2f}s")
print(f"🚀 Throughput: {throughput:,.0f} records/sec")

# Verify
count = db.count_type("Product")
print(f"✓ Verified: {count:,} products in database")

db.close()
```
Comparison with AsyncExecutor¶
| Feature | BatchContext | AsyncExecutor |
|---|---|---|
| Ease of Use | ✅ Simple API | ⚠️ More complex |
| Progress Bar | ✅ Built-in | ❌ Manual |
| Error Collection | ✅ Automatic | ⚠️ Manual callbacks |
| Auto-Cleanup | ✅ Context manager | ⚠️ Manual close |
| Fine Control | ⚠️ Limited | ✅ Full control |
| Use Case | Most bulk operations | Advanced tuning |
Best Practices¶
1. Create Schema First¶
```python
# ✅ Good: Schema defined before batch
db.schema.create_vertex_type("User")
db.schema.create_property("User", "userId", "LONG")
db.schema.create_index("User", ["userId"], unique=True)

with db.batch_context() as batch:
    # Batch operations
    pass
```
2. Use Context Manager¶
```python
# ✅ Good: Automatic cleanup
with db.batch_context() as batch:
    # Operations
    pass

# ❌ Bad: Manual management
batch = db.batch_context()
batch.__enter__()
# Operations
batch.__exit__(None, None, None)  # Easy to forget!
```
3. Check for Errors¶
```python
with db.batch_context() as batch:
    for record in dataset:
        batch.create_vertex("User", **record)

# ✅ Always check for errors
if batch.get_errors():
    print(f"Failed: {len(batch.get_errors())} errors")
    # Handle errors
```
4. Enable Progress for Long Operations¶
```python
# ✅ Good: User feedback for long operations
with db.batch_context(progress=True) as batch:
    batch.set_total(len(large_dataset))
    for item in large_dataset:
        batch.create_vertex("Item", **item)
```
See Also¶
- AsyncExecutor API - Lower-level async operations
- Transactions API - Transaction management
- Database API - Database operations
- Importer API - CSV/XML import utilities
- Example 04: CSV Import - Batch import example
- Example 05: Graph Import - Graph bulk loading