Pyspark.sql.DataFrameWriter.saveAsTable()
Først vil vi se hvordan du skriver den eksisterende PySpark DataFrame inn i tabellen ved å bruke write.saveAsTable()-funksjonen. Det krever tabellnavnet og andre valgfrie parametere som moduser, partionBy, etc., for å skrive DataFrame til tabellen. Den oppbevares som en parkettfil.
Syntaks:
dataframe_obj.write.saveAsTable(bane/tabellnavn,modus,partisjonBy,...)
- Tabellnavnet er navnet på tabellen som er opprettet fra dataframe_obj.
- Vi kan legge til/overskrive dataene i tabellen ved å bruke modusparameteren.
- PartitionBy tar enkelt/flere kolonner for å lage partisjoner basert på verdier i disse angitte kolonnene.
Eksempel 1:
Lag en PySpark DataFrame med 5 rader og 4 kolonner. Skriv denne datarammen til en tabell kalt 'Agri_Table1'.
importere pyspark
fra pyspark.sql importer SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux hint' ).getOrCreate()
# oppdrettsdata med 5 rader og 5 kolonner
agri =[{ 'Soil_Type' : 'Svart' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 2500 , 'Soil_status' : 'Tørke' ,
'Land' : 'USA' },
{ 'Soil_Type' : 'Svart' , 'Irrigation_availability' : 'Ja' , 'Dekar' : 3500 , 'Soil_status' : 'Våt' ,
'Land' : 'India' },
{ 'Soil_Type' : 'Rød' , 'Irrigation_availability' : 'Ja' , 'Dekar' : 210 , 'Soil_status' : 'Tørke' ,
'Land' : 'UK' },
{ 'Soil_Type' : 'Annen' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 1000 , 'Soil_status' : 'Våt' ,
'Land' : 'USA' },
{ 'Soil_Type' : 'Sand' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 500 , 'Soil_status' : 'Tørke' ,
'Land' : 'India' }]
# lag datarammen fra dataene ovenfor
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Skriv DataFrame ovenfor til tabellen.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Produksjon:
Vi kan se at én parkettfil er opprettet med den forrige PySpark Data.
Eksempel 2:
Vurder den forrige DataFrame og skriv 'Agri_Table2' til tabellen ved å partisjonere postene basert på verdiene i 'Country'-kolonnen.
# Skriv DataFrame ovenfor til tabellen med parameteren partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partisjonBy=[ 'Land' ])
Produksjon:
Det er tre unike verdier i «Land»-kolonnen – «India», «UK» og «USA». Så tre partisjoner er opprettet. Hver partisjon inneholder parkettfilene.
Pyspark.sql.DataFrameReader.table()
La oss laste tabellen inn i PySpark DataFrame ved å bruke spark.read.table()-funksjonen. Det tar bare én parameter som er banen/tabellnavnet. Den laster tabellen direkte inn i PySpark DataFrame, og alle SQL-funksjonene som brukes på PySpark DataFrame kan også brukes på denne innlastede DataFrame.
Syntaks:
spark_app.read.table(path/'Table_name')I dette scenariet bruker vi den forrige tabellen som ble opprettet fra PySpark DataFrame. Sørg for at du må implementere de forrige scenariokodebitene i miljøet ditt.
Eksempel:
Last «Agri_Table1»-tabellen inn i DataFrame kalt «loaded_data».
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Produksjon:
Vi kan se at tabellen er lastet inn i PySpark DataFrame.
Utføre SQL-spørringene
Nå utfører vi noen SQL-spørringer på den innlastede DataFrame ved å bruke spark.sql()-funksjonen.
# Bruk SELECT-kommandoen for å vise alle kolonnene fra tabellen ovenfor.linuxhint_spark_app.sql( 'VELG * fra Agri_Table1' ).forestilling()
# WHERE-klausul
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).forestilling()
linuxhint_spark_app.sql( 'VELG * fra Agri_Table1 WHERE Acres > 2000' ).forestilling()
Produksjon:
- Den første spørringen viser alle kolonnene og postene fra DataFrame.
- Den andre spørringen viser postene basert på «Soil_status»-kolonnen. Det er bare tre plater med 'Dry'-elementet.
- Den siste spørringen returnerer to poster med 'Acres' som er større enn 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Ved å bruke funksjonen insertInto() kan vi legge til DataFrame i den eksisterende tabellen. Vi kan bruke denne funksjonen sammen med selectExpr() for å definere kolonnenavnene og deretter sette den inn i tabellen. Denne funksjonen tar også tabellnavnet som en parameter.
Syntaks:
DataFrame_obj.write.insertInto('Tabell_name')I dette scenariet bruker vi den forrige tabellen som ble opprettet fra PySpark DataFrame. Sørg for at du må implementere de forrige scenariokodebitene i miljøet ditt.
Eksempel:
Opprett en ny DataFrame med to poster og sett dem inn i 'Agri_Table1'-tabellen.
importere pysparkfra pyspark.sql importer SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux hint' ).getOrCreate()
# oppdrettsdata med 2 rader
agri =[{ 'Soil_Type' : 'Sand' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 2500 , 'Soil_status' : 'Tørke' ,
'Land' : 'USA' },
{ 'Soil_Type' : 'Sand' , 'Irrigation_availability' : 'Nei' , 'Dekar' : 1200 , 'Soil_status' : 'Våt' ,
'Land' : 'Japan' }]
# lag datarammen fra dataene ovenfor
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Dekar' , 'Land' , 'Irrigation_availability' , 'Soil_Type' ,
'Soil_status' ).write.insertInto( 'Agri_Table1' )
# Vis den endelige Agri_Table1
linuxhint_spark_app.sql( 'VELG * fra Agri_Table1' ).forestilling()
Produksjon:
Nå er det totale antallet rader som er tilstede i DataFrame 7.
Konklusjon
Du forstår nå hvordan du skriver PySpark DataFrame til tabellen ved å bruke write.saveAsTable()-funksjonen. Den tar tabellnavnet og andre valgfrie parametere. Deretter lastet vi denne tabellen inn i PySpark DataFrame ved å bruke spark.read.table()-funksjonen. Det tar bare én parameter som er banen/tabellnavnet. Hvis du vil legge til den nye DataFrame i den eksisterende tabellen, bruk funksjonen insertInto().