Hvordan implementere sanntidsdatastrømming i Python

Hvordan Implementere Sanntidsdatastromming I Python



Å mestre implementeringen av sanntidsdatastrømming i Python fungerer som en essensiell ferdighet i dagens data-involverte verden. Denne guiden utforsker kjernetrinnene og viktige verktøy for å bruke sanntidsdatastrømming med autentisitet i Python. Fra å velge et passende rammeverk som Apache Kafka eller Apache Pulsar til å skrive en Python-kode for uanstrengt dataforbruk, prosessering og effektiv visualisering, vil vi tilegne oss de nødvendige ferdighetene for å konstruere de smidige og effektive sanntidsdatakanalene.

Eksempel 1: Implementering av sanntidsdatastrømming i Python

Implementering av sanntidsdatastrømming i Python er avgjørende i dagens datadrevne tidsalder og verden. I dette detaljerte eksemplet vil vi gå gjennom prosessen med å bygge et sanntidsdatastrømsystem ved å bruke Apache Kafka og Python i Google Colab.







For å initialisere eksemplet før vi begynner å kode, er det viktig å bygge et spesifikt miljø i Google Colab. Det første vi må gjøre er å installere de nødvendige bibliotekene. Vi bruker 'kafka-python'-biblioteket for Kafka-integrasjon.



! pip installere kafka-python


Denne kommandoen installerer 'kafka-python'-biblioteket som gir Python-funksjonene og bindingene for Apache Kafka. Deretter importerer vi de nødvendige bibliotekene for prosjektet vårt. Å importere de nødvendige bibliotekene inkludert 'KafkaProducer' og 'KafkaConsumer' er klassene fra 'kafka-python'-biblioteket som lar oss samhandle med Kafka-meglere. JSON er Python-biblioteket for å jobbe med JSON-dataene som vi bruker til å serialisere og deserialisere meldingene.



fra kafka import KafkaProducer, KafkaConsumer
importere json


Opprettelse av en Kafka-produsent





Dette er viktig fordi en Kafka-produsent sender dataene til et Kafka-emne. I vårt eksempel oppretter vi en produsent for å sende simulerte sanntidsdata til et emne kalt «sanntidsemne».

Vi oppretter en 'KafkaProducer'-forekomst som spesifiserer Kafka-meglerens adresse som 'localhost:9092'. Deretter bruker vi 'value_serializer', en funksjon som serialiserer dataene før de sendes til Kafka. I vårt tilfelle koder en lambda-funksjon dataene som UTF-8-kodet JSON. La oss nå simulere noen sanntidsdata og sende dem til Kafka-emnet.



produsent = KafkaProdusent ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( i ) .encode ( 'utf-8' ) )
# Simulerte sanntidsdata
data = { 'sensor_id' : 1 , 'temperatur' : 25.5 , 'luftfuktighet' : 60,2 }
# Sender data til emnet
produsent.send ( 'emne i sanntid' , data )


I disse linjene definerer vi en 'data'-ordbok som representerer simulerte sensordata. Vi bruker deretter 'send'-metoden for å publisere disse dataene til 'sanntidsemnet'.

Deretter ønsker vi å lage en Kafka-forbruker, og en Kafka-forbruker leser data fra et Kafka-emne. Vi skaper en forbruker for å konsumere og behandle meldingene i «sanntids-emnet». Vi oppretter en 'KafkaConsumer'-instans som spesifiserer emnet vi ønsker å konsumere, for eksempel (sanntidsemne) og Kafka-meglerens adresse. Deretter er 'value_deserializer' en funksjon som deserialiserer dataene som mottas fra Kafka. I vårt tilfelle dekoder en lambda-funksjon dataene som UTF-8-kodet JSON.

forbruker = KafkaForbruker ( 'emne i sanntid' ,
bootstrap_servers = 'localhost:9092' ,
verdi_deserializer =lambda x: json.loads ( x.dekode ( 'utf-8' ) ) )


Vi bruker en iterativ loop for kontinuerlig å konsumere og behandle meldingene fra emnet.

# Lese og behandle sanntidsdata
til beskjed i forbruker:
data = melding.verdi
skrive ut ( f 'Motta data: {data}' )


Vi henter hver meldings verdi og våre simulerte sensordata inne i sløyfen og skriver den ut til konsollen. Å kjøre Kafka-produsenten og -forbrukeren innebærer å kjøre denne koden i Google Colab og kjøre kodecellene individuelt. Produsenten sender de simulerte dataene til Kafka-emnet, og forbrukeren leser og skriver ut de mottatte dataene.


Analyse av utdata mens koden kjøres

Vi vil observere sanntidsdata som produseres og forbrukes. Dataformatet kan variere avhengig av vår simulering eller faktiske datakilde. I dette detaljerte eksemplet dekker vi hele prosessen med å sette opp et sanntidsdatastrømningssystem ved å bruke Apache Kafka og Python i Google Colab. Vi vil forklare hver linje med kode og dens betydning for å bygge dette systemet. Sanntidsdatastrømming er en kraftig funksjon, og dette eksemplet fungerer som grunnlaget for mer komplekse applikasjoner i den virkelige verden.

Eksempel 2: Implementering av sanntidsdatastrømming i Python ved hjelp av børsdata

La oss gjøre et annet unikt eksempel på å implementere en sanntidsdatastrømming i Python ved å bruke et annet scenario; denne gangen vil vi fokusere på børsdata. Vi lager et datastrømningssystem i sanntid som fanger opp aksjekursendringene og behandler dem ved hjelp av Apache Kafka og Python i Google Colab. Som vist i forrige eksempel, starter vi med å konfigurere miljøet vårt i Google Colab. Først installerer vi de nødvendige bibliotekene:

! pip installere kafka-python yfinance


Her legger vi til 'yfinance'-biblioteket som lar oss få aksjemarkedsdata i sanntid. Deretter importerer vi de nødvendige bibliotekene. Vi fortsetter å bruke 'KafkaProducer'- og 'KafkaConsumer'-klassene fra 'kafka-python'-biblioteket for Kafka-interaksjon. Vi importerer JSON for å jobbe med JSON-dataene. Vi bruker også 'yfinance' for å få sanntids aksjemarkedsdata. Vi importerer også 'tids'-biblioteket for å legge til en tidsforsinkelse for å simulere sanntidsoppdateringene.

fra kafka import KafkaProducer, KafkaConsumer
importere json
importere finans som yf
import tid


Nå oppretter vi en Kafka-produsent for lagerdata. Kafka-produsenten vår mottar aksjedata i sanntid og sender den til et Kafka-emne kalt 'aksjepris'.

produsent = KafkaProdusent ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( i ) .encode ( 'utf-8' ) )

samtidig som Ekte:
aksje = yf.Ticker ( 'AAPL' ) # Eksempel: Apple Inc.-aksje
stock_data = stock.history ( periode = '1d' )
siste_pris = lager_data [ 'Lukk' ] .iloc [ - 1 ]
data = { 'symbol' : 'AAPL' , 'pris' : siste pris }
produsent.send ( 'aksjepris' , data )
tid.søvn ( 10 ) # Simuler sanntidsoppdateringer hvert 10. sekund


Vi oppretter en 'KafkaProducer'-forekomst med Kafka-meglerens adresse i denne koden. Inne i loopen bruker vi 'yfinance' for å få siste aksjekurs for Apple Inc. ('AAPL'). Deretter trekker vi ut den siste sluttkursen og sender den til emnet 'aksjepris'. Etter hvert introduserer vi en tidsforsinkelse for å simulere sanntidsoppdateringene hvert 10. sekund.

La oss lage en Kafka-forbruker for å lese og behandle aksjekursdataene fra 'aksjepris'-emnet.

forbruker = KafkaForbruker ( 'aksjepris' ,
bootstrap_servers = 'localhost:9092' ,
verdi_deserializer =lambda x: json.loads ( x.dekode ( 'utf-8' ) ) )

til beskjed i forbruker:
stock_data = melding.verdi
skrive ut ( f 'Mottakte lagerdata: {stock_data['symbol']} - Pris: {stock_data['price']}' )


Denne koden ligner på forrige eksempels forbrukeroppsett. Den leser og behandler meldingene fra 'aksjepris'-emnet kontinuerlig og skriver ut aksjesymbolet og prisen til konsollen. Vi kjører kodecellene sekvensielt, for eksempel én etter én i Google Colab for å kjøre produsent og forbruker. Produsenten får og sender aksjekursoppdateringene i sanntid mens forbrukeren leser og viser disse dataene.

! pip installere kafka-python yfinance
fra kafka import KafkaProducer, KafkaConsumer
importere json
importere finans som yf
import tid
produsent = KafkaProdusent ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( i ) .encode ( 'utf-8' ) )

samtidig som Ekte:
aksje = yf.Ticker ( 'AAPL' ) # Apple Inc. aksje
stock_data = stock.history ( periode = '1d' )
siste_pris = lager_data [ 'Lukk' ] .iloc [ - 1 ]

data = { 'symbol' : 'AAPL' , 'pris' : siste pris }

produsent.send ( 'aksjepris' , data )

tid.søvn ( 10 ) # Simuler sanntidsoppdateringer hvert 10. sekund
forbruker = KafkaForbruker ( 'aksjepris' ,
bootstrap_servers = 'localhost:9092' ,
verdi_deserializer =lambda x: json.loads ( x.dekode ( 'utf-8' ) ) )

til beskjed i forbruker:
stock_data = melding.verdi
skrive ut ( f 'Mottakte lagerdata: {stock_data['symbol']} - Pris: {stock_data['price']}' )


I analysen av utdataene etter at koden er kjørt, vil vi observere at aksjekursoppdateringene i sanntid for Apple Inc. blir produsert og konsumert.

Konklusjon

I dette unike eksemplet demonstrerte vi implementeringen av sanntidsdatastrømming i Python ved å bruke Apache Kafka og «yfinance»-biblioteket for å fange opp og behandle aksjemarkedsdata. Vi forklarte hver linje i koden grundig. Sanntidsdatastrømming kan brukes på ulike felt for å bygge de virkelige applikasjonene innen finans, IoT og mer.