[02] – Getting Started with Apache Iceberg

kerrache.massipssa

In this tutorial, you’ll learn how to install Apache Iceberg locally and get hands-on experience using Apache Spark as a compute engine.

First, we’ll set up the environment by creating a Python virtual environment and installing PySpark. Please note that some prior knowledge of PySpark is required to follow this tutorial.

Next, you’ll learn how to read and write PySpark DataFrames using Apache Iceberg.

Finally, you’ll explore Iceberg’s key features, including schema evolution, ACID transactions, time travel, and query isolation.

Install required dependencies

Before you start working with Apache Iceberg and PySpark, you need to install the necessary dependencies. Run the commands below to create a virtual environment called iceberg (or choose any name you prefer), activate it, and then install the pyspark dependency.

The versions used in this tutorial are:

  • PySpark: 3.4.1
  • Python: 3.11.6
python -m venv iceberg
source iceberg/bin/activate
pip install pyspark==3.4.1

Note: If you are using Windows, run the command .\iceberg\Scripts\activate to activate the virtual environment.

Import required packages

Once you install the dependencies, import the necessary packages that you will use in the following sections.

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

Setup Iceberg Configuration

To begin working with Iceberg tables in PySpark, it’s essential to configure the PySpark session appropriately.
In the following steps, we will use a Hadoop-type catalog named demo, with its tables stored under the path ./warehouse. Additional configurations can be explored in the Iceberg-Spark-Configuration documentation. Crucially, ensure compatibility between the Iceberg-Spark-Runtime JAR and the PySpark version in use. You can find the necessary JARs in the Iceberg releases.

warehouse_path = "./warehouse"
iceberg_spark_jar = "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0"
catalog_name = "demo"

# Set up the Iceberg configuration
conf = SparkConf().setAppName("YourAppName") \
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .set("spark.jars.packages", iceberg_spark_jar) \
    .set(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \
    .set(f"spark.sql.catalog.{catalog_name}.type", "hadoop") \
    .set("spark.sql.defaultCatalog", catalog_name)

# Create spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Create and Read an Iceberg Table with PySpark

Let’s start by creating and reading an Iceberg table.

# Create a dataframe
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('job_title', StringType(), True)
])
data = [("person1", 28, "Doctor"), ("person2", 35, "Singer"), ("person3", 42, "Teacher")]
df = spark.createDataFrame(data, schema=schema)

# Create database
spark.sql(f"CREATE DATABASE IF NOT EXISTS db")

# Write and read Iceberg table
table_name = "db.persons"
df \
.write \
.format("iceberg") \
.mode("overwrite") \
.saveAsTable(f"{table_name}")

iceberg_df = spark\
	.read  \
	.format("iceberg") \
  	.load(f"{table_name}")

iceberg_df.printSchema()
iceberg_df.show()

The code above creates a PySpark DataFrame, writes it to an Iceberg table, and then retrieves and displays the stored data.

Now, let’s explore the features that Iceberg comes with to address the issues mentioned in the introduction.

Schema Evolution

The flexibility of Data Lakes, allowing storage of diverse data formats, can pose challenges in managing schema changes. Iceberg addresses this by enabling the addition, removal, or modification of table columns without requiring a complete data rewrite. This feature simplifies the process of evolving schemas over time.

Let’s modify the previously created table to demonstrate schema evolution.

spark.sql(f"ALTER TABLE {table_name} RENAME COLUMN job_title TO job")
spark.sql(f"ALTER TABLE {table_name} ALTER COLUMN age TYPE bigint")
spark.sql(f"ALTER TABLE {table_name} ADD COLUMN salary FLOAT AFTER job")
iceberg_df = spark.read.format("iceberg").load(f"{table_name}")
iceberg_df.printSchema()
iceberg_df.show()

spark.sql(f"SELECT * FROM {table_name}.snapshots").show()

The code above demonstrates schema evolution by renaming a column, changing a column type, and adding a new column. As you can observe in the screenshots below, taken before and after the schema evolution, the age column's type has changed, the job_title column has been renamed to job, and the salary column has been added.

Screenshot: schema before altering the table
Screenshot: schema after altering the table
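As a quick programmatic check (a small sketch that relies on the iceberg_df just re-read above), you can list each field with its type; given the ALTER statements, the expected result is name string, age bigint, job string, salary float.

# List the evolved schema field by field
for field in iceberg_df.schema.fields:
    print(field.name, field.dataType.simpleString())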

The first time you run the code, you will notice in the snapshots table that Iceberg applied all the alterations without rewriting any data. This is indicated by the presence of a single snapshot ID with no parent (parent_id = null), signifying that no data rewrite was performed.

Screenshot: the snapshots table
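You can confirm this without the screenshot by querying a few columns of the snapshots metadata table directly (a minimal sketch; committed_at, snapshot_id, parent_id, and operation are standard columns of Iceberg's snapshots metadata table). After the initial write you should see a single row whose parent_id is null.

# Inspect snapshot lineage: a single snapshot with parent_id = null means no rewrite occurred
spark.sql(f"""
    SELECT committed_at, snapshot_id, parent_id, operation
    FROM {table_name}.snapshots
    ORDER BY committed_at
""").show(truncate=False)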

Transactional Writes

Data accuracy and consistency are crucial in data lakes, particularly for business-critical use cases. Iceberg supports ACID transactions for write operations, ensuring that data remains in a consistent state and enhancing the reliability of the stored information.

To demonstrate ACID transactions with an Iceberg table, let's update, delete, and insert records.

spark.sql(f"UPDATE {table_name} SET salary = 100")
spark.sql(f"DELETE FROM {table_name} WHERE age = 42")
spark.sql(f"INSERT INTO {table_name} values ('person4', 50, 'Teacher', 2000)")
spark.sql(f"SELECT * FROM {table_name}.snapshots").show()

In the snapshots table, we can now observe that Iceberg has added three new snapshots, each created from the preceding one. If any of these operations fails, its transaction fails as a whole and no snapshot is created.

Screenshot: ACID transactions reflected in the snapshots table
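As a complementary check (a minimal sketch using Iceberg's history metadata table, whose standard columns include made_current_at, snapshot_id, parent_id, and is_current_ancestor), you can list each commit together with its parent to confirm that the three writes produced three chained snapshots:

# Each write committed its own snapshot; history shows the chain of current snapshots
spark.sql(f"""
    SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
    FROM {table_name}.history
""").show(truncate=False)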

Partitioning the table

As you may be aware, querying large amounts of data in data lakes can be resource-intensive. Iceberg supports data partitioning by one or more columns. This significantly improves query performance by reducing the volume of data read during queries.

spark.sql(f"ALTER TABLE {table_name} ADD PARTITION FIELD age")
spark.read.format("iceberg").load(f"{table_name}").where("age = 28").show()

The code adds a new partition field on the age column. The partitioning applies to rows inserted from this point forward; existing data is not rewritten.

Screenshot: partitioned DataFrame
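To see the new layout take effect (a minimal sketch; the partitions metadata table is standard Iceberg, though the exact rows you get depend on your data, and the person5 record below is only an example), insert a row and then inspect the partitions metadata table:

# Rows written after the ALTER land in age-based partitions; files written earlier keep the old, unpartitioned spec
spark.sql(f"INSERT INTO {table_name} VALUES ('person5', 28, 'Doctor', 1500.0)")
spark.sql(f"SELECT * FROM {table_name}.partitions").show(truncate=False)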

We can also define the partitioning when creating the Iceberg table, as shown below.

spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
        (name STRING, age INT, job STRING, salary INT)
        USING iceberg
        PARTITIONED BY (age)
    """)

Time Travel

Analyzing historical trends or tracking changes over time is often essential in a data lake. Iceberg provides a time-travel API that allows users to query data as it appeared at a specific version or timestamp, facilitating historical data analysis.

Apache Iceberg gives you the flexibility to load any snapshot or data at a given point in time. This allows you to examine changes at a given time or roll back to a specific version.

spark.sql(f"SELECT * FROM {table_name}.snapshots").show(1, truncate=False)
# Read a snapshot by its ID (use a snapshot_id value from the snapshots table)
spark.read.option("snapshot-id", "306576903892976364").table(table_name).show()
# Read the table as of a point in time; as-of-timestamp expects milliseconds since the epoch,
# for example a committed_at value from the snapshots table (the value below is a placeholder)
spark.read.option("as-of-timestamp", "1704067200000").table(table_name).show()

Query Isolation

Given that data lakes are typically used by multiple users or applications at the same time, Iceberg allows multiple queries to run concurrently without mutual interference: each reader works against the snapshot that was current when its query started, so in-flight writes never affect it. This capability facilitates the scalability of data lake usage without compromising performance.
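A simple way to observe this isolation with the table created above (a minimal sketch; the UPDATE merely stands in for a concurrent writer): pin a reader to the current snapshot, perform a write, and compare the pinned read with a fresh one.

# Capture the most recent snapshot ID before writing
current_snapshot_id = spark.sql(
    f"SELECT snapshot_id FROM {table_name}.snapshots ORDER BY committed_at DESC LIMIT 1"
).collect()[0][0]

# A write that stands in for a concurrent writer; it commits a new snapshot
spark.sql(f"UPDATE {table_name} SET salary = 200")

# A reader pinned to the earlier snapshot is unaffected by the write
spark.read.option("snapshot-id", current_snapshot_id).table(table_name).show()

# A fresh read sees the updated data
spark.read.table(table_name).show()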

Conclusion

Apache Iceberg provides a robust solution for managing big data tables with features like atomic commits, schema evolution, and time travel. When combined with the power of PySpark, you can harness the capabilities of Iceberg while leveraging the flexibility and scalability of PySpark for your big data processing needs.

This article has provided a basic overview of using Apache Iceberg with PySpark, but there is much more to explore, including partitioning, indexing, and optimizing performance. For more in-depth information, refer to the official Apache Iceberg documentation.

You can find the complete source code for this article in my Github Repository.

Updated on March 5, 2025
