Conversation
Codecov Report
@@ Coverage Diff @@
## master #273 +/- ##
==========================================
- Coverage 93.02% 91.32% -1.71%
==========================================
Files 33 33
Lines 502 530 +28
Branches 9 14 +5
==========================================
+ Hits 467 484 +17
- Misses 35 46 +11
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
|
Outstanding items:
|
I didn't investigate this issue yet. |
|
I haven't done any benchmarking with CE3. @vasilmkd you might be interested |
|
Can confirm, unsafeRunSync on IO initially shifts to the IO compute execution context and there are some other bookkeeping actions taken to ensure safe execution and propagation of errors, which has a non-trivial overhead especially visible in benchmarks. It's basically not an apples to apples comparison anymore. Not sure if this helps... |
| import cats.effect.{ContextShift, IO} | ||
| import cats.effect.Clock | ||
| import cats.effect.IO | ||
| import cats.effect.unsafe.IORuntime |
There was a problem hiding this comment.
| import cats.effect.unsafe.IORuntime | |
| import cats.effect.unsafe.implicits._ |
and you won't need to define the implicit val for IORuntime.
| implicit val F: Effect[IO] = IO.ioEffect | ||
| implicit val clock: Clock[IO] = Clock.create | ||
| implicit val F: Sync[IO] = IO.asyncForIO | ||
| implicit val dispatcher: Dispatcher[IO] = Dispatcher[IO].allocated.unsafeRunSync()._1 |
There was a problem hiding this comment.
bad idea to do allocated on Dispatcher, can you use an IORuntime instead?
There was a problem hiding this comment.
I guess it's not really possible since the interface is supposed to work on any F[_]...
| .settings(sharedSettings) | ||
| .settings( | ||
| libraryDependencies ++= (monix % Test) :: catsMtl :: sourcecode :: monixCatnap :: perfolation :: catsEffect :: cats | ||
| libraryDependencies ++= (catsEffect % Test) :: catsMtl :: sourcecode :: perfolation :: catsEffectStd :: cats |
There was a problem hiding this comment.
You don't need to add catsEffectStd if you have core already.
There was a problem hiding this comment.
@kubukoz I want to have IO only in tests, since cats-effect-std is enough for the core.
| timer: Timer[F], | ||
| contextShift: ContextShift[F] | ||
| case class AsyncLogger[F[_]](queue: Queue[F, LoggerMessage], timeWindow: FiniteDuration, inner: Logger[F])( | ||
| implicit F: Async[F] |
There was a problem hiding this comment.
Not sure if you need the full Async here
There was a problem hiding this comment.
The runF method uses F.start under the hood. If we move the event consumer loop outside of the class, the constraints can be relaxed to Monad and Clock.
Related comment: #273 (comment)
There was a problem hiding this comment.
start is from Spawn, Async is way more powerful than that ;)
| implicit F: ConcurrentEffect[F] | ||
| ): Logger[F] = F.toIO(withAsync(inner, timeWindow, maxBufferSize).allocated).unsafeRunSync()._1 | ||
| implicit F: Async[F], | ||
| dispatcher: Dispatcher[F] |
There was a problem hiding this comment.
It's not recommended to pass Dispatcher implicitly, you might be better off creating one here and using allocated here.
There was a problem hiding this comment.
Due to semantic of the withAsyncUnsafe method the Dispatcher cannot be instantiated:
def withAsyncUnsafe[F[_]](
inner: Logger[F],
timeWindow: FiniteDuration,
maxBufferSize: Option[Int]
)(
implicit F: Async[F]
): Logger[F] = {
val dispatcher: F[(Dispatcher[F], F[Unit])] = Dispatcher[F].allocated <- still cannot run an effect and access the dispatcher
}It can work in a case of the different signature:
def withAsync[F[_]]: Resource[F, Logger[F]] = ???
- def withAsyncUnsafe[F[_]](...): Logger[F] = ???
+ def withAsyncUnsafe[F[_]](...): F[Logger[F]] = ???@sergeykolbasov what do you think?
There was a problem hiding this comment.
I'd say if someone is feeling adventurous enough to deal with unsafe API, let them do it on their own by providing a custom dispatcher implicitly. If the users want to deal with unsafety, they should know better how to deal with it on their side.
There was a problem hiding this comment.
I'd take it explicitly, but I agree about having to manage it on the user's side :)
| @@ -28,7 +26,7 @@ abstract class DefaultLogger[F[_]](val minLevel: Level)(implicit clock: Clock[F] | |||
| exception = t, | |||
| position = position, | |||
| threadName = Thread.currentThread().getName, | |||
There was a problem hiding this comment.
random thought, this should be suspended
| _ <- timer.sleep(100.millis) | ||
| _ <- cs.shift | ||
| _ <- F.sleep(100.millis) | ||
| _ <- F.cede |
There was a problem hiding this comment.
Didn't know this, thanks!
| LoggerTests[F]( | ||
| new WriterTLogger[IO].withConstContext(Map.empty), | ||
| _.written.unsafeRunSync() | ||
| _.written.evalOn(singleThreadCtx).unsafeRunSync() |
There was a problem hiding this comment.
IO executes effects on different threads more often comparing to CE2. Therefore loggerMessageEq returns false since message.threadName is different. Executing effects on a single thread prevents such an issue.
On the other hand, evalOn feels more a bandaid than a proper fix. Perhaps I should ignore threadName field in the Eq logic.
| def runF: F[Fiber[F, Throwable, Unit]] = { | ||
| def drainOnce: F[Unit] = drain >> F.sleep(timeWindow) >> F.cede | ||
|
|
||
| F.start(drainOnce.foreverM[Unit]).map { fiber => |
There was a problem hiding this comment.
there is a Supervisor abstraction in CE3:
Would it fit better here?
There was a problem hiding this comment.
There was a problem hiding this comment.
I'm not sure to be honest. runF already is a part of the AsyncLogger lifecycle.
Btw, should runF even be public or part of the class? The event consumer loop should be started only once.
At this case, the definition of the AsyncLogger can be simplified:
case class AsyncLogger[F: Monad: Clock] private (...) extends DefaultLogger(...) {
def submit(msg: LoggerMessage): F[Unit] = ...
private def drain: F[Unit] = ...
}
object AsyncLogger {
def withAsync[F[_]: Async](inner: Logger[F], timeWindow: FiniteDuration, maxBufferSize: Option[Int]): Resource[F, Logger[F]] = {
val createQueue = ...
def backgroundConsumer(logger: AsyncLogger[F]): Resource[F, Unit] = {
def drainLoop: F[Unit] = F.andWait(logger.drain, timeWindow).foreverM[Unit]
// cannot use F.background due to a custom cancellation logic
Resource.make(F.start(drainLoop))(fiber => logger.drain >> fiber.cancel).void
}
for {
queue <- Resource.eval(createQueue)
logger <- Resource.pure(AsyncLogger(queue, timeWindow, inner))
_ <- backgroundConsumer(logger)
} yield logger
}
}|
Thanks for the effort @iRevive That RollingFileLogger spec is indeed annoying, however I couldn't manage the timer mock for that specific case back in the days. I guess it's related to the internal rolling loop, but gave up tracing it down to the root cause |
|
fs2-io is using Hotswap internally to implement something similar, for reference: https://github.com/typelevel/fs2/blob/24370abb527147da78b93d59a5be60e1079fdfbe/io/src/main/scala/fs2/io/file/Files.scala#L507-L555 |
|
Switched to |
| * Run internal loop of consuming events from the queue and push them down the chain | ||
| */ | ||
| def backgroundConsumer(logger: AsyncLogger[F]): Resource[F, Unit] = { | ||
| def drainLoop: F[Unit] = F.andWait(logger.drain, timeWindow).foreverM[Unit] |
There was a problem hiding this comment.
Seeing andWait being actually used makes me happy.
|
I can't promise I'll make it before the weekend, but I'm planning to go through this again and search for any potential fiber leaks etc. :) |
| minLevel: Level = Level.Trace | ||
| ): Managed[LoggerError, Logger[IO[LoggerError, *]]] = | ||
| ): ZManaged[Clock & CBlocking, LoggerError, Logger[IO[LoggerError, *]]] = | ||
| ZManaged |
There was a problem hiding this comment.
@kubukoz A gut feeling says there should be a more elegant combinator. Something similar to Resource.suspend.
|
Can we get this merged? :) |
|
@iRevive Do you mind repeating the |
|
@vasilmkd sure, I will give it a try. |
|
hey @sergeykolbasov, I don't see a release in maven, did it fail or did you just not have a chance to do it yet? Just FYI, I checked a local snapshot of this and it seems to work :) a little bummer about having to do |
Temporarily disabled modules:
,odin-zioodin-monixMissing CE3 dependencies:
,zio-catsmonixodin-coredepends oncats-effect-std.Changes due to missing Monix dependency:
ConcurrentQueuefrom Monix withQueuefromcats.effect.stdmonix.Taskwithcats.effect.IOin testsmonix.execution.Schedulerin favor ofIORuntimeOnce a compatible version of Monix will be released, I can revert these changes.
Benchmarks
I observed a performance degradation after the upgrade to CE3. Evaluating a task via
.unsafeRunSync()in a for-loop is 3x slower comparing to the CE2. Usingtraverseinstead of a for-loop leads to more clear results.for-loop
The results below represent the evaluation of a logging effect in a for-loop. Example:
traverse
AsyncLoggerBenchmark issue
From my point of view, the async logger benchmark implemented in a bit wrong way. And it does not measure the real throughput.
The key element of the
AsyncLoggeris a Queue. Logging a message, basically, an enqueue operation:In benchmarks, the size of a queue is
1_000_000elements and the flush period is1 millisecond. Since the JMH executes the code thousands of times, the queue is populated up to the limit almost immediately. Hence thetryOffermethod does nothing during evaluation:To prove my assumption I changed the logic of the background fiber:
def runF: F[Fiber[F, Throwable, Unit]] = { - def drainLoop: F[Unit] = drain >> F.sleep(timeWindow) >> F.cede >> drainLoop + def drainLoop: F[Unit] = F.unit F.start(drainLoop).map { fiber => new Fiber[F, Throwable, Unit] { override def cancel: F[Unit] = drain >> fiber.cancel override def join: F[Outcome[F, Throwable, Unit]] = fiber.join } } }The queue never being drained and
tryOfferdoes nothing. And the measurements became similar to the CE2 version: