Livestream

date 11/09/2023

This manual has been tested with fink-client version 7.0; other versions may also work. In case of trouble, send us an email (contact@fink-broker.org) or open an issue.

Purpose

The livestream service is based on the Fink filters. After each exposure, Fink processes the alerts sent by ZTF, and the filters select which alerts to transmit based on their content. The selected alerts are sent to the Fink Apache Kafka cluster, where substreams are produced (1 filter = 1 substream), each identified by its topic name. Each alert remains available in the queue for 4 days, during which consumers can replay the streams as many times as they like.

As Kafka can be somewhat cumbersome, we developed a client to simplify stream consumption for Fink users: fink-client. Users can connect to one or more topics, and new topics can be created via new Fink filters.
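To make the "1 filter = 1 substream" idea concrete, here is a toy model in plain Python. It is an illustration only, not Fink's actual implementation: the filter names (`bright_sources`, `simbad_matched`), the magnitude threshold, and the `"Unknown"` value for unmatched `cdsxmatch` are hypothetical. Each filter is a predicate, and every alert it accepts is appended to the substream (topic) named after it.

```python
"""Toy model of "1 filter = 1 substream" (illustration only, not Fink code)."""
from typing import Any, Callable, Dict, List

Alert = Dict[str, Any]

# Hypothetical filters: topic name -> predicate deciding whether to forward an alert.
FILTERS: Dict[str, Callable[[Alert], bool]] = {
    "bright_sources": lambda a: a["candidate"]["magpsf"] < 16.0,
    "simbad_matched": lambda a: a["cdsxmatch"] != "Unknown",
}


def route(alerts: List[Alert]) -> Dict[str, List[str]]:
    """Return, for each topic, the objectIds of the alerts its filter selects."""
    substreams: Dict[str, List[str]] = {topic: [] for topic in FILTERS}
    for alert in alerts:
        # An alert can land in several substreams if several filters accept it.
        for topic, keep in FILTERS.items():
            if keep(alert):
                substreams[topic].append(alert["objectId"])
    return substreams


if __name__ == "__main__":
    alerts = [
        {"objectId": "ZTF_A", "cdsxmatch": "Unknown", "candidate": {"magpsf": 15.2}},
        {"objectId": "ZTF_B", "cdsxmatch": "Candidate_TTau*", "candidate": {"magpsf": 18.8}},
    ]
    print(route(alerts))  # {'bright_sources': ['ZTF_A'], 'simbad_matched': ['ZTF_B']}
```

Subscribing to several topics then simply means receiving the union of the corresponding substreams.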

Installation of fink-client

To ease the consuming step, we recommend using fink-client. fink-client requires Python 3.9 or later.

Install with pip

From a terminal, you can install fink-client simply using pip:

pip install fink-client --upgrade

You will also need to install fastavro==1.6.0 separately (later versions are not compatible with the alert schema):

# fastavro 1.6.0 requires Cython<3
pip install "Cython<3"
pip install --no-build-isolation "fastavro==1.6.0"

Use or develop in a controlled environment

For usage:

conda env create -f https://raw.githubusercontent.com/astrolabsoftware/fink-client/master/environment.yml
conda activate fink-client
pip install "Cython<3"
pip install --no-build-isolation "fastavro==1.6.0"
pip install fink-client --upgrade

For development:

git clone https://github.com/astrolabsoftware/fink-client.git
cd fink-client
conda env create -f environment.yml
conda activate fink-client
pip install "Cython<3"
pip install --no-build-isolation "fastavro==1.6.0"
pip install -e .
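Whichever installation route you choose, a quick sanity check of the two version constraints stated above (Python 3.9+ and the fastavro 1.6.0 pin) can save some debugging later. The sketch below uses only the standard library (`importlib.metadata`); the function name `check_environment` is hypothetical and not part of fink-client.

```python
"""Sanity-check the Python version and the fastavro pin before using fink-client."""
import sys
from importlib.metadata import PackageNotFoundError, version
from typing import List

REQUIRED_FASTAVRO = "1.6.0"  # pin stated in the installation instructions


def check_environment() -> List[str]:
    """Return a list of problems found (an empty list means all checks passed)."""
    problems: List[str] = []
    if sys.version_info < (3, 9):
        problems.append(f"Python 3.9+ required, found {sys.version.split()[0]}")
    try:
        installed = version("fastavro")
    except PackageNotFoundError:
        problems.append("fastavro is not installed")
    else:
        if installed != REQUIRED_FASTAVRO:
            problems.append(f"fastavro {REQUIRED_FASTAVRO} expected, found {installed}")
    return problems


if __name__ == "__main__":
    issues = check_environment()
    print("\n".join(issues) if issues else "Environment looks OK")
```

Run it inside the activated environment (for example the `fink-client` conda environment) so that it inspects the packages fink-client will actually use.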

Registering

In order to connect and poll alerts from Fink, you first need to get your credentials. Subscribe by filling in this form, and we will then send you your credentials. Register them on your laptop by running the following in a terminal:

# access help using `fink_client_register -h`
# <USERNAME>, <GROUP_ID> and <SERVER> are given privately (comma-separated if several servers)
# for topic names, see https://fink-broker.readthedocs.io/en/latest/science/filters/
# -maxtimeout is in seconds
fink_client_register \
    -username <USERNAME> \
    -group_id <GROUP_ID> \
    -mytopics <topic1 topic2 etc> \
    -servers <SERVER> \
    -maxtimeout 10 \
    --verbose

where <USERNAME>, <GROUP_ID>, and <SERVER> have been sent to you privately. By default, the credentials are stored in your home directory:

cat ~/.finkclient/credentials.yml

For the list of available topics, see https://fink-broker.readthedocs.io/en/latest/science/filters.
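If you write your own consumer (see "Write your own consumer" below), you may prefer to reuse the registered credentials rather than hard-coding them. The sketch below assumes that the keys in `credentials.yml` mirror the registration flags (`username`, `servers`, `group_id`); check your own file before relying on it. It maps them to the `myconfig` dictionary used by the example consumer script. Reading the YAML file requires PyYAML (`yaml.safe_load`), imported only when the loader is called; both function names are hypothetical helpers, not fink-client API.

```python
"""Turn registered credentials into the consumer configuration dict (sketch)."""
from pathlib import Path
from typing import Any, Dict

# Default location of the credentials file, as stated above.
CREDENTIALS_PATH = Path.home() / ".finkclient" / "credentials.yml"


def load_credentials_file(path: Path = CREDENTIALS_PATH) -> Dict[str, Any]:
    """Read the YAML credentials file (needs PyYAML, imported only when called)."""
    import yaml  # PyYAML; assumed to be installed

    with open(path) as handle:
        return yaml.safe_load(handle)


def to_consumer_config(creds: Dict[str, Any]) -> Dict[str, Any]:
    """Map credential entries to the dict expected by the consumer script.

    The credential key names ('username', 'servers', 'group_id') are assumed
    to mirror the registration flags. A 'password' entry is deliberately not
    copied (see the Authentication error section in Troubleshooting).
    """
    return {
        "username": creds["username"],
        "bootstrap.servers": creds["servers"],
        "group_id": creds["group_id"],
    }


if __name__ == "__main__":
    print(to_consumer_config(load_credentials_file()))
```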

First steps: testing the connection

Processed alerts are stored for 4 days on our servers, which means that if you forget to poll data, you will be able to retrieve it up to 4 days after emission. This also means that on your first connection, you will have 4 days of alerts to retrieve. Before you get all of them, let's retrieve the first available alert to check the connection. In a terminal, run the following:

# access help using `fink_consumer -h`
fink_consumer --display -limit 1

This will download the first available alert and print some useful information. The alert schema is automatically downloaded from the GitHub repository (see the Troubleshooting section if that command does not work). The alert is then consumed, and the next poll moves on to the next alert. Of course, if you want to keep the data, you need to store it, which is easily done:

# create a folder to store alerts
mkdir alertDB

# access help using `fink_consumer -h`
fink_consumer --display --save -outdir alertDB -limit 1

This will download the next available alert, display some useful information on screen, and save it on disk (Apache Avro format). If everything works, you can remove the limit and let the consumer run forever:

# access help using `fink_consumer -h`
fink_consumer --display --save -outdir alertDB
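Because alerts are kept for 4 days, it can be handy to know whether a given alert is still retrievable, or how far back your first connection will reach. The arithmetic is sketched below with the standard `datetime` module. Treating the boundary as inclusive ("up to 4 days") is an assumption; the exact cut-off applied on the servers may differ slightly.

```python
"""Is an alert still retrievable? Arithmetic on the 4-day retention window."""
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=4)  # retention period stated in this manual


def still_available(emitted_at: datetime, now: datetime) -> bool:
    """True if an alert emitted at `emitted_at` can still be polled at `now`.

    The boundary is treated as inclusive; the exact server cut-off is an assumption.
    """
    return now - emitted_at <= RETENTION


def oldest_retrievable(now: datetime) -> datetime:
    """Earliest emission time still in the queue at `now`."""
    return now - RETENTION


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print("Alerts emitted after", oldest_retrievable(now).isoformat(), "are still available")
```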

Inspecting alerts

Once alerts are saved, you can open them and explore their content. We wrote a small utility to quickly visualise them:

# access help using `fink_alert_viewer -h`
# Adapt the filename accordingly -- it is <objectId>_<candid>.avro
fink_alert_viewer -filename alertDB/ZTF21aaqkqwq_1549473362115015004.avro

Of course, you can develop your own tools based on this one! Note that Apache Avro is not supported natively by Pandas, for example, so we provide a small utility to load alerts more easily:

from fink_client.avroUtils import AlertReader

# you can also specify one folder with several alerts directly
r = AlertReader('alertDB/ZTF21aaqkqwq_1549473362115015004.avro')

# convert alert to Pandas DataFrame
r.to_pandas()
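Since saved files follow the `<objectId>_<candid>.avro` naming convention mentioned above, you can index a folder of alerts without decoding any Avro. The sketch below uses only `pathlib`; the helper names are hypothetical, and splitting on the last underscore assumes `candid` itself never contains one (it is numeric in the example filename).

```python
"""List saved alerts and split their names into objectId and candid (sketch)."""
from pathlib import Path
from typing import List, Tuple


def parse_alert_filename(name: str) -> Tuple[str, int]:
    """Split '<objectId>_<candid>.avro' (optionally with a folder) into (objectId, candid)."""
    stem = Path(name).stem  # drop the folder and the .avro extension
    object_id, candid = stem.rsplit("_", 1)  # candid is the part after the last '_'
    return object_id, int(candid)


def list_saved_alerts(folder: str) -> List[Tuple[str, int]]:
    """Return (objectId, candid) for every .avro file in `folder`, sorted by filename."""
    return [parse_alert_filename(p.name) for p in sorted(Path(folder).glob("*.avro"))]


if __name__ == "__main__":
    print(parse_alert_filename("ZTF21aaqkqwq_1549473362115015004.avro"))
    # ('ZTF21aaqkqwq', 1549473362115015004)
```

For instance, `list_saved_alerts("alertDB")` gives a quick inventory of what the consumer has written so far.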

Write your own consumer

You can write your own consumer to manipulate alerts upon receipt. Here is a simple example. Open your favourite editor, and paste the following lines:

""" Poll the Fink servers only once at a time """
from fink_client.consumer import AlertConsumer
from fink_client.configuration import load_credentials

import time
import tabulate

def poll_single_alert(myconfig, topics) -> None:
    """ Connect to and poll fink servers once.

    Parameters
    ----------
    myconfig: dict
        Python dictionary containing credentials
    topics: list of str
        List of string with topic names
    """
    maxtimeout = 5

    # Instantiate a consumer
    consumer = AlertConsumer(topics, myconfig)

    # Poll the servers
    topic, alert, key = consumer.poll(maxtimeout)

    # Analyse output - we just print some values for example
    if topic is not None:
        utc = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
        table = [
            [
                alert['timestamp'],
                utc,
                topic,
                alert['objectId'],
                alert['cdsxmatch'],
                alert['candidate']['magpsf']
            ],
        ]
        headers = [
            'Emitted at (UTC)',
            'Received at (UTC)',
            'Topic',
            'objectId',
            'Simbad',
            'Magnitude'
        ]
        print(tabulate(table, headers, tablefmt="pretty"))
    else:
        print(
            'No alerts received in the last {} seconds'.format(
                maxtimeout
            )
        )

    # Close the connection to the servers
    consumer.close()


if __name__ == "__main__":
    """ Poll the servers only once at a time """

    # to fill
    myconfig = {
        'username': '',
        'bootstrap.servers': '',
        'group_id': ''
    }

    topics = ['', '']

    poll_single_alert(myconfig, topics)

You only need to update the myconfig dictionary with the connection information sent to you privately, and the topics list with the topics you want to access. Save the file as my_consumer.py, then, in a terminal, go to the folder where you saved it and run it:

python my_consumer.py

You should start to see alerts flowing in! Example output (dummy data):

+----------------------------------+---------------------+-------------+--------------+-----------------+--------------------+
|         Emitted at (UTC)         |  Received at (UTC)  |    Topic    |   objectId   |     Simbad      |     Magnitude      |
+----------------------------------+---------------------+-------------+--------------+-----------------+--------------------+
| 2021-11-22 08:33:05.999045+00:00 | 2022-02-09 10:32:51 | test_stream | ZTF17aabvtfi | Candidate_TTau* | 18.799415588378906 |
+----------------------------------+---------------------+-------------+--------------+-----------------+--------------------+

When there are no more alerts available upstream, you will start to see:

# X is the maxtimeout value set in the script (5 seconds in the example above)
No alerts received in the last X seconds

Now it is your turn to modify this script to do something meaningful with alerts coming to you!
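One common modification is to keep polling instead of stopping after a single call. The sketch below is one possible design, not part of fink-client: it takes any callable returning the same `(topic, alert, key)` tuple as `consumer.poll(maxtimeout)` in the script above (with `topic` set to None when nothing arrived), hands each alert to your own function, and stops after a few consecutive empty polls. The names `poll_until_idle` and `max_empty` are hypothetical.

```python
"""Keep polling until several consecutive polls come back empty (sketch)."""
from typing import Any, Callable, Optional, Tuple

# Same shape as `consumer.poll(maxtimeout)` in the script above:
# (topic, alert, key), with topic set to None when nothing was received.
PollResult = Tuple[Optional[str], Optional[dict], Any]


def poll_until_idle(
    poll: Callable[[], PollResult],
    handle: Callable[[str, dict], None],
    max_empty: int = 3,
) -> int:
    """Pass each received alert to `handle`; stop after `max_empty`
    consecutive empty polls. Returns the number of alerts handled."""
    handled = 0
    empty = 0
    while empty < max_empty:
        topic, alert, _key = poll()
        if topic is None:
            empty += 1
            continue
        empty = 0  # reset the idle counter as soon as data flows again
        handle(topic, alert)
        handled += 1
    return handled
```

With the objects from the script, a call could look like `poll_until_idle(lambda: consumer.poll(maxtimeout), my_handler)` followed by `consumer.close()`, where `my_handler` is your own function taking `(topic, alert)`.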

Troubleshooting

In case of trouble, send us an email (contact@fink-broker.org) or open an issue (https://github.com/astrolabsoftware/fink-client).

Wrong schema

A typical error would be:

Traceback (most recent call last):
  File "/Users/julien/anaconda3/bin/fink_consumer", line 10, in <module>
    sys.exit(main())
  File "/Users/julien/Documents/workspace/myrepos/fink-client/fink_client/scripts/fink_consumer.py", line 92, in main
    topic, alert = consumer.poll(timeout=maxtimeout)
  File "/Users/julien/Documents/workspace/myrepos/fink-client/fink_client/consumer.py", line 94, in poll
    alert = _decode_avro_alert(avro_alert, self._parsed_schema)
  File "/Users/julien/Documents/workspace/myrepos/fink-client/fink_client/avroUtils.py", line 381, in _decode_avro_alert
    return fastavro.schemaless_reader(avro_alert, schema)
  File "fastavro/_read.pyx", line 835, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 846, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 561, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 456, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 559, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 431, in fastavro._read.read_union
  File "fastavro/_read.pyx", line 555, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 349, in fastavro._read.read_array
  File "fastavro/_read.pyx", line 561, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 456, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 559, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 405, in fastavro._read.read_union
IndexError: list index out of range

This error occurs when the schema used to decode the alert does not match the alert content. Usually this should not happen (the schema is included in the alert payload). If it does happen, you can force a schema:

fink_consumer [...] -schema [path_to_a_good_schema]

If you do not have a replacement schema, open an issue on the fink-client repository.

Authentication error

If you try to poll the servers and get:

%3|1634555965.502|FAIL|rdkafka#consumer-1| [thrd:sasl_plaintext://134.158.74.95:24499/bootstrap]: sasl_plaintext://134.158.74.95:24499/bootstrap: SASL SCRAM-SHA-512 mechanism handshake failed: Broker: Request not valid in current SASL state: broker's supported mechanisms:  (after 18ms in state AUTH_HANDSHAKE)

You are probably passing a password when instantiating the consumer. Check your ~/.finkclient/credentials.yml; it should contain:

password: null

or directly in your code:

# myconfig is a dict that should NOT have
# a 'password' key set
consumer = AlertConsumer(mytopics, myconfig)

However, if you want the old behaviour, you need to specify it using the sasl.* parameters:

myconfig['sasl.username'] = 'your_username'
myconfig['sasl.password'] = None
consumer = AlertConsumer(mytopics, myconfig)
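To catch this before connecting, you could run a small pre-flight check on the configuration dictionary. The helper below is hypothetical (not part of fink-client): it checks for the keys used in the example consumer script and flags a `password` key, which the notes above identify as the likely cause of the SASL handshake failure. It does not cover the sasl.* variant just described, where `username` may legitimately be absent.

```python
"""Pre-flight check of a consumer configuration dict (hypothetical helper)."""
from typing import Any, Dict, List

# Keys used in the example consumer script of this manual.
REQUIRED_KEYS = ("username", "bootstrap.servers", "group_id")


def config_warnings(config: Dict[str, Any]) -> List[str]:
    """Return human-readable warnings; an empty list means no issue was detected."""
    warnings = [f"missing or empty key: {key!r}" for key in REQUIRED_KEYS if not config.get(key)]
    # A 'password' key is the likely cause of the SASL handshake failure above.
    if "password" in config:
        warnings.append("remove the 'password' key before instantiating AlertConsumer")
    return warnings


if __name__ == "__main__":
    myconfig = {"username": "jdoe", "bootstrap.servers": "host:port", "group_id": "g", "password": "secret"}
    for warning in config_warnings(myconfig):
        print(warning)
```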

Timeout error

If you get frequent timeouts even though you know there are alerts to poll, try increasing the timeout (in seconds) in your configuration file:

# edit ~/.finkclient/credentials.yml
maxtimeout: 30
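In your own consumer, an alternative to a single large timeout is to retry with progressively longer ones, so that a quick answer is still returned quickly. This is a hypothetical strategy, not a fink-client feature: `poll_with_growing_timeout` accepts any callable behaving like `consumer.poll(timeout)` in the consumer script, returning `(topic, alert, key)` with `topic` set to None on timeout. The default timeouts are arbitrary.

```python
"""Retry a poll with progressively longer timeouts (hypothetical strategy)."""
from typing import Any, Callable, Iterable, Optional, Tuple

PollResult = Tuple[Optional[str], Any, Any]


def poll_with_growing_timeout(
    poll: Callable[[float], PollResult],
    timeouts: Iterable[float] = (10, 20, 30),
) -> PollResult:
    """Call poll(timeout) with each timeout in turn until an alert arrives.

    Returns the first non-empty (topic, alert, key) tuple, or
    (None, None, None) if every attempt timed out.
    """
    for timeout in timeouts:
        topic, alert, key = poll(timeout)
        if topic is not None:
            return topic, alert, key
    return None, None, None


if __name__ == "__main__":
    # Stand-in for `consumer.poll`: only "finds" an alert with a timeout of 20 s or more.
    def fake_poll(timeout: float) -> PollResult:
        if timeout >= 20:
            return "test_stream", {"objectId": "ZTF17aabvtfi"}, None
        return None, None, None

    print(poll_with_growing_timeout(fake_poll))
```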