Identificando e solucionando explosões de dados no Spark

Explosão de Dados no Spark
Explosão de Dados no Spark (Dall-e 2)
Domine as explosões de dados no Spark: aprenda a identificar, entender e resolver gargalos de desempenho com esse post e transforme desafios em eficiência.

Recentemente eu estava com problemas de Spill em um job no Spark e já tinha tentado algumas formas de mitigar, porém sem surtir muitos efeitos, foi aí que eu me deparei com uma explosão de dados enquanto investigava a minha execução na Spark UI para achar de onde o spill estava vindo.

Se você não sabe o que é Spill, eu já escrevi sobre aqui.

De forma resumida, a explosão de dados acontece quando os dados crescem de forma significativa após uma transformação e as formas mais comuns disso acontecer é após você ler um arquivo que possui coleções como matrizes, listas e mapas, ou então após uma operação de join.

No meu caso, a explosão acontecia no join e para identificar você precisa ir lá na aba SQL / DataFrame na Spark UI e procurar pelo nó das métricas do SortMergeJoin ou SuffleHashJoin.

Se você se deparar com a métrica “estimated rows output” ou “est” com um número em vermelho muito alto e algo como (100000000000X), você pode ter identificado uma potencial explosão.

Mas qual o impacto dessa explosão?

Por padrão o Spark lê 128 MB por task por core para arquivos Parquet, Delta, etc., o que para 99% dos jobs, funciona muito bem!

Mas quando você está tendo uma explosão de dados esses 128MB podem se tornar alguns GBs, o que se torna um problema pois um único núcleo de CPU pode não ter memória suficiente para caber nessa partição explodida e como consequência, esses dados serão derramados no disco.

Soluções

Diminuir o tamanho da partição de entrada

Utilize spark.sql.files.maxPartitionBytes para criar partições menores a fim de controlar o explode().

  • Você pode tentar valores como 16BM ou 32MB, por exemplo.
%python 
spark.conf.set("spark.sql.files.maxPartitionBytes", <tamanho em bytes>)
%sql
set spark.sql.files.maxPartitionBytes = <tamanho em bytes>

Importante lembrar que o valor é em bytes, caso tenha dúvidas na conversão, utilizar um conversor online pode ser uma boa.

Usando repartition()

Você pode explicitamente chamar a função repartition() logo após a função read() para aumentar o número total de partições.

Aumentando o número de partições de suffle

Quando a explosão acontece devido a uma operação de join, uma solução simples pode ser aumentar o número de partições de suffle.

Aumentar o número de partições de suffle irá diminuir o tamanho das partições para menos de 128MB.

%python
spark.conf.set("spark.sql.suffle.partitions","<quantidade de partições>")
%sql
set spark.sql.shuffle.partitions = <quantidade de partições>

O valor default para as partições de suffle são 200 partições.

No meu caso de uso específico, o que solucionou foi diminuir o tamanho da partição de entrada para 16MB.

Não existe bala de prata ou solução milagrosa, você terá que testar cada uma das sugestões no seu caso de uso e ver qual irá solucionar o seu problema, no meu caso era Spill.

Bom pessoal, a dica de hoje é essa e se você tiver alguma dúvida sobre Spark ou quiser sugerir algum assunto, me procure no LinkedIn ou no e-mail douglas@engenheirodedados.cloud

Para conhecer um pouco mais sobre o BRAINS, você pode ler o post introdutório BRAINS – Brazilian AI Networks e acessar a nossa comunidade.

Até a próxima.

#NoBrains #NoGains 🧠

0 Shares:
Você também pode gostar