[DF] Crash with distributed RDataFrame on dask with dask_jobqueue #9429
Closed
- Checked for duplicates
Describe the bug
I've been trying out the new RDF.Experimental.Distributed.Dask.RDataFrame in ROOT master, which is a great addition. It seems to work fine when using a single-machine cluster of workers (dask.distributed.LocalCluster), but it fails when using a batch cluster (either dask_jobqueue.HTCondorCluster or dask_jobqueue.SLURMCluster):
```
Traceback (most recent call last):
  File "/afs/cern.ch/user/s/swertz/testDistRDF/testCondor.py", line 81, in <module>
    val = prod.GetValue()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 130, in GetValue
    self.execute_graph()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 121, in execute_graph
    headnode.backend.execute(generator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 134, in execute
    ranges = headnode.build_ranges()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 91, in build_ranges
    return Ranges.get_balanced_ranges(self.nentries, self.npartitions)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 276, in get_balanced_ranges
    partition_size = nentries // npartitions
ZeroDivisionError: integer division or modulo by zero
```
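The failing line divides by `npartitions`, which is evidently zero here. A minimal sketch of the balanced-range computation (a hypothetical reconstruction mirroring the names in the traceback, not the actual DistRDF source) shows how an empty worker pool would trigger exactly this error, and how a guard would surface a clearer message:

```python
# Hypothetical sketch of DistRDF's balanced-range step; names mirror
# the traceback (get_balanced_ranges, nentries, npartitions), but this
# is NOT the actual library code.

def get_balanced_ranges(nentries, npartitions):
    """Split [0, nentries) into npartitions near-equal contiguous ranges."""
    if npartitions <= 0:
        # With dask_jobqueue, npartitions may be derived from the number
        # of connected workers, which is 0 while batch jobs are still
        # queued -- a guard like this would be clearer than the raw
        # ZeroDivisionError seen above.
        raise RuntimeError("npartitions is 0: no Dask workers connected yet?")
    partition_size = nentries // npartitions  # the line that crashes above
    remainder = nentries % npartitions
    ranges, start = [], 0
    for i in range(npartitions):
        end = start + partition_size + (1 if i < remainder else 0)
        ranges.append((start, end))
        start = end
    return ranges
```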
To Reproduce
Minimal example, run on lxplus/lxbatch:
```python
import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
distributed = ROOT.RDF.Experimental.Distributed
from dask.distributed import Client, LocalCluster
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB", job_extra={"jobflavour": "espresso"})
cluster.scale(jobs=1)
# Works fine with:
# cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client = Client(cluster)

df = RDataFrame(100, daskclient=client)
df = df.Define("myCol", "10")
prod = df.Mean("myCol")
val = prod.GetValue()
print(f"Value: {val}")
```

Setup
- ROOT from LCG dev3: `/cvmfs/sft.cern.ch/lcg/views/dev3/latest/x86_64-centos7-gcc11-opt/setup.sh`
- Need to `pip install dask-jobqueue`
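Since batch jobs submitted by dask_jobqueue can sit in the queue for a while, a possible workaround (an assumption on my part, not a confirmed fix for this issue) is to block until at least one worker has registered before triggering the distributed computation, or to pass an explicit partition count so it is not derived from a still-empty worker pool:

```python
# Workaround sketch (assumption, untested against this issue): wait for
# the HTCondor job to actually start a worker before running the graph.
from dask.distributed import Client
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB",
                          job_extra={"jobflavour": "espresso"})
cluster.scale(jobs=1)
client = Client(cluster)
client.wait_for_workers(n_workers=1)  # block until a worker registers

# Alternatively, an explicit partition count may sidestep the crash,
# if the constructor accepts it (hypothetical here):
# df = RDataFrame(100, daskclient=client, npartitions=1)
```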