Json: Convertendo e normalizando em PySpark - Para inciantes
- #Data
- #Python
- #PySpark
Parte 1/2
Imagine que vamos fazer ETL de um conjunto de dados e a resposta da extração nos apresentou dados estruturados (json) em aninhamentos. O que significa que há arrays com listas de objetos e objetos que também contêm outras listas neles. No contexto do Pyspark, para trabalhar com JSON é essencial entender a estrutura dos dados. Podemos fazer isso verificando os campos disponíveis, os seus tipos de dados e os aninhamentos existentes. Então com base na estrutura dos dados nós definimos o Schema.
O Schema de um conjunto de dados pode ser definido de forma explícita por inferência. Por inferência significa dizer que você pode optar por deixar o Spark inferir automaticamente o esquema dos dados ao ler JSON( CSV ou outros formatos), mas convenhamos que programar é escrever instruções ou algoritmos que dizem ao computador exatamente o que fazer numa linguagem que ele pode entender e sem a qual ele não faria nada por conta própria, então é preferível especificar explicitamente o esquema para garantir que os dados sejam tratados corretamente, especialmente em cenários complexos ou críticos para o negócio.
Com as devidas importações, criamos uma função pra consumir os dados de ‘Rockets’ da SpaceX:
def extract_data():
url = 'https://api.spacexdata.com/v3/rockets'
response = requests.get(url)
rockets_data = response.json()
return rockets_data
Ao visualizar os dados da API percebemos que existem estruturas dentro de estruturas. Há campos com int, float e string.
Realizamos uma iteração para conhecer os nomes dos campos desse conjunto de dados:
for key in rockets_data[0].keys():
print(key)
E a resposta será algo como:
id
active
stages
boosters
cost_per_launch
success_rate_pct
first_flight
country
company
height
diameter
mass
payload_weights
first_stage
second_stage
engines
landing_legs
flickr_images
wikipedia
description
rocket_id
rocket_name
rocket_type
Vamoas ver o DataType dos itens:
Criamos a sessão do Spark e imprimimos o Schema:
df = spark.createDataFrame(rockets_data)
df.printSchema
Como era de se esperar em estruturas com subcampos complexo e com tipos mistos de dados, a solicitação retornou um erro:
PySparkTypeError: [CANNOT_MERGE_TYPE] Can not merge type `DoubleType` and `LongType`.
Mesmo que você tente ver o DataType item por item, o erro persistirá. o TypeError é bem claro, o Spark não conseguiu fazer o merge dos dados. Alguns campos trazem valores int, e o mesmo campo em outro item traz float. É como você me trazer melancia e carambola, pra fazer cobertura de chocolate de um bolo. É tudo fruta, mas são tipos diferentes.
spark = SparkSession.builder.appName('Extract SpaceX Rockets Data').getOrCreate()
df = spark.createDataFrame(rockets_data)
df.select('first_flight').printSchema()
O código retorna o novamente o erro:
PySparkTypeError: [CANNOT_MERGE_TYPE] Can not merge type `DoubleType` and `LongType`
Isso aconteceu porque o Spark tentou inferir automaticamente os tipos de dados dos campos com base nos valores que ele encontrou. Podemos observar que o problema está nos valores numéricos(DoubleType e LongType - INT e FLOAT).
Se a ideia é conseguir visualizar os dados de forma tabular, Spark DataFrame, precisaremos definir os Schema explicitamente. Mas antes vamos rodar um código diagnóstico para ver o DataType dos campos e treinar nosso olhar para os marcadores de problemas. Não sei você, mas eu sou o tipo que gosta de ver os pormenores das coisas e procurar padrão em tudo.
Como o TypError foi numérico (DoubleType e LongType - INT e FLOAT), onde o spark sozinho não conseguiu combinar e retornar corretamente, vamos informar explicitamente pra ele nos trazer os valores de int e float, aqueles valores que não puderam ser automaticamente combinados.
for rocket in rockets_data:
for key, value in rocket.items():
data_type = type(value)
if not isinstance(value, (int, float)):
print(f'Chave: {key}, Valor: {value}, Tipo: {data_type}')
No nosso dicionário rockets_data iteramos cada par de chave-valor em for key, value in rocket.items() e determinamos o tipo de dados value. Se value não for int nem float, o segundo bloco é executado. Nos trazendo a chave associada ao value {key}, o valor analisado {value} e tipo de dado de value {data_type}. Então, o spark nos retorna as informações:
Chave: rocket_id, Valor: falcon1, Tipo: <class 'str'>
Chave: rocket_name, Valor: Falcon 1, Tipo: <class 'str'>
Chave: rocket_type, Valor: rocket, Tipo: <class 'str'>
Chave: first_flight, Valor: 2010-06-04, Tipo: <class 'str'>
Chave: country, Valor: United States, Tipo: <class 'str'>
Chave: company, Valor: SpaceX, Tipo: <class 'str'>
Chave: height, Valor: {'meters': 70, 'feet': 229.6}, Tipo: <class 'dict'>
Chave: diameter, Valor: {'meters': 3.7, 'feet': 12}, Tipo: <class 'dict'>
Chave: mass, Valor: {'kg': 549054, 'lb': 1207920}, Tipo: <class 'dict'>
Chave: payload_weights, Valor: [{'id': 'leo', 'name': 'Low Earth Orbit', 'kg': 22800, 'lb': 50265}, {'id': 'gto', 'name': 'Geosynchronous Transfer Orbit', 'kg': 8300, 'lb': 18300}, {'id': 'mars', 'name': 'Mars Orbit', 'kg': 4020, 'lb': 8860}], Tipo: <class 'list'>
Chave: first_stage, Valor: {'reusable': True, 'engines': 9, 'fuel_amount_tons': 385, 'burn_time_sec': 162, 'thrust_sea_level': {'kN': 7607, 'lbf': 1710000}, 'thrust_vacuum': {'kN': 8227, 'lbf': 1849500}}, Tipo: <class 'dict'>
Chave: second_stage, Valor: {'reusable': False, 'engines': 1, 'fuel_amount_tons': 90, 'burn_time_sec': 397, 'thrust': {'kN': 934, 'lbf': 210000}, 'payloads': {'option_1': 'dragon', 'option_2': 'composite fairing', 'composite_fairing': {'height': {'meters': 13.1, 'feet': 43}, 'diameter': {'meters': 5.2, 'feet': 17.1}}}}, Tipo: <class 'dict'>
Chave: engines, Valor: {'number': 9, 'type': 'merlin', 'version': '1D+', 'layout': 'octaweb', 'isp': {'sea_level': 288, 'vacuum': 312}, 'engine_loss_max': 2, 'propellant_1': 'liquid oxygen', 'propellant_2': 'RP-1 kerosene', 'thrust_sea_level': {'kN': 845, 'lbf': 190000}, 'thrust_vacuum': {'kN': 914, 'lbf': 205500}, 'thrust_to_weight': 180.1}, Tipo: <class 'dict'>
Chave: landing_legs, Valor: {'number': 4, 'material': 'carbon fiber'}, Tipo: <class 'dict'>
if not isinstance(value, (int, float)): é uma função padrão em Python, usamos ela pra verificar se o tipo de dado de uma variável não é nem inteiro (int) nem decimal (float). Aí então ela se torna True.
Nessa parte, o olho treme quando vê as estruturas de dicionário (dict) e as listas (list). Agora nós sabemos porque vimos na estrutura do json ou no retorno do código diagnóstico que:
height: Contém um dicionário com chaves 'meters' e 'feet'.
diameter: Contém um dicionário com chaves 'meters' e 'feet'.
mass: Contém um dicionário com chaves 'kg' e 'lb'.
payload_weights: É uma lista de dicionários
first_stage: É um dicionário com várias chaves e valores complexos.
second_stage: É um dicionário com várias chaves e valors complexos
engines: É um dicionário com várias chaves e valores complexos.
landing_legs: É um dicionário com várias chaves e valors complexos.
Entre outras situações ...
Esse diagnóstico foi importante pra identificar as estruturas complexas problemáticas. É a partir desses campos que vamos escrever o Schema e instruir o Spark a nos retornar os tipos correto dos dados.
Agora precisaremos:
- Tratar o Erro de Tipos de Dados: convertendo os tipos de dados inconsistentes para garantir que os dados em cada coluna sejam do mesmo tipo com interpretação correta.
- Escrever o StrucType ou o Schema.
- Normalizar o JSON: extraindo os campos aninhados, transformando eles em colunas separadas.
Qual faremos primeiro?
Parte 2
Bom fim de semana a todos!