Article image
Regilene Silva
Regilene Silva20/07/2024 20:06
Compartilhe

Json: Convertendo e normalizando em PySpark - Para inciantes

  • #Data
  • #Python
  • #PySpark

Parte 1/2

Imagine que vamos fazer ETL de um conjunto de dados e a resposta da extração nos apresentou dados estruturados (json) em aninhamentos. O que significa que há arrays com listas de objetos e objetos que também contêm outras listas neles. No contexto do Pyspark, para trabalhar com JSON é essencial entender a estrutura dos dados. Podemos fazer isso verificando os campos disponíveis, os seus tipos de dados e os aninhamentos existentes. Então com base na estrutura dos dados nós definimos o Schema.

O Schema de um conjunto de dados pode ser definido de forma explícita por inferência. Por inferência significa dizer que você pode optar por deixar o Spark inferir automaticamente o esquema dos dados ao ler JSON( CSV ou outros formatos), mas convenhamos que programar é escrever instruções ou algoritmos que dizem ao computador exatamente o que fazer numa linguagem que ele pode entender e sem a qual ele não faria nada por conta própria, então é preferível especificar explicitamente o esquema para garantir que os dados sejam tratados corretamente, especialmente em cenários complexos ou críticos para o negócio.

Com as devidas importações, criamos uma função pra consumir os dados de ‘Rockets’ da SpaceX:

def extract_data():

  url = 'https://api.spacexdata.com/v3/rockets'

  response = requests.get(url)

  rockets_data = response.json()

  return rockets_data  

Ao visualizar os dados da API percebemos que existem estruturas dentro de estruturas. Há campos com int, float e string.

Realizamos uma iteração para conhecer os nomes dos campos desse conjunto de dados:

 for key in rockets_data[0].keys():
          print(key)

E a resposta será algo como:

id

active

stages

boosters

cost_per_launch

success_rate_pct

first_flight

country

company

height

diameter

mass

payload_weights

first_stage

second_stage

engines

landing_legs

flickr_images

wikipedia

description

rocket_id

rocket_name

rocket_type

Vamoas ver o DataType dos itens:

Criamos a sessão do Spark e imprimimos o Schema:
df = spark.createDataFrame(rockets_data)
df.printSchema

Como era de se esperar em estruturas com subcampos complexo e com tipos mistos de dados, a solicitação retornou um erro:

PySparkTypeError: [CANNOT_MERGE_TYPE] Can not merge type `DoubleType` and `LongType`.

Mesmo que você tente ver o DataType item por item, o erro persistirá. o TypeError é bem claro, o Spark não conseguiu fazer o merge dos dados. Alguns campos trazem valores int, e o mesmo campo em outro item traz float. É como você me trazer melancia e carambola, pra fazer cobertura de chocolate de um bolo. É tudo fruta, mas são tipos diferentes.

spark = SparkSession.builder.appName('Extract SpaceX Rockets Data').getOrCreate()
df = spark.createDataFrame(rockets_data)
df.select('first_flight').printSchema()

O código retorna o novamente o erro:

PySparkTypeError: [CANNOT_MERGE_TYPE] Can not merge type `DoubleType` and `LongType`

Isso aconteceu porque o Spark tentou inferir automaticamente os tipos de dados dos campos com base nos valores que ele encontrou. Podemos observar que o problema está nos valores numéricos(DoubleType e LongType - INT e FLOAT). 

Se a ideia é conseguir visualizar os dados de forma tabular, Spark DataFrame,  precisaremos definir os Schema explicitamente. Mas antes vamos rodar um código diagnóstico para ver o DataType dos campos e treinar nosso olhar para os marcadores de problemas. Não sei você, mas eu sou o tipo que gosta de ver os pormenores das coisas e procurar padrão em tudo.

Como o TypError foi numérico (DoubleType e LongType - INT e FLOAT), onde o spark sozinho não conseguiu combinar e retornar corretamente, vamos informar explicitamente pra ele nos trazer os valores de int e float, aqueles valores que não puderam ser automaticamente combinados.

for rocket in rockets_data:
  for key, value in rocket.items():
    data_type = type(value)    
      if not isinstance(value, (int, float)):
          print(f'Chave: {key}, Valor: {value}, Tipo: {data_type}')

No nosso dicionário rockets_data iteramos cada par de chave-valor em for key, value in rocket.items() e determinamos o tipo de dados value. Se value não for int nem float, o segundo bloco é executado. Nos trazendo a chave associada ao value {key}, o valor analisado {value} tipo de dado de value {data_type}. Então, o spark nos retorna as informações:

Chave: rocket_id, Valor: falcon1, Tipo: <class 'str'>

Chave: rocket_name, Valor: Falcon 1, Tipo: <class 'str'>

Chave: rocket_type, Valor: rocket, Tipo: <class 'str'>

Chave: first_flight, Valor: 2010-06-04, Tipo: <class 'str'>

Chave: country, Valor: United States, Tipo: <class 'str'>

Chave: company, Valor: SpaceX, Tipo: <class 'str'>

Chave: height, Valor: {'meters': 70, 'feet': 229.6}, Tipo: <class 'dict'>

Chave: diameter, Valor: {'meters': 3.7, 'feet': 12}, Tipo: <class 'dict'>

Chave: mass, Valor: {'kg': 549054, 'lb': 1207920}, Tipo: <class 'dict'>

Chave: payload_weights, Valor: [{'id': 'leo', 'name': 'Low Earth Orbit', 'kg': 22800, 'lb': 50265}, {'id': 'gto', 'name': 'Geosynchronous Transfer Orbit', 'kg': 8300, 'lb': 18300}, {'id': 'mars', 'name': 'Mars Orbit', 'kg': 4020, 'lb': 8860}], Tipo: <class 'list'>

Chave: first_stage, Valor: {'reusable': True, 'engines': 9, 'fuel_amount_tons': 385, 'burn_time_sec': 162, 'thrust_sea_level': {'kN': 7607, 'lbf': 1710000}, 'thrust_vacuum': {'kN': 8227, 'lbf': 1849500}}, Tipo: <class 'dict'>

Chave: second_stage, Valor: {'reusable': False, 'engines': 1, 'fuel_amount_tons': 90, 'burn_time_sec': 397, 'thrust': {'kN': 934, 'lbf': 210000}, 'payloads': {'option_1': 'dragon', 'option_2': 'composite fairing', 'composite_fairing': {'height': {'meters': 13.1, 'feet': 43}, 'diameter': {'meters': 5.2, 'feet': 17.1}}}}, Tipo: <class 'dict'>

Chave: engines, Valor: {'number': 9, 'type': 'merlin', 'version': '1D+', 'layout': 'octaweb', 'isp': {'sea_level': 288, 'vacuum': 312}, 'engine_loss_max': 2, 'propellant_1': 'liquid oxygen', 'propellant_2': 'RP-1 kerosene', 'thrust_sea_level': {'kN': 845, 'lbf': 190000}, 'thrust_vacuum': {'kN': 914, 'lbf': 205500}, 'thrust_to_weight': 180.1}, Tipo: <class 'dict'>

Chave: landing_legs, Valor: {'number': 4, 'material': 'carbon fiber'}, Tipo: <class 'dict'>

if not isinstance(value, (int, float)): é uma função padrão em Python, usamos ela pra verificar se o tipo de dado de uma variável não é nem inteiro (int) nem decimal (float). Aí então ela se torna True.

Nessa parte, o olho treme quando vê as estruturas de dicionário (dict) e as listas (list). Agora nós sabemos porque vimos na estrutura do json ou no retorno do código diagnóstico que:

height: Contém um dicionário com chaves 'meters' e 'feet'.

diameter: Contém um dicionário com chaves 'meters' e 'feet'.

mass: Contém um dicionário com chaves 'kg' e 'lb'.

payload_weights: É uma lista de dicionários

first_stage: É um dicionário com várias chaves e valores complexos.

second_stage: É um dicionário com várias chaves e valors complexos

engines: É um dicionário com várias chaves e valores complexos.

landing_legs: É um dicionário com várias chaves e valors complexos.
Entre outras situações ...

Esse diagnóstico foi importante pra identificar as estruturas complexas problemáticas. É a partir desses campos que vamos escrever o Schema e instruir o Spark a nos retornar os tipos correto dos dados.

Agora precisaremos:

  • Tratar o Erro de Tipos de Dados:  convertendo os tipos de dados inconsistentes para garantir que os dados em cada coluna sejam do mesmo tipo com interpretação correta.
  • Escrever o StrucType ou o Schema.
  • Normalizar o JSON: extraindo os campos aninhados, transformando eles em colunas separadas.


Qual faremos primeiro?

Parte 2

Bom fim de semana a todos!

Compartilhe
Comentários (0)