top of page

Wykorzystanie Change Streams jako alternatywy do Kafki lub Kinesisa dla MongoDB



W podejściu do przetwarzania danych strumieniowo najczęściej wybieranym rozwiązaniem jest instancja Kafki służąca za bufor między producentem danych a odbiorcą danych strumieniowych. W przypadku bazy MongoDB można pominąć osobną instancję Kafki, czy też Amazon Kinesisa, a zamiast tego wykorzystać wewnętrzny mechanizm tej bazy – Change Streams.


Change Streams, bazując na logu operacyjnym, obserwują wszelkie dokonane zmiany danych. W swoich metodach pozwalają na obserwowanie kolejnych zachodzących zmian w całej bazie danych lub poszczególnych kolekcjach oraz także obserwowanie określnych operacji, tzn. insert, update, itp. Co więcej w przypadku przerwania pobierania danych istnieje możliwość wznowienia procesowania danych od punktu przerwania wykorzystując do tego dedykowane tokeny (tzw. resume tokens).


Kiedy naszym zadaniem będzie pobranie danych z określonych kolekcji MongoDB można do tego zastosować bibliotekę pymongo. Na wstępie należy dokonać połączenia z bazą danych oraz utworzyć potrzebne obiekty:



Przy pobieraniu kolejnych zmian z bazy mamy możliwość wskazania za pomocą pipeline-ów określonych kolekcji czy też określnych typów transakcji. Dodatkowo mamy możliwość jedynie pobierania zmienionych wartości w danym dokumencie lub też całych dokumentów, w których doszło do zmian. Poniższy kod obserwuje zmiany z bazy danych z dwóch określonych kolekcji, gdzie zawsze pobierane są całe dokumenty.



W powyższym kodzie wskazany jest także token do wznawiania obserwowania bazy od określonego momentu. Token jest przekazywany wraz z innymi wartościami określonej zmiany jak np. czas zmiany (clusterTime), identyfikator dokumentu (documentKey), typ operacji (operationType), czy też nazwy bazy danych i kolekcji (ns). Sam token należy odkładać do osobnego miejsca przechowywania danych, który można wykorzystać przy ponownym uruchomieniu procesu od ostatniej zaobserwowanej zmiany. W przypadku przekazania pustego tokenu, obserwacja bazy danych zaczyna się od obecnego momentu.


Prosta konfiguracja Change Streams pozwala na szybkie utworzenie procesu strumieniowania danych, z pominięciem osobnej instancji Kafki czy też Kinesisa. Kolejną zaletą jest tworzenie generycznych i łatwo konfigurowalnych date pipeline-ów. Dla realnego przykładu Change Streams pozwoliły bez przeszkód utworzyć jeden pipeline służący ładowaniu danych do Date Lake z MongoDB, który można bez trudu rozszerzać o dodatkowe kolekcje bazodanowe jedynie poprzez zmianę konfiguracji. Dodatkowo jest to proces łatwo wznawialny od ostatniej przeprocesowanej zmiany, ponieważ opiera się na tokenach zapisywanych w osobnej bazie danych.



Michał Dąbrowski

Junior Machine Learning Developer

michal.dabrowski@bitpeak.pl



Źródła:

https://www.mongodb.com/blog/post/an-introduction-to-change-streams

https://www.mongodb.com/basics/change-streams

https://docs.mongodb.com/manual/reference/change-events/#delete-event



69 wyświetleń

Ostatnie posty

Zobacz wszystkie
bottom of page