데브코스 TIL/빅데이터, 스트리밍 데이터 처리

Spark Unit Test

예니ㅣ 2024. 1. 17. 17:27

강의

Unit Test

"Unit Test"는 코드 상의 특정 기능을 테스트하기 위해 작성된 코드 입니다.

  • 정해진 입력에 대한 예상된 출력 확인 형태
  • CI/CD를 사용할 때 전체 코드의 테스트 커버리지 중요
  • 언어별 정해진 테스트 프레임워크 사용
    • Java : JUnit 
    • .NET : NUnit
    • Python : unittest

 


실습

# 파일 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Unit Test") \
    .getOrCreate()

df = spark.read.option("header", True).csv("name_gender.csv")

df.createOrReplaceTempView("namegender")

spark.sql("""
    SELECT gender, COUNT(1) count 
    FROM namegender 
    GROUP BY 1
    """).show()
    
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd

# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
    return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf_f)

def load_gender(spark, file_path):
    return spark.read.option("header", True).csv(file_path)

def get_gender_count(spark, df, field_to_count):
    df.createOrReplaceTempView("namegender_test")
    return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")
    
df = load_gender(spark, "name_gender.csv")
get_gender_count(spark, df, "gender").show()
df.select(upperUDF("name").alias("NAME")).show()

df.select(upperUDF("name").alias("NAME")).collect()
from unittest import TestCase

class UtilsTestCase(TestCase):
    spark = None

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .appName("Spark Unit Test") \
            .getOrCreate()

    def test_datafile_loading(self):
        sample_df = load_gender(self.spark, "name_gender.csv")
        result_count = sample_df.count()
        self.assertEqual(result_count, 100, "Record count should be 100")

    def test_gender_count(self):
        sample_df = load_gender(self.spark, "name_gender.csv")
        count_list = get_gender_count(self.spark, sample_df, "gender").collect()
        count_dict = dict()
        for row in count_list:
            count_dict[row["gender"]] = row["count"]
        self.assertEqual(count_dict["F"], 65, "Count for F should be 65")
        self.assertEqual(count_dict["M"], 28, "Count for M should be 28")
        self.assertEqual(count_dict["Unisex"], 7, "Count for Unisex should be 7")

    def test_upper_udf(self):
        test_data = [
            { "name": "John Kim" },
            { "name": "Johnny Kim"},
            { "name": "1234" }
        ]
        expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]

        upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
        test_df = self.spark.createDataFrame(test_data)
        names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
        results = []
        for name in names:
            results.append(name["NAME"])
        self.assertCountEqual(results, expected_results)

    @classmethod
    def tearDownClass(cls) -> None:
        cls.spark.stop()
        
        
import unittest

unittest.main(argv=[''], verbosity=2, exit=False)

'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글

Spark 내부동작  (0) 2024.01.18
Spark 파일 포맷  (0) 2024.01.18
Hive 메타 스토어  (0) 2024.01.17
Spark SQL 실습 3  (0) 2024.01.17
Spark SQL 실습 1  (0) 2024.01.17