
[DF] Crash with distributed RDataFrame on dask with dask_jobqueue #9429

@swertz

Description

  • Checked for duplicates

Describe the bug

I've been trying out the new RDF.Experimental.Distributed.Dask.RDataFrame in ROOT master, which is a great addition. It seems to work fine when using a single-machine cluster of workers (dask.distributed.LocalCluster), but it fails when using a batch cluster (either dask_jobqueue.HTCondorCluster or dask_jobqueue.SLURMCluster):

Traceback (most recent call last):
  File "/afs/cern.ch/user/s/swertz/testDistRDF/testCondor.py", line 81, in <module>
    val = prod.GetValue()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 130, in GetValue
    self.execute_graph()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 121, in execute_graph
    headnode.backend.execute(generator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 134, in execute
    ranges = headnode.build_ranges()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 91, in build_ranges
    return Ranges.get_balanced_ranges(self.nentries, self.npartitions)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 276, in get_balanced_ranges
    partition_size = nentries // npartitions
ZeroDivisionError: integer division or modulo by zero
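
The traceback ends in `partition_size = nentries // npartitions`, so the crash implies `npartitions` was 0 when the graph was built. A plausible explanation (my assumption, not confirmed from the DistRDF sources) is that the default partition count is derived from the number of Dask workers currently connected, which is 0 for a jobqueue cluster whose batch jobs are still queued, while a `LocalCluster` has its workers up immediately. A simplified sketch of the balanced-range logic shows how that produces exactly this error (the real `get_balanced_ranges` also spreads the remainder; this is an illustrative approximation):

```python
def get_balanced_ranges(nentries, npartitions):
    """Simplified sketch (assumption) of DistRDF/Ranges.py:get_balanced_ranges."""
    # This is the line from the traceback: raises ZeroDivisionError when
    # npartitions == 0, i.e. when no workers have connected yet.
    partition_size = nentries // npartitions
    remainder = nentries % npartitions
    ranges = []
    start = 0
    for i in range(npartitions):
        # Distribute the remainder one entry at a time over the first partitions.
        end = start + partition_size + (1 if i < remainder else 0)
        ranges.append((start, end))
        start = end
    return ranges

# LocalCluster case: workers exist, npartitions > 0, ranges are built fine.
print(get_balanced_ranges(100, 4))  # [(0, 25), (25, 50), (50, 75), (75, 100)]

# HTCondorCluster case: batch jobs still queued, worker count 0 -> the crash.
try:
    get_balanced_ranges(100, 0)
except ZeroDivisionError as exc:
    print(f"ZeroDivisionError: {exc}")
```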

To Reproduce

Minimal example, run on lxplus/lxbatch:

import ROOT
from dask.distributed import Client, LocalCluster
from dask_jobqueue import HTCondorCluster

RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB",
                          job_extra={"jobflavour": "espresso"})
cluster.scale(jobs=1)
# Works fine with:
# cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client = Client(cluster)

df = RDataFrame(100, daskclient=client)
df = df.Define("myCol", "10")
prod = df.Mean("myCol")
val = prod.GetValue()  # crashes here with the HTCondorCluster
print(f"Value: {val}")

Setup

  • ROOT from LCG dev3: /cvmfs/sft.cern.ch/lcg/views/dev3/latest/x86_64-centos7-gcc11-opt/setup.sh
  • dask-jobqueue installed via `pip install dask-jobqueue`
