Gatling-Influx Custom Logback Appender

GatlingInfluxAppender

package com.example.appender

import ch.qos.logback.classic.Level
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase

import com.example.appender.GatlingLogParser._

/**
 * Logback appender that intercepts Gatling TRACE log lines, parses the
 * performance fields, and ships them to InfluxDB via [[InfluxWriter]].
 *
 * === Configuration (logback.xml) ===
 *
 * Note that `&` must be written as `&amp;` inside XML element content.
 *
 * {{{
 * <appender name="INFLUX" class="com.example.appender.GatlingInfluxAppender">
 *   <!-- Write endpoint (defaults to a local InfluxDB 1.x URL when omitted) -->
 *   <influxUrl>http://localhost:8086/api/v2/write?org=myOrg&amp;bucket=gatling&amp;precision=ns</influxUrl>
 *
 *   <!-- InfluxDB 2.x: "Token <your-token>"  |  v1: "" or "Basic <base64>" -->
 *   <authToken>Token my-super-secret-token</authToken>
 *
 *   <!-- Optional tuning -->
 *   <batchSize>500</batchSize>
 *   <flushIntervalMs>1000</flushIntervalMs>
 *   <connectTimeoutMs>3000</connectTimeoutMs>
 *   <writeTimeoutMs>5000</writeTimeoutMs>
 *   <queueCapacity>10000</queueCapacity>
 *   <maxRetries>3</maxRetries>
 *   <captureUserEvents>true</captureUserEvents>
 * </appender>
 *
 * <!-- Attach only to Gatling's trace logger to avoid noise -->
 * <logger name="io.gatling" level="TRACE" additivity="false">
 *   <appender-ref ref="INFLUX"/>
 * </logger>
 * }}}
 *
 * The appender only processes TRACE-level events; it silently ignores
 * everything else, so it is safe to attach to a broader logger hierarchy.
 */
class GatlingInfluxAppender extends AppenderBase[ILoggingEvent] {

  // ── Logback-injected properties (set via child elements of <appender>) ─────

  private var influxUrl:        String  = "http://localhost:8086/write?db=gatling&precision=ns"
  private var authToken:        String  = ""
  private var batchSize:        Int     = 500
  private var flushIntervalMs:  Long    = 1000L
  private var connectTimeoutMs: Long    = 3000L
  private var writeTimeoutMs:   Long    = 5000L
  private var queueCapacity:    Int     = 10_000
  private var maxRetries:       Int     = 3
  private var captureUserEvents: Boolean = true

  // Setters — Logback calls these via reflection when parsing the XML config
  def setInfluxUrl(v: String):         Unit = { influxUrl        = v }
  def setAuthToken(v: String):         Unit = { authToken        = v }
  def setBatchSize(v: Int):            Unit = { batchSize        = v }
  def setFlushIntervalMs(v: Long):     Unit = { flushIntervalMs  = v }
  def setConnectTimeoutMs(v: Long):    Unit = { connectTimeoutMs = v }
  def setWriteTimeoutMs(v: Long):      Unit = { writeTimeoutMs   = v }
  def setQueueCapacity(v: Int):        Unit = { queueCapacity    = v }
  def setMaxRetries(v: Int):           Unit = { maxRetries       = v }
  def setCaptureUserEvents(v: Boolean):Unit = { captureUserEvents = v }

  // ── Internal state ─────────────────────────────────────────────────────────

  // null until start() succeeds and again after stop(); read from logging threads.
  @volatile private var writer: InfluxWriter = _

  // ── Logback lifecycle ──────────────────────────────────────────────────────

  /**
   * Validates the configuration and creates the [[InfluxWriter]].
   *
   * The appender is only marked as started when the writer was created
   * successfully; configuration problems are reported through Logback's
   * status API instead of being thrown at the caller.
   */
  override def start(): Unit =
    if (isStarted) {
      // A second start() would otherwise orphan the existing writer and its thread.
      addWarn("GatlingInfluxAppender: start() called while already started — ignoring")
    } else if (influxUrl == null || influxUrl.isBlank) {
      addError("GatlingInfluxAppender: influxUrl must be set")
    } else {
      try {
        writer = new InfluxWriter(
          influxUrl        = influxUrl,
          authToken        = authToken,
          batchSize        = batchSize,
          flushIntervalMs  = flushIntervalMs,
          connectTimeoutMs = connectTimeoutMs,
          writeTimeoutMs   = writeTimeoutMs,
          queueCapacity    = queueCapacity,
          maxRetries       = maxRetries
        )
        super.start()
        addInfo(s"GatlingInfluxAppender started → $influxUrl (batch=$batchSize, flush=${flushIntervalMs}ms)")
      } catch {
        // e.g. a malformed URL or a non-positive queue capacity
        case scala.util.control.NonFatal(e) =>
          addError(s"GatlingInfluxAppender: could not create InfluxWriter for '$influxUrl'", e)
      }
    }

  /** Marks the appender as stopped, then closes the writer (if any) exactly once. */
  override def stop(): Unit = {
    super.stop()
    val current = writer
    writer = null // makes repeated stop() calls harmless
    if (current != null) {
      try {
        current.close()
        addInfo("GatlingInfluxAppender stopped and flushed.")
      } catch {
        case scala.util.control.NonFatal(e) =>
          addError("GatlingInfluxAppender: error while closing InfluxWriter", e)
      }
    }
  }

  // ── Core append logic ──────────────────────────────────────────────────────

  /**
   * Converts a TRACE event into a Line Protocol string and enqueues it.
   * Events of any other level, and messages the parser does not recognise,
   * are ignored.
   */
  override def append(event: ILoggingEvent): Unit = {
    // Read the volatile field once so the null check and the use see the same value.
    val target = writer

    // Only handle TRACE; ignore DEBUG / INFO / WARN / ERROR
    if (target != null && event.getLevel == Level.TRACE) {
      val line: Option[String] = GatlingLogParser.parse(event.getFormattedMessage) match {
        case e: RequestEvent                   => Some(InfluxLineProtocol.fromRequest(e))
        case e: UserEvent if captureUserEvents => Some(InfluxLineProtocol.fromUser(e))
        case _: UserEvent                      => None // user events disabled by configuration
        case UnknownEvent                      => None // not a Gatling data record — skip silently
      }

      line.foreach(target.enqueue)
    }
  }
}

GatlingLogParser

package com.example.appender

import scala.util.matching.Regex

/**
 * Parser for tab-separated Gatling data records carried in log messages.
 *
 * Two record layouts are recognised:
 *
 * REQUEST format:
 *   REQUEST\t<userId>\t<scenario>\t<groups>\t<requestName>\t<startMs>\t<endMs>\t<status>\t<message>
 *
 * USER (session lifecycle) format:
 *   USER\t<scenario>\t<userId>\t<event>\t<startMs>\t<endMs>
 *
 * NOTE(review): the layouts above, and the claim that Gatling emits them at
 * TRACE level on "io.gatling.http.engine.response" (and friends), are
 * assumptions of this project — confirm them against the Gatling version in use.
 *
 * The parser never throws on malformed input; anything it cannot interpret
 * is reported as [[GatlingLogParser.UnknownEvent]].
 */
object GatlingLogParser {

  // ── ADT ────────────────────────────────────────────────────────────────────

  sealed trait GatlingEvent

  /**
   * A completed HTTP request sample.
   *
   * @param scenario    Gatling scenario name
   * @param userId      Virtual user ID (Long)
   * @param groups      Group hierarchy as it appears in the record (may be empty)
   * @param requestName Name given to the request in the simulation
   * @param startMs     Epoch milliseconds when request was sent
   * @param endMs       Epoch milliseconds when response was received
   * @param status      "OK" or "KO"
   * @param message     Optional error message on KO
   */
  case class RequestEvent(
    scenario:    String,
    userId:      Long,
    groups:      String,
    requestName: String,
    startMs:     Long,
    endMs:       Long,
    status:      String,
    message:     Option[String]
  ) extends GatlingEvent {
    /** Derived response time in milliseconds. */
    def responseTimeMs: Long = endMs - startMs
  }

  /**
   * A virtual-user lifecycle event (START or END).
   *
   * @param scenario  Scenario name
   * @param userId    Virtual user ID
   * @param event     "START" or "END"
   * @param startMs   Epoch ms when the user session started
   * @param endMs     Epoch ms when this lifecycle event occurred
   */
  case class UserEvent(
    scenario: String,
    userId:   Long,
    event:    String,
    startMs:  Long,
    endMs:    Long
  ) extends GatlingEvent

  /** The log line didn't match any known Gatling format. */
  case object UnknownEvent extends GatlingEvent

  // ── Parsing ────────────────────────────────────────────────────────────────

  private val Tab = "\t"

  /**
   * Parse a raw log message string into a [[GatlingEvent]].
   *
   * @param message the formatted log message; `null` and empty are accepted
   * @return a [[RequestEvent]] or [[UserEvent]] for well-formed records,
   *         [[UnknownEvent]] for everything else (unknown prefix, too few
   *         columns, or a non-numeric id / timestamp)
   */
  def parse(message: String): GatlingEvent =
    if (message == null || message.isEmpty) UnknownEvent
    else {
      // limit -1 keeps trailing empty columns (e.g. an empty <message>)
      val parts = message.split(Tab, -1)
      parts.headOption match {
        case Some("REQUEST") => parseRequest(parts)
        case Some("USER")    => parseUser(parts)
        case _               => UnknownEvent
      }
    }

  // REQUEST\t<userId>\t<scenario>\t<groups>\t<requestName>\t<startMs>\t<endMs>\t<status>[\t<message>]
  private def parseRequest(parts: Array[String]): GatlingEvent =
    if (parts.length < 8) UnknownEvent
    else {
      val parsed =
        for {
          userId <- parseLong(parts(1)) // a non-numeric id yields UnknownEvent, same as in parseUser
          start  <- parseLong(parts(5))
          end    <- parseLong(parts(6))
        } yield RequestEvent(
          scenario    = parts(2).trim,
          userId      = userId,
          groups      = parts(3).trim,
          requestName = parts(4).trim,
          startMs     = start,
          endMs       = end,
          status      = parts(7).trim,
          message     = parts.lift(8).map(_.trim).filter(_.nonEmpty)
        )
      parsed.getOrElse(UnknownEvent)
    }

  // USER\t<scenario>\t<userId>\t<event>\t<startMs>\t<endMs>
  private def parseUser(parts: Array[String]): GatlingEvent =
    if (parts.length < 6) UnknownEvent
    else {
      val parsed =
        for {
          userId <- parseLong(parts(2))
          start  <- parseLong(parts(4))
          end    <- parseLong(parts(5))
        } yield UserEvent(
          scenario = parts(1).trim,
          userId   = userId,
          event    = parts(3).trim,
          startMs  = start,
          endMs    = end
        )
      parsed.getOrElse(UnknownEvent)
    }

  /** Parses a (possibly padded) decimal Long; `None` when the text is not a number. */
  private def parseLong(raw: String): Option[Long] = raw.trim.toLongOption
}

InfluxLineProtocol

package com.example.appender

import com.example.appender.GatlingLogParser.{RequestEvent, UserEvent}

/**
 * Converts [[GatlingLogParser.GatlingEvent]]s into InfluxDB Line Protocol
 * strings ready to POST to the /api/v2/write (InfluxDB 2.x) or
 * /write (InfluxDB 1.x) endpoint.
 *
 * Line Protocol format:
 *   <measurement>[,<tag_key>=<tag_value>...] <field_key>=<field_value>[,...] [<timestamp>]
 *
 * Timestamps are written in nanoseconds, derived from the event's `endMs`.
 *
 * NOTE(review): because the timestamp only has millisecond resolution, two
 * events with the same measurement, identical tags and the same `endMs`
 * produce lines with an identical series key and timestamp. Verify how the
 * target InfluxDB treats such duplicates before relying on per-request counts.
 */
object InfluxLineProtocol {

  /**
   * Serialise a [[RequestEvent]] into one Line Protocol string.
   *
   * Measurement : gatling_request
   * Tags        : scenario, request_name, groups ("none" when empty), status
   * Fields      : response_time_ms, start_ms, end_ms, user_id (all integers)
   * Timestamp   : endMs converted to nanoseconds
   */
  def fromRequest(e: RequestEvent): String =
    render(
      measurement = "gatling_request",
      tags = Seq(
        "scenario"     -> e.scenario,
        "request_name" -> e.requestName,
        "groups"       -> (if (e.groups.nonEmpty) e.groups else "none"),
        "status"       -> e.status
      ),
      fields = Seq(
        "response_time_ms" -> s"${e.responseTimeMs}i",
        "start_ms"         -> s"${e.startMs}i",
        "end_ms"           -> s"${e.endMs}i",
        "user_id"          -> s"${e.userId}i"
      ),
      timestampMs = e.endMs
    )

  /**
   * Serialise a [[UserEvent]] into one Line Protocol string.
   *
   * Measurement : gatling_user
   * Tags        : scenario, event
   * Fields      : user_id, start_ms, end_ms, duration_ms (all integers)
   * Timestamp   : endMs converted to nanoseconds
   */
  def fromUser(e: UserEvent): String =
    render(
      measurement = "gatling_user",
      tags = Seq(
        "scenario" -> e.scenario,
        "event"    -> e.event
      ),
      fields = Seq(
        "user_id"     -> s"${e.userId}i",
        "start_ms"    -> s"${e.startMs}i",
        "end_ms"      -> s"${e.endMs}i",
        "duration_ms" -> s"${e.endMs - e.startMs}i"
      ),
      timestampMs = e.endMs
    )

  // ── Helpers ─────────────────────────────────────────────────────────────────

  /**
   * Assembles `<measurement>[,<tags>] <fields> <timestamp>`.
   *
   * Tags with an empty value are dropped by [[buildTags]]; when none remain
   * the separating comma is omitted as well, because "measurement, field=1"
   * is not valid Line Protocol.
   */
  private def render(
    measurement: String,
    tags:        Seq[(String, String)],
    fields:      Seq[(String, String)],
    timestampMs: Long
  ): String = {
    val tagSet = buildTags(tags)
    val head   = if (tagSet.isEmpty) measurement else s"$measurement,$tagSet"
    s"$head ${buildFields(fields)} ${toNanos(timestampMs)}"
  }

  /** Escape tag keys/values per Line Protocol spec (commas, spaces, equals). */
  private def escapeTag(s: String): String =
    s.replace(",", "\\,")
     .replace(" ", "\\ ")
     .replace("=", "\\=")

  /** Joins the non-empty tags as `k=v` pairs separated by commas. */
  private def buildTags(pairs: Seq[(String, String)]): String =
    pairs
      .collect { case (k, v) if v.nonEmpty => s"${escapeTag(k)}=${escapeTag(v)}" }
      .mkString(",")

  /** Joins fields as `k=v` pairs; values are expected to be pre-formatted. */
  private def buildFields(pairs: Seq[(String, String)]): String =
    pairs.map { case (k, v) => s"$k=$v" }.mkString(",")

  /** Convert epoch milliseconds to epoch nanoseconds. */
  private def toNanos(ms: Long): Long = ms * 1_000_000L
}

InfluxWriter

package com.example.appender

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import org.slf4j.LoggerFactory

/**
 * Asynchronous, batching HTTP writer for InfluxDB Line Protocol.
 *
 * Lines are placed on an in-memory queue. A single background daemon thread
 * drains the queue and POSTs batches to InfluxDB using the Java 11 HttpClient.
 *
 * Thread safety: [[enqueue]] may be called from any thread. Batches are sent
 * by the background flusher; [[close]] additionally sends whatever is still
 * queued from the calling thread once the flusher has been asked to stop.
 *
 * @param influxUrl     Full write URL, e.g.
 *                      "http://localhost:8086/api/v2/write?org=myOrg&bucket=gatling&precision=ns"
 *                      or for v1: "http://localhost:8086/write?db=gatling&precision=ns"
 * @param authToken     InfluxDB 2.x token ("Token <token>") or empty for v1 with no auth.
 *                      For v1 basic auth pass "Basic <base64(user:pass)>".
 * @param batchSize     Maximum number of lines per HTTP POST (values below 1 are treated as 1).
 * @param flushIntervalMs How long (ms) the flusher waits for a first line before re-checking
 *                      whether it should stop.
 * @param connectTimeoutMs HTTP connect timeout in milliseconds.
 * @param writeTimeoutMs   HTTP request timeout in milliseconds.
 * @param queueCapacity  Max queued lines. When the queue is full, [[enqueue]] does NOT block:
 *                      the line is dropped and a warning is logged.
 * @param maxRetries     How many times to retry a failed POST (fixed 1s delay between attempts).
 *                      Only transport errors, HTTP 429 and HTTP 5xx are retried.
 */
class InfluxWriter(
  influxUrl:       String,
  authToken:       String       = "",
  batchSize:       Int          = 500,
  flushIntervalMs: Long         = 1000L,
  connectTimeoutMs: Long        = 3000L,
  writeTimeoutMs:  Long         = 5000L,
  queueCapacity:   Int          = 10_000,
  maxRetries:      Int          = 3
) extends AutoCloseable {

  import scala.util.control.NonFatal

  private val log = LoggerFactory.getLogger(classOf[InfluxWriter])

  private val http: HttpClient = HttpClient.newBuilder()
    .connectTimeout(Duration.ofMillis(connectTimeoutMs))
    .build()

  private val uri = URI.create(influxUrl)

  // Bounded queue — when it is full, enqueue() drops the line instead of blocking
  private val queue = new ArrayBlockingQueue[String](queueCapacity)

  private val running = new AtomicBoolean(true)

  // A non-positive batchSize would make the batch buffer allocation throw on the
  // flusher thread, where nobody would notice; clamp it instead.
  private val effectiveBatchSize = math.max(1, batchSize)

  private val executor = Executors.newSingleThreadExecutor { r =>
    val t = new Thread(r, "influx-writer-flusher")
    t.setDaemon(true)
    t
  }

  executor.submit(new Runnable {
    override def run(): Unit = {
      val batch = new java.util.ArrayList[String](effectiveBatchSize)
      try {
        while (running.get() || !queue.isEmpty) {
          // Wait up to flushIntervalMs for the first line, then take whatever else is ready
          val first = queue.poll(flushIntervalMs, TimeUnit.MILLISECONDS)
          if (first != null) {
            batch.add(first)
            queue.drainTo(batch, effectiveBatchSize - 1)
            try flush(batch)
            catch {
              // Keep the flusher alive: losing one batch is better than losing all later ones
              case NonFatal(e) => log.error("InfluxWriter: unexpected error while flushing batch", e)
            }
            finally batch.clear()
          }
        }
      } catch {
        case _: InterruptedException =>
          // Preserve the interrupt status and let the thread end
          Thread.currentThread().interrupt()
      }
    }
  })

  /** Enqueue a single Line Protocol string. Never blocks; drops the line if the queue is full. */
  def enqueue(line: String): Unit =
    if (!queue.offer(line))
      log.warn("InfluxWriter queue full — dropping metric line")

  /**
   * Stops the background thread (waiting up to 10 seconds for it) and then
   * sends any lines that are still queued from the calling thread.
   * Calling it more than once is harmless.
   */
  override def close(): Unit = {
    running.set(false)
    executor.shutdown()
    if (!executor.awaitTermination(10, TimeUnit.SECONDS))
      log.warn("InfluxWriter flusher did not terminate within 10s — flushing remaining lines from caller")
    // Final flush of anything left
    val leftover = new java.util.ArrayList[String]()
    queue.drainTo(leftover)
    if (!leftover.isEmpty) flush(leftover)
  }

  // ── Internal ───────────────────────────────────────────────────────────────

  /** Joins the lines with newlines and POSTs them as one request body. */
  private def flush(lines: java.util.List[String]): Unit =
    if (!lines.isEmpty) postWithRetry(lines.toArray.mkString("\n"), maxRetries)

  /**
   * Performs one POST.
   *
   * @return `None` on success (HTTP status below 300); otherwise a description
   *         of the failure together with a flag telling whether a retry can help.
   *         An `InterruptedException` is deliberately not caught here.
   */
  private def attemptPost(body: String): Option[(String, Boolean)] =
    try {
      val reqBuilder = HttpRequest.newBuilder()
        .uri(uri)
        .timeout(Duration.ofMillis(writeTimeoutMs))
        .header("Content-Type", "text/plain; charset=utf-8")
        .POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))

      if (authToken != null && authToken.nonEmpty)
        reqBuilder.header("Authorization", authToken)

      val response = http.send(reqBuilder.build(), HttpResponse.BodyHandlers.ofString())
      val status   = response.statusCode()

      if (status < 300) None
      else {
        // Other 4xx responses (bad line protocol, bad credentials, …) are deterministic:
        // re-sending the same body would only delay the queue further.
        val retryable = status == 429 || status >= 500
        Some((s"InfluxDB returned HTTP $status: ${response.body().take(200)}", retryable))
      }
    } catch {
      // NonFatal does not match InterruptedException, so interrupts propagate to the caller
      case NonFatal(e) => Some((s"InfluxDB write failed (${e.getMessage})", true))
    }

  /** Sends `body`, retrying retryable failures up to `retriesLeft` times with a 1s pause. */
  @scala.annotation.tailrec
  private def postWithRetry(body: String, retriesLeft: Int): Unit =
    attemptPost(body) match {
      case None =>
        ()
      case Some((msg, retryable)) if retryable && retriesLeft > 0 =>
        log.warn(s"$msg — retrying ($retriesLeft left)")
        Thread.sleep(1000L)
        postWithRetry(body, retriesLeft - 1)
      case Some((msg, _)) =>
        log.error(s"$msg — giving up on batch of ${body.linesIterator.size} lines")
    }
}

logback-example.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Example logback.xml for a Gatling simulation project.
  Place in src/test/resources/logback-test.xml  (or logback.xml).

  The GatlingInfluxAppender only processes TRACE lines. CONSOLE carries a
  threshold filter (INFO and above), so both appenders can be attached to the
  "io.gatling" logger: TRACE records go to InfluxDB, while Gatling's own
  INFO / WARN / ERROR messages still reach the console.

  Remember that "&" must be written as "&amp;" inside element content.
-->
<configuration>

  <!-- Stop the logging context on JVM exit so INFLUX can flush its last batch -->
  <shutdownHook class="ch.qos.logback.core.hook.DefaultShutdownHook"/>

  <!-- ── Standard console appender ─────────────────────────────────────── -->
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <!-- Keep TRACE / DEBUG records off the console -->
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
      <level>INFO</level>
    </filter>
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- ── InfluxDB appender ─────────────────────────────────────────────── -->
  <appender name="INFLUX" class="com.example.appender.GatlingInfluxAppender">

    <!--
      InfluxDB 2.x write endpoint.
      For v1 use: http://localhost:8086/write?db=gatling&amp;precision=ns
    -->
    <influxUrl>http://localhost:8086/api/v2/write?org=myOrg&amp;bucket=gatling&amp;precision=ns</influxUrl>

    <!--
      InfluxDB 2.x:  "Token <your-token>"
      InfluxDB 1.x no-auth: leave empty
      InfluxDB 1.x basic:   "Basic <base64(user:password)>"
    -->
    <authToken>Token replace-me-with-your-influx-token</authToken>

    <!-- Max lines per HTTP POST -->
    <batchSize>500</batchSize>

    <!-- Flush interval even if batchSize not reached (milliseconds) -->
    <flushIntervalMs>1000</flushIntervalMs>

    <!-- HTTP connect / response timeouts (milliseconds) -->
    <connectTimeoutMs>3000</connectTimeoutMs>
    <writeTimeoutMs>5000</writeTimeoutMs>

    <!-- In-memory queue capacity (lines). New lines are dropped with a warning when full. -->
    <queueCapacity>10000</queueCapacity>

    <!-- Retry failed POSTs this many times (1 second between each) -->
    <maxRetries>3</maxRetries>

    <!-- Set to false to skip USER lifecycle events (START/END) -->
    <captureUserEvents>true</captureUserEvents>

  </appender>

  <!-- ── Gatling loggers ────────────────────────────────────────────────── -->

  <!--
    Route Gatling's own loggers to INFLUX (TRACE data records) and to
    CONSOLE (INFO and above, thanks to the threshold filter).
    additivity="false" prevents double-logging to the root logger.

    Gatling writes request data on "io.gatling.http.engine.response"
    and user events on "io.gatling.core.controller" — both are under
    "io.gatling" so this catches everything.
  -->
  <logger name="io.gatling" level="TRACE" additivity="false">
    <appender-ref ref="INFLUX"/>
    <appender-ref ref="CONSOLE"/>
  </logger>

  <!-- ── Root logger (everything else → console at INFO) ───────────────── -->
  <root level="INFO">
    <appender-ref ref="CONSOLE"/>
  </root>

</configuration>

AppenderSpec.scala

package com.example.appender

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

/**
 * Behavioural tests for [[GatlingLogParser.parse]]: well-formed REQUEST and
 * USER records, malformed records, and non-record input.
 */
class GatlingLogParserSpec extends AnyFlatSpec with Matchers {

  import GatlingLogParser._

  // ── Helpers ────────────────────────────────────────────────────────────────

  /** Builds a tab-separated REQUEST record; every column can be overridden. */
  private def req(
    userId: String      = "42",
    scenario: String    = "MyScenario",
    groups: String      = "",
    requestName: String = "GET /api/health",
    startMs: String     = "1700000000000",
    endMs: String       = "1700000000250",
    status: String      = "OK",
    message: String     = ""
  ): String =
    s"REQUEST\t$userId\t$scenario\t$groups\t$requestName\t$startMs\t$endMs\t$status\t$message"

  /** Builds a tab-separated USER record; every column can be overridden. */
  private def user(
    scenario: String = "MyScenario",
    userId: String   = "1",
    event: String    = "START",
    startMs: String  = "1700000000000",
    endMs: String    = "1700000000000"
  ): String =
    s"USER\t$scenario\t$userId\t$event\t$startMs\t$endMs"

  /** Parses `line` and fails the test (rather than casting) if it is not a REQUEST. */
  private def requestFrom(line: String): RequestEvent =
    parse(line) match {
      case r: RequestEvent => r
      case other           => fail(s"expected a RequestEvent but parsed $other")
    }

  /** Parses `line` and fails the test (rather than casting) if it is not a USER event. */
  private def userFrom(line: String): UserEvent =
    parse(line) match {
      case u: UserEvent => u
      case other        => fail(s"expected a UserEvent but parsed $other")
    }

  // ── REQUEST parsing ────────────────────────────────────────────────────────

  "GatlingLogParser" should "parse a well-formed REQUEST line" in {
    val e = requestFrom(req())
    e.userId      shouldBe 42L
    e.scenario    shouldBe "MyScenario"
    e.requestName shouldBe "GET /api/health"
    e.startMs     shouldBe 1700000000000L
    e.endMs       shouldBe 1700000000250L
    e.responseTimeMs shouldBe 250L
    e.status      shouldBe "OK"
    e.message     shouldBe None
  }

  it should "capture KO message" in {
    requestFrom(req(status = "KO", message = "Request timeout")).message shouldBe Some("Request timeout")
  }

  it should "parse groups field" in {
    requestFrom(req(groups = "checkout.payment")).groups shouldBe "checkout.payment"
  }

  it should "return UnknownEvent for too-short REQUEST line" in {
    parse("REQUEST\t1\t2") shouldBe UnknownEvent
  }

  it should "return UnknownEvent for non-numeric timestamps" in {
    parse(req(startMs = "abc", endMs = "xyz")) shouldBe UnknownEvent
  }

  // ── USER parsing ───────────────────────────────────────────────────────────

  it should "parse a well-formed USER START line" in {
    val e = userFrom(user())
    e.scenario shouldBe "MyScenario"
    e.userId   shouldBe 1L
    e.event    shouldBe "START"
  }

  it should "parse a USER END line" in {
    userFrom(user(event = "END", endMs = "1700000060000")).event shouldBe "END"
  }

  it should "return UnknownEvent for malformed USER line" in {
    parse("USER\tscenario\tnot-a-number\tSTART\t0\t0") shouldBe UnknownEvent
  }

  // ── Edge cases ─────────────────────────────────────────────────────────────

  it should "return UnknownEvent for unrecognised prefix" in {
    parse("DEBUG some other log line") shouldBe UnknownEvent
  }

  it should "return UnknownEvent for null" in {
    parse(null) shouldBe UnknownEvent
  }

  it should "return UnknownEvent for empty string" in {
    parse("") shouldBe UnknownEvent
  }
}

/**
 * Checks the Line Protocol strings produced by [[InfluxLineProtocol]]:
 * measurement names, integer fields, tag escaping and the nanosecond timestamp.
 */
class InfluxLineProtocolSpec extends AnyFlatSpec with Matchers {

  import GatlingLogParser.{RequestEvent, UserEvent}

  /** Serialises a request event built from the given columns. */
  private def requestLine(
    scenario: String,
    requestName: String,
    startMs: Long,
    endMs: Long
  ): String =
    InfluxLineProtocol.fromRequest(
      RequestEvent(scenario, 1L, "", requestName, startMs, endMs, "OK", None)
    )

  "InfluxLineProtocol" should "produce correct measurement name for requests" in {
    requestLine("S", "GET /", 1000L, 1200L) should startWith("gatling_request,")
  }

  it should "include response_time_ms field" in {
    requestLine("S", "GET /", 1000L, 1300L) should include("response_time_ms=300i")
  }

  it should "escape spaces in tag values" in {
    requestLine("My Scenario", "GET /users", 1000L, 1100L) should include("scenario=My\\ Scenario")
  }

  it should "encode timestamp in nanoseconds" in {
    val finishedAtMs  = 1700000000250L
    val expectedNanos = finishedAtMs * 1_000_000L
    requestLine("S", "R", 0L, finishedAtMs) should endWith(s" $expectedNanos")
  }

  it should "produce correct measurement name for user events" in {
    val serialised = InfluxLineProtocol.fromUser(UserEvent("S", 1L, "START", 1000L, 1000L))
    serialised should startWith("gatling_user,")
    serialised should include("event=START")
  }
}

Architecture.MD

## Architecture

The implementation is split into four focused components:

### `GatlingLogParser` — pure parser
Gatling writes two distinct tab-delimited record types at TRACE level:
- **`REQUEST`** — one line per HTTP call: userId, scenario, groups, requestName, startMs, endMs, status, optional error message
- **`USER`** — session lifecycle (START/END) with timestamps

The parser returns a sealed ADT (`RequestEvent | UserEvent | UnknownEvent`) so pattern matching is exhaustive and no strings leak into downstream logic.

### `InfluxLineProtocol` — serialiser
Converts events to [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/). Two measurements are written:

| Measurement | Tags | Fields |
|---|---|---|
| `gatling_request` | scenario, request_name, groups, status | response_time_ms, start_ms, end_ms, user_id |
| `gatling_user` | scenario, event | user_id, start_ms, end_ms, duration_ms |

Timestamps are in nanoseconds (InfluxDB default). Tag values are properly escaped per spec.

### `InfluxWriter` — async batcher
Uses a bounded `ArrayBlockingQueue` and a single daemon thread to batch lines and POST them. Features: configurable batch size, flush interval, connect/write timeouts, a bounded queue that drops new lines with a warning when it is full (it does not block the caller), and retries with a fixed 1-second delay and a configurable number of attempts.

Uses Java 11's built-in `HttpClient` — no extra HTTP dependency needed.

### `GatlingInfluxAppender` — Logback wiring
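Extends `AppenderBase[ILoggingEvent]`. All config is injected by Logback, which calls the setters reflectively for each child element of `<appender>`. Filters to TRACE-only, delegates to the parser, and enqueues Line Protocol strings. Calls `writer.close()` on `stop()` to perform a final flush. Note that `stop()` only runs when the Logback context is stopped, and the flusher is a daemon thread, so configure a Logback shutdown hook (`<shutdownHook/>`) if the final flush must also happen when the JVM exits.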

---

## Usage in your Gatling project

**1. Build the jar:**
```bash
sbt assembly   # produces target/scala-2.13/gatling-influx-appender-assembly-0.1.0.jar
```

**2. Drop the jar on Gatling's classpath** (e.g. `$GATLING_HOME/lib/` or as a dependency in your Gatling Maven/SBT project).

**3. Configure `logback-test.xml`** (see the included `logback-example.xml`). The key lines are:
```xml
<appender name="INFLUX" class="com.example.appender.GatlingInfluxAppender">
  <influxUrl>http://localhost:8086/api/v2/write?org=myOrg&amp;bucket=gatling&amp;precision=ns</influxUrl>
  <authToken>Token your-token-here</authToken>
</appender>

<logger name="io.gatling" level="TRACE" additivity="false">
  <appender-ref ref="INFLUX"/>
</logger>
```

**4. Grafana** — point a datasource at your InfluxDB bucket and query the `gatling_request` measurement, using the `response_time_ms` field and the `status` tag, for instant performance dashboards.

Build.sbt

ThisBuild / scalaVersion := "2.13.12"
ThisBuild / organization := "com.example"
ThisBuild / version      := "0.1.0"

// Single-module build for the appender. The `assembly` settings below require
// the sbt-assembly plugin to be enabled in project/plugins.sbt.
lazy val root = (project in file("."))
  .settings(
    name := "gatling-influx-appender",
    libraryDependencies ++= Seq(
      // Logback — Provided: the application hosting this appender already ships
      // Logback, so it must not be bundled a second time into the fat jar.
      "ch.qos.logback" % "logback-classic" % "1.4.14" % Provided,
      // HTTP client (Java 11 built-in HttpClient used — no extra dep needed)
      // Testing
      "org.scalatest" %% "scalatest" % "3.2.17" % Test,
      "org.mockito"   %% "mockito-scala" % "1.17.29" % Test
    ),
    // Assembly settings if you want a fat jar
    assembly / assemblyMergeStrategy := {
      // Keep ServiceLoader registrations; discarding them breaks libraries that
      // are discovered through META-INF/services.
      case PathList("META-INF", "services", _*) => MergeStrategy.filterDistinctLines
      case PathList("META-INF", _*)             => MergeStrategy.discard
      case _                                    => MergeStrategy.first
    }
  )