Kā ieviest reāllaika datu straumēšanu Python

Ka Ieviest Reallaika Datu Straumesanu Python



Apgūt reāllaika datu straumēšanas ieviešanu Python darbojas kā būtiska prasme mūsdienu pasaulē, kurā ir iesaistīti dati. Šajā rokasgrāmatā ir apskatītas galvenās darbības un svarīgākie rīki, lai Python izmantotu reāllaika datu straumēšanu ar autentiskumu. No piemērota ietvara, piemēram, Apache Kafka vai Apache Pulsar, izvēles līdz Python koda rakstīšanai bez piepūles datu patēriņam, apstrādei un efektīvai vizualizācijai, mēs apgūsim vajadzīgās prasmes, lai izveidotu veiklus un efektīvus reāllaika datu kanālus.

1. piemērs: reāllaika datu straumēšanas ieviešana programmā Python

Reāllaika datu straumēšanas ieviešana Python ir ļoti svarīga mūsdienu uz datiem balstītajā laikmetā un pasaulē. Šajā detalizētajā piemērā mēs apskatīsim reāllaika datu straumēšanas sistēmas izveides procesu, izmantojot Apache Kafka un Python pakalpojumā Google Colab.







Lai inicializētu piemēru, pirms sākam kodēt, pakalpojumā Google Colab ir ļoti svarīgi izveidot konkrētu vidi. Pirmā lieta, kas mums jādara, ir instalēt nepieciešamās bibliotēkas. Kafka integrācijai mēs izmantojam bibliotēku “kafka-python”.



! pip uzstādīt kafka-python


Šī komanda instalē “kafka-python” bibliotēku, kas nodrošina Python funkcijas un Apache Kafka saistījumus. Tālāk mēs importējam mūsu projektam nepieciešamās bibliotēkas. Nepieciešamo bibliotēku importēšana, tostarp “KafkaProducer” un “KafkaConsumer”, ir “kafka-python” bibliotēkas klases, kas ļauj mums sazināties ar Kafka brokeriem. JSON ir Python bibliotēka darbam ar JSON datiem, ko izmantojam, lai serializētu un deserializētu ziņojumus.



no kafka importa KafkaProducer, KafkaConsumer
importēt json


Kafka producenta izveide





Tas ir svarīgi, jo Kafkas producents nosūta datus Kafkas tēmai. Mūsu piemērā mēs izveidojam producentu, lai nosūtītu simulētus reāllaika datus tēmai ar nosaukumu “reāllaika tēma”.

Mēs izveidojam “KafkaProducer” gadījumu, kas norāda Kafka brokera adresi kā “localhost:9092”. Pēc tam mēs izmantojam “value_serializer” — funkciju, kas serializē datus pirms to nosūtīšanas Kafkai. Mūsu gadījumā lambda funkcija kodē datus kā UTF-8 kodētu JSON. Tagad simulēsim dažus reāllaika datus un nosūtīsim tos uz Kafkas tēmu.



producents = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( iekšā ) .kodēt ( 'utf-8' ) )
# Simulēti reāllaika dati
dati = { 'sensor_id' : 1 , 'temperatūra' : 25.5 , 'mitrums' : 60.2 }
# Datu sūtīšana uz tēmu
producents.nosūtīt ( 'reāllaika tēma' , dati )


Šajās rindās mēs definējam “datu” vārdnīcu, kas attēlo simulētu sensora datus. Pēc tam mēs izmantojam “sūtīšanas” metodi, lai publicētu šos datus “reāllaika tēmā”.

Pēc tam mēs vēlamies izveidot Kafka patērētāju, un Kafka patērētājs nolasa datus no Kafka tēmas. Mēs izveidojam patērētāju, lai patērētu un apstrādātu ziņojumus “reāllaika tēmā”. Mēs izveidojam “KafkaConsumer” instanci, norādot tēmu, kuru vēlamies izmantot, piemēram, (reāllaika tēma) un Kafka brokera adresi. Pēc tam “value_deserializer” ir funkcija, kas deserializē no Kafkas saņemtos datus. Mūsu gadījumā lambda funkcija atšifrē datus kā UTF-8 kodētu JSON.

patērētājs = KafkaConsumer ( 'reāllaika tēma' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.dekodēt ( 'utf-8' ) ) )


Mēs izmantojam iteratīvu cilpu, lai nepārtraukti patērētu un apstrādātu tēmas ziņojumus.

# Reāllaika datu lasīšana un apstrāde
priekš ziņa iekšā patērētājs:
dati = ziņojums.vērtība
drukāt ( f 'Saņemtie dati: {data}' )


Mēs izgūstam katra ziņojuma vērtību un mūsu simulētos sensora datus cilpas iekšpusē un izdrukājam tos konsolē. Kafka ražotāja un patērētāja palaišana ietver šī koda palaišanu pakalpojumā Google Colab un koda šūnu izpildi atsevišķi. Ražotājs nosūta simulētos datus Kafkas tēmai, un patērētājs nolasa un izdrukā saņemtos datus.


Izvades analīze koda darbības laikā

Mēs novērosim reāllaika datus, kas tiek ražoti un patērēti. Datu formāts var atšķirties atkarībā no mūsu simulācijas vai faktiskā datu avota. Šajā detalizētajā piemērā ir aprakstīts viss reāllaika datu straumēšanas sistēmas iestatīšanas process, izmantojot Apache Kafka un Python pakalpojumā Google Colab. Mēs izskaidrosim katru koda rindiņu un tās nozīmi šīs sistēmas veidošanā. Reāllaika datu straumēšana ir spēcīga iespēja, un šis piemērs kalpo par pamatu sarežģītākām reālās pasaules lietojumprogrammām.

2. piemērs: reāllaika datu straumēšanas ieviešana Python, izmantojot akciju tirgus datus

Veiksim vēl vienu unikālu piemēru reāllaika datu straumēšanas ieviešanai Python, izmantojot citu scenāriju; šoreiz pievērsīsimies akciju tirgus datiem. Mēs izveidojam reāllaika datu straumēšanas sistēmu, kas fiksē akciju cenu izmaiņas un apstrādā tās, izmantojot Apache Kafka un Python pakalpojumā Google Colab. Kā parādīts iepriekšējā piemērā, mēs sākam ar vides konfigurēšanu pakalpojumā Google Colab. Pirmkārt, mēs instalējam vajadzīgās bibliotēkas:

! pip uzstādīt kafka-python yfinance


Šeit mēs pievienojam “yfinance” bibliotēku, kas ļauj iegūt reāllaika akciju tirgus datus. Tālāk mēs importējam nepieciešamās bibliotēkas. Kafka mijiedarbībai mēs turpinām izmantot klases “KafkaProducer” un “KafkaConsumer” no “kafka-python” bibliotēkas. Mēs importējam JSON, lai strādātu ar JSON datiem. Mēs arī izmantojam “yfinance”, lai iegūtu reāllaika akciju tirgus datus. Mēs arī importējam “laika” bibliotēku, lai pievienotu laika aizkavi, lai simulētu reāllaika atjauninājumus.

no kafka importa KafkaProducer, KafkaConsumer
importēt json
importēt yfinanses yf
imports laiks


Tagad krājumu datiem mēs izveidojam Kafka ražotāju. Mūsu Kafka ražotājs saņem reāllaika krājumu datus un nosūta tos uz Kafka tēmu ar nosaukumu “akciju cena”.

producents = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( iekšā ) .kodēt ( 'utf-8' ) )

kamēr Patiesība:
stock = yf.Ticker ( 'AAPL' ) # Piemērs: Apple Inc. akcijas
stock_data = stock.history ( periodā = '1d' )
pēdējā_cena = krājumu_dati [ 'Aizvērt' ] .iloc [ - 1 ]
dati = { 'simbols' : 'AAPL' , 'cena' : pēdējā_cena }
producents.nosūtīt ( 'akciju cena' , dati )
laiks.gulēt ( 10 ) # Simulēt reāllaika atjauninājumus ik pēc 10 sekundēm


Mēs izveidojam “KafkaProducer” gadījumu ar Kafka brokera adresi šajā kodā. Ciklā mēs izmantojam “yfinance”, lai iegūtu jaunāko Apple Inc. (“AAPL”) akciju cenu. Pēc tam mēs izņemam pēdējo slēgšanas cenu un nosūtām to uz tēmu “akciju cena”. Galu galā mēs ieviešam laika aizkavi, lai simulētu reāllaika atjauninājumus ik pēc 10 sekundēm.

Izveidosim Kafka patērētāju, lai lasītu un apstrādātu akciju cenas datus no tēmas “akciju cena”.

patērētājs = KafkaConsumer ( 'akciju cena' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.dekodēt ( 'utf-8' ) ) )

priekš ziņa iekšā patērētājs:
stock_data = ziņojums.vērtība
drukāt ( f 'Saņemtie akciju dati: {stock_data['symbol']} — cena: {stock_data['price']}' )


Šis kods ir līdzīgs iepriekšējā piemēra patērētāja iestatījumam. Tā nepārtraukti nolasa un apstrādā ziņojumus no tēmas “akciju cena” un drukā akcijas simbolu un cenu konsolei. Mēs izpildām koda šūnas secīgi, piemēram, pa vienam pakalpojumā Google Colab, lai palaistu ražotāju un patērētāju. Ražotājs saņem un nosūta reāllaika akciju cenu atjauninājumus, kamēr patērētājs lasa un parāda šos datus.

! pip uzstādīt kafka-python yfinance
no kafka importa KafkaProducer, KafkaConsumer
importēt json
importēt yfinanses yf
imports laiks
producents = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( iekšā ) .kodēt ( 'utf-8' ) )

kamēr Patiesība:
stock = yf.Ticker ( 'AAPL' ) # Apple Inc. akcijas
stock_data = stock.history ( periodā = '1d' )
pēdējā_cena = krājumu_dati [ 'Aizvērt' ] .iloc [ - 1 ]

dati = { 'simbols' : 'AAPL' , 'cena' : pēdējā_cena }

producents.nosūtīt ( 'akciju cena' , dati )

laiks.gulēt ( 10 ) # Simulēt reāllaika atjauninājumus ik pēc 10 sekundēm
patērētājs = KafkaConsumer ( 'akciju cena' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.dekodēt ( 'utf-8' ) ) )

priekš ziņa iekšā patērētājs:
stock_data = ziņojums.vērtība
drukāt ( f 'Saņemtie krājumu dati: {stock_data['symbol']} — cena: {stock_data['price']}' )


Izvades analīzē pēc koda palaišanas mēs novērojam Apple Inc. akciju cenu atjauninājumus reāllaikā, kas tiek ražoti un patērēti.

Secinājums

Šajā unikālajā piemērā mēs demonstrējām reāllaika datu straumēšanas ieviešanu Python, izmantojot Apache Kafka un “yfinance” bibliotēku, lai iegūtu un apstrādātu akciju tirgus datus. Mēs rūpīgi izskaidrojām katru koda rindiņu. Reāllaika datu straumēšanu var izmantot dažādās jomās, lai izveidotu reālās pasaules lietojumprogrammas finanšu, IoT un citur.