Niekiedy standardowe metody przetwarzania są niewystarczające, szczególnie jeśli przetwarzamy ogromne zbiory danych – Big Data. Pojedyncze węzły nie są w stanie wydajnie przetwarzać równolegle wielu procesów. Pierwszym ograniczeniem jest brak wystarczających zasobów pamięciowych, a kolejnym ograniczone zasoby procesorów. Wtedy przychodzi z pomocą przetwarzanie rozproszone – jednym z jego przykładowych rozwiązań jest Apache Spark.
Co to jest Spark
Spark to silnik dla szybkiego przetwarzania wielkiej ilości danych. Przetwarzanie realizowane może być w trybie przetwarzania wsadowego, strumieniowania w czasie rzeczywistym i zaawansowanych technik eksploracji danych. Wewnętrznie zbudowany z użyciem języka scala, natomiast do pisania kodów do przetwarzania w Sparku oprócz języka Scala można używać języków Java, Python i R.
Moduły
Spark posiada kilka modułów dostępnych w ramach projektu ułatwiających pracę z różnymi typami danych i różnym przeznaczeniem, są to:
- Spark SQL,
- Spark Streaming,
- Spark MLlib,
- Spark GraphX.
Klaster Spark
Spark jest uruchamiany na klastrach, które są zarządzane jednym z poniższych menedżerów zasobów:
- Standalone – prosty klaster zarządzany wewnętrznie przez Sparka,
- Apache Mesos – zarządca zasobów z harmonogramowaniem,
- Hadoop Yarn – zarządca zasobów w ramach klastrów Hadoop 2,
- Kubernetes – system do automatycznego dostarczania zasobów, skalowania i zarządzania aplikacjami kontenerowymi.
Aplikacja Spark
Aplikacje Spark składają się z szeregu niezależnych procesów na klastrze – egzekutorów, koordynowanych przez program sterujący zwany SparkContext. W egzekutorach wykonywane są zadania dostarczone przez program sterujący w ramach aplikacji przekazanej do SparkContext.
Zbiory danych
Podstawową core-ową jednostką bloku danych jest RDD (Resilient Distributed Dataset) będącą rozproszoną kolekcją umieszczoną w pamięci na węzłach przetwarzających – logicznie nazywane jest to podziałem na partycje. Kolekcja RDD może zostać zapisana na dysku.
Na RDD można przeprowadzać operacje transformacji takie jak map, filter czy reduce – w wyniku otrzymujemy nowy RDD zawierający przekształcone dane. Wynika również z tego, że kolekcja RDD jest niezmienna, a dodatkowo operacje tworzące RDD są “leniwe”, to znaczy wykonanie ich w zaprogramowanym łańcuchu następuje dopiero w momencie wykonania kolejnej operacji nie zwracającej RDD. Niestety API jest dostępne tylko dla języków scala i java, a z poziomu języków python czy R są możliwe tylko niektóre operacje. Operacje przeprowadzane na RDD są realizowane w ramach kontekstu Sparka -SparkContext.
Moduł Spark SQL
Moduł Spark SQL ułatwia pracę ze strukturyzowanymi danymi. W ramach modułu używa się nowej struktury do przechowywania danych zwanej DataFrame. Jest to zbiór danych (DataSet) bazujący na RDD ale dane ułożone są w ramach nazwanych kolumn, analogicznie do budowy tabel w bazach danych.
Przekształcenie danych do zbioru DataFrame może zostać wykonane na podstawie pliku z danymi w formie strukturalnej, tabeli z hive, tabeli z zewnętrznych baz lub istniejącego RDD i jest możliwe z każdego obsługiwanego języka. Najczęściej operacje na DataFrame realizowane są w ramach sesji Sparka – SparkSession a nie kontekstu – SparkContext. Przetwarzanie w ramach Spark SQL może odbywać się zarówno poprzez używanie zapytań sql jak i również z użyciem funkcji.
Przykład rozwiązania.
W przykładzie zostanie pokazane zasilanie bazy z użyciem pythona oraz użycie pySpark i Spark SQL do agregowania danych. W pierwszym etapie zostanie stworzona i zasilona baza w Postgresql z danych w plikach csv. W dalszej części zasilone dane posłużą do przetwarzania w Sparku i zapisie wyniku zwrotnie w bazie.
Tworzymy bazę danych z tabelami
Skrypt służący do utworzenia testowej bazy postgresql wraz z tabelami – install.py
import psycopg2
from sql_defs import cts, dts
def create_database():
# connect to default database
conn = psycopg2.connect("host=127.0.0.1 dbname=postgres user=postgres password=postgres")
conn.set_session(autocommit=True)
cur = conn.cursor()
# drop database & user
cur.execute("DROP DATABASE IF EXISTS etl")
cur.execute("DROP USER IF EXISTS etl")
# create user & database
cur.execute("CREATE USER etl password 'etl'")
cur.execute("CREATE DATABASE etl WITH ENCODING 'utf8' owner etl")
cur.execute("GRANT CONNECT ON DATABASE etl TO etl")
# close connection to default database
conn.close()
def conn_database():
# connect to database
conn = psycopg2.connect("host=127.0.0.1 dbname=etl user=etl password=etl")
cur = conn.cursor()
return cur, conn
def drop_tables(cur, conn):
for query in dts:
cur.execute(query)
conn.commit()
def create_tables(cur, conn):
for query in cts:
cur.execute(query)
conn.commit()
def main():
create_database()
cur, conn = conn_database()
drop_tables(cur, conn)
create_tables(cur, conn)
conn.close()
if __name__ == "__main__":
main()
Skrypt zawierający polecenia SQL, DDL i DML wykorzystane podczas instalacji i zasilenia – sql_defs.py
# DROP TABLES
dt_customer = "DROP TABLE IF EXISTS customer"
dt_msisdn = "DROP TABLE IF EXISTS msisdn"
dt_cdr = "DROP TABLE IF EXISTS cdr"
# CREATE TABLES
ct_customer = ("""
CREATE TABLE IF NOT EXISTS customer (
customer_id integer GENERATED BY DEFAULT AS IDENTITY NOT NULL PRIMARY KEY,
name text NOT NULL UNIQUE
)
""")
ct_msisdn = ("""
CREATE TABLE IF NOT EXISTS msisdn (
msisdn text NOT NULL PRIMARY KEY,
customer_id integer NOT NULL
)
""")
ct_cdr = ("""
CREATE TABLE IF NOT EXISTS cdr (
cdr_id integer GENERATED BY DEFAULT AS IDENTITY NOT NULL PRIMARY KEY,
call_a text NOT NULL,
call_b text NOT NULL,
duration integer,
date_end date,
time_end time
)
""")
# INSERTS DATA
ins_customer = ("""
INSERT INTO customer
(name)
VALUES (%s)
ON CONFLICT DO NOTHING
""")
ins_msisdn = ("""
INSERT INTO msisdn
(customer_id, msisdn)
VALUES (%s, %s)
ON CONFLICT DO NOTHING
""")
ins_cdr = ("""
INSERT INTO cdr
(call_a, call_b, duration, date_end, time_end)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
""")
# GetCustomerId
get_customer_id = ("""
SELECT customer_id
FROM customer
WHERE name = %s
""")
# QUERIES DDL
dts = [dt_customer, dt_msisdn, dt_cdr]
cts = [ct_customer, ct_msisdn, ct_cdr]
Przygotowujemy dane wejściowe
Zawartość pliku data/customer/customer.csv
"customer";"msisdn"
"Adam Nowak";500100100
"Ewa Kowalska";500100101
"Karol Kowalski";500100102
"Jadwiga Abacka";500100103
"Edyta Nowak";500100104
"Agnieszka Zielińska";500100105
"Joanna Bogdan";500100106
"Grzegorz Adam";500100107
"Olgierd Biały";500100108
Zawartość pliku data/cdr/cdr_202007.csv
"call_a","call_b","duration_sek","date_end","time_end"
500100102,500100101,124,"2020-07-01",09:34:06
500100105,500100101,345,"2020-07-02",16:57:23
500100108,500100106,12,"2020-07-03",11:15:28
500100106,500100102,5,"2020-07-04",10:52:51
500100103,500100108,135,"2020-07-05",13:56:03
500100101,500100104,64,"2020-07-06",06:56:14
500100103,500100100,11,"2020-07-07",10:06:53
500100104,500100101,678,"2020-07-08",01:56:03
500100101,500100108,23,"2020-07-09",15:16:11
500100107,500100104,19,"2020-07-10",10:51:33
500100100,500100104,50,"2020-07-30",10:56:03
500100104,500100107,34,"2020-07-31",13:59:01
Zasilamy tabele w dane
Import zależności
Tworzymy połączenie do bazy
Tworzymy funkcję wyszukującą pliki csv w zadanym katalogu
Pobieramy listę plików csv do załadowania
Przykładowo wybieramy jeden z plików dla tabeli customer
Odczytujemy jego zawartość
Wybieramy tylko jedną kolumnę do załadowania i wyświetlamy pierwszy rekord
Ładujemy dane do tabeli customer
Przygotowujemy dane do załadowania do tabeli msisdn
Ładujemy dane do tabeli msisdn pobierając wcześniej identyfikator z tabeli customer
Pobieramy listę plików z rekordami cdr
Wybieramy jeden plik do załadowania
Ładujemy plik
Ładujemy dane do tabeli
Użycie Sparka
Importujemy biblioteki oraz rejestrujemy sterownik jdbc do postgresql.
Zawartość tabeli cdr importujemy do DataFrame
Wyświetlamy zawartość DataFrame
Zawartość tabeli msisdn importujemy do DataFrame
i wyświetlamy jej zawartość
Łączymy tabele z użyciem Spark:
- Wskazujemy kolumny do łączenia i typ złączenia
- wybieramy interesujące nas kolumny w wyniku
- jedna kolumna jest tworzona poprzez wyliczenia i nadanie jej nazwy
- grupujemy dane względem pól customer_id, msisdn,month
- agregujemy pole duration z funkcją agregującą sum