Big Data Processing with Spark 4.0


As data volumes and streaming latency demands grow, Spark 4.0 introduces practical improvements that simplify real-time pipelines and reduce operational overhead for teams building at scale.

A compact server rack with multiple nodes and network cables representing a distributed Apache Spark processing cluster running on-premise or in the cloud

When I first moved from batch ETL to streaming, I expected Spark Structured Streaming to be the “easy button.” It was, mostly, until we hit production realities: late data, watermark tuning, schema evolution, and incremental maintenance of stateful pipelines. Spark 4.0 didn’t rewrite the universe, but it nudged several of those sharp edges into smoother curves. The new visual explain plan, the improved state store metrics, and the extended DataFrame APIs make day-to-day debugging and iteration less painful.

If you are building data pipelines that process terabytes per day or feeding features to machine learning models, you likely care about consistency, throughput, and cost. Spark 4.0 strengthens Structured Streaming and DataFrame ergonomics while keeping PySpark and Scala/Java APIs aligned. In this post, I will walk through where Spark fits today, what’s practically new in 4.0, and how to structure a project that balances developer velocity with operational reliability.

Where Spark 4.0 fits in the modern data stack

Spark remains the workhorse for large-scale data processing. You will find it orchestrating lakehouse transformations on platforms like Databricks and Amazon EMR, powering streaming ingestion from Kafka in adtech and fintech, and serving as a feature engineering backbone for ML pipelines. It sits between raw storage and consumption layers (dashboards, ML models, APIs), handling both batch and streaming workloads with a unified API.

Compared to alternatives:

  • Flink: Often preferred for ultra-low-latency streaming, but Spark’s unified batch/streaming model and ecosystem maturity make it a safer bet for general data engineering teams.
  • Trino/Presto: Excellent for interactive SQL and data lake queries, but not designed for complex stateful streaming or iterative ML.
  • BigQuery/Redshift/Snowflake: Strong for analytics and SQL workloads; Spark complements them by handling preprocessing, ETL, and model feature engineering outside the data warehouse.
  • Dask: Python-native and flexible, but Spark’s distributed execution and JVM optimizations are more battle-tested for petabyte-scale data and production monitoring.

Spark 4.0 continues to focus on developer experience, observability, and incremental processing. This matters right now because organizations are pushing more workloads to real-time, adopting the medallion architecture (bronze/silver/gold), and trying to keep costs predictable. Spark 4.0 makes these patterns more approachable.

Core concepts in Spark 4.0

Unified batch and streaming with Structured Streaming

Structured Streaming treats streams as unbounded tables, allowing you to write the same DataFrame transformations for batch and streaming. In 4.0, debuggability improves via better plan visualization and metrics, which are critical when diagnosing watermark gaps or state store issues.

Conceptually, a streaming pipeline:

  • Reads from a source (Kafka, file source, Rate).
  • Applies transformations (filter, join, aggregation).
  • Writes to a sink (delta, parquet, Kafka, console) with an output mode and trigger.

A minimal streaming pattern looks like this in Python:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

spark = SparkSession.builder \
    .appName("spark4-streaming-quickstart") \
    .getOrCreate()

# Define a JSON schema for incoming events
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", LongType(), True),
    StructField("ts", TimestampType(), True),
    StructField("action", StringType(), True),
])

# Read from Kafka topic
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON payload
parsed_df = raw_df.select(
    from_json(col("value").cast("string"), event_schema).alias("data")
).select("data.*")

# Windowed aggregation on event time, with a watermark to bound late data
enriched_df = parsed_df \
    .withWatermark("ts", "10 minutes") \
    .groupBy(window(col("ts"), "10 minutes"), col("action")) \
    .count()

# Write stream to console (for dev); switch to Delta in production
query = enriched_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Real-world tips:

  • Use event-time timestamps, not processing-time, for consistent windows.
  • For windowed aggregations, prefer “append” mode with a watermark so only finalized windows are emitted; “complete” can be simpler for small aggregated results, but it retains all state and rewrites the full result every batch.
  • In production, write streaming checkpoints to durable storage (e.g., S3 or HDFS) and monitor state metrics.
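
The durable-storage advice in the last tip can be enforced with a small guard before a query is started. This is an illustrative helper, not a Spark API; the list of allowed URI schemes is an assumption you would adapt to your platform:

```python
# Guard against accidentally placing streaming checkpoints on local or
# ephemeral storage. Illustrative helper, not part of the Spark API;
# adjust the allowed URI schemes to match your environment.
DURABLE_SCHEMES = ("s3://", "s3a://", "hdfs://", "abfss://", "gs://")

def assert_durable_checkpoint(path: str) -> str:
    """Return the path if it points at durable storage, else raise."""
    if not path.startswith(DURABLE_SCHEMES):
        raise ValueError(
            f"Checkpoint location {path!r} is not on durable storage; "
            "state will be lost on cluster restart."
        )
    return path

# Usage before .option("checkpointLocation", ...):
# checkpoint = assert_durable_checkpoint("s3a://my-bucket/checkpoints/actions")
```

A check like this is cheap insurance: losing a checkpoint directory means rebuilding state from scratch.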

The DataFrame API and the typed Dataset feel

If you come from pandas or SQL, the DataFrame API feels familiar. In Scala/Java, Datasets add compile-time type safety, which catches schema mismatches early. In Python, you still get schema enforcement via StructType and Catalyst optimizer benefits. Spark 4.0 continues to polish the API ergonomics; for example, explain plans are easier to read, which helps when debugging complex joins or skewed partitions.

Catalyst optimizer and Adaptive Query Execution (AQE)

Catalyst is Spark’s query optimizer; AQE adapts the physical plan at runtime by coalescing small partitions and optimizing skew joins. This is especially helpful for ETL jobs with uneven data distribution. With Spark 4.0, plan visualization improvements make it easier to see the impact of AQE decisions. In practice, this reduces shuffle I/O and improves throughput without code changes.
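
AQE is enabled by default in recent Spark releases, but the relevant knobs are worth knowing when you inspect plans. A sketch of the settings as you might pass them via spark-submit --conf or SparkSession.builder.config(); the values shown are illustrative starting points, not tuned recommendations:

```python
# AQE-related settings, expressed as the conf keys you would pass via
# spark-submit --conf or SparkSession.builder.config(). Values are
# illustrative starting points, not tuned recommendations.
aqe_conf = {
    # Master switch (on by default in recent Spark releases)
    "spark.sql.adaptive.enabled": "true",
    # Coalesce many small shuffle partitions into fewer, larger ones
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Split skewed partitions during sort-merge joins
    "spark.sql.adaptive.skewJoin.enabled": "true",
}
```

Comparing df.explain() output with these toggled on and off is a quick way to see which AQE decisions actually fire on your data.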

Schema evolution and the lakehouse pattern

Delta Lake (often paired with Spark) supports schema evolution, allowing you to safely add columns as upstream data changes. In Spark 4.0, DataFrame operations and streaming integrations make this flow smoother. A common pattern:

  • Bronze: Ingest raw events into Delta with schema evolution enabled.
  • Silver: Clean, deduplicate, and enforce types; use watermarks to manage late data.
  • Gold: Aggregate features for BI dashboards or ML training sets.

Stateful processing

Stateful streaming is powerful and tricky. mapGroupsWithState and flatMapGroupsWithState allow custom state management per key. Spark 4.0’s state store metrics and logging improvements help you track memory usage and checkpoint consistency. Common use cases: sessionization, fraud detection with sliding windows, or incremental feature computation.

Practical examples grounded in real usage

Batch ETL pipeline with Delta Lake and schema evolution

Suppose you are building a daily ETL that ingests CSV files, cleans them, and appends to a Silver Delta table. You want to allow new columns without breaking downstream consumers.

Project structure:

spark4-etl/
├── jobs/
│   ├── ingest.py
│   ├── clean.py
│   └── aggregate.py
├── schemas/
│   └── events_schema.json
├── configs/
│   └── dev.yaml
├── tests/
│   └── test_clean.py
├── Dockerfile
├── requirements.txt
└── README.md

A minimal ingestion job with schema inference and evolution:

# jobs/ingest.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

def main():
    spark = SparkSession.builder \
        .appName("ingest-events") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    raw_path = "/data/raw/events/*.csv"
    output_path = "/data/bronze/events"

    # Define an explicit base schema; new upstream columns are handled
    # at write time via Delta schema evolution (mergeSchema below)
    base_schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("user_id", LongType(), True),
        StructField("ts", TimestampType(), True),
        StructField("action", StringType(), True),
    ])

    df = spark.read \
        .option("header", "true") \
        .schema(base_schema) \
        .csv(raw_path)

    # Write to Delta with schema evolution enabled
    df.write \
        .format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .save(output_path)

if __name__ == "__main__":
    main()

To query the Bronze table in downstream jobs, use the Delta API:

# jobs/clean.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, upper

def main():
    spark = SparkSession.builder \
        .appName("clean-events") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    bronze_path = "/data/bronze/events"
    silver_path = "/data/silver/events"

    # Read latest snapshot
    bronze_df = spark.read.format("delta").load(bronze_path)

    cleaned = bronze_df \
        .withColumn("action", upper(trim(col("action")))) \
        .filter(col("ts").isNotNull()) \
        .dropDuplicates(["event_id"])

    cleaned.write \
        .format("delta") \
        .mode("append") \
        .save(silver_path)

if __name__ == "__main__":
    main()

Notes from real projects:

  • Use a daily batch cadence and add a “data quality check” step that asserts non-null constraints and valid ranges before writing to Silver.
  • Track row counts and schema drift via a lightweight audit table. Spark 4.0’s explain plan readability helps when queries become complex.
  • If downstream jobs are BI tools, consider creating a Gold layer with partitioning by date to optimize scans.
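
The “data quality check” step in the first note can be kept as plain logic that the job calls with counts computed from the DataFrame (df.count(), filters on null timestamps, and so on). A minimal sketch; the rule names and thresholds are assumptions you would tune:

```python
# Minimal data-quality gate. The counts would come from cheap DataFrame
# aggregations (e.g. df.count(), df.filter(col("ts").isNull()).count())
# computed before the Silver write. Thresholds are illustrative.
def quality_gate(total_rows: int, null_ts_rows: int, duplicate_ids: int,
                 max_null_ratio: float = 0.01) -> list:
    """Return a list of violations; an empty list means the batch may be written."""
    violations = []
    if total_rows == 0:
        violations.append("empty batch")
    elif null_ts_rows / total_rows > max_null_ratio:
        violations.append(
            f"null ts ratio {null_ts_rows / total_rows:.2%} exceeds limit"
        )
    if duplicate_ids > 0:
        violations.append(f"{duplicate_ids} duplicate event_ids after dedup")
    return violations
```

A job would raise, or route the batch to a quarantine path, when the returned list is non-empty, and record the violations in the audit table mentioned above.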

Streaming pipeline with watermarking and state

Here is a streaming job that counts actions per user within sliding windows and writes results to a Delta sink. We’ll use event-time and watermarks to handle late data.

# jobs/streaming_actions.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

def main():
    spark = SparkSession.builder \
        .appName("streaming-actions") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("user_id", LongType(), True),
        StructField("ts", TimestampType(), True),
        StructField("action", StringType(), True),
    ])

    # Read Kafka stream
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .load()

    parsed = kafka_df.select(
        from_json(col("value").cast("string"), schema).alias("data")
    ).select("data.*")

    # Watermark and windowed aggregation
    windowed = parsed \
        .withWatermark("ts", "5 minutes") \
        .groupBy(
            col("user_id"),
            window(col("ts"), "10 minutes", "5 minutes"),
            col("action")
        ) \
        .count()

    # Write to Delta sink
    query = windowed.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/checkpoints/actions") \
        .trigger(processingTime="30 seconds") \
        .start("/data/gold/actions_per_user")

    query.awaitTermination()

if __name__ == "__main__":
    main()

Observability tips:

  • In Spark 4.0, visual explain plans (via the Spark UI or programmatic explain) help identify whether watermarks are being respected and whether partitions are too small or too large.
  • Monitor state metrics; if memory pressure appears, consider increasing the checkpoint interval or pruning unnecessary keys before grouping.
  • For production streaming, prefer the Delta sink for its transactional guarantees and schema support.
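
The state metrics in the second tip are exposed per micro-batch on StreamingQuery.lastProgress, which PySpark returns as a dict. A sketch of a check you might run in a monitoring loop; the byte threshold and the choice of fields to alert on are assumptions:

```python
# Inspect state-store metrics from a StreamingQueryProgress dict, as
# returned by query.lastProgress in PySpark. Field names follow the
# progress JSON; the byte threshold is an illustrative alerting choice.
def state_memory_warnings(progress: dict, max_bytes: int = 2 * 1024**3) -> list:
    warnings = []
    if not progress:
        return warnings  # no batch has completed yet
    for op in progress.get("stateOperators", []):
        used = op.get("memoryUsedBytes", 0)
        if used > max_bytes:
            warnings.append(
                f"{op.get('operatorName', 'state op')} holds {used} bytes "
                f"across {op.get('numRowsTotal', 0)} state rows"
            )
    return warnings

# Usage inside a monitoring loop:
# alerts = state_memory_warnings(query.lastProgress or {})
```

Wiring this into whatever pager or metrics system you already run is usually more practical than watching the Spark UI by hand.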

Custom stateful logic with applyInPandasWithState

Sometimes count-based aggregations are not enough; you might need to track per-user state (e.g., session end time, flags). In Scala/Java, mapGroupsWithState and flatMapGroupsWithState allow custom state updates; the PySpark equivalent is applyInPandasWithState, which hands each key’s rows to a Python function alongside a GroupState handle.

# jobs/sessionization.py
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def update_session_state(key, pdf_iter, state: GroupState):
    """Fold each user's events into session state; emit the session on timeout.

    State is stored as a tuple matching the state schema declared below:
    (start_ts, end_ts, actions).
    """
    if state.hasTimedOut:
        # No new events within the timeout: finalize and emit the session
        (start_ts, end_ts, actions) = state.get
        state.remove()
        yield pd.DataFrame({
            "user_id": [key[0]],
            "start_ts": [start_ts],
            "end_ts": [end_ts],
            "actions": [actions],
        })
    else:
        start_ts, end_ts, actions = state.get if state.exists else (None, None, [])
        for pdf in pdf_iter:
            for row in pdf.itertuples():
                if start_ts is None or row.ts < start_ts:
                    start_ts = row.ts
                if end_ts is None or row.ts > end_ts:
                    end_ts = row.ts
                actions.append(row.action)
        state.update((start_ts, end_ts, actions))
        state.setTimeoutDuration(5 * 60 * 1000)  # close the session after 5 idle minutes

def main():
    spark = SparkSession.builder \
        .appName("sessionization") \
        .getOrCreate()

    schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("user_id", LongType(), True),
        StructField("ts", TimestampType(), True),
        StructField("action", StringType(), True),
    ])

    # Read and parse the same Kafka event stream as the other jobs
    parsed_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("data")) \
        .select("data.*")

    sessions = parsed_df \
        .groupBy(col("user_id")) \
        .applyInPandasWithState(
            update_session_state,
            outputStructType="user_id long, start_ts timestamp, end_ts timestamp, actions array<string>",
            stateStructType="start_ts timestamp, end_ts timestamp, actions array<string>",
            outputMode="update",
            timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
        )

    # Write to Delta or console for dev
    query = sessions.writeStream \
        .format("console") \
        .outputMode("update") \
        .option("truncate", "false") \
        .start()

    query.awaitTermination()

if __name__ == "__main__":
    main()

Contextual notes:

  • Arbitrary stateful operators (mapGroupsWithState in Scala/Java, applyInPandasWithState in PySpark) are more efficient than complete-mode aggregations when you need incremental updates and custom state.
  • In Spark 4.0, the improved explain and metrics help validate that stateful operators are being applied correctly.
  • Use careful timeouts and checkpointing; stateful streaming can be sensitive to failures if checkpoints are not on durable storage.

Honest evaluation: strengths, weaknesses, tradeoffs

Strengths

  • Unified API: Same code for batch and streaming reduces duplication and cognitive load.
  • Performance: Catalyst and AQE provide robust optimization, especially for skewed data.
  • Ecosystem: Broad connectors (Kafka, S3, HDFS, JDBC), Delta Lake integration, MLlib, and graph processing via GraphFrames.
  • Operational maturity: The Spark UI, metrics, and logging give deep visibility; Spark 4.0’s visual explain plan makes it easier to share and reason about plans with teammates.

Weaknesses

  • Latency: Spark’s micro-batch model is not ideal for sub-second latency; Flink may be better there.
  • Stateful complexity: Custom state logic requires careful testing; memory and checkpoint management can be tricky.
  • Tuning overhead: Partition sizing, shuffle behavior, and AQE settings can demand experimentation.
  • Cost: For purely SQL analytics, cloud data warehouses might be more cost-effective and simpler.

When to choose Spark 4.0

  • Choose Spark if you need unified batch/streaming, large-scale ETL, or ML feature engineering with strong ecosystem support.
  • Choose Flink if ultra-low latency and event-time correctness are non-negotiable (e.g., complex event processing).
  • Choose Trino/Presto if you primarily need fast SQL queries over lake data and don’t need streaming state.

Personal experience: lessons from the trenches

I once debugged a streaming job where late events were not being handled, causing downstream counts to drift. The issue wasn’t the watermark value itself; it was that we used the processing timestamp instead of the event timestamp. After switching to event time and increasing the watermark tolerance, the drift disappeared. Spark 4.0’s improved explain plan would have made this more obvious; now you can visualize where watermarks propagate and where aggregations are finalized.

Another time, we introduced a new column in upstream JSON. With schema evolution enabled in Delta, the ingestion job adapted automatically, but downstream joins expected the old schema. We learned to version our schema in a shared repository (schemas/events_schema.json) and coordinate changes across teams. Spark 4.0’s DataFrame APIs help, but the real safeguard is process: contracts and notifications.

On stateful streaming, I have seen jobs fail because checkpoint directories were on ephemeral storage. When the cluster restarted, the state was lost, and backfilling was painful. Spark 4.0 continues to emphasize reliable checkpointing; use durable storage and test restart scenarios in staging. Also, keep the state small: filter early, group by compact keys, and set reasonable timeouts.

Finally, performance tuning is iterative. AQE can dramatically improve shuffle performance, but you need to inspect physical plans and partition sizes. With Spark 4.0, the visual explain plan makes it easier to discuss tradeoffs with teammates; it’s easier to ask “why is this join skewed?” when you can see the plan clearly.

Getting started: setup, tooling, and workflow

Workflow mental model

  • Develop locally: Use a small local Spark instance or Docker. Focus on unit tests for transformations and integration tests for streaming behavior.
  • Incremental data design: Structure jobs as bronze -> silver -> gold layers. Prefer append writes and rely on checkpoints for streaming.
  • Observability: Instrument jobs with metrics and log key checkpoints. Use Spark UI to analyze stages and shuffles.
  • Deployment: Package jobs as Python modules and submit them to a Spark cluster. Use configuration files for environment-specific settings.
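
The configuration point in the last bullet is easiest with a small merge helper: parse dev.yaml and prod.yaml (e.g. with yaml.safe_load) into dicts, then overlay environment-specific values on shared defaults. The helper below is an illustrative, framework-agnostic sketch:

```python
# Overlay environment-specific settings on shared defaults. The YAML files
# from configs/ would be parsed into dicts (e.g. with yaml.safe_load)
# before being passed in; this helper itself is plain Python.
def merge_config(defaults: dict, overrides: dict) -> dict:
    merged = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)  # recurse into sections
        else:
            merged[key] = value
    return merged
```

This keeps job entrypoints identical across environments; only the overrides file changes between dev and prod.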

Minimal project setup

spark4-project/
├── jobs/
│   ├── ingest.py
│   ├── clean.py
│   └── stream.py
├── schemas/
│   └── events.json
├── configs/
│   ├── dev.yaml
│   └── prod.yaml
├── tests/
│   └── test_clean.py
├── Dockerfile
├── requirements.txt
└── README.md

requirements.txt:

pyspark>=4.0.0
delta-spark>=4.0.0
pytest>=7.0.0
pyyaml>=6.0

configs/dev.yaml:

spark:
  app_name: "spark4-dev"
  checkpoint_dir: "/tmp/checkpoints"
  kafka_servers: "localhost:9092"
  warehouse: "/tmp/warehouse"
data:
  raw_path: "/tmp/data/raw"
  bronze_path: "/tmp/data/bronze"
  silver_path: "/tmp/data/silver"
  gold_path: "/tmp/data/gold"

A simple Dockerfile to run local jobs:

FROM python:3.10-slim

RUN apt-get update && apt-get install -y openjdk-17-jre-headless \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENTRYPOINT ["python", "-m"]

Submitting a job locally:

# Start a local Spark session via PySpark CLI for dev (optional)
pyspark --master local[2] --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

# Run a job (Docker example)
docker build -t spark4-app .
docker run -v $(pwd)/data:/data spark4-app jobs.ingest

In production, you would typically submit jobs to a cluster manager (YARN, Kubernetes) using spark-submit. The mental model is to keep job entrypoints small, delegate logic to functions, and rely on configuration to vary environments.
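
A hedged sketch of what that submission might look like against Kubernetes; the image name, namespace, and resource sizes are placeholders you would replace with your own:

```shell
# Illustrative spark-submit against a Kubernetes master. The image,
# namespace, and sizing flags are placeholders, not recommendations.
spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --name clean-events \
  --conf spark.kubernetes.container.image=spark4-app:latest \
  --conf spark.kubernetes.namespace=data-pipelines \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=4g \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
  local:///app/jobs/clean.py
```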

Testing and maintainability

  • Unit tests: Test transformation functions independently, using small DataFrames created with spark.createDataFrame.
  • Integration tests: Use local Kafka and file sources to simulate streams and assert expected outputs.
  • Schema management: Store schemas as JSON and validate against sample files before deployment.
  • Code style: Use type hints in Python where practical; for Scala/Java, lean on Dataset typing.
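
Following the first bullet, one low-friction pattern is to keep row-level logic in plain functions that tests cover directly, leaving the DataFrame wiring thin. A sketch mirroring the trim/upper cleaning in clean.py; the function name is an assumption:

```python
# Row-level normalization extracted from the cleaning job so it can be
# unit-tested without a SparkSession; the DataFrame version applies the
# same rules via trim/upper column expressions.
def normalize_action(action):
    """Upper-case and trim an action string; return None for blank input."""
    if action is None:
        return None
    cleaned = action.strip().upper()
    return cleaned or None

# In tests/test_clean.py (pytest):
def test_normalize_action():
    assert normalize_action("  click ") == "CLICK"
    assert normalize_action("") is None
    assert normalize_action(None) is None
```

Tests that need real DataFrame behavior (joins, dedup) can then build small inputs with spark.createDataFrame against a local session, while the bulk of the logic stays cheap to test.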

What makes Spark 4.0 stand out

  • Visual explain plan: Easier to share and debug complex queries; crucial when discussing performance with non-Spark specialists.
  • Unified batch/streaming: Reduced duplication and more consistent semantics across pipelines.
  • Ecosystem strengths: Tight integration with Delta Lake and MLlib; broad connector support and robust documentation.
  • Developer experience: DataFrame APIs remain approachable for Python developers while retaining JVM performance; streaming APIs are more predictable with improved metrics.
  • Maintainability: Clear checkpointing patterns and state management encourage robust pipelines that survive restarts and backfills.

These improvements translate to real outcomes: faster onboarding for new engineers, fewer production surprises, and a more predictable path from prototype to scale.

Summary: who should use Spark 4.0 and who might skip it

Use Spark 4.0 if you are building large-scale batch and streaming data pipelines, especially within a lakehouse architecture. It is ideal for data engineering teams that need unified APIs, strong ecosystem support, and operational visibility. It is also valuable for ML engineers who require robust feature engineering at scale and for organizations integrating streaming into existing ETL workflows.

Consider skipping or deferring if:

  • Your workload is purely interactive SQL over a lakehouse, where Trino or cloud warehouses might be simpler and cheaper.
  • Ultra-low latency streaming (sub-second) is a hard requirement; Flink may fit better.
  • Your data volumes are modest and do not justify the operational overhead of a Spark cluster; small-scale pipelines may be simpler with pandas or DuckDB.

If you do adopt Spark 4.0, start by structuring your project around clear layers (bronze/silver/gold), invest in testing and checkpointing, and use the improved visual explain plans to iterate on performance with your team. The real value is not just speed on benchmarks, but the reliability and maintainability of your data pipelines in production.