강의
Unit Test
"Unit Test"는 코드 상의 특정 기능을 테스트하기 위해 작성된 코드 입니다.
- 정해진 입력에 대한 예상된 출력 확인 형태
- CI/CD를 사용할 때 전체 코드의 테스트 커버리지 중요
- 언어별 정해진 테스트 프레임워크 사용
- Java : JUnit
- .NET : NUnit
- Python : unittest
실습
# 파일 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Unit Test") \
.getOrCreate()
df = spark.read.option("header", True).csv("name_gender.csv")
df.createOrReplaceTempView("namegender")
spark.sql("""
SELECT gender, COUNT(1) count
FROM namegender
GROUP BY 1
""").show()
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd
# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
def load_gender(spark, file_path):
return spark.read.option("header", True).csv(file_path)
def get_gender_count(spark, df, field_to_count):
df.createOrReplaceTempView("namegender_test")
return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")
df = load_gender(spark, "name_gender.csv")
get_gender_count(spark, df, "gender").show()
df.select(upperUDF("name").alias("NAME")).show()
df.select(upperUDF("name").alias("NAME")).collect()
from unittest import TestCase
class UtilsTestCase(TestCase):
spark = None
@classmethod
def setUpClass(cls) -> None:
cls.spark = SparkSession.builder \
.appName("Spark Unit Test") \
.getOrCreate()
def test_datafile_loading(self):
sample_df = load_gender(self.spark, "name_gender.csv")
result_count = sample_df.count()
self.assertEqual(result_count, 100, "Record count should be 100")
def test_gender_count(self):
sample_df = load_gender(self.spark, "name_gender.csv")
count_list = get_gender_count(self.spark, sample_df, "gender").collect()
count_dict = dict()
for row in count_list:
count_dict[row["gender"]] = row["count"]
self.assertEqual(count_dict["F"], 65, "Count for F should be 65")
self.assertEqual(count_dict["M"], 28, "Count for M should be 28")
self.assertEqual(count_dict["Unisex"], 7, "Count for Unisex should be 7")
def test_upper_udf(self):
test_data = [
{ "name": "John Kim" },
{ "name": "Johnny Kim"},
{ "name": "1234" }
]
expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]
upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
test_df = self.spark.createDataFrame(test_data)
names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
results = []
for name in names:
results.append(name["NAME"])
self.assertCountEqual(results, expected_results)
@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()
import unittest
unittest.main(argv=[''], verbosity=2, exit=False)
'데브코스 TIL > 빅데이터, 스트리밍 데이터 처리' 카테고리의 다른 글
Spark 내부동작 (0) | 2024.01.18 |
---|---|
Spark 파일 포맷 (0) | 2024.01.18 |
Hive 메타 스토어 (0) | 2024.01.17 |
Spark SQL 실습 3 (0) | 2024.01.17 |
Spark SQL 실습 1 (0) | 2024.01.17 |