PySpark DataFrame konvertēšana uz CSV

Pyspark Dataframe Konvertesana Uz Csv



Apskatīsim četrus dažādos scenārijus PySpark DataFrame konvertēšanai uz CSV. Mēs tieši izmantojam metodi write.csv(), lai PySpark DataFrame pārveidotu par CSV. Izmantojot funkciju to_csv(), mēs pārvēršam PySpark Pandas DataFrame par CSV. Tas var būt arī iespējams, pārvēršot to NumPy masīvā.

Satura tēma:

Ja vēlaties uzzināt par PySpark DataFrame un moduļa instalēšanu, skatiet šo rakstu .







PySpark DataFrame uz CSV, pārveidojot par Pandas DataFrame

To_csv() ir Pandas modulī pieejama metode, kas pārvērš Pandas DataFrame par CSV. Pirmkārt, mums ir jāpārvērš mūsu PySpark DataFrame par Pandas DataFrame. Lai to izdarītu, tiek izmantota metode toPandas(). Apskatīsim to_csv() sintaksi kopā ar tā parametriem.



Sintakse:



pandas_dataframe_obj.to_csv(path/ 'faila_nosaukums.csv' , galvene ,indekss,kolonnas,režīms...)
  1. Mums ir jānorāda CSV faila faila nosaukums. Ja vēlaties saglabāt lejupielādēto CSV failu noteiktā datora vietā, kopā ar faila nosaukumu varat norādīt arī ceļu.
  2. Kolonnas tiek iekļautas, ja galvene ir iestatīta uz “True”. Ja jums nav vajadzīgas kolonnas, iestatiet galvenes vērtību “False”.
  3. Indeksi tiek norādīti, ja indekss ir iestatīts uz “True”. Ja jums nav nepieciešami indeksi, iestatiet indeksu uz “False”.
  4. Parametrs Columns aizņem kolonnu nosaukumu sarakstu, kurā mēs varam norādīt, kuras konkrētās kolonnas tiek izvilktas CSV failā.
  5. Mēs varam pievienot ierakstus CSV, izmantojot režīma parametru. Lai to izdarītu, tiek izmantots 'a'.

1. piemērs: ar galvenes un indeksa parametriem

Izveidojiet PySpark DataFrame “skills_df” ar 3 rindām un 4 kolonnām. Konvertējiet šo DataFrame par CSV, vispirms pārveidojot to Pandas DataFrame.





importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

# prasmju dati ar 3 rindām un 4 kolonnām

prasmes =[{ 'id' : 123 , 'persona' : 'Mīļā' , 'prasme' : 'glezna' , 'balva' : 25 000 },

{ 'id' : 112 , 'persona' : 'Mouni' , 'prasme' : 'dejot' , 'balva' : 2000. gads },

{ 'id' : 153 , 'persona' : 'Tulasi' , 'prasme' : 'lasīšana' , 'balva' : 1200 }

]

# izveidojiet prasmju datu rāmi no iepriekšminētajiem datiem

Skills_df = linuxhint_spark_app.createDataFrame(skills)

prasmes_df.show()

# Pārvērtiet skill_df par pandas DataFrame

pandas_skills_df= prasmes_df.toPandas()

drukāt(pandas_skills_df)

# Konvertējiet šo DataFrame uz csv ar galveni un indeksu

pandas_skills_df.to_csv( 'pandas_skills1.csv' , galvene =True, index = True)

Izvade:



Mēs redzam, ka PySpark DataFrame ir pārveidots par Pandas DataFrame. Apskatīsim, vai tas ir pārveidots par CSV ar kolonnu nosaukumiem un indeksiem:

2. piemērs. Pievienojiet datus CSV failam

Izveidojiet vēl vienu PySpark DataFrame ar 1 ierakstu un pievienojiet to CSV failam, kas ir izveidots kā daļa no mūsu pirmā piemēra. Pārliecinieties, vai mums ir jāiestata galvene uz “False” kopā ar režīma parametru. Pretējā gadījumā kolonnu nosaukumi tiek pievienoti arī kā rinda.

importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

prasmes =[{ 'id' : 90 , 'persona' : 'Bhargav' , 'prasme' : 'lasīšana' , 'balva' : 12 000 }

]

# izveidojiet prasmju datu rāmi no iepriekšminētajiem datiem

Skills_df = linuxhint_spark_app.createDataFrame(skills)

# Pārvērtiet skill_df par pandas DataFrame

pandas_skills_df= prasmes_df.toPandas()

# Pievienojiet šo DataFrame failam pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , režīms= 'a' , galvene =Nepatiesi)

CSV izvade:

Mēs redzam, ka CSV failam ir pievienota jauna rinda.

3. piemērs: ar kolonnu parametru

Iegūsim to pašu DataFrame un pārveidosim to CSV formātā ar divām kolonnām: “persona” un “prize”.

importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

# prasmju dati ar 3 rindām un 4 kolonnām

prasmes =[{ 'id' : 123 , 'persona' : 'Mīļā' , 'prasme' : 'glezna' , 'balva' : 25 000 },

{ 'id' : 112 , 'persona' : 'Mouni' , 'prasme' : 'dejot' , 'balva' : 2000. gads },

{ 'id' : 153 , 'persona' : 'Tulasi' , 'prasme' : 'lasīšana' , 'balva' : 1200 }

]

# izveidojiet prasmju datu rāmi no iepriekšminētajiem datiem

Skills_df = linuxhint_spark_app.createDataFrame(skills)

# Pārvērtiet skill_df par pandas DataFrame

pandas_skills_df= prasmes_df.toPandas()

# Konvertējiet šo DataFrame uz csv ar noteiktām kolonnām

pandas_skills_df.to_csv( 'pandas_skills2.csv' , kolonnas=[ 'persona' , 'balva' ])

CSV izvade:

Mēs redzam, ka CSV failā ir tikai kolonnas “persona” un “balva”.

PySpark Pandas DataFrame uz CSV, izmantojot To_Csv() metodi

To_csv() ir Pandas modulī pieejama metode, kas pārvērš Pandas DataFrame par CSV. Pirmkārt, mums ir jāpārvērš mūsu PySpark DataFrame par Pandas DataFrame. Lai to izdarītu, tiek izmantota metode toPandas(). Apskatīsim to_csv() sintaksi kopā ar tā parametriem:

Sintakse:

pyspark_pandas_dataframe_obj.to_csv(path/ 'faila_nosaukums.csv' , galvene ,indekss,kolonnas,...)
  1. Mums ir jānorāda CSV faila faila nosaukums. Ja vēlaties saglabāt lejupielādēto CSV failu noteiktā datora vietā, kopā ar faila nosaukumu varat norādīt arī ceļu.
  2. Kolonnas tiek iekļautas, ja galvene ir iestatīta uz “True”. Ja jums nav vajadzīgas kolonnas, iestatiet galvenes vērtību “False”.
  3. Indeksi tiek norādīti, ja indekss ir iestatīts uz “True”. Ja jums nav nepieciešami indeksi, iestatiet indeksu uz “False”.
  4. Parametrs kolonnas aizņem kolonnu nosaukumu sarakstu, kurā mēs varam norādīt, kuras konkrētas kolonnas tiek izvilktas CSV failā.

1. piemērs: ar kolonnu parametru

Izveidojiet PySpark Pandas DataFrame ar 3 kolonnām un konvertējiet to uz CSV, izmantojot to_csv() ar kolonnām “person” un “prize”.

no pyspark importa pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persona' :[ 'Mīļā' , 'Mouni' , 'pats' , 'radha' ], 'balva' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Konvertējiet šo DataFrame uz csv ar noteiktām kolonnām

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , kolonnas=[ 'persona' , 'balva' ])

Izvade:

Mēs redzam, ka PySpark Pandas DataFrame tiek pārveidots par CSV ar diviem nodalījumiem. Katrā nodalījumā ir 2 ieraksti. Turklāt CSV slejas ir tikai “persona” un “balva”.

1. nodalījuma fails:

2. nodalījuma fails:

2. piemērs: ar galvenes parametru

Izmantojiet iepriekšējo DataFrame un norādiet galvenes parametru, iestatot to uz “True”.

no pyspark importa pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persona' :[ 'Mīļā' , 'Mouni' , 'pats' , 'radha' ], 'balva' :[ 1 , 2 , 3 , 4 ]})

# Konvertējiet šo DataFrame par csv ar galveni.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , galvene =Tiesa)

CSV izvade:

Mēs redzam, ka PySpark Pandas DataFrame tiek pārveidots par CSV ar diviem nodalījumiem. Katrā nodalījumā ir 2 ieraksti ar kolonnu nosaukumiem.

1. nodalījuma fails:

2. nodalījuma fails:

PySpark Pandas DataFrame uz CSV, pārveidojot par NumPy masīvu

Mums ir iespēja pārvērst PySpark Pandas DataFrame par CSV, pārveidojot par Numpy masīvu. To_numpy() ir PySpark Pandas modulī pieejama metode, kas pārvērš PySpark Pandas DataFrame par NumPy masīvu.

Sintakse:

pyspark_pandas_dataframe_obj.to_numpy()

Tas neprasīs nekādus parametrus.

Izmantojot Tofile() metodi

Pēc konvertēšanas uz NumPy masīvu, mēs varam izmantot tofile() metodi, lai pārvērstu NumPy par CSV. Šeit tas saglabā katru ierakstu jaunā šūnā kolonnā CSV failā.

Sintakse:

array_obj.to_numpy(faila nosaukums/ceļš,sep=’’)

Tas aizņem CSV faila nosaukumu vai ceļu un atdalītāju.

Piemērs:

Izveidojiet PySpark Pandas DataFrame ar 3 kolonnām un 4 ierakstiem un konvertējiet to CSV formātā, vispirms pārvēršot to NumPy masīvā.

no pyspark importa pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persona' :[ 'Mīļā' , 'Mouni' , 'pats' , 'radha' ], 'balva' :[ 1 , 2 , 3 , 4 ]})

# Pārvērtiet iepriekš minēto DataFrame par numpy masīvu

konvertēts = pyspark_pandas_dataframe.to_numpy()

drukāt (pārveidots)

# Izmantojot tofile()

converted.tofile( 'converted1.csv' , septembris = ',' )

Izvade:

[[ 90 'Mīļā' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'pats' 3 ]

[ 57 'radha' 4 ]]

Mēs redzam, ka PySpark Pandas DataFrame tiek pārveidots par NumPy masīvu (12 vērtības). Ja varat redzēt CSV datus, katra šūnas vērtība tiek saglabāta jaunā kolonnā.

PySpark DataFrame uz CSV, izmantojot Write.Csv() metodi

Metode write.csv() izmanto faila nosaukumu/ceļu, kur mums ir jāsaglabā CSV fails kā parametrs.

Sintakse:

dataframe_object.coalesce( 1 ).write.csv( 'faila nosaukums' )

Faktiski CSV fails tiek saglabāts kā nodalījumi (vairāk nekā viens). Lai no tā atbrīvotos, mēs apvienojam visus sadalītos CSV failus vienā. Šajā scenārijā mēs izmantojam funkciju coalesce(). Tagad mēs varam redzēt tikai vienu CSV failu ar visām rindām no PySpark DataFrame.

Piemērs:

Apsveriet PySpark DataFrame ar 4 ierakstiem ar 4 kolonnām. Ierakstiet šo DataFrame CSV formātā ar failu ar nosaukumu “market_details”.

importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

# tirgus dati ar 4 rindām un 4 kolonnām

tirgus =[{ 'm_id' : 'mz-001' , 'm_name' : 'ABC' , 'm_pilsēta' : 'deli' , 'm_state' : 'deli' },

{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_pilsēta' : 'patna' , 'm_state' : 'veiksmi' },

{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_pilsēta' : 'florida' , 'm_state' : 'viens' },

{ 'm_id' : 'mz-004' , 'm_name' : 'ABC' , 'm_pilsēta' : 'deli' , 'm_state' : 'veiksmi' }

]



# izveidojiet tirgus datu rāmi no iepriekšminētajiem datiem

market_df = linuxhint_spark_app.createDataFrame(tirgus)

# Faktiskie tirgus dati

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'tirgus_detaļas' )

Izvade:

Pārbaudīsim failu:

Atveriet pēdējo failu, lai redzētu ierakstus.

Secinājums

Apsverot dažādus parametrus, mēs uzzinājām četrus dažādus scenārijus, kas pārvērš PySpark DataFrame par CSV ar piemēriem. Strādājot ar PySpark DataFrame, jums ir divas iespējas konvertēt šo DataFrame par CSV: viens veids ir izmantot write() metodi, bet otrs izmanto metodi to_csv(), pārveidojot par Pandas DataFrame. Ja strādājat ar PySpark Pandas DataFrame, varat arī izmantot to_csv() un tofile(), konvertējot uz NumPy masīvu.