#!/usr/bin/env python import gitlab import os from security_scanner.file_writer import FileWriter class GitLab: def __init__(self): gitlab_url = os.environ.get('GITLAB_URL') api_token = os.environ.get('PRIVATE_TOKEN') project_id = os.environ.get('CI_PROJECT_ID') self._gl = gitlab.Gitlab(gitlab_url, private_token=api_token) self._project = self._gl.projects.get(project_id) def getLastSuccessfulJob(self, ref, name): pipelines = self._project.pipelines.list() last_successful_job = None for pipeline in pipelines: jobs = pipeline.jobs.list() for job in jobs: if job.ref == ref and job.attributes['name'] == name and job.attributes['status'] == 'success': if last_successful_job is not None: if job.attributes['id'] > last_successful_job.attributes['id']: last_successful_job = job else: last_successful_job = job return last_successful_job def downloadArtifact(self, job, sourcePath, destPath): job_id = job.attributes['id'] print("Downloading artifact {} for job #{}".format(sourcePath, job_id)) target = FileWriter(destPath) artifact = self._project.jobs.get(job_id).artifact(sourcePath, streamed=True, action=target) del(target) def createPipeline(self, ref): pipeline = self._project.pipelines.create({'ref': ref})