How to Create a PySpark DataFrame from a MongoDB Collection: The Ultimate Guide

Published on April 17, 2025


Diving Straight into Creating PySpark DataFrames from MongoDB Collections

Got a MongoDB collection brimming with data—like user profiles or transaction logs—and eager to transform it into a PySpark DataFrame for big data analytics? Creating a DataFrame from a MongoDB collection is a key skill for data engineers building ETL pipelines with Apache Spark. MongoDB, a NoSQL database, stores flexible, semi-structured data, and PySpark’s MongoDB Connector bridges it to Spark’s distributed processing. This guide dives into the syntax and steps for reading MongoDB collections into a PySpark DataFrame, with examples covering simple to complex scenarios. We’ll tackle key errors to keep your pipelines robust. Let’s unlock that MongoDB data! For more on PySpark, see Introduction to PySpark.


Configuring PySpark to Connect to MongoDB

Before reading a MongoDB collection, you need to configure PySpark with the MongoDB Spark Connector to access the database. This setup is critical for all scenarios in this guide and involves specifying the connection URI and connector dependency. Here’s how to set it up:

  1. Install MongoDB Spark Connector: Ensure the connector (e.g., org.mongodb.spark:mongo-spark-connector_2.12:10.0.2 for Spark 3.x) is available. Add it via --packages in spark-submit or include it in your Spark cluster’s libraries.
  2. Configure SparkSession: Set the MongoDB read URI (spark.mongodb.read.connection.uri) and write URI (spark.mongodb.write.connection.uri) in the SparkSession configuration.
  3. MongoDB Setup: Ensure MongoDB is running (e.g., mongodb://localhost:27017) and the collection exists with appropriate user privileges (see the connectivity check after this list).
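
Before wiring up Spark, it can help to confirm that MongoDB is reachable and the collection exists. Here is a minimal pymongo sketch, assuming MongoDB runs on localhost:27017 and the company.users collection used in the examples below:

from pymongo import MongoClient

# Quick connectivity check; raises ServerSelectionTimeoutError if MongoDB is unreachable
client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
client.admin.command("ping")
print(client["company"].list_collection_names())  # the target collection should appear here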

Here’s the basic setup code:

from pyspark.sql import SparkSession

# Initialize SparkSession with MongoDB connector
spark = SparkSession.builder \
    .appName("MongoDBToDataFrame") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/database.collection") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.2") \
    .getOrCreate()
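
If you prefer not to bake the namespace into the session config, the connector also accepts per-read options. This is a sketch assuming the 10.x option names connection.uri, database, and collection; swap in your own database and collection names:

# Alternative: pass connection details per read instead of on the SparkSession
df = spark.read.format("mongodb") \
    .option("connection.uri", "mongodb://localhost:27017") \
    .option("database", "database") \
    .option("collection", "collection") \
    .load()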

Error to Watch: a missing connector package or an invalid URI causes the read to fail:

try:
    spark = SparkSession.builder.appName("NoMongoConfig").getOrCreate()
    df = spark.read.format("mongodb").load()
except Exception as e:
    print(f"Error: {e}")

Output:

Error: Cannot find source: mongodb

Fix: Include the connector package and verify the URI: assert "mongodb://" in spark.conf.get("spark.mongodb.read.connection.uri"), "Invalid MongoDB URI". Ensure MongoDB is running and accessible.


Reading a Simple MongoDB Collection into a DataFrame

Reading a simple MongoDB collection, with flat documents containing fields like strings or numbers, is the foundation for ETL tasks, such as loading user data for analytics, as seen in ETL Pipelines. The read.format("mongodb") method connects to the collection and infers the schema. Assume a company.users collection with documents:

{"_id": "U001", "name": "Alice", "age": 25, "salary": 75000.0}
{"_id": "U002", "name": "Bob", "age": 30, "salary": 82000.5}
{"_id": "U003", "name": "Cathy", "age": 28, "salary": 90000.75}

Here’s the code to read it:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SimpleMongoCollection") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/company.users") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.2") \
    .getOrCreate()

# Read MongoDB collection
df_simple = spark.read.format("mongodb").load()
df_simple.show(truncate=False)
df_simple.printSchema()

Output:

+----+-----+---+---------+
|_id |name |age|salary   |
+----+-----+---+---------+
|U001|Alice|25 |75000.0  |
|U002|Bob  |30 |82000.5  |
|U003|Cathy|28 |90000.75 |
+----+-----+---+---------+

root
 |-- _id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: double (nullable = true)

This DataFrame is ready for Spark operations; a quick transformation example follows at the end of this section. Spark infers the schema by sampling documents. Error to Watch: an invalid collection name or insufficient permissions can cause the read to fail:

try:
    df_invalid = spark.read.format("mongodb").option("connection.uri", "mongodb://localhost:27017/company.nonexistent").load()
    df_invalid.show()
except Exception as e:
    print(f"Error: {e}")

Output:

Error: Collection not found or insufficient permissions

Fix: Verify collection: from pymongo import MongoClient; client = MongoClient("localhost:27017"); assert "users" in client["company"].list_collection_names(), "Collection missing". Check user privileges.
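
Once loaded, the DataFrame behaves like any other. As a quick illustration, here is a small aggregation sketch reusing df_simple from this section:

from pyspark.sql.functions import avg

# Average salary of users older than 26, computed on the DataFrame read above
df_simple.filter(df_simple.age > 26) \
    .agg(avg("salary").alias("avg_salary")) \
    .show()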


Specifying a Schema for Type Safety

Schema inference may misjudge types or be slow for large collections. Specifying a schema with StructType ensures type safety and performance, building on simple reads by enforcing column types, as discussed in Schema Operations. This is critical for production ETL pipelines:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder \
    .appName("SchemaMongoCollection") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/company.users") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.2") \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True)
])

# Read with schema
df_schema = spark.read.format("mongodb").schema(schema).load()
df_schema.show(truncate=False)

Output:

+----+-----+---+---------+
|_id |name |age|salary   |
+----+-----+---+---------+
|U001|Alice|25 |75000.0  |
|U002|Bob  |30 |82000.5  |
|U003|Cathy|28 |90000.75 |
+----+-----+---+---------+

This ensures correct types (e.g., integer for age) and improves performance by skipping schema inference. Validate: assert df_schema.schema["age"].dataType == IntegerType(), "Schema mismatch".
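
If you prefer a terser form, the reader also accepts a DDL-formatted schema string. Here is a sketch that is nearly equivalent to the StructType above (DDL columns are always nullable, so the not-null constraint on _id is lost):

# Equivalent schema expressed as a DDL string
ddl_schema = "_id STRING, name STRING, age INT, salary DOUBLE"
df_ddl = spark.read.format("mongodb").schema(ddl_schema).load()
df_ddl.printSchema()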


Handling Null Values in MongoDB Collections

MongoDB collections often have null or missing fields, like absent salaries, which is common in semi-structured data. The connector maps missing fields to nulls, so the resulting DataFrame handles both cases uniformly, as seen in Column Null Handling. Assume company.users with some nulls:

{"_id": "U001", "name": "Alice", "age": 25, "salary": 75000.0}
{"_id": "U002", "name": null, "age": null, "salary": 82000.5}
{"_id": "U003", "name": "Cathy", "age": 28}

Here's the code to read it:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("NullMongoCollection") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/company.users") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.2") \
    .getOrCreate()

# Read collection
df_nulls = spark.read.format("mongodb").load()
df_nulls.show(truncate=False)

Output:

+----+-----+----+--------+
|_id |name |age |salary  |
+----+-----+----+--------+
|U001|Alice|25  |75000.0 |
|U002|null |null|82000.5 |
|U003|Cathy|28  |null    |
+----+-----+----+--------+

This DataFrame handles nulls gracefully, ideal for cleaning or filtering in ETL pipelines. Use SQL or the DataFrame API to manage nulls further, as in the sketch below.
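
For example, here is a minimal cleanup sketch on df_nulls from above, dropping rows without a name and filling missing salaries with a default:

# Drop rows missing a name, then fill missing salaries with 0.0
df_clean = df_nulls.dropna(subset=["name"]).fillna({"salary": 0.0})
df_clean.show(truncate=False)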


Filtering MongoDB Collection Data

Filtering data while reading, via a MongoDB aggregation pipeline, reduces the amount of data transferred from MongoDB to Spark and so improves performance, as discussed in DataFrame Operations. Assume you want users with salaries above 80000:

import json

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FilteredMongoCollection") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/company.users") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.2") \
    .getOrCreate()

# Read with an aggregation pipeline filter (applied inside MongoDB)
pipeline = [{"$match": {"salary": {"$gt": 80000}}}]
df_filtered = spark.read.format("mongodb") \
    .option("aggregation.pipeline", json.dumps(pipeline)) \
    .load()
df_filtered.show(truncate=False)

Output:

+----+-----+---+---------+
|_id |name |age|salary   |
+----+-----+---+---------+
|U002|Bob  |30 |82000.5  |
|U003|Cathy|28 |90000.75 |
+----+-----+---+---------+

This pipeline filters the data inside MongoDB before Spark processes it, boosting efficiency; the sketch below contrasts it with filtering on the Spark side.
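
For comparison, the same condition can be applied on the Spark side after loading. Depending on the connector version and the predicate, this may or may not be pushed down to MongoDB, so the explicit pipeline above is the safer bet for large collections. A sketch reusing the session from this section:

from pyspark.sql.functions import col

# Spark-side filter: the full collection may be read unless the predicate is pushed down
df_all = spark.read.format("mongodb").load()
df_spark_filtered = df_all.filter(col("salary") > 80000)
df_spark_filtered.show(truncate=False)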


Reading Nested MongoDB Documents

MongoDB collections often have nested documents, like user profiles with contact details, requiring a schema to capture the structure. This extends filtering by handling complex data, common in ETL pipelines, as seen in DataFrame UDFs. Assume company.users with nested documents:

{"_id": "U001", "name": "Alice", "contact": {"phone": 1234567890, "email": "alice@example.com"}}
{"_id": "U002", "name": "Bob", "contact": {"phone": 9876543210, "email": "bob@example.com"}}

Here's the code to read it with an explicit nested schema:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

spark = SparkSession.builder \
    .appName("NestedMongoCollection") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/company.users") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.2") \
    .getOrCreate()

# Define schema for nested documents
schema = StructType([
    StructField("_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("contact", StructType([
        StructField("phone", LongType(), True),
        StructField("email", StringType(), True)
    ]), True)
])

# Read with schema
df_nested = spark.read.format("mongodb").schema(schema).load()
df_nested.show(truncate=False)

Output:

+----+-----+--------------------------------+
|_id |name |contact                         |
+----+-----+--------------------------------+
|U001|Alice|[1234567890, alice@example.com] |
|U002|Bob  |[9876543210, bob@example.com]   |
+----+-----+--------------------------------+

This schema enables queries on nested fields like contact.email; a selection example follows at the end of this section. Error to Watch: a schema that doesn't match the document types fails:

schema_invalid = StructType([StructField("_id", StringType()), StructField("name", IntegerType())])
try:
    df_invalid = spark.read.format("mongodb").schema(schema_invalid).load()
    df_invalid.show()
except Exception as e:
    print(f"Error: {e}")

Output:

Error: field name: IntegerType can not accept object string

Fix: Align schema with documents: assert df_nested.schema["name"].dataType == StringType(), "Schema mismatch".
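
With the nested schema in place, dotted paths work as expected. A quick sketch selecting a nested field from df_nested above:

from pyspark.sql.functions import col

# Select a nested field with dot notation
df_nested.select("name", col("contact.email").alias("email")).show(truncate=False)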


How to Fix Common DataFrame Creation Errors

Errors can disrupt MongoDB collection reads. Here are the key issues, with fixes; a combined pre-flight check follows the list:

  1. Missing Connector: No connector package fails. Fix: Add spark.jars.packages with org.mongodb.spark:mongo-spark-connector. Validate: assert "mongo-spark-connector" in spark.conf.get("spark.jars.packages", ""), "Connector missing".
  2. Invalid URI/Collection: Wrong URI or collection fails. Fix: Verify URI and collection: from pymongo import MongoClient; client = MongoClient("localhost:27017"); assert "users" in client["company"].list_collection_names(), "Collection missing".
  3. Schema Mismatch: Incorrect schema fails. Fix: Align schema with documents. Validate: df.printSchema().
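
The checks above can be bundled into a single pre-flight function that runs before any read. This is a minimal sketch, assuming the 10.x connector config keys and a local MongoDB; adjust the host and names to your environment:

from pymongo import MongoClient

def preflight_check(spark, database, collection):
    # 1. Connector package present in the session config
    assert "mongo-spark-connector" in spark.conf.get("spark.jars.packages", ""), "Connector missing"
    # 2. Read URI configured and well-formed
    uri = spark.conf.get("spark.mongodb.read.connection.uri", "")
    assert uri.startswith("mongodb://"), "Invalid MongoDB URI"
    # 3. Collection exists and is reachable
    client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
    assert collection in client[database].list_collection_names(), "Collection missing"

preflight_check(spark, "company", "users")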

For more, see Error Handling and Debugging.


Wrapping Up Your DataFrame Creation Mastery

Creating a PySpark DataFrame from a MongoDB collection is a vital skill, and the MongoDB Spark Connector makes it easy to handle simple, null-filled, filtered, and nested data. These techniques will level up your ETL pipelines. Try them in your next Spark job, and share tips or questions in the comments or on X. Keep exploring with DataFrame Operations!


More Spark Resources to Keep You Going