Parsers are pluggable components which are used to transform raw data (textual or raw bytes) into JSON messages suitable for downstream enrichment and indexing.
There are two general types of parsers: Java parsers, intended for higher-velocity data sources, and Grok parsers, for quickly standing up a parser without writing Java code.
Data flows into the parser bolt via Kafka and, once parsed, back out to the enrichment topology via Kafka. Errors are collected along with the context of the error (e.g. the stack trace) and the original message that caused the error, and are sent to an error queue. Invalid messages, as determined by the global validation functions, are also treated as errors and sent to an error queue.
All Metron messages follow a specific format in order to be ingested. If a message does not conform to this format, it will be dropped and put onto an error queue for further examination. The message must be in JSON format and must have a JSON tag message, like so:
{"message" : message content}
Where appropriate, there is also a standardization around the 5-tuple JSON fields. This is done so that the topology correlation engine further downstream can correlate messages from different topologies by these fields. We are currently working on expanding the message standardization beyond these fields, but this feature is not yet available. The standard field names are as follows:

ip_src_addr: layer 3 source address
ip_dst_addr: layer 3 destination address
ip_src_port: layer 4 source port
ip_dst_port: layer 4 destination port
protocol: layer 4 protocol
timestamp: unix epoch timestamp in milliseconds
original_string: a human-friendly string representation of the message

The timestamp and original_string fields are mandatory. The remaining standard fields are optional. If any of the optional fields are not applicable, then the field should be left out of the JSON.
So putting it all together a typical Metron message with all 5-tuple fields present would look like the following:
{
  "message": {
    "ip_src_addr": xxxx,
    "ip_dst_addr": xxxx,
    "ip_src_port": xxxx,
    "ip_dst_port": xxxx,
    "protocol": xxxx,
    "original_string": xxxx,
    "additional-field 1": xxxx
  }
}
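As a concrete illustration, a message with made-up values (all values below are hypothetical) might look like:

```json
{
  "message": {
    "ip_src_addr": "10.0.2.1",
    "ip_dst_addr": "192.168.1.4",
    "ip_src_port": 55791,
    "ip_dst_port": 443,
    "protocol": "TCP",
    "timestamp": 1489776020000,
    "original_string": "<raw log line as received from the sensor>"
  }
}
```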
See the “Global Configuration” section.
The configuration for the various parser topologies is defined by JSON documents stored in zookeeper.
The document is structured in the following way:
{ "filterClassName" : "STELLAR" ,"parserConfig" : { "filter.query" : "exists(field1)" } }
The fieldTransformations field is a complex object which defines transformations that can be applied to a message; a transformation can add, modify, or remove message fields.
Metadata is a useful thing to send to Metron and use during enrichment or threat intelligence.
Consider the following scenarios:
As such, there are two types of metadata that we seek to support in Metron:
Metadata is controlled by two fields in the parser:
mergeMetadata : This is a boolean indicating whether metadata fields will be merged with the message automatically.
That is to say, if this property is set to true, then every metadata field will become part of the message and, consequently, will also be available for use in field transformations.
In order to avoid collisions with existing message fields, metadata fields are prefixed with metron.metadata. (note the trailing dot). So, for instance, the Kafka topic would appear in the field metron.metadata.topic.
Custom metadata is specified by sending a JSON map in the Kafka key. If no key is sent, then no metadata will be parsed. For instance, sending a metadata field called customer_id could be done by sending
{ "customer_id" : "my_customer_id" }
in the kafka key. This would be exposed to Stellar field transformations as the field metron.metadata.customer_id and, if mergeMetadata is true, would also be available as a field in its own right.
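To illustrate how this surfaces (the field values here are hypothetical), a parsed message with mergeMetadata set to true and the key above sent on a topic named yaf would carry the metadata fields alongside the parsed fields, roughly like:

```json
{
  "timestamp": 1489776020000,
  "original_string": "<raw message as received>",
  "metron.metadata.topic": "yaf",
  "metron.metadata.customer_id": "my_customer_id"
}
```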
The format of a fieldTransformation is as follows:
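Judging from the examples in this section, a fieldTransformation object takes the following general shape; the values below are placeholders, and which keys are required depends on the transformation type:

```json
{
  "input": [ "field1" ],
  "output": [ "output_field" ],
  "transformation": "TRANSFORMATION_NAME",
  "config": { "key": "value" }
}
```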
The currently implemented fieldTransformations include REMOVE, IP_PROTOCOL, and STELLAR, each illustrated below:
Consider the following simple configuration which will remove field1 unconditionally:
{ ... "fieldTransformations" : [ { "input" : "field1" , "transformation" : "REMOVE" } ] }
Consider the following simple sensor parser configuration which will remove field1 whenever field2 exists and its corresponding value is equal to 'foo':
{ ... "fieldTransformations" : [ { "input" : "field1" , "transformation" : "REMOVE" , "config" : { "condition" : "exists(field2) and field2 == 'foo'" } } ] }
Consider the following sensor parser config to map the protocol field to a textual representation of the protocol:
{ ... "fieldTransformations" : [ { "input" : "protocol" , "transformation" : "IP_PROTOCOL" } ] }
This transformation would transform { "protocol" : 6, "source.type" : "bro", ... } into { "protocol" : "TCP", "source.type" : "bro", ...}
If, in your field transformation, you assign a field to null, the field will be removed. You can use this capability to rename fields.
Consider this example:
"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field", "old_field"] ,"config" : { "new_field" : "old_field" ,"old_field" : "null" } } ]
This would set new_field to the value of old_field and remove old_field.
Currently, the Stellar expressions are expressed in the form of a map, where the keys define the fields and the values define the Stellar expressions. The order of expression evaluation is specified by the output field. A consequence of storing the assignments as a map is that the same field cannot appear as a key twice.
For instance, the following will not function as expected:
"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field"] ,"config" : { "new_field" : "TO_UPPER(field1)" ,"new_field" : "TO_LOWER(new_field)" } } ]
In the above example, the last instance of new_field will win and TO_LOWER(new_field) will be evaluated while TO_UPPER(field1) will be skipped.
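Because a map key can appear only once, the two steps must instead be expressed as a single composed expression (or written through a distinct intermediate field). A sketch of the composed form, mirroring the intent of the broken configuration above:

```json
"fieldTransformations" : [
  {
    "transformation" : "STELLAR",
    "output" : [ "new_field" ],
    "config" : {
      "new_field" : "TO_LOWER(TO_UPPER(field1))"
    }
  }
]
```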
Consider the following sensor parser config to add three new fields to a message:
{
  ...
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR",
      "output" : [ "utc_timestamp", "url_host", "url_protocol" ],
      "config" : {
        "utc_timestamp" : "TO_EPOCH_TIMESTAMP(timestamp, 'yyyy-MM-dd HH:mm:ss', MAP_GET(dc, dc2tz, 'UTC') )",
        "url_host" : "URL_TO_HOST(url)",
        "url_protocol" : "URL_TO_PROTOCOL(url)"
      }
    }
  ],
  "parserConfig" : {
    "dc2tz" : {
      "nyc" : "EST",
      "la" : "PST",
      "london" : "UTC"
    }
  }
}
Note that the dc2tz map is in the parser config, so it is accessible in the functions.
Consider the following example configuration for the yaf sensor:
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "yaf",
  "fieldTransformations" : [
    {
      "input" : "protocol",
      "transformation": "IP_PROTOCOL"
    }
  ],
  "parserConfig": {
    "grokPath": "/patterns/yaf",
    "patternLabel": "YAF_DELIMITED",
    "timestampField": "start_time",
    "timeFields": ["start_time", "end_time"],
    "dateFormat": "yyyy-MM-dd HH:mm:ss.S"
  }
}
Parser adapters are loaded dynamically in each Metron topology. They are defined in the Parser Config (defined above) JSON file in Zookeeper.
Java parser adapters are intended for higher-velocity topologies and are not easily changed or extended. As the adoption of Metron continues, we plan on extending our library of Java adapters to process more log formats. At the moment, the Java adapters included with Metron are:
Grok parser adapters are designed primarily for those who are not Java coders, allowing a parser adapter for lower-velocity topologies to be stood up quickly. Grok relies on regular expressions for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file, and the topology does not need to be recompiled in order to make changes to them. An example of a Grok parser is:
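Purely as an illustration (the pattern label and field mappings below are hypothetical and not taken from a shipped Metron pattern file), a Grok statement for a simple squid-style access log might look like:

```
SQUID_DELIMITED %{NUMBER:timestamp} %{INT:elapsed} %{IP:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}
```

Each %{PATTERN:field_name} pair captures the text matched by the named pattern into the corresponding field of the resulting JSON message.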
For more information on the Grok project please refer to the following link:
https://github.com/thekrakken/java-grok
Starting a particular parser topology on a running Metron deployment is as easy as running the start_parser_topology.sh script located in $METRON_HOME/bin. This utility will allow you to configure and start the running topology assuming that the sensor specific parser configuration exists within zookeeper.
The usage for start_parser_topology.sh is as follows:
usage: start_parser_topology.sh
 -e,--extra_topology_options <JSON_FILE>       Extra options in the form of a JSON file with a map for content.
 -esc,--extra_kafka_spout_config <JSON_FILE>   Extra spout config options in the form of a JSON file with a map for content. Possible keys are: retryDelayMaxMs, retryDelayMultiplier, retryInitialDelayMs, stateUpdateIntervalMs, bufferSizeBytes, fetchMaxWait, fetchSizeBytes, maxOffsetBehind, metricsTimeBucketSizeInSecs, socketTimeoutMs
 -ewnt,--error_writer_num_tasks <NUM_TASKS>    Error Writer Num Tasks
 -ewp,--error_writer_p <PARALLELISM_HINT>      Error Writer Parallelism Hint
 -h,--help                                     This screen
 -k,--kafka <BROKER_URL>                       Kafka Broker URL
 -mt,--message_timeout <TIMEOUT_IN_SECS>       Message Timeout in Seconds
 -mtp,--max_task_parallelism <MAX_TASK>        Max task parallelism
 -na,--num_ackers <NUM_ACKERS>                 Number of Ackers
 -nw,--num_workers <NUM_WORKERS>               Number of Workers
 -pnt,--parser_num_tasks <NUM_TASKS>           Parser Num Tasks
 -pp,--parser_p <PARALLELISM_HINT>             Parser Parallelism Hint
 -s,--sensor <SENSOR_TYPE>                     Sensor Type
 -snt,--spout_num_tasks <NUM_TASKS>            Spout Num Tasks
 -sp,--spout_p <SPOUT_PARALLELISM_HINT>        Spout Parallelism Hint
 -t,--test <TEST>                              Run in Test Mode
 -z,--zk <ZK_QUORUM>                           Zookeeper Quorum URL (zk1:2181,zk2:2181,...)
These options are intended to configure the Storm Kafka Spout more completely. These options can be specified in a JSON file containing a map associating the kafka spout configuration parameter to a value. The range of values possible to configure are:
For instance, create a JSON file which will set the offsets to UNCOMMITTED_EARLIEST:
{ "spout.firstPollOffsetStrategy" : "UNCOMMITTED_EARLIEST" }
This would be loaded by passing the file as an argument to --extra_kafka_spout_config.
These options are intended to be Storm configuration options and will live in a JSON file which is loaded into the Storm config. For instance, if you wanted to set the Storm config property topology.ticks.tuple.freq.secs to 1000 and storm.local.dir to /opt/my/path, you could create a file called custom_config.json containing
{ "topology.ticks.tuple.freq.secs" : 1000, "storm.local.dir" : "/opt/my/path" }
and pass --extra_topology_options custom_config.json to start_parser_topology.sh.
A default Metron installation is untuned for production deployment. There are a few knobs to tune to get the most out of your system.
The kafka queue associated with your parser is a collection point for all of the data sent to your parser. As such, make sure that the number of partitions in the kafka topic is sufficient to handle the throughput that you expect from your parser topology.
The parser topology, as started by the $METRON_HOME/bin/start_parser_topology.sh script, uses a default of one executor per bolt. In a real production system, this should be customized by modifying the arguments sent to this utility.
Finally, if workers and executors are new to you, the following might be of use to you: