PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFrame pārveidošana ir iespējama, izmantojot funkciju pandas_udf(). Tā ir lietotāja definēta funkcija, kas tiek lietota PySpark DataFrame ar bultiņu. Mēs varam veikt vektorizētās darbības, izmantojot pandas_udf(). To var īstenot, nododot šo dekoratora funkciju. Iedziļināsimies šajā rokasgrāmatā, lai uzzinātu sintaksi, parametrus un dažādus piemērus.

Satura tēma:

Ja vēlaties uzzināt par PySpark DataFrame un moduļa instalēšanu, skatiet šo rakstu .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () ir pieejams PySpark modulī sql.functions, ko var importēt, izmantojot atslēgvārdu “no”. To izmanto, lai veiktu vektorizētās darbības mūsu PySpark DataFrame. Šī funkcija tiek īstenota kā dekorators, nododot trīs parametrus. Pēc tam, izmantojot bultiņu, mēs varam izveidot lietotāja definētu funkciju, kas atgriež datus vektora formātā (piemēram, mēs izmantojam sēriju/NumPy). Šīs funkcijas ietvaros mēs varam atgriezt rezultātu.



Struktūra un sintakse:



Vispirms apskatīsim šīs funkcijas struktūru un sintaksi:

@pandas_udf(datu tips)
def funkcijas_nosaukums(operācija) -> convert_format:
atgriešanas paziņojums

Šeit funkcijas_nosaukums ir mūsu definētās funkcijas nosaukums. Datu tips norāda datu tipu, ko šī funkcija atgriež. Mēs varam atgriezt rezultātu, izmantojot atslēgvārdu “atgriezties”. Visas darbības tiek veiktas funkcijas ietvaros ar bultiņu piešķiršanu.





Pandas_udf (funkcija un atgriešanas veids)

  1. Pirmais parametrs ir lietotāja definēta funkcija, kas tam tiek nodota.
  2. Otro parametru izmanto, lai norādītu atgriešanas datu tipu no funkcijas.

Dati:

Visā šajā rokasgrāmatā demonstrēšanai mēs izmantojam tikai vienu PySpark DataFrame. Visas mūsu definētās lietotāja definētās funkcijas tiek lietotas šajā PySpark DataFrame. Vispirms pēc PySpark instalēšanas noteikti izveidojiet šo DataFrame savā vidē.



importēt pyspark

no pyspark.sql importēt SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux padoms' ).getOrCreate()

no pyspark.sql.functions importēt pandas_udf

no pyspark.sql.types importa *

importēt pandas kā pandas

# dārzeņu detaļas

dārzenis =[{ 'tips' : 'dārzenis' , 'vārds' : 'tomāts' , 'locate_country' : 'ASV' , 'daudzums' : 800 },

{ 'tips' : 'auglis' , 'vārds' : 'banāns' , 'locate_country' : 'ĶĪNA' , 'daudzums' : divdesmit },

{ 'tips' : 'dārzenis' , 'vārds' : 'tomāts' , 'locate_country' : 'ASV' , 'daudzums' : 800 },

{ 'tips' : 'dārzenis' , 'vārds' : 'Mango' , 'locate_country' : 'JAPĀNA' , 'daudzums' : 0 },

{ 'tips' : 'auglis' , 'vārds' : 'citrons' , 'locate_country' : 'INDIJA' , 'daudzums' : 1700. gads },

{ 'tips' : 'dārzenis' , 'vārds' : 'tomāts' , 'locate_country' : 'ASV' , 'daudzums' : 1200 },

{ 'tips' : 'dārzenis' , 'vārds' : 'Mango' , 'locate_country' : 'JAPĀNA' , 'daudzums' : 0 },

{ 'tips' : 'auglis' , 'vārds' : 'citrons' , 'locate_country' : 'INDIJA' , 'daudzums' : 0 }

]

# izveidojiet tirgus datu rāmi no iepriekšminētajiem datiem

market_df = linuxhint_spark_app.createDataFrame(dārzenis)

market_df.show()

Izvade:

Šeit mēs izveidojam šo DataFrame ar 4 kolonnām un 8 rindām. Tagad mēs izmantojam pandas_udf(), lai izveidotu lietotāja definētas funkcijas un lietotu tās šajās kolonnās.

Pandas_udf() ar dažādiem datu tipiem

Šajā scenārijā mēs izveidojam dažas lietotāja definētas funkcijas ar pandas_udf() un lietojam tās kolonnās un parāda rezultātus, izmantojot Select() metodi. Katrā gadījumā mēs izmantojam pandas.Series, veicot vektorizētās darbības. Tādējādi kolonnu vērtības tiek uzskatītas par viendimensijas masīvu, un darbība tiek piemērota kolonnai. Pašā dekoratorā mēs norādām funkcijas atgriešanas veidu.

1. piemērs: Pandas_udf() ar virknes veidu

Šeit mēs izveidojam divas lietotāja definētas funkcijas ar virknes atgriešanas veidu, lai pārvērstu virknes tipa kolonnas vērtības uz lielajiem un mazajiem burtiem. Visbeidzot, mēs izmantojam šīs funkcijas kolonnās “type” un “locate_country”.

# Konvertējiet tipa kolonnu uz lielajiem burtiem ar pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

atgriezties i.str.upper()

# Konvertējiet kolonnu locate_country uz mazajiem burtiem ar pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

atgriezties i.str.lower()

# Parādiet kolonnas, izmantojot select()

market_df.select( 'tips' ,type_upper_case( 'tips' ), 'locate_country' ,
valsts_mazais_burts( 'locate_country' )).show()

Izvade:

Paskaidrojums:

Funkcija StringType() ir pieejama modulī pyspark.sql.types. Mēs jau importējām šo moduli, veidojot PySpark DataFrame.

  1. Pirmkārt, UDF (lietotāja definēta funkcija) atgriež virknes ar lielajiem burtiem, izmantojot funkciju str.upper(). Str.upper() ir pieejams sērijas datu struktūrā (jo mēs pārvēršam sērijās ar bultiņu funkcijas iekšpusē), kas pārvērš doto virkni par lielajiem burtiem. Visbeidzot, šī funkcija tiek piemērota kolonnai “tips”, kas norādīta atlases () metodē. Iepriekš visas virknes tipa kolonnā ir ar mazajiem burtiem. Tagad tie ir mainīti uz lielajiem burtiem.
  2. Otrkārt, UDF atgriež virknes ar lielajiem burtiem, izmantojot funkciju str.lower(). Str.lower() ir pieejams sērijas datu struktūrā, kas pārvērš doto virkni mazajos burtos. Visbeidzot, šī funkcija tiek piemērota kolonnai “tips”, kas norādīta atlases () metodē. Iepriekš visas virknes tipa kolonnā ir ar lielajiem burtiem. Tagad tie ir mainīti uz mazajiem burtiem.

2. piemērs: Pandas_udf() ar vesela skaitļa veidu

Izveidosim UDF, kas pārvērš PySpark DataFrame veselo skaitļu kolonnu Pandas sērijā un pievienosim katrai vērtībai 100. Nododiet kolonnu “daudzums” šai funkcijai atlases () metodes ietvaros.

# Pievienojiet 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

atgriezt i+ 100

# Nododiet daudzuma kolonnu iepriekš minētajai funkcijai un displejam.

market_df.select( 'daudzums' ,add_100( 'daudzums' )).show()

Izvade:

Paskaidrojums:

UDF ietvaros mēs atkārtojam visas vērtības un pārvēršam tās sērijās. Pēc tam katrai sērijas vērtībai pievienojam 100. Visbeidzot, šai funkcijai tiek nodota kolonna “daudzums”, un mēs redzam, ka visām vērtībām tiek pievienots 100.

Pandas_udf() ar dažādiem datu tipiem, izmantojot Groupby() un Agg()

Apskatīsim piemērus, lai UDF nodotu apkopotajām kolonnām. Šeit kolonnu vērtības vispirms tiek grupētas, izmantojot funkciju groupby (), un apkopošana tiek veikta, izmantojot funkciju agg (). Mēs nododam savu UDF šajā apkopojuma funkcijā.

Sintakse:

pyspark_dataframe_object.groupby( 'grupēšanas_kolonna' ).agg(UDF
(pyspark_dataframe_object[ 'kolonna' ]))

Šeit vispirms tiek grupētas vērtības grupēšanas kolonnā. Pēc tam tiek veikta visu grupēto datu apkopošana attiecībā uz mūsu UDF.

1. piemērs: Pandas_udf() ar kopējo vidējo()

Šeit mēs izveidojam lietotāja definētu funkciju ar atgriešanas tipa pludiņu. Funkcijas iekšpusē mēs aprēķinām vidējo, izmantojot funkciju mean (). Šis UDF tiek nosūtīts uz kolonnu “daudzums”, lai iegūtu vidējo daudzumu katram veidam.

# atgriež vidējo/vidējo

@pandas_udf( 'peldēt' )

def medium_function(i: panda.Series) -> float:

atgriezt i.mean()

# Nododiet daudzuma kolonnu funkcijai, grupējot tipa kolonnu.

market_df.groupby( 'tips' ).agg(average_function(market_df[ 'daudzums' ])).show()

Izvade:

Mēs grupējam, pamatojoties uz elementiem kolonnā “tips”. Tiek izveidotas divas grupas – “augļi” un “dārzenis”. Katrai grupai vidējais tiek aprēķināts un atgriezts.

2. piemērs: Pandas_udf() ar Aggregate Max() un Min()

Šeit mēs izveidojam divas lietotāja definētas funkcijas ar vesela skaitļa (int) atgriešanas veidu. Pirmais UDF atgriež minimālo vērtību, bet otrais UDF atgriež maksimālo vērtību.

# pandas_udf, kas atgriež minimālo vērtību

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

atgriezties i.min()

# pandas_udf, kas atgriež maksimālo vērtību

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

atgriezties i.max()

# Nododiet daudzuma kolonnu uz min_ pandas_udf, grupējot locate_country.

market_df.groupby( 'locate_country' ).agg(min_(tirgus_df[ 'daudzums' ])).show()

# Nododiet daudzuma kolonnu uz max_ pandas_udf, grupējot locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'daudzums' ])).show()

Izvade:

Lai atgrieztu minimālās un maksimālās vērtības, UDF atgriešanas veidam izmantojam funkcijas min () un max (). Tagad mēs grupējam datus kolonnā “locate_country”. Tiek izveidotas četras grupas (“ĶĪNA”, “INDIJA”, “JAPĀNA”, “ASV”). Katrai grupai atdodam maksimālo daudzumu. Līdzīgi mēs atgriežam minimālo daudzumu.

Secinājums

Būtībā pandas_udf () tiek izmantots, lai veiktu vektorizētās darbības mūsu PySpark DataFrame. Mēs esam redzējuši, kā izveidot pandas_udf() un lietot to PySpark DataFrame. Lai labāk izprastu, mēs apspriedām dažādus piemērus, ņemot vērā visus datu tipus (virkne, pludiņš un vesels skaitlis). Var būt iespējams izmantot pandas_udf() ar groupby(), izmantojot funkciju agg().