Itt jársz most: Kezdőlap > Alkalmazásfejlesztés > Alkalmazás log bejegyzések összegyűjtése automatizáltan

Szűrő megjelenítése

Alkalmazás log bejegyzések összegyűjtése automatizáltan

A korábbi megoldás message push jellegű volt, lényegében minden alkalmazás, amitől a logokat össze szerettem volna gyűjteni, egy-egy log appendert tartalmazott, ami a logjait továbbította a már korábban említett TLP nevű servicembe. Ennek hátránya nyilvánvalóan az volt, hogy az alkalmazásoknak egy olyan dologgal kellett foglalkozniuk, mely egyáltalán nem lett volna a feladatuk. Természetesen a megoldás eléggé nyilvánvaló, a logok begyűjtését egy pull mechanikán alapuló megoldásra kell átültetni, tehát a logok begyűjtését egy a logokat létrehozó alkalmazástól független, külső alkalmazás végzi. Ilyen megoldások pedig egyébként léteznek is a piacon, ám a legtöbbjük lényegesen nagyobb mennyiségű konfigurációt és rendszererőforrást igényel, mindamellett hogy a számomra szükséges szinttől lényegesen nagyobb funkcionalitással is rendelkeznek. Persze, nem akarok szépíteni a helyzeten, ha lett volna egy tökéletes "dobozos" megoldás, akkor is nekiálltam volna ennek a fejlesztésnek, mert így sokkal mókásabb. :)

A log collector

Na de mi is lett ez a fejlesztés? Az alkalmazás a Tiny Log Collector (továbbiakban TLC) nevet kapta és lényegében egy pici, TypeScriptben fejlesztett alkalmazás, mely Docker containerben fut és semmilyen módon sem lehet kívülről elérni. Ehelyett, olyan statikus forrásokon alapul a működése, amiket event-source jellegűen tud olvasni, tehát figyeli a forrás változásait, és arra reagálva végzi el a log bejegyzések feldolgozását, majd továbbítását a log processor felé. A feldolgozás négy lépésben folyik szigorú sorrendben, ezen lépések lánca képez egy "log pipeline"-t. A négy lépés a következő:

  1. A listener figyeli a forrást és reaktív módon feliratkozik annak változásaira.
  2. Ezután a parser feldolgozza a forrásból érkező adatcsomagokat. A parser felelőssége az is, hogy megállapítsa az adatcsomagok határait, egységekbe szervezze őket, ha szükséges.
  3. Ezt követi a mapper, mely valamilyen átalakítást végez a beérkező, már egységekbe szervezett adatcsomagokon. Az átalakítás igazából bármit takarhat, a lényeg, hogy a mapper által átalakított rekordok már közvetlenül felhasználhatóak az utolsó lépésben.
  4. Ami a publisher, a log bejegyzések továbbítását végzi a cél feldolgozó rendszerbe.

A fenti lépéssorozat lényegében egy ETL (extract-transform-load) folyamat, bár a transform lépés itt két lépésre van bontva. Ennek egyszerű oka van, így jobban modularizálhatóak a log pipeline-ok, a parser és a mapper szinte tetszőleges módon kombinálható. Például egy adott típusú adatforrást nem feltétlenül kell ugyanarra átalakítani, és ez fordítva is igaz, lehet két pipeline ugyanarra alakítja a nyers adatokat, de a nyers adatok különbözhetnek). Lehet így még nem teljesen egyértelmű mire gondolok, de mindjárt rátérünk a konkrét implementációra.

JSON logok begyűjtése Docker containerektől

A referencia implementáció (a saját rendszerem leggyakoribb use case-e miatt) Docker containerektől képes begyűjteni a JSON formátumú logokat, amiket a TLP által elvárt formátumra konvertál és végül továbbítja annak. Talán mielőtt továbbmennék, érdemes kifejtenem, miért JSON formátumú logokat dolgoz fel az alkalmazás: az ok egyszerű, a JSON egy strukturált adatformátum, a struktúrában egyértelmű pontokon helyezkednek el adott darabkái a log bejegyzéseknek. Így lényegesen könnyebb a feldolgozása, nem kell például regex kifejezésekkel vagy akár "nyersen" tördelni egy sort. Sőt, ami még fontosabb, egyértelmű egy-egy bejegyzés határa: strukturálatlan szöveges fájlok esetén a log processzorok általában az időbélyegzőt tekintik a határoknak, ám sajnos a tapasztalat azt mutatja, hogy ez nem egy alapbeállítás és sokszor nem vesződnek vele az üzemeltetők. (Ilyenkor történik meg az, hogy egyetlen Java stacktrace 70 önálló log bejegyzésként jelenik meg a log processzorban, aztán jó szórakozást a debugoláshoz.)

A folyamat tehát a logok forrására való feliratkozással kezdődik. Ez Docker esetén minden érintett container saját log streamjét jelenti: ismerős lehet a docker logs -f <containernév> parancs - ennek természetesen a Docker Engine API-jában van egy megfelelője, ami a CLI commandhoz hasonlóan egy nyitvatartott event streamen keresztül, valós időben követi a logok megjelenését a container stdout-ján. A streamre feliratkozás után egy RxJS Observable segítségével tudjuk az adatokat továbbítani feldolgozásra. Ez a kódban így néz ki (a teljes implementációt linkelem a cikk végén):

export default class DockerLogsApiListener implements Listener<Uint8Array> {
    // ...
    listen(): Observable<Uint8Array> {

        return new Observable(subscriber => {
            this.initListener()
                .then(logStream => logStream.data
                    .on("data", (line: any) => subscriber.next(line))
                    .on("end", () => subscriber.complete())
                )
                .catch(reason => {
                    log.error(`Failed to initialize listener for container: ${this.containerName}; reason=${reason}`);
                    subscriber.complete();
                });
        });
    }
    // ...
}

Tehát, először is megnyitjuk a streamet (az .initListener() mindössze egy Axios hívás a Docker Engine API-ra), majd feliratkozunk a data és end eventekre - előbbi a subscribernek küld egy értesítést, utóbbi bezárja a subscribert, illetve ugyanezt tesszük hiba esetén is.

A pipeline a nyers adatok parse-olásával folytatódik. Fontos megjegyezni, hogy a Docker Engine API streaming endpointjai minden esetben chunkokkal dolgoznak, és egy byte arrayt kapunk, amit utólag kell a számunkra megfelelő formátumra alakítani. Első lépésben a nyers byte array adatokkal kell kezdenünk valamit: a legegyszerűbb amit tehetünk és tennünk kell, az a stringgé alakítás.

export class ByteArrayParser implements Parser<Uint8Array, string> {
    private static readonly stdoutHeaderSizeInBytes = 8;
    // ...

    parse(inputData: Uint8Array): Observable<string> {

        return new Observable(subscriber => {

            try {
                const line = this.systemConfig.enableTrimmingStdoutHeader
                    ? this.parseLine(inputData.slice(ByteArrayParser.stdoutHeaderSizeInBytes))
                    : this.parseLine(inputData);
                subscriber.next(line);
            } catch (error) {
                log.warn("Log message could not be parsed as byte array");
            }
        });
    }

    private parseLine(inputData: Uint8Array): string {

        return Buffer
            .from(inputData)
            .toString("utf8");
    }
}

A fenti parser nem csinál nagyon bonyolult dolgokat, csupán fogja a beérkező byte array chunkot és stringgé konvertálja. Egy apróságot azonban fontos megemlíteni, amibe fejlesztésben közben szaladtam bele: Linux rendszeren az stdoutra érkező adatok tartalmaznak egy 8 byte hosszúságú headert, mely a mi szempontunkból szemét, és ezt le kell vágni a chunkok elejéről - erre szolgál az inputData.slice hívás. Még mindig a parse-olás részeként most már megpróbálhatjuk a string darabokat nyers JSON adatokká összeállítani. A probléma az, hogy egy chunk nem feltétlenül pontosan egy JSON logbejegyzés (tehát nem feltétlenül pontosan egy összefüggő JSON dokumentum). A következő lépésben ezért egy buffert fogunk alkalmazni, amiben addig gyűjtjük az adatokat, míg egy sorvégjelet nem találunk - szerencsére ez stabil és biztos indikátora annak, hogy egy bejegyzésnek vége. Amint megvan a sorvégjel, a buffer elemeit joinoljuk, majd jöhet a JSON.parse() hívás - végül pedig továbbítjuk az értéket a subscribernek. Itt egy fontos dologra hívnám fel a figyelmet: a subscriber.next() metódust ebben a lépésben már nem eventenként hívjuk meg, hanem összeállított JSON dokumentumonként. Ez továbbra is lehet 1:1 megfelelés (sőt, ez lesz a gyakoribb), de lesznek olyan esetek, ahol 2-3 event képez majd egy összefüggő JSON rekordot.

export default class JoiningJsonParser implements Parser<string, object> {

    private buffer: string[];

    constructor() {
        this.buffer = [];
    }

    parse(inputData: string): Observable<object> {

        return new Observable(subscriber => {

            this.buffer.push(inputData);
            if (inputData.endsWith(newLineCharacter)) {
                this.buffer.join("")
                    .trim()
                    .split(newLineCharacter)
                    .forEach(value => {
                        try {
                            subscriber.next(JSON.parse(value));
                        } catch (error) {
                            log.warn("Log message could not be parsed as JSON");
                        }
                    });
                this.buffer = [];
            }
        });
    }
}

A következő lépésben már egy JSON objektummal dolgozunk, konkrét modellezhető struktúrával, azonban ez a formátum (valószínűleg) nem felel meg a cél log processzor által elvárt formátumnak. Erre szolgál a mapping lépés. Az alábbi kód a Java alkalmazásaimtól érkező "Logstash-stílusú" logokat konvertálja TLP üzenetekké - az igényeknek megfelelően ebben a lépésben lesz a legnagyobb a fluktuáció, és tulajdonképpen bármilyen struktúrát át tudunk könnyedén alakítani bármilyen más struktúrává.

export default class LogstashToTLPMapper implements Mapper<LogstashDataStructure, TLPLogMessage> {
    // ...
    map(inputData: LogstashDataStructure): Optional<TLPLogMessage> {

        try {
            return {
                source: this.sourceStream,
                timeStamp: new Date(inputData["@timestamp"]).getTime(),
                level: {
                    levelStr: inputData.level
                },
                loggerName: inputData.logger_name,
                threadName: inputData.thread_name,
                content: inputData.message,
                // ...
            };
        } catch (error) {
            log.error(`Could not map input data; reason=${error}`);
            return null;
        }
    }
    // ...
}

A pipeline futtatása

A konfigurált log feldolgozó pipeline-okat egy Pipeline nevű osztály fogja össze, annak a start() metódusa indítja a feldolgozást a komponens indulásakor. Az RxJS subscription miatt a komponens nem áll le mindaddig, míg legalább egy aktív feliratkozás van, és amennyiben egy log stream megszakad, automatikusan újracsatlakozik (vagy legalábbis megpróbálja). Ezutóbbit a pipelineba bekötött finalize() RxJS operator intézi, ami a subscriber.complete() hívásokra fut le, és mivel az csak akkor fordul elő, ha leszakad a log stream, a benne levő, disconnection nevű RxJS Subject-re a stream újraindítása van kötve.

export default class Pipeline {
    // ...
    start(): void {

        let listenerObservable = this.listener.listen()
            .pipe(finalize(() => setTimeout(
                () => this.disconnection.next(this.logStreamName),
                this.systemConfig.reconnectionPollRate)
            ));
        this.parsers
            .map(parser => mergeMap(value => parser.parse(value)))
            .forEach(parserMergeMap => listenerObservable = listenerObservable.pipe(parserMergeMap));

        listenerObservable
            .pipe(map(data => this.mapper.map(data)))
            .pipe(filter(data => data !== null))
            .subscribe(value => this.publishers
                .forEach(publisher => publisher.publish(value)));
    }
}

Egy-egy pipeline elindítását pedig a Controller végzi, mely meghívja minden konfigurált pipeline start() metódusát, illetve létrehozza és átadja nekik az újracsatlakoztatásra szolgáló RxJS Subject-et:

export class Controller {
    // ...
    init(): void {

        const pipelines: Pipeline[] = this.configurationProvider.pipelines
            .filter(pipelineConfig => pipelineConfig.enabled)
            .map(pipelineConfig => this.pipelineFactory.createPipeline(pipelineConfig, this.disconnectionSubject))

        this.attachDisconnectionSubject(pipelines);
        this.startPipelines(pipelines);
    }

    private attachDisconnectionSubject(pipelines: Pipeline[]): void {

        this.disconnectionSubject.subscribe(logStreamName => {
            log.warn(`Trying to reconnect pipeline [${logStreamName}]`)
            pipelines.find(pipeline => pipeline.logStreamName === logStreamName)?.start()
        });
    }

    private startPipelines(pipelines: Pipeline[]): void {
        pipelines.forEach(pipeline => pipeline.start())
    }
}

A pipeline innentől kezdve önállóan fut, és a feldolgozott log bejegyzéseket továbbítja további használatra a TLP-nek.

Konfigurálás és bővíthetőség

Egy pipeline konfigurálása az alkalmazás .yml konfigurációs file-ából lehetséges, egy a fentihez hasonló pipelinet például az alábbi módon lehet beállítani:

tlc:
  # ...
  pipelines:
    - log-stream-name: app1
      listener-type: docker
      listener-config:
        container-name: container-app1
      parsers:
        - byte-array
        - joining-json
      mapper-type: logstash-to-tlp
      publishers:
        - console
        - tlp

A fenti konfiguráció beállítja az "app1" nevű log streamet (ez csak azonosításra szolgál), feliratkoztatja a "container-app1" nevű Docker container log streamjére a pipelinet, amin byte-array és a fentebb ismertetett JSON-chunk parse-olást végez, majd Logstash formátumból konvertálja TLP üzenetekké a bejegyzéseket, amiket aztán a TLP-nek és a console-ra továbbít.

Bővíthetőség szempontjából viszonylag rugalmas az alkalmazás, a lépésekhez tartozó interfészek implementációjára van csak szükség, illetve egy-egy bejegyzésre a tlc.factory package factory implementációiban (ezutóbbiak képesek a konfiguráció alapján előállítani a megfelelő komponenst a megfelelő módon). Egyelőre csak a fentebb már ismertetett komponensek léteznek, bár egy kollégám jóvoltából (shoutout to tomfol.io, ő is jó dolgokat csinál, tessenek szétnézni a GitHub-ján) már készül egy listener JSON-formátumú log file-ok olvasására is.

Tiny Log Collector teljes implementáció

Kommentek

Komment írásához jelentkezz be
Bejelentkezés

Még senki nem szólt hozzá ehhez a bejegyzéshez.