Skip to content

Any file operation stops after one buffer #198

@KoenR3

Description

@KoenR3

Steps to reproduce:
Behaviour is the same when using Alpakka Parquet writer using the queryRaw or query functionality.

Query listed as an example. Any query that has more points than the buffer should produce the same results

  val influxDBClient: InfluxDBClientScala = InfluxDBClientScalaFactory
    .create(influxOptions, 10000, OverflowStrategy.backpressure)

  val solar_inverter: Flux = Flux
    .from("electricity")
    .range(
      ZonedDateTime
        .of(
          LocalDate.of(2020, 1, 1).atStartOfDay(),
          ZoneId.of("Europe/Brussels")
        )
        .toInstant,
      ZonedDateTime
        .now(
          ZoneId.of("Europe/Brussels")
        )
        .toInstant
    )
    .filter(
      Restrictions.and(
        Restrictions.measurement().equal("solar_inverter_total_power"),
        Restrictions.value().exists()
      )
    )

  val querySource: Source[String, NotUsed] =
    influxDBClient.getQueryScalaApi().queryRaw(solar_inverter.toString())

  val fp = "./test.txt"

  val filePath = Paths.get(fp)

  val count= querySource
    .log("Source")
    .withAttributes(akkaLoggerAttributes)
    .map( s => ByteString(s + "\n"))
     .runWith(FileIO.toPath(filePath))
    .recover(e => {
      logger.error(e.getMessage)
      throw e
    })

  count onComplete {
    case Success(_) =>
      influxDBClient.close()
      system.terminate()
    case Failure(e) =>
      logger.error(e)
      influxDBClient.close()
      system.terminate()
  }

Expected behavior:
All measurements should be streamed into the file

Actual behavior:
When querying a source which has more measurements than the buffer size the stream finishes after approximately one buffer when using FileIO or Alpakka Parquet writer

Specifications:

  • Client Version: 1.15
  • Akka Streams: 2.6.8
  • InfluxDB Version: 1.8.4
  • JDK Version: 1.8
  • Platform: Mac OS

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions