top of page

Paralleles Arbeiten in R

Frühe Designentscheidungen der Architektur von R haben dazu geführt, dass R single-threaded ist und parallele Ausführung nicht direkt unterstützt. Das ein Bedarf für ein entsprechendes Features existiert, zeigt die vergleichsweise frühe Einführung des parallel paketes als base-package und damit als Teil von R ab Version 2.14. Das Paket vereint die Funktionen von multicore und snow.


Parallele Ausführung lässt sich in 2 Grundformen unterteilen:

  1. Gemeinsam genutzter Speicher (shared memory): Hier greifen mehrere Ausführungsstränge auf den selben Speicher zu. Theoretische Geschwindigkeitsvorteile stehen erhöhter Komplexität im Bezug auf Synchronisierung und Speicherschutz gegenüber. In R wird dieses System besonders durch Nutzung der Grafikkarte (bspw. in gpuR) implementiert.

  2. Verteilter Speicher (distributed memory): Wenn von vorneherein von einem verteilten Speicher ausgegangen wird, hat jede prozessierende Einheit ihren eigenen Speicher. Latenzen in der Kommunikation, zum Beispiel zwischen Prozessen, führen im allgemeinen nicht zu Abbrüchen. Zudem sind Implementierung und Skalierbarkeit höher.


Wir werden hier fürs Erste von einem einzelnen CPU ausgehen und dafür das parallel Paket verwenden, also nur mit verteiltem Speicher arbeiten. Auch hier gibt es wieder zwei Optionen:

  1. Sockets: Neue R-Prozesse werden gestartet. Alle Pakete und Daten, die für die Ausführung des parallel auszuführenden Codes gebraucht werden, müssen geladen werden. Das System eignet sich besonders für heterogene worker. Wer ein paar ältere Server hat kann sie so unkompliziert für Berechnungen heranziehen.

  2. Forking Der R-Prozess wird kopiert und alle Variablen und Pakete sind so direkt verfügbar. Für die lokale Ausführung ist forking, insbesondere dank des sehr einfachen Interfaces, meist die bessere Alternative. Verfügbar ist es jedoch nur für Unix-basierte Betriebssysteme.


Limitierungen


Grundlegend stellt sich die Frage: Wieviel des Codes ist parallelisierbar? Wieviel Kommunikation zwischen Prozessen oder auch ins Netzwerk ist erforderlich? Oftmals kann die Optimierung an Schlüssenstellen des sequentiellen Codes schon zu (stärkeren) Erhöhung der Performance führen, als parallele Ausführung hinzuzuziehen.

Eine Einschätzung hierfür liefert das Amdahlsches Gesetz:

Formel des Ahmdalesches Gesetz
Ahmdalesches Gesetz



wobei der Multiplikator des Performancegewinns ist, der parallelisierbare Anteil des Codes und die Anzahl der Kerne. Deutlich ist hier, dass nur bei mehr oder weniger komplett parallelisierbaren Aufgaben der Aufwand lohnt.



library(tidyverse)
library(latex2exp)

 amdahl<-function(S,N){
  1/((1-S)+(S/N))
 }
 
 expand.grid(parallelizable=(1:100)/100,
  processors=2^(1:5))%>%
  mutate(x_speedup=amdahl(parallelizable,processors))%>%
  ggplot(aes(parallelizable,col=as.factor(processors),y=x_speedup),data=.)+
  geom_line()+
  scale_x_continuous(labels = scales::percent)+
  theme_minimal()+
  xlab("parallelisierbar")+
  ylab("Geschwindigkeitsgewinn")+
  scale_color_discrete(name="Kerne")

Auswertung des Ahmdalschen Gesetztes
Auswertung des Ahmdalschen Gesetztes


Anwendung: mclapply

Eine klassische parallelisierbare Aufgabe ist cross-validation. In diesem Beispiel, leave-one-out cross-validation, die naturgemäß rechenaufwändig ist. Für sparen uns hier den test-prozedur und berechnen nur die Entscheidungsbäume auf Basis des Iris-Datensatz mit je n-1 Beobachtungen, also Bäume.


Achtung: Dieser Code ist nur unter Mac und Linux ausführbar. Wenn ein Windows-System vorliegt gibt es auf RBloggers eine gute Anleitung.

library(parallel)
library(rpart)
 
 f1 <- function(i) {
  rpart::rpart(Species~.,data=iris[-i,])
 }
 
 system.time(save1 <- lapply(1:150, f1))

Vergleich der Laufzeiten mit und ohne Parallelisierung
Vergleich der Laufzeiten mit und ohne Parallelisierung

system.time unterteilt nach CPU-Zeit, die vom Hauptprozess bzw. von child Prozessen verbraucht wurde. Interessant ist hier vor allem die tatsächliche abgelaufene zeit (elapsed time). Bei einem einzeln ablaufenden Prozess Ergeben die Summe aus der CPU-Zeit der tatsächlichen Arbeiten des Prozesses user time und Infrastrukturaufgaben (I/O, prozessstart etc.) system time die elapsed time.

Standardmäßig werden 2 Kerne für die Ausführung herangezogen der Standard lässt sich mit options(Ncpu=4) setzen und beeinflusst auch die Anzahl der Kerne die zum kompilieren von Paketen benutzt werden. Besonders bei der Installation von sehr vielen Paketen kann das einen spürbaren Unterschied machen.


Anwendung: parLapply


Wir erstellen einen cluster mit Anzahl der an nodes, die der Anzahl der Kerne entspricht, welche vom Betriebssystem gefunden werden. Von ihm hängt auch ab, was als Kern gewertet wird (logisch und physische Kerne).

ncores <- detectCores()
 cl <- makeCluster(spec = ncores,type = "PSOCK")

Die entsprechende expression wird an alle nodes geschickt.

clusterEvalQ(cl,expr = {2 + 2})
Ergebnis der Cluster-Evaluierung
Ergebnis der Cluster-Evaluierung






















Da Socket nodes vollständig neue R-prozesse repräsentieren, können auch alle Objekte in der Globalen Umgebung des Hauptprozesses nicht gefunden werden.

x <- 1
 clusterEvalQ(cl, {
  print(x)
 })

Nur bei vorherigem exportieren in die nodes kann das klappen:

x <- 1
 clusterExport(cl, list("x"))
 clusterEvalQ(cl, x+x)

Schlussendlich sollte man den cluster wieder schließen um eine Ansammlung verwaister Prozesse zu vermeiden:

stopCluster(cl)

Das Ziehen von (pseudo-)zufälligen Stichproben lässt sich auch gut parallelisieren. Hier sollte man in der Anwendung allerdings darauf achten, dass sich Sequenzen von Zufallszahlen überlappen können. An dieser Stelle klammern wir das Problem aus.


Ein klassisches Beispiel ist das sampling von der A-posteriori-Wahrscheinlichkeit eines einfaches Bayessches Modell über Monte Carlo Sampling. Auch hier handelt es sich um eine reine Demonstration, in der Praxis würde man hier die Lösung algebraisch berechnen können.

ncores <- detectCores()
 cl <- makeCluster(spec = ncores,type = "PSOCK")
 
 f2<-function(i){
  n=1e5
  rnorm(n=n,mean = runif(n,min = 5,max = 7),sd=(rexp(n,rate = 5)))
 }
 
 clusterExport(cl, list("f2"))
 system.time(save3 <- parLapply(cl, 1:10, f2))
Timing des Clusster-Exports
Timing des Clusster-Exports



Fazit


Besonders auf Unix-basierten Systemen ist Parallelisierung eine gute Möglichkeit, die Abarbeitung von rechenintensiven Aufgaben zu Beschleunigen. Wer sich an einen funktionalen Stil gewöhnt hat und kaum Schleifen verwendet, kann seine lapply Aufrufe einfach durch mclapply austauschen ohne direkt Abbrüche erwarten zu müssen. Bei einer hoher Anzahl von Abfragen aus dem Netz mit hoher Latenz kann es sogar hilfreich sein, eine höhere Anzahl von nodes zu starten, als Kerne zu Verfügung stehen. Wer 16 Kerne hat wird jedoch so gut wie nie eine 16-fache Beschleunigung seines Codes erfahren.

57 Ansichten

Aktuelle Beiträge

Alle ansehen

Kommentare


Die Kommentarfunktion wurde abgeschaltet.
bottom of page