Kā lasīt un rakstīt tabulas datus PySpark

Ka Lasit Un Rakstit Tabulas Datus Pyspark



Datu apstrāde PySpark ir ātrāka, ja dati tiek ielādēti tabulas veidā. Ar to, izmantojot SQL izteiksmes, apstrāde būs ātra. Tāpēc labāka pieeja ir PySpark DataFrame/RDD konvertēšana tabulā pirms nosūtīšanas apstrādei. Šodien mēs redzēsim, kā nolasīt tabulas datus PySpark DataFrame, ierakstīt PySpark DataFrame tabulā un ievietot jaunu DataFrame esošajā tabulā, izmantojot iebūvētās funkcijas. Ejam!

Pyspark.sql.DataFrameWriter.saveAsTable()

Pirmkārt, mēs redzēsim, kā tabulā ierakstīt esošo PySpark DataFrame, izmantojot funkciju write.saveAsTable(). Lai tabulā ierakstītu DataFrame, ir nepieciešams tabulas nosaukums un citi izvēles parametri, piemēram, režīmi, partitionBy utt. Tas tiek glabāts kā parketa fails.

Sintakse:







dataframe_obj.write.saveAsTable(ceļš/tabulas_nosaukums,režīms,partitionBy,…)
  1. Tabulas_nosaukums ir tās tabulas nosaukums, kas izveidota no datu rāmja_obj.
  2. Mēs varam pievienot/pārrakstīt tabulas datus, izmantojot mode parametru.
  3. PartitionBy izmanto vienu/vairākas kolonnas, lai izveidotu nodalījumus, pamatojoties uz vērtībām šajās norādītajās kolonnās.

1. piemērs:

Izveidojiet PySpark DataFrame ar 5 rindām un 4 kolonnām. Ierakstiet šo datu rāmi tabulā ar nosaukumu “Agri_Table1”.



importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

# lauksaimniecības dati ar 5 rindām un 5 kolonnām

agri =[{ 'augsnes_veids' : 'melns' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 2500 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'ASV' },

{ 'augsnes_veids' : 'melns' , 'Apūdeņošanas_pieejamība' : 'Jā' , 'Acres' : 3500 , Augsnes_statuss : 'slapjš' ,
'Valsts' : 'Indija' },

{ 'augsnes_veids' : 'Sarkans' , 'Apūdeņošanas_pieejamība' : 'Jā' , 'Acres' : 210 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'UK' },

{ 'augsnes_veids' : 'Cits' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 1000 , Augsnes_statuss : 'slapjš' ,
'Valsts' : 'ASV' },

{ 'augsnes_veids' : 'Smiltis' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 500 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'Indija' }]



# izveidojiet datu rāmi no iepriekšminētajiem datiem

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Ierakstiet augstāk minēto DataFrame tabulā.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Izvade:







Redzams, ka ar iepriekšējo PySpark Data ir izveidots viens parketa fails.



2. piemērs:

Apsveriet iepriekšējo DataFrame un ierakstiet tabulā “Agri_Table2”, sadalot ierakstus, pamatojoties uz vērtībām kolonnā “Valsts”.

# Ierakstiet iepriekš minēto DataFrame tabulā ar parametru partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Valsts' ])

Izvade:

Kolonnā “Valsts” ir trīs unikālas vērtības – “Indija”, “UK” un “ASV”. Tātad tiek izveidoti trīs nodalījumi. Katrā nodalījumā ir parketa faili.

Pyspark.sql.DataFrameReader.table()

Ielādēsim tabulu PySpark DataFrame, izmantojot funkciju spark.read.table(). Tam nepieciešams tikai viens parametrs, kas ir ceļa/tabulas nosaukums. Tas tieši ielādē tabulu PySpark DataFrame, un visas SQL funkcijas, kas tiek lietotas PySpark DataFrame, var lietot arī šajā ielādētajā DataFrame.

Sintakse:

spark_app.read.table(ceļš/'Tabulas_nosaukums')

Šajā scenārijā mēs izmantojam iepriekšējo tabulu, kas tika izveidota no PySpark DataFrame. Pārliecinieties, vai savā vidē ir jāievieš iepriekšējā scenārija koda fragmenti.

Piemērs:

Ielādējiet tabulu “Agri_Table1” DataFrame ar nosaukumu “loaded_data”.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Izvade:

Mēs redzam, ka tabula ir ielādēta PySpark DataFrame.

SQL vaicājumu izpilde

Tagad mēs izpildām dažus SQL vaicājumus ielādētajā DataFrame, izmantojot funkciju spark.sql().

# Izmantojiet komandu SELECT, lai parādītu visas iepriekšējās tabulas kolonnas.

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).show()

# WHERE klauzula

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry'' ).show()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).show()

Izvade:

  1. Pirmajā vaicājumā tiek parādītas visas kolonnas un ieraksti no DataFrame.
  2. Otrajā vaicājumā tiek parādīti ieraksti, pamatojoties uz kolonnu “Soil_status”. Ir tikai trīs ieraksti ar elementu “Dry”.
  3. Pēdējais vaicājums atgriež divus ierakstus ar “Acres”, kas ir lielāki par 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Izmantojot funkciju insertInto(), mēs varam pievienot DataFrame esošajai tabulai. Mēs varam izmantot šo funkciju kopā ar selectExpr (), lai definētu kolonnu nosaukumus un pēc tam ievietotu to tabulā. Šī funkcija arī ņem tableName kā parametru.

Sintakse:

DataFrame_obj.write.insertInto('Tabulas_nosaukums')

Šajā scenārijā mēs izmantojam iepriekšējo tabulu, kas tika izveidota no PySpark DataFrame. Pārliecinieties, vai savā vidē ir jāievieš iepriekšējā scenārija koda fragmenti.

Piemērs:

Izveidojiet jaunu DataFrame ar diviem ierakstiem un ievietojiet tos tabulā “Agri_Table1”.

importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

# lauksaimniecības dati ar 2 rindām

agri =[{ 'augsnes_veids' : 'Smiltis' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 2500 , Augsnes_statuss : 'sauss' ,
'Valsts' : 'ASV' },

{ 'augsnes_veids' : 'Smiltis' , 'Apūdeņošanas_pieejamība' : 'Nē' , 'Acres' : 1200 , Augsnes_statuss : 'slapjš' ,
'Valsts' : 'Japāna' }]

# izveidojiet datu rāmi no iepriekšminētajiem datiem

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Acres' , 'Valsts' , 'Apūdeņošanas_pieejamība' , 'Augsnes_tips' ,
'Augsnes_statuss' ).write.insertInto( 'Agri_Table1' )

# Parādīt galīgo Agri_Table1

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).show()

Izvade:

Tagad kopējais DataFrame rindu skaits ir 7.

Secinājums

Tagad jūs saprotat, kā PySpark DataFrame ierakstīt tabulā, izmantojot funkciju write.saveAsTable(). Tas aizņem tabulas nosaukumu un citus izvēles parametrus. Pēc tam mēs ielādējām šo tabulu PySpark DataFrame, izmantojot funkciju spark.read.table(). Tam nepieciešams tikai viens parametrs, kas ir ceļa/tabulas nosaukums. Ja vēlaties esošajai tabulai pievienot jauno DataFrame, izmantojiet funkciju insertInto().