41 lines
1.5 KiB
Python
41 lines
1.5 KiB
Python
#!/usr/bin/env python
|
|
|
|
import gitlab
|
|
import os
|
|
|
|
from security_scanner.file_writer import FileWriter
|
|
|
|
class GitLab:
|
|
def __init__(self):
|
|
gitlab_url = os.environ.get('GITLAB_URL')
|
|
api_token = os.environ.get('PRIVATE_TOKEN')
|
|
project_id = os.environ.get('CI_PROJECT_ID')
|
|
|
|
self._gl = gitlab.Gitlab(gitlab_url, private_token=api_token)
|
|
self._project = self._gl.projects.get(project_id)
|
|
|
|
def getLastSuccessfulJob(self, ref, name):
|
|
pipelines = self._project.pipelines.list()
|
|
|
|
last_successful_job = None
|
|
for pipeline in pipelines:
|
|
jobs = pipeline.jobs.list()
|
|
for job in jobs:
|
|
if job.ref == ref and job.attributes['name'] == name and job.attributes['status'] == 'success':
|
|
if last_successful_job is not None:
|
|
if job.attributes['id'] > last_successful_job.attributes['id']:
|
|
last_successful_job = job
|
|
else:
|
|
last_successful_job = job
|
|
|
|
return last_successful_job
|
|
|
|
def downloadArtifact(self, job, sourcePath, destPath):
|
|
job_id = job.attributes['id']
|
|
print("Downloading artifact {} for job #{}".format(sourcePath, job_id))
|
|
target = FileWriter(destPath)
|
|
artifact = self._project.jobs.get(job_id).artifact(sourcePath, streamed=True, action=target)
|
|
del(target)
|
|
|
|
def createPipeline(self, ref):
|
|
pipeline = self._project.pipelines.create({'ref': ref})
|