“Sarı Fil ve Arkadaşları Dövüş Kulübü Kuruyor.. Apache Spark’” Ortaya Karışık Yazılım Serisi — 10

Kemalcan Bora
4 min readJan 27, 2021

Previously on Ortaya Karışık Yazılım Serisi (“Sarı Fil ve Arkadaşları Dövüş Kulübü Kuruyor.. MapReduce’” Ortaya Karışık Yazılım Serisi — 9) => LINK

https://thinkr.fr/wp-content/uploads/2019/07/spark-meme.jpg

Spark en basit tanımla, büyük veri yığınları için oluşturulmuş bir data processing motoru olmasına karşın içerisinde ML, graph analizi, streaming data gibi bir çok güçlü yanı bulunan bir framework diyebiliriz.

https://www.edureka.co/blog/wp-content/uploads/2018/09/Picture6-2.png

Yukarıdaki mimariye baktığımız zaman Spark Context görevi yazdığınız scriptin çalıştığı zaman ne olacağını bilen kısım. Daha sonra spark YARN,Mesos veya Hadoop üzerinde(cluster manager) çalışır.

Not: Hadoop üzerinde çalışmak zorunda değildir.

Cluster manager’da bu görevi workerlara dağıtır. Cache burada performans açısından önemlidir çünkü hdfs’e nazaran spark memory temelli bir çözümdür ne kadar ram o kadar köfte..

  • Memory’de Hadoop MapReduce’e göre 100x veya diske göre 10x hızlıdır.w
  • DAG Engine
  • Resilient distributed datasets (RDD) temel konseptimiz.
  • Java, Scala, Python ile yazılabilir. Şahsen Python’da Spark ile hiç iyi tecrübelerim olmadı niyeyse pyspark beni üzüyor.

Spark Core üzerinde Spark Streaming, Spark Sql, MLLib, GraphX gibi modüller barındırıyor.

Spark Streaming: Batch processing için kullanılıyor. Real timeda use case’i var logları sunmak gibi vs.

Spark Sql: Basitce Spark için SQL arayüzü direk sql yazabiliyoruz.

MLLib: Adından belli olacağı üzere ML tooları

GraphX: Adı graph teoriden geliyor. Twitterda ben kiminle bağlantılı olduğum kişi kiminle bağlantılı gibi bir sosyal ağ örneği aklınıza gelebilir.

RDD (Resilient Distributed Datasets)

RDD, Esnek dağıtık veriseti olarak çevirilebilir. RDD objeleri key-value bilgilerini veya herhangi bir bilgiyi otomatik olarak saklar yani programlama açısından bakıldığında bir rdd sizin için sadece bir veri kümesidir

SparkContext

  • Driver programları oluşturur.
  • RDD lerin esnek ve dağıtık çalışmalarından sorumludur.
  • RDD ler oluşturur
  • Spark Shell bizim için “sc” objesi olarak bunu otomatik oluşturur.

Transformations

Yürütülen her bir transformation işlemi gerçekten bir action ile karşılaşana kadar gerçekleştirilmez. Buna "Lazy Evaluation" denir.Lazy Evaluation yumurta göte dayanıncaya kadar hiç bir şey yapmamak olarak açıklayabiliriz.

  • map
  • flatmap
  • filter
  • distinct
  • sample
  • union, subtract, intersection, cartesian gibi fonksiyonlara sahip.

Map, Apache Spark’taki bir dönüşüm işlemidir. RDD’nin her öğesi için geçerlidir ve sonucu yeni RDD olarak döndürür. map içinde, işlem geliştiricisi kendi özel iş mantığını tanımlayabilir. Aynı mantık, RDD’nin tüm unsurlarına uygulanacaktır.[1]

Filter () veya where () işlevi, DataFrame veya Dataset’ten verilen bir veya birden çok koşula veya SQL ifadesine göre satırları filtrelemek için kullanılır.

örneğimizde data = sc.textFile("deneme.txt") gibi basit bir giriş yaptığımız zaman bir string RDD'si yaratmış olduk.

Oluşturulan bir RDD yaratıldıktan sonra bkz:transformations veya actions kullanılabilir. Transformation işlemleri ile bir önceki RDD’den yeni bir RDD yaratılmasını sağlar.

filter_data = data.filter(lambda line: "Python" in line)

Burada bir önceki RDD’yi(‘data’) kullanarak yeni bir RDD(filter_data) oluşturuyoruz.

Başka örnek:

input = sc.textFile("log.txt")
error = input.filter(lambda x: "error" in x)
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
https://www.koseburak.net/images/apache_spark/2/1.png

İki çok genel transformation işlemi vardır, map ve filter. Bunların işlevleri aşağıdaki görseldeki gibi açıklanabilir.[2]

https://www.koseburak.net/images/apache_spark/2/2.png

Cache veya Persist

Eğer elinizdeki bir RDD’yi birçok kez kullanacaksak ve performansdan kazanmak istiyorsak, Spark’a bunu RDD.persist() şeklinde bildirerek, memory'i kullanarak daha fazla performans elde edebilirsiniz(cache(), persist() ile aynı işlevi görür).

Not:persist() fonksiyonunun RDD henüz action ile karşılaşmadan önce çağrıldığına dikkat edin.

from operator import add
from pyspark import SparkContext

if __name__ == '__main__':
sc = SparkContext(appName='myname')
data = sc.parallelize(range(10)).map(lambda x: x ** 2)
data.persist(StorageLevel.DISK_ONLY)
print(data.count())
print(','.join(str(s) for s in data.collect()))

Eğer belleğe sığamayacak kadar veriyi önbelleklemeye çalışırsanız Spark otomatik olarak kendini Least Recently Used (LRU) politikasına göre düzenleyecektir.

Aslına bakıldığı zaman Cache ve Persist, yinelemeli ve etkileşimli RDD işlerinin performansını artırmak için en iyileştirme teknikleridir.
Milyarlarca veya trilyonlarca veri ile uğraşırken performans düşüşünü göreceksiniz. Bu nedenle, hesaplamalara bakmalı ve performansı artırmanın yollarından biri olarak optimizasyon tekniklerini kullanmalıyız.

Spark cache () ve persist () yöntemlerini hesaplamak, depolamak için bir optimizasyon sunar, böylece sonraki eylemlerde yeniden kullanılabilir.

Bir RDD’yi persist veya cache yaptığmız zaman, her alt node bölümlenmiş verilerini bellekte veya diskte depolar ve bunları bu RDD üzerindeki diğer eylemlerde yeniden kullanır. Ve Spark’ın nodelardaki kalıcı verileri hataya toleranslıdır, yani herhangi bir bölüm kaybolursa, onu oluşturan orijinal dönüşümler kullanılarak otomatik olarak yeniden hesaplanır.

  • Uygun maliyetli
  • Zaman verimli liği— Tekrarlanan hesaplamaları yeniden kullanmak çok zaman kazandırır.
  • Yürütme süresi — İşin yürütme süresini kazandırır ve aynı kümede daha fazla iş gerçekleştirebiliriz.

Dataset sınıfındaki Spark cache () yöntemi, DataFrame veya Dataset sonuç kümesini önbelleğe almak için sparkSession.sharedState.cacheManager.cacheQuery kullanan persist () yöntemini dahili olarak çağırır.

Spark persist () yöntemi, DataFrame veya Dataset’i MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2 ve daha fazlasını depolama düzeylerinden birine depolamak için kullanılır.

Hiç birinizin okumadığı bu seride klasik kapanışımız ile kapatalım. Ortaya Karışık Yazılım Serisi’ne kendinizi yakın hissediyorsanız paylaşarak destek olabilirsiniz yaptığım hata ve öneriler için Twitter üzerinden ulaşabilirsiniz.

Referanslar

--

--