Use MySQL as a Cache server – Memcached plugin

Overview

In this post, I will talk about MySQL InnoDB Memcached plugin.

Although there are many other posts about this kind of subject, I focus on the goal of applying the actual service. It also contains its own philosophy of manipulating the flow of data well.

So, Let’s start about this subject!

Cache layer?

As the traffic of the service increases to the level of a monster, it is difficult to service with the DB only.

Of course, most of the small and medium-sized data could be served with only single database, but in the case of giga-unit traffic, there is a performance limit to get static data that does not change from the Database every time.

More than anything! Database has limited resources, it is true that all monster traffic cannot be handled based on DB only.

So, in the past when disk I/O was not good, I remember processing frequently searched data with a memory DB such as ALTIBASE Database which is memory based in order to provide service for this processing.

There are many purposes of the cache layer, but I would like to highlight for two cases which are as below;

  • Queuing or preprocessing in cache and write to file asynchronously.
  • Deliver  service after loading data with high frequency of inquiry but little change in memory.

The latter is what I will be talking about here, the frequency of inquiry is very high but little change.

I add one additional option, it can be said that it is a really interesting story for the requirement to show only the data that has been processed and completed. (like READ-COMMITTED!!)

MySQL Memcached plugin

There are many different cache stores, but the reason I was interested in the MySQL memcached plugin is simple.

It’s accessible InnoDB data directly, without query parsing and optimizer steps. In the past, I was interested in MariaDB with a concept similar to HandlerSocket.

Ref> https://dev.mysql.com/doc/refman/8.0/en/innodb-memcached-intro.html

Oh, of course, it’s possible allocate and use a separate cache memory control the memcached policy in MySQL, but in an environment where there is a device with superior random I/O than expected, such as SSD, and we have already cache memory area, InnoDB buffer pool.

It didn’t really appeal to my point of view. (Of course, since InnoDB is a B-Tree, you cannot compare the performance itself with a hash structure that normally looks up data in O(1).)

Direct access to InnoDB data? This story can also be interpreted differently as non-volatile caching layer. That is, a caching layer with infinite TTL that does not have an expire time separately!! I could see new possibilities.

Configuration

You may refer to the Oracle documentation (innodb-memcached-setup), but today, I will configure the memcached plugin based only on the purpose of using the memcache protocol.

First, configure the schema related to memcached as shown below to configure the InnoDB plugin. 🙂 Here, $MYSQL_HOME means the home directory where MySQL is installed, and it will vary depending on each system configuration environment.

$ mysql -uroot < $MYSQL_HOME/share/innodb_memcached_config.sql

I will set up a separate token table as shown below.

## Create table
mysql> CREATE DATABASE `memcache_test`;
mysql> CREATE TABLE `memcache_test`.`token` (
    ->  `id` varchar(32) NOT NULL,
    ->  `token` varchar(128) NOT NULL,
    ->  PRIMARY KEY (`id`)
    ->);

Since data change will be performed through SQL, set as follows to perform only GET operation.

## Policy to allow only GET operation
mysql> update cache_policies set 
    ->   get_policy = 'innodb_only', 
    ->   set_policy = 'disabled', 
    ->   delete_policy='disabled', 
    ->   flush_policy = 'disabled';

## Delete the existing policy, add add new token policy
mysql> delete from containers;
mysql> insert into containers values ('token', 'memcache_test', 'token', 'id', 'token', 0,0,0, 'PRIMARY');

Now, modify the setting information so that the memcached plugin can see the table created above, and finally upload the plugin.

## Start InnoDB memcached plugin
mysql> INSTALL PLUGIN daemon_memcached soname "libmemcached.so"; 

After going through this process, an environment that can directly access DB data using the memcached protocol. “11211 port” has been opened as the default port, and you can get data directly from InnoDB data area by performing get operation with memcache protocol through this port.

In other words, you can directly read the changed data with SQL. (You can create a VALUE of memcache by mapping table columns to delimiters, but we will skip them here.)

Performance

Non-volatile caching layer? Of course, performance is a concern. No matter how good the expectations are, it is not a good solution if the response time or stability is poor in a caching system that generates a huge amount of reads. So, I compared the speed of InnoDB data lookup through a simple PK-based single lookup query and the memcached protocol.

Let’s configure the environment for a simple performance test. First, create a test table as shown below and create about 1 million data for performance testing.

## Generate test data
mysql> insert ignore into token 
    -> select md5(rand()), concat(uuid(), md5(rand())) from dual;
mysql> insert ignore into token 
    -> select md5(rand()), concat(uuid(), md5(rand())) from token;
... repeat ...

mysql> select count(*) from token;
+----------+
| count(*) |
+----------+
|   950435 |
+----------+

I did a simple test. SQL vs memcached!!

## SQL ##
SELECT * FROM token WHERE id = ${id}

## Memcache ##
get @@token.${id}

Based on the total ID value of the generated data, the system resources were measured by performing approximately 70,000 queries per second for each of the queries in the following random pattern.

In order to even evaluate the stability, I gave a stress test with continuous traffic of 70,000 per second for about 10 days. As a result, no error occurred even at once, and it showed a response time of about 0.3ms to 0.5ms each operation. About 1 second, the response time was only 1.5ms. (For reference, the right side is the interval average seconds.)

At this level, it is at a level that can be used as a cache in production. 🙂

+-----+---------------+
| ms  | interval avg  | <= seconds
+-----+---------------+
| 0.2 |            99 |
| 0.3 |        661736 |
| 0.4 |        162582 |
| 0.5 |          5686 |
| 0.6 |          1769 |
| 0.7 |          1004 |
| 0.8 |           576 |
| 0.9 |           310 |
| 1.0 |           159 |
| 1.1 |            77 |
| 1.2 |            29 |
| 1.3 |            12 |
| 1.4 |             4 |
| 1.5 |             1 |
+-----+---------------+

I even gave 140,000 GET operation stress per second. There was no problem with the service at all, and the average CPU resource utilization was about at 15%.. it was in a very stable state. Amazing!

Monitoring

No matter how great the service is, if there is no tool to check the health status of this service in real time, it will be useless, right? So, either way, I need to find the most effective monitoring method. Personally, I prefer to collect metrics using prometheus.

The reason I prefer prometheus is simple.

There are so many exporters that have already been made, and that if I want something, I can easily access it by adding functions to suit my own taste! The fact that I can collect monitoring metric information really efficiently with Prometheus, a time-series-based data store!

It is no exaggeration to say that Prometheus is optimized for metric collection.

Of course, in the case of the memcached exporter officially released by Prometheus, there are some problems in collecting InnoDB memcached information. The data of more than 4 characters in the stats settings result is recognized as an abnormal pattern, and memcached is collected as not running normally. If there is something that doesn’t work, I just fix it. It’s an open source fun factor.

// AS-IS ======================
stats := map[string]string{}
for err == nil && !bytes.Equal(line, resultEnd) {
    s := bytes.Split(line, []byte(" "))
    if len(s) != 3 || !bytes.HasPrefix(s[0], resultStatPrefix) {
        return fmt.Errorf("memcache: unexpected stats line format %q", line)
    }
    stats[string(s[1])] = string(bytes.TrimSpace(s[2]))
    line, err = rw.ReadSlice('\n')
    if err != nil {
        return err
    }
}

// TO-BE ======================
stats := map[string]string{}
for err == nil && !bytes.Equal(line, resultEnd) {
    s := bytes.Split(line, []byte(" "))
    if len(s) == 3 {
        stats[string(s[1])] = string(bytes.TrimSpace(s[2]))
    } else if len(s) == 4 {
        stats[string(s[1])] = string(bytes.TrimSpace(s[2])) + "-" + string(bytes.TrimSpace(s[2]))
    } else {
        return fmt.Errorf("memcache: unexpected stats line format %q", line)
    }
    line, err = rw.ReadSlice('\n')
    if err != nil {
        return err
    }
}

By simply modifying a few lines, this unusual processing was easily resolved.

From now on, I can collect metrics and send alerts through prometheus, and check real-time metrics through Grafana! I’ve got a powerful monitoring tool.

Beyond physical memory

With only the contents listed above, it is absolutely perfect for production use as a cache layer.

However, I imagined possibilities beyond this.

As you all know, one of the powerful features of MySQL is data replication. If you reliably replicate master data through replication and utilize it well for services, you can naturally induce READ distribution as shown below. 🙂

Since all nodes have the same data, the same result is obtained no matter which device reads the data. In other words, as shown in the figure above, cache data is normally serviced for each group, but in case of a specific node failure, there is no service problem at all, even if data is pulled from another nearby MySQL.

Moreover, since each application reads as much data as needed, there is no reason to include all replicated data in the InnoDB buffer pool. In other words, non-volatile cache data that does not disappear is naturally distributed and serviced.

One more thing! If you take advantage of MySQL replication well, you can’t miss binary logs. Binary log is a file that stores the history of data changes in MySQL and has a role of replicating this data by replaying it as it is in the slave (applying the changed history in the master as it is).

In the case of ROW format, the changed data content itself is included. In the case of FULL image, all data before and after the change is included, so you can think of an interesting CDC structure for caching as shown below.

When data changes occur in tables related to caching (such as member information) in the MySQL Master DB where data changes occur, it is possible to create a replicator that detects and converts data into JSON format and puts data into cache of MySQL.

This related project is at the bottom URL. However, It’s an early project so that it needs to apply more modification
>> https://github.com/gywndi/uldra-binlog-json-transfer

If this happens, the left service (red) DB is converted into JSON as natural as flowing water without any manipulation and is stored as a non-volatile caching layer. Then, the application can perform asynchronous caching service based on the data converted to JSON.

Of course, I talked this easily here, but it will require a lot of defense logic to deal with many side effects such as replication delay.

Conclusion

I do not think that what I have talked about this topic so far is the best solution for each service.

However, alternatives such as InnoDB memcached plugin were able to present a fairly suitable solution in the current problem situation, and I wanted to talk about the process of thinking about new possibilities by harmonizing this with MySQL’s unique function.

The goal is not to have a system without failures, but to create a robust system with a low probability of failure, a system that can be restored quickly even if it fails!!

In order to do that, I wanted to take this opportunity to solve my own solutions that I imagined by combining the good functions of each system well. 🙂

Thanks for reading this long post. ^^

Create your own Exporter in Go !

Overview

Hi, it’s too hot summer in Korea. Today I want to talk about an interesting and exciting topic. Try to making your own exporter in Go  language.

If you register a specific query, it is a simple program that shows the result of this query as an exporter result metrics. Some of you may still be unfamiliar with what Expoter is.

I will explain about Exporter step by step in today’s post.

Exporter?

You can think of an Exporter as an HTTP server for pulling data from a time series database like Prometheus. Prometheus periodically calls the specific URL of the exporter and saves the result of metrics as a time series.

There are many exporters exist in the everywhere.

Typically, there is mysqld_expoter, which is Prometheus’s Offcial projects, and mysqld_expoter, which they fork and distribute additionally in Percona. Besides these, not only node_expoter for monitoring Linux nodes, but also memcached_expoter etc..

For reference, you can see various exporters here.
>> https://exporterhub.io

What I am going to present my Blog that is the process of adding my own new exporter among these various exporters. Let’s go!

Creating a Go project

Exporter can be implemented in various languages, but today I will implement it with Go.

Personally, I think Go is very convenient in terms of distribution and compatibility. I will omit the go installation and environment configuration here.

$ cd ~/go/src

$ mkdir -p query-exporter-simple

$ cd query-exporter-simple

$ go mod init
go: creating new go.mod: module query-exporter-simple

$ ls -al
total 8
drwxr-xr-x   3 chan  staff   96  7 12 13:33 .
drwxr-xr-x  12 chan  staff  384  7 12 13:33 ..
-rw-r--r--   1 chan  staff   38  7 12 13:33 go.mod

$ cat go.mod
module query-exporter-simple

go 1.16
 

Although it is an fundamental project, now everything is ready to make your own exporter. From now on, package management is managed with “go mod”.

Try Empty Exporter

Now, let’s start making the Exporter in earnest.

First, as a taster, let’s try to make an empty Exporter that has no function.. it simply outputs the exporter version only.

This is to read OS parameters using flags. The “bind” is server HTTP binding information when Exporter is started.

package main

import (
    "flag"
)

func main() {
    // =====================
    // Get OS parameter
    // =====================
    var bind string
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()
}
 

Register Collector to collect and run Exporter with HTTP server. Collector is the concept of a thread that collects information, and it implements the Collector interface of Prometheus.

package main

import (
    "flag"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/common/version"
    log "github.com/sirupsen/logrus"
)

func main() {
    // =====================
    // Get OS parameter
    // =====================
    var bind string
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()

    // ========================
    // Regist handler
    // ========================
    prometheus.Register(version.NewCollector("query_exporter"))

    // Regist http handler
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        h := promhttp.HandlerFor(prometheus.Gatherers{
            prometheus.DefaultGatherer,
        }, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    })

    // start server
    log.Infof("Starting http server - %s", bind)
    if err := http.ListenAndServe(bind, nil); err != nil {
        log.Errorf("Failed to start http server: %s", err)
    }
 }
 

Since the packages used by source code do not exist in the project yet, numerous errors will be occurred.

So, as below, get related packages through “go mod vendor”. Related packages are placed under the vendor directory.

$ go mod vendor
go: finding module for package github.com/prometheus/common/version
go: finding module for package github.com/prometheus/client_golang/prometheus
go: finding module for package github.com/sirupsen/logrus
go: finding module for package github.com/prometheus/client_golang/prometheus/promhttp
go: found github.com/prometheus/client_golang/prometheus in github.com/prometheus/client_golang v1.11.0
go: found github.com/prometheus/client_golang/prometheus/promhttp in github.com/prometheus/client_golang v1.11.0
go: found github.com/prometheus/common/version in github.com/prometheus/common v0.29.0
go: found github.com/sirupsen/logrus in github.com/sirupsen/logrus v1.8.1

$ ls -al
total 112
drwxr-xr-x   6 chan  staff    192  7 13 10:26 .
drwxr-xr-x  12 chan  staff    384  7 12 13:33 ..
-rw-r--r--   1 chan  staff    169  7 13 10:26 go.mod
-rw-r--r--   1 chan  staff  45722  7 13 10:26 go.sum
-rw-r--r--   1 chan  staff   1163  7 13 10:34 main.go
drwxr-xr-x   6 chan  staff    192  7 13 10:26 vendor

If you start the Exporter server, the server will be run on port 9104 (the port specified by default in flag).

$ go run .
INFO[0000] Regist version collector - query_exporter
INFO[0000] HTTP handler path - /metrics
INFO[0000] Starting http server - 0.0.0.0:9104

If you want to change the port, give the bind OS parameter as below, then, the server will  run with that port.

$ go run . --bind=0.0.0.0:9105
INFO[0000] Regist version collector - query_exporter
INFO[0000] HTTP handler path - /metrics
INFO[0000] Starting http server - 0.0.0.0:9105

Even though it is an empty Exporter.. You can see that a lot of information is extracted through the Exporter. (Most of the information is about go itself..)

$ curl 127.0.0.1:9104/metrics
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0

.. skip ..

# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 7
# HELP query_exporter_build_info A metric with a constant '1' value labeled by version, revision, branch, and goversion from which query_exporter was built.
# TYPE query_exporter_build_info gauge
query_exporter_build_info{branch="",goversion="go1.16.5",revision="",version=""} 1

At the very bottom, there is the query_exporter_build_info metric, which is the information collected by the Collector that we added in the previous section. This is the moment we created the new Exporter collecting version information!

Creating an Exporter in earnest

I made an empty Exporter that specifies only the exporter version. Is that easy, right? 🙂

From now on, I’m going to implement a Collector that collects the information we really need from database and sends the result to the HTTP GET method.

1. Configuration format (YAML)

As I said before, I want to make something that passes the result of the registered query to the Exporter result metric. To do this, you need to know information about the target instance as well as the query to be executed.

Let’s set it up in the below format. MySQL connection information and the query to be executed. It will show two pieces of information as a result: “Connections per host” and “Connections per user”.

dsn: test:test123@tcp(127.0.0.1:3306)/information_schema
metrics:
  process_count_by_host:
    query: "select user, 
                  substring_index(host, ':', 1) host, 
                  count(*) sessions 
            from information_schema.processlist
            group by 1,2 "
    type: gauge
    description: "process count by host"
    labels: ["user","host"]
    value: sessions
  process_count_by_user:
    query: "select user, count(*) sessions 
            from information_schema.processlist 
            group by 1 "
    type: gauge
    description: "process count by user"
    labels: ["user"]
    value: sessions

I tried defining the above yaml as Go structure.

type Config struct {
    DSN     string
    Metrics map[string]struct {
        Query       string
        Type        string
        Description string
        Labels      []string
        Value       string
        metricDesc  *prometheus.Desc
    }
}

Here, metricDesc *prometheus.Desc can be understood as a specification used in Prometheus metrics. It also specifies any label and metric types such as Counter/Gauge.

Read the YAML file as below, and finally load the setting information into the structure defined below.

var b []byte
var config Config
if b, err = ioutil.ReadFile("config.yml"); err != nil {
    log.Errorf("Failed to read config file: %s", err)
    os.Exit(1)
}

// Load yaml
if err := yaml.Unmarshal(b, &config); err != nil {
    log.Errorf("Failed to load config: %s", err)
    os.Exit(1)
}

In this way, we can now put the necessary information in the Config structure and use it to implement the desired implementation.

package main

import (
    "flag"
    "io/ioutil"
    "net/http"
    "os"

    "github.com/ghodss/yaml"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/common/version"
    log "github.com/sirupsen/logrus"
)

var config Config

func main() {
    var err error
    var configFile, bind string
    // =====================
    // Get OS parameter
    // =====================
    flag.StringVar(&configFile, "config", "config.yml", "configuration file")
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()

    // =====================
    // Load config & yaml
    // =====================
    var b []byte
    if b, err = ioutil.ReadFile(configFile); err != nil {
        log.Errorf("Failed to read config file: %s", err)
        os.Exit(1)
    }

    // Load yaml
    if err := yaml.Unmarshal(b, &config); err != nil {
        log.Errorf("Failed to load config: %s", err)
        os.Exit(1)
    }

    // ========================
    // Regist handler
    // ========================
    log.Infof("Regist version collector - %s", "query_exporter")
    prometheus.Register(version.NewCollector("query_exporter"))

    // Regist http handler
    log.Infof("HTTP handler path - %s", "/metrics")
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        h := promhttp.HandlerFor(prometheus.Gatherers{
            prometheus.DefaultGatherer,
        }, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    })

    // start server
    log.Infof("Starting http server - %s", bind)
    if err := http.ListenAndServe(bind, nil); err != nil {
        log.Errorf("Failed to start http server: %s", err)
    }
}

// =============================
// Config config structure
// =============================
type Config struct {
    DSN     string
    Metrics map[string]struct {
        Query       string
        Type        string
        Description string
        Labels      []string
        Value       string
        metricDesc  *prometheus.Desc
    }
}

2. Implement Collector

The highlight of today’s post is the implementing a Collector to collect the desired information from database.

All the processes I implemented so far is to get the results as an HTTP result. Collector actually connect to the database and delivering the specified metric result based on the result of executing the specified query.

type QueryCollector struct{}

// Describe prometheus describe
func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
}

// Collect prometheus collect
func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {
}

As I have mentioned earlier, Collector is kind of a thread concept that collects information, and is a structure that implements the Collector interface of Prometheus. In other words, this story means that if you want to create another Collector of your own, you must implement two of the Describe and Collect defined by the prometheus.Collector interface.

Register the Collector defined as below.

func main(){
    .. skip ..
    // ========================
    // Regist handler
    // ========================
    log.Infof("Regist version collector - %s", "query_exporter")
    prometheus.Register(version.NewCollector("query_exporter"))
    prometheus.Register(&QueryCollector{})
    .. skip ..
}

The Version Collector added to the can exporter created earlier and the QueryCollector newly added this time are registered. When an http request comes in to “/metric”, the above two Collectors are finally executed by each thread.

2-1. Create the Describe function

This is the part that defines the specifications of each metric. Actually, it is not necessary to define the specification of the metric here, but it is useful if you consider the case of creating and operating multiple Collectors. This method is executed only once when a Collector is registered with prometheus.Register.

func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
    for metricName, metric := range config.Metrics {
        metric.metricDesc = prometheus.NewDesc(
            prometheus.BuildFQName("query_exporter", "", metricName),
            metric.Description,
            metric.Labels, nil,
        )
        config.Metrics[metricName] = metric
        log.Infof("metric description for \"%s\" registerd", metricName)
    }
}

Here, I have defined the specification of the metric with the information related to Query in the setting information read earlier.

  • prometheus.BuildFQName: name of metric
  • metric.Description: Description
  • metric.Labels: Array of label names, label values should be mapped later in this order

If you look at the config.yml, each mapping will be as follows.

metrics:
  # metricName
  process_count_by_user:
    ## metric.Description
    description: "process count by user"
    ## metric.Labels
    labels: ["user"]

2-2. Create the Collect function

This is the part that connects to the DB, executes the registered SQL, and makes it a metric.

The execution results(rows) of each query are displayed as a metric name and values as shown in the figure above.

func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {

    // Connect to database
    db, err := sql.Open("mysql", config.DSN)
    if err != nil {
        log.Errorf("Connect to database failed: %s", err)
        return
    }
    defer db.Close()

    // Execute each queries in metrics
    for name, metric := range config.Metrics {

        // Execute query
        rows, err := db.Query(metric.Query)
        if err != nil {
            log.Errorf("Failed to execute query: %s", err)
            continue
        }

        // Get column info
        cols, err := rows.Columns()
        if err != nil {
            log.Errorf("Failed to get column meta: %s", err)
            continue
        }

        des := make([]interface{}, len(cols))
        res := make([][]byte, len(cols))
        for i := range cols {
            des[i] = &res[i]
        }

        // fetch database
        for rows.Next() {
            rows.Scan(des...)
            data := make(map[string]string)
            for i, bytes := range res {
                data[cols[i]] = string(bytes)
            }

            // Metric labels
            labelVals := []string{}
            for _, label := range metric.Labels {
                labelVals = append(labelVals, data[label])
            }

            // Metric value
            val, _ := strconv.ParseFloat(data[metric.Value], 64)

            // Add metric
            switch strings.ToLower(metric.Type) {
            case "counter":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
            case "gauge":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)
            default:
                log.Errorf("Fail to add metric for %s: %s is not valid type", name, metric.Type)
                continue
            }
        }
    }
}

As you can see from the labelVals value, you need to pass the label values in the order of Labels of the specification defined in Describe earlier. There are two metric types here: counter and gauge. Each type has the following meaning.

  • COUNTER: A value that only increases. In prometheus, the indicator is displayed as a change calculation function such as rate/irate.
ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
  • GAUGE: A type whose value can increase/decrease, such as like car gauge. In general, it is used to save the current metric value as it is, such as process count.
ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)

For the value to be displayed as a metric, the value item specified in the setting is retrieved from the query result.

QueryExporter Source

Here’s the everything that I’ve done so far:

package main

import (
    "database/sql"
    "flag"
    "io/ioutil"
    "net/http"
    "os"
    "strconv"
    "strings"

    "github.com/ghodss/yaml"
    _ "github.com/go-sql-driver/mysql"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/prometheus/common/version"
    log "github.com/sirupsen/logrus"
)

var config Config

const (
    collector = "query_exporter"
)

func main() {
    var err error
    var configFile, bind string
    // =====================
    // Get OS parameter
    // =====================
    flag.StringVar(&configFile, "config", "config.yml", "configuration file")
    flag.StringVar(&bind, "bind", "0.0.0.0:9104", "bind")
    flag.Parse()

    // =====================
    // Load config & yaml
    // =====================
    var b []byte
    if b, err = ioutil.ReadFile(configFile); err != nil {
        log.Errorf("Failed to read config file: %s", err)
        os.Exit(1)
    }

    // Load yaml
    if err := yaml.Unmarshal(b, &config); err != nil {
        log.Errorf("Failed to load config: %s", err)
        os.Exit(1)
    }

    // ========================
    // Regist handler
    // ========================
    log.Infof("Regist version collector - %s", collector)
    prometheus.Register(version.NewCollector(collector))
    prometheus.Register(&QueryCollector{})

    // Regist http handler
    log.Infof("HTTP handler path - %s", "/metrics")
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        h := promhttp.HandlerFor(prometheus.Gatherers{
            prometheus.DefaultGatherer,
        }, promhttp.HandlerOpts{})
        h.ServeHTTP(w, r)
    })

    // start server
    log.Infof("Starting http server - %s", bind)
    if err := http.ListenAndServe(bind, nil); err != nil {
        log.Errorf("Failed to start http server: %s", err)
    }
}

// =============================
// Config config structure
// =============================
type Config struct {
    DSN     string
    Metrics map[string]struct {
        Query       string
        Type        string
        Description string
        Labels      []string
        Value       string
        metricDesc  *prometheus.Desc
    }
}

// =============================
// QueryCollector exporter
// =============================
type QueryCollector struct{}

// Describe prometheus describe
func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
    for metricName, metric := range config.Metrics {
        metric.metricDesc = prometheus.NewDesc(
            prometheus.BuildFQName(collector, "", metricName),
            metric.Description,
            metric.Labels, nil,
        )
        config.Metrics[metricName] = metric
        log.Infof("metric description for \"%s\" registerd", metricName)
    }
}

// Collect prometheus collect
func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {

    // Connect to database
    db, err := sql.Open("mysql", config.DSN)
    if err != nil {
        log.Errorf("Connect to database failed: %s", err)
        return
    }
    defer db.Close()

    // Execute each queries in metrics
    for name, metric := range config.Metrics {

        // Execute query
        rows, err := db.Query(metric.Query)
        if err != nil {
            log.Errorf("Failed to execute query: %s", err)
            continue
        }

        // Get column info
        cols, err := rows.Columns()
        if err != nil {
            log.Errorf("Failed to get column meta: %s", err)
            continue
        }

        des := make([]interface{}, len(cols))
        res := make([][]byte, len(cols))
        for i := range cols {
            des[i] = &res[i]
        }

        // fetch database
        for rows.Next() {
            rows.Scan(des...)
            data := make(map[string]string)
            for i, bytes := range res {
                data[cols[i]] = string(bytes)
            }

            // Metric labels
            labelVals := []string{}
            for _, label := range metric.Labels {
                labelVals = append(labelVals, data[label])
            }

            // Metric value
            val, _ := strconv.ParseFloat(data[metric.Value], 64)

            // Add metric
            switch strings.ToLower(metric.Type) {
            case "counter":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
            case "gauge":
                ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)
            default:
                log.Errorf("Fail to add metric for %s: %s is not valid type", name, metric.Type)
                continue
            }
        }
    }
}

If the package does not exist, run “go mod vendor” to download the necessary packages.

Start the server and check the information collected by the actual exporter.

$ go run .
INFO[0000] Regist version collector - query_exporter
INFO[0000] metric description for "process_count_by_host" registerd
INFO[0000] metric description for "process_count_by_user" registerd
INFO[0000] HTTP handler path - /metrics
INFO[0000] Starting http server - 0.0.0.0:9104

If you run it with curl, you can see that the session count per user/host defined in the settings is displayed.

$ curl 127.0.0.1:9104/metrics
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0

.. skip ..

# HELP query_exporter_build_info A metric with a constant '1' value labeled by version, revision, branch, and goversion from which query_exporter was built.
# TYPE query_exporter_build_info gauge
query_exporter_build_info{branch="",goversion="go1.16.5",revision="",version=""} 1
# HELP query_exporter_process_count_by_host process count by host
# TYPE query_exporter_process_count_by_host gauge
query_exporter_process_count_by_host{host="localhost",user="event_scheduler"} 1
query_exporter_process_count_by_host{host="localhost",user="test"} 1
# HELP query_exporter_process_count_by_user process count by user
# TYPE query_exporter_process_count_by_user gauge
query_exporter_process_count_by_user{user="event_scheduler"} 1
query_exporter_process_count_by_user{user="test"} 1

This is the moment when your own Exporter is created at final!. 🙂

Concluding..

The post was very long. I put the source code in the body several times.. I feel like the amount of text is getting longer.

Anyway, I’ve created my own unique Exporter! I implemented a simple function to simply register a query and extract this result as a metric result, but I think you can add more interesting elements according to your own thoughts as needed.

For reference, the source written above is organized in the following Git.
>> https://github.com/go-gywn/query-exporter-simple

Sometimes, when I need to monitor hundreds and thousands of servers from one monitoring server, it is sometimes useful to manage the collection of metrics. As of yet, only support with MySQL, I personally create another Query Exporter project. I implemented more parallel processing and timeouts in the above project base.
>> https://github.com/go-gywn/query-exporter

It’s always been like that… If there is nothing, just create and it If there is, use it well!

I hope to all you have a nice summer.