Log parsing pipeline and functions

Here is an illustration of a typical log parsing pipeline in the Kloudfuse stack:

Log parsing pipeline

Kloudfuse ingests log lines through the ingester service, which writes the incoming log payloads to a Kafka topic. Each payload is then read by the LogParser service, and unmarshalled (unwrapped) into a JSON string. This JSON string is the input into the log parsing pipeline.

  • Unwrap/unmarshall converts the incoming LogEventPayload into a JSON string, to prepare it for the Remap stage and subsequent processing.

  • Δ indicates that the fields changed at the previous stage of the pipeline.

  • Wrap/marshall converts the processed log into a protobuf payload.

Stage 1 Remap

Remap is the first stage in the log parsing pipeline. It maps fields from the incoming payload to fields in the internal representation of the log event. If the log payload is in msgpack or proto format, ParseBuilder converts it into JSON before the remap begins.

Kloudfuse supports log ingestion from many agents, in various formats (JSON, msgpack, and proto).

Remap extracts the following fields from the log payload:

  • Log message

  • Timestamp

  • Log source

  • Labels and Tags

  • Log Facets

This stage is fully configurable; you can specify how each field in the incoming log payload maps to a field in ParseBuilder.

The code snippet in the Sample remap function is exhaustive, and covers all possible arguments of the remap function. Most of these fields ship with a good set of defaults, so you typically only need to define the arguments whose defaults you want to override. Note that all the fields in the remap function must be specified in JSONPath notation.

Sample remap function
- remap:
    args:
      kf_source:
        - "$.logSource"
      kf_msg:
        - "$.logMessage"
      kf_cloud_cluster_name:
        - "$.cloudClusterName"
      kf_cloud_az:
        - "$.cloudAz"
      kf_cloud_instance:
        - "$.cloudInstance"
      kf_cloud_account_id:
        - "$.cloudAccount"
      kf_cloud_project:
        - "$.cloudProject"
      kf_k8s_namespace:
        - "$.k8sNamespace"
      kf_k8s_container_name:
        - "$.k8sContainerName"
      kf_k8s_hostname:
        - "$.k8sHostname"
      kf_k8s_docker_image:
        - "$.k8sDockerImage"
      kf_k8s_service:
        - "$.k8sService"
      kf_k8s_pod_name:
        - "$.k8sPodName"
      kf_timestamp:
        - "$.myTimestamp"
      kf_additional_tags:
        - "$.myTag"
    conditions:
      - matcher: "__kf_agent"
        value: "fluent-bit"
        op: "=="

The Fluent Bit log agent places the log line in a payload field called log. If you modify the Fluent Bit filter to rename that field from log to logLine, use the remap function to point at the new field name, as the example Modify field in the remap function demonstrates.

Modify field in the remap function
- remap:
    args:
      kf_msg:
        - "$.logLine"
    conditions:
      - matcher: "__kf_agent"
        value: "fluent-bit"
        op: "=="

  • __kf_agent is a special reserved matcher value, and supports these agent values: datadog, fluent-bit, fluentd, kinesis, gcp, otlp, and filebeat.
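
The remap behavior described above can be sketched in a few lines of Python. This is a minimal illustration, not the Kloudfuse implementation. It assumes that each argument lists candidate JSONPath expressions and that the first one that resolves wins, that all conditions must hold, and that only simple paths of the form $.a.b are used. The helper names lookup, conditions_match, and remap, and the context dictionary that carries the __kf_agent value, are hypothetical.

```python
import json

def lookup(payload, path):
    """Resolve a simple JSONPath of the form $.a.b; return None if it is absent."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    node = payload
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def conditions_match(conditions, context):
    """Return True when every condition holds (assumed AND semantics)."""
    for cond in conditions:
        actual = context.get(cond["matcher"])
        if cond["op"] == "==":
            ok = actual == cond["value"]
        elif cond["op"] == "contains":
            ok = actual is not None and cond["value"] in actual
        else:
            raise ValueError(f"unsupported op: {cond['op']}")
        if not ok:
            return False
    return True

def remap(payload, args, conditions, context):
    """Map payload fields to internal fields; the first resolving path wins (assumed)."""
    if not conditions_match(conditions, context):
        return {}
    event = {}
    for target, paths in args.items():
        for path in paths:
            value = lookup(payload, path)
            if value is not None:
                event[target] = value
                break
    return event

payload = json.loads('{"logLine": "GET /health 200", "logSource": "nginx"}')
args = {
    "kf_msg": ["$.logLine"],
    "kf_source": ["$.logSource"],
    "kf_timestamp": ["$.myTimestamp"],  # absent in the payload, so it is skipped
}
conditions = [{"matcher": "__kf_agent", "value": "fluent-bit", "op": "=="}]
event = remap(payload, args, conditions, {"__kf_agent": "fluent-bit"})
print(event)
```

In this sketch, a payload from a different agent fails the condition, so the rule leaves it untouched; a real pipeline would then fall back to its default mappings.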

Stage 2 Relabel

The Relabeling stage of the log parsing pipeline operates on labels and tags extracted from the previous stage, Remap.

In this stage, you can make the following changes:

  • Add, drop, or replace a label.

  • Create a new label by combining label values.

  • Keep or drop a log event that matches label values.

The Relabel stage follows the same syntax and semantics as Prometheus relabeling; see the <relabel_config> section of the Prometheus documentation.

Here are a few sample configurations for the relabel function:

Drop log events whose label values don’t match the kata@webserver.* regex
- relabel:
    args:
      - action: "keep"
      - sourceLabels: "subsystem,server"
      - regex: "kata@webserver.*"
      - separator: "@"

Add a label env with value "production"
- relabel:
    args:
      - action: "replace"
      - replacement: "production"
      - targetLabel: "env"

Extract values from labels __meta_kubernetes_pod_name and __meta_kubernetes_pod_port, concatenate them, and use the result as the value for label "address"
- relabel:
    args:
      - action: "replace"
      - sourceLabels: "__meta_kubernetes_pod_name,__meta_kubernetes_pod_port"
      - separator: ":"
      - targetLabel: "address"

Create labels by copying the values of labels whose names match the regex __meta_kubernetes_(.*); new label names start with "k8s_", and the rest comes from the regex capture
- relabel:
    args:
      - action: "label_map"
      - regex: "__meta_kubernetes_(.*)"
      - replacement: "k8s_$1"
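
Because the Relabel stage follows Prometheus relabeling semantics, the behavior of the samples above can be approximated with a short Python sketch. It models the documented Prometheus rules (source label values joined by the separator, a fully anchored regex, $1-style capture references) and is not Kloudfuse code. The function name relabel and its keyword arguments are hypothetical; the sketch accepts both the Prometheus spelling labelmap and the label_map spelling used in the samples.

```python
import re

def relabel(labels, action, source_labels=(), separator=";", regex="(.*)",
            replacement="$1", target_label=None):
    """Apply one Prometheus-style rule; return new labels, or None to drop the event."""
    pattern = re.compile(regex)
    # Translate $1-style group references into Python's \g<1> template syntax.
    template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)

    if action in ("labelmap", "label_map"):
        out = dict(labels)
        for name, value in labels.items():
            m = pattern.fullmatch(name)  # the regex is matched against label names
            if m:
                out[m.expand(template)] = value
        return out

    # All other actions match the regex against the joined source label values.
    joined = separator.join(labels.get(name, "") for name in source_labels)
    m = pattern.fullmatch(joined)
    if action == "keep":
        return dict(labels) if m else None
    if action == "drop":
        return None if m else dict(labels)
    if action == "replace":
        out = dict(labels)
        if m:
            value = m.expand(template)
            if value:
                out[target_label] = value
            else:
                out.pop(target_label, None)  # an empty result removes the label
        return out
    raise ValueError(f"unsupported action: {action}")

pod = {"__meta_kubernetes_pod_name": "web-0", "__meta_kubernetes_pod_port": "8080"}
with_address = relabel(pod, "replace", source_labels=list(pod),
                       separator=":", target_label="address")
mapped = relabel(pod, "label_map", regex="__meta_kubernetes_(.*)",
                 replacement="k8s_$1")
print(with_address["address"])
print(sorted(mapped))
```

Note that keep and drop act on the whole log event: a return value of None stands for an event that is discarded, while the labels of a surviving event are unchanged.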

Stage 3 PreParse

The PreParse stage is internal to the Kloudfuse stack; it is not configurable. PreParse truncates log messages that exceed 64 KB.
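
As a rough illustration of this kind of truncation, the following Python sketch cuts a message at a byte limit without leaving a partial character at the end. It assumes that 64 KB means 65,536 bytes of UTF-8 encoded text; the actual PreParse behavior at the boundary is not documented here, and the names MAX_BYTES and truncate_message are hypothetical.

```python
MAX_BYTES = 64 * 1024  # assumption: 64 KB is 65,536 bytes of UTF-8

def truncate_message(message: str, limit: int = MAX_BYTES) -> str:
    """Return the message cut to at most `limit` bytes of UTF-8."""
    data = message.encode("utf-8")
    if len(data) <= limit:
        return message
    # errors="ignore" drops a multi-byte character that was cut in half.
    return data[:limit].decode("utf-8", errors="ignore")

long_line = "a" * 70_000
print(len(truncate_message(long_line).encode("utf-8")))
```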

Stage 4 Grammar

Kloudfuse automatically detects log facets and the timestamp from the input log line. This is a heuristic-based approach, and the extracted facets may not always be accurate. Users can therefore define custom grammars to facilitate log facet extraction. Kloudfuse applies any user-defined grammar in the config in this stage of the pipeline.

Kloudfuse has two approaches for defining grammar: dissect patterns and grok patterns.

Dissect patterns

A dissect pattern, also known as a dissection, is a simple text-based tokenizer: a set of fields and delimiters that describes the textual format. For example, a pattern can be composed of the sections %{a}, %{b}, and %{c}.

field

The text from %{ to }, inclusive.

delimiter

The text between } and the next %{ pattern. Any set of characters other than %{, 'not }', or } is a delimiter.

key

The text between the %{ and }, exclusive of the ?, +, & prefixes, and the ordinal suffix.

Example      Key
%{?aaa}      aaa
%{+bbb/3}    bbb
%{&ccc}      ccc

If you use a . (dot) in a key, the generated field has a . in its name.

To generate nested fields, use the brackets notation:

%{[fieldname][subfieldname]}

Test the patterns with a Dissect debugger before importing them into the logs parser configuration.
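
To make the field and delimiter idea concrete, here is a minimal Python sketch of a dissect-style tokenizer. It is an approximation built on regular expressions, not the real dissect engine, and it supports only plain keys and the ? (skip) prefix; the +, & prefixes, ordinal suffixes, and nested bracket keys are left out. The function name dissect is hypothetical.

```python
import re

FIELD = re.compile(r"%\{([^}]*)\}")

def dissect(tokenizer: str, line: str):
    """Split `line` with a dissect-style tokenizer; return a dict, or None on mismatch."""
    parts, keys, pos = [], [], 0
    for m in FIELD.finditer(tokenizer):
        # Text between fields is a literal delimiter.
        parts.append(re.escape(tokenizer[pos:m.start()]))
        key = m.group(1)
        if key == "" or key.startswith("?"):
            parts.append("(?:.*?)")  # skipped field: matched but not captured
        else:
            keys.append(key)
            parts.append("(.*?)")    # each field extends to the next delimiter
        pos = m.end()
    parts.append(re.escape(tokenizer[pos:]))
    match = re.fullmatch("".join(parts), line)
    if match is None:
        return None
    return dict(zip(keys, match.groups()))

tokenizer = "%{timestamp} %{level} [LLRealtimeSegmentDataManager_%{segment_name}]"
line = "2024-01-01T00:00:00Z INFO [LLRealtimeSegmentDataManager_orders__0__12]"
fields = dissect(tokenizer, line)
print(fields)
```

Because each field stops at the first occurrence of the next delimiter, a timestamp that contains a space would be split incorrectly by this tokenizer; that is the kind of problem a Dissect debugger helps catch.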

Grok patterns

Grok patterns are based on named regular expressions. See the Elastic documentation on the Grok filter plugin or IBM’s documentation on General Grok Patterns for more information about grok patterns.

Sample grammar config with dissect and grok patterns
  kf_parsing_config:
    config: |-
      - parser:
          dissect:
            args:
              - tokenizer: '%{timestamp} %{level} [LLRealtimeSegmentDataManager_%{segment_name}]'
            conditions:
              - matcher: "%kf_msg"
                value: "LLRealtimeSegmentDataManager_"
                op: "contains"
      - parser:
          grok:
            args:
              patterns:
                - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx_ingress_controller_remote_ip_list}|%{NOTSPACE:source_address})
                - (-|%{DATA:user_name}) \[%{HTTPDATE:nginx_ingress_controller_time}\] "%{DATA:nginx_ingress_controller_info}"
                  %{NUMBER:http_response_status_code:long} %{NUMBER:http_response_body_bytes:long}
                  "(-|%{DATA:http_request_referrer})" "(-|%{DATA:user_agent_original})" %{NUMBER:nginx_ingress_controller_http_request_length:long}
                  %{NUMBER:nginx_ingress_controller_http_request_time:double} \[%{DATA:nginx_ingress_controller_upstream_name}\]
                  \[%{DATA:nginx_ingress_controller_upstream_alternative_name}\] (%{UPSTREAM_ADDRESS_LIST:nginx_ingress_controller_upstream_address_list}|-)
                  (%{UPSTREAM_RESPONSE_LENGTH_LIST:nginx_ingress_controller_upstream_response_length_list}|-) (%{UPSTREAM_RESPONSE_TIME_LIST:nginx_ingress_controller_upstream_response_time_list}|-)
                  (%{UPSTREAM_RESPONSE_STATUS_CODE_LIST:nginx_ingress_controller_upstream_response_status_code_list}|-) %{GREEDYDATA:nginx_ingress_controller_http_request_id}
            conditions:
              - matcher: "#source"
                op: "=="
                value: "controller"

Test with a Grok debugger before importing grok patterns into your configuration.
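
The way a grok expression expands into a named regular expression can be sketched in Python as follows. This is a simplified model for illustration only: the PATTERNS table is a tiny hypothetical subset with simplified definitions, not the real grok pattern library, and the helper names compile_grok and grok are assumptions. It handles the %{PATTERN}, %{PATTERN:field}, and %{PATTERN:field:type} forms, with long and double as the only type conversions.

```python
import re

# Hypothetical, simplified pattern library; real grok ships many more definitions.
PATTERNS = {
    "NUMBER": r"[+-]?\d+(?:\.\d+)?",
    "NOTSPACE": r"\S+",
    "DATA": r".*?",
    "GREEDYDATA": r".*",
}
CASTS = {"long": int, "double": float}
REF = re.compile(r"%\{(\w+)(?::(\w+))?(?::(\w+))?\}")

def compile_grok(expression):
    """Expand %{PATTERN:field:type} references into a regex with named groups."""
    casts = {}

    def expand(m):
        name, field, cast = m.group(1), m.group(2), m.group(3)
        body = PATTERNS[name]
        if field is None:
            return f"(?:{body})"       # unnamed reference: match without capturing
        if cast:
            casts[field] = CASTS[cast]
        return f"(?P<{field}>{body})"  # named reference becomes a named group

    return re.compile(REF.sub(expand, expression)), casts

def grok(expression, line):
    """Return the extracted fields as a dict, or None when the line does not match."""
    regex, casts = compile_grok(expression)
    m = regex.fullmatch(line)
    if m is None:
        return None
    return {k: casts.get(k, str)(v) for k, v in m.groupdict().items() if v is not None}

expression = (r'%{NOTSPACE:source_address} "%{DATA:request}" '
              r'%{NUMBER:http_response_status_code:long} %{NUMBER:request_time:double}')
parsed = grok(expression, '10.0.0.7 "GET /health HTTP/1.1" 200 0.004')
print(parsed)
```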

Stage 5 KfParse

The KfParse stage of the log parsing pipeline automatically detects facets and generates fingerprints. It is also internal, and not configurable.

Stage 6 Transform

Transform is the last function of the pipeline, where you can derive labels from the facets extracted in the previous stages, Stage 4 Grammar and Stage 5 KfParse. The Transform syntax is similar to that of the relabel function.

The following example adds the value of the log facet eventSource as the value of a label named source.

Add the value of facet eventSource to label source
- transform:
    args:
      - action: "facet_to_label_map"
      - sourceLabels: "@eventSource"
      - targetLabel: "source"
    conditions:
      - matcher: "#source"
        op: "=="
        value: "awsLogSource"
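
The effect of this transform can be modeled with a small Python sketch. It rests on assumptions inferred from the samples rather than on documented behavior: that the @ prefix refers to a facet, that the # prefix refers to a label, and that all conditions must hold. The event layout with labels and facets dictionaries and the function name facet_to_label are hypothetical.

```python
def facet_to_label(event, source_facet, target_label, conditions=()):
    """Copy a facet value into a label when all conditions hold; return the event."""
    labels = dict(event["labels"])
    for cond in conditions:
        # Assumption: a leading '#' in the matcher refers to a label name.
        if cond["op"] != "==" or not cond["matcher"].startswith("#"):
            raise ValueError("unsupported condition in this sketch")
        if labels.get(cond["matcher"][1:]) != cond["value"]:
            return event  # condition not met: leave the event unchanged
    # Assumption: a leading '@' in the source refers to a facet name.
    name = source_facet[1:] if source_facet.startswith("@") else source_facet
    if name in event["facets"]:
        labels[target_label] = str(event["facets"][name])
    return {**event, "labels": labels}

conditions = [{"matcher": "#source", "op": "==", "value": "awsLogSource"}]
aws_event = {"labels": {"source": "awsLogSource"},
             "facets": {"eventSource": "s3.amazonaws.com"}}
result = facet_to_label(aws_event, "@eventSource", "source", conditions)
print(result["labels"])
```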

Write Kafka topic

As the log line completes its journey through the entire log parsing pipeline, the ParseBuilder extracts all facets and labels. Kloudfuse then wraps (marshals) all fields into a protobuf object, and writes it to a Kafka topic, which Pinot subsequently consumes.