OpenTelemetry 和 Flask

Harvest-and-build

在上一篇文章 OpenTelemetry 入门和部署 中介绍了 OpenTelemetry的架构,并部署了一套极简的OpenTelemetry服务。在上一篇文章的最后,我们使用curl向OpenTelemetry Collector服务的HTTP接口上报了Traces、Metrics和Logs数据,并进行了验证。本文将从测试场景进入到应用场景,编写两个基于Flask的Web服务,看看在Python Web开发场景中,如何将应用的Traces、Metrics和Logs数据上报到OpenTelemetry。

如果还不了解OpenTelemetry,请先移步上一篇文章 OpenTelemetry 入门和部署 ,了解OpenTelemetry的相关架构和原理。本文主要探索SDK端的用法,各个后端平台还是使用上一篇文章中搭建的OpenTelemetry服务。

项目初始化

OpenTelemetry的Python SDK文档:https://opentelemetry.io/docs/languages/python/getting-started/

OpenTelemetry官方也提供了一个包含十几个微服务的测试项目,分别用十几种语言实现,详情参考 OpenTelemetry Demo ,比较复杂,可以在遇到问题时查看和参考。

本案例中的各个软件版本: OS:Debian 13 Python: 3.13.5 Flask:3.1.2

本测试项目受OpenTelemetry文档中的 骰子项目 启发,将其改造为2个HTTP服务,形成调用关系,方便我们观察跨微服务的链路跟踪。

使用uv做项目管理,在项目中包含两个Python源码文件,分别是 dice-app.pydice-backend.py ,前者通过HTTP方法调用后者,分别上报各自的指标到OpenTelemetry。

$ uv init . --name dice
$ uv add flask

dice-backend应用

创建一个简单的Flask应用——摇骰子,功能是在sqlite3数据库执行 select random()命令,然后把取得的数字对7做取模操作,获得余数作为骰子点数,如果点数在1~6之间则返回,否则抛出ValueError异常。dice-backend.py 的代码如下:

from flask import Flask
import logging
import sqlite3
from opentelemetry import metrics, trace

app = Flask(__name__)
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

cnx = sqlite3.connect(":memory:", check_same_thread=False)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
roll_counter = meter.create_counter(    # 定义Counter类型的指标
    "dice_number_counter",
    description="The number of rolls by roll value",
)

@app.route("/rolldice")
def roll_dice():
    number = roll()
    logger.info(f"random dice number: {number}")
    return str(number)

def roll():
    with tracer.start_as_current_span("roll") as roll_span:   # 手工插桩
        cursor = cnx.cursor()
        number = cursor.execute("select random()").fetchone()[0]
        cursor.close()
        roll_span.set_attribute("origin_random_number", number)   # 设置span的属性
        logging.info(f"raw number from sqlite3 is {number}")    # 在 span 上下文记录的日志会携带 trace_id 等字段

    if (ret := int(number) % 7) in range(1, 7):       # 骰子点数必须在1-6之间
        roll_counter.add(1, {"roll.value": ret})      # 对metric进行设置
        return ret
    raise ValueError(f"dice must in (1, 6), given: {ret}")

代码的内容非常简单,特殊的地方在于手动添加了2个OpenTelemetry的数据上报点。第1个是把从sqlite3中取出的数据设置到trace的origin_random_number这个属性中。第二个是metrics的统计功能,对骰子点数出现的次数进行+1操作,用于统计骰子每个点数出现的次数。

代码中使用 with tracer.start_as_current_span("roll") as roll_span: ...做了手动插桩, 这是链路跟踪的常见操作,创建了名为roll的span,span是分布式系统中单个操作的执行记录,包含开始时间、持续时间、状态和元数据等信息。在这个span的上下文内,我们为span添加了一个属性(使用set_attribute函数),记录执行过程中的信息。

span是分布式链路追踪(Distributed Tracing)领域的通用术语。在分布式系统中,一次请求的完整路径被称为 Trace(追踪),而路径上的每一个逻辑节点(比如一次数据库查询、一次 RPC 调用、一次函数执行)都被形象地称为 Span(跨度)。之所以叫“跨度”,是因为它代表了:时间的跨度,从操作开始(Start Time)到结束(End Time)的时间区间;逻辑的跨度, 在复杂的拓扑图中,它是连接两个组件的“桥梁”。在SkyWalking和Zipkin等跟踪系统中均有此术语。

另外,这个代码有个明显的bug,骰子有1/7的概率结果为0, 触发ValueError的异常。

对于上报的logs数据来说,在span上下文中记录的日志在OTLP协议上报时会被附上trace_id等字段,在问题排查时,可以把日志中的trace id和链路跟踪系统中的trace id对应起来综合排查。

运行和测试

OpenTelemetry提供了对常见Python库的自动插桩(Instrumentation)支持,比如Flask、sqlite3、logging、httpx等,官方支持的一些Python库,见文档 Instrumentation

在这个项目中,我们使用OpenTelementy为Python提供的自动插桩工具,并为代码中使用到的库安装opentelemetry提供的专用SDK。

安装依赖:

$ uv add opentelemetry-instrumentation \
    opentelemetry-distro \
    opentelemetry-exporter-otlp \
    opentelemetry-instrumentation-sqlite3 \
    opentelemetry-instrumentation-flask \
    opentelemetry-instrumentation-logging

安装完依赖后使用自动插桩的方法运行dice-backend.py应用:

$ OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true \
OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" \
FLASK_APP=dice-backend.py \
opentelemetry-instrument \
    --traces_exporter otlp \
    --metrics_exporter otlp \
    --logs_exporter otlp \
    --service_name dice-backend \
    uv run flask run -p 5001
  • 环境变量OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED 设置启用日志库的自动插桩。
  • 环境变量OTEL_EXPORTER_OTLP_ENDPOINT 指定了OpenTelemetry的GRPC地址为本机的 http://localhost:4317,也就是上一篇文章中在本地搭建的Opentelemetry服务。
  • 环境变量FLASK_APP 指定了 flask项目的文件。
  • opentelemetry-instrument命令是自动插桩用的,参数中指定了 traces、metrics和logs的exporter类型为otlp。表示将数据上报到OpenTelemetry。其实exporter的类型很丰富,调试时也常常把exporter设置为console输出到命令行,比如 --traces_exporter console ,也支持同时设置多个exporter,比如 --traces_exporter otlp,console--service_name 参数指定了应用标识为dice-backend,OTLP上报时被转换为 service.name字段 。
  • 使用uv环境和flask run命令运行项目,并监听在5001端口。

反复运行下面的curl命令对dice-backend服务进行测试

$ curl http://localhost:5001/rolldice

多次请求这个接口,大概有1/7的可能性会返回 500 INTERNAL SERVER ERROR 报错。正常的时候应该都会返回1~6之间的数字。

  • 打开Jaeger的ui http://127.0.0.1:16686,查看dice-backend应用的traces数据, 点进细看请求的链路的树形结构,会看到3个span,分别是:
    • 对sqlite的自动插桩,内容是执行的SQL语句;
    • 对roll函数的手动插桩,内容是select randome()语句的执行结果;
    • 对flask的自动插桩,记录HTTP请求参数;
    • 以上3个span有各自的span id,却有共同的trace id
  • 当接口报错时,在Jaeger上可以看到具体的异常报错
  • 打开Prometheus的ui http://127.0.0.1:9090,搜索 dice_mumber_counter_total 指标的结果,查看骰子的每个点数出现的次数。
    • 此外,还有 http_server_* 等OpenTelemetry的flask库内置指标
  • 打开Elasticsearch 的 http://127.0.0.1:9200/logs-dice-backend/_search,查看上报的日志,能看到flask输出的access log。

三个平台的数据之间有一些共同的字段,比如 server.name(prometheus中一般是service_name),traces和logs中也可能有共同的trace_id。

使用Opentelemetry提供的自动插桩功能就能实现对常见库调用的traces数据收集,也很方便的手动对函数做插桩,实现自定义指标数据上报。这是开发中的一种常见做法,不用对代码进行大面积改动。

接下来实现dice项目的app服务,在该服务中,将完全使用代码进行opentelemetry各个exporter的初始化。

dice-app 应用

在 dice-app 服务中,将用httpx库批量调用dice-backend服务的HTTP接口,获取骰子点数,用jinja2渲染为Web页面。功能也很简单。dice-app.py文件的代码如下:

from flask import Flask, request
import logging
import httpx
import jinja2

##### OpenTelemetry 的初始化: traces、logs、metrices
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor  # ConsoleSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
)  # ConsoleLogExporter,
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk.metrics.export import (
    PeriodicExportingMetricReader,
)  # ConsoleMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.jinja2 import Jinja2Instrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor  # noqa
from opentelemetry.instrumentation.logging import LoggingInstrumentor

OTLP_EXPORTER_ENDPOINT = "http://127.0.0.1:4317"
resource = Resource.create({SERVICE_NAME: __name__, SERVICE_VERSION: "1.0.0"})

trace_provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint=OTLP_EXPORTER_ENDPOINT, insecure=True)
trace_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(trace_provider)

logger_provider = LoggerProvider(resource=resource)
log_exporter = OTLPLogExporter(endpoint=OTLP_EXPORTER_ENDPOINT, insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)
logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(logging.StreamHandler())

meter_exporter = OTLPMetricExporter(endpoint=OTLP_EXPORTER_ENDPOINT, insecure=True)
meter_reader = PeriodicExportingMetricReader(
    meter_exporter,
    export_interval_millis=15000,  # metrics 每15s更新
)
meter_provider = MeterProvider(resource=resource, metric_readers=[meter_reader])
metrics.set_meter_provider(meter_provider)

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

Jinja2Instrumentor().instrument()
HTTPXClientInstrumentor().instrument()
LoggingInstrumentor().instrument()

##### OpenTelemetry 的初始化结束

access_counter = meter.create_counter(
    "access_times",
    description="index accesse times",
)

app = Flask(__name__)

logger = logging.getLogger(__name__)
FlaskInstrumentor().instrument_app(app)   # 对Flask应用进行插桩


@app.route("/")
def roll_dice_index():
    client_ip = str(request.remote_addr)
    with tracer.start_as_current_span("get_client_ip") as client_span:
        client_span.set_attribute("client_ip", client_ip)

    access_counter.add(1)
    template = jinja2.Template("""<ul>
        {% for j in data %}
        <li>{{j}}</li>
        {% endfor %}
    </ul>""")

    with httpx.Client() as client:
        data = [client.get("http://127.0.0.1:5001/rolldice").text for _ in range(2)]

    return template.render(data=data)

上面的代码看着很多,近80行。但实现的功能挺简单的,都集中在roll_dice_index这一个简单的flask视图函数中:

  • 直到app = Flask(__name__) 之前,大部分的代码都是在设置OpenTelemetry指标上报相关的功能。忽略掉import语句,这部分内容主要做了如下工作:
    • 定义上报的OpenTelemetry的目标地址为 "http://127.0.0.1:4317" 。
    • 创建resource,其中的字段会作为属性一起上报给OpenTelemetry。
    • 初始化traces、logs和metrics实例。初始化步骤其实差不多,都是定义provider、定义基于GRPC的exporter、将exporter关联到provider上,将provider设置给全局的traces、metrics和logging。
    • 对于项目中用到的httpx、jinja2、logging、flask库进行自动化插桩库的启动。
    • 创建了一个名为 access_times 的counter类型指标,用于统计主页被访问的次数。
    • 创建了两个钩子函数request_hookresponse_hook,分别记录用户对这个服务的请求和响应的内容。
  • 实例化Flask对象后,对flask应用进行插桩,关联了上面定义的两个钩子函数。
  • 本案例代码中的手动插桩函数在dice-backend 项目中都见过了,不做详细介绍了。
  • 在完全使用代码实现的插桩方式中,也可以引入console或者file类型的exporter。

OpenTelemetry初始化和业务代码混在一起,上面的代码看着有点吓人。在实际的项目中,一般会将OpenTelemetry的初始化代码放到独立的模块中,避免影响业务代码的整洁度。

运行和测试

这个项目中用到了httpx和jinja2模块,安装相关依赖和自动插桩库。

$ uv add httpx \
    opentelemetry-instrumentation-jinja2 \
    opentelemetry-instrumentation-httpx

在代码中初始化OpenTelemetry的场景下,使用常规的方法启动flask应用就行,监听到5000端口。

$ FLASK_APP=dice-app.py uv run flask run -p 5000

反复执行下面的命令。

$ curl http://127.0.0.1:5000/

查看相关的上报数据:

  • Jaeger中可以看到调用链路了,总共12个span,6个来自dice-app应用,另外6个来自dice-backend(对dice-backend调用了2次,每次3个span),在dice-app中的6个span分别是:
    • flask自动插桩,记录请求和响应;
    • get_client_ip 手动插桩,记录请求者的ip地址。其实在flask的插桩中已经有了,没有什么实际价值。
    • 来自jinja2的jinja2.compilejinja2.render的函数自动插桩。
    • 2次httpx的自动插桩。记录请求和响应的http_code。
  • Prometheus:查看指标access_times_total
  • 打开Elasticsearch 的 http://127.0.0.1:9200/logs-dice-app/_search,查看上报的日志,能看到flask输出的access log。

为什么dice-appdice-backend 应用使用同一个trace_id? 因为,我们引入了httpx库的OpenTelemetry插桩,在使用httpx向dice-backend发起请求的时候,httpx在请求的header中,自动包含了trace id字段。

可以看到,即使代码中没有手动添加flask、jinja2和httpx的插桩代码,因为安装了这3个库的OpenTelemetry插桩库,使得相关的trace能够自动创建和采集。非常方便。

OpenTelemetry的flask 和 httpx等许多插桩库都有钩子功能(request_hook和response_hook),可以记录请求和响应的特定字段,有兴趣的查看各个OpenTelemetry的对应库的文档做详细了解。

总结

本文通过2个实际的Flask案例展示在Python开发中的2种不同的OpenTelemetry接入方法,可以从中一窥OpenTelemetry提供的强大能力。开发中,记得常看看OpenTelemetry的官方和依赖库文档,往往有意想不到的收获。

OpenTelemetry正变得越来越流行,OpenTelemetry的跨语言、跨框架的traces、metrics、logs采集和处理能力,能为我们的应用带来十分不错的可观测性收益。开发者可以据此构建更加透明和可控的分布式系统,提高应用的可观测性,从而更快地定位和解决问题。

Last updates at Dec 28,2025

Views (66)

Leave a Comment

total 0 comments