1. PySpark Session & Context
Import: from pyspark.sql import SparkSession
Create SparkSession:
    spark = SparkSession.builder \
        .appName("MyPySparkApp") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
Stop SparkSession: spark.stop()
SparkContext (legacy/RDDs): sc = spark.sparkContext

2. DataFrames - Creation
From Python List:
    data = [("Alice", 1), ("Bob", 2)]
    df = spark.createDataFrame(data, ["Name", "Age"])
From RDD:
    rdd = sc.parallelize(data)
    df = spark.createDataFrame(rdd, ["Name", "Age"])
Read CSV: df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
Read JSON: df = spark.read.json("path/to/file.json")
Read Parquet: df = spark.read.parquet("path/to/file.parquet")
Read ORC: df = spark.read.orc("path/to/file.orc")
Read JDBC:
    df = spark.read.format("jdbc").options(
        url="jdbc:postgresql://host:port/database",
        dbtable="table_name",
        user="username",
        password="password"
    ).load()

3. DataFrames - Basic Operations
Show Schema: df.printSchema()
Show Data: df.show(n=20, truncate=True)
Select Columns: df.select("col1", df["col2"], F.col("col3")).show()
Column Aliases: df.select(df.col1.alias("new_col_name")).show()
Filter Rows:
    df.filter(df.age > 25).show()
    df.where((df.gender == "M") & (df.age < 30)).show()
Add/Update Column: df.withColumn("new_col", df.old_col * 2).show()
Drop Column: df.drop("col_to_drop").show()
Rename Column: df.withColumnRenamed("old_name", "new_name").show()
Distinct Rows: df.distinct().show()
Drop Duplicates: df.dropDuplicates(["col1", "col2"]).show()
Sort/OrderBy: df.sort("col1", F.desc("col2")).show()
Limit Rows: df.limit(10).show()
Number of Rows: df.count()
Number of Columns: len(df.columns)
Describe Statistics: df.describe().show()

4. DataFrames - Joins & Unions
Join:
    df1.join(df2, on="key_col", how="inner").show()
    df1.join(df2, on=["key1", "key2"], how="left_outer").show()
    df1.join(df2, df1.id == df2.id, how="right_outer").show()
    # how: "inner", "outer", "left_outer", "right_outer", "left_semi", "left_anti"
Union: df_combined = df1.union(df2) (schemas must match by position)
UnionByName: df_combined = df1.unionByName(df2) (matches by column name)

5. DataFrames - Aggregations
Import functions: from pyspark.sql import functions as F
Group By:
    df.groupBy("category").agg(
        F.count("id").alias("total_items"),
        F.sum("value").alias("sum_value"),
        F.avg("price").alias("avg_price")
    ).show()
Window Functions:
    from pyspark.sql.window import Window
    window_spec = Window.partitionBy("category").orderBy("date")
    df.withColumn("rank", F.rank().over(window_spec)).show()
    df.withColumn("row_number", F.row_number().over(window_spec)).show()
    df.withColumn("lag_value", F.lag("value", 1).over(window_spec)).show()
    df.withColumn("lead_value", F.lead("value", 1).over(window_spec)).show()
    df.withColumn("avg_category_value", F.avg("value").over(window_spec)).show()

6. DataFrames - SQL Operations
Create Temp View: df.createOrReplaceTempView("my_table")
Run SQL Query:
    result_df = spark.sql("""
        SELECT category, SUM(value) AS total_value
        FROM my_table
        WHERE price > 10
        GROUP BY category
        ORDER BY total_value DESC
    """)
    result_df.show()
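Putting sections 1-6 together: the sketch below is a minimal, self-contained example (the sample rows, the column names category/date/value/code, and the app name are made up for illustration) that creates a session and two DataFrames, then demonstrates a filter, a join, a grouped aggregation, a window function, and SQL over a temp view.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName("CheatSheetDemo").getOrCreate()

    # Sample data (illustrative only)
    sales = spark.createDataFrame(
        [("books", "2024-01-01", 10.0), ("books", "2024-01-02", 15.0), ("games", "2024-01-01", 40.0)],
        ["category", "date", "value"],
    )
    labels = spark.createDataFrame([("books", "B"), ("games", "G")], ["category", "code"])

    # Filter, join, and grouped aggregation
    joined = sales.filter(F.col("value") > 5).join(labels, on="category", how="inner")
    joined.groupBy("category").agg(F.sum("value").alias("total_value")).show()

    # Window function: row number within each category, ordered by date
    w = Window.partitionBy("category").orderBy("date")
    joined.withColumn("rn", F.row_number().over(w)).show()

    # SQL over a temp view
    sales.createOrReplaceTempView("sales")
    spark.sql("SELECT category, SUM(value) AS total_value FROM sales GROUP BY category").show()

    spark.stop()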
7. DataFrames - Working with Columns (pyspark.sql.functions)
Literals: F.lit("constant_value"), F.lit(10)
Conditional: F.when(condition, value_if_true).otherwise(value_if_false)
    df.withColumn("status",
        F.when(df.score >= 90, "Excellent")
         .when(df.score >= 70, "Good")
         .otherwise("Fail")
    ).show()
String Functions:
    F.upper(df.col), F.lower(df.col)
    F.length(df.col)
    F.trim(df.col), F.ltrim(df.col), F.rtrim(df.col)
    F.substring(df.col, pos, len)
    F.concat(df.col1, F.lit("-"), df.col2)
    F.split(df.col, "delimiter") (returns array)
    F.regexp_replace(df.col, "pattern", "replacement")
    F.regexp_extract(df.col, "pattern", idx)
Date/Time Functions:
    F.current_date(), F.current_timestamp()
    F.year(df.date_col), F.month(df.date_col), F.dayofmonth(df.date_col)
    F.hour(df.timestamp_col), F.minute(df.timestamp_col), F.second(df.timestamp_col)
    F.date_format(df.date_col, "yyyy-MM-dd")
    F.to_date(df.string_col, "MM/dd/yyyy")
    F.datediff(df.end_date, df.start_date)
    F.months_between(df.date1, df.date2)
    F.add_months(df.date_col, num_months)
Math Functions:
    F.abs(df.col), F.sqrt(df.col), F.exp(df.col)
    F.round(df.col, scale), F.ceil(df.col), F.floor(df.col)
    F.pow(df.col, exponent)
Array Functions:
    F.size(df.array_col)
    F.array_contains(df.array_col, value)
    F.explode(df.array_col) (flattens array into new rows)
Type Casting: df.col.cast("int"), df.col.cast("string"), df.col.cast("timestamp")

8. Missing Data (Nulls)
Drop Rows with Nulls:
    df.na.drop().show() (drops rows containing any null)
    df.na.drop(how="all").show() (drops rows where all columns are null)
    df.na.drop(subset=["col1", "col2"]).show() (null in specific columns)
Fill Nulls:
    df.na.fill(0).show() (fill all numeric nulls with 0)
    df.na.fill("N/A", subset=["string_col"]).show()
    df.na.fill({"col1": 0, "col2": "unknown"}).show()
Using mean/median:
    mean_age = df.agg(F.mean("age")).collect()[0][0]
    df.na.fill(mean_age, subset=["age"]).show()

9. User-Defined Functions (UDFs)
Import:
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
Define Python function:
    def square_udf(x):
        return x * x
    square_udf_spark = udf(square_udf, IntegerType())  # Or StringType(), DoubleType(), etc.
Apply UDF: df.withColumn("squared_value", square_udf_spark(df.value_col)).show()
Register UDF (for SQL):
    spark.udf.register("square_sql", square_udf, IntegerType())
    spark.sql("SELECT value_col, square_sql(value_col) FROM my_table").show()
Pandas UDF (Vectorized UDFs):
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import LongType

    @pandas_udf(LongType(), PandasUDFType.SCALAR)
    def pandas_plus_one(series):
        return series + 1

    df.withColumn("plus_one", pandas_plus_one(df.value_col)).show()
    (PandasUDFType is deprecated since Spark 3.0; the type-hint style is preferred, as shown in the sketch after this section.)

10. RDD Operations (Resilient Distributed Datasets)
Create RDD: rdd = sc.parallelize([1, 2, 3, 4, 5])
Transformations (lazy evaluation):
    map(lambda x: x * 2)
    filter(lambda x: x > 2)
    flatMap(lambda x: range(1, x + 1))
    reduceByKey(lambda a, b: a + b) (for key-value RDDs)
    groupByKey()
    sortByKey()
    union(other_rdd)
    join(other_rdd)
Actions (trigger computation):
    collect() (brings all data to the driver - use with caution)
    take(n)
    first()
    count()
    reduce(lambda a, b: a + b)
    foreach(lambda x: print(x))
    saveAsTextFile("path/to/output")
Convert to DataFrame: rdd.toDF(["col1", "col2"])
Convert from DataFrame: df.rdd
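Putting sections 7-9 together: a small runnable sketch, assuming illustrative data and column names (name/joined/score/logins); it combines when/otherwise, string and date functions, null filling, a row-at-a-time UDF, and a pandas UDF written in the Spark 3 type-hint style.

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import IntegerType

    spark = SparkSession.builder.appName("ColumnsNullsUDFs").getOrCreate()

    df = spark.createDataFrame(
        [("alice smith", "2024-03-01", 95, None), ("bob jones", "2024-03-15", 68, 3)],
        ["name", "joined", "score", "logins"],
    )

    df = (
        df.withColumn("status",
                      F.when(F.col("score") >= 90, "Excellent")
                       .when(F.col("score") >= 70, "Good")
                       .otherwise("Fail"))
          .withColumn("first_name", F.initcap(F.split(F.col("name"), " ")[0]))
          .withColumn("joined", F.to_date("joined", "yyyy-MM-dd"))
          .withColumn("days_since_join", F.datediff(F.current_date(), F.col("joined")))
          .na.fill({"logins": 0})
    )

    # Row-at-a-time UDF
    square = F.udf(lambda x: x * x, IntegerType())

    # Vectorized pandas UDF (Spark 3 type-hint style)
    @pandas_udf("long")
    def plus_one(s: pd.Series) -> pd.Series:
        return s + 1

    df.withColumn("score_sq", square("score")) \
      .withColumn("logins_plus_one", plus_one("logins")) \
      .show()

    spark.stop()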
11. DataFrames - Write Operations
Write CSV: df.write.csv("path/to/output.csv", header=True, mode="overwrite")
Write JSON: df.write.json("path/to/output.json", mode="append")
Write Parquet: df.write.parquet("path/to/output.parquet", mode="overwrite")
Write ORC: df.write.orc("path/to/output.orc", mode="overwrite")
Write JDBC:
    df.write.format("jdbc").options(
        url="jdbc:postgresql://host:port/database",
        dbtable="table_name",
        user="username",
        password="password"
    ).mode("overwrite").save()
Partition By: df.write.partitionBy("col_name").parquet("path/to/output")
Save As Table: df.write.saveAsTable("my_catalog_table")

12. Machine Learning (MLlib - DataFrames API)
Import: from pyspark.ml.feature import ..., from pyspark.ml.classification import ..., etc.
VectorAssembler: Combines feature columns into a single vector column.
    from pyspark.ml.feature import VectorAssembler
    assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
    df_assembled = assembler.transform(df)
StringIndexer: Maps a string column to a column of label indices.
    from pyspark.ml.feature import StringIndexer
    indexer = StringIndexer(inputCol="category", outputCol="category_index")
    df_indexed = indexer.fit(df).transform(df)
OneHotEncoder: Maps a column of label indices to a column of binary vectors. (Called OneHotEncoderEstimator in Spark 2.3-2.4; OneHotEncoder in Spark 3.x.)
    from pyspark.ml.feature import OneHotEncoder
    encoder = OneHotEncoder(inputCols=["category_index"], outputCols=["category_vec"])
    model_encoder = encoder.fit(df_indexed)
    df_encoded = model_encoder.transform(df_indexed)
StandardScaler: Scales features to unit variance (set withMean=True to also center by removing the mean).
    from pyspark.ml.feature import StandardScaler
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    scaler_model = scaler.fit(df_assembled)
    df_scaled = scaler_model.transform(df_assembled)
Logistic Regression Example:
    from pyspark.ml.classification import LogisticRegression
    lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")
    lr_model = lr.fit(training_df)
    predictions = lr_model.transform(test_df)
    predictions.select("label", "prediction", "probability").show()
Pipeline: Chains multiple Transformers and Estimators.
    from pyspark.ml import Pipeline
    pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler, lr])
    pipeline_model = pipeline.fit(training_df)
    predictions = pipeline_model.transform(test_df)
Evaluators:
    from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    auc = evaluator.evaluate(predictions)
(A compact end-to-end pipeline sketch follows section 14 below.)

13. Configuration & Optimization
Set Config on SparkSession:
    spark = SparkSession.builder \
        .appName("MyApp") \
        .config("spark.executor.memory", "4g") \
        .config("spark.sql.shuffle.partitions", "200") \
        .getOrCreate()
Cache DataFrame: df.cache() or df.persist()
Unpersist DataFrame: df.unpersist()
Broadcast Variables:
    broadcast_var = sc.broadcast({"key": "value"})
    # Access: broadcast_var.value
Accumulators:
    my_acc = sc.accumulator(0)
    rdd.foreach(lambda x: my_acc.add(x))
    print(my_acc.value)

14. Common Data Types
Import: from pyspark.sql.types import *
Examples:
    StringType(), IntegerType(), LongType()
    FloatType(), DoubleType(), BooleanType()
    DateType(), TimestampType()
    ArrayType(StringType())
    MapType(StringType(), IntegerType())
    StructType([StructField("name", StringType()), StructField("age", IntegerType())])
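Putting sections 11-12 together: a compact end-to-end sketch on an assumed toy DataFrame (category/amount/label) that chains StringIndexer, OneHotEncoder, VectorAssembler, and LogisticRegression in a Pipeline, evaluates AUC, and writes predictions as partitioned Parquet. The output path /tmp/predictions is a placeholder, and fitting and scoring on the same tiny DataFrame is only to keep the example self-contained; in practice you would use a proper train/test split as in section 12.

    from pyspark.sql import SparkSession
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    spark = SparkSession.builder.appName("PipelineDemo").getOrCreate()

    # Toy data (illustrative only); label must be numeric (here DoubleType)
    df = spark.createDataFrame(
        [("books", 1.0, 0.0), ("games", 4.0, 1.0), ("books", 2.0, 0.0), ("games", 5.0, 1.0)],
        ["category", "amount", "label"],
    )

    indexer = StringIndexer(inputCol="category", outputCol="category_index")
    encoder = OneHotEncoder(inputCols=["category_index"], outputCols=["category_vec"])
    assembler = VectorAssembler(inputCols=["category_vec", "amount"], outputCol="features")
    lr = LogisticRegression(featuresCol="features", labelCol="label")

    # Fit the whole chain and score (same data reused for brevity)
    pipeline_model = Pipeline(stages=[indexer, encoder, assembler, lr]).fit(df)
    predictions = pipeline_model.transform(df)

    auc = BinaryClassificationEvaluator(labelCol="label").evaluate(predictions)
    print("AUC:", auc)

    # Persist results partitioned by category (path is a placeholder)
    predictions.select("category", "label", "prediction") \
        .write.mode("overwrite").partitionBy("category").parquet("/tmp/predictions")

    spark.stop()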