Log parsing pipeline and functions
Here is an illustration of a typical log parsing pipeline in the Kloudfuse stack; the stages run in this order: Remap, Relabel, PreParse, Grammar, KfParse, and Transform.
Kloudfuse ingests log lines through the ingester service, which writes the incoming log payloads to a Kafka topic. The LogParser service reads each payload and unmarshalls (unwraps) it into a JSON string. This JSON string is the input to the log parsing pipeline.
- Unwrap/unmarshall converts the incoming LogEventPayload into a JSON string, to prepare it for the Remap stage and subsequent processing.
- Δ indicates the fields that changed at the previous stage of the pipeline.
- Wrap/marshall converts the processed log into a protobuf payload.
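The configurable functions in this pipeline are declared as an ordered list in the logs parser configuration. The Grammar examples later in this section show the kf_parsing_config wrapper; assuming the other functions live under the same wrapper (an assumption made here only to sketch the overall shape, with placeholder values taken from the samples below), a combined configuration might look like this:
kf_parsing_config:
  config: |-
    # Functions run in pipeline order; each stage is detailed below.
    - remap:
        args:
          kf_msg:
            - "$.logMessage"              # placeholder JSONPath
    - relabel:
        args:
          - action: "replace"
          - replacement: "production"
          - targetLabel: "env"
    - transform:
        args:
          - action: "facet_to_label_map"
          - sourceLabels: "@eventSource"  # placeholder facet
          - targetLabel: "source"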
Remap
Remap is the first stage in the log parsing pipeline. It maps fields from the incoming payload to fields in the internal representation of the log event. Kloudfuse supports log ingestion from many agents, in various formats (JSON, msgpack, and proto). If the log payload is in msgpack or proto format, ParseBuilder converts it to JSON before beginning the remap.
Remap extracts the following fields from the log payload:
- Log message
- Timestamp
- Log source
- Labels and Tags
- Log Facets
This stage is fully configurable; you can specify how to map each field in the incoming log payload to a field in ParseBuilder.
The code snippet in the sample remap function below is exhaustive, and covers all possible arguments of the remap function. Most of these fields ship with a good set of defaults, so you typically don't have to define and override all of the arguments unless you plan to deviate from the default configuration. Note that all fields in the remap function must be specified in JSONPath notation.
- remap:
    args:
      kf_source:
        - "$.logSource"
      kf_msg:
        - "$.logMessage"
      kf_cloud_cluster_name:
        - "$.cloudClusterName"
      kf_cloud_az:
        - "$.cloudAz"
      kf_cloud_instance:
        - "$.cloudInstance"
      kf_cloud_account_id:
        - "$.cloudAccount"
      kf_cloud_project:
        - "$.cloudProject"
      kf_k8s_namespace:
        - "$.k8sNamespace"
      kf_k8s_container_name:
        - "$.k8sContainerName"
      kf_k8s_hostname:
        - "$.k8sHostname"
      kf_k8s_docker_image:
        - "$.k8sDockerImage"
      kf_k8s_service:
        - "$.k8sService"
      kf_k8s_pod_name:
        - "$.k8sPodName"
      kf_timestamp:
        - "$.myTimestamp"
      kf_additional_tags:
        - "$.myTag"
    conditions:
      - matcher: "__kf_agent"
        value: "fluent-bit"
        op: "=="
The Fluent-bit log agent includes the log line in a field called `log` in the payload. The example Modify field in the remap function demonstrates how to use the remap function to change the field name from `log` to `logLine`.
- remap:
    args:
      kf_source:
        - "$.logLine"
    conditions:
      - matcher: "__kf_agent"
        value: "fluent-bit"
        op: "=="
- `__kf_agent` is a special reserved matcher value, and supports these agent values: `datadog`, `fluent-bit`, `fluentd`, `kinesis`, `gcp`, `otlp`, and `filebeat`.
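For example, to apply a remap only to payloads from the Datadog agent, condition on that agent value. This is a hypothetical sketch; the `$.message` path is a placeholder for wherever your payload carries the log line:
- remap:
    args:
      kf_msg:
        - "$.message"   # placeholder JSONPath; depends on your payload shape
    conditions:
      - matcher: "__kf_agent"
        value: "datadog"
        op: "=="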
Relabel
The Relabel stage of the log parsing pipeline operates on the labels and tags extracted in the previous stage, Remap.
In this stage, you can make the following changes:
- Add, drop, or replace a label.
- Create a new label by combining the values of other labels.
- Keep or drop a log event that matches label values.
The Relabel stage follows the same syntax and semantics as Prometheus relabeling; see the <relabel_config> section of the Prometheus documentation.
Here are a few sample configurations for the relabel function:
# Keep only log events whose subsystem and server labels, joined
# with "@", match kata@webserver.*
- relabel:
    args:
      - action: "keep"
      - sourceLabels: "subsystem,server"
      - regex: "kata@webserver.*"
      - separator: "@"
# Set the env label to the static value "production"
- relabel:
    args:
      - action: "replace"
      - replacement: "production"
      - targetLabel: "env"
# Combine the pod name and port, separated by ":", into an address label
- relabel:
    args:
      - action: "replace"
      - sourceLabels: "__meta_kubernetes_pod_name,__meta_kubernetes_pod_port"
      - separator: ":"
      - targetLabel: "address"
# Copy every __meta_kubernetes_* label to a matching k8s_* label
- relabel:
    args:
      - action: "label_map"
      - regex: "__meta_kubernetes_(.*)"
      - replacement: "k8s_$1"
PreParse
The PreParse stage is internal to the Kloudfuse stack; it is not configurable. PreParse truncates log messages that exceed 64KB.
Grammar
Kloudfuse automatically detects log facets and the timestamp from the input log line. Because this is a heuristic-based approach, the extracted facets may not always be accurate. You can therefore define custom grammars to facilitate log facet extraction; Kloudfuse applies any user-defined grammars from the config at this stage of the pipeline.
Kloudfuse has two approaches for defining grammar: dissect patterns and grok patterns.
Dissect patterns
The dissect patterns approach, also known as dissection, uses a simple text-based tokenizer: a set of fields and delimiters that describe the textual format. A pattern is composed of sections such as `%{a}`, `%{b}`, and `%{c}`.
- field: The text from `%{` to `}`, inclusive.
- delimiter: The text between `}` and the next `%{` pattern. Any set of characters other than `%{`, `'not }'`, or `}` is a delimiter.
- key: The text between the `%{` and `}`, exclusive of the `?`, `+`, and `&` prefixes, and the ordinal suffix. For example:

Example      Key
%{?aaa}      aaa
%{+bbb/3}    bbb
%{&ccc}      ccc
To generate nested fields, use the brackets notation.
Test the patterns with a Dissect debugger before importing them into the logs parser configuration.
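As a worked sketch (the log line, field names, and match string here are all hypothetical), the tokenizer below splits a bracketed application-log line into timestamp, level, thread, and message fields:
# Hypothetical input line:
#   2024-01-15T10:00:00Z INFO [worker-3] cache warmed in 412ms
# Dissect extracts: timestamp, level, thread, and msg.
- parser:
    dissect:
      args:
        - tokenizer: '%{timestamp} %{level} [%{thread}] %{msg}'
      conditions:
        - matcher: "%kf_msg"
          value: "cache warmed"
          op: "contains"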
Grok patterns
Grok patterns are based on named regexes. See the Elastic documentation on the Grok filter plugin or IBM's documentation on General Grok Patterns for more information about grok patterns.
kf_parsing_config:
  config: |-
    - parser:
        dissect:
          args:
            - tokenizer: '%{timestamp} %{level} [LLRealtimeSegmentDataManager_%{segment_name}]'
          conditions:
            - matcher: "%kf_msg"
              value: "LLRealtimeSegmentDataManager_"
              op: "contains"
    - parser:
        grok:
          args:
            patterns:
              - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx_ingress_controller_remote_ip_list}|%{NOTSPACE:source_address})
                (-|%{DATA:user_name}) \[%{HTTPDATE:nginx_ingress_controller_time}\] "%{DATA:nginx_ingress_controller_info}"
                %{NUMBER:http_response_status_code:long} %{NUMBER:http_response_body_bytes:long}
                "(-|%{DATA:http_request_referrer})" "(-|%{DATA:user_agent_original})" %{NUMBER:nginx_ingress_controller_http_request_length:long}
                %{NUMBER:nginx_ingress_controller_http_request_time:double} \[%{DATA:nginx_ingress_controller_upstream_name}\]
                \[%{DATA:nginx_ingress_controller_upstream_alternative_name}\] (%{UPSTREAM_ADDRESS_LIST:nginx_ingress_controller_upstream_address_list}|-)
                (%{UPSTREAM_RESPONSE_LENGTH_LIST:nginx_ingress_controller_upstream_response_length_list}|-) (%{UPSTREAM_RESPONSE_TIME_LIST:nginx_ingress_controller_upstream_response_time_list}|-)
                (%{UPSTREAM_RESPONSE_STATUS_CODE_LIST:nginx_ingress_controller_upstream_response_status_code_list}|-) %{GREEDYDATA:nginx_ingress_controller_http_request_id}
          conditions:
            - matcher: "#source"
              op: "=="
              value: "controller"
Test with a Grok debugger before importing grok patterns into your configuration.
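For a simpler starting point, here is a minimal, hypothetical grok parser. IP, WORD, URIPATHPARAM, and NUMBER are standard grok patterns; the webserver source value and the field names are placeholders:
- parser:
    grok:
      args:
        patterns:
          - '%{IP:client_ip} %{WORD:http_method} %{URIPATHPARAM:request_path} %{NUMBER:bytes:long}'
      conditions:
        - matcher: "#source"
          op: "=="
          value: "webserver"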
KfParse
The KfParse stage of the log parsing pipeline automatically detects facets and generates fingerprints. Like PreParse, it is internal and not configurable.
Transform
Transform is the last function of the pipeline, where you can derive labels from the facets extracted in the previous stages, Grammar and KfParse. The transform syntax is similar to the relabel function.
The following example shows how to add the value of the log facet `eventSource` as a label value, where the label name is `source`. The conditions block applies the rule only to log events whose source matches awsLogSource.
Facet `eventSource` to label `source`:
- transform:
    args:
      - action: "facet_to_label_map"
      - sourceLabels: "@eventSource"
      - targetLabel: "source"
    conditions:
      - matcher: "#source"
        op: "=="
        value: "awsLogSource"