PySpark Pandas_Udf()

Pyspark Pandas Udf



Det er mulig å transformere PySpark DataFrame ved å bruke pandas_udf()-funksjonen. Det er en brukerdefinert funksjon som brukes på PySpark DataFrame med pil. Vi kan utføre de vektoriserte operasjonene ved å bruke pandas_udf(). Det kan implementeres ved å bestå denne funksjonen som dekoratør. La oss dykke ned i denne guiden for å kjenne syntaksen, parameterne og forskjellige eksempler.

Emne for innhold:

Hvis du vil vite om PySpark DataFrame og modulinstallasjon, gå gjennom dette artikkel .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () er tilgjengelig i sql.functions-modulen i PySpark som kan importeres ved hjelp av nøkkelordet 'fra'. Den brukes til å utføre vektoriserte operasjoner på vår PySpark DataFrame. Denne funksjonen implementeres som en dekoratør ved å sende tre parametere. Etter det kan vi lage en brukerdefinert funksjon som returnerer dataene i vektorformatet (som vi bruker serier/NumPy for dette) ved hjelp av en pil. Innenfor denne funksjonen kan vi returnere resultatet.



Struktur og syntaks:



La oss først se på strukturen og syntaksen til denne funksjonen:

@pandas_udf(datatype)
def funksjonsnavn(operasjon) -> konverter_format:
returoppgave

Her er funksjonsnavn navnet på vår definerte funksjon. Datatypen spesifiserer datatypen som returneres av denne funksjonen. Vi kan returnere resultatet ved å bruke nøkkelordet 'retur'. Alle operasjonene utføres inne i funksjonen med piltildelingen.





Pandas_udf (Function and ReturnType)

  1. Den første parameteren er den brukerdefinerte funksjonen som sendes til den.
  2. Den andre parameteren brukes til å spesifisere returdatatypen fra funksjonen.

Data:

I hele denne guiden bruker vi kun én PySpark DataFrame for demonstrasjon. Alle brukerdefinerte funksjoner som vi definerer brukes på denne PySpark DataFrame. Sørg for at du oppretter denne DataFrame i miljøet ditt først etter installasjonen av PySpark.



importere pyspark

fra pyspark.sql importerer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux hint' ).getOrCreate()

fra pyspark.sql.functions importer pandas_udf

fra pyspark.sql.types import *

importere pandaer som pandaer

# grønnsaksdetaljer

grønnsak =[{ 'type' : 'grønnsak' , 'Navn' : 'tomat' , 'locate_country' : 'USA' , 'mengde' : 800 },

{ 'type' : 'frukt' , 'Navn' : 'banan' , 'locate_country' : 'KINA' , 'mengde' : tjue },

{ 'type' : 'grønnsak' , 'Navn' : 'tomat' , 'locate_country' : 'USA' , 'mengde' : 800 },

{ 'type' : 'grønnsak' , 'Navn' : 'Mango' , 'locate_country' : 'JAPAN' , 'mengde' : 0 },

{ 'type' : 'frukt' , 'Navn' : 'sitron' , 'locate_country' : 'INDIA' , 'mengde' : 1700 },

{ 'type' : 'grønnsak' , 'Navn' : 'tomat' , 'locate_country' : 'USA' , 'mengde' : 1200 },

{ 'type' : 'grønnsak' , 'Navn' : 'Mango' , 'locate_country' : 'JAPAN' , 'mengde' : 0 },

{ 'type' : 'frukt' , 'Navn' : 'sitron' , 'locate_country' : 'INDIA' , 'mengde' : 0 }

]

# opprett markedsdatarammen fra dataene ovenfor

market_df = linuxhint_spark_app.createDataFrame(grønnsak)

market_df.show()

Produksjon:

Her lager vi denne DataFrame med 4 kolonner og 8 rader. Nå bruker vi pandas_udf() for å lage de brukerdefinerte funksjonene og bruke dem på disse kolonnene.

Pandas_udf() med forskjellige datatyper

I dette scenariet lager vi noen brukerdefinerte funksjoner med pandas_udf() og bruker dem på kolonner og viser resultatene ved hjelp av select()-metoden. I hvert tilfelle bruker vi pandas.Series når vi utfører de vektoriserte operasjonene. Dette betrakter kolonneverdiene som en endimensjonal matrise, og operasjonen brukes på kolonnen. I selve dekoratøren angir vi funksjonens returtype.

Eksempel 1: Pandas_udf() med strengtype

Her lager vi to brukerdefinerte funksjoner med strengreturtypen for å konvertere strengtypekolonneverdiene til store og små bokstaver. Til slutt bruker vi disse funksjonene på 'type' og 'locate_country' kolonner.

# Konverter type kolonne til store bokstaver med pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

returner i.str.upper()

# Konverter locate_country-kolonnen til små bokstaver med pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

returner i.str.lower()

# Vis kolonnene med select()

market_df.select( 'type' ,type_store_case( 'type' ), 'lokaliser_land' ,
land_små bokstaver( 'lokaliser_land' )).forestilling()

Produksjon:

Forklaring:

StringType()-funksjonen er tilgjengelig i pyspark.sql.types-modulen. Vi har allerede importert denne modulen mens vi opprettet PySpark DataFrame.

  1. Først returnerer UDF (brukerdefinert funksjon) strengene med store bokstaver ved å bruke str.upper()-funksjonen. Str.upper() er tilgjengelig i seriedatastrukturen (da vi konverterer til serier med en pil inne i funksjonen) som konverterer den gitte strengen til store bokstaver. Til slutt brukes denne funksjonen på 'type'-kolonnen som er spesifisert i select()-metoden. Tidligere var alle strengene i typekolonnen med små bokstaver. Nå er de endret til store bokstaver.
  2. For det andre returnerer UDF strengene med store bokstaver ved å bruke str.lower()-funksjonen. Str.lower() er tilgjengelig i seriedatastrukturen som konverterer den gitte strengen til små bokstaver. Til slutt brukes denne funksjonen på 'type'-kolonnen som er spesifisert i select()-metoden. Tidligere var alle strengene i typekolonnen med store bokstaver. Nå er de endret til små bokstaver.

Eksempel 2: Pandas_udf() med heltallstype

La oss lage en UDF som konverterer PySpark DataFrame-heltallskolonnen til Pandas-serien og legger til 100 til hver verdi. Send 'quantity'-kolonnen til denne funksjonen i select()-metoden.

# Legg til 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

returner i+ 100

# Send mengdekolonnen til funksjonen og displayet ovenfor.

market_df.select( 'mengde' ,legg til_100( 'mengde' )).forestilling()

Produksjon:

Forklaring:

Inne i UDF, itererer vi alle verdiene og konverterer dem til serier. Etter det legger vi til 100 til hver verdi i serien. Til slutt sender vi «mengde»-kolonnen til denne funksjonen, og vi kan se at 100 legges til alle verdiene.

Pandas_udf() med forskjellige datatyper som bruker Groupby() og Agg()

La oss se på eksemplene for å sende UDF til de aggregerte kolonnene. Her blir kolonneverdiene gruppert først ved å bruke groupby()-funksjonen og aggregering gjøres ved å bruke agg()-funksjonen. Vi sender vår UDF i denne aggregatfunksjonen.

Syntaks:

pyspark_dataframe_object.groupby( 'grupperingskolonne' ).agg(UDF
(pyspark_dataframe_object[ 'kolonne' ]))

Her er verdiene i grupperingskolonnen gruppert først. Deretter blir aggregeringen gjort på hver grupperte data med hensyn til vår UDF.

Eksempel 1: Pandas_udf() med Aggregate Mean()

Her lager vi en brukerdefinert funksjon med en returtype flytende. Inne i funksjonen beregner vi gjennomsnittet ved å bruke mean() funksjonen. Denne UDF-en sendes til kolonnen 'antall' for å få gjennomsnittlig mengde for hver type.

# returner gjennomsnittet/gjennomsnittet

@pandas_udf( 'flyte' )

def average_function(i: panda.Series) -> flyte:

returner i.mean()

# Send mengdekolonnen til funksjonen ved å gruppere typekolonnen.

market_df.groupby( 'type' ).agg(gjennomsnittlig_funksjon(marked_df[ 'mengde' ])).forestilling()

Produksjon:

Vi grupperer basert på elementer i 'type'-kolonnen. To grupper dannes - 'frukt' og 'grønnsak'. For hver gruppe beregnes gjennomsnittet og returneres.

Eksempel 2: Pandas_udf() med Aggregate Max() og Min()

Her lager vi to brukerdefinerte funksjoner med heltall (int) returtype. Den første UDF-en returnerer minimumsverdien og den andre UDF-en returnerer maksimumsverdien.

# pandas_udf som returnerer minimumsverdien

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

returner i.min()

# pandas_udf som returnerer maksimalverdien

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

returner i.max()

# Send mengdekolonnen til min_ pandas_udf ved å gruppere locate_country.

market_df.groupby( 'lokaliser_land' ).agg(min_(marked_df[ 'mengde' ])).forestilling()

# Send mengdekolonnen til max_ pandas_udf ved å gruppere locate_country.

market_df.groupby( 'lokaliser_land' ).agg(maks_(marked_df[ 'mengde' ])).forestilling()

Produksjon:

For å returnere minimums- og maksimumsverdier, bruker vi funksjonene min() og max() i returtypen til UDF-er. Nå grupperer vi dataene i kolonnen 'locate_country'. Fire grupper dannes ('KINA', 'INDIA', 'JAPAN', 'USA'). For hver gruppe returnerer vi maksimalt antall. Tilsvarende returnerer vi minimumsmengden.

Konklusjon

I utgangspunktet brukes pandas_udf () til å utføre de vektoriserte operasjonene på vår PySpark DataFrame. Vi har sett hvordan du lager pandas_udf() og bruker den på PySpark DataFrame. For bedre forståelse diskuterte vi de forskjellige eksemplene ved å vurdere alle datatypene (streng, float og heltall). Det kan være mulig å bruke pandas_udf() med groupby() gjennom agg()-funksjonen.