Wykrywanie sekwencji powiązanych zdarzeń przy użyciu platformy Splunk

Splunk

Wykrywanie sekwencji powiązanych zdarzeń przy użyciu platformy Splunk

10/09/2017
Podziel się

Analizując zdarzenia generowane przez systemy informatyczne, często stajemy przed koniecznością przeprowadzenia ich analizy statystycznej, albo analizy sekwencji zdarzeń w celu wykrycia anomalii. To wymaga od nas grupowania, czyli identyfikowania zbiorów powiązanych ze sobą zdarzeń, na podstawie zestawu pewnych kryteriów. Najczęściej grupujemy zdarzenia, które mają te same wartości atrybutów i/albo wystąpiły w określonym czasie.

Wszystkie narzędzia do analizy danych umożliwiają analizy statystyczne. W Splunku możemy realizować takie obliczenia przy użyciu komend statsstreamstats oraz eventstats. Inaczej jest z drugim typem grupowania, czyli analizą sekwencji powiązanych zdarzeń. Sprawa jest prosta, gdy dane do analizy znajdują się w tym samym pliku, a zdarzenia łączymy po jednym atrybucie (np. id sesji). Poziom trudności znacznie wzrasta w następujących sytuacjach:

  • zdarzenia, które mamy grupować, znajdują się w różnych plikach;
  • łączenie zdarzeń trzeba przeprowadzić przy użyciu wielu atrybutów;
  • sekwencja zdarzeń powinna zaczynać się i kończyć określonymi zdarzeniami;
  • zdarzenia powinny być grupowane tylko wtedy, gdy upłynął określony czas pomiędzy ich wystąpieniem.

W dalszej części artykułu pokażę, jak wykorzystać Splunk’a do przeprowadzenia zaawansowanego grupowania powiązanych zdarzeń.

Scenariusz

Wyobraźmy sobie hipotetyczny system obsługujący przelewy bankowe. System ma dwa komponenty, które logują do osobnych plików. Component A odpowiada za kontrolę dostępu do aplikacji. Zapisuje do pliku component_a.log informacje komu, kiedy i z jakiego IP przydzielił dostęp, kiedy nastąpiło wylogowanie oraz informacje o błędach. Component B realizuje operacje przelewów zlecanych przez zalogowanych użytkowników. Informacje o wykonanych operacjach loguje do pliku component_b.log. Można się z nich dowiedzieć m.in. kiedy przelew został zdefiniowany, zatwierdzony i wysłany oraz jakie ma atrybuty.

Naszym celem jest śledzenie transakcji użytkowników i wykrywanie różnego rodzaju anomalii. Potrzebujemy więc wygenerować raport, zawierający następujące informacje:

  1. czas zdefiniowania przelewu;
  2. nazwa użytkownika, który zdefiniował przelew;
  3. adres IP, z którego korzystał użytkownik definiujący przelew;
  4. parametry przelewu (tytuł przelewu, kwota, nr konta nadawcy i odbiorcy);
  5. czas zatwierdzenia przelewu;
  6. nazwa użytkownika, który zatwierdził przelew;
  7. adres IP użytkownika, który zatwierdził przelew.

Informacje z punktów 2, 3, 6, 7 znajdują się w pliku component_a.log. Pozostałe w pliku component_b.log. Uzyskanie wspomnianego raportu wymaga zbudowania transakcji, która powiąże ze sobą wszystkie potrzebne zdarzenia.

Przykładowe logi operacji zapisania, zatwierdzenia i wysłania przelewu mogą wyglądać następująco:

Przykładowe logi operacji zapisania, zatwierdzenia i wysłania przelewu
  1. Użytkownik operator1 loguje się o 12:41:30 (msg=logged_in) i zapisuje przelew o 12:42:01. (msg=transfer_saved). Zdarzenia zostały zalogowane przez dwa różne komponenty systemu do dwóch oddzielnych plików. Łączymy je przy użyciu pola session_id.
  2. Przelew został podpisany przez użytkownika operator2 o 12:45:35. W tym zdarzeniu pole session_id ma inną wartość. Nie możemy go więc użyć do wiązania z poprzednim zdarzeniem. Zamiast niego używamy pola transfer_id.
  3. Przelew został wysłany przez Component B o 12:58:02. Łączymy to zdarzenie z poprzednim przy pomocy pola transfer_id.
  4. W raporcie powinniśmy zawrzeć informacje (nazwa, srcip) o użytkowniku zatwierdzającym przelew. Dlatego łączymy zdarzenie zatwierdzenia przelewu (msg=transfer_aproved) z pliku component_b.log ze zdarzeniem logowania (msg=logged_in).

Zapytanie SPL realizujące wyżej opisane grupowanie oraz generujące raport w Splunku wygląda następująco:

index=demo (source=component_a.log OR source=component_b.log)
| eval 
    session_id_approval = if(msg="transfer_approved", session_id, null),
    session_id = if(msg="transfer_approved", null, session_id),
    time_transfer_saved = if(msg="transfer_saved", _time, null)     
| transaction  session_id, session_id_approval, transfer_id
| join session_id_approval [
    search index=demo source=component_a.log msg=logged_in
    | rename _time as time_approval, 
             user as user_approval, 
             srcip as srcip_approval,
             session_id as session_id_approval ]   
| table time_transfer_saved, user, srcip, transfer_id, *_acc
        amount, currency, time_approval, user_approval, srcip_approval
| eval
    time_transfer_saved = strftime(time_transfer_saved, "%F %T"),
    time_approval = strftime(time_approval, "%F %T")

Zdarzenia zostały załadowane do Splunka, do indeksu demo z plików component_a.log i component_b.log.

  • Pierwsza linia zapytania definiuje bazowy zbiór zdarzeń, na którym będziemy pracować.
  • Następnie, przy użyciu komendy eval przygotowujemy nasze zdarzenia do przetworzenia przez komendę transaction.
    • Zapisujemy wartość pola session_id w polu session_id_approval. W późniejszym etapie wykorzystamy je do połączenia ze zdarzeniem logowania użytkownika operator2.
    • Usuwamy pole session_id ze zdarzenia zatwierdzenia przelewu (msg=transfer_approved), żeby nie zakłócać procesu łączenia zdarzeń przy użyciu pola session_id. Jeśli pominiemy ten krok, funkcja grupująca transaction pogubi się, mając do czynienia z dwoma różnymi wartościami pola session_id.
  • Grupujemy zdarzenia po polach session_id, session_id_approval oraz transfer_id. Komenda transaction automatycznie wykryje grupy zdarzeń powiązanych na podstawie określonych przez nas kryteriów i zwróci znalezione grupy jako nowe zdarzenia.
  • Przy pomocy komendy join łączymy zdarzenie zatwierdzenia przelewu ze zdarzeniem logowania użytkownika zatwierdzającego przelew (operator2). Dzięki temu uzyskujemy jego nazwę oraz adres ip, z którego się łączył. Korzystamy w tym celu z pól session_id_approval ze zdarzenia zatwierdzenia przelewu oraz session_id ze zdarzenia logowania użytkownika operator2.
  • Na zakończenie formatujemy i generujemy raport, stosując komendę table.

Wynik zapytania zostanie wyświetlony w formie tabeli:

Wynik zapytania w formie tabeli

Omówiony przykład pokazuje, jak przy użyciu Splunka, w kilku prostych krokach, wykrywać grupy powiązanych ze sobą zdarzeń w sytuacji, gdy zdefiniowanie relacji między nimi wymaga użyciu wielu atrybutów. Głównym „bohaterem” opisanego rozwiązania jest komenda transaction. Możliwości jej zastosowania są bardzo szerokie i przekraczają zakres prezentowanego tematu.

Powiązane wpisy:

Jak zacząć ze Splunk REST API, czyli szybka i łatwa integracja Splunk Enterprise z zewnętrznymi aplikacjami

Logowanie zdarzeń w Docker przy użyciu platformy Splunk

Nowy sposób monitorowania i alertowania w IT oparty na Metrykach

Zobacz również

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *

    Skontaktuj się z nami