Contents

Data_frame_api

DataFrame Transformations

Selecting Columns

Renaming Columns

Change Columns data type

Adding Columns to a DataFrame

Removing Columns from a DataFrame

Basics Arithmetic with DataFrame

Apache Spark Architecture: DataFrame Immutability

How to filter a DataFrame

Apache Spark Architecture: Narrow Transformations

Dropping Rows

How to Drop rows and columns

Handling NULL Values I - Null Functions

1
2
3
4
5
6
7
Dfn = customerDf.selectExpr(
    "salutation",
    "firstname",
    "lastname",
    "email_address",
    "year(birthdate) birthyear"
)
salutation firstname lastname email_address birthyear
null James null james@efsefa.org null
null Adrian null null null
null Ruth null null null
null Christine Jimenez Christine@adfacg.net null

Where 사용

1
Dfn.where($"salutation".isNotNull) # Good
1
Dfn.where($"salutation" === null) # Error

Handling NULL Values II - DataFrameNaFunctions

na 사용

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 모든 Column 값이 NULL인 것 삭제.
Dfn.na.drop(how="all")

# 어떤 Columnd 이던 NULL 인 것 삭제.
Dfn.na.drop("any")

# firstname && lastname이 NULL인 것 삭제.
Dfn.na.drop("all", Seq("firstname", "lastname"))

# firstname || lastname이 NULL 인 것 삭제.
Dfn.na.drop("any", Seq("firstname", "lastname"))

na & fill 사용

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 모든 NULL 값에 abcdefg 채워준다.
# 주의할 점) Type이 일치해야 채워줄 수 있다.
# abcedfg(String) != birthyear(Datetime)이기에 그대로 NULL로 남아있다.
Dfn.na.fill("abcdefg")

# 특정 Column이 NULL 값일 때 특정 값을 채워줄 수 있다.
Dfn.na.fill(Map(
    "salutation" -> "Unknown",
    "firstname" -> "John",
    "lastname" -> "Doe",
    "bithyear" -> 9999
))

Sort and Order Rows - Sort & OrderBy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# NULL 값이 없는 데이터들을
# firstname을 오름차순 정렬
# month를 내림차순 정렬

# sort와 orderBy는 서로 연관이 없기 때문에
# 최종적으로는 firstname 정렬은 적용되지 않고 month만 적용된다.
customerDf
.na.drop("any")
.sort("firstname") == .sort(expr("firstname"))
.orderBy(expr("month(birthdate)").desc)
.select("firstname", "lastname", "birthdate")

# firtname 오름차순 & lastname 내림차순으로 정렬하려면 다음과 같다.
customerDf
.na.drop("any")
.sort($"firstname", $"lastname".desc)
.select("firstname", "lastname", "birthdate")

Create Group of Rows: GroupBy

1
2
3
4
customerPurchases = webSalesDf.selectExpr(
    "ws_bill_customer_sk customer_id",
    "ws_item_sk item_id"
)
customer_id item_id
83074 4591
83074 3566
83074 7286
83074 2755
83074 2516

groupBy 사용

1
2
3
4
5
6
7
# customer_id로 묶고
# 각 customer가 구매한 모든 물품 개수를 count
customerPurchases.groupBy(
    "customer_id"
    )
    .agg(count("item_id"))
    .alias("item_count")

DataFrame Statics

aggregation 사용

1
2
3
4
5
6
webSalesDf.agg(
    max("ws_sales_price")
    min("ws_sales_price")
    avg("ws_sales_price")
    count("ws_sales_price")
)

Joining DataFrames - Inner Join

Address DF

address_id country state city zip street_name street_number location_type
1 United States AZ FairField 86192 Jackson 18 condo
2 United States NM Fairview 85709 Washington 6th 362 condo

Customer DF

address_id birth_country birthdate customer_id demographics
2133 RWANDA 1935-09-13 45721 “buy_potential”: “10000”, “credint_rating”: “High Rist”, “education_status”: “Secondary”
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Apache Spark의 default JOIN = Inner
customerWithAddress = customerDf.join(
    addressDf,
    customerDf.col("address_id") === addressDf.col("address_id"),
    "inner" # 생략 가능
)
.select(
    "customer_id",
    "address_id",
    "demographics.education_status",
    "location_type",
    "country", "city",
    "street_name"
)

Joining DataFrames - Right Outer Join

Right table을 기준으로 Left table에서 일치하는 값들을 매핑 해준다. 만약 Left table에서 일치하는 값이 없다면 NULL을 넣는다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
webSalesDf.join(
    customerDf,
    "customer_id" === "ws_bill_customer_sk",
    "right"
)
.select(
    "customer_id",
    "ws_bill_customer_sk",
)
.where(
    "ws_bill_customer_sk is null"
)

Joining DataFrames - Left Outer Join

1
2
3
4
5
webSalesDf.join(
    customerDf,
    $"customer_id" === $"ws_bill_customer_sk",
    "left"
)

Appending Rows to a DataFrame - Union

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
df1 = customerDf.select(
    "firstname",
    "lastname",
    "customer_id"
)
.withColumn(
    "from",
    lit("df1")
)

df2 = customerDf.select(
    "lastname",
    "firstname",
    "customer_id"
)
.withColumn(
    "from",
    lit("df2")
)
1
2
3
# df2 의 모든 데이터를 df1에 합쳐준다.
# 같은 데이터 임에도 column 위치를 기반으로 합치기 때문에 다른 것으로 인식한다.
df1.union(df2)
firstname lastname customer_id from
Tiffany Skinner 45721 df1
Skinner Tiffany 45721 df2
1
df1.unionByName(df2)
firstname lastname customer_id from
Tiffany Skinner 45721 df1
Tiffany Skinner 45721 df2
1
2
# distinct() 와 결합해서 중복을 없애준다.
df1.unionByName(df2).distinct()
firstname lastname customer_id
Tiffany Skinner 45721
Tiffany Skinner 45721

Cashing a DataFrame

DataFrameWriter I

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
customerWithAddress = customerDf
.na.drop("any")
.join(
    addressDf,
    customerDf.address_id == addressDf.address_id,
)
.select(
    'customer_id',
    'demographics',
    concat_ws(" ", "firstname", "lastname").as("Name")
    addressDf("*")
)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
salesWithItem = webSalesDf
.na.drop("any")
.join(
    itemDf,
    webSalesDf.ws_item_sk == itemDf.i_item_sk,
)
.selectExpr(
    "ws_bill_customer_sk customer_id",
    "ws_ship_addr_sk ship_address_id",
)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# default로 partition은 200이다.
# 불필요하게 많은 경우, 8개로 낮춰준다.
# 파일 형식은 json으로 해준다.
# SaveMode.Overwrite로 똑같은 파일을 덮어써준다.
customerWithAddress
.repartition(8)
.write
.format("json")
.mode(SaveMode.Overwrite)
.option("path", "tmp/output/customerWithAddress")
.save

DataFrameWriter II - PartitionBy

Partitions the output by the given columns on the file system.

1
2
3
4
5
6
7
8
customerWithAddress
.repartition(8)
.write
.partitionBy("item_category")
.format("json")
.mode(SaveMode.Overwrite)
.option("path", "tmp/output/customerWithAddress")
.save

User Defined Functions

함수 정의

1
2
3
@udf
def stringConcat(sep, first, second):
    return first + sep + second

함수 등록

1
2
3
4
5
6
7
8
9
customerDf.select(
    "firstname",
    "lastname"
    stringConcat(
        "-",
        "firstname",
        "lastname"
    )
)

Apache Spark Architecture: Execution

Partitioning a DataFrame

1
2
3
4
5
df.repartition(10)\ # df with 10 partitions
      .rdd \
      .getNumPartitions()

df.coalesce(1).rdd.getNumPartitions() #df with 1 partition```

Repartition VS Coalesce

RDD를 생성한 뒤 filter() 연산을 비롯한 다양한 transformation 연산을 수행하다보면 최초에 설정된 partition 개수가 적합하지 않은 경우가 발생할 수 있다.

이 경우 coalesce()repartition()연산을 사용해 현재의 RDD의 파티션 개수를 조정할 수 있다.

두 메서드는 모두 파티션의 크기를 나타내는 정수를 인자로 받아서 파티션의 수를 조정한다는 점에서 공통점이 있지만 repartition()이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce()는 줄이는 것만 가능하다.

그럼 모든 것이 가능한 repartition() 메서드가 있음에도 coalesce() 메서드를 따로 두는 이유는 바로 처리 방식에 따른 성능 차이 때문이다. 즉 repartition()은 셔플을 기반으로 동작을 수행하는 데 반해 coalesce()는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않기 때문이다. 따라서 데이터 필터링 등의 작업으로 데이터 수가 줄어들어 파티션 수를 줄이고자 할 때는 상대적으로 성능이 좋은 coalesce()를 사용하고, 파티션 수를 늘려야 하는 경우에만 repartition() 메서드를 사용하는 것이 좋다.