Skip to content

Silent thread leak on exception in completeValueForList #2068

@KammererTob

Description

@KammererTob

This is a very specific bug and thus might not be relevant to most people, but i think it should at least cover the possibility and correctly throw the exception/not leak the thread. I am not sure i fully understand the bug, but i have a reproduction which i will share.

The bug happens when the completeValueForList method throws an exception (for example if getting the iterator throws an exception) and at the same time there is a deeper batch loaded level in another branch of the AST. This leads to the deeper batch loader never to be dispatched, because the parent level is never marked as completed (because of the exception).

To Reproduce
The crucial part of the reproduction is the RuntimeException in the iterator of the overwritten List implementation. For our use case this exception was a LazyInitializationException (Hibernate) thrown inside the iterator - so not an uncommon dependency. The error is obviously something which should be avoided, but since this does not throw any error it is quite hard to pinpoint what and where something went wrong.

package graphql

import graphql.execution.instrumentation.dataloader.DataLoaderDispatcherInstrumentation
import graphql.schema.DataFetcher
import graphql.schema.DataFetchingEnvironment
import graphql.schema.StaticDataFetcher
import graphql.schema.idl.RuntimeWiring
import org.apache.commons.lang3.concurrent.BasicThreadFactory
import org.dataloader.BatchLoader
import org.dataloader.DataLoader
import org.dataloader.DataLoaderOptions
import org.dataloader.DataLoaderRegistry
import spock.lang.Specification

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring
import static graphql.ExecutionInput.newExecutionInput

class Issue2068 extends Specification {
    def "deadlock attempt"() {
        setup:
        def sdl = """
        type Nation {
            name: String
        }
        
        type Toy {
            name: String
        }
        
        type Owner {
            nation: Nation
        }
        
        type Cat {
            toys: [Toy]
        }
        
        type Dog {
            owner: Owner
        }
        
        type Pets {
            cats: [Cat]
            dogs: [Dog]
        }
        
        type Query {
            pets: Pets
        }
        """

        def cats = [['id': "cat-1"]]
        def dogs = [['id': "dog-1"]]

        ThreadFactory threadFactory = new BasicThreadFactory.Builder()
                .namingPattern("resolver-chain-thread-%d").build()
        def executor = new ThreadPoolExecutor(15, 15, 0L,
                TimeUnit.MILLISECONDS, new SynchronousQueue<>(), threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy())

        DataFetcher nationsDf = { env -> env.getDataLoader("owner.nation").load(env) }
        DataFetcher ownersDf = { env -> env.getDataLoader("dog.owner").load(env) }

        def wiring = RuntimeWiring.newRuntimeWiring()
                .type(newTypeWiring("Query")
                        .dataFetcher("pets", new StaticDataFetcher(['cats': cats, 'dogs': dogs])))
                .type(newTypeWiring("Cat")
                        .dataFetcher("toys", new StaticDataFetcher(new List<Object>() {
                            @Override
                            int size() {
                                return 1
                            }

                            @Override
                            boolean isEmpty() {
                                return false
                            }

                            @Override
                            boolean contains(Object o) {
                                return false
                            }

                            @Override
                            Iterator iterator() {
                                throw new RuntimeException()
                            }

                            @Override
                            Object[] toArray() {
                                return new Object[0]
                            }

                            @Override
                            Object[] toArray(Object[] a) {
                                return null
                            }

                            @Override
                            boolean add(Object o) {
                                return false
                            }

                            @Override
                            boolean remove(Object o) {
                                return false
                            }

                            @Override
                            boolean containsAll(Collection c) {
                                return false
                            }

                            @Override
                            boolean addAll(Collection c) {
                                return false
                            }

                            @Override
                            boolean addAll(int index, Collection c) {
                                return false
                            }

                            @Override
                            boolean removeAll(Collection c) {
                                return false
                            }

                            @Override
                            boolean retainAll(Collection c) {
                                return false
                            }

                            @Override
                            void clear() {

                            }

                            @Override
                            Object get(int index) {
                                return null
                            }

                            @Override
                            Object set(int index, Object element) {
                                return null
                            }

                            @Override
                            void add(int index, Object element) {

                            }

                            @Override
                            Object remove(int index) {
                                return null
                            }

                            @Override
                            int indexOf(Object o) {
                                return 0
                            }

                            @Override
                            int lastIndexOf(Object o) {
                                return 0
                            }

                            @Override
                            ListIterator listIterator() {
                                return null
                            }

                            @Override
                            ListIterator listIterator(int index) {
                                return null
                            }

                            @Override
                            List subList(int fromIndex, int toIndex) {
                                return null
                            }
                        })))
                .type(newTypeWiring("Dog")
                        .dataFetcher("owner", ownersDf))
                .type(newTypeWiring("Owner")
                        .dataFetcher("nation", nationsDf))
                .build()

        def schema = TestUtil.schema(sdl, wiring)

        when:
        def graphql = GraphQL.newGraphQL(schema)
                .instrumentation(new DataLoaderDispatcherInstrumentation())
                .build()

        then: "execution shouldn't hang"
        DataLoaderRegistry dataLoaderRegistry = mkNewDataLoaderRegistry(executor)

        def result = graphql.executeAsync(newExecutionInput()
                .dataLoaderRegistry(dataLoaderRegistry)
                .query("""
                query LoadPets {
                      pets {
                        cats {
                          toys {
                            name
                          }
                        }
                        dogs {
                          owner {
                            nation {
                              name
                            }
                          }
                        }
                      }
                    }
                    """)
                .build())
        // wait for each future to complete and grab the results
        result.whenComplete({ results, error ->
            if (error) {
                throw error
            }
            results.each { assert it.errors.empty }
        }).join()
    }

    private static DataLoaderRegistry mkNewDataLoaderRegistry(executor) {
        def dataLoaderNations = new DataLoader<Object, Object>(new BatchLoader<DataFetchingEnvironment, List<Object>>() {
            @Override
            CompletionStage<List<List<Object>>> load(List<DataFetchingEnvironment> keys) {
                return CompletableFuture.supplyAsync({
                    def nations = []
                    for (int i = 1; i <= 1; i++) {
                        nations.add(['id': "nation-$i", 'name': "nation-$i"])
                    }
                    return nations
                }, executor) as CompletionStage<List<List<Object>>>
            }
        }, DataLoaderOptions.newOptions().setMaxBatchSize(5))

        def dataLoaderOwners = new DataLoader<Object, Object>(new BatchLoader<DataFetchingEnvironment, List<Object>>() {
            @Override
            CompletionStage<List<List<Object>>> load(List<DataFetchingEnvironment> keys) {
                return CompletableFuture.supplyAsync({
                    def owners = []
                    for (int i = 1; i <= 1; i++) {
                        owners.add(['id': "owner-$i"])
                    }
                    return owners
                }, executor) as CompletionStage<List<List<Object>>>
            }
        }, DataLoaderOptions.newOptions().setMaxBatchSize(5))

        def dataLoaderRegistry = new DataLoaderRegistry()
        dataLoaderRegistry.register("dog.owner", dataLoaderOwners)
        dataLoaderRegistry.register("owner.nation", dataLoaderNations)
        dataLoaderRegistry
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions