Pyspark.sql.DataFrameWriter.saveAsTable()
Pirmkārt, mēs redzēsim, kā tabulā ierakstīt esošo PySpark DataFrame, izmantojot funkciju write.saveAsTable(). Lai tabulā ierakstītu DataFrame, ir nepieciešams tabulas nosaukums un citi izvēles parametri, piemēram, režīmi, partitionBy utt. Tas tiek glabāts kā parketa fails.
Sintakse:
dataframe_obj.write.saveAsTable(ceļš/tabulas_nosaukums,režīms,partitionBy,…)
- Tabulas_nosaukums ir tās tabulas nosaukums, kas izveidota no datu rāmja_obj.
- Mēs varam pievienot/pārrakstīt tabulas datus, izmantojot mode parametru.
- PartitionBy izmanto vienu/vairākas kolonnas, lai izveidotu nodalījumus, pamatojoties uz vērtībām šajās norādītajās kolonnās.
1. piemērs:
Izveidojiet PySpark DataFrame ar 5 rindām un 4 kolonnām. Ierakstiet šo datu rāmi tabulā ar nosaukumu “Agri_Table1”.
importēt pyspark
no pyspark.sql importēt SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()
# lauksaimniecības dati ar 5 rindām un 5 kolonnām
agri =[{ 'augsnes_veids' : 'melns' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 2500 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'ASV' },
{ 'augsnes_veids' : 'melns' , 'Apūdeņošanas_pieejamība' : 'Jā' , 'Acres' : 3500 , Augsnes_statuss : 'slapjš' ,
'Valsts' : 'Indija' },
{ 'augsnes_veids' : 'Sarkans' , 'Apūdeņošanas_pieejamība' : 'Jā' , 'Acres' : 210 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'UK' },
{ 'augsnes_veids' : 'Cits' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 1000 , Augsnes_statuss : 'slapjš' ,
'Valsts' : 'ASV' },
{ 'augsnes_veids' : 'Smiltis' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 500 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'Indija' }]
# izveidojiet datu rāmi no iepriekšminētajiem datiem
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Ierakstiet augstāk minēto DataFrame tabulā.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Izvade:
Redzams, ka ar iepriekšējo PySpark Data ir izveidots viens parketa fails.
2. piemērs:
Apsveriet iepriekšējo DataFrame un ierakstiet tabulā “Agri_Table2”, sadalot ierakstus, pamatojoties uz vērtībām kolonnā “Valsts”.
# Ierakstiet iepriekš minēto DataFrame tabulā ar parametru partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Valsts' ])
Izvade:
Kolonnā “Valsts” ir trīs unikālas vērtības – “Indija”, “UK” un “ASV”. Tātad tiek izveidoti trīs nodalījumi. Katrā nodalījumā ir parketa faili.
Pyspark.sql.DataFrameReader.table()
Ielādēsim tabulu PySpark DataFrame, izmantojot funkciju spark.read.table(). Tam nepieciešams tikai viens parametrs, kas ir ceļa/tabulas nosaukums. Tas tieši ielādē tabulu PySpark DataFrame, un visas SQL funkcijas, kas tiek lietotas PySpark DataFrame, var lietot arī šajā ielādētajā DataFrame.
Sintakse:
spark_app.read.table(ceļš/'Tabulas_nosaukums')Šajā scenārijā mēs izmantojam iepriekšējo tabulu, kas tika izveidota no PySpark DataFrame. Pārliecinieties, vai savā vidē ir jāievieš iepriekšējā scenārija koda fragmenti.
Piemērs:
Ielādējiet tabulu “Agri_Table1” DataFrame ar nosaukumu “loaded_data”.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Izvade:
Mēs redzam, ka tabula ir ielādēta PySpark DataFrame.
SQL vaicājumu izpilde
Tagad mēs izpildām dažus SQL vaicājumus ielādētajā DataFrame, izmantojot funkciju spark.sql().
# Izmantojiet komandu SELECT, lai parādītu visas iepriekšējās tabulas kolonnas.linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).show()
# WHERE klauzula
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry'' ).show()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).show()
Izvade:
- Pirmajā vaicājumā tiek parādītas visas kolonnas un ieraksti no DataFrame.
- Otrajā vaicājumā tiek parādīti ieraksti, pamatojoties uz kolonnu “Soil_status”. Ir tikai trīs ieraksti ar elementu “Dry”.
- Pēdējais vaicājums atgriež divus ierakstus ar “Acres”, kas ir lielāki par 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Izmantojot funkciju insertInto(), mēs varam pievienot DataFrame esošajai tabulai. Mēs varam izmantot šo funkciju kopā ar selectExpr (), lai definētu kolonnu nosaukumus un pēc tam ievietotu to tabulā. Šī funkcija arī ņem tableName kā parametru.
Sintakse:
DataFrame_obj.write.insertInto('Tabulas_nosaukums')Šajā scenārijā mēs izmantojam iepriekšējo tabulu, kas tika izveidota no PySpark DataFrame. Pārliecinieties, vai savā vidē ir jāievieš iepriekšējā scenārija koda fragmenti.
Piemērs:
Izveidojiet jaunu DataFrame ar diviem ierakstiem un ievietojiet tos tabulā “Agri_Table1”.
importēt pysparkno pyspark.sql importēt SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()
# lauksaimniecības dati ar 2 rindām
agri =[{ 'augsnes_veids' : 'Smiltis' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 2500 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'ASV' },
{ 'augsnes_veids' : 'Smiltis' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 1200 , Augsnes_statuss : 'slapjš' ,
'Valsts' : 'Japāna' }]
# izveidojiet datu rāmi no iepriekšminētajiem datiem
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Acres' , 'Valsts' , 'Apūdeņošanas_pieejamība' , 'Augsnes_tips' ,
'Augsnes_statuss' ).write.insertInto( 'Agri_Table1' )
# Parādīt galīgo Agri_Table1
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).show()
Izvade:
Tagad kopējais DataFrame rindu skaits ir 7.
Secinājums
Tagad jūs saprotat, kā PySpark DataFrame ierakstīt tabulā, izmantojot funkciju write.saveAsTable(). Tas aizņem tabulas nosaukumu un citus izvēles parametrus. Pēc tam mēs ielādējām šo tabulu PySpark DataFrame, izmantojot funkciju spark.read.table(). Tam nepieciešams tikai viens parametrs, kas ir ceļa/tabulas nosaukums. Ja vēlaties esošajai tabulai pievienot jauno DataFrame, izmantojiet funkciju insertInto().