Keresés tartalomra
Kategóriák
Címkék
- Java
- Spring
- Python
- IoC
- Android
- DI
- Dagger
- Thymeleaf
- Markdown
- JDK11
- AOP
- Aspect
- Captcha
- I18n
- JavaSpark
- Microframework
- Testing
- JUnit
- Security
- JWT
- REST
- Database
- JPA
- Gépház
- WebFlux
- ReactiveProgramming
- Microservices
- Continuous Integration
- CircleCI
- Deployment Pipeline
- Docker
- Mocking
- LogProcessing
- PlantUML
- UML
- Modellezés
- OAuth2
- Node.js
- DevOps
- Websocket
Alkalmazás log bejegyzések összegyűjtése automatizáltan
A korábbi megoldás message push jellegű volt, lényegében minden alkalmazás, amitől a logokat össze szerettem volna gyűjteni, egy-egy log appendert tartalmazott, ami a logjait továbbította a már korábban említett TLP nevű servicembe. Ennek hátránya nyilvánvalóan az volt, hogy az alkalmazásoknak egy olyan dologgal kellett foglalkozniuk, mely egyáltalán nem lett volna a feladatuk. Természetesen a megoldás eléggé nyilvánvaló, a logok begyűjtését egy pull mechanikán alapuló megoldásra kell átültetni, tehát a logok begyűjtését egy a logokat létrehozó alkalmazástól független, külső alkalmazás végzi. Ilyen megoldások pedig egyébként léteznek is a piacon, ám a legtöbbjük lényegesen nagyobb mennyiségű konfigurációt és rendszererőforrást igényel, mindamellett hogy a számomra szükséges szinttől lényegesen nagyobb funkcionalitással is rendelkeznek. Persze, nem akarok szépíteni a helyzeten, ha lett volna egy tökéletes "dobozos" megoldás, akkor is nekiálltam volna ennek a fejlesztésnek, mert így sokkal mókásabb. :)
A log collector
Na de mi is lett ez a fejlesztés? Az alkalmazás a Tiny Log Collector (továbbiakban TLC) nevet kapta és lényegében egy pici, TypeScriptben fejlesztett alkalmazás, mely Docker containerben fut és semmilyen módon sem lehet kívülről elérni. Ehelyett, olyan statikus forrásokon alapul a működése, amiket event-source jellegűen tud olvasni, tehát figyeli a forrás változásait, és arra reagálva végzi el a log bejegyzések feldolgozását, majd továbbítását a log processor felé. A feldolgozás négy lépésben folyik szigorú sorrendben, ezen lépések lánca képez egy "log pipeline"-t. A négy lépés a következő:
- A
listener
figyeli a forrást és reaktív módon feliratkozik annak változásaira. - Ezután a
parser
feldolgozza a forrásból érkező adatcsomagokat. A parser felelőssége az is, hogy megállapítsa az adatcsomagok határait, egységekbe szervezze őket, ha szükséges. - Ezt követi a
mapper
, mely valamilyen átalakítást végez a beérkező, már egységekbe szervezett adatcsomagokon. Az átalakítás igazából bármit takarhat, a lényeg, hogy a mapper által átalakított rekordok már közvetlenül felhasználhatóak az utolsó lépésben. - Ami a
publisher
, a log bejegyzések továbbítását végzi a cél feldolgozó rendszerbe.
A fenti lépéssorozat lényegében egy ETL (extract-transform-load) folyamat, bár a transform lépés itt két lépésre van bontva. Ennek egyszerű oka van, így jobban modularizálhatóak a log pipeline-ok, a parser és a mapper szinte tetszőleges módon kombinálható. Például egy adott típusú adatforrást nem feltétlenül kell ugyanarra átalakítani, és ez fordítva is igaz, lehet két pipeline ugyanarra alakítja a nyers adatokat, de a nyers adatok különbözhetnek). Lehet így még nem teljesen egyértelmű mire gondolok, de mindjárt rátérünk a konkrét implementációra.
JSON logok begyűjtése Docker containerektől
A referencia implementáció (a saját rendszerem leggyakoribb use case-e miatt) Docker containerektől képes begyűjteni a JSON formátumú logokat, amiket a TLP által elvárt formátumra konvertál és végül továbbítja annak. Talán mielőtt továbbmennék, érdemes kifejtenem, miért JSON formátumú logokat dolgoz fel az alkalmazás: az ok egyszerű, a JSON egy strukturált adatformátum, a struktúrában egyértelmű pontokon helyezkednek el adott darabkái a log bejegyzéseknek. Így lényegesen könnyebb a feldolgozása, nem kell például regex kifejezésekkel vagy akár "nyersen" tördelni egy sort. Sőt, ami még fontosabb, egyértelmű egy-egy bejegyzés határa: strukturálatlan szöveges fájlok esetén a log processzorok általában az időbélyegzőt tekintik a határoknak, ám sajnos a tapasztalat azt mutatja, hogy ez nem egy alapbeállítás és sokszor nem vesződnek vele az üzemeltetők. (Ilyenkor történik meg az, hogy egyetlen Java stacktrace 70 önálló log bejegyzésként jelenik meg a log processzorban, aztán jó szórakozást a debugoláshoz.)
A folyamat tehát a logok forrására való feliratkozással kezdődik. Ez Docker esetén minden érintett container saját log streamjét jelenti: ismerős lehet a docker logs -f <containernév>
parancs - ennek természetesen a Docker Engine API-jában van egy megfelelője, ami a CLI commandhoz hasonlóan egy nyitvatartott event streamen keresztül, valós időben követi a logok megjelenését a container stdout-ján. A streamre feliratkozás után egy RxJS Observable segítségével tudjuk az adatokat továbbítani feldolgozásra. Ez a kódban így néz ki (a teljes implementációt linkelem a cikk végén):
export default class DockerLogsApiListener implements Listener<Uint8Array> {
// ...
listen(): Observable<Uint8Array> {
return new Observable(subscriber => {
this.initListener()
.then(logStream => logStream.data
.on("data", (line: any) => subscriber.next(line))
.on("end", () => subscriber.complete())
)
.catch(reason => {
log.error(`Failed to initialize listener for container: ${this.containerName}; reason=${reason}`);
subscriber.complete();
});
});
}
// ...
}
Tehát, először is megnyitjuk a streamet (az .initListener()
mindössze egy Axios hívás a Docker Engine API-ra), majd feliratkozunk a data
és end
eventekre - előbbi a subscribernek küld egy értesítést, utóbbi bezárja a subscribert, illetve ugyanezt tesszük hiba esetén is.
A pipeline a nyers adatok parse-olásával folytatódik. Fontos megjegyezni, hogy a Docker Engine API streaming endpointjai minden esetben chunkokkal dolgoznak, és egy byte arrayt kapunk, amit utólag kell a számunkra megfelelő formátumra alakítani. Első lépésben a nyers byte array adatokkal kell kezdenünk valamit: a legegyszerűbb amit tehetünk és tennünk kell, az a stringgé alakítás.
export class ByteArrayParser implements Parser<Uint8Array, string> {
private static readonly stdoutHeaderSizeInBytes = 8;
// ...
parse(inputData: Uint8Array): Observable<string> {
return new Observable(subscriber => {
try {
const line = this.systemConfig.enableTrimmingStdoutHeader
? this.parseLine(inputData.slice(ByteArrayParser.stdoutHeaderSizeInBytes))
: this.parseLine(inputData);
subscriber.next(line);
} catch (error) {
log.warn("Log message could not be parsed as byte array");
}
});
}
private parseLine(inputData: Uint8Array): string {
return Buffer
.from(inputData)
.toString("utf8");
}
}
A fenti parser nem csinál nagyon bonyolult dolgokat, csupán fogja a beérkező byte array chunkot és stringgé konvertálja. Egy apróságot azonban fontos megemlíteni, amibe fejlesztésben közben szaladtam bele: Linux rendszeren az stdoutra érkező adatok tartalmaznak egy 8 byte hosszúságú headert, mely a mi szempontunkból szemét, és ezt le kell vágni a chunkok elejéről - erre szolgál az inputData.slice
hívás. Még mindig a parse-olás részeként most már megpróbálhatjuk a string darabokat nyers JSON adatokká összeállítani. A probléma az, hogy egy chunk nem feltétlenül pontosan egy JSON logbejegyzés (tehát nem feltétlenül pontosan egy összefüggő JSON dokumentum). A következő lépésben ezért egy buffert fogunk alkalmazni, amiben addig gyűjtjük az adatokat, míg egy sorvégjelet nem találunk - szerencsére ez stabil és biztos indikátora annak, hogy egy bejegyzésnek vége. Amint megvan a sorvégjel, a buffer elemeit joinoljuk, majd jöhet a JSON.parse()
hívás - végül pedig továbbítjuk az értéket a subscribernek. Itt egy fontos dologra hívnám fel a figyelmet: a subscriber.next()
metódust ebben a lépésben már nem eventenként hívjuk meg, hanem összeállított JSON dokumentumonként. Ez továbbra is lehet 1:1 megfelelés (sőt, ez lesz a gyakoribb), de lesznek olyan esetek, ahol 2-3 event képez majd egy összefüggő JSON rekordot.
export default class JoiningJsonParser implements Parser<string, object> {
private buffer: string[];
constructor() {
this.buffer = [];
}
parse(inputData: string): Observable<object> {
return new Observable(subscriber => {
this.buffer.push(inputData);
if (inputData.endsWith(newLineCharacter)) {
this.buffer.join("")
.trim()
.split(newLineCharacter)
.forEach(value => {
try {
subscriber.next(JSON.parse(value));
} catch (error) {
log.warn("Log message could not be parsed as JSON");
}
});
this.buffer = [];
}
});
}
}
A következő lépésben már egy JSON objektummal dolgozunk, konkrét modellezhető struktúrával, azonban ez a formátum (valószínűleg) nem felel meg a cél log processzor által elvárt formátumnak. Erre szolgál a mapping lépés. Az alábbi kód a Java alkalmazásaimtól érkező "Logstash-stílusú" logokat konvertálja TLP üzenetekké - az igényeknek megfelelően ebben a lépésben lesz a legnagyobb a fluktuáció, és tulajdonképpen bármilyen struktúrát át tudunk könnyedén alakítani bármilyen más struktúrává.
export default class LogstashToTLPMapper implements Mapper<LogstashDataStructure, TLPLogMessage> {
// ...
map(inputData: LogstashDataStructure): Optional<TLPLogMessage> {
try {
return {
source: this.sourceStream,
timeStamp: new Date(inputData["@timestamp"]).getTime(),
level: {
levelStr: inputData.level
},
loggerName: inputData.logger_name,
threadName: inputData.thread_name,
content: inputData.message,
// ...
};
} catch (error) {
log.error(`Could not map input data; reason=${error}`);
return null;
}
}
// ...
}
A pipeline futtatása
A konfigurált log feldolgozó pipeline-okat egy Pipeline nevű osztály fogja össze, annak a start()
metódusa indítja a feldolgozást a komponens indulásakor. Az RxJS subscription miatt a komponens nem áll le mindaddig, míg legalább egy aktív feliratkozás van, és amennyiben egy log stream megszakad, automatikusan újracsatlakozik (vagy legalábbis megpróbálja). Ezutóbbit a pipelineba bekötött finalize()
RxJS operator intézi, ami a subscriber.complete()
hívásokra fut le, és mivel az csak akkor fordul elő, ha leszakad a log stream, a benne levő, disconnection
nevű RxJS Subject
-re a stream újraindítása van kötve.
export default class Pipeline {
// ...
start(): void {
let listenerObservable = this.listener.listen()
.pipe(finalize(() => setTimeout(
() => this.disconnection.next(this.logStreamName),
this.systemConfig.reconnectionPollRate)
));
this.parsers
.map(parser => mergeMap(value => parser.parse(value)))
.forEach(parserMergeMap => listenerObservable = listenerObservable.pipe(parserMergeMap));
listenerObservable
.pipe(map(data => this.mapper.map(data)))
.pipe(filter(data => data !== null))
.subscribe(value => this.publishers
.forEach(publisher => publisher.publish(value)));
}
}
Egy-egy pipeline elindítását pedig a Controller
végzi, mely meghívja minden konfigurált pipeline start()
metódusát, illetve létrehozza és átadja nekik az újracsatlakoztatásra szolgáló RxJS Subject
-et:
export class Controller {
// ...
init(): void {
const pipelines: Pipeline[] = this.configurationProvider.pipelines
.filter(pipelineConfig => pipelineConfig.enabled)
.map(pipelineConfig => this.pipelineFactory.createPipeline(pipelineConfig, this.disconnectionSubject))
this.attachDisconnectionSubject(pipelines);
this.startPipelines(pipelines);
}
private attachDisconnectionSubject(pipelines: Pipeline[]): void {
this.disconnectionSubject.subscribe(logStreamName => {
log.warn(`Trying to reconnect pipeline [${logStreamName}]`)
pipelines.find(pipeline => pipeline.logStreamName === logStreamName)?.start()
});
}
private startPipelines(pipelines: Pipeline[]): void {
pipelines.forEach(pipeline => pipeline.start())
}
}
A pipeline innentől kezdve önállóan fut, és a feldolgozott log bejegyzéseket továbbítja további használatra a TLP-nek.
Konfigurálás és bővíthetőség
Egy pipeline konfigurálása az alkalmazás .yml konfigurációs file-ából lehetséges, egy a fentihez hasonló pipelinet például az alábbi módon lehet beállítani:
tlc:
# ...
pipelines:
- log-stream-name: app1
listener-type: docker
listener-config:
container-name: container-app1
parsers:
- byte-array
- joining-json
mapper-type: logstash-to-tlp
publishers:
- console
- tlp
A fenti konfiguráció beállítja az "app1" nevű log streamet (ez csak azonosításra szolgál), feliratkoztatja a "container-app1" nevű Docker container log streamjére a pipelinet, amin byte-array és a fentebb ismertetett JSON-chunk parse-olást végez, majd Logstash formátumból konvertálja TLP üzenetekké a bejegyzéseket, amiket aztán a TLP-nek és a console-ra továbbít.
Bővíthetőség szempontjából viszonylag rugalmas az alkalmazás, a lépésekhez tartozó interfészek implementációjára van csak szükség, illetve egy-egy bejegyzésre a tlc.factory
package factory implementációiban (ezutóbbiak képesek a konfiguráció alapján előállítani a megfelelő komponenst a megfelelő módon). Egyelőre csak a fentebb már ismertetett komponensek léteznek, bár egy kollégám jóvoltából (shoutout to tomfol.io, ő is jó dolgokat csinál, tessenek szétnézni a GitHub-ján) már készül egy listener JSON-formátumú log file-ok olvasására is.
Tiny Log Collector teljes implementáció
Komment írásához jelentkezz be
Bejelentkezés
Még senki nem szólt hozzá ehhez a bejegyzéshez.