“Mahallenizin emekli albayı Apache Airflow“ Ortaya Karışık Yazılım Serisi — 7
Previously on Ortaya Karışık Yazılım Serisi (Rust’ın en taze meyvelerinden MeiliSearch ve AWS Chalice ile Multiprocessing Serverless) => LINK

Apache Airflow, iş akışlarını otomatikleştirerek ve manuel görevleri yöneterek birçok sorunun çözülmesine yardımcı olan açık kaynak kodlu bir araçtır.
KMS, DynamoDB, EC2, Bigtable, Pig, S3, sqoop, Kinesis vb bir çok platforma entegre olabilir ayrıntıları bulabilirsiniz. [1] İlkeleri kısmına baktığımız zaman;
Ölçeklenebilir(Scalable)
Airflow modüler bir mimariye sahiptir ve isteğe bağlı sayıda işçi düzenlemek için bir ileti kuyruğu kullanır. Hava akışı sonsuzluğa ölçeklenmeye hazırdır.
Dinamik(Dynamic)
Airflow dinamik pipeline oluşturulmasına olanak tanır. Bu işlem, dinamik olarak başlatan kod yazmaya olanak tanır.
Genişletilebilir(Extensible)
Kendi operatörlerinizi kolayca tanımlayabilirsiniz ve kütüphaneleri ortamınıza uygun şekilde genişletebiliriz.
Zarif(Elegant)
Airflow pipelinelar yalın ve açıktır. [1]
Pip install x — Kendi çapında Dark Souls
Bu yazıda yine docker-compose kullanarak airflow’u ayağa kaldıracağız duruma göre diğer araçlar ile bağlayabiliriz.
Docker dosyasını çalıştırmadan önce airflow’un çalışması için yüklememiz gereken kısımlar var burada ilk kısım tabi ki pip install ile airflow yüklemek.
Normal şartlar altında “pip install apache-airflow[all]” dediğimiz anda yükleniyor demek isterdim ama olmuyor. Öncelikle “[all]” kısmında sizin hepsini yüklemenize gerek yok örneğin projenizde postgresql kullanacaksanız;
pip install apache-airflow[‘postgres’]
demeniz yeterli. Bununla birlikte “yok abi ben hepsini yüklemek istiyorum” diyenler için başlıyoruz..

Pip install ile adımımızı attığımız anda bizi hoş bir ekran karşılıyor
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output. Mysql blablabla
Özetle diyorki kardeş bak ben sıkıntılı bir tipim öyle otomatik yükleme falan bekleme git efendi gibi mysql kısımlarını hallet öyle gel.
sudo apt-get install mysql-server
sudo apt-get install libmysqlclient-dev
sudo apt-get install libmysqlclient-dev python-dev
pip install mysql-connector-python
Diyerek ilk aşamayı geçiyoruz. Tabi bu hatayı alana kadar ilk önce 500mb falan kütüphane yükledi işlemi tekrar başlatıyoruz.

ve ikinci hata:
cwd: /tmp/pip-install-we39qd_n/pykerberos/
Complete output (14 lines):
running install
running build
running build_ext
building 'kerberos' extension
creating build
creating build/temp.linux-x86_64-3.8
creating build/temp.linux-x86_64-3.8/src
x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC -I/usr/include/python3.8 -c src/kerberos.c -o build/temp.linux-x86_64-3.8/src/kerberos.o
In file included from src/kerberos.c:19:
src/kerberosbasic.h:17:10: fatal error: gssapi/gssapi.h: No such file or directory
17 | #include <gssapi/gssapi.h>
| ^~~~~~~~~~~~~~~~~
compilation terminated.
error: command 'x86_64-linux-gnu-gcc' failed with exit status 1
Bu sefer aldığımız hatada ise “ben iflah olmaz bir kütüphaneyim benimle uğraşma hayatını karartırım sana sıkıntı yaratırım.” Tabi ki bizde ruh hastası, problemli bireyler olduğumuzdan dolayı bu tarz hatalarda sadece “:)” ifadesi ile ekrana bakıp yolumuza devam ediyoruz [3,4]
sudo apt-get install libkrb5-dev
sudo apt-get install unixodbc-dev
pip install pywinrm[kerberos]
sudo apt-get install gcc
sudo apt-get install libsasl2-dev
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
Bir yığın tool yükledikten sonra niyet edip [all] diyoruz.
pip install apache-airflow[all]
docker-compose up
Evet artık sistem sağlık bir şekilde ayakta. Artık airflow yükleme kısmını düşünmüyoruz ve yolumuza devam ediyoruz.
Veritabanı yedeklemeleri ve kod/config dağıtımının zamanlaması da dahil olmak üzere ETL, makine öğrenimi pipelineları, job scheduling yazmak için Airflow’u kullanabilirsiniz.
Airflow pipeline, esasen bir Airflow Directed Acyclic Graph (DAG) nesnesini tanımlayan Python’da yazılmış bir parametre kümesidir.
Örneğin, A, B ve C adlı üç görevimiz olsun. A’nın görevinin bittiği anda B ve C’nin görevleri başlayacak diye şart koyabiliyoruz.

default_args
Sanırım en önemli kısımlardan birisi args kısmı. Burası basitce DAG kısmını belirlediğimiz kısım ne zaman çalışmaya başlayacak, hata alınca nereye mail atılacak, ne zaman bitecek, ne aralıklarla çalışacak, kaç kere deneyecek vb bir çok parametreye sahip.
İşte diğer bazı önemli parametreler.
end_date kodun son yürütme tarihini belirler. Eğer bu bitiş tarihini koymazsanız, Airflow sonsuza kadar çalışmaya devam edecektir.
depends_on_past bir Boolean değeridir. Örneğin, bağımsız değişkeni true olarak ayarladığınızı varsayalım, bu durumda günlük iş akışı. Dünkü görev çalışması başarısız olursa, önceki tarihin durumuna bağlı olduğu için iki günlük bir görev tetiklenmez.
email e-posta bildirimi alıyorsunuz.
email_on_failure, bir hata oluşursa bildirimi almak isteyip istemediğinizi tanımlamak için kullanılır.
email_on_retry, her yeniden deneme gerçekleştiğinde bir e-posta almak isteyip istemediğinizi tanımlamak için kullanılır.
retries, Airflow'un başarısız bir görevi yeniden denemeye çalışacağı sayısını belirler
retry_delay ardışık yeniden denemeler arasındaki süredir.
DAG Schedule Yapılandırılması
Programınızı belirli aralıklarda çalıştırmak istiyorsanız, aşağıdaki kod parametrelerini kullanabi:
örnek: schedule_interval='@daily'
@once → DAG'yi yalnızca bir kez çalıştır
@hourly → DAG'yi saatlik çalıştırır
@daily → DAG'yi günlük çalıştırır
@weekly → DAG'yi haftalık çalıştırır
@monthly → DAG'yi aylık çalıştırır
@yearly → DAG'yi yıllık çalıştırır
veya 5 dakika aralıklarlar çalışması için [5]
schedule_interval=timedelta(minutes=5)
Task Katmaları
3 çeşit task kullanım tipi var PythonOperator, DummyOperator ve BashOperator.
DummyOperator, pipelinenın yapıldığını belirtmek dışında gerçekten hiçbir şey yapmayan boş bir operatördür.
PythonOperator, bir Python işlevini çağırmanızı sağlar.
BashOperator bash komutlarını çağırmanızı sağlar.

Bağımlılıkları Tanımlanması
Aslında iki tip tanım var bunlar “set_downstream” ve “set_upstream”.
- a>>b = a, b taskinden önce gelir “yani b’nin çalışması a’ya bağlı”
- a<<b = b,a taskinden önce gelir “yani a’nın çalışması b’ye bağlı”
- a.set_downstream(b) = a, b taskinden önce gelir “yani b’nin çalışması a’ya bağlı”
- a.set_upstream(b) = b,a taskinden önce gelir “yani a’nın çalışması b’ye bağlı”
Eğer bir çok task bir taskin çalışmasına bağlı ise ;
a.set_downstream([b, c])
Airflow’u herhangi bir database’e bağlamak istediğimiz zaman
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: connection
[SQL: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
FROM connection
WHERE connection.conn_id = ?]
[parameters: (‘postgres’,)]
Hata alabiliriz bu durumda docker içerisine gidip initdb oluşturmamız gerekiyor.
docker exec -it “container-id” /bin/bashairflow initdb

DAG eklemeden önce Airflow arayüzüne girip DB kısmımızı bağlıyoruz. Ardından DAG kısmını on yapıyoruz.

Logları kontrol ettiğimiz zaman çalıştığını görebiliriz.

Yine klasik kapanışımız ile kapatalım. Ortaya Karışık Yazılım Serisi’ne kendinizi yakın hissediyorsanız paylaşarak destek olabilirsiniz yaptığım hata ve öneriler için Twitter üzerinden ulaşabilirsiniz.

“Sarı fil ve Arkadaşları Dövüş Kulübü Kuruyor.. HDFS” Ortaya Karışık Yazılım Serisi — 8