August 18, 2019

443 words 3 mins read



Real-time Monitoring and Analysis of Data Streams

repo name ucbrise/confluo
repo link
language C++
size (curr.) 11820 kB
stars (curr.) 1409
created 2016-01-26
license Apache License 2.0


Build Status License

Confluo is a system for real-time monitoring and analysis of data, that supports:

  • high-throughput concurrent writes of millions of data points from multiple data streams;
  • online queries at millisecond timescale; and
  • ad-hoc queries using minimal CPU resources.

Please find detailed documentation here.


Required dependencies:

  • MacOS X or Unix-based OS; Windows is not yet supported.
  • C++ compiler that supports C++11 standard (e.g., GCC 5.3 or later)
  • CMake 3.2 or later
  • Boost 1.58 or later

For python client, you will additionally require:

  • Python 2.7 or later
  • Python Packages: setuptools, six 1.7.2 or later

For java client, you will additionally require:

  • Java JDK 1.7 or later
  • ant 1.6.2 or later

Source Build

To download and install Confluo, use the following commands:

git clone
cd confluo
mkdir build
cd build
cmake ..
make -j && make test && make install

Using Confluo

While Confluo supports multiple execution modes, the simplest way to get started is to start Confluo as a server daemon and query it using one of its client APIs.

To start the server daemon, run:

confluod --address= --port=9090

Here’s some sample usage of the Python API:

import sys
from confluo.rpc.client import RpcClient
from import StorageMode

# Connect to the server
client = RpcClient("", 9090)

# Create an Atomic MultiLog with given schema for a performance log
schema = """{
  timestamp: ULONG,
  op_latency_ms: DOUBLE,
  cpu_util: DOUBLE,
  mem_avail: DOUBLE,
  log_msg: STRING(100)
storage_mode = StorageMode.IN_MEMORY
client.create_atomic_multilog("perf_log", schema, storage_mode)

# Add an index

# Add a filter
client.add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1")

# Add an aggregate
client.add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)")

# Install a trigger
client.install_trigger("high_latency_trigger", "max_latency_ms > 1000")

# Load some data
off1 = client.append([100.0, 0.5, 0.9,  "INFO: Launched 1 tasks"])
off2 = client.append([500.0, 0.9, 0.05, "WARN: Server {2} down"])
off3 = client.append([1001.0, 0.9, 0.03, "WARN: Server {2, 4, 5} down"])

# Read the written data
record1 =
record2 =
record3 =

# Query using indexes
record_stream = client.execute_filter("cpu_util>0.5 || mem_avail<0.5")
for r in record_stream:
  print r

# Query using filters
record_stream = client.query_filter("low_resources", 0, sys.maxsize)
for r in record_stream:
  print r

# Query an aggregate
print client.get_aggregate("max_latency_ms", 0, sys.maxsize)

# Query alerts generated by a trigger
alert_stream = client.get_alerts(0, sys.maxsize, "high_latency_trigger")
for a in alert_stream:
  print a


Please create a GitHub issue to file a bug or request a feature. We welcome pull-requests, but request that you review the pull-request process before submitting one.

Please subscribe to the mailing list ( for project announcements, and discussion regarding use-cases and development.

comments powered by Disqus