Asset management and activity stream
A Parsec instance that parses an event lane can emit compact activity events to a dedicated Kafka topic. The lmio assets service consumes that topic and updates the asset inventory (last seen, identities, source name in tags).
This is separate from the main events topic: parsed logs still go to kafka.events.topic on the lane. Activity reporting uses another topic resolved from the tenant activity event lane declaration (/EventLanes/<tenant>/activity.yaml, default topic name events.<tenant>.activity).
How it works in Parsec
On startup, Parsec loads /Schemas/Mappings/Asset_<schema>.yaml (for example Asset_ECS.yaml for ECS) and maps field names to two processors under lmioparsec/processors/asset/:
- AssetIdentityProcessor runs identity lookups and optional fallback, then fills context["asset"] on each parsed event.
- IndicatorOfActivityProcessor (IOA, Indicator of Activity) reads that context and emits activity events to the activity Kafka topic.
Both processors are attached at the end of ParsecPipeline, after parsing, enrichment, and type check, and before the event is encoded and written to the lane events sink.
A shared ActivityPipeline is created once per Parsec application. The IOA processor pushes activity events into its internal source; the pipeline encodes JSON and writes to the activity Kafka topic.
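The hand-off between the two processors can be pictured with a small sketch. Everything here is a hypothetical stand-in rather than the lmioparsec code: the function names, the shape of context["asset"], and the plain list that plays the role of the ActivityPipeline source are assumptions made only for illustration.

```python
from typing import Any

# Stand-in for the internal source of the shared ActivityPipeline (assumption).
activity_source: list[dict[str, Any]] = []


def asset_identity(context: dict[str, Any], event: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in: record what is known about the host in the context."""
    context["asset"] = {"host_id": event.get("host.id")}
    return event


def indicator_of_activity(context: dict[str, Any], event: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in: read the context and queue an activity event."""
    host_id = context.get("asset", {}).get("host_id")
    if host_id is not None:
        activity_source.append({"host.id": host_id})
    # The parsed event itself is returned unchanged and continues to the lane events sink.
    return event


def run_tail(event: dict[str, Any]) -> dict[str, Any]:
    """Run the two processors in the order described above, sharing one context."""
    context: dict[str, Any] = {}
    for processor in (asset_identity, indicator_of_activity):
        event = processor(context, event)
    return event
```

The point of the sketch is the separation of concerns: the parsed event goes on to the lane events sink untouched, while activity reporting travels through its own queue.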
Asset mapping (Asset_<schema>.yaml)
The asset mapping is a Library file with define.type: asset/mapping. The legacy path Assets_<schema>.yaml is still loaded as a fallback.
The mapping has two parts:
- Identity normalization (identity + fallback): resolve host.id / user.id from event fields using lookups.
- Activity payloads (mapping): map resolved identities to field names on activity Kafka events.
Activity mapping depends on the result of identity normalization. If you need a custom field on activity events, configure it in the event lane instead of the global asset mapping.
Identity normalization
Each lookup name lists event fields whose values are used as lookup keys. Parsec calls the corresponding lookup table and reads host.id or user.id from the result.
---
define:
  type: asset/mapping
identity:
  ips2hostid:
    - host.ip
  macs2hostid:
    - host.mac
  hostnames2hostid:
    - host.hostname
  usernames2userid:
    - user.name
  useremails2userid:
    - user.email
fallback:
  host.hostname: lowercase_and_domain_removal
  user.name: lowercase_and_domain_removal
| Lookup name | Resolves | Typical event fields (ECS) |
|---|---|---|
| ips2hostid | host.id | host.ip |
| macs2hostid | host.id | host.mac |
| hostnames2hostid | host.id | host.hostname |
| usernames2userid | user.id | user.name |
| useremails2userid | user.id | user.email |
Event lane specific fields (for example client.ip or dns.question.name) belong in the event lane configuration, not in the global mapping.
Lookup tables (hostnames2hostid, ips2hostid, and others) are declared in the Library under /Lookups/. See Lookups for general lookup setup.
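As a rough illustration of how a field list drives a lookup, the sketch below uses plain dictionaries in place of Library lookup tables. The function name resolve_via_lookup, the table contents, and the row shape (a dictionary carrying host.id) are assumptions for the example, not the Parsec API.

```python
def resolve_via_lookup(event, fields, table, id_key):
    """Use the listed event fields as lookup keys and return the first id found."""
    for field in fields:
        key = event.get(field)
        if key is None:
            continue  # the field is not present on this event
        row = table.get(key)
        if row and id_key in row:
            return row[id_key]
    return None


# Hypothetical lookup content; real tables are declared in the Library under /Lookups/.
ips2hostid = {"192.168.1.1": {"host.id": "server01.example.com"}}

event = {"host.ip": "192.168.1.1", "host.hostname": "SERVER01"}
host_id = resolve_via_lookup(event, ["host.ip"], ips2hostid, "host.id")
```

Here host_id ends up as the identifier stored for 192.168.1.1; an event without host.ip, or with an unknown address, yields None.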
Fallback methods
Fallback runs only when no identity lookup field resolved host.id or user.id on the event. Configured fallback methods are then tried on hostname / username fields (in field order).
| Method | Effect |
|---|---|
| lowercase_and_domain_removal | Strip DOMAIN\ prefix, lowercase; for hostnames also append domain from ips2domain when the name has no dot |
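The effect of the method can be approximated in a few lines. This is a sketch of the described behaviour only: how Parsec queries ips2domain is not covered here, so the domain is passed in as a plain argument, and normalize_hostname is a hypothetical helper name.

```python
from typing import Optional


def lowercase_and_domain_removal(value: str) -> str:
    """Drop a leading DOMAIN prefix (separated by a backslash) and lowercase the rest."""
    _, _, name = value.rpartition("\\")
    return name.lower()


def normalize_hostname(value: str, domain: Optional[str] = None) -> str:
    """Hostname variant: also append a domain when the name has no dot.

    In Parsec the domain comes from ips2domain; passing it in is an assumption
    made to keep the example self-contained.
    """
    name = lowercase_and_domain_removal(value)
    if "." not in name and domain:
        name = f"{name}.{domain}"
    return name
```

For example, a username such as CORP\JDoe becomes jdoe, and a bare hostname SERVER01 with the domain example.com becomes server01.example.com. A hostname that already contains a dot is only lowercased.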
Which fields get fallback: only fields listed under fallback in the library mapping or event lane. The mapping defaults cover host.hostname and user.name only. Fields added via identity.<lookup>+ (for example dns.question.name) run lookup only until you add an explicit entry such as dns.question.name: lowercase_and_domain_removal.
Event lane parsec.asset.fallback merges with the library mapping; it does not replace it. An empty fallback: {} leaves mapping defaults unchanged.
Disabling fallback for a field that already has a mapping default: set false or null. Lookups still run.
parsec:
  asset:
    fallback:
      host.hostname: false
      user.name: false
There is no global switch to disable all fallback at once.
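The merge rules for fallback can be summarised in a short sketch. The function merge_fallback is hypothetical; it only mirrors the behaviour described above (lane entries are merged into the library mapping, and false or null removes a field).

```python
def merge_fallback(library, lane):
    """Merge the lane-level fallback into the library mapping without replacing it."""
    merged = dict(library)
    for field, method in (lane or {}).items():
        if method is None or method is False:
            merged.pop(field, None)  # disable fallback for this field only
        else:
            merged[field] = method
    return merged


library_fallback = {
    "host.hostname": "lowercase_and_domain_removal",
    "user.name": "lowercase_and_domain_removal",
}
lane_fallback = {
    "dns.question.name": "lowercase_and_domain_removal",
    "user.name": False,
}
effective = merge_fallback(library_fallback, lane_fallback)
```

In this example the effective configuration keeps host.hostname from the library, adds dns.question.name from the lane, and drops user.name. Passing an empty lane mapping returns the library defaults unchanged.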
Activity field mapping
The mapping section defines event field names used on activity Kafka payloads. Principal field names (host.id, user.id, service.id) come from the tenant schema (define.principal_hostid, define.principal_userid, define.principal_serviceid; legacy principal_host / principal_user / principal_service still work).
| Key | Default (ECS) | Role |
|---|---|---|
| tags | tags | Technical tag list on principal events |
| event_lane | observer.name | Source / event lane path string |
| observer_type | observer.type | Always set to lmio-parsec on activity events |
| aux_hosts | related.hosts | Host identity when host.id is missing |
| aux_ips | related.ip | IP list on auxiliary host events |
| aux_macs | related.mac | MAC list on auxiliary host events |
| aux_users | related.user | User identity when user.id is missing |
mapping:
  tags: tags
  event_lane: observer.name
  observer_type: observer.type
  aux_hosts: related.hosts
  aux_ips: related.ip
  aux_macs: related.mac
  aux_users: related.user
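To show how the mapping keys translate into payload field names, the sketch below assembles a principal host event from the ECS defaults. The function build_principal_event and its parameters are hypothetical, and the _id field is left out because it is assigned later, when the event is flushed.

```python
DEFAULT_MAPPING = {
    "tags": "tags",
    "event_lane": "observer.name",
    "observer_type": "observer.type",
    "aux_hosts": "related.hosts",
    "aux_ips": "related.ip",
    "aux_macs": "related.mac",
    "aux_users": "related.user",
}


def build_principal_event(host_id, tenant, lane_path, tags, timestamp_ms,
                          mapping=DEFAULT_MAPPING, principal_hostid="host.id"):
    """Assemble a principal host activity payload using the mapped field names."""
    return {
        principal_hostid: host_id,  # field name comes from the tenant schema
        "@timestamp": timestamp_ms,
        "tenant": tenant,
        mapping["tags"]: tags,
        mapping["event_lane"]: lane_path,
        mapping["observer_type"]: "lmio-parsec",
    }


payload = build_principal_event(
    "server01.example.com",
    "mytenant",
    "/EventLanes/mytenant/fortinet.yaml",
    ["lmio-parsec:1.2.3", "instance-abc"],
    1710000000000,
)
```

Changing a value in the mapping (for example pointing event_lane at a different field) changes only the key under which that piece of data appears on the payload.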
Event lane configuration (parsec.asset)
Per lane overrides for identity lookups, fallback, and activity reporting live under parsec.asset in the event lane YAML file.
---
define:
  type: lmio/event-lane
parsec:
  name: /Parsers/MyVendor/MyProduct/
  asset:
    # Activity Kafka flush (default: enabled, 60s unless overridden).
    heartbeat: 120s  # or ``off`` / ``false``
    identity:
      hostnames2hostid+:  # append to mapping list
        - dns.question.name
      ips2hostid+:  # append
        - client.ip
        - source.ip
      hostnames2hostid: []  # overwrite mapping list (empty = none from lane)
    fallback:
      dns.question.name: lowercase_and_domain_removal
      user.name: false
| Key | Default | Effect |
|---|---|---|
| heartbeat | enabled, processor default 60s | Minimum seconds between Kafka flushes; off disables IndicatorOfActivityProcessor |
| identity.<lookup> | (none) | Replace mapping field list for that lookup |
| identity.<lookup>+ | (none) | Append fields to the mapping list |
| fallback.<field> | from mapping | Fallback method name, or false / null to disable for this lane (mapping default applies when omitted) |
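The difference between the plain and the + form of an identity key can be sketched as follows. The function merge_identity is hypothetical, and the case where one lane declares both forms for the same lookup is deliberately left out, since its precedence is not described here.

```python
def merge_identity(library, lane):
    """Apply lane overrides: a plain key replaces the field list, key+ appends to it."""
    merged = {name: list(fields) for name, fields in library.items()}
    for key, fields in lane.items():
        if key.endswith("+"):
            target = merged.setdefault(key[:-1], [])
            # Skipping fields that are already listed is an assumption of this sketch.
            target.extend(f for f in fields if f not in target)
        else:
            merged[key] = list(fields)
    return merged


library_identity = {
    "hostnames2hostid": ["host.hostname"],
    "ips2hostid": ["host.ip"],
}
lane_identity = {
    "ips2hostid+": ["client.ip", "source.ip"],
    "hostnames2hostid": [],
}
effective_identity = merge_identity(library_identity, lane_identity)
```

With these inputs ips2hostid grows to host.ip, client.ip, source.ip, while hostnames2hostid is replaced by an empty list, so no hostname lookup fields remain for the lane.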
See also Event Lanes for general event lane configuration (Kafka topics, parser path, schema, and timezone).
What is extracted from each parsed event
AssetIdentityProcessor runs lookups from the merged identity field lists, then optional fallback, and fills context["asset"]:
- When host.id / user.id is known (lookup, fallback, or already on the event): a single kind: id row for principal activity.
- When the principal id is not known: auxiliary rows only (hosts with kind hostname | ip | mac; users with kind name | email, no kind: id row).
IndicatorOfActivityProcessor reads that context only (plus service principal from the event):
- host_id / user_id present → principal activity (schema principal fields on the Kafka payload)
- hostname name without host_id → mapping.aux_hosts
- orphan ip / mac without host_id → combined mapping.aux_ips / mapping.aux_macs event
- name / email without user_id → mapping.aux_users
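The routing rules above can be sketched as a small function. The layout of context["asset"] used here (lists of rows with kind and value keys) and the function name route_activity are assumptions for illustration, as is the reading of "orphan" as "no hostname row present".

```python
MAPPING = {
    "aux_hosts": "related.hosts",
    "aux_ips": "related.ip",
    "aux_macs": "related.mac",
    "aux_users": "related.user",
}


def route_activity(asset, mapping=MAPPING):
    """Return the payload fragments that one asset context would produce."""
    def values(rows, *kinds):
        return [row["value"] for row in rows if row["kind"] in kinds]

    fragments = []
    hosts, users = asset.get("hosts", []), asset.get("users", [])

    host_ids = values(hosts, "id")
    names = values(hosts, "hostname")
    if host_ids:
        fragments.append({"host.id": host_ids[0]})  # principal host activity
    elif names:
        fragments.append({mapping["aux_hosts"]: names})
    else:
        # Orphan IP / MAC: one combined auxiliary host event.
        fragment = {}
        if values(hosts, "ip"):
            fragment[mapping["aux_ips"]] = values(hosts, "ip")
        if values(hosts, "mac"):
            fragment[mapping["aux_macs"]] = values(hosts, "mac")
        if fragment:
            fragments.append(fragment)

    user_ids = values(users, "id")
    if user_ids:
        fragments.append({"user.id": user_ids[0]})  # principal user activity
    elif values(users, "name", "email"):
        fragments.append({mapping["aux_users"]: values(users, "name", "email")})
    return fragments
```

An event with only an IP, a MAC, a username and an e-mail address would therefore yield two auxiliary fragments, one for the host side and one for the user side, and no principal activity.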
Existing host.id / user.id on the parsed event
If a parser or enricher already set the principal field, AssetIdentityProcessor does not overwrite it. Lookups still run and results go to context["asset"]. Fallback runs only when no lookup resolved the principal id and the field was empty.
Host lookup order: hostnames2hostid fields, then ips2hostid, then macs2hostid; then fallback on fields listed in fallback when all lookups failed.
User lookup order: usernames2userid fields, then useremails2userid; then fallback on configured username fields.
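The host resolution order and the no-overwrite rule can be combined in one sketch. The function resolve_host_id, the dictionary-based lookup tables (mapping a key straight to an id), and the callable fallback methods are simplifications assumed for the example.

```python
HOST_LOOKUP_ORDER = ("hostnames2hostid", "ips2hostid", "macs2hostid")


def resolve_host_id(event, identity, lookups, fallback):
    """Return the host id for context["asset"]; set event["host.id"] only when empty."""
    resolved = None
    for name in HOST_LOOKUP_ORDER:
        table = lookups.get(name, {})
        for field in identity.get(name, []):
            resolved = table.get(event.get(field))
            if resolved is not None:
                break
        if resolved is not None:
            break

    existing = event.get("host.id")
    # Fallback runs only when every lookup failed and the principal field was empty.
    if resolved is None and existing is None:
        for field, method in fallback.items():
            value = event.get(field)
            if value is not None:
                resolved = method(value)
                break

    if existing is None and resolved is not None:
        event["host.id"] = resolved  # a value set by a parser or enricher is never overwritten
    return resolved if resolved is not None else existing


identity = {"hostnames2hostid": ["host.hostname"], "ips2hostid": ["host.ip"]}
lookups = {"hostnames2hostid": {}, "ips2hostid": {"10.0.0.5": "srv-a"}}
fallback = {"host.hostname": str.lower}  # stand-in for a configured fallback method
```

With these inputs, an event carrying host.ip 10.0.0.5 resolves to srv-a through ips2hostid after the hostname lookup misses. An event that already has host.id keeps that value on the event, while the lookup result is still what the function returns for the context.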
Tip
Remove schema enrich on hostname / username fields when using these processors (same lookups as DefaultLookupEnricher, but unified here).
Batching and deduplication
Activity events accumulate in memory, keyed by identity (principal, auxiliary host, auxiliary user), so repeated activity for the same identity within one interval collapses into a single pending event. On each Application.tick/60! message, once the configured heartbeat interval has elapsed, pending events are serialized, assigned a deterministic _id, and written to Kafka.
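A minimal sketch of this accumulate-and-flush pattern is shown below. The class ActivityBuffer, the key layout, and in particular the hash-based deterministic_id are assumptions: the actual _id derivation used by Parsec is not described here, and a real implementation would write to Kafka instead of returning a list.

```python
import hashlib
import time


def deterministic_id(key):
    """Derive a stable integer from the identity key (hypothetical scheme)."""
    digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


class ActivityBuffer:
    def __init__(self, heartbeat=60.0, clock=time.monotonic):
        self.heartbeat = heartbeat
        self.clock = clock
        self.pending = {}
        self.last_flush = clock()

    def add(self, key, event):
        # Same identity key: the newer event replaces the pending one.
        self.pending[key] = event

    def on_tick(self):
        """Call on every tick; returns the events to send once the heartbeat elapsed."""
        now = self.clock()
        if now - self.last_flush < self.heartbeat:
            return []
        self.last_flush = now
        batch = [dict(event, _id=deterministic_id(key)) for key, event in self.pending.items()]
        self.pending.clear()
        return batch
```

With a heartbeat of 120 seconds, ticks that arrive every 60 seconds flush on every second tick; two events added for the same key in between result in one outgoing event.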
Example activity payloads
Principal activity
{
  "host.id": "server01.example.com",
  "@timestamp": 1710000000000,
  "tenant": "mytenant",
  "tags": ["lmio-parsec:1.2.3", "instance-abc"],
  "observer.name": "/EventLanes/mytenant/fortinet.yaml",
  "observer.type": "lmio-parsec",
  "_id": 281474976710657
}
Auxiliary user activity
{
  "related.user": ["jdoe", "jdoe@example.com"],
  "@timestamp": 1710000000000,
  "tenant": "mytenant",
  "observer.name": "/EventLanes/mytenant/ad.yaml",
  "observer.type": "lmio-parsec",
  "_id": 281474976710658
}
Auxiliary host activity (IP and MAC, no hostname)
{
  "related.ip": [{"h": 0, "l": 3232235777}],
  "related.mac": [11259375],
  "@timestamp": 1710000000000,
  "tenant": "mytenant",
  "observer.name": "/EventLanes/mytenant/dhcp.yaml",
  "observer.type": "lmio-parsec",
  "_id": 281474976710659
}
Auxiliary related.ip values are normalized to {h, l} in IndicatorOfActivityProcessor. related.mac values are ui64 integers.
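The numbers in the example payload can be reproduced with the Python standard library. This is a sketch of one encoding that matches the example (the address as an integer split into high and low 64-bit halves, with an IPv4 address stored directly in the low half); that the processor encodes every address exactly this way is an assumption. The helper names ip_to_hl and mac_to_int are hypothetical.

```python
import ipaddress

MASK64 = (1 << 64) - 1


def ip_to_hl(address: str) -> dict:
    """Split the integer form of an IP address into high and low 64-bit halves."""
    value = int(ipaddress.ip_address(address))
    return {"h": value >> 64, "l": value & MASK64}


def mac_to_int(mac: str) -> int:
    """Read a MAC address written with ':' or '-' separators as one unsigned integer."""
    return int(mac.replace(":", "").replace("-", ""), 16)


ip_value = ip_to_hl("192.168.1.1")            # {"h": 0, "l": 3232235777}
mac_value = mac_to_int("00:00:00:ab:cd:ef")   # 11259375
```

Under this reading, the example payload describes the address 192.168.1.1 and the MAC 00:00:00:ab:cd:ef.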