Data Import Guide

This guide covers strategies, best practices, and patterns for importing data into ArcadeDB efficiently and reliably.

Overview

ArcadeDB's Importer supports the following data formats:

  • CSV: Tabular data with headers
  • ArcadeDB JSONL export/import: Full database export/restore via IMPORT DATABASE

Key Features:

  • Automatic type inference
  • Batch processing for performance
  • Relationship/edge mapping
  • Schema validation
  • Error handling and recovery

Quick Start

CSV Import

import arcadedb_embedded as arcadedb
from arcadedb_embedded import Importer

with arcadedb.create_database("./mydb") as db:
    db.command("sql", "CREATE VERTEX TYPE Product")

    importer = Importer(db)
    stats = importer.import_file(
        file_path="products.csv",
        format_type="csv",
        import_type="vertices",   # create vertices
        type_name="Product",
        typeIdProperty="id",      # REQUIRED for vertices/edges
        commitEvery=5000,         # batch size (default ~5000)
    )

    print("Import complete!", stats)

# Convenience helper (same Importer under the hood, covered by the bindings' tests):
# arcadedb.import_csv(db, "products.csv", "Product", import_type="vertices", typeIdProperty="id")

ArcadeDB JSONL Import (full database)

# Import ArcadeDB JSONL export (schema + data)
db.command("sql", "IMPORT DATABASE file:///exports/mydb.jsonl.tgz WITH commitEvery = 50000")

Format Selection

CSV - Tabular Data

Best For:

  • Spreadsheet data
  • Relational database exports
  • Time-series data
  • Simple structured data

Advantages:

  • Simple format
  • Excel/LibreOffice compatible
  • Wide tool support
  • Human readable

Disadvantages:

  • No nested structures
  • Limited type information
  • Relationships require separate files

Example:

id,name,email,age
1,Alice,alice@example.com,30
2,Bob,bob@example.com,25

ArcadeDB JSONL - Full Database Restore

Best For:

  • Re-importing ArcadeDB EXPORT DATABASE outputs
  • Moving databases between environments
  • Backups and restores with schema + data

Advantages:

  • Preserves full database (schema, indexes, data)
  • Single command: IMPORT DATABASE file://...
  • Works with compressed .jsonl.tgz exports

Disadvantages:

  • Full-database scope (not selective)
  • Requires access to ArcadeDB server or embedded instance
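
A typical round trip pairs this import with ArcadeDB's EXPORT DATABASE command (the paths below are illustrative):

# On the source database: write a compressed JSONL export (schema + data)
db.command("sql", "EXPORT DATABASE file:///exports/mydb.jsonl.tgz")

# On the target database: restore it in one command
db.command("sql", "IMPORT DATABASE file:///exports/mydb.jsonl.tgz WITH commitEvery = 50000")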

Schema Design

Pre-create Schema

Recommended: Define the schema before importing for better control and validation.

with arcadedb.create_database("./mydb") as db:
    db.command("sql", "CREATE VERTEX TYPE User")
    db.command("sql", "CREATE PROPERTY User.id STRING")
    db.command("sql", "CREATE PROPERTY User.name STRING")
    db.command("sql", "CREATE PROPERTY User.email STRING")
    db.command("sql", "CREATE PROPERTY User.age INTEGER")
    db.command("sql", "CREATE INDEX ON User (id) UNIQUE")
    db.command("sql", "CREATE INDEX ON User (email) UNIQUE")

    importer = Importer(db)
    importer.import_file(
        file_path="users.csv",
        format_type="csv",
        import_type="vertices",
        type_name="User",
        typeIdProperty="id",
    )

Benefits:

  • Type safety
  • Validation
  • Better performance
  • Prevents errors
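
With typed properties in place, values that cannot be converted to the declared type are rejected; a quick way to see this (the exact exception class raised by the bindings is an assumption):

# age is declared INTEGER, so a non-numeric string is rejected
try:
    db.command("sql", "INSERT INTO User SET id = 'u999', name = 'Test', age = 'not-a-number'")
except Exception as exc:  # specific exception type depends on the bindings
    print(f"Rejected by schema: {exc}")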

Let Importer Infer

Quick Start: Let the importer create the schema automatically.

# No schema definition needed
from arcadedb_embedded import Importer

with arcadedb.create_database("./mydb") as db:
    importer = Importer(db)
    importer.import_file(
        file_path="users.csv",
        import_type="vertices",
        type_name="User",
        typeIdProperty="id",
    )

Auto-inference:

  • Creates vertex type if missing
  • Infers property types from data
  • Creates properties as needed

Trade-offs:

  • Quick to start
  • Less control
  • Types may be wrong
  • No validation
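
Because inferred types may not match expectations, inspect the schema after an inferred import; ArcadeDB exposes it through schema queries:

# List the types and properties the importer created
result = db.query("sql", "SELECT FROM schema:types")
while result.has_next():
    print(result.next())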

Hybrid Approach

Best of Both: Define critical fields, allow others to be inferred.

with arcadedb.create_database("./mydb") as db:
    # Define only the critical parts; importer will add the rest
    db.command("sql", "CREATE VERTEX TYPE User")
    db.command("sql", "CREATE PROPERTY User.id STRING")
    db.command("sql", "CREATE INDEX ON User (id) UNIQUE")

    importer = Importer(db)
    importer.import_file(
        file_path="users.csv",
        format_type="csv",
        import_type="vertices",
        type_name="User",
        typeIdProperty="id",
    )

Performance Optimization

Batch Size (commitEvery)

Control the transaction batch size via the commitEvery option to trade memory use against speed:

from arcadedb_embedded import Importer
importer = Importer(db)

# Small batches: lower memory, more transactions
importer.import_file(
    file_path="large_file.csv",
    import_type="documents",
    type_name="Data",
    commitEvery=1000,
)

# Medium batches: balanced for most workloads (the built-in default is ~5000)
importer.import_file(
    file_path="large_file.csv",
    import_type="documents",
    type_name="Data",
    commitEvery=10000,
)

# Large batches: higher memory, fewer transactions
importer.import_file(
    file_path="large_file.csv",
    import_type="documents",
    type_name="Data",
    commitEvery=100000,
)

Guidelines:

Dataset Size      Recommended Batch Size
< 100K rows       1,000 - 10,000
100K - 1M rows    10,000 - 50,000
> 1M rows         50,000 - 100,000

Consider:

  • Available memory
  • Record size
  • Concurrent operations
  • Disk I/O
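
One way to encode the table above in code (a rough heuristic using the upper end of each range; not an official rule, so tune it against your own memory and record sizes):

def pick_commit_every(row_count):
    """Map dataset size to a commitEvery value per the guidelines table."""
    if row_count < 100_000:
        return 10_000
    if row_count <= 1_000_000:
        return 50_000
    return 100_000

# Example: a 2M-row file gets the largest batch size
stats = importer.import_file(
    file_path="large_file.csv",
    import_type="documents",
    type_name="Data",
    commitEvery=pick_commit_every(2_000_000),
)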

Parallel Processing

Java Importer Uses Multi-threading by Default:

The Java CSV importer automatically spreads parsing and insertion across multiple threads:

# Default: Uses (CPU count / 2) - 1 threads (minimum 1)
# Example: 8 CPU cores → 3 threads
stats = importer.import_file('large_file.csv', type_name='Data')

# Specify custom thread count
stats = importer.import_file(
    'large_file.csv',
    type_name='Data',
    parallel=8  # Use 8 threads
)

# Disable parallelism (single-threaded)
stats = importer.import_file(
    'large_file.csv',
    type_name='Data',
    parallel=1
)

Important Notes:

  • The Java importer handles parallelism internally using native Java threads
  • You don't need to split files manually for parallel processing
  • The parallel parameter controls Java's internal thread pool
  • Default is conservative: (CPU_COUNT / 2) - 1 with minimum of 1
  • For large imports, consider increasing to match your CPU cores
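
The stated default can be reproduced in Python if you want to log or override it explicitly:

import os

# Mirrors the importer's default: (CPU count / 2) - 1, floored at 1
default_parallel = max(1, ((os.cpu_count() or 1) // 2) - 1)
print(f"Default thread count on this machine: {default_parallel}")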

Manual File Splitting (Advanced):

For very large files or special requirements, you can split files and import chunks:

import concurrent.futures
import os

import arcadedb_embedded as arcadedb
from arcadedb_embedded import Importer

def split_csv(input_file, chunk_size=100000):
    """Split large CSV into chunks."""
    chunks = []
    chunk_num = 0

    with open(input_file, 'r') as f:
        header = f.readline()

        chunk_file = f"chunk_{chunk_num}.csv"
        chunk_writer = open(chunk_file, 'w')
        chunk_writer.write(header)
        chunks.append(chunk_file)

        line_count = 0
        for line in f:
            chunk_writer.write(line)
            line_count += 1

            if line_count >= chunk_size:
                chunk_writer.close()
                chunk_num += 1
                chunk_file = f"chunk_{chunk_num}.csv"
                chunk_writer = open(chunk_file, 'w')
                chunk_writer.write(header)
                chunks.append(chunk_file)
                line_count = 0

        chunk_writer.close()

    # Drop a trailing header-only chunk, created when the row count
    # is an exact multiple of chunk_size
    if line_count == 0 and len(chunks) > 1:
        os.remove(chunks.pop())

    return chunks

def import_chunk(db_path, chunk_file, vertex_type, type_id_property="id"):
    """Import a single chunk as vertices, then delete the chunk file."""
    with arcadedb.open_database(db_path) as db:
        importer = Importer(db)
        importer.import_file(
            file_path=chunk_file,
            import_type="vertices",
            type_name=vertex_type,
            typeIdProperty=type_id_property,
            commitEvery=10000,
        )
    os.remove(chunk_file)

# Split file
chunks = split_csv("large_data.csv", chunk_size=100000)

# Import in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(import_chunk, "./mydb", chunk, "Data", "id")
        for chunk in chunks
    ]

    for future in concurrent.futures.as_completed(futures):
        future.result()

print(f"Imported {len(chunks)} chunks")

Disable Indexes During Import

For massive imports, temporarily drop indexes and recreate them after the data is loaded:

# 1. Drop indexes
db.command("sql", "DROP INDEX `User[email]`")
db.command("sql", "DROP INDEX `User[id]`")

# 2. Import data (vertices)
stats = importer.import_file(
    file_path="huge_file.csv",
    import_type="vertices",
    type_name="User",
    typeIdProperty="id",
    commitEvery=100000,
)

# 3. Recreate indexes
db.command("sql", "CREATE INDEX ON User (id) UNIQUE")
db.command("sql", "CREATE INDEX ON User (email) UNIQUE")

Speed Improvement: typically 2-5x faster for large imports
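
To verify the gain on your own data, time the import with and without indexes in place (standard-library timing only):

import time

start = time.perf_counter()
stats = importer.import_file(
    file_path="huge_file.csv",
    import_type="vertices",
    type_name="User",
    typeIdProperty="id",
    commitEvery=100000,
)
print(f"Imported in {time.perf_counter() - start:.1f}s: {stats}")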


Error Handling

Validation Before Import

import csv

def validate_csv(file_path, required_columns):
    """Validate CSV before importing."""
    errors = []

    try:
        with open(file_path, 'r') as f:
            reader = csv.DictReader(f)

            # Check headers (fieldnames is None for an empty file)
            if not reader.fieldnames:
                return False, ["Missing header row"]
            missing = set(required_columns) - set(reader.fieldnames)
            if missing:
                errors.append(f"Missing columns: {missing}")
                return False, errors

            # Check data rows (line 1 is the header, so data starts at line 2)
            for i, row in enumerate(reader, start=2):
                # Validate required fields
                for col in required_columns:
                    if not row.get(col):
                        errors.append(f"Line {i}: Missing {col}")

                # Validate types (example)
                if row.get('age') and not row['age'].isdigit():
                    errors.append(f"Line {i}: Invalid age '{row['age']}'")

                # Stop after 100 errors
                if len(errors) >= 100:
                    errors.append("... more errors found")
                    return False, errors

        return len(errors) == 0, errors

    except Exception as e:
        return False, [f"File error: {e}"]

# Validate before import
valid, errors = validate_csv("users.csv", ["id", "name", "email"])
if valid:
    stats = importer.import_file(
        file_path="users.csv",
        import_type="vertices",
        type_name="User",
        typeIdProperty="id",
    )
    print("Imported:", stats)
else:
    print("Validation errors:")
    for error in errors:
        print(f"  - {error}")

Relationship Mapping

CSV with Relationships

Import entities and relationships from separate CSV files. The edge-creation step below looks up both endpoints by id, so make sure User.id and Product.id are indexed first (see Schema Design); otherwise every lookup is a full scan:

# Step 1: Import users
stats = importer.import_file(
    file_path="users.csv",
    import_type="vertices",
    type_name="User",
    typeIdProperty="id",
)

# Step 2: Import products
stats = importer.import_file(
    file_path="products.csv",
    import_type="vertices",
    type_name="Product",
    typeIdProperty="id",
)

# Step 3: Import relationships from CSV
# purchases.csv:
# user_id,product_id,date,amount
# 1,101,2024-01-15,29.99

import csv

with open("purchases.csv", 'r') as f:
    reader = csv.DictReader(f)

    with db.transaction():
        for row in reader:
            # Find vertices
            user_result = db.query("sql",
                f"SELECT FROM User WHERE id = '{row['user_id']}'")
            product_result = db.query("sql",
                f"SELECT FROM Product WHERE id = '{row['product_id']}'")

            if user_result.has_next() and product_result.has_next():
                db.command(
                    "sql",
                    """
                    CREATE EDGE Purchased
                    FROM (SELECT FROM User WHERE id = ?)
                    TO (SELECT FROM Product WHERE id = ?)
                    SET date = ?, amount = ?
                    """,
                    row["user_id"],
                    row["product_id"],
                    row["date"],
                    float(row["amount"]),
                )

JSON with Embedded Relationships

# orders.json structure:
# [
#   {
#     "order_id": "ORD123",
#     "customer": {"id": "CUST1", "name": "Alice"},
#     "items": [
#       {"product_id": "PROD1", "qty": 2},
#       {"product_id": "PROD2", "qty": 1}
#     ]
#   }
# ]

import json

with open("orders.json", 'r') as f:
    orders = json.load(f)

with db.transaction():
    for order in orders:
        # Create or find customer
        customer_id = order['customer']['id']
        customer_result = db.query("sql",
            f"SELECT FROM Customer WHERE id = '{customer_id}'")

        # Create the customer only if it doesn't exist yet
        if not customer_result.has_next():
            db.command(
                "sql",
                "INSERT INTO Customer SET id = ?, name = ?",
                customer_id,
                order["customer"]["name"],
            )

        # Create order vertex
        db.command("sql", "INSERT INTO Order SET order_id = ?", order["order_id"])

        # Link customer to order
        db.command(
            "sql",
            "CREATE EDGE Placed FROM (SELECT FROM Customer WHERE id = ?) TO (SELECT FROM Order WHERE order_id = ?)",
            customer_id,
            order["order_id"],
        )

        # Link order to products
        for item in order['items']:
            product_result = db.query("sql",
                f"SELECT FROM Product WHERE id = '{item['product_id']}'")

            if product_result.has_next():
                db.command(
                    "sql",
                    """
                    CREATE EDGE Contains
                    FROM (SELECT FROM Order WHERE order_id = ?)
                    TO (SELECT FROM Product WHERE id = ?)
                    SET quantity = ?
                    """,
                    order["order_id"],
                    item["product_id"],
                    item["qty"],
                )

See Also