aboutsummaryrefslogtreecommitdiff
path: root/contrib/unbound/dnstap
diff options
context:
space:
mode:
authorCy Schubert <cy@FreeBSD.org>2020-08-24 18:14:04 +0000
committerCy Schubert <cy@FreeBSD.org>2020-08-24 18:14:04 +0000
commit25039b37d3883b8fdae50475cbea41a255a08ee2 (patch)
tree107a2df1bfda36ef3220a93fff6212e0c354ed36 /contrib/unbound/dnstap
parent0a5eb308d3006e2f386620f7266b6593b300b987 (diff)
parent7973006f41cdaf144441d1a39f9f075053435e2f (diff)
downloadsrc-25039b37d3883b8fdae50475cbea41a255a08ee2.tar.gz
src-25039b37d3883b8fdae50475cbea41a255a08ee2.zip
MFV 364468:
Update unbound 1.10.1 --> 1.11.0. MFH: 1 month
Notes
Notes: svn path=/head/; revision=364721
Diffstat (limited to 'contrib/unbound/dnstap')
-rw-r--r--contrib/unbound/dnstap/dnstap.c90
-rw-r--r--contrib/unbound/dnstap/dnstap.h22
-rw-r--r--contrib/unbound/dnstap/dnstap.m49
-rw-r--r--contrib/unbound/dnstap/dnstap_fstrm.c236
-rw-r--r--contrib/unbound/dnstap/dnstap_fstrm.h194
-rw-r--r--contrib/unbound/dnstap/dtstream.c2128
-rw-r--r--contrib/unbound/dnstap/dtstream.h341
-rw-r--r--contrib/unbound/dnstap/unbound-dnstap-socket.c1594
8 files changed, 4544 insertions, 70 deletions
diff --git a/contrib/unbound/dnstap/dnstap.c b/contrib/unbound/dnstap/dnstap.c
index aabf8eec9071..cc5449dff4a1 100644
--- a/contrib/unbound/dnstap/dnstap.c
+++ b/contrib/unbound/dnstap/dnstap.c
@@ -49,13 +49,12 @@
#include "util/netevent.h"
#include "util/log.h"
-#include <fstrm.h>
#include <protobuf-c/protobuf-c.h>
#include "dnstap/dnstap.h"
+#include "dnstap/dtstream.h"
#include "dnstap/dnstap.pb-c.h"
-#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
#define DNSTAP_INITIAL_BUF_SIZE 256
struct dt_msg {
@@ -90,13 +89,7 @@ dt_pack(const Dnstap__Dnstap *d, void **buf, size_t *sz)
static void
dt_send(const struct dt_env *env, void *buf, size_t len_buf)
{
- fstrm_res res;
- if (!buf)
- return;
- res = fstrm_iothr_submit(env->iothr, env->ioq, buf, len_buf,
- fstrm_free_wrapper, NULL);
- if (res != fstrm_res_success)
- free(buf);
+ dt_msg_queue_submit(env->msgqueue, buf, len_buf);
}
static void
@@ -135,56 +128,33 @@ check_socket_file(const char* socket_path)
}
struct dt_env *
-dt_create(const char *socket_path, unsigned num_workers)
+dt_create(struct config_file* cfg)
{
-#ifdef UNBOUND_DEBUG
- fstrm_res res;
-#endif
struct dt_env *env;
- struct fstrm_iothr_options *fopt;
- struct fstrm_unix_writer_options *fuwopt;
- struct fstrm_writer *fw;
- struct fstrm_writer_options *fwopt;
- verbose(VERB_OPS, "attempting to connect to dnstap socket %s",
- socket_path);
- log_assert(socket_path != NULL);
- log_assert(num_workers > 0);
- check_socket_file(socket_path);
+ if(cfg->dnstap && cfg->dnstap_socket_path && cfg->dnstap_socket_path[0] &&
+ (cfg->dnstap_ip==NULL || cfg->dnstap_ip[0]==0)) {
+ verbose(VERB_OPS, "attempting to connect to dnstap socket %s",
+ cfg->dnstap_socket_path);
+ check_socket_file(cfg->dnstap_socket_path);
+ }
env = (struct dt_env *) calloc(1, sizeof(struct dt_env));
if (!env)
return NULL;
- fwopt = fstrm_writer_options_init();
-#ifdef UNBOUND_DEBUG
- res =
-#else
- (void)
-#endif
- fstrm_writer_options_add_content_type(fwopt,
- DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
- log_assert(res == fstrm_res_success);
-
- fuwopt = fstrm_unix_writer_options_init();
- fstrm_unix_writer_options_set_socket_path(fuwopt, socket_path);
-
- fw = fstrm_unix_writer_init(fuwopt, fwopt);
- log_assert(fw != NULL);
-
- fopt = fstrm_iothr_options_init();
- fstrm_iothr_options_set_num_input_queues(fopt, num_workers);
- env->iothr = fstrm_iothr_init(fopt, &fw);
- if (env->iothr == NULL) {
- verbose(VERB_DETAIL, "dt_create: fstrm_iothr_init() failed");
- fstrm_writer_destroy(&fw);
+ env->dtio = dt_io_thread_create();
+ if(!env->dtio) {
+ log_err("malloc failure");
free(env);
- env = NULL;
+ return NULL;
}
- fstrm_iothr_options_destroy(&fopt);
- fstrm_unix_writer_options_destroy(&fuwopt);
- fstrm_writer_options_destroy(&fwopt);
-
+ if(!dt_io_thread_apply_cfg(env->dtio, cfg)) {
+ dt_io_thread_delete(env->dtio);
+ free(env);
+ return NULL;
+ }
+ dt_apply_cfg(env, cfg);
return env;
}
@@ -272,19 +242,33 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg)
int
dt_init(struct dt_env *env)
{
- env->ioq = fstrm_iothr_get_input_queue(env->iothr);
- if (env->ioq == NULL)
+ env->msgqueue = dt_msg_queue_create();
+ if(!env->msgqueue) {
+ log_err("malloc failure");
+ return 0;
+ }
+ if(!dt_io_thread_register_queue(env->dtio, env->msgqueue)) {
+ log_err("malloc failure");
+ dt_msg_queue_delete(env->msgqueue);
+ env->msgqueue = NULL;
return 0;
+ }
return 1;
}
void
+dt_deinit(struct dt_env* env)
+{
+ dt_io_thread_unregister_queue(env->dtio, env->msgqueue);
+ dt_msg_queue_delete(env->msgqueue);
+}
+
+void
dt_delete(struct dt_env *env)
{
if (!env)
return;
- verbose(VERB_OPS, "closing dnstap socket");
- fstrm_iothr_destroy(&env->iothr);
+ dt_io_thread_delete(env->dtio);
free(env->identity);
free(env->version);
free(env);
diff --git a/contrib/unbound/dnstap/dnstap.h b/contrib/unbound/dnstap/dnstap.h
index 0103c1c0e201..cfef6fc420b9 100644
--- a/contrib/unbound/dnstap/dnstap.h
+++ b/contrib/unbound/dnstap/dnstap.h
@@ -40,16 +40,16 @@
#ifdef USE_DNSTAP
struct config_file;
-struct fstrm_io;
-struct fstrm_queue;
struct sldns_buffer;
+struct dt_msg_queue;
struct dt_env {
- /** dnstap I/O thread */
- struct fstrm_iothr *iothr;
+ /** the io thread (made by the struct daemon) */
+ struct dt_io_thread* dtio;
- /** dnstap I/O thread input queue */
- struct fstrm_iothr_queue *ioq;
+ /** valid in worker struct, not in daemon struct, the per-worker
+ * message list */
+ struct dt_msg_queue* msgqueue;
/** dnstap "identity" field, NULL if disabled */
char *identity;
@@ -84,12 +84,11 @@ struct dt_env {
* of the structure) to ensure lock-free access to its own per-worker circular
* queue. Duplicate the environment object if more than one worker needs to
* share access to the dnstap I/O socket.
- * @param socket_path: path to dnstap logging socket, must be non-NULL.
- * @param num_workers: number of worker threads, must be > 0.
+ * @param cfg: with config settings.
* @return dt_env object, NULL on failure.
*/
struct dt_env *
-dt_create(const char *socket_path, unsigned num_workers);
+dt_create(struct config_file* cfg);
/**
* Apply config settings.
@@ -108,6 +107,11 @@ int
dt_init(struct dt_env *env);
/**
+ * Deletes the per-worker state created by dt_init
+ */
+void dt_deinit(struct dt_env *env);
+
+/**
* Delete dnstap environment object. Closes dnstap I/O socket and deletes all
* per-worker I/O queues.
*/
diff --git a/contrib/unbound/dnstap/dnstap.m4 b/contrib/unbound/dnstap/dnstap.m4
index 5b78b3e267c3..ba723e0becec 100644
--- a/contrib/unbound/dnstap/dnstap.m4
+++ b/contrib/unbound/dnstap/dnstap.m4
@@ -7,7 +7,7 @@ AC_DEFUN([dt_DNSTAP],
[
AC_ARG_ENABLE([dnstap],
AS_HELP_STRING([--enable-dnstap],
- [Enable dnstap support (requires fstrm, protobuf-c)]),
+ [Enable dnstap support (requires protobuf-c)]),
[opt_dnstap=$enableval], [opt_dnstap=no])
AC_ARG_WITH([dnstap-socket-path],
@@ -40,13 +40,6 @@ AC_DEFUN([dt_DNSTAP],
fi
fi
])
- AC_ARG_WITH([libfstrm], AC_HELP_STRING([--with-libfstrm=path],
- [Path where libfstrm is installed, for dnstap]), [
- CFLAGS="$CFLAGS -I$withval/include"
- LDFLAGS="$LDFLAGS -L$withval/lib"
- ])
- AC_SEARCH_LIBS([fstrm_iothr_init], [fstrm], [],
- AC_MSG_ERROR([The fstrm library was not found. Please install fstrm!]))
AC_SEARCH_LIBS([protobuf_c_message_pack], [protobuf-c], [],
AC_MSG_ERROR([The protobuf-c library was not found. Please install protobuf-c!]))
$2
diff --git a/contrib/unbound/dnstap/dnstap_fstrm.c b/contrib/unbound/dnstap/dnstap_fstrm.c
new file mode 100644
index 000000000000..289e78bdf0f4
--- /dev/null
+++ b/contrib/unbound/dnstap/dnstap_fstrm.c
@@ -0,0 +1,236 @@
+/*
+ * dnstap/dnstap_fstrm.c - Frame Streams protocol for dnstap
+ *
+ * Copyright (c) 2020, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+ * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * \file
+ *
+ * Definitions for the Frame Streams data transport protocol for
+ * dnstap message logs.
+ */
+
+#include "config.h"
+#include "dnstap/dnstap_fstrm.h"
+#include "sldns/sbuffer.h"
+#include "sldns/wire2str.h"
+
+/**
+ * Build a Frame Streams control frame of type START that carries one
+ * content-type field.  Returns a malloced buffer and stores its size in
+ * *len, or returns NULL (leaving *len untouched) when malloc fails.
+ */
+void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
+{
+	/* Layout of the frame, every word 4 bytes:
+	 *   word 0: zero, the escape that marks a control frame
+	 *   word 1: bigendian length of the control frame that follows
+	 *   word 2: bigendian control type, START
+	 *   word 3: bigendian field type, content-type
+	 *   word 4: bigendian length of the content type string
+	 *   then the content type string itself, without a terminator
+	 */
+	size_t typelen = strlen(contenttype);
+	size_t framelen = 5*sizeof(uint32_t) + typelen;
+	uint32_t* frame = malloc(framelen);
+	if(frame == NULL)
+		return NULL;
+	frame[0] = 0;
+	/* the length word counts words 2, 3, 4 and the string */
+	frame[1] = htonl(4+4+4+typelen);
+	frame[2] = htonl(FSTRM_CONTROL_FRAME_START);
+	frame[3] = htonl(FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE);
+	frame[4] = htonl(typelen);
+	memmove(frame+5, contenttype, typelen);
+	*len = framelen;
+	return frame;
+}
+
+/**
+ * Build a Frame Streams control frame of type STOP.  Returns a malloced
+ * buffer and stores its size in *len, or returns NULL (leaving *len
+ * untouched) when malloc fails.
+ */
+void* fstrm_create_control_frame_stop(size_t* len)
+{
+	/* Layout of the frame, every word 4 bytes:
+	 *   word 0: zero, the escape that marks a control frame
+	 *   word 1: bigendian length of the control frame that follows (4)
+	 *   word 2: bigendian control type, STOP
+	 */
+	size_t framelen = 3*sizeof(uint32_t);
+	uint32_t* frame = malloc(framelen);
+	if(frame == NULL)
+		return NULL;
+	frame[0] = 0;
+	frame[1] = htonl(4);
+	frame[2] = htonl(FSTRM_CONTROL_FRAME_STOP);
+	*len = framelen;
+	return frame;
+}
+
+/**
+ * Build a Frame Streams control frame of type READY that carries one
+ * content-type field; READY opens a bidirectional stream.  Returns a
+ * malloced buffer and stores its size in *len, or returns NULL (leaving
+ * *len untouched) when malloc fails.
+ */
+void* fstrm_create_control_frame_ready(char* contenttype, size_t* len)
+{
+	/* Layout of the frame, every word 4 bytes:
+	 *   word 0: zero, the escape that marks a control frame
+	 *   word 1: bigendian length of the control frame that follows
+	 *   word 2: bigendian control type, READY
+	 *   word 3: bigendian field type, content-type
+	 *   word 4: bigendian length of the content type string
+	 *   then the content type string itself, without a terminator
+	 * The returned size includes the escape and the frame length words.
+	 */
+	size_t typelen = strlen(contenttype);
+	size_t framelen = 5*sizeof(uint32_t) + typelen;
+	uint32_t* frame = malloc(framelen);
+	if(frame == NULL)
+		return NULL;
+	frame[0] = 0;
+	/* the length word counts words 2, 3, 4 and the string */
+	frame[1] = htonl(4+4+4+typelen);
+	frame[2] = htonl(FSTRM_CONTROL_FRAME_READY);
+	frame[3] = htonl(FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE);
+	frame[4] = htonl(typelen);
+	memmove(frame+5, contenttype, typelen);
+	*len = framelen;
+	return frame;
+}
+
+/**
+ * Build a Frame Streams control frame of type ACCEPT that carries one
+ * content-type field; ACCEPT is the reply control frame.  Returns a
+ * malloced buffer and stores its size in *len, or returns NULL (leaving
+ * *len untouched) when malloc fails.
+ */
+void* fstrm_create_control_frame_accept(char* contenttype, size_t* len)
+{
+	/* Layout of the frame, every word 4 bytes:
+	 *   word 0: zero, the escape that marks a control frame
+	 *   word 1: bigendian length of the control frame that follows
+	 *   word 2: bigendian control type, ACCEPT
+	 *   word 3: bigendian field type, content-type
+	 *   word 4: bigendian length of the content type string
+	 *   then the content type string itself, without a terminator
+	 * The returned size includes the escape and the frame length words.
+	 */
+	size_t typelen = strlen(contenttype);
+	size_t framelen = 5*sizeof(uint32_t) + typelen;
+	uint32_t* frame = malloc(framelen);
+	if(frame == NULL)
+		return NULL;
+	frame[0] = 0;
+	/* the length word counts words 2, 3, 4 and the string */
+	frame[1] = htonl(4+4+4+typelen);
+	frame[2] = htonl(FSTRM_CONTROL_FRAME_ACCEPT);
+	frame[3] = htonl(FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE);
+	frame[4] = htonl(typelen);
+	memmove(frame+5, contenttype, typelen);
+	*len = framelen;
+	return frame;
+}
+
+/**
+ * Build a Frame Streams control frame of type FINISH.  Returns a malloced
+ * buffer and stores its size in *len, or returns NULL (leaving *len
+ * untouched) when malloc fails.
+ */
+void* fstrm_create_control_frame_finish(size_t* len)
+{
+	/* Layout of the frame, every word 4 bytes:
+	 *   word 0: zero, the escape that marks a control frame
+	 *   word 1: bigendian length of the control frame that follows (4)
+	 *   word 2: bigendian control type, FINISH
+	 * The returned size includes the escape and the frame length words.
+	 */
+	size_t framelen = 3*sizeof(uint32_t);
+	uint32_t* frame = malloc(framelen);
+	if(frame == NULL)
+		return NULL;
+	frame[0] = 0;
+	frame[1] = htonl(4);
+	frame[2] = htonl(FSTRM_CONTROL_FRAME_FINISH);
+	*len = framelen;
+	return frame;
+}
+
+/**
+ * Describe a Frame Streams control frame as text, for debug and logs.
+ * The text starts with the control type name, followed by one item per
+ * control field found in the frame.
+ * @param pkt: the control frame payload; the first 4 bytes are read as the
+ *	bigendian control type, the remainder as a sequence of fields that
+ *	each have a 4 byte type, a 4 byte length and then the field data.
+ * @param len: number of bytes available at pkt.
+ * @return result of strdup on the description, so a malloced zero
+ *	terminated string, or NULL if strdup fails.
+ */
+char* fstrm_describe_control(void* pkt, size_t len)
+{
+	uint32_t frametype = 0;
+	char buf[512];
+	char* str = buf;
+	size_t remain, slen = sizeof(buf);
+	uint8_t* pos;
+
+	buf[0]=0;
+	if(len < 4) {
+		snprintf(buf, sizeof(buf), "malformed control frame, "
+			"too short, len=%u", (unsigned int)len);
+		return strdup(buf);
+	}
+	frametype = sldns_read_uint32(pkt);
+	if(frametype == FSTRM_CONTROL_FRAME_ACCEPT) {
+		(void)sldns_str_print(&str, &slen, "accept");
+	} else if(frametype == FSTRM_CONTROL_FRAME_START) {
+		(void)sldns_str_print(&str, &slen, "start");
+	} else if(frametype == FSTRM_CONTROL_FRAME_STOP) {
+		(void)sldns_str_print(&str, &slen, "stop");
+	} else if(frametype == FSTRM_CONTROL_FRAME_READY) {
+		(void)sldns_str_print(&str, &slen, "ready");
+	} else if(frametype == FSTRM_CONTROL_FRAME_FINISH) {
+		(void)sldns_str_print(&str, &slen, "finish");
+	} else {
+		/* the type is unsigned, print it as such so that large
+		 * values do not show up as negative numbers */
+		(void)sldns_str_print(&str, &slen, "type%u",
+			(unsigned int)frametype);
+	}
+
+	/* show the content type options */
+	/* cast before the addition, arithmetic on void* is not standard C */
+	pos = (uint8_t*)pkt + 4;
+	remain = len - 4;
+	while(remain >= 8) {
+		uint32_t field_type = sldns_read_uint32(pos);
+		uint32_t field_len = sldns_read_uint32(pos+4);
+		size_t field_total;
+		/* The field data has to fit in what is left after the 8
+		 * bytes of field type and field length.  Without taking
+		 * those 8 bytes into account the copy below could read past
+		 * the end of the packet and the subtraction at the end of
+		 * the loop could wrap around.  remain >= 8 here, so the
+		 * subtraction in the condition cannot wrap. */
+		if(remain - 8 < field_len) {
+			(void)sldns_str_print(&str, &slen, "malformed_field");
+			break;
+		}
+		if(field_type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
+			char tempf[512];
+			(void)sldns_str_print(&str, &slen, " content-type(");
+			if(field_len < sizeof(tempf)-1) {
+				/* copy, the field data has no terminator */
+				memmove(tempf, pos+8, field_len);
+				tempf[field_len] = 0;
+				(void)sldns_str_print(&str, &slen, "%s", tempf);
+			} else {
+				(void)sldns_str_print(&str, &slen, "<error-too-long>");
+			}
+			(void)sldns_str_print(&str, &slen, ")");
+		} else {
+			(void)sldns_str_print(&str, &slen,
+				" field(type %u, length %u)",
+				(unsigned int)field_type,
+				(unsigned int)field_len);
+		}
+		/* sum in size_t, so the addition cannot wrap in 32 bits */
+		field_total = (size_t)field_len + 8;
+		pos += field_total;
+		remain -= field_total;
+	}
+	if(remain > 0)
+		(void)sldns_str_print(&str, &slen, " trailing-bytes"
+			"(length %u)", (unsigned int)remain);
+	return strdup(buf);
+}
diff --git a/contrib/unbound/dnstap/dnstap_fstrm.h b/contrib/unbound/dnstap/dnstap_fstrm.h
new file mode 100644
index 000000000000..8b37d5f9e2d8
--- /dev/null
+++ b/contrib/unbound/dnstap/dnstap_fstrm.h
@@ -0,0 +1,194 @@
+/*
+ * dnstap/dnstap_fstrm.h - Frame Streams protocol for dnstap
+ *
+ * Copyright (c) 2020, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+ * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * \file
+ *
+ * Definitions for the Frame Streams data transport protocol for
+ * dnstap message logs.
+ */
+
+#ifndef DNSTAP_FSTRM_H
+#define DNSTAP_FSTRM_H
+
+/* Frame Streams data transfer protocol encode for DNSTAP messages.
+ * The protocol looks to be specified in the libfstrm library.
+ *
+ * Quick writeup for DNSTAP usage, from reading fstrm/control.h eloquent
+ * comments and fstrm/control.c for some bytesize details (the content type
+ * length).
+ *
+ * The Frame Streams can be unidirectional or bi-directional.
+ * bi-directional streams use control frame types READY, ACCEPT and FINISH.
+ * uni-directional streams use control frame types START and STOP.
+ * unknown control frame types should be ignored by the receiver, they
+ * do not change the data frame encoding.
+ *
+ * bi-directional control frames implement a simple handshake protocol
+ * between sender and receiver.
+ *
+ * The uni-directional control frames have one start and one stop frame,
+ * before and after the data. The start frame can have a content type.
+ * The start and stop frames are not optional.
+ *
+ * data frames are preceded by 4byte length, bigendian.
+ * zero length data frames are not possible, they are an escape that
+ * signals the presence of a control frame.
+ *
+ * a control frame consists of 0 value in 4byte bigendian, this is really
+ * the data frame length, with 0 the escape sequence that indicates one
+ * control frame follows.
+ * Then, 4byte bigendian, length of the control frame message.
+ * Then, the control frame payload (of that length), which contains:
+ * 4byte bigendian, control type (eg. START, STOP, READY, ACCEPT, FINISH).
+ * perhaps nothing more (STOP, FINISH), but for other types maybe
+ * control fields
+ * 4byte bigendian, the control-field-type, currently only content-type.
+ * 4byte bigendian, length of the string for this option.
+ * .. bytes of that string.
+ *
+ * The START type can have only one field. Field max len 256.
+ * control frame max frame length 512 (excludes the 0-escape and control
+ * frame length bytes).
+ *
+ * the bidirectional type of transmission is like this:
+ * client sends READY (with content type included),
+ * client waits for ACCEPT (with content type included),
+ * client sends START (with matched content type from ACCEPT)
+ * .. data frames
+ * client sends STOP.
+ * client waits for FINISH frame.
+ *
+ */
+
+/** max length of Frame Streams content type field string */
+#define FSTRM_CONTENT_TYPE_LENGTH_MAX 256
+/** control frame value to denote the control frame ACCEPT */
+#define FSTRM_CONTROL_FRAME_ACCEPT 0x01
+/** control frame value to denote the control frame START */
+#define FSTRM_CONTROL_FRAME_START 0x02
+/** control frame value to denote the control frame STOP */
+#define FSTRM_CONTROL_FRAME_STOP 0x03
+/** control frame value to denote the control frame READY */
+#define FSTRM_CONTROL_FRAME_READY 0x04
+/** control frame value to denote the control frame FINISH */
+#define FSTRM_CONTROL_FRAME_FINISH 0x05
+/** the constant that denotes the control field type that is the
+ * string for the content type of the stream. */
+#define FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE 0x01
+/** the content type for DNSTAP frame streams */
+#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
+
+/**
+ * This creates an FSTRM control frame of type START.
+ * @param contenttype: a zero delimited string with the content type.
+ * eg. use the constant DNSTAP_CONTENT_TYPE, which is defined as
+ * "protobuf:dnstap.Dnstap", for a dnstap frame stream.
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4 bytes of 0 that indicate
+ * a control frame. Send the contents of the buffer as they are; do not
+ * precede them with the 'len' value (as is done for data frames), because
+ * the lengths are already included in the buffer. This is so that the zero
+ * control indicator can be included before the control frame length.
+ */
+void* fstrm_create_control_frame_start(char* contenttype, size_t* len);
+
+/**
+ * This creates an FSTRM control frame of type READY.
+ * @param contenttype: a zero delimited string with the content type.
+ * eg. use the constant DNSTAP_CONTENT_TYPE, which is defined as
+ * "protobuf:dnstap.Dnstap", for a dnstap frame stream.
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4 bytes of 0 that indicate
+ * a control frame. Send the contents of the buffer as they are; do not
+ * precede them with the 'len' value (as is done for data frames), because
+ * the lengths are already included in the buffer. This is so that the zero
+ * control indicator can be included before the control frame length.
+ */
+void* fstrm_create_control_frame_ready(char* contenttype, size_t* len);
+
+/**
+ * This creates an FSTRM control frame of type STOP.
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4 bytes of 0 that indicate
+ * a control frame. Send the contents of the buffer as they are; do not
+ * precede them with the 'len' value (as is done for data frames), because
+ * the lengths are already included in the buffer. This is so that the zero
+ * control indicator can be included before the control frame length.
+ */
+void* fstrm_create_control_frame_stop(size_t* len);
+
+/**
+ * This creates an FSTRM control frame of type ACCEPT.
+ * @param contenttype: a zero delimited string with the content type.
+ * for dnstap streams use DNSTAP_CONTENT_TYPE.
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4 bytes of 0 that indicate
+ * a control frame. Send the contents of the buffer as they are; do not
+ * precede them with the 'len' value (as is done for data frames), because
+ * the lengths are already included in the buffer. This is so that the zero
+ * control indicator can be included before the control frame length.
+ */
+void* fstrm_create_control_frame_accept(char* contenttype, size_t* len);
+
+/**
+ * This creates an FSTRM control frame of type FINISH.
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4 bytes of 0 that indicate
+ * a control frame. Send the contents of the buffer as they are; do not
+ * precede them with the 'len' value (as is done for data frames), because
+ * the lengths are already included in the buffer. This is so that the zero
+ * control indicator can be included before the control frame length.
+ */
+void* fstrm_create_control_frame_finish(size_t* len);
+
+/**
+ * Return string that describes a control packet. For debug, logs.
+ * Like 'start content-type(protobuf:dnstap.Dnstap)' or 'stop'.
+ * @param pkt: the packet data, that is the data after the 4 zero start
+ * bytes and 4 length bytes.
+ * @param len: the length of the control packet data, in pkt. This is the
+ * ntohl of the 4 bytes length preceding the data.
+ * @return zero delimited string, malloced. Or NULL on malloc failure.
+ */
+char* fstrm_describe_control(void* pkt, size_t len);
+
+#endif /* DNSTAP_FSTRM_H */
diff --git a/contrib/unbound/dnstap/dtstream.c b/contrib/unbound/dnstap/dtstream.c
new file mode 100644
index 000000000000..dda3ef1ff485
--- /dev/null
+++ b/contrib/unbound/dnstap/dtstream.c
@@ -0,0 +1,2128 @@
+/*
+ * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
+ *
+ * Copyright (c) 2020, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+ * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * \file
+ *
+ * An implementation of the Frame Streams data transport protocol for
+ * the Unbound DNSTAP message logging facility.
+ */
+
+#include "config.h"
+#include "dnstap/dtstream.h"
+#include "dnstap/dnstap_fstrm.h"
+#include "util/config_file.h"
+#include "util/ub_event.h"
+#include "util/net_help.h"
+#include "services/outside_network.h"
+#include "sldns/sbuffer.h"
+#ifdef HAVE_SYS_UN_H
+#include <sys/un.h>
+#endif
+#include <fcntl.h>
+#ifdef HAVE_OPENSSL_SSL_H
+#include <openssl/ssl.h>
+#endif
+#ifdef HAVE_OPENSSL_ERR_H
+#include <openssl/err.h>
+#endif
+
+/** number of messages to process in one output callback */
+#define DTIO_MESSAGES_PER_CALLBACK 100
+/** the msec to wait for reconnect (if not immediate, the first attempt) */
+#define DTIO_RECONNECT_TIMEOUT_MIN 10
+/** the msec to wait for reconnect max after backoff */
+#define DTIO_RECONNECT_TIMEOUT_MAX 1000
+/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
+#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
+
+/** maximum length of received frame */
+#define DTIO_RECV_FRAME_MAX_LEN 1000
+
+struct stop_flush_info;
+/** DTIO command channel commands. In this file dtio_wakeup writes a
+ * command value as a single byte to the command pipe of the io thread.
+ * NOTE(review): the identifier after the closing brace declares a
+ * file-scope object named dtio_channel_command that has this unnamed enum
+ * type, it does not give the enum a name. Presumably an enum tag or a
+ * typedef was intended; confirm that nothing refers to the object before
+ * changing it. */
+enum {
+ /** DTIO command channel stop */
+ DTIO_COMMAND_STOP = 0,
+ /** DTIO command channel wakeup, the value that dtio_wakeup sends */
+ DTIO_COMMAND_WAKEUP = 1
+} dtio_channel_command;
+
+/** open the output channel */
+static void dtio_open_output(struct dt_io_thread* dtio);
+/** add output event for read and write */
+static int dtio_add_output_event_write(struct dt_io_thread* dtio);
+/** start reconnection attempts */
+static void dtio_reconnect_enable(struct dt_io_thread* dtio);
+/** stop from stop_flush event loop */
+static void dtio_stop_flush_exit(struct stop_flush_info* info);
+/** setup a start control message */
+static int dtio_control_start_send(struct dt_io_thread* dtio);
+#ifdef HAVE_SSL
+/** enable briefly waiting for a read event, for SSL negotiation */
+static int dtio_enable_brief_read(struct dt_io_thread* dtio);
+/** enable briefly waiting for a write event, for SSL negotiation */
+static int dtio_enable_brief_write(struct dt_io_thread* dtio);
+#endif
+
+/**
+ * Allocate an empty, zero-initialised message queue with its size limit
+ * set and its lock initialised.  Returns NULL when the allocation fails.
+ */
+struct dt_msg_queue*
+dt_msg_queue_create(void)
+{
+	struct dt_msg_queue* queue = calloc(1, sizeof(*queue));
+	if(queue == NULL)
+		return NULL;
+	/* max size of the buffer, per worker: about 1 M should contain
+	 * 64K messages with some overhead, or a whole bunch smaller ones */
+	queue->maxsize = 1024*1024;
+	lock_basic_init(&queue->lock);
+	lock_protect(&queue->lock, queue, sizeof(*queue));
+	return queue;
+}
+
+/** free every entry on the message list, with its buffer, and reset the
+ * queue to empty; caller must hold the lock */
+static void
+dt_msg_queue_clear(struct dt_msg_queue* mq)
+{
+	struct dt_msg_entry* cur, *after;
+	for(cur = mq->first; cur != NULL; cur = after) {
+		/* fetch the successor before the entry is freed */
+		after = cur->next;
+		free(cur->buf);
+		free(cur);
+	}
+	mq->cursize = 0;
+	mq->last = NULL;
+	mq->first = NULL;
+}
+
+/**
+ * Delete a message queue together with all messages still on it.
+ * Passing NULL is allowed and does nothing.
+ */
+void
+dt_msg_queue_delete(struct dt_msg_queue* mq)
+{
+	if(mq == NULL)
+		return;
+	/* the lock goes first, after that the entries are released and
+	 * finally the queue structure itself */
+	lock_basic_destroy(&mq->lock);
+	dt_msg_queue_clear(mq);
+	free(mq);
+}
+
+/** make the dtio wake up by sending a wakeup command.
+ * Writes one DTIO_COMMAND_WAKEUP byte to the write end of the command
+ * pipe.  Does nothing when dtio is NULL or has not been started.  The
+ * write is retried while it reports an interrupted or would-block
+ * condition; any other failure is logged and the wakeup is given up. */
+static void dtio_wakeup(struct dt_io_thread* dtio)
+{
+	uint8_t cmd = DTIO_COMMAND_WAKEUP;
+	if(!dtio) return;
+	if(!dtio->started) return;
+
+	for(;;) {
+		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
+		if(r != -1)
+			return; /* the command has been handed to the pipe */
+#ifndef USE_WINSOCK
+		if(errno == EINTR || errno == EAGAIN)
+			continue;
+		log_err("dnstap io wakeup: write: %s", strerror(errno));
+#else
+		if(WSAGetLastError() == WSAEINPROGRESS)
+			continue;
+		if(WSAGetLastError() == WSAEWOULDBLOCK)
+			continue;
+		/* this is the wakeup path, say so in the log; the message
+		 * used to claim that the failure came from the stop path */
+		log_err("dnstap io wakeup: write: %s",
+			wsa_strerror(WSAGetLastError()));
+#endif
+		return;
+	}
+}
+
+/**
+ * Append a serialized message to the queue and, if the queue was empty,
+ * wake up the io thread that the queue belongs to.
+ * This routine takes over the buffer: it is either linked into the queue
+ * or freed here when the message is dropped.
+ * @param mq: the message queue; if NULL the message is dropped.
+ * @param buf: malloced message buffer; if NULL nothing happens.
+ * @param len: length of buf in bytes; zero length messages are dropped.
+ */
+void
+dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
+{
+	int wakeup = 0;
+	struct dt_msg_entry* entry;
+
+	/* check conditions */
+	if(!buf) return;
+	if(len == 0) {
+		/* it is not possible to log entries with zero length,
+		 * because the framestream protocol does not carry it.
+		 * However the protobuf serialization does not create zero
+		 * length datagrams for dnstap, so this should not happen. */
+		free(buf);
+		return;
+	}
+	if(!mq) {
+		free(buf);
+		return;
+	}
+
+	/* allocate memory for queue entry */
+	entry = malloc(sizeof(*entry));
+	if(!entry) {
+		log_err("out of memory logging dnstap");
+		free(buf);
+		return;
+	}
+	entry->next = NULL;
+	entry->buf = buf;
+	entry->len = len;
+
+	/* acquire lock */
+	lock_basic_lock(&mq->lock);
+	/* see if it is going to fit; written without adding cursize and len
+	 * together, so that a very large len cannot wrap the sum around and
+	 * slip past the limit */
+	if(len > mq->maxsize || mq->cursize > mq->maxsize - len) {
+		/* buffer full, or congested. */
+		/* drop */
+		lock_basic_unlock(&mq->lock);
+		free(buf);
+		free(entry);
+		return;
+	}
+	/* list was empty, wakeup dtio */
+	if(mq->first == NULL)
+		wakeup = 1;
+	mq->cursize += len;
+	/* append to list */
+	if(mq->last) {
+		mq->last->next = entry;
+	} else {
+		mq->first = entry;
+	}
+	mq->last = entry;
+	/* release lock */
+	lock_basic_unlock(&mq->lock);
+
+	/* the wakeup is sent after the lock has been released */
+	if(wakeup)
+		dtio_wakeup(mq->dtio);
+}
+
+/**
+ * Allocate a zero-initialised io thread structure.
+ * Returns NULL when the allocation fails.
+ */
+struct dt_io_thread* dt_io_thread_create(void)
+{
+	return calloc(1, sizeof(struct dt_io_thread));
+}
+
+/**
+ * Delete the io thread structure: the items of its io list, the strings
+ * it holds, its SSL context if one was made, and the structure itself.
+ * Passing NULL is allowed and does nothing.
+ */
+void dt_io_thread_delete(struct dt_io_thread* dtio)
+{
+	struct dt_io_list_item* cur, *following;
+	if(dtio == NULL)
+		return;
+	for(cur = dtio->io_list; cur != NULL; cur = following) {
+		/* fetch the successor before the item is freed */
+		following = cur->next;
+		free(cur);
+	}
+	free(dtio->socket_path);
+	free(dtio->ip_str);
+	free(dtio->tls_server_name);
+	free(dtio->client_key_file);
+	free(dtio->client_cert_file);
+#ifdef HAVE_SSL
+	if(dtio->ssl_ctx)
+		SSL_CTX_free(dtio->ssl_ctx);
+#endif
+	free(dtio);
+}
+
+/**
+ * Apply the dnstap configuration to the io thread structure.
+ * Selects the transport: a unix socket when no dnstap-ip is set,
+ * otherwise TCP, or TLS when dnstap-tls is enabled.  The strings that
+ * are needed are copied from the config, and for TLS the SSL context
+ * is created.  Strings and an SSL context that are still present from
+ * an earlier call are released before they are replaced or cleared.
+ * @param dtio: the io thread structure to fill in.
+ * @param cfg: the configuration that is read.
+ * @return 1 on success, 0 on failure (the reason is logged).
+ */
+int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
+{
+	if(!cfg->dnstap) {
+		log_warn("cannot setup dnstap because dnstap-enable is no");
+		return 0;
+	}
+
+	/* what type of connectivity do we have */
+	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
+		if(cfg->dnstap_tls)
+			dtio->upstream_is_tls = 1;
+		else	dtio->upstream_is_tcp = 1;
+	} else {
+		dtio->upstream_is_unix = 1;
+	}
+	dtio->is_bidirectional = cfg->dnstap_bidirectional;
+
+	if(dtio->upstream_is_unix) {
+		if(!cfg->dnstap_socket_path ||
+			cfg->dnstap_socket_path[0]==0) {
+			log_err("dnstap setup: no dnstap-socket-path for "
+				"socket connect");
+			return 0;
+		}
+		free(dtio->socket_path);
+		dtio->socket_path = strdup(cfg->dnstap_socket_path);
+		if(!dtio->socket_path) {
+			log_err("dnstap setup: malloc failure");
+			return 0;
+		}
+	}
+
+	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
+		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
+			log_err("dnstap setup: no dnstap-ip for TCP connect");
+			return 0;
+		}
+		free(dtio->ip_str);
+		dtio->ip_str = strdup(cfg->dnstap_ip);
+		if(!dtio->ip_str) {
+			log_err("dnstap setup: malloc failure");
+			return 0;
+		}
+	}
+
+	if(dtio->upstream_is_tls) {
+#ifdef HAVE_SSL
+		if(cfg->dnstap_tls_server_name &&
+			cfg->dnstap_tls_server_name[0]) {
+			free(dtio->tls_server_name);
+			dtio->tls_server_name = strdup(
+				cfg->dnstap_tls_server_name);
+			if(!dtio->tls_server_name) {
+				log_err("dnstap setup: malloc failure");
+				return 0;
+			}
+			if(!check_auth_name_for_ssl(dtio->tls_server_name))
+				return 0;
+		}
+		if(cfg->dnstap_tls_client_key_file &&
+			cfg->dnstap_tls_client_key_file[0]) {
+			dtio->use_client_certs = 1;
+			free(dtio->client_key_file);
+			dtio->client_key_file = strdup(
+				cfg->dnstap_tls_client_key_file);
+			if(!dtio->client_key_file) {
+				log_err("dnstap setup: malloc failure");
+				return 0;
+			}
+			if(!cfg->dnstap_tls_client_cert_file ||
+				cfg->dnstap_tls_client_cert_file[0]==0) {
+				log_err("dnstap setup: client key "
+					"authentication enabled with "
+					"dnstap-tls-client-key-file, but "
+					"no dnstap-tls-client-cert-file "
+					"is given");
+				return 0;
+			}
+			free(dtio->client_cert_file);
+			dtio->client_cert_file = strdup(
+				cfg->dnstap_tls_client_cert_file);
+			if(!dtio->client_cert_file) {
+				log_err("dnstap setup: malloc failure");
+				return 0;
+			}
+		} else {
+			/* no client certificate is configured; free the
+			 * strings of an earlier call before clearing the
+			 * pointers, so that they are not leaked */
+			dtio->use_client_certs = 0;
+			free(dtio->client_key_file);
+			dtio->client_key_file = NULL;
+			free(dtio->client_cert_file);
+			dtio->client_cert_file = NULL;
+		}
+
+		/* release the context of an earlier call before the
+		 * pointer is overwritten with the new one */
+		if(dtio->ssl_ctx) {
+			SSL_CTX_free(dtio->ssl_ctx);
+			dtio->ssl_ctx = NULL;
+		}
+		if(cfg->dnstap_tls_cert_bundle) {
+			dtio->ssl_ctx = connect_sslctx_create(
+				dtio->client_key_file,
+				dtio->client_cert_file,
+				cfg->dnstap_tls_cert_bundle, 0);
+		} else {
+			dtio->ssl_ctx = connect_sslctx_create(
+				dtio->client_key_file,
+				dtio->client_cert_file,
+				cfg->tls_cert_bundle, cfg->tls_win_cert);
+		}
+		if(!dtio->ssl_ctx) {
+			log_err("could not setup SSL CTX");
+			return 0;
+		}
+		dtio->tls_use_sni = cfg->tls_use_sni;
+#endif /* HAVE_SSL */
+	}
+	return 1;
+}
+
+/**
+ * Register a message queue with the io thread.
+ * The queue gets a pointer to the io thread (set under the queue lock)
+ * and a new list item for it is put at the front of the io list.  The
+ * list iterator is reset.
+ * @param dtio: the io thread.
+ * @param mq: the queue to register.
+ * @return 1 on success, 0 when the list item cannot be allocated.
+ */
+int dt_io_thread_register_queue(struct dt_io_thread* dtio,
+	struct dt_msg_queue* mq)
+{
+	struct dt_io_list_item* node;
+	node = (struct dt_io_list_item*)malloc(sizeof(*node));
+	if(node == NULL)
+		return 0;
+
+	lock_basic_lock(&mq->lock);
+	mq->dtio = dtio;
+	lock_basic_unlock(&mq->lock);
+
+	/* link in at the head of the list */
+	node->queue = mq;
+	node->next = dtio->io_list;
+	dtio->io_list = node;
+	dtio->io_list_iter = NULL;
+	return 1;
+}
+
+/**
+ * Unregister a message queue from the io thread.
+ * The first list item that refers to the queue is unlinked and freed,
+ * the io thread pointer in the queue is cleared under the queue lock,
+ * and the list iterator is reset.  The queue itself is not deleted.
+ * Nothing happens if the queue is not in the list.
+ * @param dtio: the io thread, NULL is ignored.
+ * @param mq: the queue to unregister.
+ */
+void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
+	struct dt_msg_queue* mq)
+{
+	struct dt_io_list_item** link;
+	if(dtio == NULL)
+		return;
+	/* link points at the pointer that refers to the current item */
+	for(link = &dtio->io_list; *link != NULL; link = &(*link)->next) {
+		struct dt_io_list_item* node = *link;
+		if(node->queue != mq)
+			continue;
+		*link = node->next;
+		lock_basic_lock(&node->queue->lock);
+		node->queue->dtio = NULL;
+		lock_basic_unlock(&node->queue->lock);
+		free(node);
+		dtio->io_list_iter = NULL;
+		return;
+	}
+}
+
+/** take the first message off the queue; the queue lock is taken and
+ * released inside this routine.  On success the buffer and its length
+ * are stored in buf and len, the queue entry is freed, and the caller
+ * has the buffer.
+ * returns true if there is a message, false if the queue is empty */
+static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
+	size_t* len)
+{
+	struct dt_msg_entry* head;
+
+	lock_basic_lock(&mq->lock);
+	head = mq->first;
+	if(head == NULL) {
+		lock_basic_unlock(&mq->lock);
+		return 0;
+	}
+	mq->first = head->next;
+	if(head->next == NULL)
+		mq->last = NULL;
+	mq->cursize -= head->len;
+	lock_basic_unlock(&mq->lock);
+
+	/* the entry is unlinked, it can be used without the lock */
+	*buf = head->buf;
+	*len = head->len;
+	free(head);
+	return 1;
+}
+
+/** pop a message from the queue and make it the current message of
+ * the dtio, with the progress counters at zero.
+ * false if no message, true if there is a message to send */
+static int dtio_find_in_queue(struct dt_io_thread* dtio,
+	struct dt_msg_queue* mq)
+{
+	void* msg = NULL;
+	size_t msglen = 0;
+	if(!dt_msg_queue_pop(mq, &msg, &msglen))
+		return 0;
+	dtio->cur_msg = msg;
+	dtio->cur_msg_len = msglen;
+	dtio->cur_msg_done = 0;
+	dtio->cur_msg_len_done = 0;
+	return 1;
+}
+
+/** look through the registered queues for a message to write.
+ * The search starts at the list iterator, and the iterator is moved
+ * one item along for the next call.  false if no queue has a message */
+static int dtio_find_msg(struct dt_io_thread* dtio)
+{
+	struct dt_io_list_item* start = dtio->io_list_iter;
+	struct dt_io_list_item* cur;
+
+	/* advance the iterator for the next lookup; when it becomes
+	 * NULL at the end of the list, the next lookup starts at the
+	 * start of the list again. */
+	if(start != NULL)
+		dtio->io_list_iter = start->next;
+	else if(dtio->io_list != NULL)
+		dtio->io_list_iter = dtio->io_list->next;
+
+	/* first pass: from the start position to the end of the list */
+	for(cur = start; cur != NULL; cur = cur->next) {
+		if(dtio_find_in_queue(dtio, cur->queue))
+			return 1;
+	}
+	/* second pass: from the head of the list, for the wrap around */
+	for(cur = dtio->io_list; cur != NULL; cur = cur->next) {
+		if(dtio_find_in_queue(dtio, cur->queue))
+			return 1;
+	}
+	return 0;
+}
+
+/** callback for the dnstap reconnect timer.  Tries to open the output;
+ * if that gives an output event, write events are enabled on it,
+ * otherwise the reconnect timer is set again. */
+void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
+	short ATTR_UNUSED(bits), void* arg)
+{
+	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+	dtio->reconnect_is_added = 0;
+	verbose(VERB_ALGO, "dnstap io: reconnect timer");
+
+	dtio_open_output(dtio);
+	if(dtio->event == NULL) {
+		/* no output event, retry on the timer with backoff */
+		dtio_reconnect_enable(dtio);
+		return;
+	}
+	/* wait on the output event; nothing more is done here whether
+	 * or not the event could be added */
+	(void)dtio_add_output_event_write(dtio);
+}
+
+/** set the reconnect timer, unless the thread wants to exit or the
+ * timer is already set.  The timer waits for the stored timeout; the
+ * stored timeout is then doubled (up to the maximum) for the next
+ * attempt, or set to the minimum if it was zero. */
+static void dtio_reconnect_enable(struct dt_io_thread* dtio)
+{
+	struct timeval tv;
+	int msec;
+	if(dtio->want_to_exit || dtio->reconnect_is_added)
+		return;
+
+	/* this wait uses the stored value, store the next value */
+	msec = dtio->reconnect_timeout;
+	if(msec != 0) {
+		dtio->reconnect_timeout = msec*2;
+		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
+			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
+	} else {
+		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
+	}
+	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
+		msec);
+
+	/* convert the milliseconds into a timeval for the timer */
+	memset(&tv, 0, sizeof(tv));
+	tv.tv_sec = msec/1000;
+	tv.tv_usec = (msec%1000)*1000;
+	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
+		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
+		log_err("dnstap io: could not reconnect ev timer add");
+		return;
+	}
+	dtio->reconnect_is_added = 1;
+}
+
+/** delete the reconnect timer, if it has been added */
+static void dtio_reconnect_del(struct dt_io_thread* dtio)
+{
+	if(dtio->reconnect_is_added) {
+		ub_timer_del(dtio->reconnect_timer);
+		dtio->reconnect_is_added = 0;
+	}
+}
+
+/** reset the exponential backoff of the reconnect timer to zero and
+ * remove the timer.  Used after a successful connect, so that a later
+ * reconnect starts with short timeouts again. */
+static void dtio_reconnect_clear(struct dt_io_thread* dtio)
+{
+	dtio->reconnect_timeout = 0;
+	dtio_reconnect_del(dtio);
+}
+
+/** restart the reconnect timer with the given timeout in msec, in
+ * place of the current backoff value, for when it is already known
+ * that a longer wait is needed */
+static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
+{
+	dtio_reconnect_del(dtio);
+	dtio->reconnect_timeout = msec;
+	dtio_reconnect_enable(dtio);
+}
+
+/** free the current message of the dtio and set the length and the
+ * progress counters back to zero */
+static void dtio_cur_msg_free(struct dt_io_thread* dtio)
+{
+	free(dtio->cur_msg);
+	dtio->cur_msg = NULL;
+	dtio->cur_msg_len = 0;
+	dtio->cur_msg_len_done = 0;
+	dtio->cur_msg_done = 0;
+}
+
+/** free the buffer that is used to read a frame and set all the
+ * counters of the frame read state back to zero */
+static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
+{
+	/* free of NULL does nothing, so no check is needed */
+	free(rb->buf);
+	rb->buf = NULL;
+	rb->buf_cap = 0;
+	rb->buf_count = 0;
+	rb->frame_len = 0;
+	rb->frame_len_done = 0;
+	rb->control_frame = 0;
+}
+
+/** remove the output event from the event base, if it has been
+ * added, and clear the bookkeeping flags for it */
+static void dtio_del_output_event(struct dt_io_thread* dtio)
+{
+	if(dtio->event_added) {
+		ub_event_del(dtio->event);
+		dtio->event_added = 0;
+		dtio->event_added_is_write = 0;
+	}
+}
+
+/** close the socket of the dtio (with closesocket on winsock, close
+ * otherwise) and store -1 as the file descriptor */
+static void dtio_close_fd(struct dt_io_thread* dtio)
+{
+#ifdef USE_WINSOCK
+	closesocket(dtio->fd);
+#else
+	close(dtio->fd);
+#endif
+	dtio->fd = -1;
+}
+
+/** close the output: free the output event, shut down and free the
+ * SSL object if there is one, close the socket, drop the message that
+ * was being sent, reset the frame state and start the reconnect timer.
+ * Does nothing if there is no output event. */
+static void dtio_close_output(struct dt_io_thread* dtio)
+{
+	if(dtio->event == NULL)
+		return;
+	ub_event_free(dtio->event);
+	dtio->event = NULL;
+#ifdef HAVE_SSL
+	if(dtio->ssl) {
+		SSL_shutdown(dtio->ssl);
+		SSL_free(dtio->ssl);
+		dtio->ssl = NULL;
+	}
+#endif
+	dtio_close_fd(dtio);
+
+	/* a (partially sent) message cannot be completed on another
+	 * connection, because a new connection has to start with a
+	 * control frame; discard the message. */
+	if(dtio->cur_msg)
+		dtio_cur_msg_free(dtio);
+
+	/* the control frame exchange has to be done again */
+	dtio->accept_frame_received = 0;
+	dtio->ready_frame_sent = 0;
+	dtio_read_frame_free(&dtio->read_frame);
+
+	dtio_reconnect_enable(dtio);
+}
+
+/** check the result of a nonblocking connect on the dtio socket.
+ * Does nothing if no check is pending.  When the connect succeeded the
+ * reconnect backoff is cleared and the pending flag is reset.
+ * returns 1 if it is okay. -1 on error (close it), 0 to try later */
+static int dtio_check_nb_connect(struct dt_io_thread* dtio)
+{
+	int so_err = 0;
+	socklen_t so_len = (socklen_t)sizeof(so_err);
+	if(!dtio->check_nb_connect)
+		return 1; /* no check is pending */
+	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&so_err,
+		&so_len) < 0) {
+		/* the getsockopt call failed, use its error instead */
+#ifdef USE_WINSOCK
+		so_err = WSAGetLastError();
+#else
+		so_err = errno; /* on solaris errno is error */
+#endif
+	}
+#ifdef USE_WINSOCK
+	if(so_err == WSAEINPROGRESS)
+		return 0; /* try again later */
+	if(so_err == WSAEWOULDBLOCK) {
+		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
+			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
+		return 0; /* try again later */
+	}
+#else
+#if defined(EINPROGRESS) && defined(EWOULDBLOCK)
+	if(so_err == EINPROGRESS || so_err == EWOULDBLOCK)
+		return 0; /* try again later */
+#endif
+#endif
+	if(so_err != 0) {
+		/* name the destination in the log, the socket path is
+		 * preferred over the ip address */
+		char* to = dtio->socket_path;
+		if(!to) to = dtio->ip_str;
+		if(!to) to = "";
+#ifdef USE_WINSOCK
+		log_err("dnstap io: failed to connect to \"%s\": %s",
+			to, wsa_strerror(so_err));
+#else
+		log_err("dnstap io: failed to connect to \"%s\": %s",
+			to, strerror(so_err));
+#endif
+		return -1; /* error, close it */
+	}
+
+	/* connected */
+	if(dtio->ip_str)
+		verbose(VERB_DETAIL, "dnstap io: connected to %s",
+			dtio->ip_str);
+	else if(dtio->socket_path)
+		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
+			dtio->socket_path);
+	dtio_reconnect_clear(dtio);
+	dtio->check_nb_connect = 0;
+	return 1;
+}
+
+#ifdef HAVE_SSL
+/** write a buffer to the ssl connection of the dtio.
+ * returns the number of bytes written, 0 if nothing was written and
+ * it has to be tried again later, or -1 if the channel is to be closed */
+static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
+	size_t len)
+{
+	int r, want;
+	ERR_clear_error();
+	r = SSL_write(dtio->ssl, buf, len);
+	if(r > 0)
+		return r;
+
+	want = SSL_get_error(dtio->ssl, r);
+	switch(want) {
+	case SSL_ERROR_ZERO_RETURN:
+		/* closed */
+		return -1;
+	case SSL_ERROR_WANT_READ:
+		/* a read event is needed for a short while */
+		dtio_enable_brief_read(dtio);
+		return 0;
+	case SSL_ERROR_WANT_WRITE:
+		/* write again later */
+		return 0;
+	case SSL_ERROR_SYSCALL:
+#ifdef EPIPE
+		if(errno == EPIPE && verbosity < 2)
+			return -1; /* silence 'broken pipe' */
+#endif
+#ifdef ECONNRESET
+		if(errno == ECONNRESET && verbosity < 2)
+			return -1; /* silence reset by peer */
+#endif
+		if(errno != 0) {
+			log_err("dnstap io, SSL_write syscall: %s",
+				strerror(errno));
+		}
+		return -1;
+	default:
+		break;
+	}
+	log_crypto_err("dnstap io, could not SSL_write");
+	return -1;
+}
+#endif /* HAVE_SSL */
+
+/** write a buffer to the output of the dtio, over ssl if the dtio has
+ * an ssl object and with send otherwise.
+ * returns the number of bytes written, 0 if nothing was written and
+ * it has to be tried again later, or -1 if the channel is to be closed */
+static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
+	size_t len)
+{
+	ssize_t sent;
+	if(dtio->fd == -1)
+		return -1;
+#ifdef HAVE_SSL
+	if(dtio->ssl)
+		return dtio_write_ssl(dtio, buf, len);
+#endif
+	sent = send(dtio->fd, (void*)buf, len, 0);
+	if(sent != -1)
+		return sent;
+
+	/* the send failed, see if it can be tried again later */
+#ifdef USE_WINSOCK
+	if(WSAGetLastError() == WSAEINPROGRESS)
+		return 0;
+	if(WSAGetLastError() == WSAEWOULDBLOCK) {
+		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
+			dtio->stop_flush_event:dtio->event),
+			UB_EV_WRITE);
+		return 0;
+	}
+	log_err("dnstap io: failed send: %s",
+		wsa_strerror(WSAGetLastError()));
+#else
+	if(errno == EINTR || errno == EAGAIN)
+		return 0;
+	log_err("dnstap io: failed send: %s", strerror(errno));
+#endif
+	return -1;
+}
+
+#ifdef HAVE_WRITEV
+/** write the rest of the length prefix and the message with a single
+ * writev call.  The number of bytes written is divided over the
+ * counter for the length prefix and the counter for the message.  On a
+ * write error (other than try-again) the output is closed.
+ * return true if message is done, false if incomplete */
+static int dtio_write_with_writev(struct dt_io_thread* dtio)
+{
+	uint32_t sendlen = htonl(dtio->cur_msg_len);
+	struct iovec iov[2];
+	ssize_t written;
+	/* first part: the bytes of the length that are not sent yet */
+	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
+	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
+	/* second part: the message */
+	iov[1].iov_len = dtio->cur_msg_len;
+	iov[1].iov_base = dtio->cur_msg;
+	log_assert(iov[0].iov_len > 0);
+	written = writev(dtio->fd, iov, 2);
+	if(written == -1) {
+#ifdef USE_WINSOCK
+		if(WSAGetLastError() == WSAEINPROGRESS)
+			return 0;
+		if(WSAGetLastError() == WSAEWOULDBLOCK) {
+			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
+				dtio->stop_flush_event:dtio->event),
+				UB_EV_WRITE);
+			return 0;
+		}
+		log_err("dnstap io: failed writev: %s",
+			wsa_strerror(WSAGetLastError()));
+#else
+		if(errno == EINTR || errno == EAGAIN)
+			return 0;
+		log_err("dnstap io: failed writev: %s", strerror(errno));
+#endif
+		/* close the channel */
+		dtio_del_output_event(dtio);
+		dtio_close_output(dtio);
+		return 0;
+	}
+	/* count all the bytes on the length counter first; what is
+	 * more than the 4 bytes of the length is moved to the counter
+	 * for the message */
+	dtio->cur_msg_len_done += written;
+	if(dtio->cur_msg_len_done < 4)
+		return 0;
+	if(dtio->cur_msg_len_done > 4) {
+		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
+		dtio->cur_msg_len_done = 4;
+	}
+	return dtio->cur_msg_done >= dtio->cur_msg_len;
+}
+#endif /* HAVE_WRITEV */
+
+/** write more of the 4 byte length that precedes the data frame.
+ * When writev is available and the connection is not ssl, the writev
+ * routine is used, and that also writes the message.  On a write error
+ * the output is closed.
+ * return true if done, false if incomplete. */
+static int dtio_write_more_of_len(struct dt_io_thread* dtio)
+{
+	uint32_t sendlen;
+	int written;
+	if(dtio->cur_msg_len_done >= 4)
+		return 1;
+#ifdef HAVE_WRITEV
+	if(!dtio->ssl) {
+		/* we try writev for everything.*/
+		return dtio_write_with_writev(dtio);
+	}
+#endif /* HAVE_WRITEV */
+	sendlen = htonl(dtio->cur_msg_len);
+	written = dtio_write_buf(dtio,
+		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
+		sizeof(sendlen)-dtio->cur_msg_len_done);
+	if(written == 0) {
+		/* try again later */
+		return 0;
+	}
+	if(written == -1) {
+		/* close the channel */
+		dtio_del_output_event(dtio);
+		dtio_close_output(dtio);
+		return 0;
+	}
+	dtio->cur_msg_len_done += written;
+	return dtio->cur_msg_len_done >= 4;
+}
+
+/** write more of the bytes of the current message, from where the
+ * previous write stopped.  On a write error the output is closed.
+ * return true if message is done, false if incomplete. */
+static int dtio_write_more_of_data(struct dt_io_thread* dtio)
+{
+	int written;
+	if(dtio->cur_msg_done >= dtio->cur_msg_len)
+		return 1;
+	written = dtio_write_buf(dtio,
+		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
+		dtio->cur_msg_len - dtio->cur_msg_done);
+	if(written == 0) {
+		/* try again later */
+		return 0;
+	}
+	if(written == -1) {
+		/* close the channel */
+		dtio_del_output_event(dtio);
+		dtio_close_output(dtio);
+		return 0;
+	}
+	dtio->cur_msg_done += written;
+	return dtio->cur_msg_done >= dtio->cur_msg_len;
+}
+
+/** write more of the current message: first what is left of the
+ * length prefix, then what is left of the data.
+ * false if incomplete, true if the message is done */
+static int dtio_write_more(struct dt_io_thread* dtio)
+{
+	if(dtio->cur_msg_len_done < 4 &&
+		!dtio_write_more_of_len(dtio))
+		return 0;
+	if(dtio->cur_msg_done < dtio->cur_msg_len &&
+		!dtio_write_more_of_data(dtio))
+		return 0;
+	return 1;
+}
+
+/** Receive bytes from dtio->fd, store in buffer.
+ * A close by the other side and a receive error are only logged when
+ * the reconnect timeout is not above the minimum, or at verbosity 4
+ * and up, so that repeated reconnect attempts do not fill the log.
+ * The receive error is logged with the error string for the platform:
+ * the WSA error on winsock, because errno is not set by winsock calls,
+ * and errno otherwise.
+ * Returns 0: closed, -1: continue, >0: number of bytes read into
+ * buffer */
+static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
+	ssize_t r;
+	r = recv(dtio->fd, (void*)buf, len, 0);
+	if(r == -1) {
+		/* name the destination in the log, the socket path is
+		 * preferred over the ip address */
+		char* to = dtio->socket_path;
+		if(!to) to = dtio->ip_str;
+		if(!to) to = "";
+#ifndef USE_WINSOCK
+		if(errno == EINTR || errno == EAGAIN)
+			return -1; /* try later */
+#else
+		if(WSAGetLastError() == WSAEINPROGRESS) {
+			return -1; /* try later */
+		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
+			ub_winsock_tcp_wouldblock(
+				(dtio->stop_flush_event?
+				dtio->stop_flush_event:dtio->event),
+				UB_EV_READ);
+			return -1; /* try later */
+		}
+#endif
+		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
+			verbosity < 4)
+			return 0; /* no log retries on low verbosity */
+#ifndef USE_WINSOCK
+		log_err("dnstap io: output closed, recv %s: %s", to,
+			strerror(errno));
+#else
+		log_err("dnstap io: output closed, recv %s: %s", to,
+			wsa_strerror(WSAGetLastError()));
+#endif
+		/* and close below */
+		return 0;
+	}
+	if(r == 0) {
+		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
+			verbosity < 4)
+			return 0; /* no log retries on low verbosity */
+		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
+		/* and close below */
+		return 0;
+	}
+	/* something was received */
+	return r;
+}
+
+#ifdef HAVE_SSL
+/** Receive bytes over the ssl connection of the dtio, store them in
+ * the buffer.  A close and a connection reset are not logged at low
+ * verbosity when the reconnect timeout is above the minimum.
+ * Returns 0: closed, -1: continue, >0: number of bytes read into
+ * buffer */
+static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
+{
+	int r, want;
+	ERR_clear_error();
+	r = SSL_read(dtio->ssl, buf, len);
+	if(r > 0)
+		return r;
+
+	want = SSL_get_error(dtio->ssl, r);
+	switch(want) {
+	case SSL_ERROR_ZERO_RETURN:
+		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
+			verbosity < 4)
+			return 0; /* no log retries on low verbosity */
+		verbose(VERB_DETAIL, "dnstap io: output closed by the "
+			"other side");
+		return 0;
+	case SSL_ERROR_WANT_READ:
+		/* continue later */
+		return -1;
+	case SSL_ERROR_WANT_WRITE:
+		(void)dtio_enable_brief_write(dtio);
+		return -1;
+	case SSL_ERROR_SYSCALL:
+#ifdef ECONNRESET
+		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
+			errno == ECONNRESET && verbosity < 4)
+			return 0; /* silence reset by peer */
+#endif
+		if(errno != 0)
+			log_err("SSL_read syscall: %s",
+				strerror(errno));
+		verbose(VERB_DETAIL, "dnstap io: output closed by the "
+			"other side");
+		return 0;
+	default:
+		break;
+	}
+	log_crypto_err("could not SSL_read");
+	verbose(VERB_DETAIL, "dnstap io: output closed by the "
+		"other side");
+	return 0;
+}
+#endif /* HAVE_SSL */
+
+/** check if the output fd has been closed by the other side.
+ * Input that is available is read and thrown away.  When the stream is
+ * closed, the output is closed here as well.
+ * it returns false if the stream is closed. */
+static int dtio_check_close(struct dt_io_thread* dtio)
+{
+	/* No packets are expected here, but input that does arrive can
+	 * be ignored: to ignore unknown (control) packets is okay for
+	 * the framestream protocol.  The reason to read is that the
+	 * read call reports when the other side closed the stream. */
+	/* NOTE(review): this reads from the fd with receive_bytes also
+	 * when dtio->ssl is in use - confirm that is intended. */
+	uint8_t scratch[1024];
+
+	if(dtio->fd == -1)
+		return 0;
+
+	for(;;) {
+		/* the content is of no interest, it is overwritten */
+		int r = receive_bytes(dtio, (void*)scratch, sizeof(scratch));
+		if(r == -1)
+			return 1; /* nothing more to read, still open */
+		if(r == 0)
+			break;
+	}
+	/* the other end has been closed, close the channel */
+	dtio_del_output_event(dtio);
+	dtio_close_output(dtio);
+	return 0;
+}
+
+/** Read accept frame.
+ * First the 4 byte frame length is read; a length of zero is taken as
+ * the escape that announces a control frame and the length is then
+ * read again.  After that the frame of that length is read into a
+ * newly allocated buffer.  The frame has to be an ACCEPT control frame
+ * that has a content type field equal to DNSTAP_CONTENT_TYPE; if so
+ * the START frame is queued with dtio_control_start_send.  A frame of
+ * another control type is ignored and the frame state is reset.  For
+ * other errors the output is closed with a slow reconnect.
+ * The lengths of the control fields come from the peer and are checked
+ * against the frame length before they are used to step to the next
+ * field.
+ * Returns -1: continue reading, 0: closed, 1: valid accept received. */
+static int dtio_read_accept_frame(struct dt_io_thread* dtio)
+{
+	int r;
+	size_t read_frame_done;
+	uint8_t* frame;
+	while(dtio->read_frame.frame_len_done < 4) {
+#ifdef HAVE_SSL
+		if(dtio->ssl) {
+			r = ssl_read_bytes(dtio,
+				(uint8_t*)&dtio->read_frame.frame_len+
+				dtio->read_frame.frame_len_done,
+				4-dtio->read_frame.frame_len_done);
+		} else {
+#endif
+			r = receive_bytes(dtio,
+				(uint8_t*)&dtio->read_frame.frame_len+
+				dtio->read_frame.frame_len_done,
+				4-dtio->read_frame.frame_len_done);
+#ifdef HAVE_SSL
+		}
+#endif
+		if(r == -1)
+			return -1; /* continue reading */
+		if(r == 0) {
+			/* connection closed */
+			goto close_connection;
+		}
+		dtio->read_frame.frame_len_done += r;
+		if(dtio->read_frame.frame_len_done < 4)
+			return -1; /* continue reading */
+
+		if(dtio->read_frame.frame_len == 0) {
+			/* escape for a control frame, the length of the
+			 * control frame follows, read that length */
+			dtio->read_frame.frame_len_done = 0;
+			dtio->read_frame.control_frame = 1;
+			continue;
+		}
+		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
+		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
+			verbose(VERB_OPS, "dnstap: received frame exceeds max "
+				"length of %d bytes, closing connection",
+				DTIO_RECV_FRAME_MAX_LEN);
+			goto close_connection;
+		}
+		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
+		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
+		if(!dtio->read_frame.buf) {
+			log_err("dnstap io: out of memory (creating read "
+				"buffer)");
+			goto close_connection;
+		}
+	}
+	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
+		/* byte pointer, for the pointer arithmetic */
+		uint8_t* dest = (uint8_t*)dtio->read_frame.buf +
+			dtio->read_frame.buf_count;
+#ifdef HAVE_SSL
+		if(dtio->ssl) {
+			r = ssl_read_bytes(dtio, dest,
+				dtio->read_frame.buf_cap-
+				dtio->read_frame.buf_count);
+		} else {
+#endif
+			r = receive_bytes(dtio, dest,
+				dtio->read_frame.buf_cap-
+				dtio->read_frame.buf_count);
+#ifdef HAVE_SSL
+		}
+#endif
+		if(r == -1)
+			return -1; /* continue reading */
+		if(r == 0) {
+			/* connection closed */
+			goto close_connection;
+		}
+		dtio->read_frame.buf_count += r;
+		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
+			return -1; /* continue reading */
+	}
+
+	/* Complete frame received, check if this is a valid ACCEPT control
+	 * frame. */
+	if(dtio->read_frame.frame_len < 4) {
+		verbose(VERB_OPS, "dnstap: invalid data received");
+		goto close_connection;
+	}
+	frame = (uint8_t*)dtio->read_frame.buf;
+	if(sldns_read_uint32(frame) != FSTRM_CONTROL_FRAME_ACCEPT) {
+		verbose(VERB_ALGO, "dnstap: invalid control type received, "
+			"ignored");
+		dtio->ready_frame_sent = 0;
+		dtio->accept_frame_received = 0;
+		dtio_read_frame_free(&dtio->read_frame);
+		return -1;
+	}
+	read_frame_done = 4; /* control frame type */
+
+	/* Iterate over control fields, ignore unknown types.
+	 * Need to be able to read at least 8 bytes (control field type +
+	 * length). */
+	while(read_frame_done+8 < dtio->read_frame.frame_len) {
+		uint32_t type = sldns_read_uint32(frame + read_frame_done);
+		uint32_t len = sldns_read_uint32(frame + read_frame_done + 4);
+		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
+			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
+				read_frame_done+8+len <=
+				dtio->read_frame.frame_len &&
+				memcmp(frame + read_frame_done + 8,
+				DNSTAP_CONTENT_TYPE, len) == 0) {
+				if(!dtio_control_start_send(dtio)) {
+					verbose(VERB_OPS, "dnstap io: out of "
+					 "memory while sending START frame");
+					goto close_connection;
+				}
+				dtio->accept_frame_received = 1;
+				return 1;
+			} else {
+				/* unknow content type */
+				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
+					"contains unknown content type, "
+					"closing connection");
+				goto close_connection;
+			}
+		}
+		/* unknown option, try next.
+		 * The length is from the peer: 8+len in 32 bit unsigned
+		 * arithmetic can wrap around, to zero for 0xfffffff8, and
+		 * then the position does not move on and this loop does
+		 * not end.  So check the length against what is left in
+		 * the frame; the loop condition makes sure that the
+		 * subtraction cannot underflow. */
+		if(len > dtio->read_frame.frame_len - read_frame_done - 8) {
+			verbose(VERB_OPS, "dnstap: ACCEPT frame contains a "
+				"control field that exceeds the frame, "
+				"closing connection");
+			goto close_connection;
+		}
+		read_frame_done += 8+(size_t)len;
+	}
+	/* no content type field in the frame, close the connection */
+
+close_connection:
+	dtio_del_output_event(dtio);
+	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
+	dtio_close_output(dtio);
+	return 0;
+}
+
+/** register the output event without the write bit.
+ * If the event is already added in that form nothing is done, else it
+ * is deleted if needed and added again.  If the add fails the output
+ * is closed, that starts the attempts to reopen it.
+ * returns false if there is no event or if the add failed */
+static int dtio_add_output_event_read(struct dt_io_thread* dtio)
+{
+	if(dtio->event == NULL)
+		return 0;
+	if(dtio->event_added) {
+		if(!dtio->event_added_is_write)
+			return 1; /* already registered this way */
+		/* registered with write, it has to be re-registered */
+		ub_event_del(dtio->event);
+	}
+	ub_event_del_bits(dtio->event, UB_EV_WRITE);
+	if(ub_event_add(dtio->event, NULL) == 0) {
+		dtio->event_added = 1;
+		dtio->event_added_is_write = 0;
+		return 1;
+	}
+	log_err("dnstap io: out of memory (adding event)");
+	dtio->event_added = 0;
+	dtio->event_added_is_write = 0;
+	/* close output and start reattempts to open it */
+	dtio_close_output(dtio);
+	return 0;
+}
+
+/** register the output event with the write bit added.
+ * If the event is already added in that form nothing is done, else it
+ * is deleted if needed and added again.  If the add fails the output
+ * is closed, that starts the attempts to reopen it.
+ * returns false if there is no event or if the add failed */
+static int dtio_add_output_event_write(struct dt_io_thread* dtio)
+{
+	if(dtio->event == NULL)
+		return 0;
+	if(dtio->event_added) {
+		if(dtio->event_added_is_write)
+			return 1; /* already registered this way */
+		/* registered without write, it has to be re-registered */
+		ub_event_del(dtio->event);
+	}
+	ub_event_add_bits(dtio->event, UB_EV_WRITE);
+	if(ub_event_add(dtio->event, NULL) == 0) {
+		dtio->event_added = 1;
+		dtio->event_added_is_write = 1;
+		return 1;
+	}
+	log_err("dnstap io: out of memory (adding event)");
+	dtio->event_added = 0;
+	dtio->event_added_is_write = 0;
+	/* close output and start reattempts to open it */
+	dtio_close_output(dtio);
+	return 0;
+}
+
+/** put the dtio thread to sleep: there is nothing to write, so the
+ * output event is registered without the write bit.  The result of
+ * that registration is ignored. */
+static void dtio_sleep(struct dt_io_thread* dtio)
+{
+	(void)dtio_add_output_event_read(dtio);
+}
+
+#ifdef HAVE_SSL
+/** set the brief read condition and stop listening for write.
+ * With a stop flush event present that event is re-added without the
+ * write bit, otherwise the output event is.  returns false on failure */
+static int dtio_enable_brief_read(struct dt_io_thread* dtio)
+{
+	dtio->ssl_brief_read = 1;
+	if(dtio->stop_flush_event == NULL)
+		return dtio_add_output_event_read(dtio);
+
+	ub_event_del(dtio->stop_flush_event);
+	ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
+	if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
+		log_err("dnstap io, stop flush, could not ub_event_add");
+		return 0;
+	}
+	return 1;
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** clear the brief read condition and listen for write again.
+ * With a stop flush event present that event is re-added with the
+ * write bit, otherwise the output event is.  returns false on failure */
+static int dtio_disable_brief_read(struct dt_io_thread* dtio)
+{
+	dtio->ssl_brief_read = 0;
+	if(dtio->stop_flush_event == NULL)
+		return dtio_add_output_event_write(dtio);
+
+	ub_event_del(dtio->stop_flush_event);
+	ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
+	if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
+		log_err("dnstap io, stop flush, could not ub_event_add");
+		return 0;
+	}
+	return 1;
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** set the brief write condition and register the output event with
+ * the write bit.  returns the result of that registration */
+static int dtio_enable_brief_write(struct dt_io_thread* dtio)
+{
+	dtio->ssl_brief_write = 1;
+	return dtio_add_output_event_write(dtio);
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** clear the brief write condition and register the output event
+ * without the write bit.  returns the result of that registration */
+static int dtio_disable_brief_write(struct dt_io_thread* dtio)
+{
+	dtio->ssl_brief_write = 0;
+	return dtio_add_output_event_read(dtio);
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** check the peer verification after the ssl handshake.
+ * If peer verification is not enabled on the ssl object the connection
+ * is accepted.  Otherwise the verify result has to be okay and the
+ * peer has to have a certificate.  false if it is to be closed */
+static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
+{
+	X509* cert;
+	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
+		/* unauthenticated, the verify peer flag was not set
+		 * in ssl when the ssl object was created from ssl_ctx */
+		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
+			dtio->ip_str);
+		return 1;
+	}
+
+	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
+		/* verification failed, log the certificate if any */
+		cert = SSL_get_peer_certificate(dtio->ssl);
+		if(cert) {
+			log_cert(VERB_ALGO, "dnstap io, peer "
+				"certificate", cert);
+			X509_free(cert);
+		}
+		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
+			"failed: failed to authenticate",
+			dtio->ip_str);
+		return 0;
+	}
+
+	/* verification is okay, a certificate has to be present */
+	cert = SSL_get_peer_certificate(dtio->ssl);
+	if(!cert) {
+		verbose(VERB_ALGO, "dnstap io, %s, SSL "
+			"connection failed no certificate",
+			dtio->ip_str);
+		return 0;
+	}
+	log_cert(VERB_ALGO, "dnstap io, peer certificate",
+		cert);
+#ifdef HAVE_SSL_GET0_PEERNAME
+	if(SSL_get0_peername(dtio->ssl)) {
+		verbose(VERB_ALGO, "dnstap io, %s, SSL "
+			"connection to %s authenticated",
+			dtio->ip_str,
+			SSL_get0_peername(dtio->ssl));
+	} else
+#endif
+	{
+		verbose(VERB_ALGO, "dnstap io, %s, SSL "
+			"connection authenticated",
+			dtio->ip_str);
+	}
+	X509_free(cert);
+	return 1;
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** perform the ssl handshake, or continue with it.
+ * A pending brief read condition is cleared first.  When the handshake
+ * completes the peer is checked.  When the handshake fails, or the
+ * peer check fails, the output is closed with a slow reconnect, and
+ * the stop flush (if info is given) is told to exit.
+ * returns 1 if okay, 0 to stop */
+static int dtio_ssl_handshake(struct dt_io_thread* dtio,
+	struct stop_flush_info* info)
+{
+	int r, want;
+	unsigned long err;
+	if(dtio->ssl_brief_read) {
+		/* assume the brief read condition is satisfied,
+		 * if we need more or again, we can set it again */
+		if(!dtio_disable_brief_read(dtio)) {
+			if(info) dtio_stop_flush_exit(info);
+			return 0;
+		}
+	}
+	if(dtio->ssl_handshake_done)
+		return 1;
+
+	ERR_clear_error();
+	r = SSL_do_handshake(dtio->ssl);
+	if(r == 1) {
+		/* handshake is complete, check peer verification */
+		dtio->ssl_handshake_done = 1;
+		if(dtio_ssl_check_peer(dtio))
+			return 1;
+		goto closed;
+	}
+
+	want = SSL_get_error(dtio->ssl, r);
+	if(want == SSL_ERROR_WANT_READ) {
+		/* we want to read on the connection */
+		if(!dtio_enable_brief_read(dtio) && info)
+			dtio_stop_flush_exit(info);
+		return 0;
+	}
+	if(want == SSL_ERROR_WANT_WRITE) {
+		/* we want to write on the connection */
+		return 0;
+	}
+	if(r == 0) {
+		/* closed */
+		goto closed;
+	}
+	if(want == SSL_ERROR_SYSCALL) {
+		/* SYSCALL and errno==0 means closed uncleanly */
+		int silent = 0;
+#ifdef EPIPE
+		if(errno == EPIPE && verbosity < 2)
+			silent = 1; /* silence 'broken pipe' */
+#endif
+#ifdef ECONNRESET
+		if(errno == ECONNRESET && verbosity < 2)
+			silent = 1; /* silence reset by peer */
+#endif
+		if(errno == 0)
+			silent = 1;
+		if(!silent)
+			log_err("dnstap io, SSL_handshake syscall: %s",
+				strerror(errno));
+		goto closed;
+	}
+	/* another ssl error */
+	err = ERR_get_error();
+	if(!squelch_err_ssl_handshake(err)) {
+		log_crypto_err_code("dnstap io, ssl handshake failed",
+			err);
+		verbose(VERB_OPS, "dnstap io, ssl handshake failed "
+			"from %s", dtio->ip_str);
+	}
+
+closed:
+	/* the one place where the connection is closed */
+	if(info) dtio_stop_flush_exit(info);
+	dtio_del_output_event(dtio);
+	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
+	dtio_close_output(dtio);
+	return 0;
+}
+#endif /* HAVE_SSL */
+
+/** callback for the dnstap events, to write to the output.
+ * In order: the pending nonblocking connect is checked, the ssl
+ * handshake is continued if it is not done, input is handled (the
+ * ACCEPT frame is read when that is waited for, otherwise it is
+ * checked if the other side closed), and then up to
+ * DTIO_MESSAGES_PER_CALLBACK messages are written.
+ * @param fd: unused.
+ * @param bits: the event bits, UB_EV_READ is looked at.
+ * @param arg: the struct dt_io_thread. */
+void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
+{
+	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+	int i;
+
+	if(dtio->check_nb_connect) {
+		int connect_err = dtio_check_nb_connect(dtio);
+		if(connect_err == -1) {
+			/* close the channel */
+			dtio_del_output_event(dtio);
+			dtio_close_output(dtio);
+			return;
+		} else if(connect_err == 0) {
+			/* try again later */
+			return;
+		}
+		/* nonblocking connect check passed, continue */
+	}
+
+#ifdef HAVE_SSL
+	if(dtio->ssl &&
+		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
+		if(!dtio_ssl_handshake(dtio, NULL))
+			return;
+	}
+#endif
+
+	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
+#ifdef HAVE_SSL
+		/* dtio_disable_brief_write is only defined with SSL, so
+		 * the call is inside the same guard, like the other ssl
+		 * specific calls in this file */
+		if(dtio->ssl_brief_write)
+			(void)dtio_disable_brief_write(dtio);
+#endif
+		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
+			if(dtio_read_accept_frame(dtio) <= 0)
+				return;
+		} else if(!dtio_check_close(dtio))
+			return;
+	}
+
+	/* loop to process a number of messages.  This improves throughput,
+	 * because selecting on write-event if not needed for busy messages
+	 * (dnstap log) generation and if they need to all be written back.
+	 * The write event is usually not blocked up.  But not forever,
+	 * because the event loop needs to stay responsive for other events.
+	 * If there are no (more) messages, or if the output buffers get
+	 * full, it returns out of the loop. */
+	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
+		/* see if there are messages that need writing */
+		if(!dtio->cur_msg) {
+			if(!dtio_find_msg(dtio)) {
+				if(i == 0) {
+					/* no messages on the first iteration,
+					 * the queues are all empty */
+					dtio_sleep(dtio);
+				}
+				return; /* nothing to do */
+			}
+		}
+
+		/* write it */
+		if(dtio->cur_msg_done < dtio->cur_msg_len) {
+			if(!dtio_write_more(dtio))
+				return;
+		}
+
+		/* done with the current message */
+		dtio_cur_msg_free(dtio);
+
+		/* If this is a bidirectional stream the first message will be
+		 * the READY control frame. We can only continue writing after
+		 * receiving an ACCEPT control frame. */
+		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
+			dtio->ready_frame_sent = 1;
+			(void)dtio_add_output_event_read(dtio);
+			break;
+		}
+	}
+}
+
+/**
+ * callback for the dnstap commandpipe, to stop the dnstap IO.
+ * Reads one command byte from the pipe. A wakeup command re-registers the
+ * write event (unless a bidirectional stream still waits for ACCEPT) and
+ * returns. Every other outcome (stop command, unknown command, closed
+ * pipe, hard read error) marks the thread as wanting to exit and asks
+ * the event base to leave its loop.
+ * @param fd: read end of the command pipe.
+ * @param bits: unused.
+ * @param arg: the struct dt_io_thread.
+ */
+void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
+{
+	struct dt_io_thread* io = (struct dt_io_thread*)arg;
+	uint8_t cmd;
+	ssize_t got;
+
+	if(io->want_to_exit)
+		return;
+	got = read(fd, &cmd, sizeof(cmd));
+	if(got == -1) {
+#ifndef USE_WINSOCK
+		if(errno == EINTR || errno == EAGAIN)
+			return; /* ignore this */
+		log_err("dnstap io: failed to read: %s", strerror(errno));
+#else
+		if(WSAGetLastError() == WSAEINPROGRESS ||
+			WSAGetLastError() == WSAEWOULDBLOCK)
+			return;
+		log_err("dnstap io: failed to read: %s",
+			wsa_strerror(WSAGetLastError()));
+#endif
+		/* and then fall through to quit the thread */
+	} else if(got == 0) {
+		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
+	} else if(got == 1) {
+		if(cmd == DTIO_COMMAND_STOP) {
+			verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
+		} else if(cmd == DTIO_COMMAND_WAKEUP) {
+			verbose(VERB_ALGO,
+				"dnstap io: cmd channel cmd wakeup");
+
+			if(io->is_bidirectional &&
+				!io->accept_frame_received) {
+				verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
+					"waiting for ACCEPT control frame");
+				return;
+			}
+
+			/* reregister event; nothing more to do here
+			 * whether or not that worked */
+			(void)dtio_add_output_event_write(io);
+			return;
+		} else {
+			verbose(VERB_ALGO,
+				"dnstap io: cmd channel unknown command");
+		}
+	}
+
+	/* quit the io event loop */
+	io->want_to_exit = 1;
+	if(ub_event_base_loopexit((struct ub_event_base*)io->event_base)
+		!= 0) {
+		log_err("dnstap io: could not loopexit");
+	}
+}
+
+#ifndef THREADS_DISABLED
+/**
+ * Create the event base that the dnstap io thread runs on.
+ * Exits the program through fatal_exit if the base cannot be created.
+ * @param dtio: the io thread, its event_base member is filled in.
+ * @param secs: time storage handed to the event base.
+ * @param now: time storage handed to the event base, zeroed here first.
+ */
+static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
+	struct timeval* now)
+{
+	memset(now, 0, sizeof(*now));
+	dtio->event_base = ub_default_event_base(0, secs, now);
+	if(dtio->event_base != NULL)
+		return;
+	fatal_exit("dnstap io: could not create event_base");
+}
+#endif /* THREADS_DISABLED */
+
+/**
+ * Setup the cmd event for dnstap io: make the read end of the command
+ * pipe nonblocking and register a persistent read event on it that calls
+ * dtio_cmd_cb. Failures are fatal.
+ * @param dtio: the io thread, with commandpipe and event_base set.
+ */
+static void dtio_setup_cmd(struct dt_io_thread* dtio)
+{
+	struct ub_event* ev;
+	int rfd = dtio->commandpipe[0];
+
+	fd_set_nonblock(rfd);
+	ev = ub_event_new(dtio->event_base, rfd,
+		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
+	if(ev == NULL)
+		fatal_exit("dnstap io: out of memory");
+	dtio->command_event = ev;
+	if(ub_event_add(ev, NULL) == 0)
+		return;
+	fatal_exit("dnstap io: out of memory (adding event)");
+}
+
+/**
+ * Setup the reconnect event for dnstap io: clear the reconnect state and
+ * create (but do not add) the timeout event that calls
+ * dtio_reconnect_timeout_cb. Allocation failure is fatal.
+ * @param dtio: the io thread, with event_base set.
+ */
+static void dtio_setup_reconnect(struct dt_io_thread* dtio)
+{
+	dtio_reconnect_clear(dtio);
+	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
+		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
+	if(dtio->reconnect_timer != NULL)
+		return;
+	fatal_exit("dnstap io: out of memory");
+}
+
+/**
+ * structure to keep track of information during stop flush.
+ * dtio_control_stop_flush keeps one of these on its stack and passes it
+ * as the argument to dtio_stop_timer_cb and dtio_stop_ev_cb.
+ */
+struct stop_flush_info {
+ /** the event base during stop flush */
+ struct ub_event_base* base;
+ /** did we already want to exit this stop-flush event base,
+ * set by dtio_stop_flush_exit so loopexit is requested only once */
+ int want_to_exit_flush;
+ /** has the timer fired, set by dtio_stop_timer_cb */
+ int timer_done;
+ /** the dtio */
+ struct dt_io_thread* dtio;
+ /** the stop control frame, created with
+ * fstrm_create_control_frame_stop and released with free() */
+ void* stop_frame;
+ /** length of the stop frame */
+ size_t stop_frame_len;
+ /** how much we have done of the stop frame, in bytes written */
+ size_t stop_frame_done;
+};
+
+/**
+ * Exit the stop flush base. Only the first call requests the loop exit,
+ * later calls see want_to_exit_flush and do nothing.
+ * @param info: the stop flush state.
+ */
+static void dtio_stop_flush_exit(struct stop_flush_info* info)
+{
+	if(!info->want_to_exit_flush) {
+		info->want_to_exit_flush = 1;
+		if(ub_event_base_loopexit(info->base) != 0)
+			log_err("dnstap io: could not loopexit");
+	}
+}
+
+/**
+ * Send (more of) the stop control frame.
+ * A write result of -1 is treated as a closed output and exits the stop
+ * flush; a result of 0 means try again later (or time out).
+ * @param info: the stop flush state holding the frame and its progress.
+ * @return true if the frame has been completely written.
+ */
+static int dtio_control_stop_send(struct stop_flush_info* info)
+{
+	uint8_t* pos;
+	size_t remain;
+	int written;
+
+	if(info->stop_frame_done >= info->stop_frame_len)
+		return 1;
+	pos = ((uint8_t*)info->stop_frame) + info->stop_frame_done;
+	remain = info->stop_frame_len - info->stop_frame_done;
+	written = dtio_write_buf(info->dtio, pos, remain);
+	if(written == -1) {
+		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
+		dtio_stop_flush_exit(info);
+		return 0;
+	}
+	if(written == 0)
+		return 0; /* try again later, or timeout */
+	info->stop_frame_done += written;
+	/* complete only once every byte of the frame went out */
+	return info->stop_frame_done >= info->stop_frame_len;
+}
+
+/**
+ * callback for the timer when the thread stops and wants to finish up.
+ * Records that the timer fired and exits the stop flush event base,
+ * unless that exit was already requested.
+ * @param fd: unused.
+ * @param bits: unused.
+ * @param arg: the struct stop_flush_info.
+ */
+void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+	void* arg)
+{
+	struct stop_flush_info* flush = (struct stop_flush_info*)arg;
+
+	if(!flush->want_to_exit_flush) {
+		verbose(VERB_ALGO,
+			"dnstap io: stop flush timer expired, stop flush");
+		flush->timer_done = 1;
+		dtio_stop_flush_exit(flush);
+	}
+}
+
+/**
+ * callback for the output when the thread stops and wants to finish up.
+ * Finishes the partially written current message (if any), then writes
+ * the stop control frame, then exits the stop flush event base. Returns
+ * early, to be called again on the next event, whenever a step cannot
+ * complete yet.
+ * @param fd: unused.
+ * @param bits: event bits; UB_EV_READ triggers the close check.
+ * @param arg: the struct stop_flush_info.
+ */
+void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
+{
+ struct stop_flush_info* info = (struct stop_flush_info*)arg;
+ struct dt_io_thread* dtio = info->dtio;
+ if(info->want_to_exit_flush)
+ return;
+ if(dtio->check_nb_connect) {
+ /* we don't start the stop_flush if connect still
+ * in progress, but the check code is here, just in case */
+ int connect_err = dtio_check_nb_connect(dtio);
+ if(connect_err == -1) {
+ /* close the channel, exit the stop flush */
+ dtio_stop_flush_exit(info);
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ return;
+ } else if(connect_err == 0) {
+ /* try again later */
+ return;
+ }
+ /* nonblocking connect check passed, continue */
+ }
+#ifdef HAVE_SSL
+ /* info is passed along so the handshake code can exit the stop
+ * flush itself when it has to close the output */
+ if(dtio->ssl &&
+ (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
+ if(!dtio_ssl_handshake(dtio, info))
+ return;
+ }
+#endif
+
+ if((bits&UB_EV_READ)) {
+ if(!dtio_check_close(dtio)) {
+ /* only give up on the flush if the fd is gone */
+ if(dtio->fd == -1) {
+ verbose(VERB_ALGO, "dnstap io: "
+ "stop flush: output closed");
+ dtio_stop_flush_exit(info);
+ }
+ return;
+ }
+ }
+ /* write remainder of last frame */
+ if(dtio->cur_msg) {
+ if(dtio->cur_msg_done < dtio->cur_msg_len) {
+ if(!dtio_write_more(dtio)) {
+ /* only give up on the flush if the fd is gone,
+ * otherwise wait for the next event */
+ if(dtio->fd == -1) {
+ verbose(VERB_ALGO, "dnstap io: "
+ "stop flush: output closed");
+ dtio_stop_flush_exit(info);
+ }
+ return;
+ }
+ }
+ verbose(VERB_ALGO, "dnstap io: stop flush completed "
+ "last frame");
+ dtio_cur_msg_free(dtio);
+ }
+ /* write stop frame */
+ if(info->stop_frame_done < info->stop_frame_len) {
+ if(!dtio_control_stop_send(info))
+ return;
+ verbose(VERB_ALGO, "dnstap io: stop flush completed "
+ "stop control frame");
+ }
+ /* when last frame and stop frame are sent, exit */
+ dtio_stop_flush_exit(info);
+}
+
+/**
+ * flush at end, last packet and stop control.
+ * Runs a private, temporary event base with a 2 second timer and a
+ * read/write event on the output fd, so that dtio_stop_ev_cb can finish
+ * the current message and send the stop control frame. Does nothing when
+ * there is no established connection. Every failure path releases what
+ * was set up so far and returns without flushing.
+ * @param dtio: the io thread that is shutting down.
+ */
+static void dtio_control_stop_flush(struct dt_io_thread* dtio)
+{
+ /* briefly attempt to flush the previous packet to the output,
+ * this could be a partial packet, or even the start control frame */
+ time_t secs = 0;
+ struct timeval now;
+ struct stop_flush_info info;
+ struct timeval tv;
+ struct ub_event* timer, *stopev;
+
+ if(dtio->fd == -1 || dtio->check_nb_connect) {
+ /* no connection or we have just connected, so nothing is
+ * sent yet, so nothing to stop or flush */
+ return;
+ }
+ if(dtio->ssl && !dtio->ssl_handshake_done) {
+ /* no SSL connection has been established yet */
+ return;
+ }
+
+ memset(&info, 0, sizeof(info));
+ memset(&now, 0, sizeof(now));
+ info.dtio = dtio;
+ info.base = ub_default_event_base(0, &secs, &now);
+ if(!info.base) {
+ log_err("dnstap io: malloc failure");
+ return;
+ }
+ /* the timer bounds how long the flush may take */
+ timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
+ &dtio_stop_timer_cb, &info);
+ if(!timer) {
+ log_err("dnstap io: malloc failure");
+ ub_event_base_free(info.base);
+ return;
+ }
+ memset(&tv, 0, sizeof(tv));
+ tv.tv_sec = 2;
+ if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
+ &tv) != 0) {
+ log_err("dnstap io: cannot event_timer_add");
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ /* the output event, on the same fd as the regular output */
+ stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
+ UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
+ if(!stopev) {
+ log_err("dnstap io: malloc failure");
+ ub_timer_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ if(ub_event_add(stopev, NULL) != 0) {
+ log_err("dnstap io: cannot event_add");
+ ub_event_free(stopev);
+ ub_timer_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ info.stop_frame = fstrm_create_control_frame_stop(
+ &info.stop_frame_len);
+ if(!info.stop_frame) {
+ log_err("dnstap io: malloc failure");
+ ub_event_del(stopev);
+ ub_event_free(stopev);
+ ub_timer_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ /* nonNULL while the stop flush is in progress */
+ dtio->stop_flush_event = stopev;
+
+ /* wait briefly, or until finished */
+ verbose(VERB_ALGO, "dnstap io: stop flush started");
+ if(ub_event_base_dispatch(info.base) < 0) {
+ log_err("dnstap io: dispatch flush failed, errno is %s",
+ strerror(errno));
+ }
+ verbose(VERB_ALGO, "dnstap io: stop flush ended");
+ /* release everything that was set up above */
+ free(info.stop_frame);
+ dtio->stop_flush_event = NULL;
+ ub_event_del(stopev);
+ ub_event_free(stopev);
+ ub_timer_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+}
+
+/**
+ * perform desetup and free stuff when the dnstap io thread exits.
+ * Called from the io thread itself (dnstap_io) or, without threads, from
+ * dt_io_thread_stop.
+ * @param dtio: the io thread that exits.
+ */
+static void dtio_desetup(struct dt_io_thread* dtio)
+{
+ /* try to flush the last frame and send the stop control frame first,
+ * before the output is closed */
+ dtio_control_stop_flush(dtio);
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ ub_event_del(dtio->command_event);
+ ub_event_free(dtio->command_event);
+ /* close the read end of the command pipe; the write end is closed
+ * by dt_io_thread_stop */
+#ifndef USE_WINSOCK
+ close(dtio->commandpipe[0]);
+#else
+ _close(dtio->commandpipe[0]);
+#endif
+ dtio->commandpipe[0] = -1;
+ dtio_reconnect_del(dtio);
+ ub_event_free(dtio->reconnect_timer);
+ dtio_cur_msg_free(dtio);
+ /* the event base is only freed here when it was created by
+ * dtio_setup_base; without threads the base was passed in to
+ * dt_io_thread_start by the caller */
+#ifndef THREADS_DISABLED
+ ub_event_base_free(dtio->event_base);
+#endif
+}
+
+/**
+ * Setup a start control message as the current message to write.
+ * Expects that no current message is pending.
+ * @param dtio: the io thread.
+ * @return false if the control frame could not be created.
+ */
+static int dtio_control_start_send(struct dt_io_thread* dtio)
+{
+	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
+	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
+		&dtio->cur_msg_len);
+	if(dtio->cur_msg != NULL) {
+		/* The whole buffer still has to be sent, but the length
+		 * prefix is marked as already written: the control frame
+		 * buffer itself starts with the 0 length followed by the
+		 * length of the control frame. */
+		dtio->cur_msg_len_done = 4;
+		dtio->cur_msg_done = 0;
+		return 1;
+	}
+	return 0;
+}
+
+/**
+ * Setup a ready control message as the current message to write.
+ * Expects that no current message is pending.
+ * @param dtio: the io thread.
+ * @return false if the control frame could not be created.
+ */
+static int dtio_control_ready_send(struct dt_io_thread* dtio)
+{
+	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
+	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
+		&dtio->cur_msg_len);
+	if(dtio->cur_msg != NULL) {
+		/* The whole buffer still has to be sent, but the length
+		 * prefix is marked as already written: the control frame
+		 * buffer itself starts with the 0 length followed by the
+		 * length of the control frame. */
+		dtio->cur_msg_len_done = 4;
+		dtio->cur_msg_done = 0;
+		return 1;
+	}
+	return 0;
+}
+
+/**
+ * open the output file descriptor for af_local.
+ * Creates a stream socket, makes it nonblocking and connects it to
+ * dtio->socket_path. Without HAVE_SYS_UN_H this always fails.
+ * @param dtio: the io thread; dtio->fd is set to the new socket.
+ * @return false on failure; the error is logged and, if the socket was
+ * created, it is closed again with dtio_close_fd.
+ */
+static int dtio_open_output_local(struct dt_io_thread* dtio)
+{
+#ifdef HAVE_SYS_UN_H
+ struct sockaddr_un s;
+ dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
+ if(dtio->fd == -1) {
+#ifndef USE_WINSOCK
+ log_err("dnstap io: failed to create socket: %s",
+ strerror(errno));
+#else
+ log_err("dnstap io: failed to create socket: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ return 0;
+ }
+ memset(&s, 0, sizeof(s));
+#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
+ /* this member exists on BSDs, not Linux */
+ s.sun_len = (unsigned)sizeof(s);
+#endif
+ s.sun_family = AF_LOCAL;
+ /* length is 92-108, 104 on FreeBSD */
+ /* NOTE(review): the strlcpy result is discarded, so a socket_path
+ * that does not fit in sun_path is silently truncated */
+ (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
+ fd_set_nonblock(dtio->fd);
+ /* every connect failure is treated as an error here; unlike the tcp
+ * variant there is no special case for a connect in progress */
+ if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
+ == -1) {
+ char* to = dtio->socket_path;
+#ifndef USE_WINSOCK
+ log_err("dnstap io: failed to connect to \"%s\": %s",
+ to, strerror(errno));
+#else
+ log_err("dnstap io: failed to connect to \"%s\": %s",
+ to, wsa_strerror(WSAGetLastError()));
+#endif
+ dtio_close_fd(dtio);
+ return 0;
+ }
+ return 1;
+#else
+ log_err("cannot create af_local socket");
+ return 0;
+#endif /* HAVE_SYS_UN_H */
+}
+
+/**
+ * open the output file descriptor for af_inet and af_inet6.
+ * Parses dtio->ip_str, creates a nonblocking stream socket of the parsed
+ * address family and starts the connect.
+ * @param dtio: the io thread; dtio->fd is set to the new socket.
+ * @return true if connected or if the connect is still in progress,
+ * false on failure (the socket, if created, is closed again).
+ */
+static int dtio_open_output_tcp(struct dt_io_thread* dtio)
+{
+ struct sockaddr_storage addr;
+ socklen_t addrlen;
+ memset(&addr, 0, sizeof(addr));
+ addrlen = (socklen_t)sizeof(addr);
+
+ if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
+ log_err("could not parse IP '%s'", dtio->ip_str);
+ return 0;
+ }
+ dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
+ if(dtio->fd == -1) {
+#ifndef USE_WINSOCK
+ log_err("can't create socket: %s", strerror(errno));
+#else
+ log_err("can't create socket: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ return 0;
+ }
+ fd_set_nonblock(dtio->fd);
+ if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
+ if(errno == EINPROGRESS)
+ return 1; /* wait until connect done*/
+ /* a real failure; it is only logged when
+ * tcp_connect_errno_needs_log says so */
+#ifndef USE_WINSOCK
+ if(tcp_connect_errno_needs_log(
+ (struct sockaddr *)&addr, addrlen)) {
+ log_err("dnstap io: failed to connect to %s: %s",
+ dtio->ip_str, strerror(errno));
+ }
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS ||
+ WSAGetLastError() == WSAEWOULDBLOCK)
+ return 1; /* wait until connect done*/
+ if(tcp_connect_errno_needs_log(
+ (struct sockaddr *)&addr, addrlen)) {
+ log_err("dnstap io: failed to connect to %s: %s",
+ dtio->ip_str, wsa_strerror(WSAGetLastError()));
+ }
+#endif
+ dtio_close_fd(dtio);
+ return 0;
+ }
+ return 1;
+}
+
+/** setup the SSL structure for new connection */
+static int dtio_setup_ssl(struct dt_io_thread* dtio)
+{
+ dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
+ if(!dtio->ssl) return 0;
+ dtio->ssl_handshake_done = 0;
+ dtio->ssl_brief_read = 0;
+
+ if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
+ dtio->tls_use_sni)) {
+ return 0;
+ }
+ return 1;
+}
+
+/**
+ * open the output file descriptor.
+ * Opens the socket for the configured upstream type, sets up SSL for TLS,
+ * creates the output event and queues the first control frame (START, or
+ * READY for a bidirectional stream) as the current message. On any
+ * failure the partial setup is undone and the reconnect timer is enabled.
+ * The event is created but not added here.
+ * @param dtio: the io thread.
+ */
+static void dtio_open_output(struct dt_io_thread* dtio)
+{
+ struct ub_event* ev;
+ /* NOTE(review): if none of the upstream_is_* flags is set, no socket
+ * is opened and the code continues with the current dtio->fd */
+ if(dtio->upstream_is_unix) {
+ if(!dtio_open_output_local(dtio)) {
+ dtio_reconnect_enable(dtio);
+ return;
+ }
+ } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
+ if(!dtio_open_output_tcp(dtio)) {
+ dtio_reconnect_enable(dtio);
+ return;
+ }
+ if(dtio->upstream_is_tls) {
+ if(!dtio_setup_ssl(dtio)) {
+ dtio_close_fd(dtio);
+ dtio_reconnect_enable(dtio);
+ return;
+ }
+ }
+ }
+ /* the connect may still be in progress, have the output callback
+ * check its result first */
+ dtio->check_nb_connect = 1;
+
+ /* the EV_READ is to read ACCEPT control messages, and catch channel
+ * close. EV_WRITE is to write packets */
+ ev = ub_event_new(dtio->event_base, dtio->fd,
+ UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
+ dtio);
+ if(!ev) {
+ log_err("dnstap io: out of memory");
+ if(dtio->ssl) {
+#ifdef HAVE_SSL
+ SSL_free(dtio->ssl);
+ dtio->ssl = NULL;
+#endif
+ }
+ dtio_close_fd(dtio);
+ dtio_reconnect_enable(dtio);
+ return;
+ }
+ dtio->event = ev;
+
+ /* setup protocol control message to start */
+ if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
+ (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
+ log_err("dnstap io: out of memory");
+ ub_event_free(dtio->event);
+ dtio->event = NULL;
+ if(dtio->ssl) {
+#ifdef HAVE_SSL
+ SSL_free(dtio->ssl);
+ dtio->ssl = NULL;
+#endif
+ }
+ dtio_close_fd(dtio);
+ dtio_reconnect_enable(dtio);
+ return;
+ }
+}
+
+/**
+ * Perform the setup of the writer thread on the established event_base:
+ * the command pipe event, the reconnect timer, the output connection and
+ * finally the registration of the write event.
+ * @param dtio: the io thread, with event_base set.
+ */
+static void dtio_setup_on_base(struct dt_io_thread* dtio)
+{
+	dtio_setup_cmd(dtio);
+	dtio_setup_reconnect(dtio);
+	dtio_open_output(dtio);
+	/* last step, so there is nothing to undo or skip if it fails */
+	(void)dtio_add_output_event_write(dtio);
+}
+
+#ifndef THREADS_DISABLED
+/**
+ * The IO thread function for the DNSTAP IO.
+ * Sets up the event base and the events, runs the event loop until it is
+ * told to exit, and then cleans up.
+ * @param arg: the struct dt_io_thread.
+ * @return NULL.
+ */
+static void* dnstap_io(void* arg)
+{
+	struct timeval now;
+	time_t secs = 0;
+	struct dt_io_thread* io = (struct dt_io_thread*)arg;
+
+	log_thread_set(&io->threadnum);
+	verbose(VERB_ALGO, "start dnstap io thread");
+
+	/* create the event base and attach the events to it */
+	dtio_setup_base(io, &secs, &now);
+	dtio_setup_on_base(io);
+
+	/* handle events until the loop is exited */
+	if(ub_event_base_dispatch(io->event_base) < 0)
+		log_err("dnstap io: dispatch failed, errno is %s",
+			strerror(errno));
+
+	verbose(VERB_ALGO, "stop dnstap io thread");
+	dtio_desetup(io);
+	return NULL;
+}
+#endif /* THREADS_DISABLED */
+
+/**
+ * Start the io thread.
+ * Creates the command pipe, marks the dtio as started and then either
+ * starts the thread, or (without threads) attaches the events to the
+ * event base of the caller.
+ * @param dtio: the io thread.
+ * @param event_base_nothr: event base to use when built without threads,
+ * ignored otherwise.
+ * @param numworkers: number of worker threads; the dnstap io thread gets
+ * that number +1 as its thread number.
+ * @return false if the command pipe could not be created.
+ */
+int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
+	int numworkers)
+{
+	/* the command pipe is needed by the setup code, this can fail */
+#ifdef USE_WINSOCK
+	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
+		log_err("failed to create _pipe: %s",
+			wsa_strerror(WSAGetLastError()));
+		return 0;
+	}
+#else
+	if(pipe(dtio->commandpipe) == -1) {
+		log_err("failed to create pipe: %s", strerror(errno));
+		return 0;
+	}
+#endif
+
+	dtio->started = 1;
+	dtio->threadnum = numworkers+1;
+#ifdef THREADS_DISABLED
+	/* no thread, the events go onto the base of the caller */
+	dtio->event_base = event_base_nothr;
+	dtio_setup_on_base(dtio);
+#else
+	(void)event_base_nothr;
+	ub_thread_create(&dtio->tid, dnstap_io, dtio);
+#endif
+	return 1;
+}
+
+/**
+ * Stop the io thread.
+ * With threads, a stop command is written to the command pipe, the write
+ * end is closed and the thread is joined. Without threads, the write end
+ * is closed and the cleanup is performed directly. Does nothing for a
+ * NULL dtio or one that is not started, so calling it again after a stop
+ * is harmless.
+ * @param dtio: the io thread, may be NULL.
+ */
+void dt_io_thread_stop(struct dt_io_thread* dtio)
+{
+#ifndef THREADS_DISABLED
+	uint8_t cmd = DTIO_COMMAND_STOP;
+#endif
+	if(!dtio) return;
+	if(!dtio->started) return;
+	verbose(VERB_ALGO, "dnstap io: send stop cmd");
+
+#ifndef THREADS_DISABLED
+	/* send the stop command, retry only for transient errors */
+	while(1) {
+		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
+		if(r == -1) {
+#ifndef USE_WINSOCK
+			if(errno == EINTR || errno == EAGAIN)
+				continue;
+			log_err("dnstap io stop: write: %s", strerror(errno));
+#else
+			if(WSAGetLastError() == WSAEINPROGRESS)
+				continue;
+			if(WSAGetLastError() == WSAEWOULDBLOCK)
+				continue;
+			log_err("dnstap io stop: write: %s",
+				wsa_strerror(WSAGetLastError()));
+#endif
+			break;
+		}
+		break;
+	}
+#endif /* THREADS_DISABLED */
+	/* cleared in every build: without threads the cleanup below runs in
+	 * this function, and a second stop call must not repeat it on
+	 * events that are already freed */
+	dtio->started = 0;
+
+	/* closing the write end also tells the reader that we are done */
+#ifndef USE_WINSOCK
+	close(dtio->commandpipe[1]);
+#else
+	_close(dtio->commandpipe[1]);
+#endif
+	dtio->commandpipe[1] = -1;
+#ifndef THREADS_DISABLED
+	ub_thread_join(dtio->tid);
+#else
+	dtio->want_to_exit = 1;
+	dtio_desetup(dtio);
+#endif
+}
diff --git a/contrib/unbound/dnstap/dtstream.h b/contrib/unbound/dnstap/dtstream.h
new file mode 100644
index 000000000000..ede491f30d3e
--- /dev/null
+++ b/contrib/unbound/dnstap/dtstream.h
@@ -0,0 +1,341 @@
+/*
+ * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
+ *
+ * Copyright (c) 2020, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+ * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * \file
+ *
+ * An implementation of the Frame Streams data transport protocol for
+ * the Unbound DNSTAP message logging facility.
+ */
+
+#ifndef DTSTREAM_H
+#define DTSTREAM_H
+
+#include "util/locks.h"
+struct dt_msg_entry;
+struct dt_io_list_item;
+struct dt_io_thread;
+struct config_file;
+
+/**
+ * A message buffer with dnstap messages queued up. It is per-worker.
+ * It has locks to synchronize. If the buffer is full, a new message
+ * cannot be added and is discarded. A thread reads the messages and sends
+ * them.
+ */
+struct dt_msg_queue {
+ /** lock of the buffer structure. Hold this lock to add or remove
+ * entries to the buffer. Release it so that other threads can also
+ * put messages to log, or a message can be taken out to send away
+ * by the writer thread.
+ */
+ lock_basic_type lock;
+ /** the maximum size of the buffer, in bytes */
+ size_t maxsize;
+ /** current size of the buffer, in bytes. data bytes of messages.
+ * If a new message makes it more than maxsize, the buffer is full */
+ size_t cursize;
+ /** list of messages. The messages are added to the back and taken
+ * out from the front. */
+ struct dt_msg_entry* first, *last;
+ /** reference to the io thread to wakeup */
+ struct dt_io_thread* dtio;
+};
+
+/**
+ * An entry in the dt_msg_queue. contains one DNSTAP message.
+ * It is malloced.
+ */
+struct dt_msg_entry {
+ /** next in the list. */
+ struct dt_msg_entry* next;
+ /** the buffer with the data to send, an encoded DNSTAP message.
+ * It is the malloced buffer that was handed to dt_msg_queue_submit,
+ * and it is free()d after use. */
+ void* buf;
+ /** the length to send. */
+ size_t len;
+};
+
+/**
+ * Containing buffer and counter for reading DNSTAP frames.
+ */
+struct dt_frame_read_buf {
+ /** Buffer containing frame, except length counter(s). */
+ void* buf;
+ /** Number of bytes written to buffer. */
+ size_t buf_count;
+ /** Capacity of the buffer. */
+ size_t buf_cap;
+
+ /** Frame length field. Will contain the 2nd length field for control
+ * frames. */
+ uint32_t frame_len;
+ /** Number of bytes that have been written to the frame_len field. */
+ size_t frame_len_done;
+
+ /** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */
+ int control_frame;
+};
+
+/**
+ * IO thread that reads from the queues and writes them.
+ */
+struct dt_io_thread {
+ /** the thread number for the dtio thread,
+ * must be first to cast thread arg to int* in checklock code.
+ * dt_io_thread_start sets it to the number of workers +1. */
+ int threadnum;
+ /** event base, for event handling */
+ void* event_base;
+ /** list of queues that is registered to get written */
+ struct dt_io_list_item* io_list;
+ /** iterator point in the io_list, to pick from them in a
+ * round-robin fashion, instead of only from the first when busy.
+ * if NULL it means start at the start of the list. */
+ struct dt_io_list_item* io_list_iter;
+ /** thread id, of the io thread */
+ ub_thread_type tid;
+ /** if the io processing has started, set by dt_io_thread_start */
+ int started;
+ /** ssl context for the io thread, for tls connections. type SSL_CTX* */
+ void* ssl_ctx;
+ /** if SNI will be used for TLS connections. */
+ int tls_use_sni;
+
+ /** file descriptor that the thread writes to, -1 if not open */
+ int fd;
+ /** event structure that the thread uses */
+ void* event;
+ /** the event is added */
+ int event_added;
+ /** event added is a write event */
+ int event_added_is_write;
+ /** check for nonblocking connect errors on fd */
+ int check_nb_connect;
+ /** ssl for current connection, type SSL* */
+ void* ssl;
+ /** true if the handshake for SSL is done, 0 if not */
+ int ssl_handshake_done;
+ /** true if briefly the SSL wants a read event, 0 if not.
+ * This happens during negotiation, we then do not want to write,
+ * but wait for a read event. */
+ int ssl_brief_read;
+ /** true if SSL_read is waiting for a write event. Set back to 0 after
+ * single write event is handled. */
+ int ssl_brief_write;
+
+ /** the buffer that currently getting written, or NULL if no
+ * (partial) message written now */
+ void* cur_msg;
+ /** length of the current message */
+ size_t cur_msg_len;
+ /** number of bytes written for the current message */
+ size_t cur_msg_done;
+ /** number of bytes of the length that have been written,
+ * for the current message length that precedes the frame.
+ * It is preset to 4 for the start and ready control frames. */
+ size_t cur_msg_len_done;
+
+ /** command pipe that stops the io thread if closed. Used to quit
+ * the program. [0] is read, [1] is written to. */
+ int commandpipe[2];
+ /** the event to listen to the commandpipe */
+ void* command_event;
+ /** the io thread wants to exit */
+ int want_to_exit;
+
+ /** in stop flush, this is nonNULL and references the stop_ev */
+ void* stop_flush_event;
+
+ /** the timer event for connection retries */
+ void* reconnect_timer;
+ /** if the reconnect timer is added to the event base */
+ int reconnect_is_added;
+ /** the current reconnection timeout, it is increased with
+ * exponential backoff, in msec */
+ int reconnect_timeout;
+
+ /** If the log server is connected to over unix domain sockets,
+ * eg. a file is named that is created to log onto. */
+ int upstream_is_unix;
+ /** if the log server is connected to over TCP. The ip address and
+ * port are used */
+ int upstream_is_tcp;
+ /** if the log server is connected to over TLS. ip address, port,
+ * and client certificates can be used for authentication. */
+ int upstream_is_tls;
+
+ /** Perform bidirectional Frame Streams handshake before sending
+ * messages. */
+ int is_bidirectional;
+ /** Set if the READY control frame has been sent. */
+ int ready_frame_sent;
+ /** Set if valid ACCEPT frame is received. */
+ int accept_frame_received;
+ /** (partially) read frame */
+ struct dt_frame_read_buf read_frame;
+
+ /** the file path for unix socket (or NULL) */
+ char* socket_path;
+ /** the ip address and port number (or NULL) */
+ char* ip_str;
+ /** is the TLS upstream authenticated by name, if nonNULL,
+ * we use the same cert bundle as used by other TLS streams. */
+ char* tls_server_name;
+ /** are client certificates in use */
+ int use_client_certs;
+ /** client cert files: the .key file */
+ char* client_key_file;
+ /** client cert files: the .pem file */
+ char* client_cert_file;
+};
+
+/**
+ * IO thread list of queues list item
+ * lists a worker queue that should be looked at and sent to the log server.
+ * The list is singly linked; its head is the io_list member of
+ * struct dt_io_thread.
+ */
+struct dt_io_list_item {
+ /** next in the list of buffers to inspect */
+ struct dt_io_list_item* next;
+ /** buffer of this worker */
+ struct dt_msg_queue* queue;
+};
+
+/**
+ * Create new (empty) worker message queue. The maximum size is set to
+ * the default limit.
+ * @return NULL on malloc failure or a new queue (not locked).
+ */
+struct dt_msg_queue* dt_msg_queue_create(void);
+
+/**
+ * Delete a worker message queue. It has to be unlinked from access,
+ * so it can be deleted without lock worries. The queue is emptied (deleted).
+ * @param mq: message queue.
+ */
+void dt_msg_queue_delete(struct dt_msg_queue* mq);
+
+/**
+ * Submit a message to the queue. The queue is locked by the routine,
+ * the message is inserted, and then the queue is unlocked so the
+ * message can be picked up by the writer thread.
+ * @param mq: message queue.
+ * @param buf: buffer with message (dnstap contents).
+ * The buffer must have been malloced by caller. It is linked in
+ * the queue, and is free()d after use. If the routine fails
+ * the buffer is freed as well (and nothing happens, the item
+ * could not be logged).
+ * @param len: length of buffer.
+ */
+void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
+
+/**
+ * Create IO thread.
+ * @return new io thread object. not yet started. or NULL malloc failure.
+ */
+struct dt_io_thread* dt_io_thread_create(void);
+
+/**
+ * Delete the IO thread structure.
+ * @param dtio: the io thread that is deleted. It must not be running.
+ */
+void dt_io_thread_delete(struct dt_io_thread* dtio);
+
+/**
+ * Apply config to the dtio thread
+ * @param dtio: io thread, not yet started.
+ * @param cfg: config file struct.
+ * @return false on malloc failure.
+ */
+int dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
+ struct config_file *cfg);
+
+/**
+ * Register a msg queue to the io thread. It will be polled to see if
+ * there are messages and those then get removed and sent, when the thread
+ * is running.
+ * @param dtio: the io thread.
+ * @param mq: message queue to register.
+ * @return false on failure (malloc failure).
+ */
+int dt_io_thread_register_queue(struct dt_io_thread* dtio,
+ struct dt_msg_queue* mq);
+
+/**
+ * Unregister queue from io thread.
+ * @param dtio: the io thread.
+ * @param mq: message queue.
+ */
+void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
+ struct dt_msg_queue* mq);
+
+/**
+ * Start the io thread
+ * @param dtio: the io thread.
+ * @param event_base_nothr: the event base to attach the events to, in case
+ * we are running without threads. With threads, this is ignored
+ * and a thread is started to process the dnstap log messages.
+ * @param numworkers: number of worker threads. The dnstap io thread is
+ * that number +1 as the threadnumber (in logs).
+ * @return false on failure.
+ */
+int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
+ int numworkers);
+
+/**
+ * Stop the io thread
+ * @param dtio: the io thread.
+ */
+void dt_io_thread_stop(struct dt_io_thread* dtio);
+
+/** callback for the dnstap reconnect, to start reconnecting to output */
+void dtio_reconnect_timeout_cb(int fd, short bits, void* arg);
+
+/** callback for the dnstap events, to write to the output */
+void dtio_output_cb(int fd, short bits, void* arg);
+
+/** callback for the dnstap commandpipe, to stop the dnstap IO */
+void dtio_cmd_cb(int fd, short bits, void* arg);
+
+/** callback for the timer when the thread stops and wants to finish up */
+void dtio_stop_timer_cb(int fd, short bits, void* arg);
+
+/** callback for the output when the thread stops and wants to finish up */
+void dtio_stop_ev_cb(int fd, short bits, void* arg);
+
+/** callback for unbound-dnstap-socket */
+void dtio_tap_callback(int fd, short bits, void* arg);
+
+/** callback for unbound-dnstap-socket */
+void dtio_mainfdcallback(int fd, short bits, void* arg);
+
+#endif /* DTSTREAM_H */
diff --git a/contrib/unbound/dnstap/unbound-dnstap-socket.c b/contrib/unbound/dnstap/unbound-dnstap-socket.c
new file mode 100644
index 000000000000..44a0eda95994
--- /dev/null
+++ b/contrib/unbound/dnstap/unbound-dnstap-socket.c
@@ -0,0 +1,1594 @@
+/*
+ * dnstap/unbound-dnstap-socket.c - debug program that listens for DNSTAP logs.
+ *
+ * Copyright (c) 2020, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+ * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ *
+ * This program listens on a DNSTAP socket for logged messages.
+ */
+#include "config.h"
+#ifdef HAVE_GETOPT_H
+#include <getopt.h>
+#endif
+#include <signal.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <ctype.h>
+#ifdef HAVE_SYS_UN_H
+#include <sys/un.h>
+#endif
+#include <openssl/ssl.h>
+#include <openssl/rand.h>
+#include <openssl/err.h>
+#include "dnstap/dtstream.h"
+#include "dnstap/dnstap_fstrm.h"
+#include "util/log.h"
+#include "util/ub_event.h"
+#include "util/net_help.h"
+#include "services/listen_dnsport.h"
+#include "sldns/sbuffer.h"
+#include "sldns/wire2str.h"
+#ifdef USE_DNSTAP
+#include <protobuf-c/protobuf-c.h>
+#include "dnstap/dnstap.pb-c.h"
+#endif /* USE_DNSTAP */
+#include "util/config_file.h"
+
+/** listen backlog on TCP connections for dnstap logs */
+#define LISTEN_BACKLOG 16
+
+/** print usage information for unbound-dnstap-socket, then exit.
+ * Does not return, the process exits with status 1.
+ * @param argv: program arguments, argv[0] is printed as the program name.
+ */
+static void usage(char* argv[])
+{
+ printf("usage: %s [options]\n", argv[0]);
+ printf(" Listen to dnstap messages\n");
+ printf("stdout has dnstap log, stderr has verbose server log\n");
+ printf("-u <socketpath> listen to unix socket with this file name\n");
+ printf("-s <serverip[@port]> listen for TCP on the IP and port\n");
+ printf("-t <serverip[@port]> listen for TLS on IP and port\n");
+ printf("-x <server.key> server key file for TLS service\n");
+ printf("-y <server.pem> server cert file for TLS service\n");
+ printf("-z <verify.pem> cert file to verify client connections\n");
+ printf("-l long format for DNS printout\n");
+ printf("-v more verbose log output\n");
+ printf("-h this help text\n");
+ exit(1);
+}
+
+/** long format option, for multiline printout per message.
+ * When nonzero, log_data_frame also prints identity, version, the full
+ * query and response messages and their timestamps. */
+static int longformat = 0;
+
+/* forward declarations, the structs are defined further down */
+struct tap_socket_list;
+struct tap_socket;
+/** main tap callback data, stored as the data element of every accept
+ * socket so that the accept callback can reach the event base */
+struct main_tap_data {
+ /** the event base (to loopexit) */
+ struct ub_event_base* base;
+ /** the list of accept sockets */
+ struct tap_socket_list* acceptlist;
+};
+
+/** tap callback variables, the state of one accepted client connection.
+ * Allocated in dtio_mainfdcallback and released with tap_data_free. */
+struct tap_data {
+ /** the fd */
+ int fd;
+ /** the ub event */
+ struct ub_event* ev;
+ /** the SSL for TLS streams */
+ SSL* ssl;
+ /** is the ssl handshake done */
+ int ssl_handshake_done;
+ /** we are briefly waiting to write (in the struct event) */
+ int ssl_brief_write;
+ /** string that identifies the socket (or NULL), like IP address */
+ char* id;
+ /** have we read the length, and how many bytes of it.
+ * The length word is 4 bytes, it is complete when this reaches 4. */
+ int len_done;
+ /** have we read the data, and how many bytes of it */
+ size_t data_done;
+ /** are we reading a control frame.
+ * Set when a length word of zero was read, the next length word is
+ * then the length of the control frame. */
+ int control_frame;
+ /** are we bi-directional (if false, uni-directional).
+ * Set when a READY control frame has been received. */
+ int is_bidirectional;
+ /** data of the frame */
+ uint8_t* frame;
+ /** length of this frame.
+ * While the length word is incomplete this holds the raw bytes of
+ * it that were received so far. */
+ size_t len;
+};
+
+/** list of sockets, singly linked; tap_socket_list_insert puts new
+ * entries at the front and tap_socket_list_delete frees entries and the
+ * sockets they point to */
+struct tap_socket_list {
+ /** next in list */
+ struct tap_socket_list* next;
+ /** the socket */
+ struct tap_socket* s;
+};
+
+/** tap socket, one listening (accept) socket of the program */
+struct tap_socket {
+ /** fd of socket, -1 when it is not open */
+ int fd;
+ /** the event for it, created in tap_socket_setup */
+ struct ub_event *ev;
+ /** has the event been added */
+ int ev_added;
+ /** the callback, for the event, ev_cb(fd, bits, arg) */
+ void (*ev_cb)(int, short, void*);
+ /** data element, (arg for the tap_socket struct) */
+ void* data;
+ /** socketpath, if this is an AF_LOCAL socket */
+ char* socketpath;
+ /** IP, if this is a TCP socket */
+ char* ip;
+ /** for a TLS socket, the tls context */
+ SSL_CTX* sslctx;
+};
+
+/** remove the event of the tap socket from the event base.
+ * Nothing is done for a NULL socket, a socket without event, or an event
+ * that has not been added. */
+static void tap_socket_delev(struct tap_socket* s)
+{
+	if(s && s->ev && s->ev_added) {
+		ub_event_del(s->ev);
+		s->ev_added = 0;
+	}
+}
+
+/** close the file descriptor of the tap socket and mark it closed (-1).
+ * Nothing is done for a NULL socket or one that is already closed. */
+static void tap_socket_close(struct tap_socket* s)
+{
+	if(s && s->fd != -1) {
+		close(s->fd);
+		s->fd = -1;
+	}
+}
+
+/** free the tap socket structure and what it owns: the tls context, the
+ * event and the path and ip strings. The fd is not closed here.
+ * @param s: tap socket, may be NULL.
+ */
+static void tap_socket_delete(struct tap_socket* s)
+{
+	if(s) {
+#ifdef HAVE_SSL
+		SSL_CTX_free(s->sslctx);
+#endif
+		ub_event_free(s->ev);
+		free(s->socketpath);
+		free(s->ip);
+		free(s);
+	}
+}
+
+/** create new tap socket for an AF_LOCAL path (unconnected, not
+ * base-added).
+ * @param socketpath: path of the socket, the string is copied.
+ * @param ev_cb: callback to use for the event of the socket.
+ * @param data: stored in the data element of the socket.
+ * @return new socket with fd -1, or NULL on malloc failure (logged).
+ */
+static struct tap_socket* tap_socket_new_local(char* socketpath,
+	void (*ev_cb)(int, short, void*), void* data)
+{
+	struct tap_socket* sock;
+	char* pathcopy;
+
+	sock = calloc(1, sizeof(*sock));
+	if(sock == NULL) {
+		log_err("malloc failure");
+		return NULL;
+	}
+	pathcopy = strdup(socketpath);
+	if(pathcopy == NULL) {
+		free(sock);
+		log_err("malloc failure");
+		return NULL;
+	}
+	sock->socketpath = pathcopy;
+	sock->data = data;
+	sock->ev_cb = ev_cb;
+	sock->fd = -1;
+	return sock;
+}
+
+/** create new tap socket for a TCP listen address (unconnected, not
+ * base-added).
+ * @param ip: address string, the string is copied.
+ * @param ev_cb: callback to use for the event of the socket.
+ * @param data: stored in the data element of the socket.
+ * @return new socket with fd -1, or NULL on malloc failure (logged).
+ */
+static struct tap_socket* tap_socket_new_tcpaccept(char* ip,
+	void (*ev_cb)(int, short, void*), void* data)
+{
+	struct tap_socket* sock;
+	char* ipcopy;
+
+	sock = calloc(1, sizeof(*sock));
+	if(sock == NULL) {
+		log_err("malloc failure");
+		return NULL;
+	}
+	ipcopy = strdup(ip);
+	if(ipcopy == NULL) {
+		free(sock);
+		log_err("malloc failure");
+		return NULL;
+	}
+	sock->ip = ipcopy;
+	sock->data = data;
+	sock->ev_cb = ev_cb;
+	sock->fd = -1;
+	return sock;
+}
+
+/** create new tap socket for a TLS listen address (unconnected, not
+ * base-added).
+ * @param ip: address string, the string is copied.
+ * @param ev_cb: callback to use for the event of the socket.
+ * @param data: stored in the data element of the socket.
+ * @param server_key: passed to listen_sslctx_create.
+ * @param server_cert: passed to listen_sslctx_create.
+ * @param verifypem: passed to listen_sslctx_create.
+ * @return new socket with fd -1 and a tls context, or NULL on malloc
+ *	failure or when the tls context could not be created (logged).
+ */
+static struct tap_socket* tap_socket_new_tlsaccept(char* ip,
+	void (*ev_cb)(int, short, void*), void* data, char* server_key,
+	char* server_cert, char* verifypem)
+{
+	struct tap_socket* sock;
+	char* ipcopy;
+
+	sock = calloc(1, sizeof(*sock));
+	if(sock == NULL) {
+		log_err("malloc failure");
+		return NULL;
+	}
+	ipcopy = strdup(ip);
+	if(ipcopy == NULL) {
+		free(sock);
+		log_err("malloc failure");
+		return NULL;
+	}
+	sock->ip = ipcopy;
+	sock->data = data;
+	sock->ev_cb = ev_cb;
+	sock->fd = -1;
+	sock->sslctx = listen_sslctx_create(server_key, server_cert,
+		verifypem);
+	if(sock->sslctx == NULL) {
+		log_err("could not create ssl context");
+		free(sock->ip);
+		free(sock);
+		return NULL;
+	}
+	return sock;
+}
+
+/** setup tcp accept socket on IP string.
+ * The string is parsed with extstrtoaddr, then a stream socket is created,
+ * bound, set nonblocking and put in listen state.
+ * @param ip: address string to listen on.
+ * @return the listening fd, or -1 on failure. On failure after the socket
+ * was created, the socket is closed again.
+ */
+static int make_tcp_accept(char* ip)
+{
+#ifdef SO_REUSEADDR
+ int on = 1;
+#endif
+ struct sockaddr_storage addr;
+ socklen_t len;
+ int s;
+
+ memset(&addr, 0, sizeof(addr));
+ len = (socklen_t)sizeof(addr);
+ if(!extstrtoaddr(ip, &addr, &len)) {
+ log_err("could not parse IP '%s'", ip);
+ return -1;
+ }
+
+ if((s = socket(addr.ss_family, SOCK_STREAM, 0)) == -1) {
+#ifndef USE_WINSOCK
+ log_err("can't create socket: %s", strerror(errno));
+#else
+ log_err("can't create socket: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ return -1;
+ }
+#ifdef SO_REUSEADDR
+ /* only attempted when the platform defines the option */
+ if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&on,
+ (socklen_t)sizeof(on)) < 0) {
+#ifndef USE_WINSOCK
+ log_err("setsockopt(.. SO_REUSEADDR ..) failed: %s",
+ strerror(errno));
+ close(s);
+#else
+ log_err("setsockopt(.. SO_REUSEADDR ..) failed: %s",
+ wsa_strerror(WSAGetLastError()));
+ closesocket(s);
+#endif
+ return -1;
+ }
+#endif /* SO_REUSEADDR */
+ if(bind(s, (struct sockaddr*)&addr, len) != 0) {
+#ifndef USE_WINSOCK
+ log_err_addr("can't bind socket", strerror(errno),
+ &addr, len);
+ close(s);
+#else
+ log_err_addr("can't bind socket",
+ wsa_strerror(WSAGetLastError()), &addr, len);
+ closesocket(s);
+#endif
+ return -1;
+ }
+ /* no message is logged here for a fd_set_nonblock failure */
+ if(!fd_set_nonblock(s)) {
+#ifndef USE_WINSOCK
+ close(s);
+#else
+ closesocket(s);
+#endif
+ return -1;
+ }
+ if(listen(s, LISTEN_BACKLOG) == -1) {
+#ifndef USE_WINSOCK
+ log_err("can't listen: %s", strerror(errno));
+ close(s);
+#else
+ log_err("can't listen: %s", wsa_strerror(WSAGetLastError()));
+ closesocket(s);
+#endif
+ return -1;
+ }
+ return s;
+}
+
+/** open the listening fd of the tap socket and add a persistent read
+ * event for it to the event base.
+ * A socket with a socketpath becomes an AF_LOCAL accept socket, otherwise
+ * a socket with an ip or tls context becomes a TCP accept socket.
+ * @param s: the tap socket.
+ * @param base: event base to create the event on.
+ * @return 0 on failure (logged), 1 on success.
+ */
+static int tap_socket_setup(struct tap_socket* s, struct ub_event_base* base)
+{
+	int is_local = (s->socketpath != NULL);
+	int is_tcp = (!is_local && (s->ip || s->sslctx));
+
+	if(is_local) {
+		/* AF_LOCAL accept socket */
+		s->fd = create_local_accept_sock(s->socketpath, NULL, 0);
+		if(s->fd == -1) {
+			log_err("could not create local socket");
+			return 0;
+		}
+	}
+	if(is_tcp) {
+		/* TCP accept socket */
+		s->fd = make_tcp_accept(s->ip);
+		if(s->fd == -1) {
+			log_err("could not create tcp socket");
+			return 0;
+		}
+	}
+
+	/* the tap socket itself is the argument for the callback */
+	s->ev = ub_event_new(base, s->fd, UB_EV_READ | UB_EV_PERSIST,
+		s->ev_cb, s);
+	if(s->ev == NULL) {
+		log_err("could not ub_event_new");
+		return 0;
+	}
+	if(ub_event_add(s->ev, NULL) != 0) {
+		log_err("could not ub_event_add");
+		return 0;
+	}
+	s->ev_added = 1;
+	return 1;
+}
+
+/** add tap socket to the front of the list.
+ * @param liststart: pointer to the list head, updated on success.
+ * @param s: the socket to add, referenced by the new entry.
+ * @return 0 on malloc failure, 1 on success.
+ */
+static int tap_socket_list_insert(struct tap_socket_list** liststart,
+	struct tap_socket* s)
+{
+	struct tap_socket_list* node = malloc(sizeof(*node));
+	if(node == NULL)
+		return 0;
+	node->s = s;
+	node->next = *liststart;
+	*liststart = node;
+	return 1;
+}
+
+/** delete the list; for every entry the event is removed, the socket is
+ * closed and deleted, and the entry itself is freed.
+ * @param list: start of the list, may be NULL.
+ */
+static void tap_socket_list_delete(struct tap_socket_list* list)
+{
+	struct tap_socket_list* cur, *after;
+	for(cur = list; cur != NULL; cur = after) {
+		/* pick up the next pointer before the entry is freed */
+		after = cur->next;
+		tap_socket_delev(cur->s);
+		tap_socket_close(cur->s);
+		tap_socket_delete(cur->s);
+		free(cur);
+	}
+}
+
+/** setup accept events, calls tap_socket_setup for every list entry.
+ * @param list: the accept sockets.
+ * @param base: event base for the events.
+ * @return 0 at the first socket that fails (logged), 1 if all succeed.
+ */
+static int tap_socket_list_addevs(struct tap_socket_list* list,
+	struct ub_event_base* base)
+{
+	struct tap_socket_list* cur = list;
+	while(cur != NULL) {
+		if(!tap_socket_setup(cur->s, base)) {
+			log_err("could not setup socket");
+			return 0;
+		}
+		cur = cur->next;
+	}
+	return 1;
+}
+
+#ifdef USE_DNSTAP
+/** log control frame contents, only when verbosity is nonzero.
+ * @param pkt: the control frame bytes.
+ * @param len: length of pkt.
+ */
+static void log_control_frame(uint8_t* pkt, size_t len)
+{
+	char* text;
+	if(verbosity == 0)
+		return;
+	text = fstrm_describe_control(pkt, len);
+	if(text) {
+		log_info("control frame %s", text);
+		free(text);
+	} else {
+		log_err("out of memory");
+	}
+}
+
+/** convert mtype to string.
+ * @param mtype: the dnstap message type.
+ * @return constant string with the type name, or "unknown_message_type"
+ *	for a value that is not listed.
+ */
+static const char* mtype_to_str(enum _Dnstap__Message__Type mtype)
+{
+	static const struct {
+		enum _Dnstap__Message__Type type;
+		const char* name;
+	} names[] = {
+		{ DNSTAP__MESSAGE__TYPE__AUTH_QUERY, "AUTH_QUERY" },
+		{ DNSTAP__MESSAGE__TYPE__AUTH_RESPONSE, "AUTH_RESPONSE" },
+		{ DNSTAP__MESSAGE__TYPE__RESOLVER_QUERY, "RESOLVER_QUERY" },
+		{ DNSTAP__MESSAGE__TYPE__RESOLVER_RESPONSE,
+			"RESOLVER_RESPONSE" },
+		{ DNSTAP__MESSAGE__TYPE__CLIENT_QUERY, "CLIENT_QUERY" },
+		{ DNSTAP__MESSAGE__TYPE__CLIENT_RESPONSE, "CLIENT_RESPONSE" },
+		{ DNSTAP__MESSAGE__TYPE__FORWARDER_QUERY, "FORWARDER_QUERY" },
+		{ DNSTAP__MESSAGE__TYPE__FORWARDER_RESPONSE,
+			"FORWARDER_RESPONSE" },
+		{ DNSTAP__MESSAGE__TYPE__STUB_QUERY, "STUB_QUERY" },
+		{ DNSTAP__MESSAGE__TYPE__STUB_RESPONSE, "STUB_RESPONSE" }
+	};
+	size_t i;
+	for(i = 0; i < sizeof(names)/sizeof(names[0]); i++) {
+		if(names[i].type == mtype)
+			return names[i].name;
+	}
+	return "unknown_message_type";
+}
+
+/** convert type address to a string ip4 or ip6, malloced or NULL on fail.
+ * @param address: binary address, 4 bytes is printed as ip4 and 16 bytes
+ *	as ip6; any other length fails.
+ * @return malloced string, or NULL on failure.
+ */
+static char* str_of_addr(ProtobufCBinaryData address)
+{
+	char text[64];
+	int family;
+
+	if(address.len == 4)
+		family = AF_INET;
+	else if(address.len == 16)
+		family = AF_INET6;
+	else
+		return NULL;
+	if(inet_ntop(family, address.data, text,
+		(socklen_t)sizeof(text)) == NULL)
+		return NULL;
+	return strdup(text);
+}
+
+/** convert message buffer (of dns bytes) to the first qname, type, class,
+ * malloced or NULL on fail.
+ * The question is printed with sldns_wire2str_rrquestion_buf, then the
+ * trailing newline is removed and the last two tabs become spaces, so
+ * that the result fits on one line.
+ * @param message: the dns message, starting at the dns header.
+ * @return malloced string, or NULL if the message is too short, nothing
+ *	was printed, or on malloc failure.
+ */
+static char* q_of_msg(ProtobufCBinaryData message)
+{
+	char buf[300];
+	size_t slen;
+	char* tab;
+	/* header, name, type, class minimum to get the query tuple */
+	/* NOTE(review): type and class are 2 octets each on the wire, so
+	 * this bound is stricter than the smallest possible question;
+	 * confirm whether that is intended before relaxing it. */
+	if(message.len < 12 + 1 + 4 + 4) return NULL;
+	if(sldns_wire2str_rrquestion_buf(message.data+12, message.len-12,
+		buf, sizeof(buf)) == 0)
+		return NULL;
+	/* remove the newline; only if the last character really is one,
+	 * output that was cut short by the buffer size or that ended in
+	 * an error text must not lose its last character */
+	slen = strlen(buf);
+	if(slen > 0 && buf[slen-1] == '\n')
+		buf[slen-1] = 0;
+	/* tabs to spaces: the last tab (before type) first */
+	if((tab = strrchr(buf, '\t')) != NULL)
+		*tab = ' ';
+	/* then the tab that is now the last one (before class) */
+	if((tab = strrchr(buf, '\t')) != NULL)
+		*tab = ' ';
+	return strdup(buf);
+}
+
+/** convert possible string or hex data to string. malloced or NULL.
+ * If every byte is printable the bytes are copied as text, otherwise the
+ * data is written as uppercase hex, two digits per byte.
+ * @param str: the binary data.
+ * @return malloced zero terminated string, or NULL on malloc failure.
+ */
+static char* possible_str(ProtobufCBinaryData str)
+{
+	static const char hexdigits[] = "0123456789ABCDEF";
+	size_t pos;
+	char* out;
+	int printable = 1;
+
+	for(pos = 0; pos < str.len; pos++) {
+		if(!isprint((unsigned char)str.data[pos])) {
+			printable = 0;
+			break;
+		}
+	}
+
+	if(printable) {
+		out = malloc(str.len+1);
+		if(out == NULL)
+			return NULL;
+		memmove(out, str.data, str.len);
+		out[str.len] = 0;
+		return out;
+	}
+
+	out = malloc(str.len*2+1);
+	if(out == NULL)
+		return NULL;
+	for(pos = 0; pos < str.len; pos++) {
+		out[pos*2] = hexdigits[(str.data[pos]&0xf0)>>4];
+		out[pos*2+1] = hexdigits[str.data[pos]&0x0f];
+	}
+	out[str.len*2] = 0;
+	return out;
+}
+
+/** convert timeval to string, malloced or NULL.
+ * The text is "<seconds>.<nanoseconds> <ctime_r text>". The ctime_r text
+ * is left empty if ctime_r stores nothing in the buffer.
+ * @param has_time_sec: if false, the calendar time is printed for 0.
+ * @param time_sec: seconds, printed as an unsigned number.
+ * @param has_time_nsec: if the nanoseconds are present.
+ * @param time_nsec: nanoseconds, printed with nine digits.
+ * @return malloced string or NULL on malloc failure.
+ */
+static char* tv_to_str(protobuf_c_boolean has_time_sec, uint64_t time_sec,
+	protobuf_c_boolean has_time_nsec, uint32_t time_nsec)
+{
+	char buf[64], buf2[256];
+	struct timeval tv;
+	time_t time_t_sec;
+	memset(&tv, 0, sizeof(tv));
+	if(has_time_sec) tv.tv_sec = time_sec;
+	/* the timeval holds microseconds, the input is in nanoseconds */
+	if(has_time_nsec) tv.tv_usec = time_nsec/1000;
+
+	buf[0]=0;
+	time_t_sec = tv.tv_sec;
+	(void)ctime_r(&time_t_sec, buf);
+	snprintf(buf2, sizeof(buf2), "%u.%9.9u %s",
+		(unsigned)time_sec, (unsigned)time_nsec, buf);
+	return strdup(buf2);
+}
+
+/** log data frame contents.
+ * The frame is unpacked as a dnstap protobuf and a one line summary is
+ * printed to stdout: message type, then the address and the question if
+ * they are available. With longformat set, identity, version, the full
+ * query and response messages and their times are printed as well.
+ * @param pkt: the frame data.
+ * @param len: length of pkt.
+ */
+static void log_data_frame(uint8_t* pkt, size_t len)
+{
+ Dnstap__Dnstap* d = dnstap__dnstap__unpack(NULL, len, pkt);
+ const char* mtype = NULL;
+ char* maddr=NULL, *qinf=NULL;
+ if(!d) {
+ log_err("could not unpack");
+ return;
+ }
+ if(d->base.descriptor != &dnstap__dnstap__descriptor) {
+ log_err("wrong base descriptor");
+ dnstap__dnstap__free_unpacked(d, NULL);
+ return;
+ }
+ if(d->type != DNSTAP__DNSTAP__TYPE__MESSAGE) {
+ log_err("dnstap type not type_message");
+ dnstap__dnstap__free_unpacked(d, NULL);
+ return;
+ }
+ if(d->message) {
+ mtype = mtype_to_str(d->message->type);
+ /* the query address is preferred over the response address,
+ * and the query message over the response message */
+ if(d->message->has_query_address)
+ maddr = str_of_addr(d->message->query_address);
+ else if(d->message->has_response_address)
+ maddr = str_of_addr(d->message->response_address);
+ if(d->message->has_query_message)
+ qinf = q_of_msg(d->message->query_message);
+ else if(d->message->has_response_message)
+ qinf = q_of_msg(d->message->response_message);
+
+ } else {
+ mtype = "nomessage";
+ }
+
+ /* maddr and qinf can be NULL, they are then left out together with
+ * the space in front of them */
+ printf("%s%s%s%s%s\n", mtype, (maddr?" ":""), (maddr?maddr:""),
+ (qinf?" ":""), (qinf?qinf:""));
+ free(maddr);
+ free(qinf);
+
+ if(longformat) {
+ char* id=NULL, *vs=NULL;
+ if(d->has_identity) {
+ id=possible_str(d->identity);
+ }
+ if(d->has_version) {
+ vs=possible_str(d->version);
+ }
+ if(id || vs)
+ printf("identity: %s%s%s\n", (id?id:""),
+ (id&&vs?" ":""), (vs?vs:""));
+ free(id);
+ free(vs);
+
+ if(d->message && d->message->has_query_message &&
+ d->message->query_message.data) {
+ char* qmsg = sldns_wire2str_pkt(
+ d->message->query_message.data,
+ d->message->query_message.len);
+ if(qmsg) {
+ printf("query_message:\n%s", qmsg);
+ free(qmsg);
+ }
+ }
+ if(d->message && d->message->has_query_time_sec) {
+ char* qtv = tv_to_str(d->message->has_query_time_sec,
+ d->message->query_time_sec,
+ d->message->has_query_time_nsec,
+ d->message->query_time_nsec);
+ if(qtv) {
+ printf("query_time: %s\n", qtv);
+ free(qtv);
+ }
+ }
+ if(d->message && d->message->has_response_message &&
+ d->message->response_message.data) {
+ char* rmsg = sldns_wire2str_pkt(
+ d->message->response_message.data,
+ d->message->response_message.len);
+ if(rmsg) {
+ printf("response_message:\n%s", rmsg);
+ free(rmsg);
+ }
+ }
+ if(d->message && d->message->has_response_time_sec) {
+ char* rtv = tv_to_str(d->message->has_response_time_sec,
+ d->message->response_time_sec,
+ d->message->has_response_time_nsec,
+ d->message->response_time_nsec);
+ if(rtv) {
+ printf("response_time: %s\n", rtv);
+ free(rtv);
+ }
+ }
+ }
+ /* flush, so that the line shows up right away on stdout */
+ fflush(stdout);
+ dnstap__dnstap__free_unpacked(d, NULL);
+}
+#endif /* USE_DNSTAP */
+
+/** receive bytes from fd with recv, prints errors if bad.
+ * @param data: tap connection, its id is used in the log message.
+ * @param fd: the socket to read from.
+ * @param buf: where to store the bytes.
+ * @param len: maximum number of bytes to read.
+ * @return 0: closed/error, -1: continue, >0 number of bytes.
+ */
+static ssize_t receive_bytes(struct tap_data* data, int fd, void* buf,
+	size_t len)
+{
+	ssize_t got = recv(fd, buf, len, 0);
+	if(got == -1) {
+		/* error */
+#ifndef USE_WINSOCK
+		if(errno == EINTR || errno == EAGAIN)
+			return -1;
+		log_err("could not recv: %s", strerror(errno));
+#else /* USE_WINSOCK */
+		if(WSAGetLastError() == WSAEINPROGRESS)
+			return -1;
+		if(WSAGetLastError() == WSAEWOULDBLOCK) {
+			ub_winsock_tcp_wouldblock(data->ev, UB_EV_READ);
+			return -1;
+		}
+		log_err("could not recv: %s",
+			wsa_strerror(WSAGetLastError()));
+#endif
+	} else if(got != 0) {
+		return got;
+	}
+	/* closed by the peer, or closed because of the error above */
+	if(verbosity) log_info("dnstap client stream closed from %s",
+		(data->id?data->id:""));
+	return 0;
+}
+
+/* define routine for have_ssl only to avoid unused function warning */
+#ifdef HAVE_SSL
+/** set to wait briefly for a write event, for one event call.
+ * The event is deleted, its read bit is exchanged for the write bit and
+ * it is added again. ssl_brief_write is set so that tap_handshake
+ * switches back to reading on the next event. */
+static void tap_enable_brief_write(struct tap_data* data)
+{
+ ub_event_del(data->ev);
+ ub_event_del_bits(data->ev, UB_EV_READ);
+ ub_event_add_bits(data->ev, UB_EV_WRITE);
+ /* a failure to add is logged, the flag is set regardless */
+ if(ub_event_add(data->ev, NULL) != 0)
+ log_err("could not ub_event_add in tap_enable_brief_write");
+ data->ssl_brief_write = 1;
+}
+#endif /* HAVE_SSL */
+
+/* define routine for have_ssl only to avoid unused function warning */
+#ifdef HAVE_SSL
+/** stop the brief wait for a write event. back to reading.
+ * The event is deleted, its write bit is exchanged for the read bit and
+ * it is added again, and ssl_brief_write is cleared. */
+static void tap_disable_brief_write(struct tap_data* data)
+{
+ ub_event_del(data->ev);
+ ub_event_del_bits(data->ev, UB_EV_WRITE);
+ ub_event_add_bits(data->ev, UB_EV_READ);
+ /* a failure to add is logged, the flag is cleared regardless */
+ if(ub_event_add(data->ev, NULL) != 0)
+ log_err("could not ub_event_add in tap_disable_brief_write");
+ data->ssl_brief_write = 0;
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** receive bytes over ssl stream with SSL_read, prints errors if bad.
+ * @param data: tap connection with the ssl object.
+ * @param buf: where to store the bytes.
+ * @param len: maximum number of bytes to read.
+ * @return 0: closed/error, -1: continue, >0 number of bytes.
+ */
+static ssize_t ssl_read_bytes(struct tap_data* data, void* buf, size_t len)
+{
+	int r, want;
+	ERR_clear_error();
+	r = SSL_read(data->ssl, buf, len);
+	if(r > 0)
+		return r;
+
+	want = SSL_get_error(data->ssl, r);
+	switch(want) {
+	case SSL_ERROR_WANT_READ:
+		/* continue later */
+		return -1;
+	case SSL_ERROR_WANT_WRITE:
+		/* set to briefly write */
+		tap_enable_brief_write(data);
+		return -1;
+	case SSL_ERROR_ZERO_RETURN:
+		/* closed */
+		break;
+	case SSL_ERROR_SYSCALL:
+#ifdef ECONNRESET
+		if(errno == ECONNRESET && verbosity < 2)
+			return 0; /* silence reset by peer */
+#endif
+		if(errno != 0)
+			log_err("SSL_read syscall: %s",
+				strerror(errno));
+		break;
+	default:
+		log_crypto_err("could not SSL_read");
+		break;
+	}
+	if(verbosity) log_info("dnstap client stream closed from %s",
+		(data->id?data->id:""));
+	return 0;
+}
+#endif /* HAVE_SSL */
+
+/** receive bytes on the tap connection, prints errors if bad.
+ * Reads through the ssl object if the connection has one (and ssl is
+ * compiled in), otherwise from the fd of the connection.
+ * @return 0: closed/error, -1: continue, >0 number of bytes.
+ */
+static ssize_t tap_receive(struct tap_data* data, void* buf, size_t len)
+{
+#ifdef HAVE_SSL
+	if(data->ssl != NULL) {
+		return ssl_read_bytes(data, buf, len);
+	}
+#endif
+	return receive_bytes(data, data->fd, buf, len);
+}
+
+/** delete the tap structure.
+ * Removes and frees the event, frees the ssl object, closes the fd and
+ * frees the id, the frame buffer and the structure itself. The data
+ * pointer must not be used after this call.
+ * @param data: the tap connection, not NULL.
+ */
+void tap_data_free(struct tap_data* data)
+{
+ ub_event_del(data->ev);
+ ub_event_free(data->ev);
+#ifdef HAVE_SSL
+ SSL_free(data->ssl);
+#endif
+ close(data->fd);
+ free(data->id);
+ free(data->frame);
+ free(data);
+}
+
+/** reply with ACCEPT control frame to bidirectional client.
+ * The socket is set to blocking for the write and set back to
+ * nonblocking afterwards. The frame is written with SSL_write if the
+ * connection has an ssl object, and with send otherwise.
+ * @param data: the tap connection.
+ * @return 0 on error (logged), 1 on success.
+ */
+static int reply_with_accept(struct tap_data* data)
+{
+#ifdef USE_DNSTAP
+	/* len includes the escape and framelength */
+	int ok = 1;
+	size_t len = 0;
+	void* acceptframe = fstrm_create_control_frame_accept(
+		DNSTAP_CONTENT_TYPE, &len);
+	if(!acceptframe) {
+		log_err("out of memory");
+		return 0;
+	}
+
+	fd_set_block(data->fd);
+	if(data->ssl) {
+		/* ssl calls are only made with HAVE_SSL, like in the other
+		 * routines in this file */
+#ifdef HAVE_SSL
+		int r = SSL_write(data->ssl, acceptframe, (int)len);
+		if(r <= 0) {
+			if(SSL_get_error(data->ssl, r) ==
+				SSL_ERROR_ZERO_RETURN)
+				log_err("SSL_write, peer closed connection");
+			else
+				log_err("could not SSL_write");
+			ok = 0;
+		}
+#else
+		log_err("no ssl compiled, no reply");
+		ok = 0;
+#endif /* HAVE_SSL */
+	} else {
+		if(send(data->fd, acceptframe, len, 0) == -1) {
+#ifndef USE_WINSOCK
+			log_err("send failed: %s", strerror(errno));
+#else
+			log_err("send failed: %s",
+				wsa_strerror(WSAGetLastError()));
+#endif
+			ok = 0;
+		}
+	}
+	if(ok && verbosity)
+		log_info("sent control frame(accept) content-type:(%s)",
+			DNSTAP_CONTENT_TYPE);
+
+	/* one exit path: back to nonblocking, release the frame */
+	fd_set_nonblock(data->fd);
+	free(acceptframe);
+	return ok;
+#else
+	log_err("no dnstap compiled, no reply");
+	(void)data;
+	return 0;
+#endif
+}
+
+/** reply with FINISH control frame to bidirectional client.
+ * The frame is written to the fd with send; the socket is set to
+ * blocking for the write and set back to nonblocking afterwards.
+ * @param fd: the socket of the client.
+ * @return 0 on error (logged), 1 on success.
+ */
+static int reply_with_finish(int fd)
+{
+#ifdef USE_DNSTAP
+	int ok = 1;
+	size_t framelen = 0;
+	void* frame = fstrm_create_control_frame_finish(&framelen);
+	if(frame == NULL) {
+		log_err("out of memory");
+		return 0;
+	}
+
+	fd_set_block(fd);
+	if(send(fd, frame, framelen, 0) == -1) {
+#ifndef USE_WINSOCK
+		log_err("send failed: %s", strerror(errno));
+#else
+		log_err("send failed: %s", wsa_strerror(WSAGetLastError()));
+#endif
+		ok = 0;
+	} else {
+		if(verbosity) log_info("sent control frame(finish)");
+	}
+
+	fd_set_nonblock(fd);
+	free(frame);
+	return ok;
+#else
+	log_err("no dnstap compiled, no reply");
+	(void)fd;
+	return 0;
+#endif
+}
+
+#ifdef HAVE_SSL
+/** check SSL peer certificate.
+ * If the ssl object does not have the verify peer flag, the connection is
+ * accepted unauthenticated. Otherwise the verify result must be X509_V_OK
+ * and the peer must have presented a certificate.
+ * @param data: the tap connection, with ssl object.
+ * @return 0 on fail, 1 if the connection can continue.
+ */
+static int tap_check_peer(struct tap_data* data)
+{
+	/* the id can be NULL, do not pass NULL for %s */
+	const char* id = (data->id?data->id:"");
+	X509* x;
+
+	if(!(SSL_get_verify_mode(data->ssl)&SSL_VERIFY_PEER)) {
+		/* unauthenticated, the verify peer flag was not set
+		 * in ssl when the ssl object was created from ssl_ctx */
+		if(verbosity) log_info("SSL connection %s", id);
+		return 1;
+	}
+
+	/* verification */
+	if(SSL_get_verify_result(data->ssl) != X509_V_OK) {
+		x = SSL_get_peer_certificate(data->ssl);
+		if(x) {
+			if(verbosity)
+				log_cert(VERB_ALGO, "peer certificate", x);
+			X509_free(x);
+		}
+		if(verbosity) log_info("SSL connection %s failed: "
+			"failed to authenticate", id);
+		return 0;
+	}
+
+	x = SSL_get_peer_certificate(data->ssl);
+	if(!x) {
+		if(verbosity) log_info("SSL connection %s"
+			" failed no certificate", id);
+		return 0;
+	}
+	if(verbosity)
+		log_cert(VERB_ALGO, "peer certificate", x);
+#ifdef HAVE_SSL_GET0_PEERNAME
+	if(SSL_get0_peername(data->ssl)) {
+		if(verbosity) log_info("SSL connection %s "
+			"to %s authenticated", id,
+			SSL_get0_peername(data->ssl));
+	} else {
+#endif
+		if(verbosity) log_info("SSL connection %s "
+			"authenticated", id);
+#ifdef HAVE_SSL_GET0_PEERNAME
+	}
+#endif
+	X509_free(x);
+	return 1;
+}
+#endif /* HAVE_SSL */
+
+#ifdef HAVE_SSL
+/** perform SSL handshake, return 0 to wait for events, 1 if done.
+ * On a closed connection, a handshake error or a failed peer check the
+ * tap structure is freed with tap_data_free and 0 is returned; the
+ * caller has to return right away and must not touch data after a 0
+ * return.
+ * @param data: the tap connection, with ssl object.
+ * @return 1 if the handshake is done and frames can be read, 0 otherwise.
+ */
+static int tap_handshake(struct tap_data* data)
+{
+ int r;
+ if(data->ssl_brief_write) {
+ /* write condition has been satisfied, back to reading */
+ tap_disable_brief_write(data);
+ }
+ if(data->ssl_handshake_done)
+ return 1;
+
+ ERR_clear_error();
+ r = SSL_do_handshake(data->ssl);
+ if(r != 1) {
+ int want = SSL_get_error(data->ssl, r);
+ if(want == SSL_ERROR_WANT_READ) {
+ /* wait for the next read event */
+ return 0;
+ } else if(want == SSL_ERROR_WANT_WRITE) {
+ tap_enable_brief_write(data);
+ return 0;
+ } else if(r == 0) {
+ /* closed */
+ tap_data_free(data);
+ return 0;
+ } else if(want == SSL_ERROR_SYSCALL) {
+ /* SYSCALL and errno==0 means closed uncleanly */
+ int silent = 0;
+#ifdef EPIPE
+ if(errno == EPIPE && verbosity < 2)
+ silent = 1; /* silence 'broken pipe' */
+#endif
+#ifdef ECONNRESET
+ if(errno == ECONNRESET && verbosity < 2)
+ silent = 1; /* silence reset by peer */
+#endif
+ if(errno == 0)
+ silent = 1;
+ if(!silent)
+ log_err("SSL_handshake syscall: %s",
+ strerror(errno));
+ tap_data_free(data);
+ return 0;
+ } else {
+ /* other ssl error, printed unless it is squelched */
+ unsigned long err = ERR_get_error();
+ if(!squelch_err_ssl_handshake(err)) {
+ log_crypto_err_code("ssl handshake failed",
+ err);
+ verbose(VERB_OPS, "ssl handshake failed "
+ "from %s", data->id);
+ }
+ tap_data_free(data);
+ return 0;
+ }
+ }
+ /* check peer verification */
+ data->ssl_handshake_done = 1;
+ if(!tap_check_peer(data)) {
+ /* closed */
+ tap_data_free(data);
+ return 0;
+ }
+ return 1;
+}
+#endif /* HAVE_SSL */
+
+/** callback for dnstap listener */
+void dtio_tap_callback(int fd, short ATTR_UNUSED(bits), void* arg)
+{
+ struct tap_data* data = (struct tap_data*)arg;
+ if(verbosity>=3) log_info("tap callback");
+#ifdef HAVE_SSL
+ if(data->ssl && (!data->ssl_handshake_done ||
+ data->ssl_brief_write)) {
+ if(!tap_handshake(data))
+ return;
+ }
+#endif
+ while(data->len_done < 4) {
+ uint32_t l = (uint32_t)data->len;
+ ssize_t ret = tap_receive(data,
+ ((uint8_t*)&l)+data->len_done, 4-data->len_done);
+ if(verbosity>=4) log_info("s recv %d", (int)ret);
+ if(ret == 0) {
+ /* closed or error */
+ tap_data_free(data);
+ return;
+ } else if(ret == -1) {
+ /* continue later */
+ return;
+ }
+ data->len_done += ret;
+ data->len = (size_t)l;
+ if(data->len_done < 4)
+ return; /* continue later */
+ data->len = (size_t)(ntohl(l));
+ if(verbosity>=3) log_info("length is %d", (int)data->len);
+ if(data->len == 0) {
+ /* it is a control frame */
+ data->control_frame = 1;
+ /* read controlframelen */
+ data->len_done = 0;
+ } else {
+ /* allocate frame size */
+ data->frame = calloc(1, data->len);
+ if(!data->frame) {
+ log_err("out of memory");
+ tap_data_free(data);
+ return;
+ }
+ }
+ }
+
+ /* we want to read the full length now */
+ if(data->data_done < data->len) {
+ ssize_t r = tap_receive(data, data->frame + data->data_done,
+ data->len - data->data_done);
+ if(verbosity>=4) log_info("f recv %d", (int)r);
+ if(r == 0) {
+ /* closed or error */
+ tap_data_free(data);
+ return;
+ } else if(r == -1) {
+ /* continue later */
+ return;
+ }
+ data->data_done += r;
+ if(data->data_done < data->len)
+ return; /* continue later */
+ }
+
+ /* we are done with a frame */
+ if(verbosity>=3) log_info("received %sframe len %d",
+ (data->control_frame?"control ":""), (int)data->len);
+#ifdef USE_DNSTAP
+ if(data->control_frame)
+ log_control_frame(data->frame, data->len);
+ else log_data_frame(data->frame, data->len);
+#endif
+
+ if(data->len >= 4 && sldns_read_uint32(data->frame) ==
+ FSTRM_CONTROL_FRAME_READY) {
+ data->is_bidirectional = 1;
+ if(verbosity) log_info("bidirectional stream");
+ if(!reply_with_accept(data)) {
+ tap_data_free(data);
+ }
+ } else if(data->len >= 4 && sldns_read_uint32(data->frame) ==
+ FSTRM_CONTROL_FRAME_STOP && data->is_bidirectional) {
+ if(!reply_with_finish(fd)) {
+ tap_data_free(data);
+ return;
+ }
+ }
+
+ /* prepare for next frame */
+ free(data->frame);
+ data->frame = NULL;
+ data->control_frame = 0;
+ data->len = 0;
+ data->len_done = 0;
+ data->data_done = 0;
+
+}
+
+/** callback for main listening file descriptor, accepts one new client.
+ * The new socket is set nonblocking, a struct tap_data is allocated for
+ * it (with an ssl object if the listening socket has a tls context) and
+ * a persistent read event with dtio_tap_callback is added for it.
+ * With verbosity, the client is logged and its name or address is kept
+ * as the id of the connection. Failures after the accept are fatal.
+ * @param fd: the listening socket.
+ * @param bits: event bits, not used.
+ * @param arg: the struct tap_socket of the listening socket.
+ */
+void dtio_mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg)
+{
+	struct tap_socket* tap_sock = (struct tap_socket*)arg;
+	struct main_tap_data* maindata = (struct main_tap_data*)
+		tap_sock->data;
+	struct tap_data* data;
+	char* id = NULL;
+	struct sockaddr_storage addr;
+	socklen_t addrlen = (socklen_t)sizeof(addr);
+	int s = accept(fd, (struct sockaddr*)&addr, &addrlen);
+	if(s == -1) {
+#ifndef USE_WINSOCK
+		/* EINTR is signal interrupt. others are closed connection. */
+		if(	errno == EINTR || errno == EAGAIN
+#ifdef EWOULDBLOCK
+			|| errno == EWOULDBLOCK
+#endif
+#ifdef ECONNABORTED
+			|| errno == ECONNABORTED
+#endif
+#ifdef EPROTO
+			|| errno == EPROTO
+#endif /* EPROTO */
+			)
+			return;
+		log_err_addr("accept failed", strerror(errno), &addr, addrlen);
+#else /* USE_WINSOCK */
+		if(WSAGetLastError() == WSAEINPROGRESS ||
+			WSAGetLastError() == WSAECONNRESET)
+			return;
+		if(WSAGetLastError() == WSAEWOULDBLOCK) {
+			/* the event of the listening socket is in the
+			 * tap_socket, main_tap_data has no event */
+			ub_winsock_tcp_wouldblock(tap_sock->ev, UB_EV_READ);
+			return;
+		}
+		log_err_addr("accept failed", wsa_strerror(WSAGetLastError()),
+			&addr, addrlen);
+#endif
+		return;
+	}
+	fd_set_nonblock(s);
+	if(verbosity) {
+		if(addr.ss_family == AF_LOCAL) {
+#ifdef HAVE_SYS_UN_H
+			/* the name is taken from the listening socket */
+			struct sockaddr_un* usock = calloc(1, sizeof(struct sockaddr_un) + 1);
+			if(usock) {
+				socklen_t ulen = sizeof(struct sockaddr_un);
+				if(getsockname(fd, (struct sockaddr*)usock, &ulen) != -1) {
+					log_info("accepted new dnstap client from %s", usock->sun_path);
+					id = strdup(usock->sun_path);
+				} else {
+					log_info("accepted new dnstap client");
+				}
+				free(usock);
+			} else {
+				log_info("accepted new dnstap client");
+			}
+#endif /* HAVE_SYS_UN_H */
+		} else if(addr.ss_family == AF_INET ||
+			addr.ss_family == AF_INET6) {
+			char ip[256];
+			addr_to_str(&addr, addrlen, ip, sizeof(ip));
+			log_info("accepted new dnstap client from %s", ip);
+			id = strdup(ip);
+		} else {
+			log_info("accepted new dnstap client");
+		}
+	}
+
+	data = calloc(1, sizeof(*data));
+	if(!data) fatal_exit("out of memory");
+	data->fd = s;
+	/* the id stays NULL without verbosity, or if strdup failed */
+	data->id = id;
+	if(tap_sock->sslctx) {
+		data->ssl = incoming_ssl_fd(tap_sock->sslctx, data->fd);
+		if(!data->ssl) fatal_exit("could not SSL_new");
+	}
+	data->ev = ub_event_new(maindata->base, s, UB_EV_READ | UB_EV_PERSIST,
+		&dtio_tap_callback, data);
+	if(!data->ev) fatal_exit("could not ub_event_new");
+	if(ub_event_add(data->ev, NULL) != 0) fatal_exit("could not ub_event_add");
+}
+
+/** setup local accept sockets: a tap socket is created for every path in
+ * the list and put on the accept list. Exits the program on failure.
+ * @param maindata: main data with the accept list, also passed as the
+ *	data element of the new sockets.
+ * @param local_list: the socket paths.
+ */
+static void setup_local_list(struct main_tap_data* maindata,
+	struct config_strlist_head* local_list)
+{
+	struct config_strlist* item = local_list->first;
+	while(item) {
+		struct tap_socket* sock = tap_socket_new_local(item->str,
+			&dtio_mainfdcallback, maindata);
+		if(!sock || !tap_socket_list_insert(&maindata->acceptlist,
+			sock))
+			fatal_exit("out of memory");
+		item = item->next;
+	}
+}
+
+/** setup tcp accept sockets: a tap socket is created for every address in
+ * the list and put on the accept list. Exits the program on failure.
+ * @param maindata: main data with the accept list, also passed as the
+ *	data element of the new sockets.
+ * @param tcp_list: the address strings.
+ */
+static void setup_tcp_list(struct main_tap_data* maindata,
+	struct config_strlist_head* tcp_list)
+{
+	struct config_strlist* item = tcp_list->first;
+	while(item) {
+		struct tap_socket* sock = tap_socket_new_tcpaccept(item->str,
+			&dtio_mainfdcallback, maindata);
+		if(!sock || !tap_socket_list_insert(&maindata->acceptlist,
+			sock))
+			fatal_exit("out of memory");
+		item = item->next;
+	}
+}
+
+/** setup tls accept sockets: a tap socket with tls context is created for
+ * every address in the list and put on the accept list. Exits the program
+ * on failure.
+ * @param maindata: main data with the accept list, also passed as the
+ *	data element of the new sockets.
+ * @param tls_list: the address strings.
+ * @param server_key: passed on for the tls context.
+ * @param server_cert: passed on for the tls context.
+ * @param verifypem: passed on for the tls context.
+ */
+static void setup_tls_list(struct main_tap_data* maindata,
+	struct config_strlist_head* tls_list, char* server_key,
+	char* server_cert, char* verifypem)
+{
+	struct config_strlist* item = tls_list->first;
+	while(item) {
+		struct tap_socket* sock = tap_socket_new_tlsaccept(item->str,
+			&dtio_mainfdcallback, maindata, server_key,
+			server_cert, verifypem);
+		if(!sock || !tap_socket_list_insert(&maindata->acceptlist,
+			sock))
+			fatal_exit("out of memory");
+		item = item->next;
+	}
+}
+
+/** signal variable: event base that main_sigh asks to exit its loop, NULL if there is none */
+static struct ub_event_base* sig_base = NULL;
+/** do we have to quit; set to 1 by main_sigh */
+int sig_quit = 0;
+/**
+ * Signal handler for user quit.
+ * Logs the signal number, asks the event base in sig_base (when one is set)
+ * to exit its loop, and sets sig_quit.
+ * NOTE(review): this logs from inside a signal handler; confirm that
+ * verbose() is safe to call in that context.
+ * @param sig: the signal number that was delivered.
+ */
+static RETSIGTYPE main_sigh(int sig)
+{
+	verbose(VERB_ALGO, "exit on signal %d\n", sig);
+	if(sig_base != NULL) {
+		ub_event_base_loopexit(sig_base);
+	}
+	sig_quit = 1;
+}
+
+/**
+ * Setup and run the server to listen to DNSTAP messages.
+ * Creates the event base, publishes it in sig_base for the signal handler,
+ * creates the local, tcp and tls accept sockets, adds their events and
+ * dispatches the event base. After the dispatch returns the accept sockets,
+ * the event base and the main data are released again.
+ * If sig_quit is already set once the event base exists, nothing is served
+ * and the function returns right after cleaning up.
+ * @param local_list: strings for the local accept sockets.
+ * @param tcp_list: strings for the tcp accept sockets.
+ * @param tls_list: strings for the tls accept sockets.
+ * @param server_key: passed on to setup_tls_list().
+ * @param server_cert: passed on to setup_tls_list().
+ * @param verifypem: passed on to setup_tls_list().
+ */
+static void
+setup_and_run(struct config_strlist_head* local_list,
+	struct config_strlist_head* tcp_list,
+	struct config_strlist_head* tls_list, char* server_key,
+	char* server_cert, char* verifypem)
+{
+	time_t secs = 0;
+	struct timeval now;
+	struct main_tap_data* maindata;
+	struct ub_event_base* base;
+	const char *evnm="event", *evsys="", *evmethod="";
+
+	maindata = calloc(1, sizeof(*maindata));
+	if(!maindata) fatal_exit("out of memory");
+	memset(&now, 0, sizeof(now));
+	base = ub_default_event_base(1, &secs, &now);
+	if(!base) fatal_exit("could not create ub_event base");
+	maindata->base = base;
+	sig_base = base;
+	if(sig_quit) {
+		/* a quit signal came in before the base was published;
+		 * unpublish the base before freeing it, so that a later
+		 * signal cannot make main_sigh touch freed memory */
+		sig_base = NULL;
+		ub_event_base_free(base);
+		free(maindata);
+		return;
+	}
+	ub_get_event_sys(base, &evnm, &evsys, &evmethod);
+	if(verbosity) log_info("%s %s uses %s method", evnm, evsys, evmethod);
+
+	setup_local_list(maindata, local_list);
+	setup_tcp_list(maindata, tcp_list);
+	setup_tls_list(maindata, tls_list, server_key, server_cert,
+		verifypem);
+	if(!tap_socket_list_addevs(maindata->acceptlist, base))
+		fatal_exit("could not setup accept events");
+	if(verbosity) log_info("start of service");
+
+	ub_event_base_dispatch(base);
+
+	if(verbosity) log_info("end of service");
+	/* stop the signal handler from using the base before it is freed */
+	sig_base = NULL;
+	tap_socket_list_delete(maindata->acceptlist);
+	ub_event_base_free(base);
+	free(maindata);
+}
+
+/** getopt global (index of the next argv element to process), in case header files fail to declare it. */
+extern int optind;
+/** getopt global (argument of the current option), in case header files fail to declare it. */
+extern char* optarg;
+
+/** main program for unbound-dnstap-socket.
+ * Installs the quit signal handlers, sets up logging, parses the command
+ * line options, initialises OpenSSL when a TLS related option was given and
+ * then runs the listening server with setup_and_run().
+ * @param argc: number of command line arguments.
+ * @param argv: the command line arguments.
+ * @return 0 on normal exit, 1 when WSAStartup or the SIGPIPE setup fails.
+ */
+int main(int argc, char** argv)
+{
+ int c;
+ int usessl = 0; /* set by -t, -x, -y and -z; triggers the OpenSSL init below */
+ struct config_strlist_head local_list;
+ struct config_strlist_head tcp_list;
+ struct config_strlist_head tls_list;
+ char* server_key = NULL, *server_cert = NULL, *verifypem = NULL;
+#ifdef USE_WINSOCK
+ WSADATA wsa_data;
+ if(WSAStartup(MAKEWORD(2,2), &wsa_data) != 0) {
+ printf("WSAStartup failed\n");
+ return 1;
+ }
+#endif
+ if(signal(SIGINT, main_sigh) == SIG_ERR || /* all quit signals share main_sigh */
+#ifdef SIGQUIT
+ signal(SIGQUIT, main_sigh) == SIG_ERR ||
+#endif
+#ifdef SIGHUP
+ signal(SIGHUP, main_sigh) == SIG_ERR ||
+#endif
+#ifdef SIGBREAK
+ signal(SIGBREAK, main_sigh) == SIG_ERR ||
+#endif
+ signal(SIGTERM, main_sigh) == SIG_ERR)
+ fatal_exit("could not bind to signal");
+ memset(&local_list, 0, sizeof(local_list));
+ memset(&tcp_list, 0, sizeof(tcp_list));
+ memset(&tls_list, 0, sizeof(tls_list));
+
+ /* set the log identity, init logging, then lock debug start (if any) */
+ log_ident_set("unbound-dnstap-socket");
+ log_init(0, 0, 0);
+ checklock_start();
+
+#ifdef SIGPIPE
+ if(signal(SIGPIPE, SIG_IGN) == SIG_ERR) { /* SIGPIPE is ignored */
+ perror("could not install signal handler for SIGPIPE");
+ return 1;
+ }
+#endif
+
+ /* command line options */
+ while( (c=getopt(argc, argv, "hls:t:u:vx:y:z:")) != -1) {
+ switch(c) {
+ case 'u': /* add an entry for a local accept socket */
+ if(!cfg_strlist_append(&local_list,
+ strdup(optarg)))
+ fatal_exit("out of memory");
+ break;
+ case 's': /* add an entry for a tcp accept socket */
+ if(!cfg_strlist_append(&tcp_list,
+ strdup(optarg)))
+ fatal_exit("out of memory");
+ break;
+ case 't': /* add an entry for a tls accept socket */
+ if(!cfg_strlist_append(&tls_list,
+ strdup(optarg)))
+ fatal_exit("out of memory");
+ usessl = 1;
+ break;
+ case 'x': /* optarg is used in place, it is not copied */
+ server_key = optarg;
+ usessl = 1;
+ break;
+ case 'y':
+ server_cert = optarg;
+ usessl = 1;
+ break;
+ case 'z':
+ verifypem = optarg;
+ usessl = 1;
+ break;
+ case 'l':
+ longformat = 1;
+ break;
+ case 'v': /* every -v raises the verbosity by one */
+ verbosity++;
+ break;
+ case 'h':
+ case '?':
+ default:
+ usage(argv);
+ }
+ }
+ argc -= optind; /* step past the parsed options; argc and argv are not used after this */
+ argv += optind;
+
+ if(usessl) {
+#ifdef HAVE_SSL
+#if OPENSSL_VERSION_NUMBER < 0x10100000 || !defined(HAVE_OPENSSL_INIT_SSL)
+ ERR_load_SSL_strings();
+#endif
+#if OPENSSL_VERSION_NUMBER < 0x10100000 || !defined(HAVE_OPENSSL_INIT_CRYPTO)
+# ifndef S_SPLINT_S
+ OpenSSL_add_all_algorithms();
+# endif
+#else
+ OPENSSL_init_crypto(OPENSSL_INIT_ADD_ALL_CIPHERS
+ | OPENSSL_INIT_ADD_ALL_DIGESTS
+ | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, NULL);
+#endif
+#if OPENSSL_VERSION_NUMBER < 0x10100000 || !defined(HAVE_OPENSSL_INIT_SSL)
+ (void)SSL_library_init();
+#else
+ (void)OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS, NULL);
+#endif
+#endif /* HAVE_SSL */
+ }
+ setup_and_run(&local_list, &tcp_list, &tls_list, server_key,
+ server_cert, verifypem);
+ config_delstrlist(local_list.first); /* release the lists filled by the option parsing */
+ config_delstrlist(tcp_list.first);
+ config_delstrlist(tls_list.first);
+
+ checklock_stop();
+#ifdef USE_WINSOCK
+ WSACleanup();
+#endif
+ return 0;
+}
+
+/***--- definitions to make fptr_wlist work. ---***/
+/* These are callbacks, similar to the smallapp callbacks, except that the
+ * debug tool callbacks are not included. */
+struct tube;
+struct query_info;
+#include "util/data/packed_rrset.h"
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body is only log_assert(0), so reaching it
+ * is treated as a bug.
+ */
+void
+worker_handle_control_cmd(
+	struct tube* ATTR_UNUSED(tube),
+	uint8_t* ATTR_UNUSED(buffer),
+	size_t ATTR_UNUSED(len),
+	int ATTR_UNUSED(error),
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+worker_handle_request(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(repinfo))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+worker_handle_reply(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(reply_info))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+worker_handle_service_reply(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(reply_info))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+remote_accept_callback(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(repinfo))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+remote_control_callback(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(repinfo))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * Both arguments are unused; the body is only log_assert(0).
+ */
+void
+worker_sighandler(
+	int ATTR_UNUSED(sig),
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return a null pointer, if execution continues past the assert.
+ */
+struct outbound_entry*
+worker_send_query(
+	struct query_info* ATTR_UNUSED(qinfo),
+	uint16_t ATTR_UNUSED(flags),
+	int ATTR_UNUSED(dnssec),
+	int ATTR_UNUSED(want_dnssec),
+	int ATTR_UNUSED(nocaps),
+	struct sockaddr_storage* ATTR_UNUSED(addr),
+	socklen_t ATTR_UNUSED(addrlen),
+	uint8_t* ATTR_UNUSED(zone),
+	size_t ATTR_UNUSED(zonelen),
+	int ATTR_UNUSED(ssl_upstream),
+	char* ATTR_UNUSED(tls_auth_name),
+	struct module_qstate* ATTR_UNUSED(q))
+{
+	log_assert(0);
+	return NULL;
+}
+
+#ifdef UB_ON_WINDOWS
+/**
+ * Definition needed to make fptr_wlist work (UB_ON_WINDOWS builds only).
+ * All arguments are unused; the body is only log_assert(0).
+ */
+void
+worker_win_stop_cb(
+	int ATTR_UNUSED(fd),
+	short ATTR_UNUSED(ev),
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work (UB_ON_WINDOWS builds only).
+ * The argument is unused; the body is only log_assert(0).
+ */
+void
+wsvc_cron_cb(
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+#endif /* UB_ON_WINDOWS */
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * The argument is unused; the body is only log_assert(0).
+ */
+void
+worker_alloc_cleanup(
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return a null pointer, if execution continues past the assert.
+ */
+struct outbound_entry*
+libworker_send_query(
+	struct query_info* ATTR_UNUSED(qinfo),
+	uint16_t ATTR_UNUSED(flags),
+	int ATTR_UNUSED(dnssec),
+	int ATTR_UNUSED(want_dnssec),
+	int ATTR_UNUSED(nocaps),
+	struct sockaddr_storage* ATTR_UNUSED(addr),
+	socklen_t ATTR_UNUSED(addrlen),
+	uint8_t* ATTR_UNUSED(zone),
+	size_t ATTR_UNUSED(zonelen),
+	int ATTR_UNUSED(ssl_upstream),
+	char* ATTR_UNUSED(tls_auth_name),
+	struct module_qstate* ATTR_UNUSED(q))
+{
+	log_assert(0);
+	return NULL;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+libworker_handle_reply(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(reply_info))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body asserts with log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+libworker_handle_service_reply(
+	struct comm_point* ATTR_UNUSED(c),
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(error),
+	struct comm_reply* ATTR_UNUSED(reply_info))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body is only log_assert(0).
+ */
+void
+libworker_handle_control_cmd(
+	struct tube* ATTR_UNUSED(tube),
+	uint8_t* ATTR_UNUSED(buffer),
+	size_t ATTR_UNUSED(len),
+	int ATTR_UNUSED(error),
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body is only log_assert(0).
+ */
+void
+libworker_fg_done_cb(
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(rcode),
+	struct sldns_buffer* ATTR_UNUSED(buf),
+	enum sec_status ATTR_UNUSED(s),
+	char* ATTR_UNUSED(why_bogus),
+	int ATTR_UNUSED(was_ratelimited))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body is only log_assert(0).
+ */
+void
+libworker_bg_done_cb(
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(rcode),
+	struct sldns_buffer* ATTR_UNUSED(buf),
+	enum sec_status ATTR_UNUSED(s),
+	char* ATTR_UNUSED(why_bogus),
+	int ATTR_UNUSED(was_ratelimited))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * All arguments are unused; the body is only log_assert(0).
+ */
+void
+libworker_event_done_cb(
+	void* ATTR_UNUSED(arg),
+	int ATTR_UNUSED(rcode),
+	struct sldns_buffer* ATTR_UNUSED(buf),
+	enum sec_status ATTR_UNUSED(s),
+	char* ATTR_UNUSED(why_bogus),
+	int ATTR_UNUSED(was_ratelimited))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * Both arguments are unused and nothing is compared; the body asserts with
+ * log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+context_query_cmp(
+	const void* ATTR_UNUSED(a),
+	const void* ATTR_UNUSED(b))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * The argument is unused; the body is only log_assert(0).
+ */
+void
+worker_stat_timer_cb(
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * The argument is unused; the body is only log_assert(0).
+ */
+void
+worker_probe_timer_cb(
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * The argument is unused; the body is only log_assert(0).
+ */
+void
+worker_start_accept(
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * The argument is unused; the body is only log_assert(0).
+ */
+void
+worker_stop_accept(
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}
+
+/** keep track of lock id in lock-verify application;
+ * order_lock_cmp orders these by thr first and by instance second */
+struct order_id {
+ /** the thread id that created it */
+ int thr;
+ /** the instance number of creation */
+ int instance;
+};
+
+/**
+ * Compare two struct order_id values.
+ * The thr members decide the order; the instance members are only looked
+ * at when the thr members are equal.
+ * @param e1: pointer to the first struct order_id.
+ * @param e2: pointer to the second struct order_id.
+ * @return -1 if e1 sorts before e2, 1 if it sorts after, 0 if both members
+ *	are equal.
+ */
+int order_lock_cmp(const void* e1, const void* e2)
+{
+	const struct order_id* lhs = e1;
+	const struct order_id* rhs = e2;
+	if(lhs->thr != rhs->thr)
+		return (lhs->thr < rhs->thr) ? -1 : 1;
+	if(lhs->instance != rhs->instance)
+		return (lhs->instance < rhs->instance) ? -1 : 1;
+	return 0;
+}
+
+/**
+ * Compare two keys as C strings.
+ * @param a: pointer to the first nul-terminated string.
+ * @param b: pointer to the second nul-terminated string.
+ * @return the result of strcmp() on the two strings.
+ */
+int
+codeline_cmp(const void* a, const void* b)
+{
+	const char* left = a;
+	const char* right = b;
+	return strcmp(left, right);
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * Both arguments are unused and nothing is compared; the body asserts with
+ * log_assert(0).
+ * @return 0, if execution continues past the assert.
+ */
+int
+replay_var_compare(
+	const void* ATTR_UNUSED(a),
+	const void* ATTR_UNUSED(b))
+{
+	log_assert(0);
+	return 0;
+}
+
+/**
+ * Definition needed to make fptr_wlist work.
+ * Both arguments are unused; the body is only log_assert(0).
+ */
+void
+remote_get_opt_ssl(
+	char* ATTR_UNUSED(str),
+	void* ATTR_UNUSED(arg))
+{
+	log_assert(0);
+}