Class: SnowplowTracker::Emitter

Inherits:
Object
Defined in:
lib/snowplow-tracker/emitters.rb

Overview

This class sends events to the event collector. Every Tracker must have at least one associated Emitter (or an instance of its subclass, AsyncEmitter).

The network settings are defined as part of the Emitter initialization. This table displays the default Emitter settings:

Property      Default setting
Protocol      HTTP
Method        GET
Buffer size   1
Path          /i

The buffer size is the number of events that will be buffered before they are all sent together. The process of sending all buffered events is called "flushing". The default buffer size is 1 because a GET request can contain only one event.
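The buffering and flushing behaviour described above can be modelled in a few lines of plain Ruby. This is a simplified sketch, not the library's implementation: the class name `SketchBuffer` and the `sent_batches` array (which stands in for the network send) are hypothetical. Like the Emitter's `#input`, it stringifies payload values, appends to the buffer under a `Monitor`, and flushes once the buffer reaches its size limit.

```ruby
require 'monitor'

# Hypothetical, simplified model of Emitter buffering; not the library's code.
class SketchBuffer
  attr_reader :sent_batches

  def initialize(buffer_size)
    @buffer_size = buffer_size
    @buffer = []
    @sent_batches = [] # stands in for requests sent to the collector
    @lock = Monitor.new
  end

  # Add one event payload; flush once the buffer is full.
  def input(payload)
    stringified = payload.transform_values(&:to_s)
    @lock.synchronize do
      @buffer.push(stringified)
      flush if @buffer.size >= @buffer_size
    end
    nil
  end

  # "Flushing": hand over every buffered event at once, then empty the buffer.
  # Monitor is reentrant, so calling this from inside #input's lock is safe.
  def flush
    @lock.synchronize do
      @sent_batches << @buffer unless @buffer.empty?
      @buffer = []
    end
  end
end

buffer = SketchBuffer.new(3)
5.times { |i| buffer.input({ 'e' => 'pv', 'eid' => i }) }
buffer.sent_batches.size # => 1 (three events sent together; two still buffered)
```

With a buffer size of 1, every call to `input` would flush immediately, which matches the GET default.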

If you choose to use POST requests, the buffer_size defaults to 10, and the buffered events are all sent together in a single request. The default path is '/com.snowplowanalytics.snowplow/tp2' for Emitters using POST.
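The method-dependent defaults described above can be summarised as a small resolution function. `resolve_defaults` is a hypothetical helper written only for illustration; the Emitter constructor performs its own checks (it calls `confirm_path` and `confirm_buffer_size`), which may differ in detail.

```ruby
# Hypothetical helper illustrating the documented defaults; not library code.
def resolve_defaults(options)
  post = options.fetch(:method, 'get') == 'post'
  {
    method: post ? 'post' : 'get',
    # An explicit buffer_size or path wins; otherwise use the per-method default.
    buffer_size: options[:buffer_size] || (post ? 10 : 1),
    path: options[:path] || (post ? '/com.snowplowanalytics.snowplow/tp2' : '/i')
  }
end

resolve_defaults({})                  # buffer_size 1, path '/i'
resolve_defaults({ method: 'post' })  # buffer_size 10, path '/com.snowplowanalytics.snowplow/tp2'
```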

Logging

Emitters log their activity to STDERR by default, using the Ruby standard library Logger class. A different logger can be configured during Emitter initialization. For example, to disable logging, you could provide Logger.new(IO::NULL) in the options hash.

By default, only messages with priority "INFO" or higher will be logged. This can be changed at any time for the default logger, which is saved as a module constant (LOGGER = Logger.new(STDERR)). If you are not using the default logger, set the message level before initializing your Emitter.

Examples:

Changing the logger message level.

require 'logger'
SnowplowTracker::LOGGER.level = Logger::DEBUG
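A custom logger is an ordinary Ruby `Logger`, so its level is set on the logger object before it is passed in the options hash. The sketch below writes to a `StringIO` only so the output can be inspected; in practice you would pass the logger as `options: { logger: quiet_logger }` to `SnowplowTracker::Emitter.new` (or use `Logger.new(IO::NULL)` to silence logging entirely).

```ruby
require 'logger'
require 'stringio'

output = StringIO.new
quiet_logger = Logger.new(output)
quiet_logger.level = Logger::WARN # set the level before handing the logger to an Emitter

quiet_logger.info('dropped: below WARN')
quiet_logger.warn('kept: WARN or higher')

emitter_options = { logger: quiet_logger } # would be passed as options: to Emitter.new
output.string.include?('kept')    # => true
output.string.include?('dropped') # => false
```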


Direct Known Subclasses

AsyncEmitter

Constant Summary

DEFAULT_CONFIG =

Default Emitter settings

{
  protocol: 'http',
  method: 'get'
}
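The constructor combines these defaults with the caller's options via `DEFAULT_CONFIG.merge(options)`, so any key present in `options` overrides the default. The constant is copied locally below under the hypothetical name `SKETCH_DEFAULTS` so the example runs without the gem installed.

```ruby
# Local copy of the documented defaults; SKETCH_DEFAULTS is a hypothetical name.
SKETCH_DEFAULTS = { protocol: 'http', method: 'get' }.freeze

options = { protocol: 'https', port: 443 }
config = SKETCH_DEFAULTS.merge(options)
# Keys from options win; keys absent from options keep their defaults.
config # => protocol 'https' (overridden), method 'get' (default), port 443 (added)
```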


Constructor Details

#initialize(endpoint:, options: {}) ⇒ Emitter

Create a new Emitter instance. The endpoint is required.

The options hash can have any of these optional parameters:

Parameter      Description                                           Type
path           Override the default path appended to the endpoint    String
protocol       'http' or 'https'                                     String
port           The port for the connection                           Integer
method         'get' or 'post'                                       String
buffer_size    Number of events to send at once                      Integer
on_success     A method to call if events were sent successfully     Method
on_failure     A method to call if events failed to send             Method
thread_count   Number of threads to use                              Integer
logger         Log somewhere other than STDERR                       Logger

Note that thread_count is relevant only to the subclass AsyncEmitter, and will be ignored if provided to an Emitter.

If you choose to use HTTPS, we recommend using port 443.
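The endpoint, protocol, port, and path are joined into a single collector URI, which the constructor stores in `@collector_uri` via `create_collector_uri`. The helper below is a hypothetical sketch of that joining; the library's own method may differ, for example in how it handles a missing port.

```ruby
require 'uri'

# Hypothetical sketch of building the collector URI; not the library's implementation.
def sketch_collector_uri(endpoint, protocol, port, path)
  port_part = port.nil? ? '' : ":#{port}" # omit the port when none is given
  "#{protocol}://#{endpoint}#{port_part}#{path}"
end

uri = sketch_collector_uri('collector.example.com', 'https', 443, '/my-pipeline/1')
# => "https://collector.example.com:443/my-pipeline/1"
URI.parse(uri).port # => 443
```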

Only 2xx and 3xx status codes are considered successes.
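That success rule amounts to a range check on the numeric status code. `success_status?` is a hypothetical name; Net::HTTP response codes are strings (for example `'204'`), hence the `to_i`.

```ruby
# Hypothetical helper: 2xx and 3xx count as success, everything else as failure.
def success_status?(code)
  (200...400).cover?(code.to_i)
end

success_status?('204') # => true
success_status?('302') # => true
success_status?('404') # => false
```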

The on_success callback should accept one argument: the number of events that were sent successfully. The on_failure callback should accept two arguments: the number of successfully sent events, and an array containing the unsuccessful events.
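The two callback signatures can be exercised without a network. `report_result` below is a hypothetical stand-in for the point where an Emitter decides which callback to invoke; it is not a library method. Because lambdas enforce arity, a callback declared with the wrong number of parameters would raise `ArgumentError` when called.

```ruby
# Hypothetical stand-in showing how each callback is invoked; not library code.
def report_result(sent_count, failed_events, on_success:, on_failure:)
  if failed_events.empty?
    on_success&.call(sent_count)
  else
    on_failure&.call(sent_count, failed_events)
  end
end

log = []
success_callback = ->(success_count) { log << "#{success_count} sent" }
failure_callback = ->(success_count, failures) { log << "#{success_count} sent, #{failures.size} failed" }

report_result(3, [], on_success: success_callback, on_failure: failure_callback)
report_result(1, [{ 'e' => 'pv' }], on_success: success_callback, on_failure: failure_callback)
log # => ["3 sent", "1 sent, 1 failed"]
```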

Examples:

Initializing an Emitter with all the possible extra configuration.

require 'logger'
require 'snowplow-tracker'

success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
failure_callback = ->(success_count, failures) do
  puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
end

SnowplowTracker::Emitter.new(endpoint: 'collector.example.com',
                             options: { path: '/my-pipeline/1',
                                        protocol: 'https',
                                        port: 443,
                                        method: 'post',
                                        buffer_size: 5,
                                        on_success: success_callback,
                                        on_failure: failure_callback,
                                        logger: Logger.new(STDOUT) })

Parameters:

  • endpoint (String)

    the endpoint to send the events to

  • options (Hash) (defaults to: {})

    allowed configuration options




# File 'lib/snowplow-tracker/emitters.rb', line 124

def initialize(endpoint:, options: {})
  config = DEFAULT_CONFIG.merge(options)
  @lock = Monitor.new
  path = confirm_path(config)
  @collector_uri = create_collector_uri(endpoint, config[:protocol], config[:port], path)
  @buffer = []
  @buffer_size = confirm_buffer_size(config)
  @method = config[:method]
  @on_success = config[:on_success]
  @on_failure = config[:on_failure]
  @logger = config[:logger] || LOGGER
  logger.info("#{self.class} initialized with endpoint #{@collector_uri}")
end

Instance Method Details

#input(payload) ⇒ Object

Add an event to the buffer, and flush the buffer once its maximum size has been reached. This method is not required for standard Ruby tracker usage: a Tracker privately calls it once the event payload is ready to send.

We have included it in the public API because it can be useful in the on_failure callback. on_failure is the optional callback, provided in the Emitter initialization options hash, that is called when events fail to send. You could call #input from your callback to immediately retry the failed events.

The on_failure callback should accept two arguments: the number of successfully sent events, and an array containing the unsuccessful events.

Examples:

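A possible on_failure callback using #input

my_emitter = nil # assigned below; the lambda reads it only when it is called
retry_on_failure = lambda do |success_count, failed_events|
  # possible backoff-and-retry timeout here
  failed_events.each do |event|
    my_emitter.input(event)
  end
end

my_emitter = SnowplowTracker::Emitter.new(endpoint: 'collector.example.com',
                                          options: { on_failure: retry_on_failure })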
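Retrying immediately can loop indefinitely while the collector is unreachable. One way to bound it is sketched below under stated assumptions: `RecordingEmitter` is a hypothetical stand-in for a real Emitter (it only records what `#input` receives), and `MAX_RETRY_ROUNDS` is an arbitrary limit. A real callback would probably also reset the counter after a successful send, for example from on_success.

```ruby
# Hypothetical stand-in for an Emitter: it just records what #input receives.
class RecordingEmitter
  attr_reader :received

  def initialize
    @received = []
  end

  def input(payload)
    @received << payload
    nil
  end
end

MAX_RETRY_ROUNDS = 3 # arbitrary limit chosen for this sketch

emitter = RecordingEmitter.new
retry_rounds = 0
bounded_retry = lambda do |_success_count, failed_events|
  retry_rounds += 1
  next if retry_rounds > MAX_RETRY_ROUNDS # give up (or log/persist) after the limit

  failed_events.each { |event| emitter.input(event) }
end

5.times { bounded_retry.call(0, [{ 'e' => 'pv' }]) }
emitter.received.size # => 3
```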


# File 'lib/snowplow-tracker/emitters.rb', line 188

def input(payload)
  payload.each { |k, v| payload[k] = v.to_s }
  @lock.synchronize do
    @buffer.push(payload)
    flush if @buffer.size >= @buffer_size
  end

  nil
end