In this tutorial we solve the optimal control problem
$$\min J(y, u) = \frac{1}{2} \int_{\Omega} |v - v_d|^2 dx + \frac{\alpha}{2} \int_{\Gamma_2} |u|^2 ds$$ s.t. $$\begin{cases} - \nu \Delta v + v \cdot \nabla v + \nabla p = f & \text{in } \Omega\\ \text{div} v = 0 & \text{in } \Omega\\ v = 0 & \text{on } \Gamma_1\\ pn - \nu \partial_n v = u & \text{on } \Gamma_2\\ v = 0 & \text{on } \Gamma_3\\ v = 0 & \text{on } \Gamma_4 \end{cases}$$
where $$\begin{align*} & \Omega & \text{unit square}\\ & \Gamma_1 & \text{bottom boundary of the square}\\ & \Gamma_2 & \text{left boundary of the square}\\ & \Gamma_3 & \text{top boundary of the square}\\ & \Gamma_4 & \text{right boundary of the square}\\ & u \in [L^2(\Gamma_2)]^2 & \text{control variable}\\ & v \in [H^1(\Omega)]^2 & \text{state velocity variable}\\ & p \in L^2(\Omega) & \text{state pressure variable}\\ & \alpha > 0 & \text{penalization parameter}\\ & v_d & \text{desired state}\\ & \nu & \text{kinematic viscosity}\\ & f & \text{forcing term} \end{align*}$$ using an adjoint formulation solved by a one shot approach
import typing
import dolfinx.fem
import dolfinx.fem.petsc
import dolfinx.io
import dolfinx.mesh
import mpi4py.MPI
import numpy as np
import numpy.typing
import petsc4py.PETSc
import sympy
import ufl
import viskex
import multiphenicsx.fem
import multiphenicsx.fem.petsc
mesh = dolfinx.mesh.create_unit_square(mpi4py.MPI.COMM_WORLD, 32, 32)
# Create connectivities required by the rest of the code
mesh.topology.create_connectivity(mesh.topology.dim - 1, mesh.topology.dim)
def bottom(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the bottom boundary."""
return abs(x[1] - 0.) < np.finfo(float).eps # type: ignore[no-any-return]
def left(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the left boundary."""
return abs(x[0] - 0.) < np.finfo(float).eps # type: ignore[no-any-return]
def top(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the top boundary."""
return abs(x[1] - 1.) < np.finfo(float).eps # type: ignore[no-any-return]
def right(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the right boundary."""
return abs(x[0] - 1.) < np.finfo(float).eps # type: ignore[no-any-return]
boundaries_entities = dict()
boundaries_values = dict()
for (boundary, boundary_id) in zip((bottom, left, top, right), (1, 2, 3, 4)):
boundaries_entities[boundary_id] = dolfinx.mesh.locate_entities_boundary(
mesh, mesh.topology.dim - 1, boundary)
boundaries_values[boundary_id] = np.full(
boundaries_entities[boundary_id].shape, boundary_id, dtype=np.int32)
boundaries_entities_unsorted = np.hstack(list(boundaries_entities.values()))
boundaries_values_unsorted = np.hstack(list(boundaries_values.values()))
boundaries_entities_argsort = np.argsort(boundaries_entities_unsorted)
boundaries_entities_sorted = boundaries_entities_unsorted[boundaries_entities_argsort]
boundaries_values_sorted = boundaries_values_unsorted[boundaries_entities_argsort]
boundaries = dolfinx.mesh.meshtags(
mesh, mesh.topology.dim - 1,
boundaries_entities_sorted, boundaries_values_sorted)
boundaries.name = "boundaries"
boundaries_134 = boundaries.indices[np.isin(boundaries.values, (1, 3, 4))]
boundaries_2 = boundaries.indices[boundaries.values == 2]
# Define associated measures
ds = ufl.Measure("ds", subdomain_data=boundaries)
viskex.dolfinx.plot_mesh(mesh)
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
viskex.dolfinx.plot_mesh_tags(mesh, boundaries, "boundaries")
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
Y_velocity = dolfinx.fem.functionspace(mesh, ("Lagrange", 2, (mesh.geometry.dim, )))
Y_pressure = dolfinx.fem.functionspace(mesh, ("Lagrange", 1))
U = dolfinx.fem.functionspace(mesh, ("Lagrange", 2, (mesh.geometry.dim, )))
Q_velocity = Y_velocity.clone()
Q_pressure = Y_pressure.clone()
dofs_Y_velocity = np.arange(0, Y_velocity.dofmap.index_map.size_local + Y_velocity.dofmap.index_map.num_ghosts)
dofs_Y_pressure = np.arange(0, Y_pressure.dofmap.index_map.size_local + Y_pressure.dofmap.index_map.num_ghosts)
dofs_U = dolfinx.fem.locate_dofs_topological(U, boundaries.dim, boundaries_2)
dofs_Q_velocity = dofs_Y_velocity
dofs_Q_pressure = dofs_Y_pressure
restriction_Y_velocity = multiphenicsx.fem.DofMapRestriction(Y_velocity.dofmap, dofs_Y_velocity)
restriction_Y_pressure = multiphenicsx.fem.DofMapRestriction(Y_pressure.dofmap, dofs_Y_pressure)
restriction_U = multiphenicsx.fem.DofMapRestriction(U.dofmap, dofs_U)
restriction_Q_velocity = multiphenicsx.fem.DofMapRestriction(Q_velocity.dofmap, dofs_Q_velocity)
restriction_Q_pressure = multiphenicsx.fem.DofMapRestriction(Q_pressure.dofmap, dofs_Q_pressure)
restriction = [
restriction_Y_velocity, restriction_Y_pressure, restriction_U, restriction_Q_velocity, restriction_Q_pressure]
(dv, dp) = (ufl.TrialFunction(Y_velocity), ufl.TrialFunction(Y_pressure))
(w, q) = (ufl.TestFunction(Y_velocity), ufl.TestFunction(Y_pressure))
du = ufl.TrialFunction(U)
r = ufl.TestFunction(U)
(dz, db) = (ufl.TrialFunction(Q_velocity), ufl.TrialFunction(Q_pressure))
(s, d) = (ufl.TestFunction(Q_velocity), ufl.TestFunction(Q_pressure))
(v, p) = (dolfinx.fem.Function(Y_velocity), dolfinx.fem.Function(Y_pressure))
u = dolfinx.fem.Function(U)
(z, b) = (dolfinx.fem.Function(Q_velocity), dolfinx.fem.Function(Q_pressure))
alpha = 1.e-5
x, y = sympy.symbols("x[0], x[1]")
psi_d = 10 * (1 - sympy.cos(0.8 * np.pi * x)) * (1 - sympy.cos(0.8 * np.pi * y)) * (1 - x)**2 * (1 - y)**2
v_d_x = sympy.lambdify([x, y], psi_d.diff(y, 1))
v_d_y = sympy.lambdify([x, y], - psi_d.diff(x, 1))
v_d = dolfinx.fem.Function(Y_velocity)
v_d.interpolate(lambda x: np.stack((v_d_x(x[0], x[1]), v_d_y(x[0], x[1])), axis=0))
nu = 0.1
ff = dolfinx.fem.Constant(mesh, tuple(petsc4py.PETSc.ScalarType(0) for _ in range(2)))
bc0 = np.zeros((2, ), dtype=petsc4py.PETSc.ScalarType)
F = [nu * ufl.inner(ufl.grad(z), ufl.grad(w)) * ufl.dx
+ ufl.inner(z, ufl.grad(w) * v) * ufl.dx + ufl.inner(z, ufl.grad(v) * w) * ufl.dx
- ufl.inner(b, ufl.div(w)) * ufl.dx + ufl.inner(v - v_d, w) * ufl.dx,
- ufl.inner(ufl.div(z), q) * ufl.dx,
alpha * ufl.inner(u, r) * ds(2) - ufl.inner(z, r) * ds(2),
nu * ufl.inner(ufl.grad(v), ufl.grad(s)) * ufl.dx + ufl.inner(ufl.grad(v) * v, s) * ufl.dx
- ufl.inner(p, ufl.div(s)) * ufl.dx - ufl.inner(ff, s) * ufl.dx - ufl.inner(u, s) * ds(2),
- ufl.inner(ufl.div(v), d) * ufl.dx]
dF = [[ufl.derivative(F_i, u_j, du_j) for (u_j, du_j) in zip((v, p, u, z, b), (dv, dp, du, dz, db))] for F_i in F]
dF[3][3] = dolfinx.fem.Constant(mesh, petsc4py.PETSc.ScalarType(0)) * ufl.inner(dz, s) * (ds(1) + ds(3) + ds(4))
bdofs_Y_velocity_134 = dolfinx.fem.locate_dofs_topological(
Y_velocity, mesh.topology.dim - 1, boundaries_134)
bdofs_Q_velocity_134 = dolfinx.fem.locate_dofs_topological(
Q_velocity, mesh.topology.dim - 1, boundaries_134)
bc = [dolfinx.fem.dirichletbc(bc0, bdofs_Y_velocity_134, Y_velocity),
dolfinx.fem.dirichletbc(bc0, bdofs_Q_velocity_134, Q_velocity)]
J = 0.5 * ufl.inner(v - v_d, v - v_d) * ufl.dx + 0.5 * alpha * ufl.inner(u, u) * ds(2)
J_cpp = dolfinx.fem.form(J)
class NonlinearBlockProblem:
"""Define a nonlinear problem, interfacing with SNES."""
def __init__( # type: ignore[no-any-unimported]
self, F: list[ufl.Form], dF: list[list[ufl.Form]],
solutions: tuple[dolfinx.fem.Function, ...], bcs: list[dolfinx.fem.DirichletBC],
restriction: typing.Optional[list[multiphenicsx.fem.DofMapRestriction]] = None
) -> None:
self._F = dolfinx.fem.form(F)
self._dF = dolfinx.fem.form(dF)
self._obj_vec = multiphenicsx.fem.petsc.create_vector_block(self._F, restriction)
self._solutions = solutions
self._bcs = bcs
self._restriction = restriction
def create_snes_solution(self) -> petsc4py.PETSc.Vec: # type: ignore[no-any-unimported]
"""
Create a petsc4py.PETSc.Vec to be passed to petsc4py.PETSc.SNES.solve.
The returned vector will be initialized with the initial guesses provided in `self._solutions`,
properly stacked together and restricted in a single block vector.
"""
x = multiphenicsx.fem.petsc.create_vector_block(self._F, restriction=self._restriction)
with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(
x, [c.function_space.dofmap for c in self._solutions], self._restriction) as x_wrapper:
for x_wrapper_local, component in zip(x_wrapper, self._solutions):
with component.x.petsc_vec.localForm() as component_local:
x_wrapper_local[:] = component_local
return x
def update_solutions(self, x: petsc4py.PETSc.Vec) -> None: # type: ignore[no-any-unimported]
"""Update `self._solutions` with data in `x`."""
x.ghostUpdate(addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(
x, [c.function_space.dofmap for c in self._solutions], self._restriction) as x_wrapper:
for x_wrapper_local, component in zip(x_wrapper, self._solutions):
with component.x.petsc_vec.localForm() as component_local:
component_local[:] = x_wrapper_local
def obj( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec
) -> np.float64:
"""Compute the norm of the residual."""
self.F(snes, x, self._obj_vec)
return self._obj_vec.norm() # type: ignore[no-any-return]
def F( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, F_vec: petsc4py.PETSc.Vec
) -> None:
"""Assemble the residual."""
self.update_solutions(x)
with F_vec.localForm() as F_vec_local:
F_vec_local.set(0.0)
multiphenicsx.fem.petsc.assemble_vector_block( # type: ignore[misc]
F_vec, self._F, self._dF, self._bcs, x0=x, alpha=-1.0,
restriction=self._restriction, restriction_x0=self._restriction)
def dF( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, dF_mat: petsc4py.PETSc.Mat,
_: petsc4py.PETSc.Mat
) -> None:
"""Assemble the jacobian."""
dF_mat.zeroEntries()
if self._restriction is None:
restriction = None
else:
restriction = (self._restriction, self._restriction)
multiphenicsx.fem.petsc.assemble_matrix_block(
dF_mat, self._dF, self._bcs, diagonal=1.0, restriction=restriction) # type: ignore[arg-type]
dF_mat.assemble()
# Create problem by extracting state forms from the optimality conditions
F_state = [ufl.replace(F[i], {
s: w, d: q,
u: dolfinx.fem.Constant(mesh, tuple(petsc4py.PETSc.ScalarType(0) for _ in range(2)))}) for i in (3, 4)]
dF_state = [[ufl.derivative(Fs_i, u_j, du_j) for (u_j, du_j) in zip((v, p), (dv, dp))] for Fs_i in F_state]
dF_state[1][1] = dolfinx.fem.Constant(mesh, petsc4py.PETSc.ScalarType(0)) * ufl.inner(dp, q) * ufl.dx
bc_state = [bc[0]]
problem_state = NonlinearBlockProblem(F_state, dF_state, (v, p), bc_state)
F_vec_state = dolfinx.fem.petsc.create_vector_block(problem_state._F)
dF_mat_state = dolfinx.fem.petsc.create_matrix_block(problem_state._dF)
# Solve
snes = petsc4py.PETSc.SNES().create(mesh.comm)
snes.setTolerances(max_it=20)
snes.getKSP().setType("preonly")
snes.getKSP().getPC().setType("lu")
snes.getKSP().getPC().setFactorSolverType("mumps")
snes.setObjective(problem_state.obj)
snes.setFunction(problem_state.F, F_vec_state)
snes.setJacobian(problem_state.dF, J=dF_mat_state, P=None)
snes.setMonitor(lambda _, it, residual: print(it, residual))
vp = problem_state.create_snes_solution()
snes.solve(None, vp)
problem_state.update_solutions(vp) # TODO can this be safely removed?
vp.destroy()
snes.destroy()
0 0.0
<petsc4py.PETSc.SNES at 0x7fe267ff0720>
J_uncontrolled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Uncontrolled J =", J_uncontrolled)
assert np.isclose(J_uncontrolled, 0.1784542)
Uncontrolled J = 0.17845361493420545
viskex.dolfinx.plot_vector_field(v, "uncontrolled state velocity", glyph_factor=1e-1)
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
viskex.dolfinx.plot_scalar_field(p, "uncontrolled state pressure")
error: XDG_RUNTIME_DIR is invalid or not set in the environment. MESA: error: ZINK: failed to choose pdev glx: failed to create drisw screen
# PYTEST_XFAIL_AND_SKIP_NEXT: ufl.derivative(adjoint of the trilinear term) introduces spurious conj(trial)
"""Expect this cell to fail.
assert not np.issubdtype(petsc4py.PETSc.ScalarType, np.complexfloating)
"""
'Expect this cell to fail.\n\nassert not np.issubdtype(petsc4py.PETSc.ScalarType, np.complexfloating)\n'
"""Skip cell due to a previously xfailed cell.
# Create problem associated to the optimality conditions
problem = NonlinearBlockProblem(F, dF, (v, p, u, z, b), bc, restriction)
F_vec = multiphenicsx.fem.petsc.create_vector_block(problem._F, restriction=restriction)
dF_mat = multiphenicsx.fem.petsc.create_matrix_block(problem._dF, restriction=(restriction, restriction))
"""
'Skip cell due to a previously xfailed cell.\n\n# Create problem associated to the optimality conditions\nproblem = NonlinearBlockProblem(F, dF, (v, p, u, z, b), bc, restriction)\nF_vec = multiphenicsx.fem.petsc.create_vector_block(problem._F, restriction=restriction)\ndF_mat = multiphenicsx.fem.petsc.create_matrix_block(problem._dF, restriction=(restriction, restriction))\n'
"""Skip cell due to a previously xfailed cell.
# Solve
snes = petsc4py.PETSc.SNES().create(mesh.comm)
snes.setTolerances(max_it=20)
snes.getKSP().setType("preonly")
snes.getKSP().getPC().setType("lu")
snes.getKSP().getPC().setFactorSolverType("mumps")
snes.setObjective(problem.obj)
snes.setFunction(problem.F, F_vec)
snes.setJacobian(problem.dF, J=dF_mat, P=None)
snes.setMonitor(lambda _, it, residual: print(it, residual))
vpuzb = problem.create_snes_solution()
snes.solve(None, vpuzb)
problem.update_solutions(vpuzb) # TODO can this be safely removed?
vpuzb.destroy()
snes.destroy()
"""
'Skip cell due to a previously xfailed cell.\n\n# Solve\nsnes = petsc4py.PETSc.SNES().create(mesh.comm)\nsnes.setTolerances(max_it=20)\nsnes.getKSP().setType("preonly")\nsnes.getKSP().getPC().setType("lu")\nsnes.getKSP().getPC().setFactorSolverType("mumps")\nsnes.setObjective(problem.obj)\nsnes.setFunction(problem.F, F_vec)\nsnes.setJacobian(problem.dF, J=dF_mat, P=None)\nsnes.setMonitor(lambda _, it, residual: print(it, residual))\nvpuzb = problem.create_snes_solution()\nsnes.solve(None, vpuzb)\nproblem.update_solutions(vpuzb) # TODO can this be safely removed?\nvpuzb.destroy()\nsnes.destroy()\n'
"""Skip cell due to a previously xfailed cell.
J_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Optimal J =", J_controlled)
assert np.isclose(J_controlled, 0.1249381)
"""
'Skip cell due to a previously xfailed cell.\n\nJ_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)\nprint("Optimal J =", J_controlled)\nassert np.isclose(J_controlled, 0.1249381)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-1)
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-1)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_scalar_field(p, "state pressure")
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_scalar_field(p, "state pressure")\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1)
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1)\n'
"""Skip cell due to a previously xfailed cell.
viskex.dolfinx.plot_scalar_field(b, "adjoint pressure")
"""
'Skip cell due to a previously xfailed cell.\n\nviskex.dolfinx.plot_scalar_field(b, "adjoint pressure")\n'