Recentemente eu estava com problemas de Spill em um job no Spark e já tinha tentado algumas formas de mitigar, porém sem surtir muitos efeitos, foi aí que eu me deparei com uma explosão de dados enquanto investigava a minha execução na Spark UI para achar de onde o spill estava vindo.
Se você não sabe o que é Spill, eu já escrevi sobre aqui.
De forma resumida, a explosão de dados acontece quando os dados crescem de forma significativa após uma transformação e as formas mais comuns disso acontecer é após você ler um arquivo que possui coleções como matrizes, listas e mapas, ou então após uma operação de join.
No meu caso, a explosão acontecia no join e para identificar você precisa ir lá na aba SQL / DataFrame na Spark UI e procurar pelo nó das métricas do SortMergeJoin ou SuffleHashJoin.
Se você se deparar com a métrica “estimated rows output” ou “est” com um número em vermelho muito alto e algo como (100000000000X), você pode ter identificado uma potencial explosão.
Mas qual o impacto dessa explosão?
Por padrão o Spark lê 128 MB por task por core para arquivos Parquet, Delta, etc., o que para 99% dos jobs, funciona muito bem!
Mas quando você está tendo uma explosão de dados esses 128MB podem se tornar alguns GBs, o que se torna um problema pois um único núcleo de CPU pode não ter memória suficiente para caber nessa partição explodida e como consequência, esses dados serão derramados no disco.
Soluções
Diminuir o tamanho da partição de entrada
Utilize spark.sql.files.maxPartitionBytes para criar partições menores a fim de controlar o explode().
- Você pode tentar valores como 16BM ou 32MB, por exemplo.
%python
spark.conf.set("spark.sql.files.maxPartitionBytes", <tamanho em bytes>)
%sql
set spark.sql.files.maxPartitionBytes = <tamanho em bytes>
Importante lembrar que o valor é em bytes, caso tenha dúvidas na conversão, utilizar um conversor online pode ser uma boa.
Usando repartition()
Você pode explicitamente chamar a função repartition() logo após a função read() para aumentar o número total de partições.
Aumentando o número de partições de suffle
Quando a explosão acontece devido a uma operação de join, uma solução simples pode ser aumentar o número de partições de suffle.
Aumentar o número de partições de suffle irá diminuir o tamanho das partições para menos de 128MB.
%python
spark.conf.set("spark.sql.suffle.partitions","<quantidade de partições>")
%sql
set spark.sql.shuffle.partitions = <quantidade de partições>
O valor default para as partições de suffle são 200 partições.
No meu caso de uso específico, o que solucionou foi diminuir o tamanho da partição de entrada para 16MB.
Não existe bala de prata ou solução milagrosa, você terá que testar cada uma das sugestões no seu caso de uso e ver qual irá solucionar o seu problema, no meu caso era Spill.
Bom pessoal, a dica de hoje é essa e se você tiver alguma dúvida sobre Spark ou quiser sugerir algum assunto, me procure no LinkedIn ou no e-mail douglas@engenheirodedados.cloud
Para conhecer um pouco mais sobre o BRAINS, você pode ler o post introdutório BRAINS – Brazilian AI Networks e acessar a nossa comunidade.
Até a próxima.