Hvordan lese og skrive en tabelldata i PySpark

Hvordan Lese Og Skrive En Tabelldata I Pyspark



Databehandling i PySpark er raskere hvis dataene lastes inn i form av tabeller. Med dette, ved bruk av SQL-uttrykk, vil behandlingen være rask. Så det er bedre å konvertere PySpark DataFrame/RDD til en tabell før den sendes til behandling. I dag skal vi se hvordan du leser tabelldataene inn i PySpark DataFrame, skriver PySpark DataFrame til tabellen og setter inn ny DataFrame til den eksisterende tabellen ved hjelp av de innebygde funksjonene. La oss gå!

Pyspark.sql.DataFrameWriter.saveAsTable()

Først vil vi se hvordan du skriver den eksisterende PySpark DataFrame inn i tabellen ved å bruke write.saveAsTable()-funksjonen. Det krever tabellnavnet og andre valgfrie parametere som moduser, partionBy, etc., for å skrive DataFrame til tabellen. Den oppbevares som en parkettfil.

Syntaks:







dataframe_obj.write.saveAsTable(bane/tabellnavn,modus,partisjonBy,...)
  1. Tabellnavnet er navnet på tabellen som er opprettet fra dataframe_obj.
  2. Vi kan legge til/overskrive dataene i tabellen ved å bruke modusparameteren.
  3. PartitionBy tar enkelt/flere kolonner for å lage partisjoner basert på verdier i disse angitte kolonnene.

Eksempel 1:

Lag en PySpark DataFrame med 5 rader og 4 kolonner. Skriv denne datarammen til en tabell kalt 'Agri_Table1'.



importere pyspark

fra pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux hint' ).getOrCreate()

# oppdrettsdata med 5 rader og 5 kolonner

agri =[{ 'Soil_Type' : 'Svart' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 2500 , 'Soil_status' : 'Tørke' ,
'Land' : 'USA' },

{ 'Soil_Type' : 'Svart' , 'Irrigation_availability' : 'Ja' , 'Dekar' : 3500 , 'Soil_status' : 'Våt' ,
'Land' : 'India' },

{ 'Soil_Type' : 'Rød' , 'Irrigation_availability' : 'Ja' , 'Dekar' : 210 , 'Soil_status' : 'Tørke' ,
'Land' : 'UK' },

{ 'Soil_Type' : 'Annen' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 1000 , 'Soil_status' : 'Våt' ,
'Land' : 'USA' },

{ 'Soil_Type' : 'Sand' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 500 , 'Soil_status' : 'Tørke' ,
'Land' : 'India' }]



# lag datarammen fra dataene ovenfor

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Skriv DataFrame ovenfor til tabellen.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Produksjon:







Vi kan se at én parkettfil er opprettet med den forrige PySpark Data.



Eksempel 2:

Vurder den forrige DataFrame og skriv 'Agri_Table2' til tabellen ved å partisjonere postene basert på verdiene i 'Country'-kolonnen.

# Skriv DataFrame ovenfor til tabellen med parameteren partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partisjonBy=[ 'Land' ])

Produksjon:

Det er tre unike verdier i «Land»-kolonnen – «India», «UK» og «USA». Så tre partisjoner er opprettet. Hver partisjon inneholder parkettfilene.

Pyspark.sql.DataFrameReader.table()

La oss laste tabellen inn i PySpark DataFrame ved å bruke spark.read.table()-funksjonen. Det tar bare én parameter som er banen/tabellnavnet. Den laster tabellen direkte inn i PySpark DataFrame, og alle SQL-funksjonene som brukes på PySpark DataFrame kan også brukes på denne innlastede DataFrame.

Syntaks:

spark_app.read.table(path/'Table_name')

I dette scenariet bruker vi den forrige tabellen som ble opprettet fra PySpark DataFrame. Sørg for at du må implementere de forrige scenariokodebitene i miljøet ditt.

Eksempel:

Last «Agri_Table1»-tabellen inn i DataFrame kalt «loaded_data».

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Produksjon:

Vi kan se at tabellen er lastet inn i PySpark DataFrame.

Utføre SQL-spørringene

Nå utfører vi noen SQL-spørringer på den innlastede DataFrame ved å bruke spark.sql()-funksjonen.

# Bruk SELECT-kommandoen for å vise alle kolonnene fra tabellen ovenfor.

linuxhint_spark_app.sql( 'VELG * fra Agri_Table1' ).forestilling()

# WHERE-klausul

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).forestilling()

linuxhint_spark_app.sql( 'VELG * fra Agri_Table1 WHERE Acres > 2000' ).forestilling()

Produksjon:

  1. Den første spørringen viser alle kolonnene og postene fra DataFrame.
  2. Den andre spørringen viser postene basert på «Soil_status»-kolonnen. Det er bare tre plater med 'Dry'-elementet.
  3. Den siste spørringen returnerer to poster med 'Acres' som er større enn 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Ved å bruke funksjonen insertInto() kan vi legge til DataFrame i den eksisterende tabellen. Vi kan bruke denne funksjonen sammen med selectExpr() for å definere kolonnenavnene og deretter sette den inn i tabellen. Denne funksjonen tar også tabellnavnet som en parameter.

Syntaks:

DataFrame_obj.write.insertInto('Tabell_name')

I dette scenariet bruker vi den forrige tabellen som ble opprettet fra PySpark DataFrame. Sørg for at du må implementere de forrige scenariokodebitene i miljøet ditt.

Eksempel:

Opprett en ny DataFrame med to poster og sett dem inn i 'Agri_Table1'-tabellen.

importere pyspark

fra pyspark.sql importer SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux hint' ).getOrCreate()

# oppdrettsdata med 2 rader

agri =[{ 'Soil_Type' : 'Sand' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 2500 , 'Soil_status' : 'Tørke' ,
'Land' : 'USA' },

{ 'Soil_Type' : 'Sand' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 1200 , 'Soil_status' : 'Våt' ,
'Land' : 'Japan' }]

# lag datarammen fra dataene ovenfor

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Dekar' , 'Land' , 'Irrigation_availability' , 'Soil_Type' ,
'Soil_status' ).write.insertInto( 'Agri_Table1' )

# Vis den endelige Agri_Table1

linuxhint_spark_app.sql( 'VELG * fra Agri_Table1' ).forestilling()

Produksjon:

Nå er det totale antallet rader som er tilstede i DataFrame 7.

Konklusjon

Du forstår nå hvordan du skriver PySpark DataFrame til tabellen ved å bruke write.saveAsTable()-funksjonen. Den tar tabellnavnet og andre valgfrie parametere. Deretter lastet vi denne tabellen inn i PySpark DataFrame ved å bruke spark.read.table()-funksjonen. Det tar bare én parameter som er banen/tabellnavnet. Hvis du vil legge til den nye DataFrame i den eksisterende tabellen, bruk funksjonen insertInto().