Emne for innhold:
- Pandas_udf() med forskjellige datatyper
- Pandas_udf() med forskjellige datatyper som bruker Groupby() og Agg()
Hvis du vil vite om PySpark DataFrame og modulinstallasjon, gå gjennom dette artikkel .
Pyspark.sql.functions.pandas_udf()
Pandas_udf () er tilgjengelig i sql.functions-modulen i PySpark som kan importeres ved hjelp av nøkkelordet 'fra'. Den brukes til å utføre vektoriserte operasjoner på vår PySpark DataFrame. Denne funksjonen implementeres som en dekoratør ved å sende tre parametere. Etter det kan vi lage en brukerdefinert funksjon som returnerer dataene i vektorformatet (som vi bruker serier/NumPy for dette) ved hjelp av en pil. Innenfor denne funksjonen kan vi returnere resultatet.
Struktur og syntaks:
La oss først se på strukturen og syntaksen til denne funksjonen:
@pandas_udf(datatype)def funksjonsnavn(operasjon) -> konverter_format:
returoppgave
Her er funksjonsnavn navnet på vår definerte funksjon. Datatypen spesifiserer datatypen som returneres av denne funksjonen. Vi kan returnere resultatet ved å bruke nøkkelordet 'retur'. Alle operasjonene utføres inne i funksjonen med piltildelingen.
Pandas_udf (Function and ReturnType)
- Den første parameteren er den brukerdefinerte funksjonen som sendes til den.
- Den andre parameteren brukes til å spesifisere returdatatypen fra funksjonen.
Data:
I hele denne guiden bruker vi kun én PySpark DataFrame for demonstrasjon. Alle brukerdefinerte funksjoner som vi definerer brukes på denne PySpark DataFrame. Sørg for at du oppretter denne DataFrame i miljøet ditt først etter installasjonen av PySpark.
importere pyspark
fra pyspark.sql importerer SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux hint' ).getOrCreate()
fra pyspark.sql.functions importer pandas_udf
fra pyspark.sql.types import *
importere pandaer som pandaer
# grønnsaksdetaljer
grønnsak =[{ 'type' : 'grønnsak' , 'Navn' : 'tomat' , 'locate_country' : 'USA' , 'mengde' : 800 },
{ 'type' : 'frukt' , 'Navn' : 'banan' , 'locate_country' : 'KINA' , 'mengde' : tjue },
{ 'type' : 'grønnsak' , 'Navn' : 'tomat' , 'locate_country' : 'USA' , 'mengde' : 800 },
{ 'type' : 'grønnsak' , 'Navn' : 'Mango' , 'locate_country' : 'JAPAN' , 'mengde' : 0 },
{ 'type' : 'frukt' , 'Navn' : 'sitron' , 'locate_country' : 'INDIA' , 'mengde' : 1700 },
{ 'type' : 'grønnsak' , 'Navn' : 'tomat' , 'locate_country' : 'USA' , 'mengde' : 1200 },
{ 'type' : 'grønnsak' , 'Navn' : 'Mango' , 'locate_country' : 'JAPAN' , 'mengde' : 0 },
{ 'type' : 'frukt' , 'Navn' : 'sitron' , 'locate_country' : 'INDIA' , 'mengde' : 0 }
]
# opprett markedsdatarammen fra dataene ovenfor
market_df = linuxhint_spark_app.createDataFrame(grønnsak)
market_df.show()
Produksjon:
Her lager vi denne DataFrame med 4 kolonner og 8 rader. Nå bruker vi pandas_udf() for å lage de brukerdefinerte funksjonene og bruke dem på disse kolonnene.
Pandas_udf() med forskjellige datatyper
I dette scenariet lager vi noen brukerdefinerte funksjoner med pandas_udf() og bruker dem på kolonner og viser resultatene ved hjelp av select()-metoden. I hvert tilfelle bruker vi pandas.Series når vi utfører de vektoriserte operasjonene. Dette betrakter kolonneverdiene som en endimensjonal matrise, og operasjonen brukes på kolonnen. I selve dekoratøren angir vi funksjonens returtype.
Eksempel 1: Pandas_udf() med strengtype
Her lager vi to brukerdefinerte funksjoner med strengreturtypen for å konvertere strengtypekolonneverdiene til store og små bokstaver. Til slutt bruker vi disse funksjonene på 'type' og 'locate_country' kolonner.
# Konverter type kolonne til store bokstaver med pandas_udf@pandas_udf(StringType())
def type_upper_case(i: panda.Series) -> panda.Series:
returner i.str.upper()
# Konverter locate_country-kolonnen til små bokstaver med pandas_udf
@pandas_udf(StringType())
def country_lower_case(i: panda.Series) -> panda.Series:
returner i.str.lower()
# Vis kolonnene med select()
market_df.select( 'type' ,type_store_case( 'type' ), 'lokaliser_land' ,
land_små bokstaver( 'lokaliser_land' )).forestilling()
Produksjon:
Forklaring:
StringType()-funksjonen er tilgjengelig i pyspark.sql.types-modulen. Vi har allerede importert denne modulen mens vi opprettet PySpark DataFrame.
- Først returnerer UDF (brukerdefinert funksjon) strengene med store bokstaver ved å bruke str.upper()-funksjonen. Str.upper() er tilgjengelig i seriedatastrukturen (da vi konverterer til serier med en pil inne i funksjonen) som konverterer den gitte strengen til store bokstaver. Til slutt brukes denne funksjonen på 'type'-kolonnen som er spesifisert i select()-metoden. Tidligere var alle strengene i typekolonnen med små bokstaver. Nå er de endret til store bokstaver.
- For det andre returnerer UDF strengene med store bokstaver ved å bruke str.lower()-funksjonen. Str.lower() er tilgjengelig i seriedatastrukturen som konverterer den gitte strengen til små bokstaver. Til slutt brukes denne funksjonen på 'type'-kolonnen som er spesifisert i select()-metoden. Tidligere var alle strengene i typekolonnen med store bokstaver. Nå er de endret til små bokstaver.
Eksempel 2: Pandas_udf() med heltallstype
La oss lage en UDF som konverterer PySpark DataFrame-heltallskolonnen til Pandas-serien og legger til 100 til hver verdi. Send 'quantity'-kolonnen til denne funksjonen i select()-metoden.
# Legg til 100@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
returner i+ 100
# Send mengdekolonnen til funksjonen og displayet ovenfor.
market_df.select( 'mengde' ,legg til_100( 'mengde' )).forestilling()
Produksjon:
Forklaring:
Inne i UDF, itererer vi alle verdiene og konverterer dem til serier. Etter det legger vi til 100 til hver verdi i serien. Til slutt sender vi «mengde»-kolonnen til denne funksjonen, og vi kan se at 100 legges til alle verdiene.
Pandas_udf() med forskjellige datatyper som bruker Groupby() og Agg()
La oss se på eksemplene for å sende UDF til de aggregerte kolonnene. Her blir kolonneverdiene gruppert først ved å bruke groupby()-funksjonen og aggregering gjøres ved å bruke agg()-funksjonen. Vi sender vår UDF i denne aggregatfunksjonen.
Syntaks:
pyspark_dataframe_object.groupby( 'grupperingskolonne' ).agg(UDF(pyspark_dataframe_object[ 'kolonne' ]))
Her er verdiene i grupperingskolonnen gruppert først. Deretter blir aggregeringen gjort på hver grupperte data med hensyn til vår UDF.
Eksempel 1: Pandas_udf() med Aggregate Mean()
Her lager vi en brukerdefinert funksjon med en returtype flytende. Inne i funksjonen beregner vi gjennomsnittet ved å bruke mean() funksjonen. Denne UDF-en sendes til kolonnen 'antall' for å få gjennomsnittlig mengde for hver type.
# returner gjennomsnittet/gjennomsnittet@pandas_udf( 'flyte' )
def average_function(i: panda.Series) -> flyte:
returner i.mean()
# Send mengdekolonnen til funksjonen ved å gruppere typekolonnen.
market_df.groupby( 'type' ).agg(gjennomsnittlig_funksjon(marked_df[ 'mengde' ])).forestilling()
Produksjon:
Vi grupperer basert på elementer i 'type'-kolonnen. To grupper dannes - 'frukt' og 'grønnsak'. For hver gruppe beregnes gjennomsnittet og returneres.
Eksempel 2: Pandas_udf() med Aggregate Max() og Min()
Her lager vi to brukerdefinerte funksjoner med heltall (int) returtype. Den første UDF-en returnerer minimumsverdien og den andre UDF-en returnerer maksimumsverdien.
# pandas_udf som returnerer minimumsverdien@pandas_udf( 'int' )
def min_(i: panda.Series) -> int:
returner i.min()
# pandas_udf som returnerer maksimalverdien
@pandas_udf( 'int' )
def max_(i: panda.Series) -> int:
returner i.max()
# Send mengdekolonnen til min_ pandas_udf ved å gruppere locate_country.
market_df.groupby( 'lokaliser_land' ).agg(min_(marked_df[ 'mengde' ])).forestilling()
# Send mengdekolonnen til max_ pandas_udf ved å gruppere locate_country.
market_df.groupby( 'lokaliser_land' ).agg(maks_(marked_df[ 'mengde' ])).forestilling()
Produksjon:
For å returnere minimums- og maksimumsverdier, bruker vi funksjonene min() og max() i returtypen til UDF-er. Nå grupperer vi dataene i kolonnen 'locate_country'. Fire grupper dannes ('KINA', 'INDIA', 'JAPAN', 'USA'). For hver gruppe returnerer vi maksimalt antall. Tilsvarende returnerer vi minimumsmengden.
Konklusjon
I utgangspunktet brukes pandas_udf () til å utføre de vektoriserte operasjonene på vår PySpark DataFrame. Vi har sett hvordan du lager pandas_udf() og bruker den på PySpark DataFrame. For bedre forståelse diskuterte vi de forskjellige eksemplene ved å vurdere alle datatypene (streng, float og heltall). Det kan være mulig å bruke pandas_udf() med groupby() gjennom agg()-funksjonen.