vulncheck-oss/sdk-python
GitHub: vulncheck-oss/sdk-python
VulnCheck 官方 Python SDK,提供对漏洞、索引与备份 API 的便捷调用。
Stars: 11 | Forks: 7
# Python 用的 VulnCheck SDK
将 VulnCheck API 集成到您的 Python 应用中。
[](https://badge.fury.io/py/vulncheck-sdk)
[](https://jupyter.org/)
- [Python 用的 VulnCheck SDK](#the-vulncheck-sdk-for-python)
- [安装](#installation)
- [资源](#resources)
- [快速开始](#quickstart)
- [示例](#examples)
- [公告](#advisory)
- [备份](#backup)
- [备份 v4](#backup-v4)
- [CPE](#cpe)
- [索引](#index)
- [索引列表](#indices)
- [分页](#pagination)
- [PURL](#purl)
- [贡献](#contributing)
- [安全](#security)
- [赞助](#sponsorship)
- [许可证](#license)
## 安装
```
# 从 PyPi
pip install vulncheck-sdk
```
## 资源
- [OpenAPI 文档](./vulncheck_sdk_README.md)
- [VulnCheck API 文档](https://docs.vulncheck.com/api)
- [VulnCheck Python SDK 文档](https://docs.vulncheck.com/tools/python-sdk/introduction)
## 快速开始
```
import urllib.request
import vulncheck_sdk
import os
# 首先让我们设置一些变量来帮助我们
TOKEN = os.environ["VULNCHECK_API_TOKEN"] # Remember to store your token securely!
# 现在让我们创建一个配置对象
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
# 将该配置对象传递给我们的 API 客户端,现在...
with vulncheck_sdk.ApiClient(configuration) as api_client:
# We can use two classes to explore the VulnCheck API: EndpointsApi & IndicesApi
### EndpointsApi has methods to query every endpoint except `/v3/index`
# See the full list of endpoints here: https://docs.vulncheck.com/api
endpoints_client = vulncheck_sdk.EndpointsApi(api_client)
# PURL
api_response = endpoints_client.purl_get("pkg:hex/coherence@0.1.2")
data = V3controllersPurlResponseData = api_response.data
print(data.cves)
# CPE
cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta"
api_response = endpoints_client.cpe_get(cpe)
for cve in api_response.data:
print(cve)
# Download a Backup
index = "initial-access"
api_response = endpoints_client.backup_index_get(index)
file_path = f"{index}.zip"
with urllib.request.urlopen(api_response.data[0].url) as response:
with open(file_path, "wb") as file:
file.write(response.read())
### IndicesApi has methods for each index
indices_client = vulncheck_sdk.IndicesApi(api_client)
# Add query parameters to filter what you need
api_response = indices_client.index_vulncheck_nvd2_get(cve="CVE-2019-19781")
print(api_response.data)
```
## 示例
### 公告
列出所有公告源并按源查询公告
```
import vulncheck_sdk
from vulncheck_sdk.models.search_v4_advisory_return_value import SearchV4AdvisoryReturnValue
from vulncheck_sdk.models.search_v4_list_feed_return_value import SearchV4ListFeedReturnValue
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
advisory_client = vulncheck_sdk.AdvisoryApi(api_client)
# List all available advisory feeds (/v4/advisory)
feeds: SearchV4ListFeedReturnValue = advisory_client.v4_list_advisory_feeds()
print("Available feeds:")
for feed in feeds.data:
print(f"name: {feed.name}")
feed = "wolfi"
# Query advisories filtered by feed=wolfi (/v4/advisory?feed=wolfi)
advisories: SearchV4AdvisoryReturnValue = advisory_client.v4_query_advisories(name=feed)
print(f"{feed.capitalize()} advisories (page 1): {len(advisories.data)} results")
for advisory in advisories.data:
print(f"cve: {advisory.cve_metadata.cve_id}")
```
### 备份
下载某个索引的备份
```
import urllib.request
import vulncheck_sdk
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
endpoints_client = vulncheck_sdk.EndpointsApi(api_client)
index = "initial-access"
api_response = endpoints_client.backup_index_get(index)
file_path = f"{index}.zip"
with urllib.request.urlopen(api_response.data[0].url) as response:
with open(file_path, "wb") as file:
file.write(response.read())
```
### 备份 v4
列出可用的 v4 备份并按源名称下载备份
```
import urllib.request
import vulncheck_sdk
from vulncheck_sdk.models.backup_list_backups_response import BackupListBackupsResponse
from vulncheck_sdk.models.backup_feed_item import BackupFeedItem
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
backup_client = vulncheck_sdk.BackupApi(api_client)
# List available backups (/v4/backup)
available: BackupListBackupsResponse = backup_client.v4_list_backups()
for potential_backup in available.data:
print(f"Found backup: {potential_backup.name}")
# Get backup for the wolfi feed (/v4/backup/wolfi)
feed = "wolfi"
response: BackupListBackupsResponse = backup_client.v4_get_backup_by_name(feed)
print(f"Downloading {feed} backup")
file_path = f"{feed}.zip"
with urllib.request.urlopen(response.url) as r:
with open(file_path, "wb") as f:
f.write(r.read())
print(f"Successfully saved to {file_path}")
```
### CPE
获取与指定 CVE 相关的所有 CPE
```
import vulncheck_sdk
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
endpoints_client = vulncheck_sdk.EndpointsApi(api_client)
cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta"
api_response = endpoints_client.cpe_get(cpe)
for cve in api_response.data:
print(cve)
```
### 索引
查询 VulnCheck-NVD2 中的 `CVE-2019-19781`
```
import vulncheck_sdk
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
indices_client = vulncheck_sdk.IndicesApi(api_client)
api_response = indices_client.index_vulncheck_nvd2_get(cve="CVE-2019-19781")
print(api_response.data)
```
### 索引列表
获取所有可用索引
```
import vulncheck_sdk
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
endpoints_client = vulncheck_sdk.EndpointsApi(api_client)
api_response = endpoints_client.index_get()
for index in api_response.data:
print(index.name)
```
### 分页
使用 `cursor` 对 VulnCheck-KEV 查询结果进行分页
```
import vulncheck_sdk
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
indices_client = vulncheck_sdk.IndicesApi(api_client)
api_response = indices_client.index_vulncheck_kev_get(
start_cursor="true",
# `limit` increases the size of each page, making it faster
# to download large datasets
limit=300,
)
print(api_response.data)
while api_response.meta.next_cursor is not None:
api_response = indices_client.index_vulncheck_kev_get(
cursor=api_response.meta.next_cursor
)
print(api_response.data)
```
### PURL
获取指定 PURL 对应的 CVE 列表
```
import vulncheck_sdk
from vulncheck_sdk.models.v3controllers_purl_response_data import (
V3controllersPurlResponseData,
)
import os
TOKEN = os.environ["VULNCHECK_API_TOKEN"]
configuration = vulncheck_sdk.Configuration()
configuration.api_key["Bearer"] = TOKEN
with vulncheck_sdk.ApiClient(configuration) as api_client:
endpoints_client = vulncheck_sdk.EndpointsApi(api_client)
purl = "pkg:hex/coherence@0.1.2"
api_response = endpoints_client.purl_get(purl)
data: V3controllersPurlResponseData = api_response.data
print(data.cves)
```
## 安全
如果发现任何安全问题,请创建 [问题](https://github.com/vulncheck-oss/sdk-python/issues/new?template=1_BUG-FORM.yaml)。
## 赞助
本项目的开发由 [VulnCheck](https://vulncheck.com/) 赞助,了解更多关于我们!
## 许可证
Apache License 2.0。更多信息请参见 [许可证文件](./LICENSE)。
点击查看异步实现
``` import asyncio import os import aiohttp import vulncheck_sdk.aio as vcaio # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def run_vulnerability_checks(): # Use 'async with' to manage the ApiClient connection pool async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) indices_client = vcaio.IndicesApi(api_client) # --- PURL Search --- # 'await' the coroutine to get results purl_response = await endpoints_client.purl_get("pkg:hex/coherence@0.1.2") if purl_response.data: print(f"PURL CVEs: {purl_response.data.cves}") # --- CPE Search --- cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta" # 'await' the coroutine to get results cpe_response = await endpoints_client.cpe_get(cpe) print(f"CPE Results for {cpe}:") for cve in cpe_response.data: print(f" - {cve}") # --- Index Query (NVD2) --- # 'await' the coroutine to get results nvd_response = await indices_client.index_vulncheck_nvd2_get( cve="CVE-2019-19781" ) print(f"NVD2 Data: {nvd_response.data}") # --- Download Backup (Async) --- index_name = "initial-access" # 'await' the coroutine to get results backup_response = await endpoints_client.backup_index_get(index_name) if backup_response.data: download_url = backup_response.data[0].url file_path = f"{index_name}.zip" print(f"Downloading backup from {download_url}...") # Use aiohttp (already in your environment) for async download async with aiohttp.ClientSession() as session: async with session.get(download_url) as resp: if resp.status == 200: # 'await' the coroutine to get results content = await resp.read() with open(file_path, "wb") as f: f.write(content) print(f"Saved backup to {file_path}") if __name__ == "__main__": # Entry point to start the event loop asyncio.run(run_vulnerability_checks()) ```点击查看异步实现
``` import asyncio import os import vulncheck_sdk.aio as vcaio from vulncheck_sdk.aio.models.search_v4_advisory_return_value import SearchV4AdvisoryReturnValue from vulncheck_sdk.aio.models.search_v4_list_feed_return_value import SearchV4ListFeedReturnValue TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def main(): async with vcaio.ApiClient(configuration) as api_client: advisory_client = vcaio.AdvisoryApi(api_client) # List all available advisory feeds (/v4/advisory) feeds: SearchV4ListFeedReturnValue = await advisory_client.v4_list_advisory_feeds() print("Available feeds:") for feed in feeds.data: print(f"name: {feed.name}") feed = "wolfi" # Query advisories filtered by feed=wolfi (/v4/advisory?feed=wolfi) advisories: SearchV4AdvisoryReturnValue = await advisory_client.v4_query_advisories(name=feed) print(f"{feed.capitalize()} advisories (page 1): {len(advisories.data)} results") for advisory in advisories.data: print(f"cve: {advisory.cve_metadata.cve_id}") if __name__ == "__main__": asyncio.run(main()) ```点击查看异步实现
``` import asyncio import os import urllib.request import vulncheck_sdk.aio as vcaio # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN def download_sync(url, file_path): """ Standard synchronous download using urllib.request. This runs in a separate thread to avoid blocking the event loop. """ with urllib.request.urlopen(url) as response: with open(file_path, "wb") as file: file.write(response.read()) async def main(): # Use 'async with' to manage the connection life-cycle async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) index = "initial-access" # 'await' the coroutine to get the actual response data api_response = await endpoints_client.backup_index_get(index) if not api_response.data: print("No backup URL found.") return download_url = api_response.data[0].url file_path = f"{index}.zip" print(f"Downloading {index} via urllib (offloaded to thread)...") # Use asyncio.to_thread to run the blocking call safely # 'await' the coroutine to get the actual response data await asyncio.to_thread(download_sync, download_url, file_path) print(f"Successfully saved to {file_path}") if __name__ == "__main__": asyncio.run(main()) ```点击查看异步实现
``` import asyncio import os import urllib.request import vulncheck_sdk.aio as vcaio from vulncheck_sdk.aio.models.backup_list_backups_response import BackupListBackupsResponse from vulncheck_sdk.aio.models.backup_backup_response import BackupBackupResponse TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN def download_sync(url, file_path): """ Standard synchronous download using urllib.request. This runs in a separate thread to avoid blocking the event loop. """ with urllib.request.urlopen(url) as response: with open(file_path, "wb") as file: file.write(response.read()) async def main(): async with vcaio.ApiClient(configuration) as api_client: backup_client = vcaio.BackupApi(api_client) # List available backups (/v4/backup) available: BackupListBackupsResponse = await backup_client.v4_list_backups() for potential_backup in available.data: print(f"Found backup: {potential_backup.name}") # Get backup for the wolfi feed (/v4/backup/wolfi) feed = "wolfi" response: BackupBackupResponse = await backup_client.v4_get_backup_by_name(feed) file_path = f"{feed}.zip" print(f"Downloading {feed} backup via urllib (offloaded to thread)...") await asyncio.to_thread(download_sync, response.url, file_path) print(f"Successfully saved to {file_path}") if __name__ == "__main__": asyncio.run(main()) ```点击查看异步实现
``` import asyncio import os import vulncheck_sdk.aio as vcaio # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def get_cpe_vulnerabilities(): # 'async with' to manage the connection life-cycle async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) cpe = "cpe:/a:microsoft:internet_explorer:8.0.6001:beta" # 'await' the coroutine to get the actual response data api_response = await endpoints_client.cpe_get(cpe) # Iterate through the results if api_response.data: for cve in api_response.data: print(cve) else: print(f"No vulnerabilities found for CPE: {cpe}") if __name__ == "__main__": # Run the main async entry point asyncio.run(get_cpe_vulnerabilities()) ```点击查看异步实现
``` import asyncio import os import vulncheck_sdk.aio as vcaio # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def get_cve_details(): # Use 'async with' for the ApiClient async with vcaio.ApiClient(configuration) as api_client: indices_client = vcaio.IndicesApi(api_client) # 'await' the API call api_response = await indices_client.index_vulncheck_nvd2_get( cve="CVE-2019-19781" ) # Access and print the data if api_response.data: print(api_response.data) else: print("No data found for the specified CVE.") if __name__ == "__main__": # Start the async event loop asyncio.run(get_cve_details()) ```点击查看异步实现
``` import asyncio import os import vulncheck_sdk.aio as vcaio # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def list_indices(): # Use 'async with' to manage the connection life-cycle async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) # 'await' the coroutine to get the actual response api_response = await endpoints_client.index_get() # Iterate through the results if api_response.data: print(f"{'Index Name':<30} | {'Description'}") print("-" * 50) for index in api_response.data: print(f"{index.name:<30}") else: print("No indices found.") if __name__ == "__main__": # 4. Entry point to run the asynchronous event loop asyncio.run(list_indices()) ```点击查看异步实现
``` import asyncio import os import vulncheck_sdk.aio as vcaio # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def fetch_kev_data(): # Use 'async with' to properly manage the lifecycle of the async client async with vcaio.ApiClient(configuration) as api_client: indices_client = vcaio.IndicesApi(api_client) # 'await' the coroutine to get the actual response data api_response = await indices_client.index_vulncheck_kev_get( start_cursor="true", limit=300 ) print(f"Fetched {len(api_response.data)} records...") # Process initial data # (e.g., save to a list or database) # Pagination loop while api_response.meta and api_response.meta.next_cursor: print(f"Fetching next page: {api_response.meta.next_cursor}") # 'await' the coroutine to get the actual response data api_response = await indices_client.index_vulncheck_kev_get( cursor=api_response.meta.next_cursor, limit=300 ) if api_response.data: print(f"Fetched {len(api_response.data)} records...") else: break if __name__ == "__main__": # Entry point to run the async event loop asyncio.run(fetch_kev_data()) ```点击查看异步实现
``` import asyncio import os import vulncheck_sdk.aio as vcaio from vulncheck_sdk.aio.models.v3controllers_purl_response_data import ( V3controllersPurlResponseData, ) # 配置 TOKEN = os.environ.get("VULNCHECK_API_TOKEN") configuration = vcaio.Configuration() configuration.api_key["Bearer"] = TOKEN async def get_data(client, purl: str): # Await the client call directly api_response = await client.purl_get(purl) # Access the data attribute from the response object return api_response.data async def main(): async with vcaio.ApiClient(configuration) as api_client: endpoints_client = vcaio.EndpointsApi(api_client) purl = "pkg:hex/coherence@0.1.2" # 'await' the async function call data: V3controllersPurlResponseData = await get_data(endpoints_client, purl) if data and data.cves: print(f"Found {len(data.cves)} CVEs:") for cve in data.cves: print(f"- {cve}") else: print("No CVEs found or data is empty.") if __name__ == "__main__": asyncio.run(main()) ```标签:API客户端, CPE, Jupyter, OpenAPI, PURL, PyPI, Python, SEO, URLLib, 令牌认证, 分页, 备份, 安全咨询, 文档生成, 无后门, 漏洞查询, 生成式AI, 索引, 网络调试, 自动化, 逆向工具