In this tutorial, you’ll learn how to install Apache Iceberg locally and get hands-on experience using Apache Spark as a compute engine.
First, we’ll set up the environment by creating a Python virtual environment and installing PySpark. Please note that some prior knowledge of PySpark is required to follow this tutorial.
Next, you’ll learn how to read and write PySpark DataFrames using Apache Iceberg.
Finally, you’ll explore Iceberg’s key features, including schema evolution, ACID transactions, time travel, and query isolation.
Install required dependencies #
Before you start working with Apache Iceberg and PySpark, you need to install the necessary dependencies. Run the commands below to create a virtual environment called iceberg (or choose any name you prefer), activate it, and then install pyspark dependency.
In this tutorial ther versions used are:
- PySpark: 3.4.1
- Python: 3.11.6
python -m venv iceberg source gx/bin/activate pip install pyspark==3.4.1
Note: If you are using Windows, run the command
.\iceberg\Scripts\activate
to activate the virtual environment.
Import required packages #
Once you install the dependencies, import the necessary packages that you will use in the following sections.
from pyspark.sql import SparkSession from pyspark import SparkConf from pyspark.sql.types import StructType, StructField, StringType, IntegerType
Setup Iceberg Configuration #
To begin working with Iceberg tables in PySpark, it’s essential to configure the PySpark session appropriately.
In the following steps, we will use a catalog named demo
for tables located under the path ./warehouse
of the Hadoop type. Additional configurations can be explored in the Iceberg-Spark-Configuration documentation. Crucially, ensure compatibility between the Iceberg-Spark-Runtime JAR and the PySpark version in use. You can find the necessary JARs in the Iceberg releases.
warehouse_path = "./warehouse" iceberg_spark_jar = 'org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0' catalog_name = "demo" # Setup iceberg config conf = SparkConf().setAppName("YourAppName") \ .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \ .set('spark.jars.packages', iceberg_spark_jar) \ .set(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \ .set(f"spark.sql.catalog.{catalog_name}.type", "hadoop")\ .set("spark.sql.defaultCatalog", catalog_name) # Create spark session spark = SparkSession.builder.config(conf=conf).getOrCreate() spark.sparkContext.setLogLevel("ERROR")
Create and Read an Iceberg Table with PySpark #
Let’s start by creating and reading an Iceberg table.
# Create a dataframe schema = StructType([ StructField('name', StringType(), True), StructField('age', IntegerType(), True), StructField('job_title', StringType(), True) ]) data = [("person1", 28, "Doctor"), ("person2", 35, "Singer"), ("person3", 42, "Teacher")] df = spark.createDataFrame(data, schema=schema) # Create database spark.sql(f"CREATE DATABASE IF NOT EXISTS db") # Write and read Iceberg table table_name = "db.persons" df \ .write \ .format("iceberg") \ .mode("overwrite") \ .saveAsTable(f"{table_name}") iceberg_df = spark\ .read \ .format("iceberg") \ .load(f"{table_name}") iceberg_df.printSchema() iceberg_df.show()
The code above creates a PySpark DataFrame, writes it to an Iceberg table, and then retrieves and displays the stored data.
Now, let’s explore the features that Iceberg comes with to address the issues mentioned in the introduction.
Schema Evolution #
The flexibility of Data Lakes, allowing storage of diverse data formats, can pose challenges in managing schema changes. Iceberg addresses this by enabling the addition, removal, or modification of table columns without requiring a complete data rewrite. This feature simplifies the process of evolving schemas over time.
Let’s modify the previously created table to demonstrate schema evolution.
spark.sql(f"ALTER TABLE {table_name} RENAME COLUMN job_title TO job") spark.sql(f"ALTER TABLE {table_name} ALTER COLUMN age TYPE bigint") spark.sql(f"ALTER TABLE {table_name} ADD COLUMN salary FLOAT AFTER job") iceberg_df = spark.read.format("iceberg").load(f"{table_name}") iceberg_df.printSchema() iceberg_df.show() spark.sql(f"SELECT * FROM {table_name}.snapshots").show()
The above code shows schema evolution by renaming, changing column types, and adding a new column. As you can observe in the screenshots below, after and before schema evolution, the column age type has changed, the column job_title is now renamed to job, and the column salary has been added.


The first time you run the code, in the snapshot table you notice that Iceberg executed all alterations without rewriting the data. This is indicated by having only one snapshot ID and no parent (parent_id = null), signifying that no data rewriting was performed.

Data accuracy and consistency are crucial in data lakes, particularly for business-critical purposes. Iceberg supports ACID transactions for write operations, ensuring that data remains in a consistent state, and enhancing the reliability of the stored information.
Transactional Writes #
To demonstrate the ACID with Iceberg table let’s update, add, and delete records from the table.
spark.sql(f"UPDATE {table_name} SET salary = 100") spark.sql(f"DELETE FROM {table_name} WHERE age = 42") spark.sql(f"INSERT INTO {table_name} values ('person4', 50, 'Teacher', 2000)") spark.sql(f"SELECT * FROM {table_name}.snapshots").show()
In the snapshots table, we can now observe that Iceberg has added three snapshot IDs, each created from the preceding one. If, for any reason, one of the actions fails, the transactions will fail, and the snapshot won’t be created.

Partitioning the table #
As you may be aware, querying large amounts of data in data lakes can be resource-intensive. Iceberg supports data partitioning by one or more columns. This significantly improves query performance by reducing the volume of data read during queries.
spark.sql(f"ALTER TABLE {table_name} ADD PARTITION FIELD age") spark.read.format("iceberg").load(f"{table_name}").where("age = 28").show()
The code creates a new partition using the age column. This partition will apply to the new rows that get inserted moving forward, and old data will not be impacted.

We can also add partitions when we create the Iceberg table using something like below.
spark.sql(f""" CREATE TABLE IF NOT EXISTS {table_name} (name STRING, age INT, job STRING, salary INT) USING iceberg PARTITIONED BY (age) """)
Time Travel #
Analyzing historical trends or tracking changes over time is often essential in a data lake. Iceberg provides a time-travel API that allows users to query data as it appeared at a specific version or timestamp, facilitating historical data analysis.
Apache Iceberg gives you the flexibility to load any snapshot or data at a given point in time. This allows you to examine changes at a given time or roll back to a specific version.
spark.sql(f"SELECT * FROM {table_name}.snapshots").show(1, truncate=False) # Read snapshot by id run spark.read.option("snapshot-id", "306576903892976364").table(table_name).show() # Read at a given time spark.read.option("as-of-timestamp", "306576903892976364").table(table_name).show()
Query Isolation #
Given the simultaneous use of data lakes by multiple users or applications, Iceberg allows for concurrent execution of multiple queries without mutual interference. This capability facilitates the scalability of data lake usage without compromising performance.
Conclusion #
Apache Iceberg provides a robust solution for managing big data tables with features like atomic commits, schema evolution, and time travel. When combined with the power of PySpark, you can harness the capabilities of Iceberg while leveraging the flexibility and scalability of PySpark for your big data processing needs.
This article has provided a basic overview of using Apache Iceberg with PySpark, but there is much more to explore, including partitioning, indexing, and optimizing performance. For more in-depth information, refer to the official Apache Iceberg documentation.
You can find the complete source code for this article in my Github Repository.