
AsyncExecutor API

Experimental

async_executor() is experimental and not advised for production use yet. Prefer standard transactions and synchronous workflows unless you are explicitly testing or benchmarking.

The AsyncExecutor provides low-level async operations for parallel processing, automatic batching, and optimized WAL operations.

Using Context Managers

For automatic resource cleanup, prefer using context managers:

import arcadedb_embedded as arcadedb

with arcadedb.create_database("./mydb") as db:
    async_exec = db.async_executor()
    async_exec.set_parallel_level(8)
    # Use for bulk operations...
    async_exec.wait_completion()
    async_exec.close()  # Stop worker threads before the database closes
# Database automatically closed
Examples below show explicit db.close() for clarity, but context managers are the recommended pattern in real code.

Overview

The AsyncExecutor class enables:

  • Parallel Execution: 1-16 worker threads for concurrent operations
  • Automatic Batching: Auto-commit every N operations
  • Optimized WAL: Configurable Write-Ahead Log settings
  • High Performance: roughly 50,000-200,000 records/sec throughput, depending on hardware and workload
  • Fluent Interface: Method chaining for configuration

Getting AsyncExecutor

import arcadedb_embedded as arcadedb

db = arcadedb.create_database("./mydb")
db.command("sql", "CREATE VERTEX TYPE User")  # The vertex type must exist before creating vertices

# Get async executor
async_exec = db.async_executor()

# Configure (all methods return self for chaining)
async_exec.set_parallel_level(8)       # 8 worker threads
async_exec.set_commit_every(5000)      # Auto-commit every 5K ops
async_exec.set_back_pressure(75)       # Queue back-pressure at 75%

# Use for bulk operations
for i in range(100000):
    vertex = db.new_vertex("User")
    vertex.set("userId", i)
    async_exec.create_record(vertex)

# Wait for completion
async_exec.wait_completion()

# Clean up worker threads
async_exec.close()

db.close()

Configuration Methods

All configuration methods return self for method chaining.

set_parallel_level

async_exec.set_parallel_level(level: int) -> AsyncExecutor

Set number of parallel worker threads (1-16).

Parameters:

  • level (int): Number of worker threads

Returns:

  • AsyncExecutor: Self for chaining

Guidelines:

  • CPU-bound: Match CPU cores (4-8)
  • I/O-bound: Can exceed cores (8-16)
  • Default: 4
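The guidelines above can be expressed as a small helper. This is a sketch only: `choose_parallel_level` and its `io_bound` flag are hypothetical names that are not part of arcadedb_embedded. The 2x multiplier for I/O-bound work is an illustrative assumption, and the result is clamped to the documented 1-16 range.

```python
import os
from typing import Optional

MIN_LEVEL, MAX_LEVEL = 1, 16  # Documented range for set_parallel_level()


def choose_parallel_level(io_bound: bool = False, cpu_count: Optional[int] = None) -> int:
    """Hypothetical helper that applies the guidelines above.

    CPU-bound work: one worker per core.
    I/O-bound work: assume 2x cores (an illustrative multiplier, not a rule).
    """
    cores = cpu_count if cpu_count is not None else (os.cpu_count() or 4)
    level = cores * 2 if io_bound else cores
    # Clamp to the range the executor accepts
    return max(MIN_LEVEL, min(level, MAX_LEVEL))


print(choose_parallel_level(cpu_count=8))                   # 8
print(choose_parallel_level(io_bound=True, cpu_count=8))    # 16
print(choose_parallel_level(io_bound=True, cpu_count=12))   # 16 (clamped)
```

The result would then be passed to the executor, for example `async_exec.set_parallel_level(choose_parallel_level())`.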

Example:

# Configure for 8-core CPU
async_exec = db.async_executor().set_parallel_level(8)

set_commit_every

async_exec.set_commit_every(count: int) -> AsyncExecutor

Set auto-commit batch size: the executor commits the current transaction every N operations.

Parameters:

  • count (int): Number of operations before commit (0 = no auto-commit)

Returns:

  • AsyncExecutor: Self for chaining

Guidelines:

  • Small datasets (< 10K): 1000-2000
  • Medium datasets (10K-100K): 5000
  • Large datasets (> 100K): 10000-20000
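To see what a given batch size means in practice, the pure-Python model below counts how many auto-commits a run would trigger. It models only the counting rule stated above (commit every N operations, 0 = no auto-commit). How the real executor handles the final partial batch is not modeled, and `plan_commits` is a hypothetical name.

```python
from typing import List, Tuple


def plan_commits(total_ops: int, commit_every: int) -> Tuple[List[int], int]:
    """Model the 'commit every N operations' rule.

    Returns (sizes of auto-committed batches, operations not covered
    by the auto-commit rule).
    """
    if commit_every <= 0:
        # 0 = no auto-commit: the counter never triggers a commit
        return [], total_ops
    full_batches, remainder = divmod(total_ops, commit_every)
    return [commit_every] * full_batches, remainder


batches, leftover = plan_commits(100_000, 5_000)
print(len(batches), leftover)  # 20 0

batches, leftover = plan_commits(12_345, 5_000)
print(len(batches), leftover)  # 2 2345
```

Larger batch sizes mean fewer commits and less commit overhead, but each transaction holds more uncommitted work in memory. That is the trade-off the guidelines balance.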

Example:

# Auto-commit every 10K operations
async_exec = db.async_executor().set_commit_every(10000)

set_transaction_use_wal

async_exec.set_transaction_use_wal(enabled: bool) -> AsyncExecutor

Enable or disable Write-Ahead Log for transactions.

Parameters:

  • enabled (bool): True to enable WAL (durability), False for speed

Returns:

  • AsyncExecutor: Self for chaining

Note: Disabling WAL increases speed but reduces durability. Changes are not journaled, so a crash can lose recent writes.

Example:

# Disable WAL for maximum speed (less durable)
async_exec = db.async_executor().set_transaction_use_wal(False)

set_back_pressure

async_exec.set_back_pressure(threshold: int) -> AsyncExecutor

Set queue back-pressure threshold (0-100).

Parameters:

  • threshold (int): Queue fill percentage at which back-pressure activates (0 = disabled)

Returns:

  • AsyncExecutor: Self for chaining

How it works:

  • Queue fills up → Back-pressure kicks in
  • Slows down enqueue operations
  • Prevents memory overflow
  • 0 = disabled, 50-75 = recommended
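The mechanism can be modeled with a bounded `queue.Queue` and a producer that pauses once the queue passes the threshold. This is a conceptual sketch of back-pressure in general, not ArcadeDB's internal implementation. The queue capacity, the sleep intervals, and `run_with_back_pressure` are all illustrative assumptions.

```python
import queue
import threading
import time


def run_with_back_pressure(items, capacity=100, threshold_pct=75, workers=2):
    """Producer/consumer model of queue back-pressure (illustrative only)."""
    q = queue.Queue(maxsize=capacity)
    # Queue size at which the producer starts waiting (at least 1)
    limit = max(1, capacity * threshold_pct // 100)
    processed = []
    lock = threading.Lock()
    peak = 0

    def worker():
        while True:
            item = q.get()
            if item is None:          # Sentinel: stop this worker
                return
            time.sleep(0.0005)        # Simulate the cost of one write
            with lock:
                processed.append(item)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()

    for item in items:
        # Back-pressure: while the queue is at or above the threshold,
        # pause the producer so the workers can drain it
        while threshold_pct > 0 and q.qsize() >= limit:
            time.sleep(0.0001)
        q.put(item)                   # Still blocks if the queue is completely full
        peak = max(peak, q.qsize())

    for _ in threads:
        q.put(None)
    for t in threads:
        t.join()
    return processed, peak, limit


processed, peak, limit = run_with_back_pressure(range(500), capacity=100, threshold_pct=75)
print(len(processed), peak <= limit)  # 500 True
```

Lowering `threshold_pct` makes the producer pause earlier, which keeps fewer pending items in memory at the cost of enqueue speed. This is the same trade-off behind the out-of-memory advice in Troubleshooting.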

Example:

# Set back-pressure at 75% full
async_exec = db.async_executor().set_back_pressure(75)

Method Chaining

# Chain all configurations
async_exec = (db.async_executor()
    .set_parallel_level(8)
    .set_commit_every(10000)
    .set_transaction_use_wal(True)
    .set_back_pressure(75)
)
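Chaining works because every setter returns the executor itself. The stand-in class below shows that pattern in plain Python, with range checks that match the documented limits. `ExecutorSettings` is hypothetical, its defaults are illustrative, and whether the real setters validate their arguments this way is an assumption.

```python
class ExecutorSettings:
    """Hypothetical stand-in illustrating the fluent (return-self) pattern."""

    def __init__(self):
        self.parallel_level = 4   # Illustrative defaults
        self.commit_every = 0
        self.use_wal = True
        self.back_pressure = 0

    def set_parallel_level(self, level: int) -> "ExecutorSettings":
        if not 1 <= level <= 16:
            raise ValueError("parallel level must be between 1 and 16")
        self.parallel_level = level
        return self  # Returning self is what makes chaining possible

    def set_commit_every(self, count: int) -> "ExecutorSettings":
        if count < 0:
            raise ValueError("commit_every must be >= 0 (0 = no auto-commit)")
        self.commit_every = count
        return self

    def set_transaction_use_wal(self, enabled: bool) -> "ExecutorSettings":
        self.use_wal = bool(enabled)
        return self

    def set_back_pressure(self, threshold: int) -> "ExecutorSettings":
        if not 0 <= threshold <= 100:
            raise ValueError("back-pressure threshold must be 0-100")
        self.back_pressure = threshold
        return self


settings = (ExecutorSettings()
    .set_parallel_level(8)
    .set_commit_every(10000)
    .set_transaction_use_wal(True)
    .set_back_pressure(75)
)
print(vars(settings))
```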

Operation Methods

create_record

async_exec.create_record(
    record,
    callback: Optional[Callable] = None
)

Schedule asynchronous record creation.

Parameters:

  • record: Document, Vertex, or Edge object to create
  • callback (Optional[Callable]): Success callback
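A success callback runs after the operation finishes, and it may run on a worker thread rather than on the thread that scheduled it. The sketch below models this with `concurrent.futures` from the standard library. `schedule`, `fake_create`, and the callback signature are assumptions for illustration; the real callback's arguments may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

created = []
lock = threading.Lock()


def on_created(record_id):
    # May run on a worker thread, so shared state is guarded by a lock
    with lock:
        created.append(record_id)


def schedule(pool, operation, record_id, callback=None):
    """Submit an operation and invoke callback only if it succeeds."""
    future = pool.submit(operation, record_id)
    if callback is not None:
        def done(f):
            if f.exception() is None:
                callback(f.result())
        future.add_done_callback(done)
    return future


def fake_create(record_id):
    # Stand-in for a database write
    return record_id


with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(100):
        schedule(pool, fake_create, i, on_created)
# Leaving the 'with' block waits for all tasks, similar in spirit to wait_completion()

print(len(created))  # 100
```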

Example:

async_exec = db.async_executor()

for i in range(10000):
    vertex = db.new_vertex("User")
    vertex.set("userId", i)
    vertex.set("name", f"User {i}")
    async_exec.create_record(vertex)

async_exec.wait_completion()
async_exec.close()

update_record

async_exec.update_record(
    record,
    callback: Optional[Callable] = None
)

Schedule asynchronous record update.

Parameters:

  • record: Document, Vertex, or Edge object to update
  • callback (Optional[Callable]): Success callback

Example:

# Query records
results = list(db.query("sql", "SELECT FROM User WHERE active = false"))

async_exec = db.async_executor()

for result in results:
    element = result.get_element()
    mutable = element.modify()
    mutable.set("active", True)
    async_exec.update_record(mutable)

async_exec.wait_completion()
async_exec.close()

delete_record

async_exec.delete_record(
    record,
    callback: Optional[Callable] = None
)

Schedule asynchronous record deletion.

Parameters:

  • record: Document, Vertex, or Edge object to delete
  • callback (Optional[Callable]): Success callback

Example:

# Delete old records (cutoff_date is assumed to be defined earlier)
to_delete = list(db.query("sql", "SELECT FROM LogEntry WHERE timestamp < ?",
                          cutoff_date))

async_exec = db.async_executor()

for result in to_delete:
    element = result.get_element()
    async_exec.delete_record(element)

async_exec.wait_completion()
async_exec.close()

query

async_exec.query(
    language: str,
    query: str,
    callback: Callable,
    **params
)

Execute a query asynchronously; results are passed to callback.

Parameters:

  • language (str): Query language ("sql", "cypher", etc.)
  • query (str): Query string
  • callback (Callable): Callback for query results
  • **params: Query parameters

Example:

def process_results(resultset):
    for result in resultset:
        print(result.get("name"))

async_exec = db.async_executor()
async_exec.query("sql", "SELECT FROM User WHERE age > 18", process_results)
async_exec.wait_completion()
async_exec.close()

command

async_exec.command(
    language: str,
    command: str,
    callback: Callable,
    **params
)

Execute a command asynchronously; results are passed to callback.

Parameters:

  • language (str): Command language ("sql", etc.)
  • command (str): Command string
  • callback (Callable): Callback for command results
  • **params: Command parameters

Status Methods

wait_completion

async_exec.wait_completion(timeout: Optional[float] = None)

Wait for all pending operations to complete.

Parameters:

  • timeout (Optional[float]): Max wait time in seconds (None = forever)

Note: Always call before closing executor or database.
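wait_completion() and is_pending() both depend on a count of outstanding operations. The toy class below models that count with `threading.Condition`, providing a timeout-bounded wait and a non-blocking check. `PendingTracker` is hypothetical. In particular, returning a bool from the wait is an assumption made for illustration, not the documented return type.

```python
import threading
import time
from typing import Optional


class PendingTracker:
    """Toy model of pending-operation tracking (not ArcadeDB code)."""

    def __init__(self):
        self._pending = 0
        self._cond = threading.Condition()

    def started(self):
        with self._cond:
            self._pending += 1

    def finished(self):
        with self._cond:
            self._pending -= 1
            if self._pending == 0:
                self._cond.notify_all()  # Wake anyone blocked in wait_completion()

    def is_pending(self) -> bool:
        with self._cond:
            return self._pending > 0

    def wait_completion(self, timeout: Optional[float] = None) -> bool:
        # wait_for re-checks the predicate and honours the timeout;
        # None means wait forever. Returns False if time ran out.
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)


tracker = PendingTracker()


def work():
    time.sleep(0.05)
    tracker.finished()


for _ in range(3):
    tracker.started()
    threading.Thread(target=work).start()

print(tracker.wait_completion(timeout=5))  # True once all three finish
print(tracker.is_pending())                # False
```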

Example:

async_exec = db.async_executor()

# Queue operations
for i in range(10000):
    vertex = db.new_vertex("User")
    vertex.set("userId", i)
    async_exec.create_record(vertex)

# Wait for all to complete
async_exec.wait_completion()

# Now safe to close
async_exec.close()

is_pending

async_exec.is_pending() -> bool

Check if operations are still pending.

Returns:

  • bool: True if operations in progress

Example:

import time

while async_exec.is_pending():
    print("Still processing...")
    time.sleep(1)

close

async_exec.close()

Shut down worker threads and clean up resources.

Note: Always call after wait_completion().

Example:

# Create the executor before try, so finally never references an unbound name
async_exec = db.async_executor()
try:
    # Operations
    async_exec.wait_completion()
finally:
    async_exec.close()

Complete Example

import arcadedb_embedded as arcadedb
import time

# Create database
db = arcadedb.create_database("./async_demo")

# Create schema (ArcadeDB SQL DDL)
db.command("sql", "CREATE VERTEX TYPE Product")
db.command("sql", "CREATE PROPERTY Product.productId LONG")
db.command("sql", "CREATE PROPERTY Product.name STRING")
db.command("sql", "CREATE PROPERTY Product.price DECIMAL")
db.command("sql", "CREATE INDEX ON Product (productId) UNIQUE")

# Prepare async executor
async_exec = (db.async_executor()
    .set_parallel_level(8)
    .set_commit_every(10000)
    .set_back_pressure(75)
)

# Measure performance
start = time.time()

# Create 100K vertices asynchronously
for i in range(100000):
    vertex = db.new_vertex("Product")
    vertex.set("productId", i)
    vertex.set("name", f"Product {i}")
    vertex.set("price", i * 10.5)
    async_exec.create_record(vertex)

# Wait for completion
async_exec.wait_completion()

elapsed = time.time() - start
throughput = 100000 / elapsed

print("✅ Created 100,000 vertices")
print(f"⏱️  Time: {elapsed:.2f}s")
print(f"🚀 Throughput: {throughput:,.0f} records/sec")

# Clean up
async_exec.close()
db.close()
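The timing code above uses `time.time()`, which follows the system clock and is not guaranteed to be monotonic. For benchmarks, `time.perf_counter()` is the monotonic, high-resolution choice. The context manager below is a hypothetical helper, not part of arcadedb_embedded. It can wrap any bulk load, such as the create_record() loop followed by wait_completion(), and report records/sec.

```python
import time
from contextlib import contextmanager


@contextmanager
def measure_throughput(record_count: int, label: str = "run"):
    """Time a block and report records/sec (hypothetical helper)."""
    stats = {}
    start = time.perf_counter()
    try:
        yield stats
    finally:
        elapsed = time.perf_counter() - start
        stats["elapsed"] = elapsed
        stats["throughput"] = record_count / elapsed if elapsed > 0 else float("inf")
        print(f"{label}: {record_count:,} records in {elapsed:.2f}s "
              f"({stats['throughput']:,.0f} records/sec)")


# Stand-in workload; in practice the block would contain the
# create_record() loop followed by async_exec.wait_completion()
with measure_throughput(100_000, "demo") as stats:
    total = sum(range(100_000))
```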

Performance Comparison

import time

# Assumes db is open and a User vertex type exists

# Synchronous (baseline)
start = time.time()
with db.transaction():
    for i in range(10000):
        vertex = db.new_vertex("User")
        vertex.set("userId", i)
        vertex.save()
sync_time = time.time() - start

# Asynchronous
start = time.time()
async_exec = db.async_executor().set_parallel_level(8).set_commit_every(5000)
for i in range(10000):
    vertex = db.new_vertex("User")
    vertex.set("userId", i)
    async_exec.create_record(vertex)
async_exec.wait_completion()
async_exec.close()
async_time = time.time() - start

print(f"Synchronous: {10000 / sync_time:,.0f} records/sec")
print(f"Asynchronous: {10000 / async_time:,.0f} records/sec")
print(f"Speedup: {sync_time / async_time:.1f}x")

Typical results (hardware-dependent):

  • Synchronous: 15,000-30,000 records/sec
  • Asynchronous: 50,000-200,000 records/sec
  • Speedup: 3-5x
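The speedup printed by the comparison follows directly from the two timings. For the same record count N, the measured rate of each run is r = N / t, so:

```latex
\text{speedup} = \frac{t_{\text{sync}}}{t_{\text{async}}}
             = \frac{N / t_{\text{async}}}{N / t_{\text{sync}}}
             = \frac{r_{\text{async}}}{r_{\text{sync}}}
```

For example, 20,000 records/sec synchronously against 80,000 records/sec asynchronously gives 80,000 / 20,000 = 4.0x. This matches the `sync_time / async_time` expression in the code. These figures are illustrative, not measurements.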

Best Practices

0. Set a Commit Cadence

async_exec = db.async_executor()
async_exec.set_commit_every(500)  # Ensures async writes are persisted transactionally
  • Configure set_commit_every() for every async workload so writes are grouped into transactions.
  • Tune the batch size to balance commit overhead and memory.

1. Always Close the Executor

# ✅ Good: Use try/finally
async_exec = db.async_executor()
try:
    # Operations
    async_exec.wait_completion()
finally:
    async_exec.close()

2. Wait Before Closing

# ✅ Good: Wait first
async_exec.wait_completion()
async_exec.close()

# ❌ Bad: Close without waiting
async_exec.close()  # Operations may be lost!

3. Use Appropriate Batch Size

# ✅ Good: Tune for dataset size
if record_count < 10000:
    async_exec.set_commit_every(2000)
elif record_count < 100000:
    async_exec.set_commit_every(5000)
else:
    async_exec.set_commit_every(20000)

4. Match Parallelism to Hardware

import os

# ✅ Good: Match CPU cores
cpu_count = os.cpu_count() or 4
async_exec.set_parallel_level(min(cpu_count, 16))

Comparison with BatchContext

| Feature        | AsyncExecutor    | BatchContext            |
|----------------|------------------|-------------------------|
| Control        | ✅ Full control   | ⚠️ High-level API        |
| Complexity     | ⚠️ More complex   | ✅ Simple                |
| Progress Bar   | ❌ Manual         | ✅ Built-in              |
| Error Handling | ⚠️ Callbacks      | ✅ Automatic collection  |
| Use Case       | Advanced tuning  | Most bulk ops           |

When to use AsyncExecutor:

  • Need fine-grained control
  • Custom callbacks required
  • Performance tuning critical

When to use BatchContext:

  • Standard bulk operations
  • Want progress tracking
  • Prefer simple API

Troubleshooting

Out of Memory Errors

# Reduce back-pressure threshold
async_exec.set_back_pressure(50)  # Slow down enqueue

# Or reduce parallel level
async_exec.set_parallel_level(4)  # Fewer workers

Slow Performance

# Increase parallelism
async_exec.set_parallel_level(16)

# Increase batch size
async_exec.set_commit_every(20000)

# Consider disabling WAL (less durable!)
async_exec.set_transaction_use_wal(False)

Operations Not Completing

# Always call wait_completion()
async_exec.wait_completion()

# Check for pending operations
if async_exec.is_pending():
    print("Still processing...")

See Also